Mirror of https://github.com/GreptimeTeam/greptimedb.git (synced 2025-12-24 23:19:57 +00:00)

Compare commits: v0.12.1 ... feat/objbe (15 commits)
Commits in this comparison (SHA1):

- 0d5b423eb7
- 26bdb6a413
- 2fe21469f8
- 3aa67c7af4
- e0d3e6ae97
- 2ce476dc42
- 69a816fa0c
- dcf5a62014
- f3aa967aae
- 93e8510b2a
- 53c58494fd
- 741c5e2fb1
- d68215dc88
- bcd63fdb87
- f4c527cddf
.github/scripts/check-version.sh (vendored, executable file, 42 lines changed)
@@ -0,0 +1,42 @@
#!/bin/bash

# Get current version
CURRENT_VERSION=$1
if [ -z "$CURRENT_VERSION" ]; then
    echo "Error: Failed to get current version"
    exit 1
fi

# Get the latest version from GitHub Releases
API_RESPONSE=$(curl -s "https://api.github.com/repos/GreptimeTeam/greptimedb/releases/latest")

if [ -z "$API_RESPONSE" ] || [ "$(echo "$API_RESPONSE" | jq -r '.message')" = "Not Found" ]; then
    echo "Error: Failed to fetch latest version from GitHub"
    exit 1
fi

# Get the latest version
LATEST_VERSION=$(echo "$API_RESPONSE" | jq -r '.tag_name')

if [ -z "$LATEST_VERSION" ] || [ "$LATEST_VERSION" = "null" ]; then
    echo "Error: No valid version found in GitHub releases"
    exit 1
fi

# Cleaned up version number format (removed possible 'v' prefix and -nightly suffix)
CLEAN_CURRENT=$(echo "$CURRENT_VERSION" | sed 's/^v//' | sed 's/-nightly-.*//')
CLEAN_LATEST=$(echo "$LATEST_VERSION" | sed 's/^v//' | sed 's/-nightly-.*//')

echo "Current version: $CLEAN_CURRENT"
echo "Latest release version: $CLEAN_LATEST"

# Use sort -V to compare versions
HIGHER_VERSION=$(printf "%s\n%s" "$CLEAN_CURRENT" "$CLEAN_LATEST" | sort -V | tail -n1)

if [ "$HIGHER_VERSION" = "$CLEAN_CURRENT" ]; then
    echo "Current version ($CLEAN_CURRENT) is NEWER than or EQUAL to latest ($CLEAN_LATEST)"
    echo "should-push-latest-tag=true" >> $GITHUB_OUTPUT
else
    echo "Current version ($CLEAN_CURRENT) is OLDER than latest ($CLEAN_LATEST)"
    echo "should-push-latest-tag=false" >> $GITHUB_OUTPUT
fi
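A minimal sketch of exercising the new script outside GitHub Actions, assuming curl and jq are installed; the version argument and the scratch GITHUB_OUTPUT path are illustrative:

```bash
# Illustrative only: emulate the Actions environment with a scratch output file.
export GITHUB_OUTPUT=/tmp/check-version.out
./.github/scripts/check-version.sh "v0.12.2"
# The script prints the cleaned current/latest versions and appends either
# should-push-latest-tag=true or should-push-latest-tag=false to $GITHUB_OUTPUT.
cat "$GITHUB_OUTPUT"
```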
.github/workflows/develop.yml (vendored, 2 lines changed)
@@ -706,7 +706,7 @@ jobs:
      - name: Install toolchain
        uses: actions-rust-lang/setup-rust-toolchain@v1
        with:
          cache: false
          cache: false
      - name: Rust Cache
        uses: Swatinem/rust-cache@v2
        with:
.github/workflows/release.yml (vendored, 13 lines changed)
@@ -110,6 +110,8 @@ jobs:

      # The 'version' use as the global tag name of the release workflow.
      version: ${{ steps.create-version.outputs.version }}

      should-push-latest-tag: ${{ steps.check-version.outputs.should-push-latest-tag }}
    steps:
      - name: Checkout
        uses: actions/checkout@v4
@@ -135,6 +137,11 @@
          GITHUB_REF_NAME: ${{ github.ref_name }}
          NIGHTLY_RELEASE_PREFIX: ${{ env.NIGHTLY_RELEASE_PREFIX }}

      - name: Check version
        id: check-version
        run: |
          ./.github/scripts/check-version.sh "${{ steps.create-version.outputs.version }}"

      - name: Allocate linux-amd64 runner
        if: ${{ inputs.build_linux_amd64_artifacts || github.event_name == 'push' || github.event_name == 'schedule' }}
        uses: ./.github/actions/start-runner
@@ -314,7 +321,7 @@
          image-registry-username: ${{ secrets.DOCKERHUB_USERNAME }}
          image-registry-password: ${{ secrets.DOCKERHUB_TOKEN }}
          version: ${{ needs.allocate-runners.outputs.version }}
          push-latest-tag: ${{ github.ref_type == 'tag' && !contains(github.ref_name, 'nightly') && github.event_name != 'schedule' }}
          push-latest-tag: ${{ needs.allocate-runners.outputs.should-push-latest-tag == 'true' && github.ref_type == 'tag' && !contains(github.ref_name, 'nightly') && github.event_name != 'schedule' }}

      - name: Set build image result
        id: set-build-image-result
@@ -332,7 +339,7 @@
      build-windows-artifacts,
      release-images-to-dockerhub,
    ]
    runs-on: ubuntu-latest
    runs-on: ubuntu-latest-16-cores
    # When we push to ACR, it's easy to fail due to some unknown network issues.
    # However, we don't want to fail the whole workflow because of this.
    # The ACR have daily sync with DockerHub, so don't worry about the image not being updated.
@@ -361,7 +368,7 @@
          dev-mode: false
          upload-to-s3: true
          update-version-info: true
          push-latest-tag: ${{ github.ref_type == 'tag' && !contains(github.ref_name, 'nightly') && github.event_name != 'schedule' }}
          push-latest-tag: ${{ needs.allocate-runners.outputs.should-push-latest-tag == 'true' && github.ref_type == 'tag' && !contains(github.ref_name, 'nightly') && github.event_name != 'schedule' }}

  publish-github-release:
    name: Create GitHub release and upload artifacts
.github/workflows/semantic-pull-request.yml (vendored, 10 lines changed)
@@ -11,17 +11,17 @@ concurrency:
  group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }}
  cancel-in-progress: true

permissions:
  issues: write
  contents: write
  pull-requests: write

jobs:
  check:
    runs-on: ubuntu-latest
    permissions:
      pull-requests: write # Add permissions to modify PRs
      issues: write
    timeout-minutes: 10
    steps:
      - uses: actions/checkout@v4
        with:
          persist-credentials: false
      - uses: ./.github/actions/setup-cyborg
      - name: Check Pull Request
        working-directory: cyborg
Cargo.lock (generated, 156 lines changed)
@@ -185,7 +185,7 @@ checksum = "d301b3b94cb4b2f23d7917810addbbaff90738e0ca2be692bd027e70d7e0330c"
|
||||
|
||||
[[package]]
|
||||
name = "api"
|
||||
version = "0.12.1"
|
||||
version = "0.12.2"
|
||||
dependencies = [
|
||||
"common-base",
|
||||
"common-decimal",
|
||||
@@ -710,7 +710,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "auth"
|
||||
version = "0.12.1"
|
||||
version = "0.12.2"
|
||||
dependencies = [
|
||||
"api",
|
||||
"async-trait",
|
||||
@@ -1324,7 +1324,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "cache"
|
||||
version = "0.12.1"
|
||||
version = "0.12.2"
|
||||
dependencies = [
|
||||
"catalog",
|
||||
"common-error",
|
||||
@@ -1348,7 +1348,7 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5"
|
||||
|
||||
[[package]]
|
||||
name = "catalog"
|
||||
version = "0.12.1"
|
||||
version = "0.12.2"
|
||||
dependencies = [
|
||||
"api",
|
||||
"arrow",
|
||||
@@ -1661,7 +1661,7 @@ checksum = "1462739cb27611015575c0c11df5df7601141071f07518d56fcc1be504cbec97"
|
||||
|
||||
[[package]]
|
||||
name = "cli"
|
||||
version = "0.12.1"
|
||||
version = "0.12.2"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"auth",
|
||||
@@ -1703,7 +1703,7 @@ dependencies = [
|
||||
"session",
|
||||
"snafu 0.8.5",
|
||||
"store-api",
|
||||
"substrait 0.12.1",
|
||||
"substrait 0.12.2",
|
||||
"table",
|
||||
"tempfile",
|
||||
"tokio",
|
||||
@@ -1712,7 +1712,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "client"
|
||||
version = "0.12.1"
|
||||
version = "0.12.2"
|
||||
dependencies = [
|
||||
"api",
|
||||
"arc-swap",
|
||||
@@ -1739,7 +1739,7 @@ dependencies = [
|
||||
"rand",
|
||||
"serde_json",
|
||||
"snafu 0.8.5",
|
||||
"substrait 0.12.1",
|
||||
"substrait 0.12.2",
|
||||
"substrait 0.37.3",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
@@ -1780,7 +1780,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "cmd"
|
||||
version = "0.12.1"
|
||||
version = "0.12.2"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"auth",
|
||||
@@ -1791,6 +1791,7 @@ dependencies = [
|
||||
"clap 4.5.19",
|
||||
"cli",
|
||||
"client",
|
||||
"colored",
|
||||
"common-base",
|
||||
"common-catalog",
|
||||
"common-config",
|
||||
@@ -1825,7 +1826,10 @@ dependencies = [
|
||||
"mito2",
|
||||
"moka",
|
||||
"nu-ansi-term",
|
||||
"object-store",
|
||||
"parquet",
|
||||
"plugins",
|
||||
"pprof",
|
||||
"prometheus",
|
||||
"prost 0.13.3",
|
||||
"query",
|
||||
@@ -1841,7 +1845,7 @@ dependencies = [
|
||||
"similar-asserts",
|
||||
"snafu 0.8.5",
|
||||
"store-api",
|
||||
"substrait 0.12.1",
|
||||
"substrait 0.12.2",
|
||||
"table",
|
||||
"temp-env",
|
||||
"tempfile",
|
||||
@@ -1858,6 +1862,16 @@ version = "1.0.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d3fd119d74b830634cea2a0f58bbd0d54540518a14397557951e79340abc28c0"
|
||||
|
||||
[[package]]
|
||||
name = "colored"
|
||||
version = "2.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "117725a109d387c937a1533ce01b450cbde6b88abceea8473c4d7a85853cda3c"
|
||||
dependencies = [
|
||||
"lazy_static",
|
||||
"windows-sys 0.59.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "combine"
|
||||
version = "4.6.7"
|
||||
@@ -1887,7 +1901,7 @@ checksum = "55b672471b4e9f9e95499ea597ff64941a309b2cdbffcc46f2cc5e2d971fd335"
|
||||
|
||||
[[package]]
|
||||
name = "common-base"
|
||||
version = "0.12.1"
|
||||
version = "0.12.2"
|
||||
dependencies = [
|
||||
"anymap2",
|
||||
"async-trait",
|
||||
@@ -1909,11 +1923,11 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-catalog"
|
||||
version = "0.12.1"
|
||||
version = "0.12.2"
|
||||
|
||||
[[package]]
|
||||
name = "common-config"
|
||||
version = "0.12.1"
|
||||
version = "0.12.2"
|
||||
dependencies = [
|
||||
"common-base",
|
||||
"common-error",
|
||||
@@ -1938,7 +1952,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-datasource"
|
||||
version = "0.12.1"
|
||||
version = "0.12.2"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"arrow-schema",
|
||||
@@ -1974,7 +1988,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-decimal"
|
||||
version = "0.12.1"
|
||||
version = "0.12.2"
|
||||
dependencies = [
|
||||
"bigdecimal 0.4.5",
|
||||
"common-error",
|
||||
@@ -1987,7 +2001,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-error"
|
||||
version = "0.12.1"
|
||||
version = "0.12.2"
|
||||
dependencies = [
|
||||
"http 1.1.0",
|
||||
"snafu 0.8.5",
|
||||
@@ -1997,7 +2011,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-frontend"
|
||||
version = "0.12.1"
|
||||
version = "0.12.2"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"common-error",
|
||||
@@ -2007,7 +2021,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-function"
|
||||
version = "0.12.1"
|
||||
version = "0.12.2"
|
||||
dependencies = [
|
||||
"ahash 0.8.11",
|
||||
"api",
|
||||
@@ -2055,7 +2069,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-greptimedb-telemetry"
|
||||
version = "0.12.1"
|
||||
version = "0.12.2"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"common-runtime",
|
||||
@@ -2072,7 +2086,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-grpc"
|
||||
version = "0.12.1"
|
||||
version = "0.12.2"
|
||||
dependencies = [
|
||||
"api",
|
||||
"arrow-flight",
|
||||
@@ -2100,7 +2114,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-grpc-expr"
|
||||
version = "0.12.1"
|
||||
version = "0.12.2"
|
||||
dependencies = [
|
||||
"api",
|
||||
"common-base",
|
||||
@@ -2119,7 +2133,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-macro"
|
||||
version = "0.12.1"
|
||||
version = "0.12.2"
|
||||
dependencies = [
|
||||
"arc-swap",
|
||||
"common-query",
|
||||
@@ -2133,7 +2147,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-mem-prof"
|
||||
version = "0.12.1"
|
||||
version = "0.12.2"
|
||||
dependencies = [
|
||||
"common-error",
|
||||
"common-macro",
|
||||
@@ -2146,7 +2160,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-meta"
|
||||
version = "0.12.1"
|
||||
version = "0.12.2"
|
||||
dependencies = [
|
||||
"anymap2",
|
||||
"api",
|
||||
@@ -2206,7 +2220,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-options"
|
||||
version = "0.12.1"
|
||||
version = "0.12.2"
|
||||
dependencies = [
|
||||
"common-grpc",
|
||||
"humantime-serde",
|
||||
@@ -2215,11 +2229,11 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-plugins"
|
||||
version = "0.12.1"
|
||||
version = "0.12.2"
|
||||
|
||||
[[package]]
|
||||
name = "common-pprof"
|
||||
version = "0.12.1"
|
||||
version = "0.12.2"
|
||||
dependencies = [
|
||||
"common-error",
|
||||
"common-macro",
|
||||
@@ -2231,7 +2245,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-procedure"
|
||||
version = "0.12.1"
|
||||
version = "0.12.2"
|
||||
dependencies = [
|
||||
"async-stream",
|
||||
"async-trait",
|
||||
@@ -2258,7 +2272,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-procedure-test"
|
||||
version = "0.12.1"
|
||||
version = "0.12.2"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"common-procedure",
|
||||
@@ -2266,7 +2280,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-query"
|
||||
version = "0.12.1"
|
||||
version = "0.12.2"
|
||||
dependencies = [
|
||||
"api",
|
||||
"async-trait",
|
||||
@@ -2292,7 +2306,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-recordbatch"
|
||||
version = "0.12.1"
|
||||
version = "0.12.2"
|
||||
dependencies = [
|
||||
"arc-swap",
|
||||
"common-error",
|
||||
@@ -2311,7 +2325,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-runtime"
|
||||
version = "0.12.1"
|
||||
version = "0.12.2"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"clap 4.5.19",
|
||||
@@ -2341,7 +2355,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-telemetry"
|
||||
version = "0.12.1"
|
||||
version = "0.12.2"
|
||||
dependencies = [
|
||||
"atty",
|
||||
"backtrace",
|
||||
@@ -2369,7 +2383,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-test-util"
|
||||
version = "0.12.1"
|
||||
version = "0.12.2"
|
||||
dependencies = [
|
||||
"client",
|
||||
"common-query",
|
||||
@@ -2381,7 +2395,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-time"
|
||||
version = "0.12.1"
|
||||
version = "0.12.2"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"chrono",
|
||||
@@ -2399,7 +2413,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-version"
|
||||
version = "0.12.1"
|
||||
version = "0.12.2"
|
||||
dependencies = [
|
||||
"build-data",
|
||||
"const_format",
|
||||
@@ -2409,7 +2423,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-wal"
|
||||
version = "0.12.1"
|
||||
version = "0.12.2"
|
||||
dependencies = [
|
||||
"common-base",
|
||||
"common-error",
|
||||
@@ -3340,7 +3354,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "datanode"
|
||||
version = "0.12.1"
|
||||
version = "0.12.2"
|
||||
dependencies = [
|
||||
"api",
|
||||
"arrow-flight",
|
||||
@@ -3392,7 +3406,7 @@ dependencies = [
|
||||
"session",
|
||||
"snafu 0.8.5",
|
||||
"store-api",
|
||||
"substrait 0.12.1",
|
||||
"substrait 0.12.2",
|
||||
"table",
|
||||
"tokio",
|
||||
"toml 0.8.19",
|
||||
@@ -3401,7 +3415,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "datatypes"
|
||||
version = "0.12.1"
|
||||
version = "0.12.2"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"arrow-array",
|
||||
@@ -4045,7 +4059,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "file-engine"
|
||||
version = "0.12.1"
|
||||
version = "0.12.2"
|
||||
dependencies = [
|
||||
"api",
|
||||
"async-trait",
|
||||
@@ -4155,7 +4169,7 @@ checksum = "8bf7cc16383c4b8d58b9905a8509f02926ce3058053c056376248d958c9df1e8"
|
||||
|
||||
[[package]]
|
||||
name = "flow"
|
||||
version = "0.12.1"
|
||||
version = "0.12.2"
|
||||
dependencies = [
|
||||
"api",
|
||||
"arrow",
|
||||
@@ -4216,7 +4230,7 @@ dependencies = [
|
||||
"snafu 0.8.5",
|
||||
"store-api",
|
||||
"strum 0.25.0",
|
||||
"substrait 0.12.1",
|
||||
"substrait 0.12.2",
|
||||
"table",
|
||||
"tokio",
|
||||
"tonic 0.12.3",
|
||||
@@ -4271,7 +4285,7 @@ checksum = "6c2141d6d6c8512188a7891b4b01590a45f6dac67afb4f255c4124dbb86d4eaa"
|
||||
|
||||
[[package]]
|
||||
name = "frontend"
|
||||
version = "0.12.1"
|
||||
version = "0.12.2"
|
||||
dependencies = [
|
||||
"api",
|
||||
"arc-swap",
|
||||
@@ -5539,7 +5553,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "index"
|
||||
version = "0.12.1"
|
||||
version = "0.12.2"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"asynchronous-codec",
|
||||
@@ -6331,7 +6345,7 @@ checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24"
|
||||
|
||||
[[package]]
|
||||
name = "log-query"
|
||||
version = "0.12.1"
|
||||
version = "0.12.2"
|
||||
dependencies = [
|
||||
"chrono",
|
||||
"common-error",
|
||||
@@ -6343,7 +6357,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "log-store"
|
||||
version = "0.12.1"
|
||||
version = "0.12.2"
|
||||
dependencies = [
|
||||
"async-stream",
|
||||
"async-trait",
|
||||
@@ -6636,7 +6650,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "meta-client"
|
||||
version = "0.12.1"
|
||||
version = "0.12.2"
|
||||
dependencies = [
|
||||
"api",
|
||||
"async-trait",
|
||||
@@ -6663,7 +6677,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "meta-srv"
|
||||
version = "0.12.1"
|
||||
version = "0.12.2"
|
||||
dependencies = [
|
||||
"api",
|
||||
"async-trait",
|
||||
@@ -6749,7 +6763,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "metric-engine"
|
||||
version = "0.12.1"
|
||||
version = "0.12.2"
|
||||
dependencies = [
|
||||
"api",
|
||||
"aquamarine",
|
||||
@@ -6847,7 +6861,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "mito2"
|
||||
version = "0.12.1"
|
||||
version = "0.12.2"
|
||||
dependencies = [
|
||||
"api",
|
||||
"aquamarine",
|
||||
@@ -7544,7 +7558,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "object-store"
|
||||
version = "0.12.1"
|
||||
version = "0.12.2"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"bytes",
|
||||
@@ -7793,7 +7807,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "operator"
|
||||
version = "0.12.1"
|
||||
version = "0.12.2"
|
||||
dependencies = [
|
||||
"ahash 0.8.11",
|
||||
"api",
|
||||
@@ -7841,7 +7855,7 @@ dependencies = [
|
||||
"sql",
|
||||
"sqlparser 0.52.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=71dd86058d2af97b9925093d40c4e03360403170)",
|
||||
"store-api",
|
||||
"substrait 0.12.1",
|
||||
"substrait 0.12.2",
|
||||
"table",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
@@ -8078,7 +8092,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "partition"
|
||||
version = "0.12.1"
|
||||
version = "0.12.2"
|
||||
dependencies = [
|
||||
"api",
|
||||
"async-trait",
|
||||
@@ -8346,7 +8360,7 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
|
||||
|
||||
[[package]]
|
||||
name = "pipeline"
|
||||
version = "0.12.1"
|
||||
version = "0.12.2"
|
||||
dependencies = [
|
||||
"ahash 0.8.11",
|
||||
"api",
|
||||
@@ -8486,7 +8500,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "plugins"
|
||||
version = "0.12.1"
|
||||
version = "0.12.2"
|
||||
dependencies = [
|
||||
"auth",
|
||||
"clap 4.5.19",
|
||||
@@ -8748,7 +8762,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "promql"
|
||||
version = "0.12.1"
|
||||
version = "0.12.2"
|
||||
dependencies = [
|
||||
"ahash 0.8.11",
|
||||
"async-trait",
|
||||
@@ -8993,7 +9007,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "puffin"
|
||||
version = "0.12.1"
|
||||
version = "0.12.2"
|
||||
dependencies = [
|
||||
"async-compression 0.4.13",
|
||||
"async-trait",
|
||||
@@ -9034,7 +9048,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "query"
|
||||
version = "0.12.1"
|
||||
version = "0.12.2"
|
||||
dependencies = [
|
||||
"ahash 0.8.11",
|
||||
"api",
|
||||
@@ -9099,7 +9113,7 @@ dependencies = [
|
||||
"sqlparser 0.52.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=71dd86058d2af97b9925093d40c4e03360403170)",
|
||||
"statrs",
|
||||
"store-api",
|
||||
"substrait 0.12.1",
|
||||
"substrait 0.12.2",
|
||||
"table",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
@@ -10444,7 +10458,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "servers"
|
||||
version = "0.12.1"
|
||||
version = "0.12.2"
|
||||
dependencies = [
|
||||
"ahash 0.8.11",
|
||||
"api",
|
||||
@@ -10561,7 +10575,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "session"
|
||||
version = "0.12.1"
|
||||
version = "0.12.2"
|
||||
dependencies = [
|
||||
"api",
|
||||
"arc-swap",
|
||||
@@ -10870,7 +10884,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "sql"
|
||||
version = "0.12.1"
|
||||
version = "0.12.2"
|
||||
dependencies = [
|
||||
"api",
|
||||
"chrono",
|
||||
@@ -10924,7 +10938,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "sqlness-runner"
|
||||
version = "0.12.1"
|
||||
version = "0.12.2"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"clap 4.5.19",
|
||||
@@ -11241,7 +11255,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "store-api"
|
||||
version = "0.12.1"
|
||||
version = "0.12.2"
|
||||
dependencies = [
|
||||
"api",
|
||||
"aquamarine",
|
||||
@@ -11371,7 +11385,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "substrait"
|
||||
version = "0.12.1"
|
||||
version = "0.12.2"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"bytes",
|
||||
@@ -11552,7 +11566,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "table"
|
||||
version = "0.12.1"
|
||||
version = "0.12.2"
|
||||
dependencies = [
|
||||
"api",
|
||||
"async-trait",
|
||||
@@ -11803,7 +11817,7 @@ checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76"
|
||||
|
||||
[[package]]
|
||||
name = "tests-fuzz"
|
||||
version = "0.12.1"
|
||||
version = "0.12.2"
|
||||
dependencies = [
|
||||
"arbitrary",
|
||||
"async-trait",
|
||||
@@ -11847,7 +11861,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "tests-integration"
|
||||
version = "0.12.1"
|
||||
version = "0.12.2"
|
||||
dependencies = [
|
||||
"api",
|
||||
"arrow-flight",
|
||||
@@ -11913,7 +11927,7 @@ dependencies = [
|
||||
"sql",
|
||||
"sqlx",
|
||||
"store-api",
|
||||
"substrait 0.12.1",
|
||||
"substrait 0.12.2",
|
||||
"table",
|
||||
"tempfile",
|
||||
"time",
|
||||
|
||||
Cargo.toml (workspace manifest)

@@ -67,7 +67,7 @@ members = [
resolver = "2"

[workspace.package]
version = "0.12.1"
version = "0.12.2"
edition = "2021"
license = "Apache-2.0"
cyborg/bin/bump-versions.ts (new file, 156 lines)
@@ -0,0 +1,156 @@
/*
 * Copyright 2023 Greptime Team
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import * as core from "@actions/core";
import {obtainClient} from "@/common";

interface RepoConfig {
    tokenEnv: string;
    repo: string;
    workflowLogic: (version: string) => [string, string] | null;
}

const REPO_CONFIGS: Record<string, RepoConfig> = {
    website: {
        tokenEnv: "WEBSITE_REPO_TOKEN",
        repo: "website",
        workflowLogic: (version: string) => {
            // Skip nightly versions for website
            if (version.includes('nightly')) {
                console.log('Nightly version detected for website, skipping workflow trigger.');
                return null;
            }
            return ['bump-patch-version.yml', version];
        }
    },
    demo: {
        tokenEnv: "DEMO_REPO_TOKEN",
        repo: "demo-scene",
        workflowLogic: (version: string) => {
            // Skip nightly versions for demo
            if (version.includes('nightly')) {
                console.log('Nightly version detected for demo, skipping workflow trigger.');
                return null;
            }
            return ['bump-patch-version.yml', version];
        }
    },
    docs: {
        tokenEnv: "DOCS_REPO_TOKEN",
        repo: "docs",
        workflowLogic: (version: string) => {
            // Check if it's a nightly version
            if (version.includes('nightly')) {
                return ['bump-nightly-version.yml', version];
            }

            const parts = version.split('.');
            if (parts.length !== 3) {
                throw new Error('Invalid version format');
            }

            // If patch version (last number) is 0, it's a major version
            // Return only major.minor version
            if (parts[2] === '0') {
                return ['bump-version.yml', `${parts[0]}.${parts[1]}`];
            }

            // Otherwise it's a patch version, use full version
            return ['bump-patch-version.yml', version];
        }
    }
};

async function triggerWorkflow(repoConfig: RepoConfig, workflowId: string, version: string) {
    const client = obtainClient(repoConfig.tokenEnv);
    try {
        await client.rest.actions.createWorkflowDispatch({
            owner: "GreptimeTeam",
            repo: repoConfig.repo,
            workflow_id: workflowId,
            ref: "main",
            inputs: {
                version,
            },
        });
        console.log(`Successfully triggered ${workflowId} workflow for ${repoConfig.repo} with version ${version}`);
    } catch (error) {
        core.setFailed(`Failed to trigger workflow for ${repoConfig.repo}: ${error.message}`);
        throw error;
    }
}

async function processRepo(repoName: string, version: string) {
    const repoConfig = REPO_CONFIGS[repoName];
    if (!repoConfig) {
        throw new Error(`Unknown repository: ${repoName}`);
    }

    try {
        const workflowResult = repoConfig.workflowLogic(version);
        if (workflowResult === null) {
            // Skip this repo (e.g., nightly version for website)
            return;
        }

        const [workflowId, apiVersion] = workflowResult;
        await triggerWorkflow(repoConfig, workflowId, apiVersion);
    } catch (error) {
        core.setFailed(`Error processing ${repoName} with version ${version}: ${error.message}`);
        throw error;
    }
}

async function main() {
    const version = process.env.VERSION;
    if (!version) {
        core.setFailed("VERSION environment variable is required");
        process.exit(1);
    }

    // Remove 'v' prefix if exists
    const cleanVersion = version.startsWith('v') ? version.slice(1) : version;

    // Get target repositories from environment variable
    // Default to both if not specified
    const targetRepos = process.env.TARGET_REPOS?.split(',').map(repo => repo.trim()) || ['website', 'docs'];

    console.log(`Processing version ${cleanVersion} for repositories: ${targetRepos.join(', ')}`);

    const errors: string[] = [];

    // Process each repository
    for (const repo of targetRepos) {
        try {
            await processRepo(repo, cleanVersion);
        } catch (error) {
            errors.push(`${repo}: ${error.message}`);
        }
    }

    if (errors.length > 0) {
        core.setFailed(`Failed to process some repositories: ${errors.join('; ')}`);
        process.exit(1);
    }

    console.log('All repositories processed successfully');
}

// Execute main function
main().catch((error) => {
    core.setFailed(`Unexpected error: ${error.message}`);
    process.exit(1);
});
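A rough sketch of how bump-versions.ts might be driven; the runner command (npx tsx) is an assumption, while the VERSION and TARGET_REPOS variables, the token environment names, and the routing behavior come from the script above:

```bash
# Assumed invocation; the script requires VERSION and reads TARGET_REPOS (defaults to website,docs).
cd cyborg
VERSION=v0.12.2 TARGET_REPOS=docs,website \
  WEBSITE_REPO_TOKEN="$WEBSITE_TOKEN" DOCS_REPO_TOKEN="$DOCS_TOKEN" \
  npx tsx bin/bump-versions.ts
# Routing per the script: v0.12.2 -> docs bump-patch-version.yml with "0.12.2";
# an x.y.0 release -> bump-version.yml with "x.y"; nightly versions -> bump-nightly-version.yml (docs only).
```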
@@ -55,12 +55,25 @@ async function main() {
        await client.rest.issues.addLabels({
            owner, repo, issue_number: number, labels: [labelDocsRequired],
        })

        // Get available assignees for the docs repo
        const assigneesResponse = await docsClient.rest.issues.listAssignees({
            owner: 'GreptimeTeam',
            repo: 'docs',
        })
        const validAssignees = assigneesResponse.data.map(assignee => assignee.login)
        core.info(`Available assignees: ${validAssignees.join(', ')}`)

        // Check if the actor is a valid assignee, otherwise fallback to fengjiachun
        const assignee = validAssignees.includes(actor) ? actor : 'fengjiachun'
        core.info(`Assigning issue to: ${assignee}`)

        await docsClient.rest.issues.create({
            owner: 'GreptimeTeam',
            repo: 'docs',
            title: `Update docs for ${title}`,
            body: `A document change request is generated from ${html_url}`,
            assignee: actor,
            assignee: assignee,
        }).then((res) => {
            core.info(`Created issue ${res.data}`)
        })
@@ -53,6 +53,54 @@ get_arch_type() {
    esac
}

# Verify SHA256 checksum
verify_sha256() {
    file="$1"
    expected_sha256="$2"

    if command -v sha256sum >/dev/null 2>&1; then
        actual_sha256=$(sha256sum "$file" | cut -d' ' -f1)
    elif command -v shasum >/dev/null 2>&1; then
        actual_sha256=$(shasum -a 256 "$file" | cut -d' ' -f1)
    else
        echo "Warning: No SHA256 verification tool found (sha256sum or shasum). Skipping checksum verification."
        return 0
    fi

    if [ "$actual_sha256" = "$expected_sha256" ]; then
        echo "SHA256 checksum verified successfully."
        return 0
    else
        echo "Error: SHA256 checksum verification failed!"
        echo "Expected: $expected_sha256"
        echo "Actual: $actual_sha256"
        return 1
    fi
}

# Prompt for user confirmation (compatible with different shells)
prompt_confirmation() {
    message="$1"
    printf "%s (y/N): " "$message"

    # Try to read user input, fallback if read fails
    answer=""
    if read answer </dev/tty 2>/dev/null; then
        case "$answer" in
            [Yy]|[Yy][Ee][Ss])
                return 0
                ;;
            *)
                return 1
                ;;
        esac
    else
        echo ""
        echo "Cannot read user input. Defaulting to No."
        return 1
    fi
}

download_artifact() {
    if [ -n "${OS_TYPE}" ] && [ -n "${ARCH_TYPE}" ]; then
        # Use the latest stable released version.
@@ -71,17 +119,104 @@ download_artifact() {
        fi

        echo "Downloading ${BIN}, OS: ${OS_TYPE}, Arch: ${ARCH_TYPE}, Version: ${VERSION}"
        PACKAGE_NAME="${BIN}-${OS_TYPE}-${ARCH_TYPE}-${VERSION}.tar.gz"
        PKG_NAME="${BIN}-${OS_TYPE}-${ARCH_TYPE}-${VERSION}"
        PACKAGE_NAME="${PKG_NAME}.tar.gz"
        SHA256_FILE="${PKG_NAME}.sha256sum"

        if [ -n "${PACKAGE_NAME}" ]; then
            wget "https://github.com/${GITHUB_ORG}/${GITHUB_REPO}/releases/download/${VERSION}/${PACKAGE_NAME}"
            # Check if files already exist and prompt for override
            if [ -f "${PACKAGE_NAME}" ]; then
                echo "File ${PACKAGE_NAME} already exists."
                if prompt_confirmation "Do you want to override it?"; then
                    echo "Overriding existing file..."
                    rm -f "${PACKAGE_NAME}"
                else
                    echo "Skipping download. Using existing file."
                fi
            fi

            if [ -f "${BIN}" ]; then
                echo "Binary ${BIN} already exists."
                if prompt_confirmation "Do you want to override it?"; then
                    echo "Will override existing binary..."
                    rm -f "${BIN}"
                else
                    echo "Installation cancelled."
                    exit 0
                fi
            fi

            # Download package if not exists
            if [ ! -f "${PACKAGE_NAME}" ]; then
                echo "Downloading ${PACKAGE_NAME}..."
                # Use curl instead of wget for better compatibility
                if command -v curl >/dev/null 2>&1; then
                    if ! curl -L -o "${PACKAGE_NAME}" "https://github.com/${GITHUB_ORG}/${GITHUB_REPO}/releases/download/${VERSION}/${PACKAGE_NAME}"; then
                        echo "Error: Failed to download ${PACKAGE_NAME}"
                        exit 1
                    fi
                elif command -v wget >/dev/null 2>&1; then
                    if ! wget -O "${PACKAGE_NAME}" "https://github.com/${GITHUB_ORG}/${GITHUB_REPO}/releases/download/${VERSION}/${PACKAGE_NAME}"; then
                        echo "Error: Failed to download ${PACKAGE_NAME}"
                        exit 1
                    fi
                else
                    echo "Error: Neither curl nor wget is available for downloading."
                    exit 1
                fi
            fi

            # Download and verify SHA256 checksum
            echo "Downloading SHA256 checksum..."
            sha256_download_success=0
            if command -v curl >/dev/null 2>&1; then
                if curl -L -s -o "${SHA256_FILE}" "https://github.com/${GITHUB_ORG}/${GITHUB_REPO}/releases/download/${VERSION}/${SHA256_FILE}" 2>/dev/null; then
                    sha256_download_success=1
                fi
            elif command -v wget >/dev/null 2>&1; then
                if wget -q -O "${SHA256_FILE}" "https://github.com/${GITHUB_ORG}/${GITHUB_REPO}/releases/download/${VERSION}/${SHA256_FILE}" 2>/dev/null; then
                    sha256_download_success=1
                fi
            fi

            if [ $sha256_download_success -eq 1 ] && [ -f "${SHA256_FILE}" ]; then
                expected_sha256=$(cat "${SHA256_FILE}" | cut -d' ' -f1)
                if [ -n "$expected_sha256" ]; then
                    if ! verify_sha256 "${PACKAGE_NAME}" "${expected_sha256}"; then
                        echo "SHA256 verification failed. Removing downloaded file."
                        rm -f "${PACKAGE_NAME}" "${SHA256_FILE}"
                        exit 1
                    fi
                else
                    echo "Warning: Could not parse SHA256 checksum from file."
                fi
                rm -f "${SHA256_FILE}"
            else
                echo "Warning: Could not download SHA256 checksum file. Skipping verification."
            fi

            # Extract the binary and clean the rest.
            tar xvf "${PACKAGE_NAME}" && \
            mv "${PACKAGE_NAME%.tar.gz}/${BIN}" "${PWD}" && \
            rm -r "${PACKAGE_NAME}" && \
            rm -r "${PACKAGE_NAME%.tar.gz}" && \
            echo "Run './${BIN} --help' to get started"
            echo "Extracting ${PACKAGE_NAME}..."
            if ! tar xf "${PACKAGE_NAME}"; then
                echo "Error: Failed to extract ${PACKAGE_NAME}"
                exit 1
            fi

            # Find the binary in the extracted directory
            extracted_dir="${PACKAGE_NAME%.tar.gz}"
            if [ -f "${extracted_dir}/${BIN}" ]; then
                mv "${extracted_dir}/${BIN}" "${PWD}/"
                rm -f "${PACKAGE_NAME}"
                rm -rf "${extracted_dir}"
                chmod +x "${BIN}"
                echo "Installation completed successfully!"
                echo "Run './${BIN} --help' to get started"
            else
                echo "Error: Binary ${BIN} not found in extracted archive"
                rm -f "${PACKAGE_NAME}"
                rm -rf "${extracted_dir}"
                exit 1
            fi
        fi
    fi
}
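For reference, the checksum flow added above is equivalent to the following manual check, sketched with an illustrative artifact name (the real PACKAGE_NAME is derived from BIN, OS_TYPE, ARCH_TYPE, and VERSION):

```bash
# Hypothetical artifact name for illustration.
PKG=greptime-linux-amd64-v0.12.2
curl -LO "https://github.com/GreptimeTeam/greptimedb/releases/download/v0.12.2/${PKG}.tar.gz"
curl -LO "https://github.com/GreptimeTeam/greptimedb/releases/download/v0.12.2/${PKG}.sha256sum"
# Compare the published digest with a locally computed one, as verify_sha256 does.
echo "$(cut -d' ' -f1 "${PKG}.sha256sum")  ${PKG}.tar.gz" | sha256sum -c -
```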
@@ -9,6 +9,10 @@ default-run = "greptime"
name = "greptime"
path = "src/bin/greptime.rs"

[[bin]]
name = "objbench"
path = "src/bin/objbench.rs"

[features]
default = ["servers/pprof", "servers/mem-prof"]
tokio-console = ["common-telemetry/tokio-console"]
@@ -20,6 +24,7 @@ workspace = true
async-trait.workspace = true
auth.workspace = true
base64.workspace = true
colored = "2.0"
cache.workspace = true
catalog.workspace = true
chrono.workspace = true
@@ -55,6 +60,9 @@ futures.workspace = true
human-panic = "2.0"
humantime.workspace = true
lazy_static.workspace = true
object-store.workspace = true
parquet = "53"
pprof = "0.14"
meta-client.workspace = true
meta-srv.workspace = true
metric-engine.workspace = true
@@ -21,6 +21,8 @@ use cmd::{cli, datanode, flownode, frontend, metasrv, standalone, App};
use common_version::version;
use servers::install_ring_crypto_provider;

pub mod objbench;

#[derive(Parser)]
#[command(name = "greptime", author, version, long_version = version(), about)]
#[command(propagate_version = true)]
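With the new [[bin]] target registered above, the benchmark can presumably be built and run like any other cargo binary; the flag names below come from objbench.rs further down, while the config path and SST paths are placeholders:

```bash
# Build the new object-store benchmark binary.
cargo build --release --bin objbench
# Placeholders: a TOML file containing a [storage] section, plus source/target SST paths in the object store.
./target/release/objbench \
  --config ./datanode-storage.toml \
  --source "data/greptime/public/1024/1024_0000000000/<uuid>.parquet" \
  --target "data/greptime/public/1024/bench_output/" \
  --pprof-file ./objbench-flame.svg
```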
src/cmd/src/bin/objbench.rs (new file, 602 lines)
@@ -0,0 +1,602 @@
|
||||
// Copyright 2025 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::time::Instant;
|
||||
|
||||
use clap::Parser;
|
||||
use cmd::error::{self, Result};
|
||||
use colored::Colorize;
|
||||
use datanode::config::ObjectStoreConfig;
|
||||
use mito2::config::{FulltextIndexConfig, MitoConfig, Mode};
|
||||
use mito2::read::Source;
|
||||
use mito2::sst::file::{FileHandle, FileId, FileMeta};
|
||||
use mito2::sst::file_purger::{FilePurger, FilePurgerRef, PurgeRequest};
|
||||
use mito2::sst::parquet::{WriteOptions, PARQUET_METADATA_KEY};
|
||||
use mito2::{build_access_layer, Metrics, OperationType, SstWriteRequest};
|
||||
use object_store::ObjectStore;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
|
||||
|
||||
#[tokio::main]
|
||||
pub async fn main() {
|
||||
// common_telemetry::init_default_ut_logging();
|
||||
let cmd = Command::parse();
|
||||
if let Err(e) = cmd.run().await {
|
||||
eprintln!("{}: {}", "Error".red().bold(), e);
|
||||
std::process::exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
|
||||
#[serde(default)]
|
||||
pub struct StorageConfigWrapper {
|
||||
storage: StorageConfig,
|
||||
}
|
||||
|
||||
/// Storage engine config
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
|
||||
#[serde(default)]
|
||||
pub struct StorageConfig {
|
||||
/// The working directory of database
|
||||
pub data_home: String,
|
||||
#[serde(flatten)]
|
||||
pub store: ObjectStoreConfig,
|
||||
}
|
||||
|
||||
#[derive(Debug, Parser)]
|
||||
pub struct Command {
|
||||
/// Path to the object-store config file (TOML). Must deserialize into datanode::config::ObjectStoreConfig.
|
||||
#[clap(long, value_name = "FILE")]
|
||||
pub config: PathBuf,
|
||||
|
||||
/// Source SST file path in object-store (e.g. "region_dir/<uuid>.parquet").
|
||||
#[clap(long, value_name = "PATH")]
|
||||
pub source: String,
|
||||
|
||||
/// Target SST file path in object-store; its parent directory is used as destination region dir.
|
||||
#[clap(long, value_name = "PATH")]
|
||||
pub target: String,
|
||||
|
||||
/// Verbose output
|
||||
#[clap(short, long, default_value_t = false)]
|
||||
pub verbose: bool,
|
||||
|
||||
/// Output file path for pprof flamegraph (enables profiling)
|
||||
#[clap(long, value_name = "FILE")]
|
||||
pub pprof_file: Option<PathBuf>,
|
||||
}
|
||||
|
||||
impl Command {
|
||||
pub async fn run(&self) -> Result<()> {
|
||||
if self.verbose {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
}
|
||||
|
||||
println!("{}", "Starting objbench...".cyan().bold());
|
||||
|
||||
// Build object store from config
|
||||
let cfg_str = std::fs::read_to_string(&self.config).map_err(|e| {
|
||||
error::IllegalConfigSnafu {
|
||||
msg: format!("failed to read config {}: {e}", self.config.display()),
|
||||
}
|
||||
.build()
|
||||
})?;
|
||||
let store_cfg: StorageConfigWrapper = toml::from_str(&cfg_str).map_err(|e| {
|
||||
error::IllegalConfigSnafu {
|
||||
msg: format!("failed to parse config {}: {e}", self.config.display()),
|
||||
}
|
||||
.build()
|
||||
})?;
|
||||
|
||||
let object_store = build_object_store(&store_cfg.storage).await?;
|
||||
println!("{} Object store initialized", "✓".green());
|
||||
|
||||
// Prepare source identifiers
|
||||
let (src_region_dir, src_file_id) = split_sst_path(&self.source)?;
|
||||
println!("{} Source path parsed: {}", "✓".green(), self.source);
|
||||
|
||||
// Load parquet metadata to extract RegionMetadata and file stats
|
||||
println!("{}", "Loading parquet metadata...".yellow());
|
||||
let file_size = object_store
|
||||
.stat(&self.source)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
error::IllegalConfigSnafu {
|
||||
msg: format!("stat failed: {e}"),
|
||||
}
|
||||
.build()
|
||||
})?
|
||||
.content_length();
|
||||
let parquet_meta = load_parquet_metadata(object_store.clone(), &self.source, file_size)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
error::IllegalConfigSnafu {
|
||||
msg: format!("read parquet metadata failed: {e}"),
|
||||
}
|
||||
.build()
|
||||
})?;
|
||||
|
||||
let region_meta = extract_region_metadata(&self.source, &parquet_meta)?;
|
||||
let num_rows = parquet_meta.file_metadata().num_rows() as u64;
|
||||
let num_row_groups = parquet_meta.num_row_groups() as u64;
|
||||
|
||||
println!(
|
||||
"{} Metadata loaded - rows: {}, size: {} bytes",
|
||||
"✓".green(),
|
||||
num_rows,
|
||||
file_size
|
||||
);
|
||||
|
||||
// Build a FileHandle for the source file
|
||||
let file_meta = FileMeta {
|
||||
region_id: region_meta.region_id,
|
||||
file_id: src_file_id,
|
||||
time_range: Default::default(),
|
||||
level: 0,
|
||||
file_size,
|
||||
available_indexes: Default::default(),
|
||||
index_file_size: 0,
|
||||
num_rows,
|
||||
num_row_groups,
|
||||
sequence: None,
|
||||
};
|
||||
let src_handle = FileHandle::new(file_meta, new_noop_file_purger());
|
||||
|
||||
// Build the reader for a single file via ParquetReaderBuilder
|
||||
println!("{}", "Building reader...".yellow());
|
||||
let (_src_access_layer, _cache_manager) =
|
||||
build_access_layer_simple(src_region_dir.clone(), object_store.clone()).await?;
|
||||
let reader_build_start = Instant::now();
|
||||
let reader = mito2::sst::parquet::reader::ParquetReaderBuilder::new(
|
||||
src_region_dir.clone(),
|
||||
src_handle.clone(),
|
||||
object_store.clone(),
|
||||
)
|
||||
.expected_metadata(Some(region_meta.clone()))
|
||||
.build()
|
||||
.await
|
||||
.map_err(|e| {
|
||||
error::IllegalConfigSnafu {
|
||||
msg: format!("build reader failed: {e}"),
|
||||
}
|
||||
.build()
|
||||
})?;
|
||||
|
||||
let reader_build_elapsed = reader_build_start.elapsed();
|
||||
let total_rows = reader.parquet_metadata().file_metadata().num_rows();
|
||||
println!("{} Reader built in {:?}", "✓".green(), reader_build_elapsed);
|
||||
|
||||
// Prepare target access layer for writing
|
||||
println!("{}", "Preparing target access layer...".yellow());
|
||||
let (tgt_access_layer, tgt_cache_manager) =
|
||||
build_access_layer_simple(self.target.clone(), object_store.clone()).await?;
|
||||
|
||||
// Build write request
|
||||
let fulltext_index_config = FulltextIndexConfig {
|
||||
create_on_compaction: Mode::Disable,
|
||||
..Default::default()
|
||||
};
|
||||
let write_opts = WriteOptions::default();
|
||||
let write_req = SstWriteRequest {
|
||||
op_type: OperationType::Compact,
|
||||
metadata: region_meta,
|
||||
source: Source::Reader(Box::new(reader)),
|
||||
cache_manager: tgt_cache_manager,
|
||||
storage: None,
|
||||
max_sequence: None,
|
||||
index_options: Default::default(),
|
||||
inverted_index_config: MitoConfig::default().inverted_index,
|
||||
fulltext_index_config,
|
||||
bloom_filter_index_config: MitoConfig::default().bloom_filter_index,
|
||||
};
|
||||
|
||||
// Write SST
|
||||
println!("{}", "Writing SST...".yellow());
|
||||
let mut metrics = Metrics::default();
|
||||
|
||||
// Start profiling if pprof_file is specified
|
||||
#[cfg(unix)]
|
||||
let profiler_guard = if self.pprof_file.is_some() {
|
||||
println!("{} Starting profiling...", "⚡".yellow());
|
||||
Some(
|
||||
pprof::ProfilerGuardBuilder::default()
|
||||
.frequency(99)
|
||||
.blocklist(&["libc", "libgcc", "pthread", "vdso"])
|
||||
.build()
|
||||
.map_err(|e| {
|
||||
error::IllegalConfigSnafu {
|
||||
msg: format!("Failed to start profiler: {e}"),
|
||||
}
|
||||
.build()
|
||||
})?,
|
||||
)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
#[cfg(not(unix))]
|
||||
if self.pprof_file.is_some() {
|
||||
eprintln!(
|
||||
"{}: Profiling is not supported on this platform",
|
||||
"Warning".yellow()
|
||||
);
|
||||
}
|
||||
|
||||
let write_start = Instant::now();
|
||||
let infos = tgt_access_layer
|
||||
.write_sst(write_req, &write_opts, &mut metrics)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
error::IllegalConfigSnafu {
|
||||
msg: format!("write_sst failed: {e}"),
|
||||
}
|
||||
.build()
|
||||
})?;
|
||||
|
||||
let write_elapsed = write_start.elapsed();
|
||||
|
||||
// Stop profiling and generate flamegraph if enabled
|
||||
#[cfg(unix)]
|
||||
if let (Some(guard), Some(pprof_file)) = (profiler_guard, &self.pprof_file) {
|
||||
println!("{} Generating flamegraph...", "🔥".yellow());
|
||||
match guard.report().build() {
|
||||
Ok(report) => {
|
||||
let mut flamegraph_data = Vec::new();
|
||||
if let Err(e) = report.flamegraph(&mut flamegraph_data) {
|
||||
eprintln!(
|
||||
"{}: Failed to generate flamegraph: {}",
|
||||
"Warning".yellow(),
|
||||
e
|
||||
);
|
||||
} else if let Err(e) = std::fs::write(pprof_file, flamegraph_data) {
|
||||
eprintln!(
|
||||
"{}: Failed to write flamegraph to {}: {}",
|
||||
"Warning".yellow(),
|
||||
pprof_file.display(),
|
||||
e
|
||||
);
|
||||
} else {
|
||||
println!(
|
||||
"{} Flamegraph saved to {}",
|
||||
"✓".green(),
|
||||
pprof_file.display().to_string().cyan()
|
||||
);
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
eprintln!(
|
||||
"{}: Failed to generate pprof report: {}",
|
||||
"Warning".yellow(),
|
||||
e
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
assert_eq!(infos.len(), 1);
|
||||
let dst_file_id = infos[0].file_id;
|
||||
let dst_file_path = format!("{}{}", self.target, dst_file_id.as_parquet(),);
|
||||
|
||||
// Report results with ANSI colors
|
||||
println!("\n{} {}", "Write complete!".green().bold(), "✓".green());
|
||||
println!(" {}: {}", "Destination file".bold(), dst_file_path.cyan());
|
||||
println!(" {}: {}", "Rows".bold(), total_rows.to_string().cyan());
|
||||
println!(
|
||||
" {}: {}",
|
||||
"File size".bold(),
|
||||
format!("{} bytes", file_size).cyan()
|
||||
);
|
||||
println!(
|
||||
" {}: {:?}",
|
||||
"Reader build time".bold(),
|
||||
reader_build_elapsed
|
||||
);
|
||||
println!(" {}: {:?}", "Total time".bold(), write_elapsed);
|
||||
|
||||
// Print metrics in a formatted way
|
||||
println!(
|
||||
" {}: {:?}, sum: {:?}",
|
||||
"Metrics".bold(),
|
||||
metrics,
|
||||
metrics.sum()
|
||||
);
|
||||
|
||||
// Print infos
|
||||
println!(" {}: {:?}", "Index".bold(), infos[0].index_metadata);
|
||||
|
||||
// Cleanup
|
||||
println!("\n{}", "Cleaning up...".yellow());
|
||||
object_store.delete(&dst_file_path).await.map_err(|e| {
|
||||
error::IllegalConfigSnafu {
|
||||
msg: format!("Failed to delete dest file {}: {}", dst_file_path, e),
|
||||
}
|
||||
.build()
|
||||
})?;
|
||||
println!("{} Temporary file deleted", "✓".green());
|
||||
|
||||
println!("\n{}", "Benchmark completed successfully!".green().bold());
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
fn split_sst_path(path: &str) -> Result<(String, FileId)> {
|
||||
let p = Path::new(path);
|
||||
let file_name = p.file_name().and_then(|s| s.to_str()).ok_or_else(|| {
|
||||
error::IllegalConfigSnafu {
|
||||
msg: "invalid source path".to_string(),
|
||||
}
|
||||
.build()
|
||||
})?;
|
||||
let uuid_str = file_name.strip_suffix(".parquet").ok_or_else(|| {
|
||||
error::IllegalConfigSnafu {
|
||||
msg: "expect .parquet file".to_string(),
|
||||
}
|
||||
.build()
|
||||
})?;
|
||||
let file_id = FileId::parse_str(uuid_str).map_err(|e| {
|
||||
error::IllegalConfigSnafu {
|
||||
msg: format!("invalid file id: {e}"),
|
||||
}
|
||||
.build()
|
||||
})?;
|
||||
let parent = p
|
||||
.parent()
|
||||
.and_then(|s| s.to_str())
|
||||
.unwrap_or("")
|
||||
.to_string();
|
||||
Ok((parent, file_id))
|
||||
}
|
||||
|
||||
fn extract_region_metadata(
|
||||
file_path: &str,
|
||||
meta: &parquet::file::metadata::ParquetMetaData,
|
||||
) -> Result<RegionMetadataRef> {
|
||||
use parquet::format::KeyValue;
|
||||
let kvs: Option<&Vec<KeyValue>> = meta.file_metadata().key_value_metadata();
|
||||
let Some(kvs) = kvs else {
|
||||
return Err(error::IllegalConfigSnafu {
|
||||
msg: format!("{file_path}: missing parquet key_value metadata"),
|
||||
}
|
||||
.build());
|
||||
};
|
||||
let json = kvs
|
||||
.iter()
|
||||
.find(|kv| kv.key == PARQUET_METADATA_KEY)
|
||||
.and_then(|kv| kv.value.as_ref())
|
||||
.ok_or_else(|| {
|
||||
error::IllegalConfigSnafu {
|
||||
msg: format!("{file_path}: key {PARQUET_METADATA_KEY} not found or empty"),
|
||||
}
|
||||
.build()
|
||||
})?;
|
||||
let region: RegionMetadata = RegionMetadata::from_json(json).map_err(|e| {
|
||||
error::IllegalConfigSnafu {
|
||||
msg: format!("invalid region metadata json: {e}"),
|
||||
}
|
||||
.build()
|
||||
})?;
|
||||
Ok(std::sync::Arc::new(region))
|
||||
}
|
||||
|
||||
async fn build_object_store(sc: &StorageConfig) -> Result<ObjectStore> {
|
||||
use datanode::config::ObjectStoreConfig::*;
|
||||
let oss = &sc.store;
|
||||
match oss {
|
||||
File(_) => {
|
||||
use object_store::services::Fs;
|
||||
let builder = Fs::default().root(&sc.data_home);
|
||||
Ok(ObjectStore::new(builder)
|
||||
.map_err(|e| {
|
||||
error::IllegalConfigSnafu {
|
||||
msg: format!("init fs backend failed: {e}"),
|
||||
}
|
||||
.build()
|
||||
})?
|
||||
.finish())
|
||||
}
|
||||
S3(s3) => {
|
||||
use common_base::secrets::ExposeSecret;
|
||||
use object_store::services::S3;
|
||||
use object_store::util;
|
||||
let root = util::normalize_dir(&s3.root);
|
||||
let mut builder = S3::default()
|
||||
.root(&root)
|
||||
.bucket(&s3.bucket)
|
||||
.access_key_id(s3.access_key_id.expose_secret())
|
||||
.secret_access_key(s3.secret_access_key.expose_secret());
|
||||
if let Some(ep) = &s3.endpoint {
|
||||
builder = builder.endpoint(ep);
|
||||
}
|
||||
if let Some(region) = &s3.region {
|
||||
builder = builder.region(region);
|
||||
}
|
||||
if s3.enable_virtual_host_style {
|
||||
builder = builder.enable_virtual_host_style();
|
||||
}
|
||||
Ok(ObjectStore::new(builder)
|
||||
.map_err(|e| {
|
||||
error::IllegalConfigSnafu {
|
||||
msg: format!("init s3 backend failed: {e}"),
|
||||
}
|
||||
.build()
|
||||
})?
|
||||
.finish())
|
||||
}
|
||||
Oss(oss) => {
|
||||
use common_base::secrets::ExposeSecret;
|
||||
use object_store::services::Oss;
|
||||
use object_store::util;
|
||||
let root = util::normalize_dir(&oss.root);
|
||||
let builder = Oss::default()
|
||||
.root(&root)
|
||||
.bucket(&oss.bucket)
|
||||
.endpoint(&oss.endpoint)
|
||||
.access_key_id(oss.access_key_id.expose_secret())
|
||||
.access_key_secret(oss.access_key_secret.expose_secret());
|
||||
Ok(ObjectStore::new(builder)
|
||||
.map_err(|e| {
|
||||
error::IllegalConfigSnafu {
|
||||
msg: format!("init oss backend failed: {e}"),
|
||||
}
|
||||
.build()
|
||||
})?
|
||||
.finish())
|
||||
}
|
||||
Azblob(az) => {
|
||||
use common_base::secrets::ExposeSecret;
|
||||
use object_store::services::Azblob;
|
||||
use object_store::util;
|
||||
let root = util::normalize_dir(&az.root);
|
||||
let mut builder = Azblob::default()
|
||||
.root(&root)
|
||||
.container(&az.container)
|
||||
.endpoint(&az.endpoint)
|
||||
.account_name(az.account_name.expose_secret())
|
||||
.account_key(az.account_key.expose_secret());
|
||||
if let Some(token) = &az.sas_token {
|
||||
builder = builder.sas_token(token);
|
||||
}
|
||||
Ok(ObjectStore::new(builder)
|
||||
.map_err(|e| {
|
||||
error::IllegalConfigSnafu {
|
||||
msg: format!("init azblob backend failed: {e}"),
|
||||
}
|
||||
.build()
|
||||
})?
|
||||
.finish())
|
||||
}
|
||||
Gcs(gcs) => {
|
||||
use common_base::secrets::ExposeSecret;
|
||||
use object_store::services::Gcs;
|
||||
use object_store::util;
|
||||
let root = util::normalize_dir(&gcs.root);
|
||||
let builder = Gcs::default()
|
||||
.root(&root)
|
||||
.bucket(&gcs.bucket)
|
||||
.scope(&gcs.scope)
|
||||
.credential_path(gcs.credential_path.expose_secret())
|
||||
.credential(gcs.credential.expose_secret())
|
||||
.endpoint(&gcs.endpoint);
|
||||
Ok(ObjectStore::new(builder)
|
||||
.map_err(|e| {
|
||||
error::IllegalConfigSnafu {
|
||||
msg: format!("init gcs backend failed: {e}"),
|
||||
}
|
||||
.build()
|
||||
})?
|
||||
.finish())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn build_access_layer_simple(
|
||||
region_dir: String,
|
||||
object_store: ObjectStore,
|
||||
) -> Result<(
|
||||
std::sync::Arc<mito2::AccessLayer>,
|
||||
std::sync::Arc<mito2::CacheManager>,
|
||||
)> {
|
||||
// Minimal index aux path setup
|
||||
let mut mito_cfg = MitoConfig::default();
|
||||
// Use a temporary directory as aux path
|
||||
let data_home = std::env::temp_dir().join("greptime_objbench");
|
||||
let _ = std::fs::create_dir_all(&data_home);
|
||||
let _ = mito_cfg.index.sanitize(
|
||||
data_home.to_str().unwrap_or("/tmp"),
|
||||
&mito_cfg.inverted_index,
|
||||
);
|
||||
let access_layer = build_access_layer(®ion_dir, object_store, &mito_cfg)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
error::IllegalConfigSnafu {
|
||||
msg: format!("build_access_layer failed: {e}"),
|
||||
}
|
||||
.build()
|
||||
})?;
|
||||
Ok((
|
||||
access_layer,
|
||||
std::sync::Arc::new(mito2::CacheManager::default()),
|
||||
))
|
||||
}
|
||||
|
||||
fn new_noop_file_purger() -> FilePurgerRef {
|
||||
#[derive(Debug)]
|
||||
struct Noop;
|
||||
impl FilePurger for Noop {
|
||||
fn send_request(&self, _request: PurgeRequest) {}
|
||||
}
|
||||
std::sync::Arc::new(Noop)
|
||||
}
|
||||
|
||||
async fn load_parquet_metadata(
|
||||
object_store: ObjectStore,
|
||||
path: &str,
|
||||
file_size: u64,
|
||||
) -> std::result::Result<
|
||||
parquet::file::metadata::ParquetMetaData,
|
||||
Box<dyn std::error::Error + Send + Sync>,
|
||||
> {
|
||||
use parquet::file::metadata::ParquetMetaDataReader;
|
||||
use parquet::file::FOOTER_SIZE;
|
||||
let actual_size = if file_size == 0 {
|
||||
object_store.stat(path).await?.content_length()
|
||||
} else {
|
||||
file_size
|
||||
};
|
||||
if actual_size < FOOTER_SIZE as u64 {
|
||||
return Err("file too small".into());
|
||||
}
|
||||
let prefetch: u64 = 64 * 1024;
|
||||
let start = actual_size.saturating_sub(prefetch);
|
||||
let buffer = object_store
|
||||
.read_with(path)
|
||||
.range(start..actual_size)
|
||||
.await?
|
||||
.to_vec();
|
||||
let buffer_len = buffer.len();
|
||||
let mut footer = [0; 8];
|
||||
footer.copy_from_slice(&buffer[buffer_len - FOOTER_SIZE..]);
|
||||
let metadata_len = ParquetMetaDataReader::decode_footer(&footer)? as u64;
|
||||
if actual_size - (FOOTER_SIZE as u64) < metadata_len {
|
||||
return Err("invalid footer/metadata length".into());
|
||||
}
|
||||
if (metadata_len as usize) <= buffer_len - FOOTER_SIZE {
|
||||
let metadata_start = buffer_len - metadata_len as usize - FOOTER_SIZE;
|
||||
let meta = ParquetMetaDataReader::decode_metadata(
|
||||
&buffer[metadata_start..buffer_len - FOOTER_SIZE],
|
||||
)?;
|
||||
Ok(meta)
|
||||
} else {
|
||||
let metadata_start = actual_size - metadata_len - FOOTER_SIZE as u64;
|
||||
let data = object_store
|
||||
.read_with(path)
|
||||
.range(metadata_start..(actual_size - FOOTER_SIZE as u64))
|
||||
.await?
|
||||
.to_vec();
|
||||
let meta = ParquetMetaDataReader::decode_metadata(&data)?;
|
||||
Ok(meta)
|
||||
}
|
||||
}
|
||||

#[cfg(test)]
mod tests {
    use super::StorageConfigWrapper;

    #[test]
    fn test_decode() {
        // Note: reads a developer-local config file, so this test only passes
        // on the machine that has it.
        let cfg = std::fs::read_to_string("/home/lei/datanode-bulk.toml").unwrap();
        let storage: StorageConfigWrapper = toml::from_str(&cfg).unwrap();
        println!("{:?}", storage);
    }
}

@@ -133,7 +133,7 @@ impl Flownode for FlowWorkerManager {
             .map_err(to_meta_err(snafu::location!()))?;

         debug!(
-            "Done to flush flow_id={:?} with {} input rows flushed, {} rows sended and {} output rows flushed",
+            "Done to flush flow_id={:?} with {} input rows flushed, {} rows sent and {} output rows flushed",
             flow_id, flushed_input_rows, rows_send, row
         );
         Ok(FlowResponse {

@@ -214,6 +214,7 @@ impl HeartbeatTask {
                     if let Some(message) = message {
                         Self::new_heartbeat_request(&heartbeat_request, Some(message), &latest_report)
                     } else {
+                        warn!("Sender has been dropped, exiting the heartbeat loop");
                         // Receiving None means the Sender was dropped, so break the current loop.
                         break
                     }

@@ -255,7 +256,11 @@ impl HeartbeatTask {
                             error!(e; "Error while handling heartbeat response");
                         }
                     }
-                    Ok(None) => break,
+                    Ok(None) => {
+                        warn!("Heartbeat response stream closed");
+                        capture_self.start_with_retry(retry_interval).await;
+                        break;
+                    }
                    Err(e) => {
                        error!(e; "Error occurred while reading heartbeat response");
                        capture_self.start_with_retry(retry_interval).await;

@@ -23,7 +23,7 @@ use common_meta::heartbeat::handler::{
 };
 use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef, OutgoingMessage};
 use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
-use common_telemetry::{debug, error, info};
+use common_telemetry::{debug, error, info, warn};
 use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient};
 use servers::addrs;
 use servers::heartbeat_options::HeartbeatOptions;

@@ -42,8 +42,8 @@ use crate::metrics::{HEARTBEAT_RECV_COUNT, HEARTBEAT_SENT_COUNT};
 pub struct HeartbeatTask {
     peer_addr: String,
     meta_client: Arc<MetaClient>,
-    report_interval: u64,
-    retry_interval: u64,
+    report_interval: Duration,
+    retry_interval: Duration,
     resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
     start_time_ms: u64,
 }

@@ -58,8 +58,8 @@ impl HeartbeatTask {
         HeartbeatTask {
             peer_addr: addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)),
             meta_client,
-            report_interval: heartbeat_opts.interval.as_millis() as u64,
-            retry_interval: heartbeat_opts.retry_interval.as_millis() as u64,
+            report_interval: heartbeat_opts.interval,
+            retry_interval: heartbeat_opts.retry_interval,
             resp_handler_executor,
             start_time_ms: common_time::util::current_time_millis() as u64,
         }

@@ -103,13 +103,15 @@ impl HeartbeatTask {
                             HEARTBEAT_RECV_COUNT.with_label_values(&["success"]).inc();
                         }
                     }
-                    Ok(None) => break,
+                    Ok(None) => {
+                        warn!("Heartbeat response stream closed");
+                        capture_self.start_with_retry(retry_interval).await;
+                        break;
+                    }
                     Err(e) => {
                         HEARTBEAT_RECV_COUNT.with_label_values(&["error"]).inc();
                         error!(e; "Error occurred while reading heartbeat response");
-                        capture_self
-                            .start_with_retry(Duration::from_millis(retry_interval))
-                            .await;
+                        capture_self.start_with_retry(retry_interval).await;

                         break;
                     }

@@ -177,12 +179,13 @@ impl HeartbeatTask {
                     if let Some(message) = message {
                         Self::new_heartbeat_request(&heartbeat_request, Some(message))
                     } else {
+                        warn!("Sender has been dropped, exiting the heartbeat loop");
                         // Receiving None means the Sender was dropped, so break the current loop.
                         break
                     }
                 }
                 _ = &mut sleep => {
-                    sleep.as_mut().reset(Instant::now() + Duration::from_millis(report_interval));
+                    sleep.as_mut().reset(Instant::now() + report_interval);
                     Self::new_heartbeat_request(&heartbeat_request, None)
                 }
             };

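Taken together, these hunks keep the intervals as `Duration` from configuration down to the loop, instead of round-tripping through milliseconds. A minimal sketch of the resulting select!-driven loop shape (names and the request-sending body are illustrative, not the actual task):

// Minimal sketch of a Duration-based periodic loop with tokio; not the real
// HeartbeatTask, just the reset pattern the diff switches to.
use std::time::Duration;

use tokio::time::{sleep, Instant};

async fn heartbeat_loop(report_interval: Duration) {
    let sleep = sleep(report_interval);
    tokio::pin!(sleep);
    loop {
        tokio::select! {
            _ = &mut sleep => {
                sleep.as_mut().reset(Instant::now() + report_interval);
                // build and send a periodic heartbeat request here
            }
        }
    }
}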
@@ -42,7 +42,16 @@ impl BloomFilterApplier {
     ) -> Result<Vec<Range<usize>>> {
         let rows_per_segment = self.meta.rows_per_segment as usize;
         let start_seg = search_range.start / rows_per_segment;
-        let end_seg = search_range.end.div_ceil(rows_per_segment);
+        let mut end_seg = search_range.end.div_ceil(rows_per_segment);
+
+        if end_seg == self.meta.segment_loc_indices.len() + 1 {
+            // In a previous version, there was a bug where if the last segment was all null,
+            // this segment would not be written into the index. This caused the slice
+            // `self.meta.segment_loc_indices[start_seg..end_seg]` to go out of bounds due to
+            // the missing segment. Since the `search` function does not search for nulls,
+            // we can simply ignore the last segment in this buggy scenario.
+            end_seg -= 1;
+        }

         let locs = &self.meta.segment_loc_indices[start_seg..end_seg];

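To see why the guard is needed, a worked example with hypothetical numbers: with 2 rows per segment and 5 rows total, the last (all-null) row forms a third segment that the buggy writer omitted, so `segment_loc_indices` holds only 2 entries while `end_seg` computes to 3:

// Worked example of the segment arithmetic above (hypothetical values).
fn main() {
    let rows_per_segment = 2usize;
    let segment_loc_indices_len = 2usize; // buggy index: the all-null 3rd segment is missing
    let search_range = 0usize..5usize; // 5 rows => logically 3 segments

    let start_seg = search_range.start / rows_per_segment; // 0
    let mut end_seg = search_range.end.div_ceil(rows_per_segment); // 3

    // Without the guard, slicing indices[0..3] with only 2 entries would panic.
    if end_seg == segment_loc_indices_len + 1 {
        end_seg -= 1;
    }
    assert_eq!((start_seg, end_seg), (0, 2));
}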
@@ -64,6 +64,9 @@ pub struct BloomFilterCreator {
     /// Storage for finalized Bloom filters.
     finalized_bloom_filters: FinalizedBloomFilterStorage,

+    /// Row count finalized so far.
+    finalized_row_count: usize,
+
     /// Global memory usage of the bloom filter creator.
     global_memory_usage: Arc<AtomicUsize>,
 }

@@ -96,6 +99,7 @@ impl BloomFilterCreator {
                 global_memory_usage,
                 global_memory_usage_threshold,
             ),
+            finalized_row_count: 0,
         }
     }

@@ -136,6 +140,7 @@ impl BloomFilterCreator {

             if self.accumulated_row_count % self.rows_per_segment == 0 {
                 self.finalize_segment().await?;
+                self.finalized_row_count = self.accumulated_row_count;
             }
         }

@@ -161,6 +166,7 @@ impl BloomFilterCreator {

         if self.accumulated_row_count % self.rows_per_segment == 0 {
             self.finalize_segment().await?;
+            self.finalized_row_count = self.accumulated_row_count;
         }

         Ok(())
@@ -168,7 +174,7 @@ impl BloomFilterCreator {

     /// Finalizes any remaining segments and writes the bloom filters and metadata to the provided writer.
     pub async fn finish(&mut self, mut writer: impl AsyncWrite + Unpin) -> Result<()> {
-        if !self.cur_seg_distinct_elems.is_empty() {
+        if self.accumulated_row_count > self.finalized_row_count {
             self.finalize_segment().await?;
         }

@@ -406,4 +412,35 @@ mod tests {
             assert!(bf.contains(&b"f"));
         }
     }
+
+    #[tokio::test]
+    async fn test_final_seg_all_null() {
+        let mut writer = Cursor::new(Vec::new());
+        let mut creator = BloomFilterCreator::new(
+            2,
+            Arc::new(MockExternalTempFileProvider::new()),
+            Arc::new(AtomicUsize::new(0)),
+            None,
+        );
+
+        creator
+            .push_n_row_elems(4, vec![b"a".to_vec(), b"b".to_vec()])
+            .await
+            .unwrap();
+        creator.push_row_elems(Vec::new()).await.unwrap();
+
+        creator.finish(&mut writer).await.unwrap();
+
+        let bytes = writer.into_inner();
+        let total_size = bytes.len();
+        let meta_size_offset = total_size - 4;
+        let meta_size = u32::from_le_bytes((&bytes[meta_size_offset..]).try_into().unwrap());
+
+        let meta_bytes = &bytes[total_size - meta_size as usize - 4..total_size - 4];
+        let meta = BloomFilterMeta::decode(meta_bytes).unwrap();
+
+        assert_eq!(meta.rows_per_segment, 2);
+        assert_eq!(meta.segment_count, 3);
+        assert_eq!(meta.row_count, 5);
+    }
 }

@@ -27,10 +27,9 @@ use snafu::OptionExt;
 use tokio::sync::mpsc;
 use tokio::sync::mpsc::Sender;
 use tokio_stream::wrappers::ReceiverStream;
-use tonic::{Request, Response, Streaming};
+use tonic::{Request, Response, Status, Streaming};

-use crate::error;
-use crate::error::Result;
+use crate::error::{self, Result};
 use crate::handler::{HeartbeatHandlerGroup, Pusher, PusherId};
 use crate::metasrv::{Context, Metasrv};
 use crate::metrics::METRIC_META_HEARTBEAT_RECV;

@@ -109,6 +108,12 @@ impl heartbeat_server::Heartbeat for Metasrv {

                     if is_not_leader {
                         warn!("Quit because it is no longer the leader");
+                        let _ = tx
+                            .send(Err(Status::aborted(format!(
+                                "The requested metasrv node is not leader, node addr: {}",
+                                ctx.server_addr
+                            ))))
+                            .await;
                         break;
                     }
                 }

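The added send pushes a terminal `Status` into the response channel before breaking, so the client's heartbeat stream ends with an explicit error instead of silently closing. A reduced sketch of that pattern (the channel and item types are illustrative; the real handler streams heartbeat responses):

// Illustrative only: ending a tonic server stream with an error before exit.
use tokio::sync::mpsc::Sender;
use tonic::Status;

async fn abort_stream(tx: Sender<Result<String, Status>>, server_addr: &str) {
    let _ = tx
        .send(Err(Status::aborted(format!(
            "The requested metasrv node is not leader, node addr: {}",
            server_addr
        ))))
        .await;
    // Returning afterwards drops the sender, which ends the client-side
    // ReceiverStream once buffered items are drained.
}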
@@ -13,6 +13,7 @@
 // limitations under the License.

 use std::sync::Arc;
+use std::time::Duration;

 use object_store::services::Fs;
 use object_store::util::{join_dir, with_instrument_layers};

@@ -42,6 +43,29 @@ pub type AccessLayerRef = Arc<AccessLayer>;
 /// SST write results.
 pub type SstInfoArray = SmallVec<[SstInfo; 2]>;

+#[derive(Debug, Default)]
+pub struct Metrics {
+    pub read: Duration,
+    pub write: Duration,
+    pub convert: Duration,
+    pub index_update: Duration,
+    pub index_finish: Duration,
+    pub close: Duration,
+    pub num_series: usize,
+
+    // SST OpenDAL metrics.
+    pub opendal_create_cost: Duration,
+    pub opendal_num_writes: usize,
+    pub opendal_write_cost: Duration,
+    pub opendal_complete_cost: Duration,
+}
+
+impl Metrics {
+    pub fn sum(&self) -> Duration {
+        self.read + self.write + self.convert + self.index_update + self.index_finish + self.close
+    }
+}
+
 /// A layer to access SST files under the same directory.
 pub struct AccessLayer {
     region_dir: String,

@@ -121,10 +145,11 @@ impl AccessLayer {
     /// Writes a SST with specific `file_id` and `metadata` to the layer.
     ///
     /// Returns the info of the SST. If no data is written, returns an empty array.
-    pub(crate) async fn write_sst(
+    pub async fn write_sst(
         &self,
         request: SstWriteRequest,
         write_opts: &WriteOptions,
+        metrics: &mut Metrics,
     ) -> Result<SstInfoArray> {
         let region_id = request.metadata.region_id;
         let cache_manager = request.cache_manager.clone();

@@ -167,9 +192,16 @@ impl AccessLayer {
                 path_provider,
             )
             .await;
-            writer
-                .write_all(request.source, request.max_sequence, write_opts)
-                .await?
+            let sst_info = writer
+                .write_all(request.source, request.max_sequence, write_opts, metrics)
+                .await?;
+            let opendal_metrics = writer.opendal_metrics_val();
+            metrics.opendal_create_cost += opendal_metrics.create_cost;
+            metrics.opendal_num_writes += opendal_metrics.num_writes;
+            metrics.opendal_write_cost += opendal_metrics.write_cost;
+            metrics.opendal_complete_cost += opendal_metrics.complete_cost;
+
+            sst_info
         };

         // Put parquet metadata to cache manager.

@@ -189,28 +221,53 @@ impl AccessLayer {
     }
 }

+/// Helper to build an [AccessLayerRef] with internal index managers.
+///
+/// This is a convenience constructor intended for tooling that needs to
+/// interact with SSTs without wiring all indexing internals manually.
+pub async fn build_access_layer(
+    region_dir: &str,
+    object_store: ObjectStore,
+    config: &crate::config::MitoConfig,
+) -> Result<AccessLayerRef> {
+    let puffin_manager_factory = PuffinManagerFactory::new(
+        &config.index.aux_path,
+        config.index.staging_size.as_bytes(),
+        Some(config.index.write_buffer_size.as_bytes() as _),
+        config.index.staging_ttl,
+    )
+    .await?;
+    let intermediate_manager = IntermediateManager::init_fs(&config.index.aux_path).await?;
+    Ok(Arc::new(AccessLayer::new(
+        region_dir,
+        object_store,
+        puffin_manager_factory,
+        intermediate_manager,
+    )))
+}
+
 /// `OperationType` represents the origin of the `SstWriteRequest`.
 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
-pub(crate) enum OperationType {
+pub enum OperationType {
     Flush,
     Compact,
 }

 /// Contents to build a SST.
-pub(crate) struct SstWriteRequest {
-    pub(crate) op_type: OperationType,
-    pub(crate) metadata: RegionMetadataRef,
-    pub(crate) source: Source,
-    pub(crate) cache_manager: CacheManagerRef,
+pub struct SstWriteRequest {
+    pub op_type: OperationType,
+    pub metadata: RegionMetadataRef,
+    pub source: Source,
+    pub cache_manager: CacheManagerRef,
     #[allow(dead_code)]
-    pub(crate) storage: Option<String>,
-    pub(crate) max_sequence: Option<SequenceNumber>,
+    pub storage: Option<String>,
+    pub max_sequence: Option<SequenceNumber>,

     /// Configs for index
-    pub(crate) index_options: IndexOptions,
-    pub(crate) inverted_index_config: InvertedIndexConfig,
-    pub(crate) fulltext_index_config: FulltextIndexConfig,
-    pub(crate) bloom_filter_index_config: BloomFilterConfig,
+    pub index_options: IndexOptions,
+    pub inverted_index_config: InvertedIndexConfig,
+    pub fulltext_index_config: FulltextIndexConfig,
+    pub bloom_filter_index_config: BloomFilterConfig,
 }

 pub(crate) async fn new_fs_cache_store(root: &str) -> Result<ObjectStore> {

8
src/mito2/src/cache/write_cache.rs
vendored
@@ -40,6 +40,7 @@ use crate::sst::index::IndexerBuilderImpl;
 use crate::sst::parquet::writer::ParquetWriter;
 use crate::sst::parquet::WriteOptions;
 use crate::sst::{DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY};
+use crate::Metrics;

 /// A cache for uploading files to remote object stores.
 ///

@@ -140,7 +141,12 @@ impl WriteCache {
             .await;

         let sst_info = writer
-            .write_all(write_request.source, write_request.max_sequence, write_opts)
+            .write_all(
+                write_request.source,
+                write_request.max_sequence,
+                write_opts,
+                &mut Metrics::default(),
+            )
             .await?;

         timer.stop_and_record();

@@ -27,7 +27,7 @@ use snafu::{OptionExt, ResultExt};
 use store_api::metadata::RegionMetadataRef;
 use store_api::storage::RegionId;

-use crate::access_layer::{AccessLayer, AccessLayerRef, OperationType, SstWriteRequest};
+use crate::access_layer::{AccessLayer, AccessLayerRef, Metrics, OperationType, SstWriteRequest};
 use crate::cache::{CacheManager, CacheManagerRef};
 use crate::compaction::picker::{new_picker, PickerOutput};
 use crate::compaction::{find_ttl, CompactionSstReaderBuilder};

@@ -340,6 +340,7 @@ impl Compactor for DefaultCompactor {
                     bloom_filter_index_config,
                 },
                 &write_opts,
+                &mut Metrics::default(),
             )
             .await?
             .into_iter()

@@ -25,7 +25,7 @@ use store_api::storage::RegionId;
 use strum::IntoStaticStr;
 use tokio::sync::{mpsc, watch};

-use crate::access_layer::{AccessLayerRef, OperationType, SstWriteRequest};
+use crate::access_layer::{AccessLayerRef, Metrics, OperationType, SstWriteRequest};
 use crate::cache::CacheManagerRef;
 use crate::config::MitoConfig;
 use crate::error::{

@@ -366,7 +366,7 @@ impl RegionFlushTask {

         let ssts_written = self
             .access_layer
-            .write_sst(write_request, &write_opts)
+            .write_sst(write_request, &write_opts, &mut Metrics::default())
             .await?;
         if ssts_written.is_empty() {
             // No data written.

@@ -44,6 +44,12 @@ mod time_provider;
 pub mod wal;
 mod worker;

+// Public re-exports for tooling convenience.
+pub use access_layer::{
+    build_access_layer, AccessLayer, AccessLayerRef, Metrics, OperationType, SstWriteRequest,
+};
+pub use cache::{CacheManager, CacheManagerRef};
+
 #[cfg_attr(doc, aquamarine::aquamarine)]
 /// # Mito developer document
 ///

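With these re-exports, external tooling can thread a `Metrics` through `write_sst` and report per-phase costs. A hedged sketch (the write request/options setup is elided; the paths assume the re-exports above):

// Hypothetical consumer of the new metrics; only the Metrics plumbing is shown.
use std::time::Duration;

use mito2::Metrics;

fn report_write_cost(metrics: &Metrics) {
    let total: Duration = metrics.sum(); // writer-side phases only
    println!(
        "read={:?} write={:?} index={:?} close={:?} total={:?} opendal_writes={}",
        metrics.read,
        metrics.write,
        metrics.index_update + metrics.index_finish,
        metrics.close,
        total,
        metrics.opendal_num_writes
    );
}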
@@ -109,6 +109,7 @@ mod tests {
         new_batch_with_binary, new_source, sst_file_handle, sst_region_metadata,
     };
     use crate::test_util::{check_reader_result, TestEnv};
+    use crate::Metrics;

     const FILE_DIR: &str = "/";

@@ -165,7 +166,7 @@ mod tests {
         .await;

         let info = writer
-            .write_all(source, None, &write_opts)
+            .write_all(source, None, &write_opts, &mut Metrics::default())
             .await
             .unwrap()
             .remove(0);

@@ -222,7 +223,7 @@ mod tests {
         .await;

         writer
-            .write_all(source, None, &write_opts)
+            .write_all(source, None, &write_opts, &mut Metrics::default())
             .await
             .unwrap()
             .remove(0);

@@ -293,7 +294,7 @@ mod tests {
         .await;

         let sst_info = writer
-            .write_all(source, None, &write_opts)
+            .write_all(source, None, &write_opts, &mut Metrics::default())
             .await
             .unwrap()
             .remove(0);

@@ -334,7 +335,7 @@ mod tests {
         )
         .await;
         writer
-            .write_all(source, None, &write_opts)
+            .write_all(source, None, &write_opts, &mut Metrics::default())
             .await
             .unwrap()
             .remove(0);

@@ -389,7 +390,7 @@ mod tests {
         )
         .await;
         writer
-            .write_all(source, None, &write_opts)
+            .write_all(source, None, &write_opts, &mut Metrics::default())
             .await
             .unwrap()
             .remove(0);

@@ -427,7 +428,7 @@ mod tests {
         .await;

         writer
-            .write_all(source, None, &write_opts)
+            .write_all(source, None, &write_opts, &mut Metrics::default())
             .await
             .unwrap()
             .remove(0);

@@ -1117,7 +1117,6 @@ impl ParquetReader {
         self.context.read_format().metadata()
     }

-    #[cfg(test)]
     pub fn parquet_metadata(&self) -> Arc<ParquetMetaData> {
         self.context.reader_builder().parquet_meta.clone()
     }

@@ -17,14 +17,19 @@
 use std::future::Future;
 use std::pin::Pin;
 use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
 use std::task::{Context, Poll};
+use std::time::{Duration, Instant};

+use bytes::Bytes;
 use common_time::Timestamp;
 use datatypes::arrow::datatypes::SchemaRef;
-use object_store::{FuturesAsyncWriter, ObjectStore};
+use futures::future::BoxFuture;
+use object_store::{FuturesAsyncWriter, ObjectStore, Writer};
+use parquet::arrow::async_writer::AsyncFileWriter;
 use parquet::arrow::AsyncArrowWriter;
 use parquet::basic::{Compression, Encoding, ZstdLevel};
+use parquet::errors::ParquetError;
 use parquet::file::metadata::KeyValue;
 use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
 use parquet::schema::types::ColumnPath;

@@ -45,12 +50,13 @@ use crate::sst::parquet::format::WriteFormat;
 use crate::sst::parquet::helper::parse_parquet_metadata;
 use crate::sst::parquet::{SstInfo, WriteOptions, PARQUET_METADATA_KEY};
 use crate::sst::{DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY};
+use crate::Metrics;

 /// Parquet SST writer.
 pub struct ParquetWriter<F: WriterFactory, I: IndexerBuilder, P: FilePathProvider> {
     /// Path provider that creates SST and index file paths according to file id.
     path_provider: P,
-    writer: Option<AsyncArrowWriter<SizeAwareWriter<F::Writer>>>,
+    writer: Option<AsyncArrowWriter<OpenDalWriter>>,
     /// Current active file id.
     current_file: FileId,
     writer_factory: F,

@@ -61,11 +67,18 @@ pub struct ParquetWriter<F: WriterFactory, I: IndexerBuilder, P: FilePathProvider> {
     /// Current active indexer.
     current_indexer: Option<Indexer>,
     bytes_written: Arc<AtomicUsize>,
+    opendal_metrics: Arc<Mutex<OpenDalMetrics>>,
 }

 pub trait WriterFactory {
     type Writer: AsyncWrite + Send + Unpin;
     fn create(&mut self, file_path: &str) -> impl Future<Output = Result<Self::Writer>>;
+
+    fn create_opendal(
+        &mut self,
+        file_path: &str,
+        size: Arc<AtomicUsize>,
+    ) -> impl Future<Output = Result<OpenDalWriter>>;
 }

 pub struct ObjectStoreWriterFactory {

@@ -84,6 +97,22 @@ impl WriterFactory for ObjectStoreWriterFactory {
             .map(|v| v.into_futures_async_write().compat_write())
             .context(OpenDalSnafu)
     }
+
+    async fn create_opendal(
+        &mut self,
+        file_path: &str,
+        size: Arc<AtomicUsize>,
+    ) -> Result<OpenDalWriter> {
+        let writer = self
+            .object_store
+            .writer_with(file_path)
+            .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
+            .concurrent(DEFAULT_WRITE_CONCURRENCY)
+            .await
+            .context(OpenDalSnafu)?;
+
+        Ok(OpenDalWriter::new(writer, size))
+    }
 }

impl<I, P> ParquetWriter<ObjectStoreWriterFactory, I, P>

@@ -105,6 +134,10 @@ where
         )
         .await
     }
+
+    pub fn opendal_metrics_val(&self) -> OpenDalMetrics {
+        self.opendal_metrics.lock().unwrap().clone()
+    }
 }

impl<F, I, P> ParquetWriter<F, I, P>

@@ -132,6 +165,7 @@ where
             indexer_builder,
             current_indexer: Some(indexer),
             bytes_written: Arc::new(AtomicUsize::new(0)),
+            opendal_metrics: Arc::new(Mutex::new(OpenDalMetrics::default())),
         }
     }

@@ -156,20 +190,35 @@ where
         mut source: Source,
         override_sequence: Option<SequenceNumber>, // override the `sequence` field from `Source`
         opts: &WriteOptions,
+        metrics: &mut Metrics,
     ) -> Result<SstInfoArray> {
         let write_format =
             WriteFormat::new(self.metadata.clone()).with_override_sequence(override_sequence);
         let mut stats = SourceStats::default();
+        let mut last_key = None;

         while let Some(res) = self
-            .write_next_batch(&mut source, &write_format, opts)
+            .write_next_batch(&mut source, &write_format, opts, metrics)
             .await
             .transpose()
         {
             match res {
                 Ok(mut batch) => {
+                    // Count distinct primary keys (series) as batches stream through.
+                    if let Some(last) = &last_key {
+                        if last != batch.primary_key() {
+                            metrics.num_series += 1;
+                            last_key = Some(batch.primary_key().to_vec());
+                        }
+                    } else {
+                        metrics.num_series += 1;
+                        last_key = Some(batch.primary_key().to_vec());
+                    }
+
                     stats.update(&batch);
+                    let index_start = Instant::now();
                     self.get_or_create_indexer().await.update(&mut batch).await;
+                    metrics.index_update += index_start.elapsed();
                 }
                 Err(e) => {
                     self.get_or_create_indexer().await.abort().await;

@@ -178,7 +225,9 @@ where
                 }
             }

+        let index_finish_start = Instant::now();
         let index_output = self.get_or_create_indexer().await.finish().await;
+        metrics.index_finish += index_finish_start.elapsed();

         if stats.num_rows == 0 {
             return Ok(smallvec![]);

@@ -189,9 +238,10 @@ where
             return Ok(smallvec![]);
         };

+        let close_start = Instant::now();
         arrow_writer.flush().await.context(WriteParquetSnafu)?;
-
         let file_meta = arrow_writer.close().await.context(WriteParquetSnafu)?;
+        metrics.close += close_start.elapsed();
         let file_size = self.bytes_written.load(Ordering::Relaxed) as u64;

         // Safety: num rows > 0 so we must have min/max.

@@ -238,17 +288,25 @@ where
         source: &mut Source,
         write_format: &WriteFormat,
         opts: &WriteOptions,
+        metrics: &mut Metrics,
     ) -> Result<Option<Batch>> {
+        let read_start = Instant::now();
         let Some(batch) = source.next_batch().await? else {
             return Ok(None);
         };
+        metrics.read += read_start.elapsed();

+        let convert_start = Instant::now();
         let arrow_batch = write_format.convert_batch(&batch)?;
+        metrics.convert += convert_start.elapsed();

+        let write_start = Instant::now();
         self.maybe_init_writer(write_format.arrow_schema(), opts)
             .await?
             .write(&arrow_batch)
             .await
             .context(WriteParquetSnafu)?;
+        metrics.write += write_start.elapsed();
         Ok(Some(batch))
     }

@@ -256,7 +314,7 @@ where
         &mut self,
         schema: &SchemaRef,
         opts: &WriteOptions,
-    ) -> Result<&mut AsyncArrowWriter<SizeAwareWriter<F::Writer>>> {
+    ) -> Result<&mut AsyncArrowWriter<OpenDalWriter>> {
         if let Some(ref mut w) = self.writer {
             Ok(w)
         } else {

@@ -274,10 +332,17 @@ where
             let writer_props = props_builder.build();

             let sst_file_path = self.path_provider.build_sst_file_path(self.current_file);
-            let writer = SizeAwareWriter::new(
-                self.writer_factory.create(&sst_file_path).await?,
-                self.bytes_written.clone(),
-            );
+            // let writer = SizeAwareWriter::new(
+            //     self.writer_factory.create(&sst_file_path).await?,
+            //     self.bytes_written.clone(),
+            // );
+            let create_start = Instant::now();
+            let mut writer = self
+                .writer_factory
+                .create_opendal(&sst_file_path, self.bytes_written.clone())
+                .await?;
+            self.opendal_metrics.lock().unwrap().create_cost += create_start.elapsed();
+            writer = writer.with_metrics(self.opendal_metrics.clone());
             let arrow_writer =
                 AsyncArrowWriter::try_new(writer, schema.clone(), Some(writer_props))
                     .context(WriteParquetSnafu)?;

@@ -317,6 +382,78 @@ impl SourceStats {
     }
 }

+#[derive(Default, Debug, Clone)]
+pub(crate) struct OpenDalMetrics {
+    pub(crate) create_cost: Duration,
+    pub(crate) num_writes: usize,
+    pub(crate) write_cost: Duration,
+    pub(crate) complete_cost: Duration,
+}
+
+/// Workaround because [AsyncArrowWriter] does not provide a method to
+/// get the total bytes written after close.
+pub struct OpenDalWriter {
+    inner: Writer,
+    size: Arc<AtomicUsize>,
+    metrics: Option<Arc<Mutex<OpenDalMetrics>>>,
+}
+
+impl OpenDalWriter {
+    fn new(inner: Writer, size: Arc<AtomicUsize>) -> Self {
+        Self {
+            inner,
+            size,
+            metrics: None,
+        }
+    }
+
+    fn with_metrics(mut self, metrics: Arc<Mutex<OpenDalMetrics>>) -> Self {
+        self.metrics = Some(metrics);
+        self
+    }
+}
+
+impl AsyncFileWriter for OpenDalWriter {
+    fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<(), ParquetError>> {
+        let write_start = Instant::now();
+        let size = self.size.clone();
+        let metrics = self.metrics.clone();
+        Box::pin(async move {
+            let bytes_written = bs.len();
+            self.inner
+                .write(bs)
+                .await
+                .map_err(|err| ParquetError::External(Box::new(err)))?;
+
+            size.fetch_add(bytes_written, Ordering::Relaxed);
+            if let Some(metrics) = metrics {
+                let mut m = metrics.lock().unwrap();
+                m.num_writes += 1;
+                m.write_cost += write_start.elapsed();
+            }
+            Ok(())
+        })
+    }
+
+    fn complete(&mut self) -> BoxFuture<'_, Result<(), ParquetError>> {
+        let complete_start = Instant::now();
+        let metrics = self.metrics.clone();
+        Box::pin(async move {
+            self.inner
+                .close()
+                .await
+                .map(|_| ())
+                .map_err(|err| ParquetError::External(Box::new(err)))?;
+
+            if let Some(metrics) = metrics {
+                let mut m = metrics.lock().unwrap();
+                m.complete_cost += complete_start.elapsed();
+            }
+            Ok(())
+        })
+    }
+}
+
 /// Workaround because [AsyncArrowWriter] does not provide a method to
 /// get the total bytes written after close.
 struct SizeAwareWriter<W> {