Compare commits


6 Commits

Author SHA1 Message Date
yihong
11a4f54c49 fix: update typos rules to fix ci (#5621)
Signed-off-by: yihong0618 <zouzou0208@gmail.com>
2025-03-01 09:21:36 +00:00
Ruihang Xia
d363c8ee3c fix: check physical region before use (#5612)
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-02-28 06:46:48 +00:00
xiaoniaoyouhuajiang
50b521c526 feat: add vec_dim function (#5587)
* feat:add `vec_dim` function

* delete unused imports

* Modified to be implemented correctly

* fix comment

* add order for sqlness test
2025-02-27 15:54:48 +00:00
Ning Sun
c9d70e0e28 refactor: add pipeline concept to OTLP traces and remove OTLP over gRPC (#5605) 2025-02-27 14:01:45 +00:00
Weny Xu
c0c87652c3 chore: bump version to 0.13.0 (#5611)
chore: bump main branch version to 0.13.0
2025-02-27 13:19:59 +00:00
discord9
faaa0affd0 docs: tsbs update (#5608)
chore: tsbs update
2025-02-27 08:14:48 +00:00
81 changed files with 963 additions and 2184 deletions

.github/CODEOWNERS (vendored, 2 changed lines)
View File

@@ -4,7 +4,7 @@
* @GreptimeTeam/db-approver
## [Module] Database Engine
## [Module] Databse Engine
/src/index @zhongzc
/src/mito2 @evenyag @v0y4g3r @waynexia
/src/query @evenyag

View File

@@ -41,14 +41,7 @@ runs:
username: ${{ inputs.dockerhub-image-registry-username }}
password: ${{ inputs.dockerhub-image-registry-token }}
- name: Set up qemu for multi-platform builds
uses: docker/setup-qemu-action@v3
with:
platforms: linux/amd64,linux/arm64
# The latest version will lead to segmentation fault.
image: tonistiigi/binfmt:qemu-v7.0.0-28
- name: Build and push dev-builder-ubuntu image # Build image for amd64 and arm64 platform.
- name: Build and push dev-builder-ubuntu image
shell: bash
if: ${{ inputs.build-dev-builder-ubuntu == 'true' }}
run: |
@@ -59,7 +52,7 @@ runs:
IMAGE_NAMESPACE=${{ inputs.dockerhub-image-namespace }} \
DEV_BUILDER_IMAGE_TAG=${{ inputs.version }}
- name: Build and push dev-builder-centos image # Only build image for amd64 platform.
- name: Build and push dev-builder-centos image
shell: bash
if: ${{ inputs.build-dev-builder-centos == 'true' }}
run: |
@@ -76,7 +69,8 @@ runs:
run: |
make dev-builder \
BASE_IMAGE=android \
BUILDX_MULTI_PLATFORM_BUILD=amd64 \
IMAGE_REGISTRY=${{ inputs.dockerhub-image-registry }} \
IMAGE_NAMESPACE=${{ inputs.dockerhub-image-namespace }} \
DEV_BUILDER_IMAGE_TAG=${{ inputs.version }}
DEV_BUILDER_IMAGE_TAG=${{ inputs.version }} && \
docker push ${{ inputs.dockerhub-image-registry }}/${{ inputs.dockerhub-image-namespace }}/dev-builder-android:${{ inputs.version }}

View File

@@ -52,7 +52,7 @@ runs:
uses: ./.github/actions/build-greptime-binary
with:
base-image: ubuntu
features: servers/dashboard
features: servers/dashboard,pg_kvbackend
cargo-profile: ${{ inputs.cargo-profile }}
artifacts-dir: greptime-linux-${{ inputs.arch }}-${{ inputs.version }}
version: ${{ inputs.version }}
@@ -70,7 +70,7 @@ runs:
if: ${{ inputs.arch == 'amd64' && inputs.dev-mode == 'false' }} # Builds greptime for centos if the host machine is amd64.
with:
base-image: centos
features: servers/dashboard
features: servers/dashboard,pg_kvbackend
cargo-profile: ${{ inputs.cargo-profile }}
artifacts-dir: greptime-linux-${{ inputs.arch }}-centos-${{ inputs.version }}
version: ${{ inputs.version }}

View File

@@ -47,6 +47,7 @@ runs:
shell: pwsh
run: make test sqlness-test
env:
RUSTUP_WINDOWS_PATH_ADD_BIN: 1 # Workaround for https://github.com/nextest-rs/nextest/issues/1493
RUST_BACKTRACE: 1
SQLNESS_OPTS: "--preserve-state"

View File

@@ -64,11 +64,11 @@ inputs:
upload-max-retry-times:
description: Max retry times for uploading artifacts to S3
required: false
default: "30"
default: "20"
upload-retry-timeout:
description: Timeout for uploading artifacts to S3
required: false
default: "120" # minutes
default: "30" # minutes
runs:
using: composite
steps:

View File

@@ -8,15 +8,15 @@ inputs:
default: 2
description: "Number of Datanode replicas"
meta-replicas:
default: 2
default: 1
description: "Number of Metasrv replicas"
image-registry:
image-registry:
default: "docker.io"
description: "Image registry"
image-repository:
image-repository:
default: "greptime/greptimedb"
description: "Image repository"
image-tag:
image-tag:
default: "latest"
description: 'Image tag'
etcd-endpoints:
@@ -32,12 +32,12 @@ runs:
steps:
- name: Install GreptimeDB operator
uses: nick-fields/retry@v3
with:
with:
timeout_minutes: 3
max_attempts: 3
shell: bash
command: |
helm repo add greptime https://greptimeteam.github.io/helm-charts/
helm repo add greptime https://greptimeteam.github.io/helm-charts/
helm repo update
helm upgrade \
--install \
@@ -48,10 +48,10 @@ runs:
--wait-for-jobs
- name: Install GreptimeDB cluster
shell: bash
run: |
run: |
helm upgrade \
--install my-greptimedb \
--set meta.backendStorage.etcd.endpoints=${{ inputs.etcd-endpoints }} \
--set meta.etcdEndpoints=${{ inputs.etcd-endpoints }} \
--set meta.enableRegionFailover=${{ inputs.enable-region-failover }} \
--set image.registry=${{ inputs.image-registry }} \
--set image.repository=${{ inputs.image-repository }} \
@@ -59,7 +59,7 @@ runs:
--set base.podTemplate.main.resources.requests.cpu=50m \
--set base.podTemplate.main.resources.requests.memory=256Mi \
--set base.podTemplate.main.resources.limits.cpu=2000m \
--set base.podTemplate.main.resources.limits.memory=3Gi \
--set base.podTemplate.main.resources.limits.memory=2Gi \
--set frontend.replicas=${{ inputs.frontend-replicas }} \
--set datanode.replicas=${{ inputs.datanode-replicas }} \
--set meta.replicas=${{ inputs.meta-replicas }} \
@@ -72,7 +72,7 @@ runs:
- name: Wait for GreptimeDB
shell: bash
run: |
while true; do
while true; do
PHASE=$(kubectl -n my-greptimedb get gtc my-greptimedb -o jsonpath='{.status.clusterPhase}')
if [ "$PHASE" == "Running" ]; then
echo "Cluster is ready"
@@ -86,10 +86,10 @@ runs:
- name: Print GreptimeDB info
if: always()
shell: bash
run: |
run: |
kubectl get all --show-labels -n my-greptimedb
- name: Describe Nodes
if: always()
shell: bash
run: |
run: |
kubectl describe nodes

View File

@@ -2,14 +2,13 @@ meta:
configData: |-
[runtime]
global_rt_size = 4
[wal]
provider = "kafka"
broker_endpoints = ["kafka.kafka-cluster.svc.cluster.local:9092"]
num_topics = 3
auto_prune_interval = "30s"
trigger_flush_threshold = 100
[datanode]
[datanode.client]
timeout = "120s"
@@ -22,7 +21,7 @@ datanode:
[wal]
provider = "kafka"
broker_endpoints = ["kafka.kafka-cluster.svc.cluster.local:9092"]
overwrite_entry_start_id = true
linger = "2ms"
frontend:
configData: |-
[runtime]

View File

@@ -56,7 +56,7 @@ runs:
- name: Start EC2 runner
if: startsWith(inputs.runner, 'ec2')
uses: machulav/ec2-github-runner@v2.3.8
uses: machulav/ec2-github-runner@v2
id: start-linux-arm64-ec2-runner
with:
mode: start

View File

@@ -33,7 +33,7 @@ runs:
- name: Stop EC2 runner
if: ${{ inputs.label && inputs.ec2-instance-id }}
uses: machulav/ec2-github-runner@v2.3.8
uses: machulav/ec2-github-runner@v2
with:
mode: stop
label: ${{ inputs.label }}

.github/labeler.yaml (vendored, 15 changed lines)
View File

@@ -1,15 +0,0 @@
ci:
- changed-files:
- any-glob-to-any-file: .github/**
docker:
- changed-files:
- any-glob-to-any-file: docker/**
documentation:
- changed-files:
- any-glob-to-any-file: docs/**
dashboard:
- changed-files:
- any-glob-to-any-file: grafana/**

View File

@@ -1,42 +0,0 @@
#!/bin/bash
# Get current version
CURRENT_VERSION=$1
if [ -z "$CURRENT_VERSION" ]; then
echo "Error: Failed to get current version"
exit 1
fi
# Get the latest version from GitHub Releases
API_RESPONSE=$(curl -s "https://api.github.com/repos/GreptimeTeam/greptimedb/releases/latest")
if [ -z "$API_RESPONSE" ] || [ "$(echo "$API_RESPONSE" | jq -r '.message')" = "Not Found" ]; then
echo "Error: Failed to fetch latest version from GitHub"
exit 1
fi
# Get the latest version
LATEST_VERSION=$(echo "$API_RESPONSE" | jq -r '.tag_name')
if [ -z "$LATEST_VERSION" ] || [ "$LATEST_VERSION" = "null" ]; then
echo "Error: No valid version found in GitHub releases"
exit 1
fi
# Cleaned up version number format (removed possible 'v' prefix and -nightly suffix)
CLEAN_CURRENT=$(echo "$CURRENT_VERSION" | sed 's/^v//' | sed 's/-nightly-.*//')
CLEAN_LATEST=$(echo "$LATEST_VERSION" | sed 's/^v//' | sed 's/-nightly-.*//')
echo "Current version: $CLEAN_CURRENT"
echo "Latest release version: $CLEAN_LATEST"
# Use sort -V to compare versions
HIGHER_VERSION=$(printf "%s\n%s" "$CLEAN_CURRENT" "$CLEAN_LATEST" | sort -V | tail -n1)
if [ "$HIGHER_VERSION" = "$CLEAN_CURRENT" ]; then
echo "Current version ($CLEAN_CURRENT) is NEWER than or EQUAL to latest ($CLEAN_LATEST)"
echo "should-push-latest-tag=true" >> $GITHUB_OUTPUT
else
echo "Current version ($CLEAN_CURRENT) is OLDER than latest ($CLEAN_LATEST)"
echo "should-push-latest-tag=false" >> $GITHUB_OUTPUT
fi
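For reference, a minimal local sketch of invoking the version check above (assumptions: the script is saved as .github/scripts/check-version.sh, as the release workflow hunk later in this compare calls it, and curl plus jq are installed; GITHUB_OUTPUT is normally provided by GitHub Actions, so it is pointed at stdout here):

    # Compare a candidate tag against the latest GitHub release.
    GITHUB_OUTPUT=/dev/stdout ./.github/scripts/check-version.sh v0.13.0
    # The should-push-latest-tag=true/false line then ends up on stdout.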

View File

@@ -8,25 +8,24 @@ set -e
# - If it's a nightly build, the version is 'nightly-YYYYMMDD-$(git rev-parse --short HEAD)', like 'nightly-20230712-e5b243c'.
# create_version ${GIHUB_EVENT_NAME} ${NEXT_RELEASE_VERSION} ${NIGHTLY_RELEASE_PREFIX}
function create_version() {
# Read from environment variables.
# Read from envrionment variables.
if [ -z "$GITHUB_EVENT_NAME" ]; then
echo "GITHUB_EVENT_NAME is empty" >&2
echo "GITHUB_EVENT_NAME is empty"
exit 1
fi
if [ -z "$NEXT_RELEASE_VERSION" ]; then
echo "NEXT_RELEASE_VERSION is empty, use version from Cargo.toml" >&2
# NOTE: Need a `v` prefix for the version string.
export NEXT_RELEASE_VERSION=v$(grep '^version = ' Cargo.toml | cut -d '"' -f 2 | head -n 1)
echo "NEXT_RELEASE_VERSION is empty"
exit 1
fi
if [ -z "$NIGHTLY_RELEASE_PREFIX" ]; then
echo "NIGHTLY_RELEASE_PREFIX is empty" >&2
echo "NIGHTLY_RELEASE_PREFIX is empty"
exit 1
fi
# Reuse $NEXT_RELEASE_VERSION to identify whether it's a nightly build.
# It will be like 'nightly-20230808-7d0d8dc6'.
# It will be like 'nigtly-20230808-7d0d8dc6'.
if [ "$NEXT_RELEASE_VERSION" = nightly ]; then
echo "$NIGHTLY_RELEASE_PREFIX-$(date "+%Y%m%d")-$(git rev-parse --short HEAD)"
exit 0
@@ -36,7 +35,7 @@ function create_version() {
# It will be like 'dev-2023080819-f0e7216c'.
if [ "$NEXT_RELEASE_VERSION" = dev ]; then
if [ -z "$COMMIT_SHA" ]; then
echo "COMMIT_SHA is empty in dev build" >&2
echo "COMMIT_SHA is empty in dev build"
exit 1
fi
echo "dev-$(date "+%Y%m%d-%s")-$(echo "$COMMIT_SHA" | cut -c1-8)"
@@ -46,7 +45,7 @@ function create_version() {
# Note: Only output 'version=xxx' to stdout when everything is ok, so that it can be used in GitHub Actions Outputs.
if [ "$GITHUB_EVENT_NAME" = push ]; then
if [ -z "$GITHUB_REF_NAME" ]; then
echo "GITHUB_REF_NAME is empty in push event" >&2
echo "GITHUB_REF_NAME is empty in push event"
exit 1
fi
echo "$GITHUB_REF_NAME"
@@ -55,15 +54,15 @@ function create_version() {
elif [ "$GITHUB_EVENT_NAME" = schedule ]; then
echo "$NEXT_RELEASE_VERSION-$NIGHTLY_RELEASE_PREFIX-$(date "+%Y%m%d")"
else
echo "Unsupported GITHUB_EVENT_NAME: $GITHUB_EVENT_NAME" >&2
echo "Unsupported GITHUB_EVENT_NAME: $GITHUB_EVENT_NAME"
exit 1
fi
}
# You can run as following examples:
# GITHUB_EVENT_NAME=push NEXT_RELEASE_VERSION=v0.4.0 NIGHTLY_RELEASE_PREFIX=nightly GITHUB_REF_NAME=v0.3.0 ./create-version.sh
# GITHUB_EVENT_NAME=workflow_dispatch NEXT_RELEASE_VERSION=v0.4.0 NIGHTLY_RELEASE_PREFIX=nightly ./create-version.sh
# GITHUB_EVENT_NAME=schedule NEXT_RELEASE_VERSION=v0.4.0 NIGHTLY_RELEASE_PREFIX=nightly ./create-version.sh
# GITHUB_EVENT_NAME=schedule NEXT_RELEASE_VERSION=nightly NIGHTLY_RELEASE_PREFIX=nightly ./create-version.sh
# GITHUB_EVENT_NAME=workflow_dispatch COMMIT_SHA=f0e7216c4bb6acce9b29a21ec2d683be2e3f984a NEXT_RELEASE_VERSION=dev NIGHTLY_RELEASE_PREFIX=nightly ./create-version.sh
# GITHUB_EVENT_NAME=push NEXT_RELEASE_VERSION=v0.4.0 NIGHTLY_RELEASE_PREFIX=nigtly GITHUB_REF_NAME=v0.3.0 ./create-version.sh
# GITHUB_EVENT_NAME=workflow_dispatch NEXT_RELEASE_VERSION=v0.4.0 NIGHTLY_RELEASE_PREFIX=nigtly ./create-version.sh
# GITHUB_EVENT_NAME=schedule NEXT_RELEASE_VERSION=v0.4.0 NIGHTLY_RELEASE_PREFIX=nigtly ./create-version.sh
# GITHUB_EVENT_NAME=schedule NEXT_RELEASE_VERSION=nightly NIGHTLY_RELEASE_PREFIX=nigtly ./create-version.sh
# GITHUB_EVENT_NAME=workflow_dispatch COMMIT_SHA=f0e7216c4bb6acce9b29a21ec2d683be2e3f984a NEXT_RELEASE_VERSION=dev NIGHTLY_RELEASE_PREFIX=nigtly ./create-version.sh
create_version

View File

@@ -10,7 +10,7 @@ GREPTIMEDB_IMAGE_TAG=${GREPTIMEDB_IMAGE_TAG:-latest}
ETCD_CHART="oci://registry-1.docker.io/bitnamicharts/etcd"
GREPTIME_CHART="https://greptimeteam.github.io/helm-charts/"
# Create a cluster with 1 control-plane node and 5 workers.
# Ceate a cluster with 1 control-plane node and 5 workers.
function create_kind_cluster() {
cat <<EOF | kind create cluster --name "${CLUSTER}" --image kindest/node:"$KUBERNETES_VERSION" --config=-
kind: Cluster
@@ -68,7 +68,7 @@ function deploy_greptimedb_cluster() {
helm install "$cluster_name" greptime/greptimedb-cluster \
--set image.tag="$GREPTIMEDB_IMAGE_TAG" \
--set meta.backendStorage.etcd.endpoints="etcd.$install_namespace:2379" \
--set meta.etcdEndpoints="etcd.$install_namespace:2379" \
-n "$install_namespace"
# Wait for greptimedb cluster to be ready.
@@ -103,7 +103,7 @@ function deploy_greptimedb_cluster_with_s3_storage() {
helm install "$cluster_name" greptime/greptimedb-cluster -n "$install_namespace" \
--set image.tag="$GREPTIMEDB_IMAGE_TAG" \
--set meta.backendStorage.etcd.endpoints="etcd.$install_namespace:2379" \
--set meta.etcdEndpoints="etcd.$install_namespace:2379" \
--set storage.s3.bucket="$AWS_CI_TEST_BUCKET" \
--set storage.s3.region="$AWS_REGION" \
--set storage.s3.root="$DATA_ROOT" \

View File

@@ -1,37 +0,0 @@
#!/bin/bash
DEV_BUILDER_IMAGE_TAG=$1
update_dev_builder_version() {
if [ -z "$DEV_BUILDER_IMAGE_TAG" ]; then
echo "Error: Should specify the dev-builder image tag"
exit 1
fi
# Configure Git configs.
git config --global user.email greptimedb-ci@greptime.com
git config --global user.name greptimedb-ci
# Checkout a new branch.
BRANCH_NAME="ci/update-dev-builder-$(date +%Y%m%d%H%M%S)"
git checkout -b $BRANCH_NAME
# Update the dev-builder image tag in the Makefile.
sed -i "s/DEV_BUILDER_IMAGE_TAG ?=.*/DEV_BUILDER_IMAGE_TAG ?= ${DEV_BUILDER_IMAGE_TAG}/g" Makefile
# Commit the changes.
git add Makefile
git commit -m "ci: update dev-builder image tag"
git push origin $BRANCH_NAME
# Create a Pull Request.
gh pr create \
--title "ci: update dev-builder image tag" \
--body "This PR updates the dev-builder image tag" \
--base main \
--head $BRANCH_NAME \
--reviewer zyy17 \
--reviewer daviderli614
}
update_dev_builder_version
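A hedged invocation sketch for the dev-builder bump script above (the tag value is illustrative; the script assumes push rights on the repository and an authenticated gh CLI, since it pushes a branch and opens a PR):

    # Update DEV_BUILDER_IMAGE_TAG in the Makefile and open a PR against main (illustrative tag).
    ./.github/scripts/update-dev-builder-version.sh 2024-12-31-abcdef12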

View File

@@ -1,46 +0,0 @@
#!/bin/bash
set -e
VERSION=${VERSION}
GITHUB_TOKEN=${GITHUB_TOKEN}
update_helm_charts_version() {
# Configure Git configs.
git config --global user.email update-helm-charts-version@greptime.com
git config --global user.name update-helm-charts-version
# Clone helm-charts repository.
git clone "https://x-access-token:${GITHUB_TOKEN}@github.com/GreptimeTeam/helm-charts.git"
cd helm-charts
# Set default remote for gh CLI
gh repo set-default GreptimeTeam/helm-charts
# Checkout a new branch.
BRANCH_NAME="chore/greptimedb-${VERSION}"
git checkout -b $BRANCH_NAME
# Update version.
make update-version CHART=greptimedb-cluster VERSION=${VERSION}
make update-version CHART=greptimedb-standalone VERSION=${VERSION}
# Update docs.
make docs
# Commit the changes.
git add .
git commit -s -m "chore: Update GreptimeDB version to ${VERSION}"
git push origin $BRANCH_NAME
# Create a Pull Request.
gh pr create \
--title "chore: Update GreptimeDB version to ${VERSION}" \
--body "This PR updates the GreptimeDB version." \
--base main \
--head $BRANCH_NAME \
--reviewer zyy17 \
--reviewer daviderli614
}
update_helm_charts_version
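Similarly, a minimal sketch of running the helm-charts bump above outside CI (VERSION and GITHUB_TOKEN are read from the environment as the script header shows; the values below are illustrative and the token must have push access to GreptimeTeam/helm-charts). The homebrew bump script further down is invoked the same way:

    # The release workflow passes its computed version here; v0.13.0 is only an example.
    VERSION=v0.13.0 GITHUB_TOKEN=<token-with-repo-access> ./.github/scripts/update-helm-charts-version.sh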

View File

@@ -1,42 +0,0 @@
#!/bin/bash
set -e
VERSION=${VERSION}
GITHUB_TOKEN=${GITHUB_TOKEN}
update_homebrew_greptime_version() {
# Configure Git configs.
git config --global user.email update-greptime-version@greptime.com
git config --global user.name update-greptime-version
# Clone helm-charts repository.
git clone "https://x-access-token:${GITHUB_TOKEN}@github.com/GreptimeTeam/homebrew-greptime.git"
cd homebrew-greptime
# Set default remote for gh CLI
gh repo set-default GreptimeTeam/homebrew-greptime
# Checkout a new branch.
BRANCH_NAME="chore/greptimedb-${VERSION}"
git checkout -b $BRANCH_NAME
# Update version.
make update-greptime-version VERSION=${VERSION}
# Commit the changes.
git add .
git commit -s -m "chore: Update GreptimeDB version to ${VERSION}"
git push origin $BRANCH_NAME
# Create a Pull Request.
gh pr create \
--title "chore: Update GreptimeDB version to ${VERSION}" \
--body "This PR updates the GreptimeDB version." \
--base main \
--head $BRANCH_NAME \
--reviewer zyy17 \
--reviewer daviderli614
}
update_homebrew_greptime_version

View File

@@ -41,7 +41,7 @@ function upload_artifacts() {
# Updates the latest version information in AWS S3 if UPDATE_VERSION_INFO is true.
function update_version_info() {
if [ "$UPDATE_VERSION_INFO" == "true" ]; then
# If it's the official release(like v1.0.0, v1.0.1, v1.0.2, etc.), update latest-version.txt.
# If it's the officail release(like v1.0.0, v1.0.1, v1.0.2, etc.), update latest-version.txt.
if [[ "$VERSION" =~ ^v[0-9]+\.[0-9]+\.[0-9]+$ ]]; then
echo "Updating latest-version.txt"
echo "$VERSION" > latest-version.txt
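As a quick illustration of the release-version pattern used above, a bash sketch (assumption: run under bash, since the check relies on [[ ... =~ ... ]]):

    # Official releases like v1.0.0 match and trigger the latest-version.txt update;
    # nightly or dev tags do not.
    [[ "v1.0.0" =~ ^v[0-9]+\.[0-9]+\.[0-9]+$ ]] && echo "official release"
    [[ "v0.13.0-nightly-20230313" =~ ^v[0-9]+\.[0-9]+\.[0-9]+$ ]] || echo "not an official release"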

View File

@@ -14,7 +14,7 @@ name: Build API docs
jobs:
apidoc:
runs-on: ubuntu-latest
runs-on: ubuntu-20.04
steps:
- uses: actions/checkout@v4
with:

View File

@@ -16,11 +16,11 @@ on:
description: The runner uses to build linux-amd64 artifacts
default: ec2-c6i.4xlarge-amd64
options:
- ubuntu-22.04
- ubuntu-22.04-8-cores
- ubuntu-22.04-16-cores
- ubuntu-22.04-32-cores
- ubuntu-22.04-64-cores
- ubuntu-20.04
- ubuntu-20.04-8-cores
- ubuntu-20.04-16-cores
- ubuntu-20.04-32-cores
- ubuntu-20.04-64-cores
- ec2-c6i.xlarge-amd64 # 4C8G
- ec2-c6i.2xlarge-amd64 # 8C16G
- ec2-c6i.4xlarge-amd64 # 16C32G
@@ -55,11 +55,6 @@ on:
description: Build and push images to DockerHub and ACR
required: false
default: true
upload_artifacts_to_s3:
type: boolean
description: Whether upload artifacts to s3
required: false
default: false
cargo_profile:
type: choice
description: The cargo profile to use in building GreptimeDB.
@@ -88,7 +83,7 @@ jobs:
allocate-runners:
name: Allocate runners
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
runs-on: ubuntu-latest
runs-on: ubuntu-20.04
outputs:
linux-amd64-runner: ${{ steps.start-linux-amd64-runner.outputs.label }}
linux-arm64-runner: ${{ steps.start-linux-arm64-runner.outputs.label }}
@@ -223,7 +218,7 @@ jobs:
build-linux-amd64-artifacts,
build-linux-arm64-artifacts,
]
runs-on: ubuntu-latest
runs-on: ubuntu-20.04
outputs:
build-result: ${{ steps.set-build-result.outputs.build-result }}
steps:
@@ -244,13 +239,6 @@ jobs:
push-latest-tag: false # Don't push the latest tag to registry.
dev-mode: true # Only build the standard images.
- name: Echo Docker image tag to step summary
run: |
echo "## Docker Image Tag" >> $GITHUB_STEP_SUMMARY
echo "Image Tag: \`${{ needs.allocate-runners.outputs.version }}\`" >> $GITHUB_STEP_SUMMARY
echo "Full Image Name: \`docker.io/${{ vars.IMAGE_NAMESPACE }}/${{ vars.DEV_BUILD_IMAGE_NAME }}:${{ needs.allocate-runners.outputs.version }}\`" >> $GITHUB_STEP_SUMMARY
echo "Pull Command: \`docker pull docker.io/${{ vars.IMAGE_NAMESPACE }}/${{ vars.DEV_BUILD_IMAGE_NAME }}:${{ needs.allocate-runners.outputs.version }}\`" >> $GITHUB_STEP_SUMMARY
- name: Set build result
id: set-build-result
run: |
@@ -263,7 +251,7 @@ jobs:
allocate-runners,
release-images-to-dockerhub,
]
runs-on: ubuntu-latest
runs-on: ubuntu-20.04
continue-on-error: true
steps:
- uses: actions/checkout@v4
@@ -286,7 +274,7 @@ jobs:
aws-cn-access-key-id: ${{ secrets.AWS_CN_ACCESS_KEY_ID }}
aws-cn-secret-access-key: ${{ secrets.AWS_CN_SECRET_ACCESS_KEY }}
aws-cn-region: ${{ vars.AWS_RELEASE_BUCKET_REGION }}
upload-to-s3: ${{ inputs.upload_artifacts_to_s3 }}
upload-to-s3: false
dev-mode: true # Only build the standard images(exclude centos images).
push-latest-tag: false # Don't push the latest tag to registry.
update-version-info: false # Don't update the version info in S3.
@@ -295,7 +283,7 @@ jobs:
name: Stop linux-amd64 runner
# Only run this job when the runner is allocated.
if: ${{ always() }}
runs-on: ubuntu-latest
runs-on: ubuntu-20.04
needs: [
allocate-runners,
build-linux-amd64-artifacts,
@@ -321,7 +309,7 @@ jobs:
name: Stop linux-arm64 runner
# Only run this job when the runner is allocated.
if: ${{ always() }}
runs-on: ubuntu-latest
runs-on: ubuntu-20.04
needs: [
allocate-runners,
build-linux-arm64-artifacts,
@@ -349,7 +337,7 @@ jobs:
needs: [
release-images-to-dockerhub
]
runs-on: ubuntu-latest
runs-on: ubuntu-20.04
permissions:
issues: write

View File

@@ -22,9 +22,8 @@ concurrency:
jobs:
check-typos-and-docs:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
name: Check typos and docs
runs-on: ubuntu-latest
runs-on: ubuntu-20.04
steps:
- uses: actions/checkout@v4
with:
@@ -37,8 +36,7 @@ jobs:
|| (echo "'config/config.md' is not up-to-date, please run 'make config-docs'." && exit 1)
license-header-check:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
runs-on: ubuntu-latest
runs-on: ubuntu-20.04
name: Check License Header
steps:
- uses: actions/checkout@v4
@@ -47,12 +45,11 @@ jobs:
- uses: korandoru/hawkeye@v5
check:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
name: Check
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [ ubuntu-latest ]
os: [ ubuntu-20.04 ]
timeout-minutes: 60
steps:
- uses: actions/checkout@v4
@@ -74,9 +71,8 @@ jobs:
run: cargo check --locked --workspace --all-targets
toml:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
name: Toml Check
runs-on: ubuntu-latest
runs-on: ubuntu-20.04
timeout-minutes: 60
steps:
- uses: actions/checkout@v4
@@ -89,12 +85,11 @@ jobs:
run: taplo format --check
build:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
name: Build GreptimeDB binaries
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [ ubuntu-latest ]
os: [ ubuntu-20.04 ]
timeout-minutes: 60
steps:
- uses: actions/checkout@v4
@@ -132,7 +127,6 @@ jobs:
version: current
fuzztest:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
name: Fuzz Test
needs: build
runs-on: ubuntu-latest
@@ -189,13 +183,11 @@ jobs:
max-total-time: 120
unstable-fuzztest:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
name: Unstable Fuzz Test
needs: build-greptime-ci
runs-on: ubuntu-latest
timeout-minutes: 60
strategy:
fail-fast: false
matrix:
target: [ "unstable_fuzz_create_table_standalone" ]
steps:
@@ -223,12 +215,12 @@ jobs:
run: |
sudo apt update && sudo apt install -y libfuzzer-14-dev
cargo install cargo-fuzz cargo-gc-bin --force
- name: Download pre-built binary
- name: Download pre-built binariy
uses: actions/download-artifact@v4
with:
name: bin
path: .
- name: Unzip binary
- name: Unzip bianry
run: |
tar -xvf ./bin.tar.gz
rm ./bin.tar.gz
@@ -250,19 +242,13 @@ jobs:
name: unstable-fuzz-logs
path: /tmp/unstable-greptime/
retention-days: 3
- name: Describe pods
if: failure()
shell: bash
run: |
kubectl describe pod -n my-greptimedb
build-greptime-ci:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
name: Build GreptimeDB binary (profile-CI)
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [ ubuntu-latest ]
os: [ ubuntu-20.04 ]
timeout-minutes: 60
steps:
- uses: actions/checkout@v4
@@ -281,7 +267,7 @@ jobs:
- name: Install cargo-gc-bin
shell: bash
run: cargo install cargo-gc-bin --force
- name: Build greptime binary
- name: Build greptime bianry
shell: bash
# `cargo gc` will invoke `cargo build` with specified args
run: cargo gc --profile ci -- --bin greptime --features pg_kvbackend
@@ -299,13 +285,11 @@ jobs:
version: current
distributed-fuzztest:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
name: Fuzz Test (Distributed, ${{ matrix.mode.name }}, ${{ matrix.target }})
runs-on: ubuntu-latest
needs: build-greptime-ci
timeout-minutes: 60
strategy:
fail-fast: false
matrix:
target: [ "fuzz_create_table", "fuzz_alter_table", "fuzz_create_database", "fuzz_create_logical_table", "fuzz_alter_logical_table", "fuzz_insert", "fuzz_insert_logical_table" ]
mode:
@@ -335,9 +319,9 @@ jobs:
name: Setup Minio
uses: ./.github/actions/setup-minio
- if: matrix.mode.kafka
name: Setup Kafka cluster
name: Setup Kafka cluser
uses: ./.github/actions/setup-kafka-cluster
- name: Setup Etcd cluster
- name: Setup Etcd cluser
uses: ./.github/actions/setup-etcd-cluster
# Prepares for fuzz tests
- uses: arduino/setup-protoc@v3
@@ -410,11 +394,6 @@ jobs:
shell: bash
run: |
kubectl describe nodes
- name: Describe pod
if: failure()
shell: bash
run: |
kubectl describe pod -n my-greptimedb
- name: Export kind logs
if: failure()
shell: bash
@@ -437,13 +416,11 @@ jobs:
docker system prune -f
distributed-fuzztest-with-chaos:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
name: Fuzz Test with Chaos (Distributed, ${{ matrix.mode.name }}, ${{ matrix.target }})
runs-on: ubuntu-latest
needs: build-greptime-ci
timeout-minutes: 60
strategy:
fail-fast: false
matrix:
target: ["fuzz_migrate_mito_regions", "fuzz_migrate_metric_regions", "fuzz_failover_mito_regions", "fuzz_failover_metric_regions"]
mode:
@@ -488,9 +465,9 @@ jobs:
name: Setup Minio
uses: ./.github/actions/setup-minio
- if: matrix.mode.kafka
name: Setup Kafka cluster
name: Setup Kafka cluser
uses: ./.github/actions/setup-kafka-cluster
- name: Setup Etcd cluster
- name: Setup Etcd cluser
uses: ./.github/actions/setup-etcd-cluster
# Prepares for fuzz tests
- uses: arduino/setup-protoc@v3
@@ -564,11 +541,6 @@ jobs:
shell: bash
run: |
kubectl describe nodes
- name: Describe pods
if: failure()
shell: bash
run: |
kubectl describe pod -n my-greptimedb
- name: Export kind logs
if: failure()
shell: bash
@@ -591,14 +563,12 @@ jobs:
docker system prune -f
sqlness:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
name: Sqlness Test (${{ matrix.mode.name }})
needs: build
runs-on: ${{ matrix.os }}
strategy:
fail-fast: false
matrix:
os: [ ubuntu-latest ]
os: [ ubuntu-20.04 ]
mode:
- name: "Basic"
opts: ""
@@ -606,7 +576,7 @@ jobs:
- name: "Remote WAL"
opts: "-w kafka -k 127.0.0.1:9092"
kafka: true
- name: "PostgreSQL KvBackend"
- name: "Pg Kvbackend"
opts: "--setup-pg"
kafka: false
timeout-minutes: 60
@@ -636,9 +606,8 @@ jobs:
retention-days: 3
fmt:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
name: Rustfmt
runs-on: ubuntu-latest
runs-on: ubuntu-20.04
timeout-minutes: 60
steps:
- uses: actions/checkout@v4
@@ -654,9 +623,8 @@ jobs:
run: make fmt-check
clippy:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
name: Clippy
runs-on: ubuntu-latest
runs-on: ubuntu-20.04
timeout-minutes: 60
steps:
- uses: actions/checkout@v4
@@ -680,7 +648,6 @@ jobs:
run: make clippy
conflict-check:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
name: Check for conflict
runs-on: ubuntu-latest
steps:
@@ -691,7 +658,7 @@ jobs:
uses: olivernybroe/action-conflict-finder@v4.0
test:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' && github.event_name != 'merge_group' }}
if: github.event_name != 'merge_group'
runs-on: ubuntu-22.04-arm
timeout-minutes: 60
needs: [conflict-check, clippy, fmt]
@@ -706,7 +673,7 @@ jobs:
- name: Install toolchain
uses: actions-rust-lang/setup-rust-toolchain@v1
with:
cache: false
cache: false
- name: Rust Cache
uses: Swatinem/rust-cache@v2
with:
@@ -737,14 +704,13 @@ jobs:
GT_MINIO_ENDPOINT_URL: http://127.0.0.1:9000
GT_ETCD_ENDPOINTS: http://127.0.0.1:2379
GT_POSTGRES_ENDPOINTS: postgres://greptimedb:admin@127.0.0.1:5432/postgres
GT_MYSQL_ENDPOINTS: mysql://greptimedb:admin@127.0.0.1:3306/mysql
GT_KAFKA_ENDPOINTS: 127.0.0.1:9092
GT_KAFKA_SASL_ENDPOINTS: 127.0.0.1:9093
UNITTEST_LOG_DIR: "__unittest_logs"
coverage:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' && github.event_name == 'merge_group' }}
runs-on: ubuntu-22.04-8-cores
if: github.event_name == 'merge_group'
runs-on: ubuntu-20.04-8-cores
timeout-minutes: 60
steps:
- uses: actions/checkout@v4
@@ -789,7 +755,6 @@ jobs:
GT_MINIO_ENDPOINT_URL: http://127.0.0.1:9000
GT_ETCD_ENDPOINTS: http://127.0.0.1:2379
GT_POSTGRES_ENDPOINTS: postgres://greptimedb:admin@127.0.0.1:5432/postgres
GT_MYSQL_ENDPOINTS: mysql://greptimedb:admin@127.0.0.1:3306/mysql
GT_KAFKA_ENDPOINTS: 127.0.0.1:9092
GT_KAFKA_SASL_ENDPOINTS: 127.0.0.1:9093
UNITTEST_LOG_DIR: "__unittest_logs"
@@ -803,10 +768,9 @@ jobs:
verbose: true
# compat:
# if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
# name: Compatibility Test
# needs: build
# runs-on: ubuntu-22.04
# runs-on: ubuntu-20.04
# timeout-minutes: 60
# steps:
# - uses: actions/checkout@v4

View File

@@ -9,7 +9,7 @@ concurrency:
jobs:
docbot:
runs-on: ubuntu-latest
runs-on: ubuntu-20.04
permissions:
pull-requests: write
contents: read

View File

@@ -31,7 +31,7 @@ name: CI
jobs:
typos:
name: Spell Check with Typos
runs-on: ubuntu-latest
runs-on: ubuntu-20.04
steps:
- uses: actions/checkout@v4
with:
@@ -39,7 +39,7 @@ jobs:
- uses: crate-ci/typos@master
license-header-check:
runs-on: ubuntu-latest
runs-on: ubuntu-20.04
name: Check License Header
steps:
- uses: actions/checkout@v4
@@ -49,29 +49,29 @@ jobs:
check:
name: Check
runs-on: ubuntu-latest
runs-on: ubuntu-20.04
steps:
- run: 'echo "No action required"'
fmt:
name: Rustfmt
runs-on: ubuntu-latest
runs-on: ubuntu-20.04
steps:
- run: 'echo "No action required"'
clippy:
name: Clippy
runs-on: ubuntu-latest
runs-on: ubuntu-20.04
steps:
- run: 'echo "No action required"'
coverage:
runs-on: ubuntu-latest
runs-on: ubuntu-20.04
steps:
- run: 'echo "No action required"'
test:
runs-on: ubuntu-latest
runs-on: ubuntu-20.04
steps:
- run: 'echo "No action required"'
@@ -80,7 +80,7 @@ jobs:
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [ ubuntu-latest ]
os: [ ubuntu-20.04 ]
mode:
- name: "Basic"
- name: "Remote WAL"

View File

@@ -14,11 +14,11 @@ on:
description: The runner uses to build linux-amd64 artifacts
default: ec2-c6i.4xlarge-amd64
options:
- ubuntu-22.04
- ubuntu-22.04-8-cores
- ubuntu-22.04-16-cores
- ubuntu-22.04-32-cores
- ubuntu-22.04-64-cores
- ubuntu-20.04
- ubuntu-20.04-8-cores
- ubuntu-20.04-16-cores
- ubuntu-20.04-32-cores
- ubuntu-20.04-64-cores
- ec2-c6i.xlarge-amd64 # 4C8G
- ec2-c6i.2xlarge-amd64 # 8C16G
- ec2-c6i.4xlarge-amd64 # 16C32G
@@ -70,7 +70,7 @@ jobs:
allocate-runners:
name: Allocate runners
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
runs-on: ubuntu-latest
runs-on: ubuntu-20.04
outputs:
linux-amd64-runner: ${{ steps.start-linux-amd64-runner.outputs.label }}
linux-arm64-runner: ${{ steps.start-linux-arm64-runner.outputs.label }}
@@ -182,7 +182,7 @@ jobs:
build-linux-amd64-artifacts,
build-linux-arm64-artifacts,
]
runs-on: ubuntu-latest
runs-on: ubuntu-20.04
outputs:
nightly-build-result: ${{ steps.set-nightly-build-result.outputs.nightly-build-result }}
steps:
@@ -214,7 +214,7 @@ jobs:
allocate-runners,
release-images-to-dockerhub,
]
runs-on: ubuntu-latest
runs-on: ubuntu-20.04
# When we push to ACR, it's easy to fail due to some unknown network issues.
# However, we don't want to fail the whole workflow because of this.
# The ACR have daily sync with DockerHub, so don't worry about the image not being updated.
@@ -249,7 +249,7 @@ jobs:
name: Stop linux-amd64 runner
# Only run this job when the runner is allocated.
if: ${{ always() }}
runs-on: ubuntu-latest
runs-on: ubuntu-20.04
needs: [
allocate-runners,
build-linux-amd64-artifacts,
@@ -275,7 +275,7 @@ jobs:
name: Stop linux-arm64 runner
# Only run this job when the runner is allocated.
if: ${{ always() }}
runs-on: ubuntu-latest
runs-on: ubuntu-20.04
needs: [
allocate-runners,
build-linux-arm64-artifacts,
@@ -303,7 +303,7 @@ jobs:
needs: [
release-images-to-dockerhub
]
runs-on: ubuntu-latest
runs-on: ubuntu-20.04
permissions:
issues: write
env:

View File

@@ -13,7 +13,7 @@ jobs:
sqlness-test:
name: Run sqlness test
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
runs-on: ubuntu-latest
runs-on: ubuntu-22.04
steps:
- name: Checkout
uses: actions/checkout@v4
@@ -107,6 +107,7 @@ jobs:
CARGO_BUILD_RUSTFLAGS: "-C linker=lld-link"
RUST_BACKTRACE: 1
CARGO_INCREMENTAL: 0
RUSTUP_WINDOWS_PATH_ADD_BIN: 1 # Workaround for https://github.com/nextest-rs/nextest/issues/1493
GT_S3_BUCKET: ${{ vars.AWS_CI_TEST_BUCKET }}
GT_S3_ACCESS_KEY_ID: ${{ secrets.AWS_CI_TEST_ACCESS_KEY_ID }}
GT_S3_ACCESS_KEY: ${{ secrets.AWS_CI_TEST_SECRET_ACCESS_KEY }}
@@ -117,22 +118,22 @@ jobs:
name: Run clean build on Linux
runs-on: ubuntu-latest
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
timeout-minutes: 45
timeout-minutes: 60
steps:
- uses: actions/checkout@v4
with:
fetch-depth: 0
persist-credentials: false
- uses: cachix/install-nix-action@v31
- run: nix develop --command cargo check --bin greptime
env:
CARGO_BUILD_RUSTFLAGS: "-C link-arg=-fuse-ld=mold"
- uses: cachix/install-nix-action@v27
with:
nix_path: nixpkgs=channel:nixos-24.11
- run: nix develop --command cargo build
check-status:
name: Check status
needs: [sqlness-test, sqlness-windows, test-on-windows]
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
runs-on: ubuntu-latest
runs-on: ubuntu-20.04
outputs:
check-result: ${{ steps.set-check-result.outputs.check-result }}
steps:
@@ -145,7 +146,7 @@ jobs:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' && always() }} # Not requiring successful dependent jobs, always run.
name: Send notification to Greptime team
needs: [check-status]
runs-on: ubuntu-latest
runs-on: ubuntu-20.04
env:
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL_DEVELOP_CHANNEL }}
steps:

View File

@@ -1,42 +0,0 @@
name: 'PR Labeling'
on:
pull_request_target:
types:
- opened
- synchronize
- reopened
permissions:
contents: read
pull-requests: write
issues: write
jobs:
labeler:
runs-on: ubuntu-latest
steps:
- name: Checkout sources
uses: actions/checkout@v4
- uses: actions/labeler@v5
with:
configuration-path: ".github/labeler.yaml"
repo-token: "${{ secrets.GITHUB_TOKEN }}"
size-label:
runs-on: ubuntu-latest
steps:
- uses: pascalgn/size-label-action@v0.5.5
env:
GITHUB_TOKEN: "${{ secrets.GITHUB_TOKEN }}"
with:
sizes: >
{
"0": "XS",
"100": "S",
"300": "M",
"1000": "L",
"1500": "XL",
"2000": "XXL"
}

View File

@@ -24,20 +24,12 @@ on:
description: Release dev-builder-android image
required: false
default: false
update_dev_builder_image_tag:
type: boolean
description: Update the DEV_BUILDER_IMAGE_TAG in Makefile and create a PR
required: false
default: false
jobs:
release-dev-builder-images:
name: Release dev builder images
# The jobs are triggered by the following events:
# 1. Manually triggered workflow_dispatch event
# 2. Push event when the PR that modifies the `rust-toolchain.toml` or `docker/dev-builder/**` is merged to main
if: ${{ github.event_name == 'push' || inputs.release_dev_builder_ubuntu_image || inputs.release_dev_builder_centos_image || inputs.release_dev_builder_android_image }}
runs-on: ubuntu-latest
if: ${{ inputs.release_dev_builder_ubuntu_image || inputs.release_dev_builder_centos_image || inputs.release_dev_builder_android_image }} # Only manually trigger this job.
runs-on: ubuntu-20.04-16-cores
outputs:
version: ${{ steps.set-version.outputs.version }}
steps:
@@ -65,13 +57,13 @@ jobs:
version: ${{ env.VERSION }}
dockerhub-image-registry-username: ${{ secrets.DOCKERHUB_USERNAME }}
dockerhub-image-registry-token: ${{ secrets.DOCKERHUB_TOKEN }}
build-dev-builder-ubuntu: ${{ inputs.release_dev_builder_ubuntu_image || github.event_name == 'push' }}
build-dev-builder-centos: ${{ inputs.release_dev_builder_centos_image || github.event_name == 'push' }}
build-dev-builder-android: ${{ inputs.release_dev_builder_android_image || github.event_name == 'push' }}
build-dev-builder-ubuntu: ${{ inputs.release_dev_builder_ubuntu_image }}
build-dev-builder-centos: ${{ inputs.release_dev_builder_centos_image }}
build-dev-builder-android: ${{ inputs.release_dev_builder_android_image }}
release-dev-builder-images-ecr:
name: Release dev builder images to AWS ECR
runs-on: ubuntu-latest
runs-on: ubuntu-20.04
needs: [
release-dev-builder-images
]
@@ -93,7 +85,7 @@ jobs:
- name: Push dev-builder-ubuntu image
shell: bash
if: ${{ inputs.release_dev_builder_ubuntu_image || github.event_name == 'push' }}
if: ${{ inputs.release_dev_builder_ubuntu_image }}
env:
IMAGE_VERSION: ${{ needs.release-dev-builder-images.outputs.version }}
IMAGE_NAMESPACE: ${{ vars.IMAGE_NAMESPACE }}
@@ -114,7 +106,7 @@ jobs:
- name: Push dev-builder-centos image
shell: bash
if: ${{ inputs.release_dev_builder_centos_image || github.event_name == 'push' }}
if: ${{ inputs.release_dev_builder_centos_image }}
env:
IMAGE_VERSION: ${{ needs.release-dev-builder-images.outputs.version }}
IMAGE_NAMESPACE: ${{ vars.IMAGE_NAMESPACE }}
@@ -135,7 +127,7 @@ jobs:
- name: Push dev-builder-android image
shell: bash
if: ${{ inputs.release_dev_builder_android_image || github.event_name == 'push' }}
if: ${{ inputs.release_dev_builder_android_image }}
env:
IMAGE_VERSION: ${{ needs.release-dev-builder-images.outputs.version }}
IMAGE_NAMESPACE: ${{ vars.IMAGE_NAMESPACE }}
@@ -156,7 +148,7 @@ jobs:
release-dev-builder-images-cn: # Note: Be careful issue: https://github.com/containers/skopeo/issues/1874 and we decide to use the latest stable skopeo container.
name: Release dev builder images to CN region
runs-on: ubuntu-latest
runs-on: ubuntu-20.04
needs: [
release-dev-builder-images
]
@@ -170,7 +162,7 @@ jobs:
- name: Push dev-builder-ubuntu image
shell: bash
if: ${{ inputs.release_dev_builder_ubuntu_image || github.event_name == 'push' }}
if: ${{ inputs.release_dev_builder_ubuntu_image }}
env:
IMAGE_VERSION: ${{ needs.release-dev-builder-images.outputs.version }}
IMAGE_NAMESPACE: ${{ vars.IMAGE_NAMESPACE }}
@@ -184,7 +176,7 @@ jobs:
- name: Push dev-builder-centos image
shell: bash
if: ${{ inputs.release_dev_builder_centos_image || github.event_name == 'push' }}
if: ${{ inputs.release_dev_builder_centos_image }}
env:
IMAGE_VERSION: ${{ needs.release-dev-builder-images.outputs.version }}
IMAGE_NAMESPACE: ${{ vars.IMAGE_NAMESPACE }}
@@ -198,7 +190,7 @@ jobs:
- name: Push dev-builder-android image
shell: bash
if: ${{ inputs.release_dev_builder_android_image || github.event_name == 'push' }}
if: ${{ inputs.release_dev_builder_android_image }}
env:
IMAGE_VERSION: ${{ needs.release-dev-builder-images.outputs.version }}
IMAGE_NAMESPACE: ${{ vars.IMAGE_NAMESPACE }}
@@ -209,24 +201,3 @@ jobs:
quay.io/skopeo/stable:latest \
copy -a docker://docker.io/$IMAGE_NAMESPACE/dev-builder-android:$IMAGE_VERSION \
docker://$ACR_IMAGE_REGISTRY/$IMAGE_NAMESPACE/dev-builder-android:$IMAGE_VERSION
update-dev-builder-image-tag:
name: Update dev-builder image tag
runs-on: ubuntu-latest
permissions:
contents: write
pull-requests: write
if: ${{ github.event_name == 'push' || inputs.update_dev_builder_image_tag }}
needs: [
release-dev-builder-images
]
steps:
- name: Checkout repository
uses: actions/checkout@v4
- name: Update dev-builder image tag
shell: bash
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: |
./.github/scripts/update-dev-builder-version.sh ${{ needs.release-dev-builder-images.outputs.version }}

View File

@@ -18,11 +18,11 @@ on:
description: The runner uses to build linux-amd64 artifacts
default: ec2-c6i.4xlarge-amd64
options:
- ubuntu-22.04
- ubuntu-22.04-8-cores
- ubuntu-22.04-16-cores
- ubuntu-22.04-32-cores
- ubuntu-22.04-64-cores
- ubuntu-20.04
- ubuntu-20.04-8-cores
- ubuntu-20.04-16-cores
- ubuntu-20.04-32-cores
- ubuntu-20.04-64-cores
- ec2-c6i.xlarge-amd64 # 4C8G
- ec2-c6i.2xlarge-amd64 # 8C16G
- ec2-c6i.4xlarge-amd64 # 16C32G
@@ -88,14 +88,16 @@ env:
# Controls whether to run tests, include unit-test, integration-test and sqlness.
DISABLE_RUN_TESTS: ${{ inputs.skip_test || vars.DEFAULT_SKIP_TEST }}
# The scheduled version is '${{ env.NEXT_RELEASE_VERSION }}-nightly-YYYYMMDD', like v0.2.0-nightly-20230313;
# The scheduled version is '${{ env.NEXT_RELEASE_VERSION }}-nightly-YYYYMMDD', like v0.2.0-nigthly-20230313;
NIGHTLY_RELEASE_PREFIX: nightly
# Note: The NEXT_RELEASE_VERSION should be modified manually by every formal release.
NEXT_RELEASE_VERSION: v0.13.0
jobs:
allocate-runners:
name: Allocate runners
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
runs-on: ubuntu-latest
runs-on: ubuntu-20.04
outputs:
linux-amd64-runner: ${{ steps.start-linux-amd64-runner.outputs.label }}
linux-arm64-runner: ${{ steps.start-linux-arm64-runner.outputs.label }}
@@ -110,8 +112,6 @@ jobs:
# The 'version' use as the global tag name of the release workflow.
version: ${{ steps.create-version.outputs.version }}
should-push-latest-tag: ${{ steps.check-version.outputs.should-push-latest-tag }}
steps:
- name: Checkout
uses: actions/checkout@v4
@@ -126,7 +126,7 @@ jobs:
# The create-version will create a global variable named 'version' in the global workflows.
# - If it's a tag push release, the version is the tag name(${{ github.ref_name }});
# - If it's a scheduled release, the version is '${{ env.NEXT_RELEASE_VERSION }}-nightly-$buildTime', like v0.2.0-nightly-20230313;
# - If it's a scheduled release, the version is '${{ env.NEXT_RELEASE_VERSION }}-nightly-$buildTime', like v0.2.0-nigthly-20230313;
# - If it's a manual release, the version is '${{ env.NEXT_RELEASE_VERSION }}-<short-git-sha>-YYYYMMDDSS', like v0.2.0-e5b243c-2023071245;
- name: Create version
id: create-version
@@ -135,13 +135,9 @@ jobs:
env:
GITHUB_EVENT_NAME: ${{ github.event_name }}
GITHUB_REF_NAME: ${{ github.ref_name }}
NEXT_RELEASE_VERSION: ${{ env.NEXT_RELEASE_VERSION }}
NIGHTLY_RELEASE_PREFIX: ${{ env.NIGHTLY_RELEASE_PREFIX }}
- name: Check version
id: check-version
run: |
./.github/scripts/check-version.sh "${{ steps.create-version.outputs.version }}"
- name: Allocate linux-amd64 runner
if: ${{ inputs.build_linux_amd64_artifacts || github.event_name == 'push' || github.event_name == 'schedule' }}
uses: ./.github/actions/start-runner
@@ -303,7 +299,7 @@ jobs:
build-linux-amd64-artifacts,
build-linux-arm64-artifacts,
]
runs-on: ubuntu-latest
runs-on: ubuntu-2004-16-cores
outputs:
build-image-result: ${{ steps.set-build-image-result.outputs.build-image-result }}
steps:
@@ -321,7 +317,7 @@ jobs:
image-registry-username: ${{ secrets.DOCKERHUB_USERNAME }}
image-registry-password: ${{ secrets.DOCKERHUB_TOKEN }}
version: ${{ needs.allocate-runners.outputs.version }}
push-latest-tag: ${{ needs.allocate-runners.outputs.should-push-latest-tag == 'true' && github.ref_type == 'tag' && !contains(github.ref_name, 'nightly') && github.event_name != 'schedule' }}
push-latest-tag: true
- name: Set build image result
id: set-build-image-result
@@ -339,7 +335,7 @@ jobs:
build-windows-artifacts,
release-images-to-dockerhub,
]
runs-on: ubuntu-latest-16-cores
runs-on: ubuntu-20.04
# When we push to ACR, it's easy to fail due to some unknown network issues.
# However, we don't want to fail the whole workflow because of this.
# The ACR have daily sync with DockerHub, so don't worry about the image not being updated.
@@ -368,7 +364,7 @@ jobs:
dev-mode: false
upload-to-s3: true
update-version-info: true
push-latest-tag: ${{ needs.allocate-runners.outputs.should-push-latest-tag == 'true' && github.ref_type == 'tag' && !contains(github.ref_name, 'nightly') && github.event_name != 'schedule' }}
push-latest-tag: true
publish-github-release:
name: Create GitHub release and upload artifacts
@@ -381,7 +377,7 @@ jobs:
build-windows-artifacts,
release-images-to-dockerhub,
]
runs-on: ubuntu-latest
runs-on: ubuntu-20.04
steps:
- uses: actions/checkout@v4
with:
@@ -395,12 +391,12 @@ jobs:
### Stop runners ###
# It's very necessary to split the job of releasing runners into 'stop-linux-amd64-runner' and 'stop-linux-arm64-runner'.
# Because we can terminate the specified EC2 instance immediately after the job is finished without unnecessary waiting.
# Because we can terminate the specified EC2 instance immediately after the job is finished without uncessary waiting.
stop-linux-amd64-runner: # It's always run as the last job in the workflow to make sure that the runner is released.
name: Stop linux-amd64 runner
# Only run this job when the runner is allocated.
if: ${{ always() }}
runs-on: ubuntu-latest
runs-on: ubuntu-20.04
needs: [
allocate-runners,
build-linux-amd64-artifacts,
@@ -426,7 +422,7 @@ jobs:
name: Stop linux-arm64 runner
# Only run this job when the runner is allocated.
if: ${{ always() }}
runs-on: ubuntu-latest
runs-on: ubuntu-20.04
needs: [
allocate-runners,
build-linux-arm64-artifacts,
@@ -448,11 +444,11 @@ jobs:
aws-region: ${{ vars.EC2_RUNNER_REGION }}
github-token: ${{ secrets.GH_PERSONAL_ACCESS_TOKEN }}
bump-downstream-repo-versions:
name: Bump downstream repo versions
bump-doc-version:
name: Bump doc version
if: ${{ github.event_name == 'push' || github.event_name == 'schedule' }}
needs: [allocate-runners, publish-github-release]
runs-on: ubuntu-latest
needs: [allocate-runners]
runs-on: ubuntu-20.04
# Permission reference: https://docs.github.com/en/actions/using-jobs/assigning-permissions-to-jobs
permissions:
issues: write # Allows the action to create issues for cyborg.
@@ -463,58 +459,13 @@ jobs:
fetch-depth: 0
persist-credentials: false
- uses: ./.github/actions/setup-cyborg
- name: Bump downstream repo versions
- name: Bump doc version
working-directory: cyborg
run: pnpm tsx bin/bump-versions.ts
run: pnpm tsx bin/bump-doc-version.ts
env:
TARGET_REPOS: website,docs,demo
VERSION: ${{ needs.allocate-runners.outputs.version }}
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
WEBSITE_REPO_TOKEN: ${{ secrets.WEBSITE_REPO_TOKEN }}
DOCS_REPO_TOKEN: ${{ secrets.DOCS_REPO_TOKEN }}
DEMO_REPO_TOKEN: ${{ secrets.DEMO_REPO_TOKEN }}
bump-helm-charts-version:
name: Bump helm charts version
if: ${{ github.ref_type == 'tag' && !contains(github.ref_name, 'nightly') && github.event_name != 'schedule' }}
needs: [allocate-runners, publish-github-release]
runs-on: ubuntu-latest
permissions:
contents: write
pull-requests: write
steps:
- name: Checkout repository
uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Bump helm charts version
env:
GITHUB_TOKEN: ${{ secrets.HELM_CHARTS_REPO_TOKEN }}
VERSION: ${{ needs.allocate-runners.outputs.version }}
run: |
./.github/scripts/update-helm-charts-version.sh
bump-homebrew-greptime-version:
name: Bump homebrew greptime version
if: ${{ github.ref_type == 'tag' && !contains(github.ref_name, 'nightly') && github.event_name != 'schedule' }}
needs: [allocate-runners, publish-github-release]
runs-on: ubuntu-latest
permissions:
contents: write
pull-requests: write
steps:
- name: Checkout repository
uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Bump homebrew greptime version
env:
GITHUB_TOKEN: ${{ secrets.HOMEBREW_GREPTIME_REPO_TOKEN }}
VERSION: ${{ needs.allocate-runners.outputs.version }}
run: |
./.github/scripts/update-homebrew-greptme-version.sh
notification:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' && (github.event_name == 'push' || github.event_name == 'schedule') && always() }}
@@ -524,7 +475,7 @@ jobs:
build-macos-artifacts,
build-windows-artifacts,
]
runs-on: ubuntu-latest
runs-on: ubuntu-20.04
# Permission reference: https://docs.github.com/en/actions/using-jobs/assigning-permissions-to-jobs
permissions:
issues: write # Allows the action to create issues for cyborg.

View File

@@ -11,17 +11,14 @@ concurrency:
group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }}
cancel-in-progress: true
permissions:
issues: write
contents: write
pull-requests: write
jobs:
check:
runs-on: ubuntu-latest
runs-on: ubuntu-20.04
timeout-minutes: 10
steps:
- uses: actions/checkout@v4
with:
persist-credentials: false
- uses: ./.github/actions/setup-cyborg
- name: Check Pull Request
working-directory: cyborg

.gitignore (vendored, 7 changed lines)
View File

@@ -54,10 +54,3 @@ tests-fuzz/corpus/
# Nix
.direnv
.envrc
## default data home
greptimedb_data
# github
!/.github

Cargo.lock (generated, 156 changed lines)
View File

@@ -185,7 +185,7 @@ checksum = "d301b3b94cb4b2f23d7917810addbbaff90738e0ca2be692bd027e70d7e0330c"
[[package]]
name = "api"
version = "0.12.2"
version = "0.13.0"
dependencies = [
"common-base",
"common-decimal",
@@ -710,7 +710,7 @@ dependencies = [
[[package]]
name = "auth"
version = "0.12.2"
version = "0.13.0"
dependencies = [
"api",
"async-trait",
@@ -1324,7 +1324,7 @@ dependencies = [
[[package]]
name = "cache"
version = "0.12.2"
version = "0.13.0"
dependencies = [
"catalog",
"common-error",
@@ -1348,7 +1348,7 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5"
[[package]]
name = "catalog"
version = "0.12.2"
version = "0.13.0"
dependencies = [
"api",
"arrow",
@@ -1661,7 +1661,7 @@ checksum = "1462739cb27611015575c0c11df5df7601141071f07518d56fcc1be504cbec97"
[[package]]
name = "cli"
version = "0.12.2"
version = "0.13.0"
dependencies = [
"async-trait",
"auth",
@@ -1703,7 +1703,7 @@ dependencies = [
"session",
"snafu 0.8.5",
"store-api",
"substrait 0.12.2",
"substrait 0.13.0",
"table",
"tempfile",
"tokio",
@@ -1712,7 +1712,7 @@ dependencies = [
[[package]]
name = "client"
version = "0.12.2"
version = "0.13.0"
dependencies = [
"api",
"arc-swap",
@@ -1739,7 +1739,7 @@ dependencies = [
"rand",
"serde_json",
"snafu 0.8.5",
"substrait 0.12.2",
"substrait 0.13.0",
"substrait 0.37.3",
"tokio",
"tokio-stream",
@@ -1780,7 +1780,7 @@ dependencies = [
[[package]]
name = "cmd"
version = "0.12.2"
version = "0.13.0"
dependencies = [
"async-trait",
"auth",
@@ -1791,7 +1791,6 @@ dependencies = [
"clap 4.5.19",
"cli",
"client",
"colored",
"common-base",
"common-catalog",
"common-config",
@@ -1826,10 +1825,7 @@ dependencies = [
"mito2",
"moka",
"nu-ansi-term",
"object-store",
"parquet",
"plugins",
"pprof",
"prometheus",
"prost 0.13.3",
"query",
@@ -1845,7 +1841,7 @@ dependencies = [
"similar-asserts",
"snafu 0.8.5",
"store-api",
"substrait 0.12.2",
"substrait 0.13.0",
"table",
"temp-env",
"tempfile",
@@ -1862,16 +1858,6 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3fd119d74b830634cea2a0f58bbd0d54540518a14397557951e79340abc28c0"
[[package]]
name = "colored"
version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "117725a109d387c937a1533ce01b450cbde6b88abceea8473c4d7a85853cda3c"
dependencies = [
"lazy_static",
"windows-sys 0.59.0",
]
[[package]]
name = "combine"
version = "4.6.7"
@@ -1901,7 +1887,7 @@ checksum = "55b672471b4e9f9e95499ea597ff64941a309b2cdbffcc46f2cc5e2d971fd335"
[[package]]
name = "common-base"
version = "0.12.2"
version = "0.13.0"
dependencies = [
"anymap2",
"async-trait",
@@ -1923,11 +1909,11 @@ dependencies = [
[[package]]
name = "common-catalog"
version = "0.12.2"
version = "0.13.0"
[[package]]
name = "common-config"
version = "0.12.2"
version = "0.13.0"
dependencies = [
"common-base",
"common-error",
@@ -1952,7 +1938,7 @@ dependencies = [
[[package]]
name = "common-datasource"
version = "0.12.2"
version = "0.13.0"
dependencies = [
"arrow",
"arrow-schema",
@@ -1988,7 +1974,7 @@ dependencies = [
[[package]]
name = "common-decimal"
version = "0.12.2"
version = "0.13.0"
dependencies = [
"bigdecimal 0.4.5",
"common-error",
@@ -2001,7 +1987,7 @@ dependencies = [
[[package]]
name = "common-error"
version = "0.12.2"
version = "0.13.0"
dependencies = [
"http 1.1.0",
"snafu 0.8.5",
@@ -2011,7 +1997,7 @@ dependencies = [
[[package]]
name = "common-frontend"
version = "0.12.2"
version = "0.13.0"
dependencies = [
"async-trait",
"common-error",
@@ -2021,7 +2007,7 @@ dependencies = [
[[package]]
name = "common-function"
version = "0.12.2"
version = "0.13.0"
dependencies = [
"ahash 0.8.11",
"api",
@@ -2069,7 +2055,7 @@ dependencies = [
[[package]]
name = "common-greptimedb-telemetry"
version = "0.12.2"
version = "0.13.0"
dependencies = [
"async-trait",
"common-runtime",
@@ -2086,7 +2072,7 @@ dependencies = [
[[package]]
name = "common-grpc"
version = "0.12.2"
version = "0.13.0"
dependencies = [
"api",
"arrow-flight",
@@ -2114,7 +2100,7 @@ dependencies = [
[[package]]
name = "common-grpc-expr"
version = "0.12.2"
version = "0.13.0"
dependencies = [
"api",
"common-base",
@@ -2133,7 +2119,7 @@ dependencies = [
[[package]]
name = "common-macro"
version = "0.12.2"
version = "0.13.0"
dependencies = [
"arc-swap",
"common-query",
@@ -2147,7 +2133,7 @@ dependencies = [
[[package]]
name = "common-mem-prof"
version = "0.12.2"
version = "0.13.0"
dependencies = [
"common-error",
"common-macro",
@@ -2160,7 +2146,7 @@ dependencies = [
[[package]]
name = "common-meta"
version = "0.12.2"
version = "0.13.0"
dependencies = [
"anymap2",
"api",
@@ -2220,7 +2206,7 @@ dependencies = [
[[package]]
name = "common-options"
version = "0.12.2"
version = "0.13.0"
dependencies = [
"common-grpc",
"humantime-serde",
@@ -2229,11 +2215,11 @@ dependencies = [
[[package]]
name = "common-plugins"
version = "0.12.2"
version = "0.13.0"
[[package]]
name = "common-pprof"
version = "0.12.2"
version = "0.13.0"
dependencies = [
"common-error",
"common-macro",
@@ -2245,7 +2231,7 @@ dependencies = [
[[package]]
name = "common-procedure"
version = "0.12.2"
version = "0.13.0"
dependencies = [
"async-stream",
"async-trait",
@@ -2272,7 +2258,7 @@ dependencies = [
[[package]]
name = "common-procedure-test"
version = "0.12.2"
version = "0.13.0"
dependencies = [
"async-trait",
"common-procedure",
@@ -2280,7 +2266,7 @@ dependencies = [
[[package]]
name = "common-query"
version = "0.12.2"
version = "0.13.0"
dependencies = [
"api",
"async-trait",
@@ -2306,7 +2292,7 @@ dependencies = [
[[package]]
name = "common-recordbatch"
version = "0.12.2"
version = "0.13.0"
dependencies = [
"arc-swap",
"common-error",
@@ -2325,7 +2311,7 @@ dependencies = [
[[package]]
name = "common-runtime"
version = "0.12.2"
version = "0.13.0"
dependencies = [
"async-trait",
"clap 4.5.19",
@@ -2355,7 +2341,7 @@ dependencies = [
[[package]]
name = "common-telemetry"
version = "0.12.2"
version = "0.13.0"
dependencies = [
"atty",
"backtrace",
@@ -2383,7 +2369,7 @@ dependencies = [
[[package]]
name = "common-test-util"
version = "0.12.2"
version = "0.13.0"
dependencies = [
"client",
"common-query",
@@ -2395,7 +2381,7 @@ dependencies = [
[[package]]
name = "common-time"
version = "0.12.2"
version = "0.13.0"
dependencies = [
"arrow",
"chrono",
@@ -2413,7 +2399,7 @@ dependencies = [
[[package]]
name = "common-version"
version = "0.12.2"
version = "0.13.0"
dependencies = [
"build-data",
"const_format",
@@ -2423,7 +2409,7 @@ dependencies = [
[[package]]
name = "common-wal"
version = "0.12.2"
version = "0.13.0"
dependencies = [
"common-base",
"common-error",
@@ -3354,7 +3340,7 @@ dependencies = [
[[package]]
name = "datanode"
version = "0.12.2"
version = "0.13.0"
dependencies = [
"api",
"arrow-flight",
@@ -3406,7 +3392,7 @@ dependencies = [
"session",
"snafu 0.8.5",
"store-api",
"substrait 0.12.2",
"substrait 0.13.0",
"table",
"tokio",
"toml 0.8.19",
@@ -3415,7 +3401,7 @@ dependencies = [
[[package]]
name = "datatypes"
version = "0.12.2"
version = "0.13.0"
dependencies = [
"arrow",
"arrow-array",
@@ -4059,7 +4045,7 @@ dependencies = [
[[package]]
name = "file-engine"
version = "0.12.2"
version = "0.13.0"
dependencies = [
"api",
"async-trait",
@@ -4169,7 +4155,7 @@ checksum = "8bf7cc16383c4b8d58b9905a8509f02926ce3058053c056376248d958c9df1e8"
[[package]]
name = "flow"
version = "0.12.2"
version = "0.13.0"
dependencies = [
"api",
"arrow",
@@ -4230,7 +4216,7 @@ dependencies = [
"snafu 0.8.5",
"store-api",
"strum 0.25.0",
"substrait 0.12.2",
"substrait 0.13.0",
"table",
"tokio",
"tonic 0.12.3",
@@ -4285,7 +4271,7 @@ checksum = "6c2141d6d6c8512188a7891b4b01590a45f6dac67afb4f255c4124dbb86d4eaa"
[[package]]
name = "frontend"
version = "0.12.2"
version = "0.13.0"
dependencies = [
"api",
"arc-swap",
@@ -5553,7 +5539,7 @@ dependencies = [
[[package]]
name = "index"
version = "0.12.2"
version = "0.13.0"
dependencies = [
"async-trait",
"asynchronous-codec",
@@ -6345,7 +6331,7 @@ checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24"
[[package]]
name = "log-query"
version = "0.12.2"
version = "0.13.0"
dependencies = [
"chrono",
"common-error",
@@ -6357,7 +6343,7 @@ dependencies = [
[[package]]
name = "log-store"
version = "0.12.2"
version = "0.13.0"
dependencies = [
"async-stream",
"async-trait",
@@ -6650,7 +6636,7 @@ dependencies = [
[[package]]
name = "meta-client"
version = "0.12.2"
version = "0.13.0"
dependencies = [
"api",
"async-trait",
@@ -6677,7 +6663,7 @@ dependencies = [
[[package]]
name = "meta-srv"
version = "0.12.2"
version = "0.13.0"
dependencies = [
"api",
"async-trait",
@@ -6763,7 +6749,7 @@ dependencies = [
[[package]]
name = "metric-engine"
version = "0.12.2"
version = "0.13.0"
dependencies = [
"api",
"aquamarine",
@@ -6861,7 +6847,7 @@ dependencies = [
[[package]]
name = "mito2"
version = "0.12.2"
version = "0.13.0"
dependencies = [
"api",
"aquamarine",
@@ -7558,7 +7544,7 @@ dependencies = [
[[package]]
name = "object-store"
version = "0.12.2"
version = "0.13.0"
dependencies = [
"anyhow",
"bytes",
@@ -7807,7 +7793,7 @@ dependencies = [
[[package]]
name = "operator"
version = "0.12.2"
version = "0.13.0"
dependencies = [
"ahash 0.8.11",
"api",
@@ -7855,7 +7841,7 @@ dependencies = [
"sql",
"sqlparser 0.52.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=71dd86058d2af97b9925093d40c4e03360403170)",
"store-api",
"substrait 0.12.2",
"substrait 0.13.0",
"table",
"tokio",
"tokio-util",
@@ -8092,7 +8078,7 @@ dependencies = [
[[package]]
name = "partition"
version = "0.12.2"
version = "0.13.0"
dependencies = [
"api",
"async-trait",
@@ -8360,7 +8346,7 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "pipeline"
version = "0.12.2"
version = "0.13.0"
dependencies = [
"ahash 0.8.11",
"api",
@@ -8500,7 +8486,7 @@ dependencies = [
[[package]]
name = "plugins"
version = "0.12.2"
version = "0.13.0"
dependencies = [
"auth",
"clap 4.5.19",
@@ -8762,7 +8748,7 @@ dependencies = [
[[package]]
name = "promql"
version = "0.12.2"
version = "0.13.0"
dependencies = [
"ahash 0.8.11",
"async-trait",
@@ -9007,7 +8993,7 @@ dependencies = [
[[package]]
name = "puffin"
version = "0.12.2"
version = "0.13.0"
dependencies = [
"async-compression 0.4.13",
"async-trait",
@@ -9048,7 +9034,7 @@ dependencies = [
[[package]]
name = "query"
version = "0.12.2"
version = "0.13.0"
dependencies = [
"ahash 0.8.11",
"api",
@@ -9113,7 +9099,7 @@ dependencies = [
"sqlparser 0.52.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=71dd86058d2af97b9925093d40c4e03360403170)",
"statrs",
"store-api",
"substrait 0.12.2",
"substrait 0.13.0",
"table",
"tokio",
"tokio-stream",
@@ -10458,7 +10444,7 @@ dependencies = [
[[package]]
name = "servers"
version = "0.12.2"
version = "0.13.0"
dependencies = [
"ahash 0.8.11",
"api",
@@ -10575,7 +10561,7 @@ dependencies = [
[[package]]
name = "session"
version = "0.12.2"
version = "0.13.0"
dependencies = [
"api",
"arc-swap",
@@ -10884,7 +10870,7 @@ dependencies = [
[[package]]
name = "sql"
version = "0.12.2"
version = "0.13.0"
dependencies = [
"api",
"chrono",
@@ -10938,7 +10924,7 @@ dependencies = [
[[package]]
name = "sqlness-runner"
version = "0.12.2"
version = "0.13.0"
dependencies = [
"async-trait",
"clap 4.5.19",
@@ -11255,7 +11241,7 @@ dependencies = [
[[package]]
name = "store-api"
version = "0.12.2"
version = "0.13.0"
dependencies = [
"api",
"aquamarine",
@@ -11385,7 +11371,7 @@ dependencies = [
[[package]]
name = "substrait"
version = "0.12.2"
version = "0.13.0"
dependencies = [
"async-trait",
"bytes",
@@ -11566,7 +11552,7 @@ dependencies = [
[[package]]
name = "table"
version = "0.12.2"
version = "0.13.0"
dependencies = [
"api",
"async-trait",
@@ -11817,7 +11803,7 @@ checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76"
[[package]]
name = "tests-fuzz"
version = "0.12.2"
version = "0.13.0"
dependencies = [
"arbitrary",
"async-trait",
@@ -11861,7 +11847,7 @@ dependencies = [
[[package]]
name = "tests-integration"
version = "0.12.2"
version = "0.13.0"
dependencies = [
"api",
"arrow-flight",
@@ -11927,7 +11913,7 @@ dependencies = [
"sql",
"sqlx",
"store-api",
"substrait 0.12.2",
"substrait 0.13.0",
"table",
"tempfile",
"time",

View File

@@ -67,7 +67,7 @@ members = [
resolver = "2"
[workspace.package]
version = "0.12.2"
version = "0.13.0"
edition = "2021"
license = "Apache-2.0"

View File

@@ -231,7 +231,6 @@ overwrite_entry_start_id = false
# secret_access_key = "123456"
# endpoint = "https://s3.amazonaws.com"
# region = "us-west-2"
# enable_virtual_host_style = false
# Example of using Oss as the storage.
# [storage]

View File

@@ -318,7 +318,6 @@ retry_delay = "500ms"
# secret_access_key = "123456"
# endpoint = "https://s3.amazonaws.com"
# region = "us-west-2"
# enable_virtual_host_style = false
# Example of using Oss as the storage.
# [storage]

View File

@@ -1,156 +0,0 @@
/*
* Copyright 2023 Greptime Team
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import * as core from "@actions/core";
import {obtainClient} from "@/common";
interface RepoConfig {
tokenEnv: string;
repo: string;
workflowLogic: (version: string) => [string, string] | null;
}
const REPO_CONFIGS: Record<string, RepoConfig> = {
website: {
tokenEnv: "WEBSITE_REPO_TOKEN",
repo: "website",
workflowLogic: (version: string) => {
// Skip nightly versions for website
if (version.includes('nightly')) {
console.log('Nightly version detected for website, skipping workflow trigger.');
return null;
}
return ['bump-patch-version.yml', version];
}
},
demo: {
tokenEnv: "DEMO_REPO_TOKEN",
repo: "demo-scene",
workflowLogic: (version: string) => {
// Skip nightly versions for demo
if (version.includes('nightly')) {
console.log('Nightly version detected for demo, skipping workflow trigger.');
return null;
}
return ['bump-patch-version.yml', version];
}
},
docs: {
tokenEnv: "DOCS_REPO_TOKEN",
repo: "docs",
workflowLogic: (version: string) => {
// Check if it's a nightly version
if (version.includes('nightly')) {
return ['bump-nightly-version.yml', version];
}
const parts = version.split('.');
if (parts.length !== 3) {
throw new Error('Invalid version format');
}
// If patch version (last number) is 0, it's a major version
// Return only major.minor version
if (parts[2] === '0') {
return ['bump-version.yml', `${parts[0]}.${parts[1]}`];
}
// Otherwise it's a patch version, use full version
return ['bump-patch-version.yml', version];
}
}
};
async function triggerWorkflow(repoConfig: RepoConfig, workflowId: string, version: string) {
const client = obtainClient(repoConfig.tokenEnv);
try {
await client.rest.actions.createWorkflowDispatch({
owner: "GreptimeTeam",
repo: repoConfig.repo,
workflow_id: workflowId,
ref: "main",
inputs: {
version,
},
});
console.log(`Successfully triggered ${workflowId} workflow for ${repoConfig.repo} with version ${version}`);
} catch (error) {
core.setFailed(`Failed to trigger workflow for ${repoConfig.repo}: ${error.message}`);
throw error;
}
}
async function processRepo(repoName: string, version: string) {
const repoConfig = REPO_CONFIGS[repoName];
if (!repoConfig) {
throw new Error(`Unknown repository: ${repoName}`);
}
try {
const workflowResult = repoConfig.workflowLogic(version);
if (workflowResult === null) {
// Skip this repo (e.g., nightly version for website)
return;
}
const [workflowId, apiVersion] = workflowResult;
await triggerWorkflow(repoConfig, workflowId, apiVersion);
} catch (error) {
core.setFailed(`Error processing ${repoName} with version ${version}: ${error.message}`);
throw error;
}
}
async function main() {
const version = process.env.VERSION;
if (!version) {
core.setFailed("VERSION environment variable is required");
process.exit(1);
}
// Remove 'v' prefix if exists
const cleanVersion = version.startsWith('v') ? version.slice(1) : version;
// Get target repositories from environment variable
// Default to both if not specified
const targetRepos = process.env.TARGET_REPOS?.split(',').map(repo => repo.trim()) || ['website', 'docs'];
console.log(`Processing version ${cleanVersion} for repositories: ${targetRepos.join(', ')}`);
const errors: string[] = [];
// Process each repository
for (const repo of targetRepos) {
try {
await processRepo(repo, cleanVersion);
} catch (error) {
errors.push(`${repo}: ${error.message}`);
}
}
if (errors.length > 0) {
core.setFailed(`Failed to process some repositories: ${errors.join('; ')}`);
process.exit(1);
}
console.log('All repositories processed successfully');
}
// Execute main function
main().catch((error) => {
core.setFailed(`Unexpected error: ${error.message}`);
process.exit(1);
});
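
The routing rule this script encodes for the docs repository (nightly builds, `x.y.0` releases, and patch releases each dispatch a different workflow) can be summarized in a small sketch. This is an illustrative Rust port of the logic above, not code from the repository:

```rust
// Illustrative port of the docs-repo routing above: nightly versions go to the
// nightly workflow, `x.y.0` versions go to the major/minor workflow with the
// patch component dropped, everything else goes to the patch workflow.
fn route_docs_workflow(version: &str) -> Result<(String, String), String> {
    if version.contains("nightly") {
        return Ok(("bump-nightly-version.yml".to_string(), version.to_string()));
    }
    let parts: Vec<&str> = version.split('.').collect();
    if parts.len() != 3 {
        return Err("Invalid version format".to_string());
    }
    if parts[2] == "0" {
        // Major/minor release: pass only `major.minor` to the workflow.
        return Ok(("bump-version.yml".to_string(), format!("{}.{}", parts[0], parts[1])));
    }
    Ok(("bump-patch-version.yml".to_string(), version.to_string()))
}
```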

View File

@@ -55,25 +55,12 @@ async function main() {
await client.rest.issues.addLabels({
owner, repo, issue_number: number, labels: [labelDocsRequired],
})
// Get available assignees for the docs repo
const assigneesResponse = await docsClient.rest.issues.listAssignees({
owner: 'GreptimeTeam',
repo: 'docs',
})
const validAssignees = assigneesResponse.data.map(assignee => assignee.login)
core.info(`Available assignees: ${validAssignees.join(', ')}`)
// Check if the actor is a valid assignee, otherwise fallback to fengjiachun
const assignee = validAssignees.includes(actor) ? actor : 'fengjiachun'
core.info(`Assigning issue to: ${assignee}`)
await docsClient.rest.issues.create({
owner: 'GreptimeTeam',
repo: 'docs',
title: `Update docs for ${title}`,
body: `A document change request is generated from ${html_url}`,
assignee: assignee,
assignee: actor,
}).then((res) => {
core.info(`Created issue ${res.data}`)
})
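
The assignee logic removed here boils down to one rule: assign the PR actor only when they are a valid docs-repo assignee, otherwise fall back to a default maintainer. A hedged Rust sketch of that rule, with the assignee list and fallback login passed in rather than fetched from the API:

```rust
// Sketch of the fallback rule: `valid` would come from the listAssignees call
// above, and `fallback` is the hard-coded default login used in the script.
fn pick_assignee<'a>(actor: &'a str, valid: &[&str], fallback: &'a str) -> &'a str {
    if valid.iter().any(|v| *v == actor) {
        actor
    } else {
        fallback
    }
}
```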

View File

@@ -0,0 +1,40 @@
# TSBS benchmark - v0.12.0
## Environment
### Amazon EC2
| | |
|---------|-------------------------|
| Machine | c5d.2xlarge |
| CPU | 8 core |
| Memory | 16GB |
| Disk | 100GB (GP3) |
| OS | Ubuntu Server 24.04 LTS |
## Write performance
| Environment | Ingest rate (rows/s) |
|-----------------|----------------------|
| EC2 c5d.2xlarge | 326839.28 |
## Query performance
| Query type | EC2 c5d.2xlarge (ms) |
|-----------------------|----------------------|
| cpu-max-all-1 | 12.46 |
| cpu-max-all-8 | 24.20 |
| double-groupby-1 | 673.08 |
| double-groupby-5 | 963.99 |
| double-groupby-all | 1330.05 |
| groupby-orderby-limit | 952.46 |
| high-cpu-1 | 5.08 |
| high-cpu-all | 4638.57 |
| lastpoint | 591.02 |
| single-groupby-1-1-1 | 4.06 |
| single-groupby-1-1-12 | 4.73 |
| single-groupby-1-8-1 | 8.23 |
| single-groupby-5-1-1 | 4.61 |
| single-groupby-5-1-12 | 5.61 |
| single-groupby-5-8-1 | 9.74 |

View File

@@ -53,54 +53,6 @@ get_arch_type() {
esac
}
# Verify SHA256 checksum
verify_sha256() {
file="$1"
expected_sha256="$2"
if command -v sha256sum >/dev/null 2>&1; then
actual_sha256=$(sha256sum "$file" | cut -d' ' -f1)
elif command -v shasum >/dev/null 2>&1; then
actual_sha256=$(shasum -a 256 "$file" | cut -d' ' -f1)
else
echo "Warning: No SHA256 verification tool found (sha256sum or shasum). Skipping checksum verification."
return 0
fi
if [ "$actual_sha256" = "$expected_sha256" ]; then
echo "SHA256 checksum verified successfully."
return 0
else
echo "Error: SHA256 checksum verification failed!"
echo "Expected: $expected_sha256"
echo "Actual: $actual_sha256"
return 1
fi
}
# Prompt for user confirmation (compatible with different shells)
prompt_confirmation() {
message="$1"
printf "%s (y/N): " "$message"
# Try to read user input, fallback if read fails
answer=""
if read answer </dev/tty 2>/dev/null; then
case "$answer" in
[Yy]|[Yy][Ee][Ss])
return 0
;;
*)
return 1
;;
esac
else
echo ""
echo "Cannot read user input. Defaulting to No."
return 1
fi
}
download_artifact() {
if [ -n "${OS_TYPE}" ] && [ -n "${ARCH_TYPE}" ]; then
# Use the latest stable released version.
@@ -119,104 +71,17 @@ download_artifact() {
fi
echo "Downloading ${BIN}, OS: ${OS_TYPE}, Arch: ${ARCH_TYPE}, Version: ${VERSION}"
PKG_NAME="${BIN}-${OS_TYPE}-${ARCH_TYPE}-${VERSION}"
PACKAGE_NAME="${PKG_NAME}.tar.gz"
SHA256_FILE="${PKG_NAME}.sha256sum"
PACKAGE_NAME="${BIN}-${OS_TYPE}-${ARCH_TYPE}-${VERSION}.tar.gz"
if [ -n "${PACKAGE_NAME}" ]; then
# Check if files already exist and prompt for override
if [ -f "${PACKAGE_NAME}" ]; then
echo "File ${PACKAGE_NAME} already exists."
if prompt_confirmation "Do you want to override it?"; then
echo "Overriding existing file..."
rm -f "${PACKAGE_NAME}"
else
echo "Skipping download. Using existing file."
fi
fi
if [ -f "${BIN}" ]; then
echo "Binary ${BIN} already exists."
if prompt_confirmation "Do you want to override it?"; then
echo "Will override existing binary..."
rm -f "${BIN}"
else
echo "Installation cancelled."
exit 0
fi
fi
# Download package if not exists
if [ ! -f "${PACKAGE_NAME}" ]; then
echo "Downloading ${PACKAGE_NAME}..."
# Use curl instead of wget for better compatibility
if command -v curl >/dev/null 2>&1; then
if ! curl -L -o "${PACKAGE_NAME}" "https://github.com/${GITHUB_ORG}/${GITHUB_REPO}/releases/download/${VERSION}/${PACKAGE_NAME}"; then
echo "Error: Failed to download ${PACKAGE_NAME}"
exit 1
fi
elif command -v wget >/dev/null 2>&1; then
if ! wget -O "${PACKAGE_NAME}" "https://github.com/${GITHUB_ORG}/${GITHUB_REPO}/releases/download/${VERSION}/${PACKAGE_NAME}"; then
echo "Error: Failed to download ${PACKAGE_NAME}"
exit 1
fi
else
echo "Error: Neither curl nor wget is available for downloading."
exit 1
fi
fi
# Download and verify SHA256 checksum
echo "Downloading SHA256 checksum..."
sha256_download_success=0
if command -v curl >/dev/null 2>&1; then
if curl -L -s -o "${SHA256_FILE}" "https://github.com/${GITHUB_ORG}/${GITHUB_REPO}/releases/download/${VERSION}/${SHA256_FILE}" 2>/dev/null; then
sha256_download_success=1
fi
elif command -v wget >/dev/null 2>&1; then
if wget -q -O "${SHA256_FILE}" "https://github.com/${GITHUB_ORG}/${GITHUB_REPO}/releases/download/${VERSION}/${SHA256_FILE}" 2>/dev/null; then
sha256_download_success=1
fi
fi
if [ $sha256_download_success -eq 1 ] && [ -f "${SHA256_FILE}" ]; then
expected_sha256=$(cat "${SHA256_FILE}" | cut -d' ' -f1)
if [ -n "$expected_sha256" ]; then
if ! verify_sha256 "${PACKAGE_NAME}" "${expected_sha256}"; then
echo "SHA256 verification failed. Removing downloaded file."
rm -f "${PACKAGE_NAME}" "${SHA256_FILE}"
exit 1
fi
else
echo "Warning: Could not parse SHA256 checksum from file."
fi
rm -f "${SHA256_FILE}"
else
echo "Warning: Could not download SHA256 checksum file. Skipping verification."
fi
wget "https://github.com/${GITHUB_ORG}/${GITHUB_REPO}/releases/download/${VERSION}/${PACKAGE_NAME}"
# Extract the binary and clean the rest.
echo "Extracting ${PACKAGE_NAME}..."
if ! tar xf "${PACKAGE_NAME}"; then
echo "Error: Failed to extract ${PACKAGE_NAME}"
exit 1
fi
# Find the binary in the extracted directory
extracted_dir="${PACKAGE_NAME%.tar.gz}"
if [ -f "${extracted_dir}/${BIN}" ]; then
mv "${extracted_dir}/${BIN}" "${PWD}/"
rm -f "${PACKAGE_NAME}"
rm -rf "${extracted_dir}"
chmod +x "${BIN}"
echo "Installation completed successfully!"
echo "Run './${BIN} --help' to get started"
else
echo "Error: Binary ${BIN} not found in extracted archive"
rm -f "${PACKAGE_NAME}"
rm -rf "${extracted_dir}"
exit 1
fi
tar xvf "${PACKAGE_NAME}" && \
mv "${PACKAGE_NAME%.tar.gz}/${BIN}" "${PWD}" && \
rm -r "${PACKAGE_NAME}" && \
rm -r "${PACKAGE_NAME%.tar.gz}" && \
echo "Run './${BIN} --help' to get started"
fi
fi
}
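
The checksum handling removed above reduces to two small steps: take the first whitespace-separated field of the `.sha256sum` file and compare it with the locally computed digest. A minimal sketch of just those steps (digest computation itself not shown):

```rust
// The `.sha256sum` file has the form "<hex digest>  <file name>"; the script
// extracts the first field with `cut -d' ' -f1` and compares the strings.
fn expected_digest(sha256sum_contents: &str) -> Option<&str> {
    sha256sum_contents.split_whitespace().next()
}

fn digests_match(expected: &str, actual: &str) -> bool {
    expected.eq_ignore_ascii_case(actual)
}
```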

View File

@@ -9,10 +9,6 @@ default-run = "greptime"
name = "greptime"
path = "src/bin/greptime.rs"
[[bin]]
name = "objbench"
path = "src/bin/objbench.rs"
[features]
default = ["servers/pprof", "servers/mem-prof"]
tokio-console = ["common-telemetry/tokio-console"]
@@ -24,7 +20,6 @@ workspace = true
async-trait.workspace = true
auth.workspace = true
base64.workspace = true
colored = "2.0"
cache.workspace = true
catalog.workspace = true
chrono.workspace = true
@@ -60,9 +55,6 @@ futures.workspace = true
human-panic = "2.0"
humantime.workspace = true
lazy_static.workspace = true
object-store.workspace = true
parquet = "53"
pprof = "0.14"
meta-client.workspace = true
meta-srv.workspace = true
metric-engine.workspace = true

View File

@@ -21,8 +21,6 @@ use cmd::{cli, datanode, flownode, frontend, metasrv, standalone, App};
use common_version::version;
use servers::install_ring_crypto_provider;
pub mod objbench;
#[derive(Parser)]
#[command(name = "greptime", author, version, long_version = version(), about)]
#[command(propagate_version = true)]

View File

@@ -1,602 +0,0 @@
// Copyright 2025 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::path::{Path, PathBuf};
use std::time::Instant;
use clap::Parser;
use cmd::error::{self, Result};
use colored::Colorize;
use datanode::config::ObjectStoreConfig;
use mito2::config::{FulltextIndexConfig, MitoConfig, Mode};
use mito2::read::Source;
use mito2::sst::file::{FileHandle, FileId, FileMeta};
use mito2::sst::file_purger::{FilePurger, FilePurgerRef, PurgeRequest};
use mito2::sst::parquet::{WriteOptions, PARQUET_METADATA_KEY};
use mito2::{build_access_layer, Metrics, OperationType, SstWriteRequest};
use object_store::ObjectStore;
use serde::{Deserialize, Serialize};
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
#[tokio::main]
pub async fn main() {
// common_telemetry::init_default_ut_logging();
let cmd = Command::parse();
if let Err(e) = cmd.run().await {
eprintln!("{}: {}", "Error".red().bold(), e);
std::process::exit(1);
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
#[serde(default)]
pub struct StorageConfigWrapper {
storage: StorageConfig,
}
/// Storage engine config
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
#[serde(default)]
pub struct StorageConfig {
/// The working directory of database
pub data_home: String,
#[serde(flatten)]
pub store: ObjectStoreConfig,
}
#[derive(Debug, Parser)]
pub struct Command {
/// Path to the object-store config file (TOML). Must deserialize into datanode::config::ObjectStoreConfig.
#[clap(long, value_name = "FILE")]
pub config: PathBuf,
/// Source SST file path in object-store (e.g. "region_dir/<uuid>.parquet").
#[clap(long, value_name = "PATH")]
pub source: String,
/// Target SST file path in object-store; its parent directory is used as destination region dir.
#[clap(long, value_name = "PATH")]
pub target: String,
/// Verbose output
#[clap(short, long, default_value_t = false)]
pub verbose: bool,
/// Output file path for pprof flamegraph (enables profiling)
#[clap(long, value_name = "FILE")]
pub pprof_file: Option<PathBuf>,
}
impl Command {
pub async fn run(&self) -> Result<()> {
if self.verbose {
common_telemetry::init_default_ut_logging();
}
println!("{}", "Starting objbench...".cyan().bold());
// Build object store from config
let cfg_str = std::fs::read_to_string(&self.config).map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("failed to read config {}: {e}", self.config.display()),
}
.build()
})?;
let store_cfg: StorageConfigWrapper = toml::from_str(&cfg_str).map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("failed to parse config {}: {e}", self.config.display()),
}
.build()
})?;
let object_store = build_object_store(&store_cfg.storage).await?;
println!("{} Object store initialized", "".green());
// Prepare source identifiers
let (src_region_dir, src_file_id) = split_sst_path(&self.source)?;
println!("{} Source path parsed: {}", "".green(), self.source);
// Load parquet metadata to extract RegionMetadata and file stats
println!("{}", "Loading parquet metadata...".yellow());
let file_size = object_store
.stat(&self.source)
.await
.map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("stat failed: {e}"),
}
.build()
})?
.content_length();
let parquet_meta = load_parquet_metadata(object_store.clone(), &self.source, file_size)
.await
.map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("read parquet metadata failed: {e}"),
}
.build()
})?;
let region_meta = extract_region_metadata(&self.source, &parquet_meta)?;
let num_rows = parquet_meta.file_metadata().num_rows() as u64;
let num_row_groups = parquet_meta.num_row_groups() as u64;
println!(
"{} Metadata loaded - rows: {}, size: {} bytes",
"".green(),
num_rows,
file_size
);
// Build a FileHandle for the source file
let file_meta = FileMeta {
region_id: region_meta.region_id,
file_id: src_file_id,
time_range: Default::default(),
level: 0,
file_size,
available_indexes: Default::default(),
index_file_size: 0,
num_rows,
num_row_groups,
sequence: None,
};
let src_handle = FileHandle::new(file_meta, new_noop_file_purger());
// Build the reader for a single file via ParquetReaderBuilder
println!("{}", "Building reader...".yellow());
let (_src_access_layer, _cache_manager) =
build_access_layer_simple(src_region_dir.clone(), object_store.clone()).await?;
let reader_build_start = Instant::now();
let reader = mito2::sst::parquet::reader::ParquetReaderBuilder::new(
src_region_dir.clone(),
src_handle.clone(),
object_store.clone(),
)
.expected_metadata(Some(region_meta.clone()))
.build()
.await
.map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("build reader failed: {e}"),
}
.build()
})?;
let reader_build_elapsed = reader_build_start.elapsed();
let total_rows = reader.parquet_metadata().file_metadata().num_rows();
println!("{} Reader built in {:?}", "".green(), reader_build_elapsed);
// Prepare target access layer for writing
println!("{}", "Preparing target access layer...".yellow());
let (tgt_access_layer, tgt_cache_manager) =
build_access_layer_simple(self.target.clone(), object_store.clone()).await?;
// Build write request
let fulltext_index_config = FulltextIndexConfig {
create_on_compaction: Mode::Disable,
..Default::default()
};
let write_opts = WriteOptions::default();
let write_req = SstWriteRequest {
op_type: OperationType::Compact,
metadata: region_meta,
source: Source::Reader(Box::new(reader)),
cache_manager: tgt_cache_manager,
storage: None,
max_sequence: None,
index_options: Default::default(),
inverted_index_config: MitoConfig::default().inverted_index,
fulltext_index_config,
bloom_filter_index_config: MitoConfig::default().bloom_filter_index,
};
// Write SST
println!("{}", "Writing SST...".yellow());
let mut metrics = Metrics::default();
// Start profiling if pprof_file is specified
#[cfg(unix)]
let profiler_guard = if self.pprof_file.is_some() {
println!("{} Starting profiling...", "".yellow());
Some(
pprof::ProfilerGuardBuilder::default()
.frequency(99)
.blocklist(&["libc", "libgcc", "pthread", "vdso"])
.build()
.map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("Failed to start profiler: {e}"),
}
.build()
})?,
)
} else {
None
};
#[cfg(not(unix))]
if self.pprof_file.is_some() {
eprintln!(
"{}: Profiling is not supported on this platform",
"Warning".yellow()
);
}
let write_start = Instant::now();
let infos = tgt_access_layer
.write_sst(write_req, &write_opts, &mut metrics)
.await
.map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("write_sst failed: {e}"),
}
.build()
})?;
let write_elapsed = write_start.elapsed();
// Stop profiling and generate flamegraph if enabled
#[cfg(unix)]
if let (Some(guard), Some(pprof_file)) = (profiler_guard, &self.pprof_file) {
println!("{} Generating flamegraph...", "🔥".yellow());
match guard.report().build() {
Ok(report) => {
let mut flamegraph_data = Vec::new();
if let Err(e) = report.flamegraph(&mut flamegraph_data) {
eprintln!(
"{}: Failed to generate flamegraph: {}",
"Warning".yellow(),
e
);
} else if let Err(e) = std::fs::write(pprof_file, flamegraph_data) {
eprintln!(
"{}: Failed to write flamegraph to {}: {}",
"Warning".yellow(),
pprof_file.display(),
e
);
} else {
println!(
"{} Flamegraph saved to {}",
"".green(),
pprof_file.display().to_string().cyan()
);
}
}
Err(e) => {
eprintln!(
"{}: Failed to generate pprof report: {}",
"Warning".yellow(),
e
);
}
}
}
assert_eq!(infos.len(), 1);
let dst_file_id = infos[0].file_id;
let dst_file_path = format!("{}{}", self.target, dst_file_id.as_parquet(),);
// Report results with ANSI colors
println!("\n{} {}", "Write complete!".green().bold(), "".green());
println!(" {}: {}", "Destination file".bold(), dst_file_path.cyan());
println!(" {}: {}", "Rows".bold(), total_rows.to_string().cyan());
println!(
" {}: {}",
"File size".bold(),
format!("{} bytes", file_size).cyan()
);
println!(
" {}: {:?}",
"Reader build time".bold(),
reader_build_elapsed
);
println!(" {}: {:?}", "Total time".bold(), write_elapsed);
// Print metrics in a formatted way
println!(
" {}: {:?}, sum: {:?}",
"Metrics".bold(),
metrics,
metrics.sum()
);
// Print infos
println!(" {}: {:?}", "Index".bold(), infos[0].index_metadata);
// Cleanup
println!("\n{}", "Cleaning up...".yellow());
object_store.delete(&dst_file_path).await.map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("Failed to delete dest file {}: {}", dst_file_path, e),
}
.build()
})?;
println!("{} Temporary file deleted", "".green());
println!("\n{}", "Benchmark completed successfully!".green().bold());
Ok(())
}
}
fn split_sst_path(path: &str) -> Result<(String, FileId)> {
let p = Path::new(path);
let file_name = p.file_name().and_then(|s| s.to_str()).ok_or_else(|| {
error::IllegalConfigSnafu {
msg: "invalid source path".to_string(),
}
.build()
})?;
let uuid_str = file_name.strip_suffix(".parquet").ok_or_else(|| {
error::IllegalConfigSnafu {
msg: "expect .parquet file".to_string(),
}
.build()
})?;
let file_id = FileId::parse_str(uuid_str).map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("invalid file id: {e}"),
}
.build()
})?;
let parent = p
.parent()
.and_then(|s| s.to_str())
.unwrap_or("")
.to_string();
Ok((parent, file_id))
}
fn extract_region_metadata(
file_path: &str,
meta: &parquet::file::metadata::ParquetMetaData,
) -> Result<RegionMetadataRef> {
use parquet::format::KeyValue;
let kvs: Option<&Vec<KeyValue>> = meta.file_metadata().key_value_metadata();
let Some(kvs) = kvs else {
return Err(error::IllegalConfigSnafu {
msg: format!("{file_path}: missing parquet key_value metadata"),
}
.build());
};
let json = kvs
.iter()
.find(|kv| kv.key == PARQUET_METADATA_KEY)
.and_then(|kv| kv.value.as_ref())
.ok_or_else(|| {
error::IllegalConfigSnafu {
msg: format!("{file_path}: key {PARQUET_METADATA_KEY} not found or empty"),
}
.build()
})?;
let region: RegionMetadata = RegionMetadata::from_json(json).map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("invalid region metadata json: {e}"),
}
.build()
})?;
Ok(std::sync::Arc::new(region))
}
async fn build_object_store(sc: &StorageConfig) -> Result<ObjectStore> {
use datanode::config::ObjectStoreConfig::*;
let oss = &sc.store;
match oss {
File(_) => {
use object_store::services::Fs;
let builder = Fs::default().root(&sc.data_home);
Ok(ObjectStore::new(builder)
.map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("init fs backend failed: {e}"),
}
.build()
})?
.finish())
}
S3(s3) => {
use common_base::secrets::ExposeSecret;
use object_store::services::S3;
use object_store::util;
let root = util::normalize_dir(&s3.root);
let mut builder = S3::default()
.root(&root)
.bucket(&s3.bucket)
.access_key_id(s3.access_key_id.expose_secret())
.secret_access_key(s3.secret_access_key.expose_secret());
if let Some(ep) = &s3.endpoint {
builder = builder.endpoint(ep);
}
if let Some(region) = &s3.region {
builder = builder.region(region);
}
if s3.enable_virtual_host_style {
builder = builder.enable_virtual_host_style();
}
Ok(ObjectStore::new(builder)
.map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("init s3 backend failed: {e}"),
}
.build()
})?
.finish())
}
Oss(oss) => {
use common_base::secrets::ExposeSecret;
use object_store::services::Oss;
use object_store::util;
let root = util::normalize_dir(&oss.root);
let builder = Oss::default()
.root(&root)
.bucket(&oss.bucket)
.endpoint(&oss.endpoint)
.access_key_id(oss.access_key_id.expose_secret())
.access_key_secret(oss.access_key_secret.expose_secret());
Ok(ObjectStore::new(builder)
.map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("init oss backend failed: {e}"),
}
.build()
})?
.finish())
}
Azblob(az) => {
use common_base::secrets::ExposeSecret;
use object_store::services::Azblob;
use object_store::util;
let root = util::normalize_dir(&az.root);
let mut builder = Azblob::default()
.root(&root)
.container(&az.container)
.endpoint(&az.endpoint)
.account_name(az.account_name.expose_secret())
.account_key(az.account_key.expose_secret());
if let Some(token) = &az.sas_token {
builder = builder.sas_token(token);
}
Ok(ObjectStore::new(builder)
.map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("init azblob backend failed: {e}"),
}
.build()
})?
.finish())
}
Gcs(gcs) => {
use common_base::secrets::ExposeSecret;
use object_store::services::Gcs;
use object_store::util;
let root = util::normalize_dir(&gcs.root);
let builder = Gcs::default()
.root(&root)
.bucket(&gcs.bucket)
.scope(&gcs.scope)
.credential_path(gcs.credential_path.expose_secret())
.credential(gcs.credential.expose_secret())
.endpoint(&gcs.endpoint);
Ok(ObjectStore::new(builder)
.map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("init gcs backend failed: {e}"),
}
.build()
})?
.finish())
}
}
}
async fn build_access_layer_simple(
region_dir: String,
object_store: ObjectStore,
) -> Result<(
std::sync::Arc<mito2::AccessLayer>,
std::sync::Arc<mito2::CacheManager>,
)> {
// Minimal index aux path setup
let mut mito_cfg = MitoConfig::default();
// Use a temporary directory as aux path
let data_home = std::env::temp_dir().join("greptime_objbench");
let _ = std::fs::create_dir_all(&data_home);
let _ = mito_cfg.index.sanitize(
data_home.to_str().unwrap_or("/tmp"),
&mito_cfg.inverted_index,
);
let access_layer = build_access_layer(&region_dir, object_store, &mito_cfg)
.await
.map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("build_access_layer failed: {e}"),
}
.build()
})?;
Ok((
access_layer,
std::sync::Arc::new(mito2::CacheManager::default()),
))
}
fn new_noop_file_purger() -> FilePurgerRef {
#[derive(Debug)]
struct Noop;
impl FilePurger for Noop {
fn send_request(&self, _request: PurgeRequest) {}
}
std::sync::Arc::new(Noop)
}
async fn load_parquet_metadata(
object_store: ObjectStore,
path: &str,
file_size: u64,
) -> std::result::Result<
parquet::file::metadata::ParquetMetaData,
Box<dyn std::error::Error + Send + Sync>,
> {
use parquet::file::metadata::ParquetMetaDataReader;
use parquet::file::FOOTER_SIZE;
let actual_size = if file_size == 0 {
object_store.stat(path).await?.content_length()
} else {
file_size
};
if actual_size < FOOTER_SIZE as u64 {
return Err("file too small".into());
}
let prefetch: u64 = 64 * 1024;
let start = actual_size.saturating_sub(prefetch);
let buffer = object_store
.read_with(path)
.range(start..actual_size)
.await?
.to_vec();
let buffer_len = buffer.len();
let mut footer = [0; 8];
footer.copy_from_slice(&buffer[buffer_len - FOOTER_SIZE..]);
let metadata_len = ParquetMetaDataReader::decode_footer(&footer)? as u64;
if actual_size - (FOOTER_SIZE as u64) < metadata_len {
return Err("invalid footer/metadata length".into());
}
if (metadata_len as usize) <= buffer_len - FOOTER_SIZE {
let metadata_start = buffer_len - metadata_len as usize - FOOTER_SIZE;
let meta = ParquetMetaDataReader::decode_metadata(
&buffer[metadata_start..buffer_len - FOOTER_SIZE],
)?;
Ok(meta)
} else {
let metadata_start = actual_size - metadata_len - FOOTER_SIZE as u64;
let data = object_store
.read_with(path)
.range(metadata_start..(actual_size - FOOTER_SIZE as u64))
.await?
.to_vec();
let meta = ParquetMetaDataReader::decode_metadata(&data)?;
Ok(meta)
}
}
#[cfg(test)]
mod tests {
use super::StorageConfigWrapper;
#[test]
fn test_decode() {
let cfg = std::fs::read_to_string("/home/lei/datanode-bulk.toml").unwrap();
let storage: StorageConfigWrapper = toml::from_str(&cfg).unwrap();
println!("{:?}", storage);
}
}
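
`load_parquet_metadata` above relies on the standard Parquet trailer layout: the file ends with the serialized metadata, a 4-byte little-endian metadata length, and the `PAR1` magic. A minimal sketch of decoding that trailer by hand (the tool itself delegates to `ParquetMetaDataReader::decode_footer`):

```rust
// Decode the last 8 bytes of a Parquet file: 4-byte LE metadata length + "PAR1".
fn decode_parquet_trailer(trailer: &[u8; 8]) -> Result<u64, String> {
    if &trailer[4..] != b"PAR1" {
        return Err("not a Parquet file (bad magic)".to_string());
    }
    let metadata_len = u32::from_le_bytes(trailer[..4].try_into().unwrap()) as u64;
    Ok(metadata_len)
}
```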

View File

@@ -22,6 +22,7 @@ mod scalar_add;
mod scalar_mul;
pub(crate) mod sum;
mod vector_add;
mod vector_dim;
mod vector_div;
mod vector_mul;
mod vector_norm;
@@ -54,6 +55,7 @@ impl VectorFunction {
registry.register(Arc::new(vector_mul::VectorMulFunction));
registry.register(Arc::new(vector_div::VectorDivFunction));
registry.register(Arc::new(vector_norm::VectorNormFunction));
registry.register(Arc::new(vector_dim::VectorDimFunction));
registry.register(Arc::new(elem_sum::ElemSumFunction));
registry.register(Arc::new(elem_product::ElemProductFunction));
}

View File

@@ -0,0 +1,172 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::borrow::Cow;
use std::fmt::Display;
use common_query::error::InvalidFuncArgsSnafu;
use common_query::prelude::{Signature, TypeSignature, Volatility};
use datatypes::prelude::ConcreteDataType;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::vectors::{MutableVector, UInt64VectorBuilder, VectorRef};
use snafu::ensure;
use crate::function::{Function, FunctionContext};
use crate::scalars::vector::impl_conv::{as_veclit, as_veclit_if_const};
const NAME: &str = "vec_dim";
/// Returns the dimension of the vector.
///
/// # Example
///
/// ```sql
/// SELECT vec_dim('[7.0, 8.0, 9.0, 10.0]');
///
/// +---------------------------------------------------------------+
/// | vec_dim(Utf8("[7.0, 8.0, 9.0, 10.0]")) |
/// +---------------------------------------------------------------+
/// | 4 |
/// +---------------------------------------------------------------+
/// ```
#[derive(Debug, Clone, Default)]
pub struct VectorDimFunction;
impl Function for VectorDimFunction {
fn name(&self) -> &str {
NAME
}
fn return_type(
&self,
_input_types: &[ConcreteDataType],
) -> common_query::error::Result<ConcreteDataType> {
Ok(ConcreteDataType::uint64_datatype())
}
fn signature(&self) -> Signature {
Signature::one_of(
vec![
TypeSignature::Exact(vec![ConcreteDataType::string_datatype()]),
TypeSignature::Exact(vec![ConcreteDataType::binary_datatype()]),
],
Volatility::Immutable,
)
}
fn eval(
&self,
_func_ctx: FunctionContext,
columns: &[VectorRef],
) -> common_query::error::Result<VectorRef> {
ensure!(
columns.len() == 1,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect exactly one, have: {}",
columns.len()
)
}
);
let arg0 = &columns[0];
let len = arg0.len();
let mut result = UInt64VectorBuilder::with_capacity(len);
if len == 0 {
return Ok(result.to_vector());
}
let arg0_const = as_veclit_if_const(arg0)?;
for i in 0..len {
let arg0 = match arg0_const.as_ref() {
Some(arg0) => Some(Cow::Borrowed(arg0.as_ref())),
None => as_veclit(arg0.get_ref(i))?,
};
let Some(arg0) = arg0 else {
result.push_null();
continue;
};
result.push(Some(arg0.len() as u64));
}
Ok(result.to_vector())
}
}
impl Display for VectorDimFunction {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", NAME.to_ascii_uppercase())
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use common_query::error::Error;
use datatypes::vectors::StringVector;
use super::*;
#[test]
fn test_vec_dim() {
let func = VectorDimFunction;
let input0 = Arc::new(StringVector::from(vec![
Some("[0.0,2.0,3.0]".to_string()),
Some("[1.0,2.0,3.0,4.0]".to_string()),
None,
Some("[5.0]".to_string()),
]));
let result = func.eval(FunctionContext::default(), &[input0]).unwrap();
let result = result.as_ref();
assert_eq!(result.len(), 4);
assert_eq!(result.get_ref(0).as_u64().unwrap(), Some(3));
assert_eq!(result.get_ref(1).as_u64().unwrap(), Some(4));
assert!(result.get_ref(2).is_null());
assert_eq!(result.get_ref(3).as_u64().unwrap(), Some(1));
}
#[test]
fn test_dim_error() {
let func = VectorDimFunction;
let input0 = Arc::new(StringVector::from(vec![
Some("[1.0,2.0,3.0]".to_string()),
Some("[4.0,5.0,6.0]".to_string()),
None,
Some("[2.0,3.0,3.0]".to_string()),
]));
let input1 = Arc::new(StringVector::from(vec![
Some("[1.0,1.0,1.0]".to_string()),
Some("[6.0,5.0,4.0]".to_string()),
Some("[3.0,2.0,2.0]".to_string()),
]));
let result = func.eval(FunctionContext::default(), &[input0, input1]);
match result {
Err(Error::InvalidFuncArgs { err_msg, .. }) => {
assert_eq!(
err_msg,
"The length of the args is not correct, expect exactly one, have: 2"
)
}
_ => unreachable!(),
}
}
}
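
Conceptually, `vec_dim` just counts the elements of the decoded vector literal. A standalone sketch of that idea, using a naive string parse instead of the crate's `as_veclit` conversion (illustrative only; the real function also accepts the binary vector encoding):

```rust
// Naive dimension count for a vector literal such as "[1.0,2.0,3.0]".
fn vec_dim_literal(literal: &str) -> Option<u64> {
    let inner = literal.trim().strip_prefix('[')?.strip_suffix(']')?;
    if inner.trim().is_empty() {
        return Some(0);
    }
    Some(inner.split(',').count() as u64)
}

fn main() {
    assert_eq!(vec_dim_literal("[7.0, 8.0, 9.0, 10.0]"), Some(4));
}
```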

View File

@@ -171,10 +171,6 @@ pub struct S3Config {
pub secret_access_key: SecretString,
pub endpoint: Option<String>,
pub region: Option<String>,
/// Enable virtual host style so that opendal will send API requests in virtual host style instead of path style.
/// By default, opendal sends API requests to https://s3.us-east-1.amazonaws.com/bucket_name.
/// When enabled, opendal sends API requests to https://bucket_name.s3.us-east-1.amazonaws.com.
pub enable_virtual_host_style: bool,
#[serde(flatten)]
pub cache: ObjectStorageCacheConfig,
pub http_client: HttpClientConfig,
@@ -189,7 +185,6 @@ impl PartialEq for S3Config {
&& self.secret_access_key.expose_secret() == other.secret_access_key.expose_secret()
&& self.endpoint == other.endpoint
&& self.region == other.region
&& self.enable_virtual_host_style == other.enable_virtual_host_style
&& self.cache == other.cache
&& self.http_client == other.http_client
}
@@ -294,7 +289,6 @@ impl Default for S3Config {
root: String::default(),
access_key_id: SecretString::from(String::default()),
secret_access_key: SecretString::from(String::default()),
enable_virtual_host_style: false,
endpoint: Option::default(),
region: Option::default(),
cache: ObjectStorageCacheConfig::default(),
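
For reference, the removed flag was wired into the opendal S3 builder in the same way the objbench code earlier in this diff does it. A hedged sketch of that wiring (builder type re-exported through `object_store::services`; bucket name and flag are illustrative parameters):

```rust
// Sketch: how `enable_virtual_host_style` maps onto the opendal S3 builder,
// mirroring the builder calls shown in the objbench section of this diff.
use object_store::services::S3;

fn s3_builder(bucket: &str, enable_virtual_host_style: bool) -> S3 {
    let mut builder = S3::default().bucket(bucket);
    if enable_virtual_host_style {
        builder = builder.enable_virtual_host_style();
    }
    builder
}
```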

View File

@@ -41,13 +41,10 @@ pub(crate) async fn new_s3_object_store(s3_config: &S3Config) -> Result<ObjectSt
if s3_config.endpoint.is_some() {
builder = builder.endpoint(s3_config.endpoint.as_ref().unwrap());
}
};
if s3_config.region.is_some() {
builder = builder.region(s3_config.region.as_ref().unwrap());
}
if s3_config.enable_virtual_host_style {
builder = builder.enable_virtual_host_style();
}
};
Ok(ObjectStore::new(builder)
.context(error::InitBackendSnafu)?

View File

@@ -133,7 +133,7 @@ impl Flownode for FlowWorkerManager {
.map_err(to_meta_err(snafu::location!()))?;
debug!(
"Done to flush flow_id={:?} with {} input rows flushed, {} rows sent and {} output rows flushed",
"Done to flush flow_id={:?} with {} input rows flushed, {} rows sended and {} output rows flushed",
flow_id, flushed_input_rows, rows_send, row
);
Ok(FlowResponse {

View File

@@ -214,7 +214,6 @@ impl HeartbeatTask {
if let Some(message) = message {
Self::new_heartbeat_request(&heartbeat_request, Some(message), &latest_report)
} else {
warn!("Sender has been dropped, exiting the heartbeat loop");
// Receives None that means Sender was dropped, we need to break the current loop
break
}
@@ -256,11 +255,7 @@ impl HeartbeatTask {
error!(e; "Error while handling heartbeat response");
}
}
Ok(None) => {
warn!("Heartbeat response stream closed");
capture_self.start_with_retry(retry_interval).await;
break;
}
Ok(None) => break,
Err(e) => {
error!(e; "Occur error while reading heartbeat response");
capture_self.start_with_retry(retry_interval).await;

View File

@@ -23,7 +23,7 @@ use common_meta::heartbeat::handler::{
};
use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef, OutgoingMessage};
use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
use common_telemetry::{debug, error, info, warn};
use common_telemetry::{debug, error, info};
use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient};
use servers::addrs;
use servers::heartbeat_options::HeartbeatOptions;
@@ -42,8 +42,8 @@ use crate::metrics::{HEARTBEAT_RECV_COUNT, HEARTBEAT_SENT_COUNT};
pub struct HeartbeatTask {
peer_addr: String,
meta_client: Arc<MetaClient>,
report_interval: Duration,
retry_interval: Duration,
report_interval: u64,
retry_interval: u64,
resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
start_time_ms: u64,
}
@@ -58,8 +58,8 @@ impl HeartbeatTask {
HeartbeatTask {
peer_addr: addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)),
meta_client,
report_interval: heartbeat_opts.interval,
retry_interval: heartbeat_opts.retry_interval,
report_interval: heartbeat_opts.interval.as_millis() as u64,
retry_interval: heartbeat_opts.retry_interval.as_millis() as u64,
resp_handler_executor,
start_time_ms: common_time::util::current_time_millis() as u64,
}
@@ -103,15 +103,13 @@ impl HeartbeatTask {
HEARTBEAT_RECV_COUNT.with_label_values(&["success"]).inc();
}
}
Ok(None) => {
warn!("Heartbeat response stream closed");
capture_self.start_with_retry(retry_interval).await;
break;
}
Ok(None) => break,
Err(e) => {
HEARTBEAT_RECV_COUNT.with_label_values(&["error"]).inc();
error!(e; "Occur error while reading heartbeat response");
capture_self.start_with_retry(retry_interval).await;
capture_self
.start_with_retry(Duration::from_millis(retry_interval))
.await;
break;
}
@@ -179,13 +177,12 @@ impl HeartbeatTask {
if let Some(message) = message {
Self::new_heartbeat_request(&heartbeat_request, Some(message))
} else {
warn!("Sender has been dropped, exiting the heartbeat loop");
// Receives None that means Sender was dropped, we need to break the current loop
break
}
}
_ = &mut sleep => {
sleep.as_mut().reset(Instant::now() + report_interval);
sleep.as_mut().reset(Instant::now() + Duration::from_millis(report_interval));
Self::new_heartbeat_request(&heartbeat_request, None)
}
};
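
The change above stores the heartbeat intervals as plain millisecond counts instead of `Duration` values; the conversion in both directions is straightforward. A hedged sketch, not the task's actual fields:

```rust
use std::time::Duration;

// Duration -> millis (as stored in `report_interval` / `retry_interval` above)
// and back again when a `Duration` is needed for the sleep reset or retry.
fn to_millis(interval: Duration) -> u64 {
    interval.as_millis() as u64
}

fn to_duration(millis: u64) -> Duration {
    Duration::from_millis(millis)
}
```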

View File

@@ -36,11 +36,11 @@ use servers::error::{
TableNotFoundSnafu,
};
use servers::http::jaeger::QueryTraceParams;
use servers::otlp::trace::{
use servers::otlp::trace::v0::{
DURATION_NANO_COLUMN, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN, SPAN_ID_COLUMN,
SPAN_KIND_COLUMN, SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, TIMESTAMP_COLUMN, TRACE_ID_COLUMN,
TRACE_TABLE_NAME,
};
use servers::otlp::trace::TRACE_TABLE_NAME;
use servers::query_handler::JaegerQueryHandler;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};

View File

@@ -72,7 +72,10 @@ impl OpenTelemetryProtocolHandler for Instance {
#[tracing::instrument(skip_all)]
async fn traces(
&self,
pipeline_handler: PipelineHandlerRef,
request: ExportTraceServiceRequest,
pipeline: PipelineWay,
pipeline_params: GreptimePipelineParams,
table_name: String,
ctx: QueryContextRef,
) -> ServerResult<Output> {
@@ -87,9 +90,14 @@ impl OpenTelemetryProtocolHandler for Instance {
.get::<OpenTelemetryProtocolInterceptorRef<servers::error::Error>>();
interceptor_ref.pre_execute(ctx.clone())?;
let spans = otlp::trace::parse(request);
let (requests, rows) = otlp::trace::to_grpc_insert_requests(table_name, spans)?;
let (requests, rows) = otlp::trace::to_grpc_insert_requests(
request,
pipeline,
pipeline_params,
table_name,
&ctx,
pipeline_handler,
)?;
OTLP_TRACES_ROWS.inc_by(rows as u64);

View File

@@ -164,7 +164,6 @@ where
let grpc_server = builder
.database_handler(greptime_request_handler.clone())
.prometheus_handler(self.instance.clone(), user_provider.clone())
.otlp_handler(self.instance.clone(), user_provider)
.flight_handler(Arc::new(greptime_request_handler))
.build();
Ok(grpc_server)

View File

@@ -42,16 +42,7 @@ impl BloomFilterApplier {
) -> Result<Vec<Range<usize>>> {
let rows_per_segment = self.meta.rows_per_segment as usize;
let start_seg = search_range.start / rows_per_segment;
let mut end_seg = search_range.end.div_ceil(rows_per_segment);
if end_seg == self.meta.segment_loc_indices.len() + 1 {
// In a previous version, there was a bug where if the last segment was all null,
// this segment would not be written into the index. This caused the slice
// `self.meta.segment_loc_indices[start_seg..end_seg]` to go out of bounds due to
// the missing segment. Since the `search` function does not search for nulls,
// we can simply ignore the last segment in this buggy scenario.
end_seg -= 1;
}
let end_seg = search_range.end.div_ceil(rows_per_segment);
let locs = &self.meta.segment_loc_indices[start_seg..end_seg];
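
The two lines kept above compute which bloom-filter segments a row range touches. A small standalone sketch of that mapping, with the segment size taken from `rows_per_segment`:

```rust
// Map a half-open row range onto the half-open range of segment indices it
// overlaps, with `rows_per_segment` rows per segment.
fn segment_range(rows: std::ops::Range<usize>, rows_per_segment: usize) -> std::ops::Range<usize> {
    let start_seg = rows.start / rows_per_segment;
    let end_seg = rows.end.div_ceil(rows_per_segment);
    start_seg..end_seg
}

fn main() {
    // Rows 5..13 with 4 rows per segment touch segments 1, 2 and 3.
    assert_eq!(segment_range(5..13, 4), 1..4);
}
```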

View File

@@ -64,9 +64,6 @@ pub struct BloomFilterCreator {
/// Storage for finalized Bloom filters.
finalized_bloom_filters: FinalizedBloomFilterStorage,
/// Row count that finalized so far.
finalized_row_count: usize,
/// Global memory usage of the bloom filter creator.
global_memory_usage: Arc<AtomicUsize>,
}
@@ -99,7 +96,6 @@ impl BloomFilterCreator {
global_memory_usage,
global_memory_usage_threshold,
),
finalized_row_count: 0,
}
}
@@ -140,7 +136,6 @@ impl BloomFilterCreator {
if self.accumulated_row_count % self.rows_per_segment == 0 {
self.finalize_segment().await?;
self.finalized_row_count = self.accumulated_row_count;
}
}
@@ -166,7 +161,6 @@ impl BloomFilterCreator {
if self.accumulated_row_count % self.rows_per_segment == 0 {
self.finalize_segment().await?;
self.finalized_row_count = self.accumulated_row_count;
}
Ok(())
@@ -174,7 +168,7 @@ impl BloomFilterCreator {
/// Finalizes any remaining segments and writes the bloom filters and metadata to the provided writer.
pub async fn finish(&mut self, mut writer: impl AsyncWrite + Unpin) -> Result<()> {
if self.accumulated_row_count > self.finalized_row_count {
if !self.cur_seg_distinct_elems.is_empty() {
self.finalize_segment().await?;
}
@@ -412,35 +406,4 @@ mod tests {
assert!(bf.contains(&b"f"));
}
}
#[tokio::test]
async fn test_final_seg_all_null() {
let mut writer = Cursor::new(Vec::new());
let mut creator = BloomFilterCreator::new(
2,
Arc::new(MockExternalTempFileProvider::new()),
Arc::new(AtomicUsize::new(0)),
None,
);
creator
.push_n_row_elems(4, vec![b"a".to_vec(), b"b".to_vec()])
.await
.unwrap();
creator.push_row_elems(Vec::new()).await.unwrap();
creator.finish(&mut writer).await.unwrap();
let bytes = writer.into_inner();
let total_size = bytes.len();
let meta_size_offset = total_size - 4;
let meta_size = u32::from_le_bytes((&bytes[meta_size_offset..]).try_into().unwrap());
let meta_bytes = &bytes[total_size - meta_size as usize - 4..total_size - 4];
let meta = BloomFilterMeta::decode(meta_bytes).unwrap();
assert_eq!(meta.rows_per_segment, 2);
assert_eq!(meta.segment_count, 3);
assert_eq!(meta.row_count, 5);
}
}

View File

@@ -27,9 +27,10 @@ use snafu::OptionExt;
use tokio::sync::mpsc;
use tokio::sync::mpsc::Sender;
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Request, Response, Status, Streaming};
use tonic::{Request, Response, Streaming};
use crate::error::{self, Result};
use crate::error;
use crate::error::Result;
use crate::handler::{HeartbeatHandlerGroup, Pusher, PusherId};
use crate::metasrv::{Context, Metasrv};
use crate::metrics::METRIC_META_HEARTBEAT_RECV;
@@ -108,12 +109,6 @@ impl heartbeat_server::Heartbeat for Metasrv {
if is_not_leader {
warn!("Quit because it is no longer the leader");
let _ = tx
.send(Err(Status::aborted(format!(
"The requested metasrv node is not leader, node addr: {}",
ctx.server_addr
))))
.await;
break;
}
}

View File

@@ -162,15 +162,38 @@ impl MetricEngineInner {
let physical_region_id = validate_create_logical_regions(&requests)?;
let data_region_id = utils::to_data_region_id(physical_region_id);
ensure!(
self.state
.read()
.unwrap()
.exist_physical_region(data_region_id),
PhysicalRegionNotFoundSnafu {
region_id: data_region_id,
}
);
// Filters out the requests that the logical region already exists
let requests = {
let state = self.state.read().unwrap();
let logical_region_exists = state.logical_region_exists_filter(data_region_id);
// TODO(weny): log the skipped logical regions
requests
.into_iter()
.filter(|(region_id, _)| !logical_region_exists(region_id))
.collect::<Vec<_>>()
let mut skipped = Vec::with_capacity(requests.len());
let mut kept_requests = Vec::with_capacity(requests.len());
for (region_id, request) in requests {
if state.is_logical_region_exist(region_id) {
skipped.push(region_id);
} else {
kept_requests.push((region_id, request));
}
}
// log skipped regions
if !skipped.is_empty() {
info!(
"Skipped creating logical regions {skipped:?} because they already exist",
skipped = skipped
);
}
kept_requests
};
// Finds new columns to add to physical region
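
The replacement code above splits the incoming requests into ones to skip (logical region already exists) and ones to keep, then logs the skipped ids. A hedged sketch of that split with the engine state lookup abstracted into a predicate:

```rust
// `exists` stands in for `state.is_logical_region_exist`; ids are simplified to u64.
fn split_requests<T>(
    requests: Vec<(u64, T)>,
    exists: impl Fn(u64) -> bool,
) -> (Vec<u64>, Vec<(u64, T)>) {
    let mut skipped = Vec::with_capacity(requests.len());
    let mut kept = Vec::with_capacity(requests.len());
    for (id, request) in requests {
        if exists(id) {
            skipped.push(id);
        } else {
            kept.push((id, request));
        }
    }
    (skipped, kept)
}
```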

View File

@@ -83,18 +83,6 @@ pub(crate) struct MetricEngineState {
}
impl MetricEngineState {
pub fn logical_region_exists_filter(
&self,
physical_region_id: RegionId,
) -> impl for<'a> Fn(&'a RegionId) -> bool + use<'_> {
let state = self
.physical_region_states()
.get(&physical_region_id)
.unwrap();
move |logical_region_id| state.logical_regions().contains(logical_region_id)
}
pub fn add_physical_region(
&mut self,
physical_region_id: RegionId,

View File

@@ -13,7 +13,6 @@
// limitations under the License.
use std::sync::Arc;
use std::time::Duration;
use object_store::services::Fs;
use object_store::util::{join_dir, with_instrument_layers};
@@ -43,29 +42,6 @@ pub type AccessLayerRef = Arc<AccessLayer>;
/// SST write results.
pub type SstInfoArray = SmallVec<[SstInfo; 2]>;
#[derive(Debug, Default)]
pub struct Metrics {
pub read: Duration,
pub write: Duration,
pub convert: Duration,
pub index_update: Duration,
pub index_finish: Duration,
pub close: Duration,
pub num_series: usize,
// SST Opendal metrics.
pub opendal_create_cost: Duration,
pub opendal_num_writes: usize,
pub opendal_write_cost: Duration,
pub opendal_complete_cost: Duration,
}
impl Metrics {
pub fn sum(&self) -> Duration {
self.read + self.write + self.convert + self.index_update + self.index_finish + self.close
}
}
/// A layer to access SST files under the same directory.
pub struct AccessLayer {
region_dir: String,
@@ -145,11 +121,10 @@ impl AccessLayer {
/// Writes a SST with specific `file_id` and `metadata` to the layer.
///
/// Returns the info of the SST. If no data written, returns None.
pub async fn write_sst(
pub(crate) async fn write_sst(
&self,
request: SstWriteRequest,
write_opts: &WriteOptions,
metrics: &mut Metrics,
) -> Result<SstInfoArray> {
let region_id = request.metadata.region_id;
let cache_manager = request.cache_manager.clone();
@@ -192,16 +167,9 @@ impl AccessLayer {
path_provider,
)
.await;
let sst_info = writer
.write_all(request.source, request.max_sequence, write_opts, metrics)
.await?;
let opendal_metrics = writer.opendal_metrics_val();
metrics.opendal_create_cost += opendal_metrics.create_cost;
metrics.opendal_num_writes += opendal_metrics.num_writes;
metrics.opendal_write_cost += opendal_metrics.write_cost;
metrics.opendal_complete_cost += opendal_metrics.complete_cost;
sst_info
writer
.write_all(request.source, request.max_sequence, write_opts)
.await?
};
// Put parquet metadata to cache manager.
@@ -221,53 +189,28 @@ impl AccessLayer {
}
}
/// Helper to build an [AccessLayerRef] with internal index managers.
///
/// This is a convenience constructor intended for tooling that needs to
/// interact with SSTs without wiring all indexing internals manually.
pub async fn build_access_layer(
region_dir: &str,
object_store: ObjectStore,
config: &crate::config::MitoConfig,
) -> Result<AccessLayerRef> {
let puffin_manager_factory = PuffinManagerFactory::new(
&config.index.aux_path,
config.index.staging_size.as_bytes(),
Some(config.index.write_buffer_size.as_bytes() as _),
config.index.staging_ttl,
)
.await?;
let intermediate_manager = IntermediateManager::init_fs(&config.index.aux_path).await?;
Ok(Arc::new(AccessLayer::new(
region_dir,
object_store,
puffin_manager_factory,
intermediate_manager,
)))
}
/// `OperationType` represents the origin of the `SstWriteRequest`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum OperationType {
pub(crate) enum OperationType {
Flush,
Compact,
}
/// Contents to build a SST.
pub struct SstWriteRequest {
pub op_type: OperationType,
pub metadata: RegionMetadataRef,
pub source: Source,
pub cache_manager: CacheManagerRef,
pub(crate) struct SstWriteRequest {
pub(crate) op_type: OperationType,
pub(crate) metadata: RegionMetadataRef,
pub(crate) source: Source,
pub(crate) cache_manager: CacheManagerRef,
#[allow(dead_code)]
pub storage: Option<String>,
pub max_sequence: Option<SequenceNumber>,
pub(crate) storage: Option<String>,
pub(crate) max_sequence: Option<SequenceNumber>,
/// Configs for index
pub index_options: IndexOptions,
pub inverted_index_config: InvertedIndexConfig,
pub fulltext_index_config: FulltextIndexConfig,
pub bloom_filter_index_config: BloomFilterConfig,
pub(crate) index_options: IndexOptions,
pub(crate) inverted_index_config: InvertedIndexConfig,
pub(crate) fulltext_index_config: FulltextIndexConfig,
pub(crate) bloom_filter_index_config: BloomFilterConfig,
}
pub(crate) async fn new_fs_cache_store(root: &str) -> Result<ObjectStore> {

View File

@@ -40,7 +40,6 @@ use crate::sst::index::IndexerBuilderImpl;
use crate::sst::parquet::writer::ParquetWriter;
use crate::sst::parquet::WriteOptions;
use crate::sst::{DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY};
use crate::Metrics;
/// A cache for uploading files to remote object stores.
///
@@ -141,12 +140,7 @@ impl WriteCache {
.await;
let sst_info = writer
.write_all(
write_request.source,
write_request.max_sequence,
write_opts,
&mut Metrics::default(),
)
.write_all(write_request.source, write_request.max_sequence, write_opts)
.await?;
timer.stop_and_record();

View File

@@ -27,7 +27,7 @@ use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::RegionId;
use crate::access_layer::{AccessLayer, AccessLayerRef, Metrics, OperationType, SstWriteRequest};
use crate::access_layer::{AccessLayer, AccessLayerRef, OperationType, SstWriteRequest};
use crate::cache::{CacheManager, CacheManagerRef};
use crate::compaction::picker::{new_picker, PickerOutput};
use crate::compaction::{find_ttl, CompactionSstReaderBuilder};
@@ -340,7 +340,6 @@ impl Compactor for DefaultCompactor {
bloom_filter_index_config,
},
&write_opts,
&mut Metrics::default(),
)
.await?
.into_iter()

View File

@@ -25,7 +25,7 @@ use store_api::storage::RegionId;
use strum::IntoStaticStr;
use tokio::sync::{mpsc, watch};
use crate::access_layer::{AccessLayerRef, Metrics, OperationType, SstWriteRequest};
use crate::access_layer::{AccessLayerRef, OperationType, SstWriteRequest};
use crate::cache::CacheManagerRef;
use crate::config::MitoConfig;
use crate::error::{
@@ -366,7 +366,7 @@ impl RegionFlushTask {
let ssts_written = self
.access_layer
.write_sst(write_request, &write_opts, &mut Metrics::default())
.write_sst(write_request, &write_opts)
.await?;
if ssts_written.is_empty() {
// No data written.

View File

@@ -44,12 +44,6 @@ mod time_provider;
pub mod wal;
mod worker;
// Public re-exports for tooling convenience
pub use access_layer::{
build_access_layer, AccessLayer, AccessLayerRef, Metrics, OperationType, SstWriteRequest,
};
pub use cache::{CacheManager, CacheManagerRef};
#[cfg_attr(doc, aquamarine::aquamarine)]
/// # Mito developer document
///

View File

@@ -109,7 +109,6 @@ mod tests {
new_batch_with_binary, new_source, sst_file_handle, sst_region_metadata,
};
use crate::test_util::{check_reader_result, TestEnv};
use crate::Metrics;
const FILE_DIR: &str = "/";
@@ -166,7 +165,7 @@ mod tests {
.await;
let info = writer
.write_all(source, None, &write_opts, &mut Metrics::default())
.write_all(source, None, &write_opts)
.await
.unwrap()
.remove(0);
@@ -223,7 +222,7 @@ mod tests {
.await;
writer
.write_all(source, None, &write_opts, &mut Metrics::default())
.write_all(source, None, &write_opts)
.await
.unwrap()
.remove(0);
@@ -294,7 +293,7 @@ mod tests {
.await;
let sst_info = writer
.write_all(source, None, &write_opts, &mut Metrics::default())
.write_all(source, None, &write_opts)
.await
.unwrap()
.remove(0);
@@ -335,7 +334,7 @@ mod tests {
)
.await;
writer
.write_all(source, None, &write_opts, &mut Metrics::default())
.write_all(source, None, &write_opts)
.await
.unwrap()
.remove(0);
@@ -390,7 +389,7 @@ mod tests {
)
.await;
writer
.write_all(source, None, &write_opts, &mut Metrics::default())
.write_all(source, None, &write_opts)
.await
.unwrap()
.remove(0);
@@ -428,7 +427,7 @@ mod tests {
.await;
writer
.write_all(source, None, &write_opts, &mut Metrics::default())
.write_all(source, None, &write_opts)
.await
.unwrap()
.remove(0);

View File

@@ -1117,6 +1117,7 @@ impl ParquetReader {
self.context.read_format().metadata()
}
#[cfg(test)]
pub fn parquet_metadata(&self) -> Arc<ParquetMetaData> {
self.context.reader_builder().parquet_meta.clone()
}

View File

@@ -17,19 +17,14 @@
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use bytes::Bytes;
use common_time::Timestamp;
use datatypes::arrow::datatypes::SchemaRef;
use futures::future::BoxFuture;
use object_store::{FuturesAsyncWriter, ObjectStore, Writer};
use parquet::arrow::async_writer::AsyncFileWriter;
use object_store::{FuturesAsyncWriter, ObjectStore};
use parquet::arrow::AsyncArrowWriter;
use parquet::basic::{Compression, Encoding, ZstdLevel};
use parquet::errors::ParquetError;
use parquet::file::metadata::KeyValue;
use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
use parquet::schema::types::ColumnPath;
@@ -50,13 +45,12 @@ use crate::sst::parquet::format::WriteFormat;
use crate::sst::parquet::helper::parse_parquet_metadata;
use crate::sst::parquet::{SstInfo, WriteOptions, PARQUET_METADATA_KEY};
use crate::sst::{DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY};
use crate::Metrics;
/// Parquet SST writer.
pub struct ParquetWriter<F: WriterFactory, I: IndexerBuilder, P: FilePathProvider> {
/// Path provider that creates SST and index file paths according to file id.
path_provider: P,
writer: Option<AsyncArrowWriter<OpenDalWriter>>,
writer: Option<AsyncArrowWriter<SizeAwareWriter<F::Writer>>>,
/// Current active file id.
current_file: FileId,
writer_factory: F,
@@ -67,18 +61,11 @@ pub struct ParquetWriter<F: WriterFactory, I: IndexerBuilder, P: FilePathProvide
/// Current active indexer.
current_indexer: Option<Indexer>,
bytes_written: Arc<AtomicUsize>,
opendal_metrics: Arc<Mutex<OpenDalMetrics>>,
}
pub trait WriterFactory {
type Writer: AsyncWrite + Send + Unpin;
fn create(&mut self, file_path: &str) -> impl Future<Output = Result<Self::Writer>>;
fn create_opendal(
&mut self,
file_path: &str,
size: Arc<AtomicUsize>,
) -> impl Future<Output = Result<OpenDalWriter>>;
}
pub struct ObjectStoreWriterFactory {
@@ -97,22 +84,6 @@ impl WriterFactory for ObjectStoreWriterFactory {
.map(|v| v.into_futures_async_write().compat_write())
.context(OpenDalSnafu)
}
async fn create_opendal(
&mut self,
file_path: &str,
size: Arc<AtomicUsize>,
) -> Result<OpenDalWriter> {
let writer = self
.object_store
.writer_with(file_path)
.chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
.concurrent(DEFAULT_WRITE_CONCURRENCY)
.await
.context(OpenDalSnafu)?;
Ok(OpenDalWriter::new(writer, size))
}
}
impl<I, P> ParquetWriter<ObjectStoreWriterFactory, I, P>
@@ -134,10 +105,6 @@ where
)
.await
}
pub fn opendal_metrics_val(&self) -> OpenDalMetrics {
self.opendal_metrics.lock().unwrap().clone()
}
}
impl<F, I, P> ParquetWriter<F, I, P>
@@ -165,7 +132,6 @@ where
indexer_builder,
current_indexer: Some(indexer),
bytes_written: Arc::new(AtomicUsize::new(0)),
opendal_metrics: Arc::new(Mutex::new(OpenDalMetrics::default())),
}
}
@@ -190,33 +156,20 @@ where
mut source: Source,
override_sequence: Option<SequenceNumber>, // override the `sequence` field from `Source`
opts: &WriteOptions,
metrics: &mut Metrics,
) -> Result<SstInfoArray> {
let write_format =
WriteFormat::new(self.metadata.clone()).with_override_sequence(override_sequence);
let mut stats = SourceStats::default();
let mut last_key = None;
while let Some(res) = self
.write_next_batch(&mut source, &write_format, opts, metrics)
.write_next_batch(&mut source, &write_format, opts)
.await
.transpose()
{
match res {
Ok(mut batch) => {
if let Some(last) = &last_key {
if last != batch.primary_key() {
metrics.num_series += 1;
last_key = Some(batch.primary_key().to_vec());
}
} else {
metrics.num_series += 1;
}
stats.update(&batch);
let index_start = Instant::now();
self.get_or_create_indexer().await.update(&mut batch).await;
metrics.index_update += index_start.elapsed();
}
Err(e) => {
self.get_or_create_indexer().await.abort().await;
@@ -225,9 +178,7 @@ where
}
}
let index_finish_start = Instant::now();
let index_output = self.get_or_create_indexer().await.finish().await;
metrics.index_finish += index_finish_start.elapsed();
if stats.num_rows == 0 {
return Ok(smallvec![]);
@@ -238,10 +189,9 @@ where
return Ok(smallvec![]);
};
let close_start = Instant::now();
arrow_writer.flush().await.context(WriteParquetSnafu)?;
let file_meta = arrow_writer.close().await.context(WriteParquetSnafu)?;
metrics.close += close_start.elapsed();
let file_size = self.bytes_written.load(Ordering::Relaxed) as u64;
// Safety: num rows > 0 so we must have min/max.
@@ -288,25 +238,17 @@ where
source: &mut Source,
write_format: &WriteFormat,
opts: &WriteOptions,
metrics: &mut Metrics,
) -> Result<Option<Batch>> {
let read_start = Instant::now();
let Some(batch) = source.next_batch().await? else {
return Ok(None);
};
metrics.read += read_start.elapsed();
let convert_start = Instant::now();
let arrow_batch = write_format.convert_batch(&batch)?;
metrics.convert += convert_start.elapsed();
let write_start = Instant::now();
self.maybe_init_writer(write_format.arrow_schema(), opts)
.await?
.write(&arrow_batch)
.await
.context(WriteParquetSnafu)?;
metrics.write += write_start.elapsed();
Ok(Some(batch))
}
@@ -314,7 +256,7 @@ where
&mut self,
schema: &SchemaRef,
opts: &WriteOptions,
) -> Result<&mut AsyncArrowWriter<OpenDalWriter>> {
) -> Result<&mut AsyncArrowWriter<SizeAwareWriter<F::Writer>>> {
if let Some(ref mut w) = self.writer {
Ok(w)
} else {
@@ -332,17 +274,10 @@ where
let writer_props = props_builder.build();
let sst_file_path = self.path_provider.build_sst_file_path(self.current_file);
// let writer = SizeAwareWriter::new(
// self.writer_factory.create(&sst_file_path).await?,
// self.bytes_written.clone(),
// );
let create_start = Instant::now();
let mut writer = self
.writer_factory
.create_opendal(&sst_file_path, self.bytes_written.clone())
.await?;
self.opendal_metrics.lock().unwrap().create_cost += create_start.elapsed();
writer = writer.with_metrics(self.opendal_metrics.clone());
let writer = SizeAwareWriter::new(
self.writer_factory.create(&sst_file_path).await?,
self.bytes_written.clone(),
);
let arrow_writer =
AsyncArrowWriter::try_new(writer, schema.clone(), Some(writer_props))
.context(WriteParquetSnafu)?;
@@ -382,78 +317,6 @@ impl SourceStats {
}
}
#[derive(Default, Debug, Clone)]
pub(crate) struct OpenDalMetrics {
pub(crate) create_cost: Duration,
pub(crate) num_writes: usize,
pub(crate) write_cost: Duration,
pub(crate) complete_cost: Duration,
}
/// Workaround for the fact that [AsyncArrowWriter] does not provide a method to
/// get the total bytes written after close.
pub struct OpenDalWriter {
inner: Writer,
size: Arc<AtomicUsize>,
metrics: Option<Arc<Mutex<OpenDalMetrics>>>,
}
impl OpenDalWriter {
fn new(inner: Writer, size: Arc<AtomicUsize>) -> Self {
Self {
inner,
size: size.clone(),
metrics: None,
}
}
fn with_metrics(mut self, metrics: Arc<Mutex<OpenDalMetrics>>) -> Self {
self.metrics = Some(metrics);
self
}
}
impl AsyncFileWriter for OpenDalWriter {
fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<(), ParquetError>> {
let write_start = Instant::now();
let size = self.size.clone();
let metrics = self.metrics.clone();
Box::pin(async move {
let bytes_written = bs.len();
self.inner
.write(bs)
.await
.map_err(|err| ParquetError::External(Box::new(err)))?;
size.fetch_add(bytes_written, Ordering::Relaxed);
if let Some(metrics) = metrics {
let mut m = metrics.lock().unwrap();
m.num_writes += 1;
m.write_cost += write_start.elapsed();
}
Ok(())
})
}
fn complete(&mut self) -> BoxFuture<'_, Result<(), ParquetError>> {
let complete_start = Instant::now();
let metrics = self.metrics.clone();
Box::pin(async move {
self.inner
.close()
.await
.map(|_| ())
.map_err(|err| ParquetError::External(Box::new(err)))?;
if let Some(metrics) = metrics {
let mut m = metrics.lock().unwrap();
m.complete_cost += complete_start.elapsed();
}
Ok(())
})
}
}
/// Workaround for the fact that [AsyncArrowWriter] does not provide a method to
/// get the total bytes written after close.
struct SizeAwareWriter<W> {
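The hunk above drops the experimental `OpenDalWriter` metrics path and returns to the plain size-tracking wrapper. Below is a minimal, self-contained sketch of that pattern, using illustrative names rather than the crate's actual types: wrap any tokio `AsyncWrite`, forward the calls, and count accepted bytes in a shared `AtomicUsize` so the total remains readable after the parquet writer is closed.

```rust
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};

use tokio::io::AsyncWrite;

/// Illustrative size-tracking wrapper (assumes a tokio `AsyncWrite` sink,
/// e.g. the one produced by `into_futures_async_write().compat_write()`).
struct SizeAware<W> {
    inner: W,
    written: Arc<AtomicUsize>,
}

impl<W: AsyncWrite + Unpin> AsyncWrite for SizeAware<W> {
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<std::io::Result<usize>> {
        let this = self.get_mut();
        match Pin::new(&mut this.inner).poll_write(cx, buf) {
            Poll::Ready(Ok(n)) => {
                // Count only the bytes the inner writer actually accepted.
                this.written.fetch_add(n, Ordering::Relaxed);
                Poll::Ready(Ok(n))
            }
            other => other,
        }
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
        Pin::new(&mut self.get_mut().inner).poll_flush(cx)
    }

    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
        Pin::new(&mut self.get_mut().inner).poll_shutdown(cx)
    }
}
```

Because the counter is shared through an `Arc`, the caller can keep a clone and read the final size after close, which is how `bytes_written` is used by the writer in the hunk above.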

View File

@@ -20,12 +20,9 @@ pub mod processor;
pub mod transform;
pub mod value;
use std::sync::Arc;
use error::{
IntermediateKeyIndexSnafu, PrepareValueMustBeObjectSnafu, YamlLoadSnafu, YamlParseSnafu,
};
use itertools::Itertools;
use processor::{Processor, Processors};
use snafu::{ensure, OptionExt, ResultExt};
use transform::{Transformer, Transforms};
@@ -34,7 +31,6 @@ use yaml_rust::YamlLoader;
use crate::dispatcher::{Dispatcher, Rule};
use crate::etl::error::Result;
use crate::{GreptimeTransformer, PipelineVersion};
const DESCRIPTION: &str = "description";
const PROCESSORS: &str = "processors";
@@ -214,57 +210,6 @@ pub(crate) fn find_key_index(intermediate_keys: &[String], key: &str, kind: &str
.context(IntermediateKeyIndexSnafu { kind, key })
}
/// SelectInfo stores the selected keys from OpenTelemetry record attributes.
/// Each key is used to uplift a value from the attributes and serves as a column name in the table.
#[derive(Default)]
pub struct SelectInfo {
pub keys: Vec<String>,
}
/// Try to convert a string to SelectInfo
/// The string should be a comma-separated list of keys
/// example: "key1,key2,key3"
/// The keys will be sorted and deduplicated
impl From<String> for SelectInfo {
fn from(value: String) -> Self {
let mut keys: Vec<String> = value.split(',').map(|s| s.to_string()).sorted().collect();
keys.dedup();
SelectInfo { keys }
}
}
impl SelectInfo {
pub fn is_empty(&self) -> bool {
self.keys.is_empty()
}
}
pub const GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME: &str = "greptime_identity";
/// Enum for holding information about a pipeline: either the pipeline itself,
/// or information that can be used to retrieve a pipeline from `PipelineHandler`.
pub enum PipelineDefinition {
Resolved(Arc<Pipeline<GreptimeTransformer>>),
ByNameAndValue((String, PipelineVersion)),
GreptimeIdentityPipeline,
}
impl PipelineDefinition {
pub fn from_name(name: &str, version: PipelineVersion) -> Self {
if name == GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME {
Self::GreptimeIdentityPipeline
} else {
Self::ByNameAndValue((name.to_owned(), version))
}
}
}
pub enum PipelineWay {
OtlpLogDirect(Box<SelectInfo>),
Pipeline(PipelineDefinition),
}
#[cfg(test)]
mod tests {
use api::v1::Rows;

View File

@@ -25,10 +25,10 @@ pub use etl::transform::{GreptimeTransformer, Transformer};
pub use etl::value::{Array, Map, Value};
pub use etl::{
error as etl_error, json_array_to_intermediate_state, json_to_intermediate_state, parse,
Content, DispatchedTo, Pipeline, PipelineDefinition, PipelineExecOutput, PipelineMap,
PipelineWay, SelectInfo, GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME,
Content, DispatchedTo, Pipeline, PipelineExecOutput, PipelineMap,
};
pub use manager::{
error, pipeline_operator, table, util, PipelineInfo, PipelineRef, PipelineTableRef,
PipelineVersion,
error, pipeline_operator, table, util, PipelineDefinition, PipelineInfo, PipelineRef,
PipelineTableRef, PipelineVersion, PipelineWay, SelectInfo,
GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME,
};

View File

@@ -16,6 +16,8 @@ use std::sync::Arc;
use common_time::Timestamp;
use datatypes::timestamp::TimestampNanosecond;
use itertools::Itertools;
use util::to_pipeline_version;
use crate::table::PipelineTable;
use crate::{GreptimeTransformer, Pipeline};
@@ -37,3 +39,78 @@ pub type PipelineInfo = (Timestamp, PipelineRef);
pub type PipelineTableRef = Arc<PipelineTable>;
pub type PipelineRef = Arc<Pipeline<GreptimeTransformer>>;
/// SelectInfo stores the selected keys from OpenTelemetry record attributes.
/// Each key is used to uplift a value from the attributes and serves as a column name in the table.
#[derive(Default)]
pub struct SelectInfo {
pub keys: Vec<String>,
}
/// Try to convert a string to SelectInfo
/// The string should be a comma-separated list of keys
/// example: "key1,key2,key3"
/// The keys will be sorted and deduplicated
impl From<String> for SelectInfo {
fn from(value: String) -> Self {
let mut keys: Vec<String> = value.split(',').map(|s| s.to_string()).sorted().collect();
keys.dedup();
SelectInfo { keys }
}
}
impl SelectInfo {
pub fn is_empty(&self) -> bool {
self.keys.is_empty()
}
}
pub const GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME: &str = "greptime_identity";
pub const GREPTIME_INTERNAL_TRACE_PIPELINE_V1_NAME: &str = "greptime_trace_v1";
/// Enum for holding information about a pipeline: either the pipeline itself,
/// or information that can be used to retrieve a pipeline from `PipelineHandler`.
pub enum PipelineDefinition {
Resolved(Arc<Pipeline<GreptimeTransformer>>),
ByNameAndValue((String, PipelineVersion)),
GreptimeIdentityPipeline,
}
impl PipelineDefinition {
pub fn from_name(name: &str, version: PipelineVersion) -> Self {
if name == GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME {
Self::GreptimeIdentityPipeline
} else {
Self::ByNameAndValue((name.to_owned(), version))
}
}
}
pub enum PipelineWay {
OtlpLogDirect(Box<SelectInfo>),
Pipeline(PipelineDefinition),
OtlpTraceDirectV0,
OtlpTraceDirectV1,
}
impl PipelineWay {
pub fn from_name_and_default(
name: Option<&str>,
version: Option<&str>,
default_pipeline: PipelineWay,
) -> error::Result<PipelineWay> {
if let Some(pipeline_name) = name {
if pipeline_name == GREPTIME_INTERNAL_TRACE_PIPELINE_V1_NAME {
Ok(PipelineWay::OtlpTraceDirectV1)
} else {
Ok(PipelineWay::Pipeline(PipelineDefinition::from_name(
pipeline_name,
to_pipeline_version(version)?,
)))
}
} else {
Ok(default_pipeline)
}
}
}
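The `SelectInfo` conversion documented above sorts and deduplicates a comma-separated key list. A minimal sketch of that behavior, with the type re-declared locally for illustration (and `sort` + `dedup` in place of itertools' `sorted`) rather than imported from the crate:

```rust
/// Local stand-in for the crate's `SelectInfo`: keys are sorted and
/// deduplicated, mirroring the conversion shown in the hunk above.
#[derive(Default, Debug, PartialEq)]
struct SelectInfo {
    keys: Vec<String>,
}

impl From<String> for SelectInfo {
    fn from(value: String) -> Self {
        let mut keys: Vec<String> = value.split(',').map(|s| s.to_string()).collect();
        keys.sort();
        keys.dedup();
        SelectInfo { keys }
    }
}

fn main() {
    let info = SelectInfo::from("key2,key1,key2,key3".to_string());
    assert_eq!(info.keys, vec!["key1", "key2", "key3"]);
}
```

Passing `"key2,key1,key2,key3"` therefore always yields the same ordered key set, no matter how the client ordered the header value.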

View File

@@ -23,10 +23,10 @@ use crate::table::{
};
use crate::PipelineVersion;
pub fn to_pipeline_version(version_str: Option<String>) -> Result<PipelineVersion> {
pub fn to_pipeline_version(version_str: Option<&str>) -> Result<PipelineVersion> {
match version_str {
Some(version) => {
let ts = Timestamp::from_str_utc(&version)
let ts = Timestamp::from_str_utc(version)
.map_err(|_| InvalidPipelineVersionSnafu { version }.build())?;
Ok(Some(TimestampNanosecond(ts)))
}
@@ -73,14 +73,14 @@ mod tests {
assert!(none_result.is_ok());
assert!(none_result.unwrap().is_none());
let some_result = to_pipeline_version(Some("2023-01-01 00:00:00Z".to_string()));
let some_result = to_pipeline_version(Some("2023-01-01 00:00:00Z"));
assert!(some_result.is_ok());
assert_eq!(
some_result.unwrap(),
Some(TimestampNanosecond::new(1672531200000000000))
);
let invalid = to_pipeline_version(Some("invalid".to_string()));
let invalid = to_pipeline_version(Some("invalid"));
assert!(invalid.is_err());
}

View File

@@ -18,7 +18,6 @@ mod cancellation;
mod database;
pub mod flight;
pub mod greptime_handler;
mod otlp;
pub mod prom_query_gateway;
pub mod region_server;

View File

@@ -29,12 +29,6 @@ pub struct AuthMiddlewareLayer {
user_provider: Option<UserProviderRef>,
}
impl AuthMiddlewareLayer {
pub fn with(user_provider: Option<UserProviderRef>) -> Self {
Self { user_provider }
}
}
impl<S> Layer<S> for AuthMiddlewareLayer {
type Service = AuthMiddleware<S>;

View File

@@ -19,25 +19,18 @@ use arrow_flight::flight_service_server::FlightServiceServer;
use auth::UserProviderRef;
use common_grpc::error::{Error, InvalidConfigFilePathSnafu, Result};
use common_runtime::Runtime;
use opentelemetry_proto::tonic::collector::metrics::v1::metrics_service_server::MetricsServiceServer;
use opentelemetry_proto::tonic::collector::trace::v1::trace_service_server::TraceServiceServer;
use snafu::ResultExt;
use tokio::sync::Mutex;
use tonic::codec::CompressionEncoding;
use tonic::service::RoutesBuilder;
use tonic::transport::{Identity, ServerTlsConfig};
use tower::ServiceBuilder;
use super::flight::{FlightCraftRef, FlightCraftWrapper};
use super::region_server::{RegionServerHandlerRef, RegionServerRequestHandler};
use super::{GrpcServer, GrpcServerConfig};
use crate::grpc::authorize::AuthMiddlewareLayer;
use crate::grpc::database::DatabaseService;
use crate::grpc::greptime_handler::GreptimeRequestHandler;
use crate::grpc::otlp::OtlpService;
use crate::grpc::prom_query_gateway::PrometheusGatewayService;
use crate::prometheus_handler::PrometheusHandlerRef;
use crate::query_handler::OpenTelemetryProtocolHandlerRef;
use crate::tls::TlsOption;
/// Add a gRPC service (`service`) to a `builder`([RoutesBuilder]).
@@ -127,37 +120,6 @@ impl GrpcServerBuilder {
self
}
/// Add handler for OpenTelemetry Protocol (OTLP) requests.
pub fn otlp_handler(
mut self,
otlp_handler: OpenTelemetryProtocolHandlerRef,
user_provider: Option<UserProviderRef>,
) -> Self {
let tracing_service = TraceServiceServer::new(OtlpService::new(otlp_handler.clone()))
.accept_compressed(CompressionEncoding::Gzip)
.accept_compressed(CompressionEncoding::Zstd)
.send_compressed(CompressionEncoding::Gzip)
.send_compressed(CompressionEncoding::Zstd);
let trace_server = ServiceBuilder::new()
.layer(AuthMiddlewareLayer::with(user_provider.clone()))
.service(tracing_service);
self.routes_builder.add_service(trace_server);
let metrics_service = MetricsServiceServer::new(OtlpService::new(otlp_handler))
.accept_compressed(CompressionEncoding::Gzip)
.accept_compressed(CompressionEncoding::Zstd)
.send_compressed(CompressionEncoding::Gzip)
.send_compressed(CompressionEncoding::Zstd);
let metrics_server = ServiceBuilder::new()
.layer(AuthMiddlewareLayer::with(user_provider))
.service(metrics_service);
self.routes_builder.add_service(metrics_server);
self
}
pub fn routes_builder_mut(&mut self) -> &mut RoutesBuilder {
&mut self.routes_builder
}

View File

@@ -205,7 +205,7 @@ pub async fn delete_pipeline(
reason: "version is required",
})?;
let version = to_pipeline_version(Some(version_str.clone())).context(PipelineSnafu)?;
let version = to_pipeline_version(Some(&version_str)).context(PipelineSnafu)?;
query_ctx.set_channel(Channel::Http);
let query_ctx = Arc::new(query_ctx);
@@ -445,8 +445,8 @@ pub async fn pipeline_dryrun(
match params.pipeline {
None => {
let version =
to_pipeline_version(params.pipeline_version).context(PipelineSnafu)?;
let version = to_pipeline_version(params.pipeline_version.as_deref())
.context(PipelineSnafu)?;
let pipeline_name = check_pipeline_name_exists(params.pipeline_name)?;
let pipeline = handler
.get_pipeline(&pipeline_name, version, query_ctx.clone())
@@ -486,7 +486,8 @@ pub async fn pipeline_dryrun(
// is specified using query param.
let pipeline_name = check_pipeline_name_exists(query_params.pipeline_name)?;
let version = to_pipeline_version(query_params.version).context(PipelineSnafu)?;
let version =
to_pipeline_version(query_params.version.as_deref()).context(PipelineSnafu)?;
let ignore_errors = query_params.ignore_errors.unwrap_or(false);
@@ -532,7 +533,7 @@ pub async fn log_ingester(
reason: "table is required",
})?;
let version = to_pipeline_version(query_params.version).context(PipelineSnafu)?;
let version = to_pipeline_version(query_params.version.as_deref()).context(PipelineSnafu)?;
let ignore_errors = query_params.ignore_errors.unwrap_or(false);

View File

@@ -34,11 +34,11 @@ use crate::error::{
};
use crate::http::HttpRecordsOutput;
use crate::metrics::METRIC_JAEGER_QUERY_ELAPSED;
use crate::otlp::trace::{
use crate::otlp::trace::v0::{
DURATION_NANO_COLUMN, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN, SPAN_ID_COLUMN,
SPAN_KIND_COLUMN, SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, TIMESTAMP_COLUMN, TRACE_ID_COLUMN,
TRACE_TABLE_NAME,
};
use crate::otlp::trace::TRACE_TABLE_NAME;
use crate::query_handler::JaegerQueryHandlerRef;
/// JaegerAPIResponse is the response of Jaeger HTTP API.

View File

@@ -29,8 +29,7 @@ use opentelemetry_proto::tonic::collector::metrics::v1::{
use opentelemetry_proto::tonic::collector::trace::v1::{
ExportTraceServiceRequest, ExportTraceServiceResponse,
};
use pipeline::util::to_pipeline_version;
use pipeline::{PipelineDefinition, PipelineWay};
use pipeline::PipelineWay;
use prost::Message;
use session::context::{Channel, QueryContext};
use snafu::prelude::*;
@@ -75,6 +74,7 @@ pub async fn metrics(
pub async fn traces(
State(handler): State<OpenTelemetryProtocolHandlerRef>,
TraceTableName(table_name): TraceTableName,
pipeline_info: PipelineInfo,
Extension(mut query_ctx): Extension<QueryContext>,
bytes: Bytes,
) -> Result<OtlpResponse<ExportTraceServiceResponse>> {
@@ -88,8 +88,29 @@ pub async fn traces(
.start_timer();
let request =
ExportTraceServiceRequest::decode(bytes).context(error::DecodeOtlpRequestSnafu)?;
let pipeline = PipelineWay::from_name_and_default(
pipeline_info.pipeline_name.as_deref(),
pipeline_info.pipeline_version.as_deref(),
PipelineWay::OtlpTraceDirectV0,
)
.context(PipelineSnafu)?;
let pipeline_params = pipeline_info.pipeline_params;
// here we use nightly feature `trait_upcasting` to convert handler to
// pipeline_handler
let pipeline_handler: Arc<dyn PipelineHandler + Send + Sync> = handler.clone();
handler
.traces(request, table_name, query_ctx)
.traces(
pipeline_handler,
request,
pipeline,
pipeline_params,
table_name,
query_ctx,
)
.await
.map(|o| OtlpResponse {
resp_body: ExportTraceServiceResponse {
@@ -118,15 +139,12 @@ pub async fn logs(
.start_timer();
let request = ExportLogsServiceRequest::decode(bytes).context(error::DecodeOtlpRequestSnafu)?;
let pipeline = if let Some(pipeline_name) = pipeline_info.pipeline_name {
PipelineWay::Pipeline(PipelineDefinition::from_name(
&pipeline_name,
to_pipeline_version(pipeline_info.pipeline_version).context(PipelineSnafu)?,
))
} else {
PipelineWay::OtlpLogDirect(Box::new(select_info))
};
let pipeline = PipelineWay::from_name_and_default(
pipeline_info.pipeline_name.as_deref(),
pipeline_info.pipeline_version.as_deref(),
PipelineWay::OtlpLogDirect(Box::new(select_info)),
)
.context(PipelineSnafu)?;
let pipeline_params = pipeline_info.pipeline_params;
// here we use nightly feature `trait_upcasting` to convert handler to
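The comment above refers to upcasting an `Arc` of the concrete handler trait object into an `Arc` of its `PipelineHandler` supertrait. A minimal sketch of that coercion with placeholder traits (these are not the server's real trait definitions); it needs a nightly toolchain, or a toolchain where trait upcasting has since been stabilized, in which case the feature attribute can be dropped:

```rust
#![feature(trait_upcasting)] // nightly feature referenced in the comment above

use std::sync::Arc;

trait PipelineHandler {}
trait OtlpHandler: PipelineHandler {}

struct Handler;
impl PipelineHandler for Handler {}
impl OtlpHandler for Handler {}

fn main() {
    let handler: Arc<dyn OtlpHandler + Send + Sync> = Arc::new(Handler);
    // Upcast the trait object to its supertrait; no manual adapter type needed.
    let pipeline_handler: Arc<dyn PipelineHandler + Send + Sync> = handler.clone();
    let _ = (handler, pipeline_handler);
}
```

This is what lets the OTLP routes hand the same handler both to the OTLP-specific trait methods and to the generic pipeline machinery without an extra wrapper.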

View File

@@ -32,7 +32,8 @@ use snafu::{ensure, ResultExt};
use super::trace::attributes::OtlpAnyValue;
use super::utils::{bytes_to_hex_string, key_value_to_jsonb};
use crate::error::{
IncompatibleSchemaSnafu, PipelineTransformSnafu, Result, UnsupportedJsonDataTypeForTagSnafu,
IncompatibleSchemaSnafu, NotSupportedSnafu, PipelineTransformSnafu, Result,
UnsupportedJsonDataTypeForTagSnafu,
};
use crate::pipeline::run_pipeline;
use crate::query_handler::PipelineHandlerRef;
@@ -98,6 +99,10 @@ pub async fn to_grpc_insert_requests(
let insert_requests = RowInsertRequests { inserts };
Ok((insert_requests, len))
}
_ => NotSupportedSnafu {
feat: "Unsupported pipeline for logs",
}
.fail(),
}
}

View File

@@ -12,183 +12,42 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, RowInsertRequests};
use common_grpc::precision::Precision;
use itertools::Itertools;
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
use opentelemetry_proto::tonic::common::v1::any_value;
use self::span::{parse_span, TraceSpan, TraceSpans};
use crate::error::Result;
use crate::otlp::utils::{make_column_data, make_string_column_data};
use crate::row_writer::{self, MultiTableData, TableData};
const APPROXIMATE_COLUMN_COUNT: usize = 24;
pub const TRACE_TABLE_NAME: &str = "opentelemetry_traces";
pub const SERVICE_NAME_COLUMN: &str = "service_name";
pub const TRACE_ID_COLUMN: &str = "trace_id";
pub const TIMESTAMP_COLUMN: &str = "timestamp";
pub const DURATION_NANO_COLUMN: &str = "duration_nano";
pub const SPAN_ID_COLUMN: &str = "span_id";
pub const SPAN_NAME_COLUMN: &str = "span_name";
pub const SPAN_KIND_COLUMN: &str = "span_kind";
pub const SPAN_ATTRIBUTES_COLUMN: &str = "span_attributes";
/// The span kind prefix in the database.
/// If the span kind is `server`, it will be stored as `SPAN_KIND_SERVER` in the database.
pub const SPAN_KIND_PREFIX: &str = "SPAN_KIND_";
pub mod attributes;
pub mod span;
pub mod v0;
/// Convert OpenTelemetry traces to SpanTraces
///
/// See
/// <https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/trace/v1/trace.proto>
/// for data structure of OTLP traces.
pub fn parse(request: ExportTraceServiceRequest) -> TraceSpans {
let span_size = request
.resource_spans
.iter()
.flat_map(|res| res.scope_spans.iter())
.flat_map(|scope| scope.spans.iter())
.count();
let mut spans = Vec::with_capacity(span_size);
for resource_spans in request.resource_spans {
let resource_attrs = resource_spans
.resource
.map(|r| r.attributes)
.unwrap_or_default();
let service_name = resource_attrs
.iter()
.find_or_first(|kv| kv.key == "service.name")
.and_then(|kv| kv.value.clone())
.and_then(|v| match v.value {
Some(any_value::Value::StringValue(s)) => Some(s),
Some(any_value::Value::BytesValue(b)) => {
Some(String::from_utf8_lossy(&b).to_string())
}
_ => None,
});
use api::v1::RowInsertRequests;
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
use pipeline::{GreptimePipelineParams, PipelineWay};
use session::context::QueryContextRef;
for scope_spans in resource_spans.scope_spans {
let scope = scope_spans.scope.unwrap_or_default();
for span in scope_spans.spans {
spans.push(parse_span(
service_name.clone(),
&resource_attrs,
&scope,
span,
));
}
}
}
spans
}
use crate::error::{NotSupportedSnafu, Result};
use crate::query_handler::PipelineHandlerRef;
pub const TRACE_TABLE_NAME: &str = "opentelemetry_traces";
/// Convert SpanTraces to GreptimeDB row insert requests.
/// Returns `InsertRequests` and total number of rows to ingest
pub fn to_grpc_insert_requests(
request: ExportTraceServiceRequest,
pipeline: PipelineWay,
pipeline_params: GreptimePipelineParams,
table_name: String,
spans: TraceSpans,
query_ctx: &QueryContextRef,
pipeline_handler: PipelineHandlerRef,
) -> Result<(RowInsertRequests, usize)> {
let mut multi_table_writer = MultiTableData::default();
let one_table_writer = multi_table_writer.get_or_default_table_data(
table_name,
APPROXIMATE_COLUMN_COUNT,
spans.len(),
);
for span in spans {
write_span_to_row(one_table_writer, span)?;
}
Ok(multi_table_writer.into_row_insert_requests())
}
pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()> {
let mut row = writer.alloc_one_row();
// write ts
row_writer::write_ts_to_nanos(
writer,
"timestamp",
Some(span.start_in_nanosecond as i64),
Precision::Nanosecond,
&mut row,
)?;
// write ts fields
let fields = vec![
make_column_data(
"timestamp_end",
ColumnDataType::TimestampNanosecond,
ValueData::TimestampNanosecondValue(span.end_in_nanosecond as i64),
match pipeline {
PipelineWay::OtlpTraceDirectV0 => v0::v0_to_grpc_insert_requests(
request,
pipeline,
pipeline_params,
table_name,
query_ctx,
pipeline_handler,
),
make_column_data(
"duration_nano",
ColumnDataType::Uint64,
ValueData::U64Value(span.end_in_nanosecond - span.start_in_nanosecond),
),
];
row_writer::write_fields(writer, fields.into_iter(), &mut row)?;
if let Some(service_name) = span.service_name {
row_writer::write_tag(writer, "service_name", service_name, &mut row)?;
_ => NotSupportedSnafu {
feat: "Unsupported pipeline for traces",
}
.fail(),
}
// tags
let iter = vec![
("trace_id", span.trace_id),
("span_id", span.span_id),
("parent_span_id", span.parent_span_id),
]
.into_iter()
.map(|(col, val)| (col.to_string(), val));
row_writer::write_tags(writer, iter, &mut row)?;
// write fields
let fields = vec![
make_string_column_data("span_kind", span.span_kind),
make_string_column_data("span_name", span.span_name),
make_string_column_data("span_status_code", span.span_status_code),
make_string_column_data("span_status_message", span.span_status_message),
make_string_column_data("trace_state", span.trace_state),
];
row_writer::write_fields(writer, fields.into_iter(), &mut row)?;
row_writer::write_json(
writer,
"span_attributes",
span.span_attributes.into(),
&mut row,
)?;
row_writer::write_json(writer, "span_events", span.span_events.into(), &mut row)?;
row_writer::write_json(writer, "span_links", span.span_links.into(), &mut row)?;
// write fields
let fields = vec![
make_string_column_data("scope_name", span.scope_name),
make_string_column_data("scope_version", span.scope_version),
];
row_writer::write_fields(writer, fields.into_iter(), &mut row)?;
row_writer::write_json(
writer,
"scope_attributes",
span.scope_attributes.into(),
&mut row,
)?;
row_writer::write_json(
writer,
"resource_attributes",
span.resource_attributes.into(),
&mut row,
)?;
writer.add_row(row);
Ok(())
}

View File

@@ -0,0 +1,198 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, RowInsertRequests};
use common_grpc::precision::Precision;
use itertools::Itertools;
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
use opentelemetry_proto::tonic::common::v1::any_value;
use pipeline::{GreptimePipelineParams, PipelineWay};
use session::context::QueryContextRef;
use super::span::{parse_span, TraceSpan, TraceSpans};
use crate::error::Result;
use crate::otlp::utils::{make_column_data, make_string_column_data};
use crate::query_handler::PipelineHandlerRef;
use crate::row_writer::{self, MultiTableData, TableData};
const APPROXIMATE_COLUMN_COUNT: usize = 24;
pub const SERVICE_NAME_COLUMN: &str = "service_name";
pub const TRACE_ID_COLUMN: &str = "trace_id";
pub const TIMESTAMP_COLUMN: &str = "timestamp";
pub const DURATION_NANO_COLUMN: &str = "duration_nano";
pub const SPAN_ID_COLUMN: &str = "span_id";
pub const SPAN_NAME_COLUMN: &str = "span_name";
pub const SPAN_KIND_COLUMN: &str = "span_kind";
pub const SPAN_ATTRIBUTES_COLUMN: &str = "span_attributes";
/// The span kind prefix in the database.
/// If the span kind is `server`, it will be stored as `SPAN_KIND_SERVER` in the database.
pub const SPAN_KIND_PREFIX: &str = "SPAN_KIND_";
/// Convert OpenTelemetry traces to SpanTraces
///
/// See
/// <https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/trace/v1/trace.proto>
/// for data structure of OTLP traces.
pub fn parse(request: ExportTraceServiceRequest) -> TraceSpans {
let span_size = request
.resource_spans
.iter()
.flat_map(|res| res.scope_spans.iter())
.flat_map(|scope| scope.spans.iter())
.count();
let mut spans = Vec::with_capacity(span_size);
for resource_spans in request.resource_spans {
let resource_attrs = resource_spans
.resource
.map(|r| r.attributes)
.unwrap_or_default();
let service_name = resource_attrs
.iter()
.find_or_first(|kv| kv.key == "service.name")
.and_then(|kv| kv.value.clone())
.and_then(|v| match v.value {
Some(any_value::Value::StringValue(s)) => Some(s),
Some(any_value::Value::BytesValue(b)) => {
Some(String::from_utf8_lossy(&b).to_string())
}
_ => None,
});
for scope_spans in resource_spans.scope_spans {
let scope = scope_spans.scope.unwrap_or_default();
for span in scope_spans.spans {
spans.push(parse_span(
service_name.clone(),
&resource_attrs,
&scope,
span,
));
}
}
}
spans
}
/// Convert SpanTraces to GreptimeDB row insert requests.
/// Returns `InsertRequests` and total number of rows to ingest
pub fn v0_to_grpc_insert_requests(
request: ExportTraceServiceRequest,
_pipeline: PipelineWay,
_pipeline_params: GreptimePipelineParams,
table_name: String,
_query_ctx: &QueryContextRef,
_pipeline_handler: PipelineHandlerRef,
) -> Result<(RowInsertRequests, usize)> {
let spans = parse(request);
let mut multi_table_writer = MultiTableData::default();
let one_table_writer = multi_table_writer.get_or_default_table_data(
table_name,
APPROXIMATE_COLUMN_COUNT,
spans.len(),
);
for span in spans {
write_span_to_row(one_table_writer, span)?;
}
Ok(multi_table_writer.into_row_insert_requests())
}
pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()> {
let mut row = writer.alloc_one_row();
// write ts
row_writer::write_ts_to_nanos(
writer,
"timestamp",
Some(span.start_in_nanosecond as i64),
Precision::Nanosecond,
&mut row,
)?;
// write ts fields
let fields = vec![
make_column_data(
"timestamp_end",
ColumnDataType::TimestampNanosecond,
ValueData::TimestampNanosecondValue(span.end_in_nanosecond as i64),
),
make_column_data(
"duration_nano",
ColumnDataType::Uint64,
ValueData::U64Value(span.end_in_nanosecond - span.start_in_nanosecond),
),
];
row_writer::write_fields(writer, fields.into_iter(), &mut row)?;
if let Some(service_name) = span.service_name {
row_writer::write_tag(writer, "service_name", service_name, &mut row)?;
}
// tags
let iter = vec![
("trace_id", span.trace_id),
("span_id", span.span_id),
("parent_span_id", span.parent_span_id),
]
.into_iter()
.map(|(col, val)| (col.to_string(), val));
row_writer::write_tags(writer, iter, &mut row)?;
// write fields
let fields = vec![
make_string_column_data("span_kind", span.span_kind),
make_string_column_data("span_name", span.span_name),
make_string_column_data("span_status_code", span.span_status_code),
make_string_column_data("span_status_message", span.span_status_message),
make_string_column_data("trace_state", span.trace_state),
];
row_writer::write_fields(writer, fields.into_iter(), &mut row)?;
row_writer::write_json(
writer,
"span_attributes",
span.span_attributes.into(),
&mut row,
)?;
row_writer::write_json(writer, "span_events", span.span_events.into(), &mut row)?;
row_writer::write_json(writer, "span_links", span.span_links.into(), &mut row)?;
// write fields
let fields = vec![
make_string_column_data("scope_name", span.scope_name),
make_string_column_data("scope_version", span.scope_version),
];
row_writer::write_fields(writer, fields.into_iter(), &mut row)?;
row_writer::write_json(
writer,
"scope_attributes",
span.scope_attributes.into(),
&mut row,
)?;
row_writer::write_json(
writer,
"resource_attributes",
span.resource_attributes.into(),
&mut row,
)?;
writer.add_row(row);
Ok(())
}

View File

@@ -107,7 +107,10 @@ pub trait OpenTelemetryProtocolHandler: PipelineHandler {
/// Handling opentelemetry traces request
async fn traces(
&self,
pipeline_handler: PipelineHandlerRef,
request: ExportTraceServiceRequest,
pipeline: PipelineWay,
pipeline_params: GreptimePipelineParams,
table_name: String,
ctx: QueryContextRef,
) -> Result<Output>;

View File

@@ -1070,7 +1070,6 @@ fn drop_lines_with_inconsistent_results(input: String) -> String {
"root =",
"endpoint =",
"region =",
"enable_virtual_host_style =",
"cache_path =",
"cache_capacity =",
"sas_token =",

View File

@@ -284,3 +284,45 @@ FROM (
| [-4,-20,-54] |
+-------------------------------+
SELECT vec_dim('[7.0, 8.0, 9.0, 10.0]');
+----------------------------------------+
| vec_dim(Utf8("[7.0, 8.0, 9.0, 10.0]")) |
+----------------------------------------+
| 4 |
+----------------------------------------+
SELECT v, vec_dim(v)
FROM (
SELECT '[1.0, 2.0, 3.0]' AS v
UNION ALL
SELECT '[-1.0]' AS v
UNION ALL
SELECT '[4.0, 5.0, 6.0]' AS v
) Order By vec_dim(v) ASC;
+-----------------+------------+
| v | vec_dim(v) |
+-----------------+------------+
| [-1.0] | 1 |
| [1.0, 2.0, 3.0] | 3 |
| [4.0, 5.0, 6.0] | 3 |
+-----------------+------------+
SELECT v, vec_dim(v)
FROM (
SELECT '[1.0, 2.0, 3.0]' AS v
UNION ALL
SELECT '[-1.0]' AS v
UNION ALL
SELECT '[7.0, 8.0, 9.0, 10.0]' AS v
) Order By vec_dim(v) ASC;
+-----------------------+------------+
| v | vec_dim(v) |
+-----------------------+------------+
| [-1.0] | 1 |
| [1.0, 2.0, 3.0] | 3 |
| [7.0, 8.0, 9.0, 10.0] | 4 |
+-----------------------+------------+

View File

@@ -79,3 +79,23 @@ FROM (
UNION ALL
SELECT '[4.0, 5.0, 6.0]' AS v
);
SELECT vec_dim('[7.0, 8.0, 9.0, 10.0]');
SELECT v, vec_dim(v)
FROM (
SELECT '[1.0, 2.0, 3.0]' AS v
UNION ALL
SELECT '[-1.0]' AS v
UNION ALL
SELECT '[4.0, 5.0, 6.0]' AS v
) Order By vec_dim(v) ASC;
SELECT v, vec_dim(v)
FROM (
SELECT '[1.0, 2.0, 3.0]' AS v
UNION ALL
SELECT '[-1.0]' AS v
UNION ALL
SELECT '[7.0, 8.0, 9.0, 10.0]' AS v
) Order By vec_dim(v) ASC;

View File

@@ -3,11 +3,9 @@ Pn = "Pn"
ue = "ue"
worl = "worl"
ot = "ot"
unqualifed = "unqualifed"
typ = "typ"
varidic = "varidic"
typs = "typs"
varadic = "varadic"
unqualifed = "unqualifed"
[files]
extend-exclude = [