Compare commits


1 Commit

Author SHA1 Message Date
luofucong
517e9362ad build: release v0.3.0 2023-06-05 10:38:51 +08:00
54 changed files with 366 additions and 1298 deletions

View File

@@ -7,29 +7,20 @@ on:
- cron: '0 0 * * 1'
# Manually triggered runs only build binaries.
workflow_dispatch:
inputs:
dry_run:
description: 'Skip docker push and release steps'
type: boolean
default: true
skip_test:
description: 'Do not run tests during build'
type: boolean
default: false
name: Release
env:
RUST_TOOLCHAIN: nightly-2023-05-03
SCHEDULED_BUILD_VERSION_PREFIX: v0.4.0
SCHEDULED_BUILD_VERSION_PREFIX: v0.3.0
SCHEDULED_PERIOD: nightly
CARGO_PROFILE: nightly
# Controls whether to run tests, including unit tests, integration tests, and sqlness.
DISABLE_RUN_TESTS: ${{ inputs.skip_test || false }}
DISABLE_RUN_TESTS: false
jobs:
build-macos:
@@ -290,7 +281,7 @@ jobs:
name: Build docker image
needs: [build-linux, build-macos]
runs-on: ubuntu-latest
if: github.repository == 'GreptimeTeam/greptimedb' && !(inputs.dry_run || false)
if: github.repository == 'GreptimeTeam/greptimedb' && github.event_name != 'workflow_dispatch'
steps:
- name: Checkout sources
uses: actions/checkout@v3
@@ -303,7 +294,7 @@ jobs:
- name: Configure scheduled build image tag # the tag would be ${SCHEDULED_BUILD_VERSION_PREFIX}-YYYYMMDD-${SCHEDULED_PERIOD}
shell: bash
if: github.event_name != 'push'
if: github.event_name == 'schedule'
run: |
buildTime=`date "+%Y%m%d"`
SCHEDULED_BUILD_VERSION=${{ env.SCHEDULED_BUILD_VERSION_PREFIX }}-$buildTime-${{ env.SCHEDULED_PERIOD }}
@@ -311,7 +302,7 @@ jobs:
- name: Configure tag # If the release tag is v0.1.0, then the image version tag will be 0.1.0.
shell: bash
if: github.event_name == 'push'
if: github.event_name != 'schedule'
run: |
VERSION=${{ github.ref_name }}
echo "IMAGE_TAG=${VERSION:1}" >> $GITHUB_ENV
@@ -376,7 +367,7 @@ jobs:
# Release artifacts only when all the artifacts are built successfully.
needs: [build-linux, build-macos, docker]
runs-on: ubuntu-latest
if: github.repository == 'GreptimeTeam/greptimedb' && !(inputs.dry_run || false)
if: github.repository == 'GreptimeTeam/greptimedb' && github.event_name != 'workflow_dispatch'
steps:
- name: Checkout sources
uses: actions/checkout@v3
@@ -386,7 +377,7 @@ jobs:
- name: Configure scheduled build version # the version would be ${SCHEDULED_BUILD_VERSION_PREFIX}-${SCHEDULED_PERIOD}-YYYYMMDD, like v0.2.0-nightly-20230313.
shell: bash
if: github.event_name != 'push'
if: github.event_name == 'schedule'
run: |
buildTime=`date "+%Y%m%d"`
SCHEDULED_BUILD_VERSION=${{ env.SCHEDULED_BUILD_VERSION_PREFIX }}-${{ env.SCHEDULED_PERIOD }}-$buildTime
@@ -404,13 +395,13 @@ jobs:
fi
- name: Create scheduled build git tag
if: github.event_name != 'push'
if: github.event_name == 'schedule'
run: |
git tag ${{ env.SCHEDULED_BUILD_VERSION }}
- name: Publish scheduled release # configure the different release title and tags.
uses: ncipollo/release-action@v1
if: github.event_name != 'push'
if: github.event_name == 'schedule'
with:
name: "Release ${{ env.SCHEDULED_BUILD_VERSION }}"
prerelease: ${{ env.prerelease }}
@@ -422,7 +413,7 @@ jobs:
- name: Publish release
uses: ncipollo/release-action@v1
if: github.event_name == 'push'
if: github.event_name != 'schedule'
with:
name: "${{ github.ref_name }}"
prerelease: ${{ env.prerelease }}
@@ -435,7 +426,7 @@ jobs:
name: Push docker image to alibaba cloud container registry
needs: [docker]
runs-on: ubuntu-latest
if: github.repository == 'GreptimeTeam/greptimedb' && !(inputs.dry_run || false)
if: github.repository == 'GreptimeTeam/greptimedb' && github.event_name != 'workflow_dispatch'
continue-on-error: true
steps:
- name: Checkout sources
@@ -456,7 +447,7 @@ jobs:
- name: Configure scheduled build image tag # the tag would be ${SCHEDULED_BUILD_VERSION_PREFIX}-YYYYMMDD-${SCHEDULED_PERIOD}
shell: bash
if: github.event_name != 'push'
if: github.event_name == 'schedule'
run: |
buildTime=`date "+%Y%m%d"`
SCHEDULED_BUILD_VERSION=${{ env.SCHEDULED_BUILD_VERSION_PREFIX }}-$buildTime-${{ env.SCHEDULED_PERIOD }}
@@ -464,7 +455,7 @@ jobs:
- name: Configure tag # If the release tag is v0.1.0, then the image version tag will be 0.1.0.
shell: bash
if: github.event_name == 'push'
if: github.event_name != 'schedule'
run: |
VERSION=${{ github.ref_name }}
echo "IMAGE_TAG=${VERSION:1}" >> $GITHUB_ENV

Cargo.lock (generated)
View File

@@ -64,9 +64,9 @@ dependencies = [
[[package]]
name = "aho-corasick"
version = "1.0.2"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "43f6cb1bf222025340178f382c426f13757b2960e89779dfcb319c32542a5a41"
checksum = "67fc08ce920c31afb70f013dcce1bfc3a3195de6a228474e45e1f145b36f8d04"
dependencies = [
"memchr",
]
@@ -199,7 +199,7 @@ checksum = "8f1f8f5a6f3d50d89e3797d7593a50f96bb2aaa20ca0cc7be1fb673232c91d72"
[[package]]
name = "api"
version = "0.4.0"
version = "0.2.0"
dependencies = [
"arrow-flight",
"common-base",
@@ -831,9 +831,9 @@ checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b"
[[package]]
name = "bcder"
version = "0.7.2"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab26f019795af36086f2ca879aeeaae7566bdfd2fe0821a0328d3fdd9d1da2d9"
checksum = "69dfb7dc0d4aee3f8c723c43553b55662badf692b541ff8e4426df75dae8da9a"
dependencies = [
"bytes",
"smallvec",
@@ -841,10 +841,10 @@ dependencies = [
[[package]]
name = "benchmarks"
version = "0.4.0"
version = "0.2.0"
dependencies = [
"arrow",
"clap 4.3.2",
"clap 4.3.0",
"client",
"indicatif",
"itertools",
@@ -1224,7 +1224,7 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5"
[[package]]
name = "catalog"
version = "0.4.0"
version = "0.2.0"
dependencies = [
"api",
"arc-swap",
@@ -1445,20 +1445,20 @@ dependencies = [
[[package]]
name = "clap"
version = "4.3.2"
version = "4.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "401a4694d2bf92537b6867d94de48c4842089645fdcdf6c71865b175d836e9c2"
checksum = "93aae7a4192245f70fe75dd9157fc7b4a5bf53e88d30bd4396f7d8f9284d5acc"
dependencies = [
"clap_builder",
"clap_derive 4.3.2",
"clap_derive 4.3.0",
"once_cell",
]
[[package]]
name = "clap_builder"
version = "4.3.1"
version = "4.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "72394f3339a76daf211e57d4bcb374410f3965dcc606dd0e03738c7888766980"
checksum = "4f423e341edefb78c9caba2d9c7f7687d0e72e89df3ce3394554754393ac3990"
dependencies = [
"anstream",
"anstyle",
@@ -1482,9 +1482,9 @@ dependencies = [
[[package]]
name = "clap_derive"
version = "4.3.2"
version = "4.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b8cd2b2a819ad6eec39e8f1d6b53001af1e5469f8c177579cdaeb313115b825f"
checksum = "191d9573962933b4027f932c600cd252ce27a8ad5979418fe78e43c07996f27b"
dependencies = [
"heck",
"proc-macro2",
@@ -1509,7 +1509,7 @@ checksum = "2da6da31387c7e4ef160ffab6d5e7f00c42626fe39aea70a7b0f1773f7dd6c1b"
[[package]]
name = "client"
version = "0.4.0"
version = "0.2.0"
dependencies = [
"api",
"arrow-flight",
@@ -1534,7 +1534,7 @@ dependencies = [
"prost",
"rand",
"snafu",
"substrait 0.4.0",
"substrait 0.2.0",
"substrait 0.7.5",
"tokio",
"tokio-stream",
@@ -1571,7 +1571,7 @@ dependencies = [
[[package]]
name = "cmd"
version = "0.4.0"
version = "0.2.0"
dependencies = [
"anymap",
"build-data",
@@ -1601,8 +1601,9 @@ dependencies = [
"servers",
"session",
"snafu",
"substrait 0.4.0",
"substrait 0.2.0",
"temp-env",
"tikv-jemalloc-ctl",
"tikv-jemallocator",
"tokio",
"toml",
@@ -1633,7 +1634,7 @@ checksum = "55b672471b4e9f9e95499ea597ff64941a309b2cdbffcc46f2cc5e2d971fd335"
[[package]]
name = "common-base"
version = "0.4.0"
version = "0.2.0"
dependencies = [
"anymap",
"bitvec",
@@ -1647,7 +1648,7 @@ dependencies = [
[[package]]
name = "common-catalog"
version = "0.4.0"
version = "0.2.0"
dependencies = [
"async-trait",
"chrono",
@@ -1664,7 +1665,7 @@ dependencies = [
[[package]]
name = "common-datasource"
version = "0.4.0"
version = "0.2.0"
dependencies = [
"arrow",
"arrow-schema",
@@ -1689,7 +1690,7 @@ dependencies = [
[[package]]
name = "common-error"
version = "0.4.0"
version = "0.2.0"
dependencies = [
"snafu",
"strum",
@@ -1697,7 +1698,7 @@ dependencies = [
[[package]]
name = "common-function"
version = "0.4.0"
version = "0.2.0"
dependencies = [
"arc-swap",
"chrono-tz 0.6.3",
@@ -1720,7 +1721,7 @@ dependencies = [
[[package]]
name = "common-function-macro"
version = "0.4.0"
version = "0.2.0"
dependencies = [
"arc-swap",
"backtrace",
@@ -1736,7 +1737,7 @@ dependencies = [
[[package]]
name = "common-grpc"
version = "0.4.0"
version = "0.2.0"
dependencies = [
"api",
"arrow-flight",
@@ -1766,7 +1767,7 @@ dependencies = [
[[package]]
name = "common-grpc-expr"
version = "0.4.0"
version = "0.2.0"
dependencies = [
"api",
"async-trait",
@@ -1785,7 +1786,7 @@ dependencies = [
[[package]]
name = "common-mem-prof"
version = "0.4.0"
version = "0.2.0"
dependencies = [
"common-error",
"snafu",
@@ -1798,7 +1799,7 @@ dependencies = [
[[package]]
name = "common-meta"
version = "0.4.0"
version = "0.2.0"
dependencies = [
"api",
"chrono",
@@ -1816,20 +1817,9 @@ dependencies = [
"tokio",
]
[[package]]
name = "common-pprof"
version = "0.4.0"
dependencies = [
"common-error",
"pprof",
"prost",
"snafu",
"tokio",
]
[[package]]
name = "common-procedure"
version = "0.4.0"
version = "0.2.0"
dependencies = [
"async-stream",
"async-trait",
@@ -1851,7 +1841,7 @@ dependencies = [
[[package]]
name = "common-procedure-test"
version = "0.4.0"
version = "0.2.0"
dependencies = [
"async-trait",
"common-procedure",
@@ -1859,7 +1849,7 @@ dependencies = [
[[package]]
name = "common-query"
version = "0.4.0"
version = "0.2.0"
dependencies = [
"api",
"async-trait",
@@ -1879,7 +1869,7 @@ dependencies = [
[[package]]
name = "common-recordbatch"
version = "0.4.0"
version = "0.2.0"
dependencies = [
"common-error",
"datafusion",
@@ -1895,7 +1885,7 @@ dependencies = [
[[package]]
name = "common-runtime"
version = "0.4.0"
version = "0.2.0"
dependencies = [
"async-trait",
"common-error",
@@ -1911,7 +1901,7 @@ dependencies = [
[[package]]
name = "common-telemetry"
version = "0.4.0"
version = "0.2.0"
dependencies = [
"backtrace",
"common-error",
@@ -1936,7 +1926,7 @@ dependencies = [
[[package]]
name = "common-test-util"
version = "0.4.0"
version = "0.2.0"
dependencies = [
"once_cell",
"rand",
@@ -1945,7 +1935,7 @@ dependencies = [
[[package]]
name = "common-time"
version = "0.4.0"
version = "0.2.0"
dependencies = [
"chrono",
"chrono-tz 0.8.2",
@@ -2089,15 +2079,6 @@ version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e496a50fda8aacccc86d7529e2c1e0892dbd0f898a6b5645b5561b89c3210efa"
[[package]]
name = "cpp_demangle"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2c76f98bdfc7f66172e6c7065f981ebb576ffc903fe4c0561d9f0c2509226dc6"
dependencies = [
"cfg-if 1.0.0",
]
[[package]]
name = "cpufeatures"
version = "0.2.7"
@@ -2399,7 +2380,7 @@ dependencies = [
"hashbrown 0.12.3",
"lock_api",
"once_cell",
"parking_lot_core 0.9.8",
"parking_lot_core 0.9.7",
]
[[package]]
@@ -2585,7 +2566,7 @@ dependencies = [
[[package]]
name = "datanode"
version = "0.4.0"
version = "0.2.0"
dependencies = [
"api",
"async-compat",
@@ -2641,7 +2622,7 @@ dependencies = [
"sql",
"storage",
"store-api",
"substrait 0.4.0",
"substrait 0.2.0",
"table",
"table-procedure",
"tokio",
@@ -2655,7 +2636,7 @@ dependencies = [
[[package]]
name = "datatypes"
version = "0.4.0"
version = "0.2.0"
dependencies = [
"arrow",
"arrow-array",
@@ -2675,15 +2656,6 @@ dependencies = [
"snafu",
]
[[package]]
name = "debugid"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bef552e6f588e446098f6ba40d89ac146c8c7b64aade83c051ee00bb5d2bc18d"
dependencies = [
"uuid",
]
[[package]]
name = "der"
version = "0.5.1"
@@ -3090,7 +3062,7 @@ dependencies = [
[[package]]
name = "file-table-engine"
version = "0.4.0"
version = "0.2.0"
dependencies = [
"async-trait",
"common-catalog",
@@ -3128,18 +3100,6 @@ dependencies = [
"windows-sys 0.48.0",
]
[[package]]
name = "findshlibs"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "40b9e59cd0f7e0806cca4be089683ecb6434e602038df21fe6bf6711b2f07f64"
dependencies = [
"cc",
"lazy_static",
"libc",
"winapi",
]
[[package]]
name = "fixedbitset"
version = "0.4.2"
@@ -3154,9 +3114,9 @@ checksum = "cda653ca797810c02f7ca4b804b40b8b95ae046eb989d356bce17919a8c25499"
[[package]]
name = "flatbuffers"
version = "23.5.26"
version = "23.1.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4dac53e22462d78c16d64a1cd22371b54cc3fe94aa15e7886a2fa6e5d1ab8640"
checksum = "77f5399c2c9c50ae9418e522842ad362f61ee48b346ac106807bd355a8a7c619"
dependencies = [
"bitflags 1.3.2",
"rustc_version 0.4.0",
@@ -3181,9 +3141,9 @@ checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
[[package]]
name = "form_urlencoded"
version = "1.2.0"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a62bc1cf6f830c2ec14a513a9fb124d0a213a629668a4186f329db21fe045652"
checksum = "a9c384f161156f5260c24a097c56119f9be8c798586aecc13afbcbe7b7e26bf8"
dependencies = [
"percent-encoding",
]
@@ -3199,7 +3159,7 @@ dependencies = [
[[package]]
name = "frontend"
version = "0.4.0"
version = "0.2.0"
dependencies = [
"api",
"async-compat",
@@ -3253,7 +3213,7 @@ dependencies = [
"storage",
"store-api",
"strfmt",
"substrait 0.4.0",
"substrait 0.2.0",
"table",
"tokio",
"toml",
@@ -4388,9 +4348,9 @@ checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39"
[[package]]
name = "idna"
version = "0.4.0"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7d20d6b07bfbc108882d88ed8e37d39636dcc260e15e30c45e6ba089610b917c"
checksum = "e14ddfc70884202db2244c223200c204c2bda1bc6e0998d11b5e024d657209e6"
dependencies = [
"unicode-bidi",
"unicode-normalization",
@@ -4425,9 +4385,9 @@ dependencies = [
[[package]]
name = "indicatif"
version = "0.17.5"
version = "0.17.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ff8cc23a7393a397ed1d7f56e6365cba772aba9f9912ab968b03043c395d057"
checksum = "db45317f37ef454e6519b6c3ed7d377e5f23346f0823f86e65ca36912d1d0ef8"
dependencies = [
"console",
"instant",
@@ -4442,24 +4402,6 @@ version = "1.0.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bfa799dd5ed20a7e349f3b4639aa80d74549c81716d9ec4f994c9b5815598306"
[[package]]
name = "inferno"
version = "0.11.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2fb7c1b80a1dfa604bb4a649a5c5aeef3d913f7c520cb42b40e534e8a61bcdfc"
dependencies = [
"ahash 0.8.3",
"indexmap",
"is-terminal",
"itoa",
"log",
"num-format",
"once_cell",
"quick-xml 0.26.0",
"rgb",
"str_stack",
]
[[package]]
name = "influxdb_line_protocol"
version = "0.1.0"
@@ -4757,9 +4699,9 @@ dependencies = [
[[package]]
name = "libc"
version = "0.2.145"
version = "0.2.144"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc86cde3ff845662b8f4ef6cb50ea0e20c524eb3d29ae048287e06a1b3fa6a81"
checksum = "2b00cc1c228a6782d0f076e7b232802e0c5689d41bb5df366f2a6b6621cfdfe1"
[[package]]
name = "libgit2-sys"
@@ -4843,9 +4785,9 @@ checksum = "ef53942eb7bf7ff43a617b3e2c1c4a5ecf5944a7c1bc12d7ee39bbb15e5c1519"
[[package]]
name = "lock_api"
version = "0.4.10"
version = "0.4.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c1cc9717a20b1bb222f333e6a92fd32f7d8a18ddc5a3191a11af45dcbf4dcd16"
checksum = "435011366fe56583b16cf956f9df0095b405b82d76425bc8981c0e22e60ec4df"
dependencies = [
"autocfg",
"scopeguard",
@@ -4859,7 +4801,7 @@ checksum = "518ef76f2f87365916b142844c16d8fefd85039bc5699050210a7778ee1cd1de"
[[package]]
name = "log-store"
version = "0.4.0"
version = "0.2.0"
dependencies = [
"arc-swap",
"async-stream",
@@ -5121,7 +5063,7 @@ dependencies = [
[[package]]
name = "meta-client"
version = "0.4.0"
version = "0.2.0"
dependencies = [
"api",
"async-trait",
@@ -5149,7 +5091,7 @@ dependencies = [
[[package]]
name = "meta-srv"
version = "0.4.0"
version = "0.2.0"
dependencies = [
"anymap",
"api",
@@ -5341,7 +5283,7 @@ dependencies = [
[[package]]
name = "mito"
version = "0.4.0"
version = "0.2.0"
dependencies = [
"anymap",
"arc-swap",
@@ -5365,7 +5307,6 @@ dependencies = [
"futures",
"key-lock",
"log-store",
"metrics",
"object-store",
"serde",
"serde_json",
@@ -5702,16 +5643,6 @@ dependencies = [
"syn 1.0.109",
]
[[package]]
name = "num-format"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a652d9771a63711fd3c3deb670acfbe5c30a4072e664d7a3bf5a9e1056ac72c3"
dependencies = [
"arrayvec",
"itoa",
]
[[package]]
name = "num-integer"
version = "0.1.45"
@@ -5803,16 +5734,16 @@ checksum = "830b246a0e5f20af87141b25c173cd1b609bd7779a4617d6ec582abaf90870f3"
[[package]]
name = "object"
version = "0.30.4"
version = "0.30.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "03b4680b86d9cfafba8fc491dc9b6df26b68cf40e9e6cd73909194759a63c385"
checksum = "ea86265d3d3dcb6a27fc51bd29a4bf387fae9d2986b823079d4986af253eb439"
dependencies = [
"memchr",
]
[[package]]
name = "object-store"
version = "0.4.0"
version = "0.2.0"
dependencies = [
"anyhow",
"async-trait",
@@ -5851,9 +5782,9 @@ dependencies = [
[[package]]
name = "once_cell"
version = "1.18.0"
version = "1.17.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d"
checksum = "9670a07f94779e00908f3e686eab508878ebb390ba6e604d3a284c00e8d0487b"
[[package]]
name = "oorandom"
@@ -6094,7 +6025,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f"
dependencies = [
"lock_api",
"parking_lot_core 0.9.8",
"parking_lot_core 0.9.7",
]
[[package]]
@@ -6113,18 +6044,18 @@ dependencies = [
[[package]]
name = "parking_lot_core"
version = "0.9.8"
version = "0.9.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "93f00c865fe7cabf650081affecd3871070f26767e7b2070a3ffae14c654b447"
checksum = "9069cbb9f99e3a5083476ccb29ceb1de18b9118cafa53e90c9551235de2b9521"
dependencies = [
"backtrace",
"cfg-if 1.0.0",
"libc",
"petgraph",
"redox_syscall 0.3.5",
"redox_syscall 0.2.16",
"smallvec",
"thread-id",
"windows-targets 0.48.0",
"windows-sys 0.45.0",
]
[[package]]
@@ -6185,7 +6116,7 @@ dependencies = [
[[package]]
name = "partition"
version = "0.4.0"
version = "0.2.0"
dependencies = [
"api",
"async-trait",
@@ -6264,9 +6195,9 @@ dependencies = [
[[package]]
name = "percent-encoding"
version = "2.3.0"
version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b2a4787296e9989611394c33f193f676704af1686e70b8f8033ab5ba9a35a94"
checksum = "478c572c3d73181ff3c2539045f6eb99e5491218eae919370993b890cdbdd98e"
[[package]]
name = "pest"
@@ -6588,32 +6519,6 @@ dependencies = [
"postgres-protocol",
]
[[package]]
name = "pprof"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "196ded5d4be535690899a4631cc9f18cdc41b7ebf24a79400f46f48e49a11059"
dependencies = [
"backtrace",
"cfg-if 1.0.0",
"findshlibs",
"inferno",
"libc",
"log",
"nix 0.26.2",
"once_cell",
"parking_lot 0.12.1",
"prost",
"prost-build",
"prost-derive",
"protobuf",
"sha2",
"smallvec",
"symbolic-demangle",
"tempfile",
"thiserror",
]
[[package]]
name = "ppv-lite86"
version = "0.2.17"
@@ -6772,7 +6677,7 @@ dependencies = [
[[package]]
name = "promql"
version = "0.4.0"
version = "0.2.0"
dependencies = [
"async-recursion",
"async-trait",
@@ -7022,7 +6927,7 @@ dependencies = [
[[package]]
name = "query"
version = "0.4.0"
version = "0.2.0"
dependencies = [
"ahash 0.8.3",
"approx_eq",
@@ -7076,21 +6981,12 @@ dependencies = [
"stats-cli",
"store-api",
"streaming-stats",
"substrait 0.4.0",
"substrait 0.2.0",
"table",
"tokio",
"tokio-stream",
]
[[package]]
name = "quick-xml"
version = "0.26.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f50b1c63b38611e7d4d7f68b82d3ad0cc71a2ad2e7f61fc10f1328d917c93cd"
dependencies = [
"memchr",
]
[[package]]
name = "quick-xml"
version = "0.27.1"
@@ -7279,11 +7175,11 @@ dependencies = [
[[package]]
name = "regex"
version = "1.8.4"
version = "1.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d0ab3ca65655bb1e41f2a8c8cd662eb4fb035e67c3f78da1d61dffe89d07300f"
checksum = "81ca098a9821bd52d6b24fd8b10bd081f47d39c22778cafaa75a2857a62c6390"
dependencies = [
"aho-corasick 1.0.2",
"aho-corasick 1.0.1",
"memchr",
"regex-syntax 0.7.2",
]
@@ -7446,15 +7342,6 @@ dependencies = [
"thiserror",
]
[[package]]
name = "rgb"
version = "0.8.36"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "20ec2d3e3fc7a92ced357df9cebd5a10b6fb2aa1ee797bf7e9ce2f17dffc8f59"
dependencies = [
"bytemuck",
]
[[package]]
name = "ring"
version = "0.16.20"
@@ -8252,7 +8139,7 @@ checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
[[package]]
name = "script"
version = "0.4.0"
version = "0.2.0"
dependencies = [
"arrow",
"async-trait",
@@ -8507,7 +8394,7 @@ dependencies = [
[[package]]
name = "servers"
version = "0.4.0"
version = "0.2.0"
dependencies = [
"aide",
"api",
@@ -8527,7 +8414,6 @@ dependencies = [
"common-grpc",
"common-grpc-expr",
"common-mem-prof",
"common-pprof",
"common-query",
"common-recordbatch",
"common-runtime",
@@ -8576,7 +8462,6 @@ dependencies = [
"sql",
"strum",
"table",
"tikv-jemalloc-ctl",
"tokio",
"tokio-postgres",
"tokio-postgres-rustls",
@@ -8591,7 +8476,7 @@ dependencies = [
[[package]]
name = "session"
version = "0.4.0"
version = "0.2.0"
dependencies = [
"arc-swap",
"common-catalog",
@@ -8866,7 +8751,7 @@ dependencies = [
[[package]]
name = "sql"
version = "0.4.0"
version = "0.2.0"
dependencies = [
"api",
"common-base",
@@ -8912,7 +8797,7 @@ dependencies = [
[[package]]
name = "sqlness-runner"
version = "0.4.0"
version = "0.2.0"
dependencies = [
"async-trait",
"client",
@@ -9056,12 +8941,6 @@ dependencies = [
"optional",
]
[[package]]
name = "stable_deref_trait"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3"
[[package]]
name = "static_assertions"
version = "1.1.0"
@@ -9093,7 +8972,7 @@ dependencies = [
[[package]]
name = "storage"
version = "0.4.0"
version = "0.2.0"
dependencies = [
"arc-swap",
"arrow",
@@ -9144,7 +9023,7 @@ dependencies = [
[[package]]
name = "store-api"
version = "0.4.0"
version = "0.2.0"
dependencies = [
"async-stream",
"async-trait",
@@ -9169,12 +9048,6 @@ version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e08d8363704e6c71fc928674353e6b7c23dcea9d82d7012c8faf2a3a025f8d0"
[[package]]
name = "str_stack"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9091b6114800a5f2141aee1d1b9d6ca3592ac062dc5decb3764ec5895a47b4eb"
[[package]]
name = "streaming-stats"
version = "0.2.3"
@@ -9259,7 +9132,7 @@ dependencies = [
[[package]]
name = "substrait"
version = "0.4.0"
version = "0.2.0"
dependencies = [
"async-recursion",
"async-trait",
@@ -9331,29 +9204,6 @@ version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81cdd64d312baedb58e21336b31bc043b77e01cc99033ce76ef539f78e965ebc"
[[package]]
name = "symbolic-common"
version = "10.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b55cdc318ede251d0957f07afe5fed912119b8c1bc5a7804151826db999e737"
dependencies = [
"debugid",
"memmap2",
"stable_deref_trait",
"uuid",
]
[[package]]
name = "symbolic-demangle"
version = "10.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "79be897be8a483a81fff6a3a4e195b4ac838ef73ca42d348b3f722da9902e489"
dependencies = [
"cpp_demangle",
"rustc-demangle",
"symbolic-common",
]
[[package]]
name = "syn"
version = "1.0.109"
@@ -9414,7 +9264,7 @@ dependencies = [
[[package]]
name = "table"
version = "0.4.0"
version = "0.2.0"
dependencies = [
"anymap",
"async-trait",
@@ -9450,7 +9300,7 @@ dependencies = [
[[package]]
name = "table-procedure"
version = "0.4.0"
version = "0.2.0"
dependencies = [
"async-trait",
"catalog",
@@ -9543,7 +9393,7 @@ dependencies = [
[[package]]
name = "tests-integration"
version = "0.4.0"
version = "0.2.0"
dependencies = [
"api",
"async-trait",
@@ -10643,9 +10493,9 @@ checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a"
[[package]]
name = "url"
version = "2.4.0"
version = "2.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "50bff7831e19200a85b17131d085c25d7811bc4e186efdaf54bbd132994a88cb"
checksum = "0d68c799ae75762b8c3fe375feb6600ef5602c883c5d21eb51c09f22b83c4643"
dependencies = [
"form_urlencoded",
"idna",
@@ -11222,9 +11072,9 @@ dependencies = [
[[package]]
name = "xml-rs"
version = "0.8.14"
version = "0.8.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "52839dc911083a8ef63efa4d039d1f58b5e409f923e44c80828f206f66e5541c"
checksum = "2d8f380ae16a37b30e6a2cf67040608071384b1450c189e61bea3ff57cde922d"
[[package]]
name = "xz2"

View File

@@ -17,7 +17,6 @@ members = [
"src/common/meta",
"src/common/procedure",
"src/common/procedure-test",
"src/common/pprof",
"src/common/query",
"src/common/recordbatch",
"src/common/runtime",
@@ -50,7 +49,7 @@ members = [
]
[workspace.package]
version = "0.4.0"
version = "0.3.0"
edition = "2021"
license = "Apache-2.0"

View File

@@ -106,7 +106,7 @@ Please see [the online document site](https://docs.greptime.com/getting-started/
Read the [complete getting started guide](https://docs.greptime.com/getting-started/overview#connect) on our [official document site](https://docs.greptime.com/).
To write and query data, GreptimeDB is compatible with multiple [protocols and clients](https://docs.greptime.com/user-guide/client/overview).
To write and query data, GreptimeDB is compatible with multiple [protocols and clients](https://docs.greptime.com/user-guide/clients).
## Resources
@@ -123,7 +123,7 @@ To write and query data, GreptimeDB is compatible with multiple [protocols and c
### Documentation
- GreptimeDB [User Guide](https://docs.greptime.com/user-guide/concepts/overview)
- GreptimeDB [User Guide](https://docs.greptime.com/user-guide/concepts.html)
- GreptimeDB [Developer
Guide](https://docs.greptime.com/developer-guide/overview.html)
- GreptimeDB [internal code document](https://greptimedb.rs)

View File

@@ -31,6 +31,7 @@ pub struct DatanodeClients {
impl Default for DatanodeClients {
fn default() -> Self {
// TODO(LFC): Make this channel config configurable.
let config = ChannelConfig::new().timeout(Duration::from_secs(8));
Self {

View File

@@ -254,6 +254,7 @@ impl Database {
let mut client = self.client.make_flight_client()?;
// TODO(LFC): Streaming get flight data.
let flight_data: Vec<FlightData> = client
.mut_inner()
.do_get(request)

View File

@@ -10,6 +10,7 @@ name = "greptime"
path = "src/bin/greptime.rs"
[features]
mem-prof = ["tikv-jemallocator", "tikv-jemalloc-ctl"]
tokio-console = ["common-telemetry/tokio-console"]
[dependencies]
@@ -41,7 +42,8 @@ servers = { path = "../servers" }
session = { path = "../session" }
snafu.workspace = true
substrait = { path = "../common/substrait" }
tikv-jemallocator = "0.5"
tikv-jemalloc-ctl = { version = "0.5", optional = true }
tikv-jemallocator = { version = "0.5", optional = true }
tokio.workspace = true
[dev-dependencies]

View File

@@ -180,19 +180,15 @@ fn full_version() -> &'static str {
)
}
fn log_env_flags() {
info!("command line arguments");
for argument in std::env::args() {
info!("argument: {}", argument);
}
}
#[cfg(feature = "mem-prof")]
#[global_allocator]
static ALLOC: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
#[tokio::main]
async fn main() -> Result<()> {
let cmd = Command::parse();
// TODO(dennis):
// 1. adds ip/port to app
let app_name = &cmd.subcmd.to_string();
let opts = cmd.load_options()?;
@@ -209,14 +205,6 @@ async fn main() -> Result<()> {
// Report app version as gauge.
gauge!("app_version", 1.0, "short_version" => short_version(), "version" => full_version());
// Log version and argument flags.
info!(
"short_version: {}, full_version: {}",
short_version(),
full_version()
);
log_env_flags();
let mut app = cmd.build(opts).await?;
tokio::select! {

View File

@@ -23,7 +23,7 @@ pub type Result<T> = std::result::Result<T, Error>;
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
#[snafu(display("Failed to read OPT_PROF, source: {}", source))]
#[snafu(display("Failed to read OPT_PROF"))]
ReadOptProf { source: tikv_jemalloc_ctl::Error },
#[snafu(display("Memory profiling is not enabled"))]
@@ -32,17 +32,13 @@ pub enum Error {
#[snafu(display("Failed to build temp file from given path: {:?}", path))]
BuildTempPath { path: PathBuf, location: Location },
#[snafu(display("Failed to open temp file: {}, source: {}", path, source))]
#[snafu(display("Failed to open temp file: {}", path))]
OpenTempFile {
path: String,
source: std::io::Error,
},
#[snafu(display(
"Failed to dump profiling data to temp file: {:?}, source: {}",
path,
source
))]
#[snafu(display("Failed to dump profiling data to temp file: {:?}", path))]
DumpProfileData {
path: PathBuf,
source: tikv_jemalloc_ctl::Error,
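The display strings above drop the embedded `source: {}` segment; with snafu, the source error stays attached to the variant and remains reachable through the error chain. A minimal sketch of that pattern with a made-up error type (illustrative only, not GreptimeDB code):

```rust
use snafu::{ResultExt, Snafu};

#[derive(Debug, Snafu)]
enum Error {
    // Display no longer embeds the source; it stays reachable via std::error::Error::source().
    #[snafu(display("Failed to open temp file: {}", path))]
    OpenTempFile { path: String, source: std::io::Error },
}

fn open(path: &str) -> Result<std::fs::File, Error> {
    std::fs::File::open(path).context(OpenTempFileSnafu { path })
}

fn main() {
    if let Err(e) = open("/no/such/file") {
        // Prints only the top-level message...
        eprintln!("{e}");
        // ...while the underlying io::Error is still available through the chain.
        if let Some(src) = std::error::Error::source(&e) {
            eprintln!("caused by: {src}");
        }
    }
}
```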

View File

@@ -1,16 +0,0 @@
[package]
name = "common-pprof"
version.workspace = true
edition.workspace = true
license.workspace = true
[dependencies]
common-error = { path = "../error" }
pprof = { version = "0.11", features = [
"flamegraph",
"prost-codec",
"protobuf",
] }
prost.workspace = true
snafu.workspace = true
tokio.workspace = true

View File

@@ -1,28 +0,0 @@
# Profiling CPU
## Build GreptimeDB with the `pprof` feature
```bash
cargo build --features=pprof
```
## HTTP API
Sample at 99 Hertz, for 5 seconds, output report in [protobuf format](https://github.com/google/pprof/blob/master/proto/profile.proto).
```bash
curl -s '0:4000/v1/prof/cpu' > /tmp/pprof.out
```
Then you can use the `pprof` command with the protobuf file.
```bash
go tool pprof -top /tmp/pprof.out
```
Sample at 99 Hertz, for 60 seconds, output report in flamegraph format.
```bash
curl -s '0:4000/v1/prof/cpu?seconds=60&output=flamegraph' > /tmp/pprof.svg
```
Sample at 49 Hertz, for 10 seconds, output report in text format.
```bash
curl -s '0:4000/v1/prof/cpu?seconds=10&frequency=49&output=text' > /tmp/pprof.txt
```

View File

@@ -1,124 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use std::time::Duration;
use common_error::prelude::{ErrorExt, StatusCode};
use prost::Message;
use snafu::{Location, ResultExt, Snafu};
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display(
"Failed to create profiler guard, source: {}, location: {}",
source,
location
))]
CreateGuard {
source: pprof::Error,
location: Location,
},
#[snafu(display("Failed to create report, source: {}, location: {}", source, location))]
CreateReport {
source: pprof::Error,
location: Location,
},
#[snafu(display(
"Failed to create flamegraph, source: {}, location: {}",
source,
location
))]
CreateFlamegraph {
source: pprof::Error,
location: Location,
},
#[snafu(display(
"Failed to create pprof report, source: {}, location: {}",
source,
location
))]
ReportPprof {
source: pprof::Error,
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
StatusCode::Unexpected
}
fn as_any(&self) -> &dyn Any {
self
}
}
/// CPU profiler utility.
// Inspired by https://github.com/datafuselabs/databend/blob/67f445e83cd4eceda98f6c1c114858929d564029/src/common/base/src/base/profiling.rs
#[derive(Debug)]
pub struct Profiling {
/// Sample duration.
duration: Duration,
/// Sample frequency.
frequency: i32,
}
impl Profiling {
/// Creates a new profiler.
pub fn new(duration: Duration, frequency: i32) -> Profiling {
Profiling {
duration,
frequency,
}
}
/// Profiles and returns a generated pprof report.
pub async fn report(&self) -> Result<pprof::Report> {
let guard = pprof::ProfilerGuardBuilder::default()
.frequency(self.frequency)
.blocklist(&["libc", "libgcc", "pthread", "vdso"])
.build()
.context(CreateGuardSnafu)?;
tokio::time::sleep(self.duration).await;
guard.report().build().context(CreateReportSnafu)
}
/// Profiles and returns a generated flamegraph.
pub async fn dump_flamegraph(&self) -> Result<Vec<u8>> {
let mut body: Vec<u8> = Vec::new();
let report = self.report().await?;
report
.flamegraph(&mut body)
.context(CreateFlamegraphSnafu)?;
Ok(body)
}
/// Profiles and returns a generated proto.
pub async fn dump_proto(&self) -> Result<Vec<u8>> {
let report = self.report().await?;
// Generate a report in Google's pprof format.
let profile = report.pprof().context(ReportPprofSnafu)?;
let body = profile.encode_to_vec();
Ok(body)
}
}

View File

@@ -172,6 +172,7 @@ impl DfAccumulator for DfAccumulatorAdaptor {
}
fn size(&self) -> usize {
// TODO(LFC): Implement new "size" method for Accumulator.
0
}
}

View File

@@ -194,6 +194,7 @@ impl DfPhysicalPlan for DfPhysicalPlanAdapter {
}
fn statistics(&self) -> Statistics {
// TODO(LFC): impl statistics
Statistics::default()
}
}

View File

@@ -285,6 +285,9 @@ impl Instance {
requests: InsertRequests,
ctx: QueryContextRef,
) -> Result<Output> {
// TODO(LFC): Optimize concurrent table creation and table alteration.
// Currently table creation is guarded by a distributed lock in Metasrv. However, table
// alteration is not. We should switch them all to procedures in Metasrv.
let _ = future::join_all(
requests
.inserts
@@ -560,7 +563,6 @@ impl PromHandler for Instance {
let stmt = QueryLanguageParser::parse_promql(query).with_context(|_| ParsePromQLSnafu {
query: query.clone(),
})?;
self.statement_executor
.execute_stmt(stmt, query_ctx)
.await

View File

@@ -598,6 +598,7 @@ impl DistInstance {
Ok(Output::AffectedRows(affected_rows as usize))
}
// TODO(LFC): Like insertions above, refactor GRPC deletion impl here.
async fn handle_dist_delete(
&self,
request: DeleteRequest,
@@ -661,6 +662,8 @@ impl GrpcQueryHandler for DistInstance {
match expr {
DdlExpr::CreateDatabase(expr) => self.handle_create_database(expr, ctx).await,
DdlExpr::CreateTable(mut expr) => {
// TODO(LFC): Support creating distributed tables through the gRPC interface.
// Currently only SQL supports it; how to design the fields in CreateTableExpr?
let _ = self.create_table(&mut expr, None).await;
Ok(Output::AffectedRows(0))
}

View File

@@ -74,6 +74,12 @@ impl HeartbeatHandler for RegionFailureHandler {
let Some(stat) = acc.stat.as_ref() else { return Ok(()) };
// TODO(LFC): Filter out the stalled heartbeats:
// After the region failover is done, the distribution of region is changed.
// We can compare the heartbeat info here with the global region placement metadata,
// and remove the incorrect region ident keys in failure detect runner
// (by sending a control message).
let heartbeat = DatanodeHeartbeat {
region_idents: stat
.region_stats

View File

@@ -146,21 +146,25 @@ impl MetaSrv {
common_runtime::spawn_bg(async move {
loop {
match rx.recv().await {
Ok(msg) => match msg {
LeaderChangeMessage::Elected(_) => {
if let Err(e) = procedure_manager.recover().await {
error!("Failed to recover procedures, error: {e}");
Ok(msg) => {
match msg {
LeaderChangeMessage::Elected(_) => {
if let Err(e) = procedure_manager.recover().await {
error!("Failed to recover procedures, error: {e}");
}
}
LeaderChangeMessage::StepDown(leader) => {
// TODO(LFC): TBC
error!("Leader :{:?} step down", leader);
}
}
LeaderChangeMessage::StepDown(leader) => {
error!("Leader :{:?} step down", leader);
}
},
}
Err(RecvError::Closed) => {
error!("Not expected, is leader election loop still running?");
break;
}
Err(RecvError::Lagged(_)) => {
// TODO(LFC): TBC
break;
}
}
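For context, the loop above reads from what appears to be a tokio broadcast channel, where a slow receiver observes `RecvError::Lagged` when it falls behind and `RecvError::Closed` once all senders are gone. A minimal standalone sketch of that receive pattern (illustrative only, not the Metasrv types):

```rust
use tokio::sync::broadcast::{self, error::RecvError};

#[tokio::main]
async fn main() {
    // Capacity 2: older messages get overwritten once the receiver lags behind.
    let (tx, mut rx) = broadcast::channel::<&'static str>(2);
    for msg in ["elected", "step_down", "elected", "step_down"] {
        let _ = tx.send(msg);
    }
    drop(tx); // No more senders; the channel closes once drained.

    loop {
        match rx.recv().await {
            Ok(msg) => println!("leader change: {msg}"),
            // The receiver missed `n` messages because the ring buffer wrapped around.
            Err(RecvError::Lagged(n)) => eprintln!("lagged, skipped {n} messages"),
            // Every sender is gone and the buffer is drained.
            Err(RecvError::Closed) => break,
        }
    }
}
```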

View File

@@ -43,6 +43,16 @@ impl UpdateRegionMetadata {
Self { candidate }
}
// TODO(LFC): Update the two table metadata values in a batch atomically.
//
// Though the updating of the two metadata values is guarded by a distributed lock,
// it is not robust enough. For example, the lock lease could expire in the middle of
// one holder's update, letting others start updating concurrently. For now, we set the lease of
// the distributed lock to 10 seconds, which is long enough here to get the job done.
//
// Maybe we should introduce "version" companion values for these two metadata values, and
// use an etcd transaction request to update them?
/// Updates the metadata of the table. Specifically, the [TableGlobalValue] and [TableRouteValue].
async fn update_metadata(
&self,
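The TODO above sketches version-guarded updates as an alternative to relying on the lock lease alone. A rough compare-and-swap sketch of that idea; the `KvBackend` trait, `Versioned` wrapper, and key names below are hypothetical, not the Metasrv API:

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// A value paired with a monotonically increasing version (illustrative only).
#[derive(Clone, Debug)]
struct Versioned<T> {
    version: u64,
    value: T,
}

/// Hypothetical KV interface offering a compare-and-swap style write.
trait KvBackend<T: Clone> {
    fn get(&self, key: &str) -> Option<Versioned<T>>;
    /// Writes `new` only if the stored version still equals `expected_version`.
    fn compare_and_put(&self, key: &str, expected_version: u64, new: T) -> bool;
}

/// Retry the read-modify-write until no concurrent writer slipped in between.
fn update_with_version<T: Clone>(
    kv: &impl KvBackend<T>,
    key: &str,
    mut mutate: impl FnMut(T) -> T,
) -> Option<T> {
    loop {
        let current = kv.get(key)?;
        let new_value = mutate(current.value.clone());
        if kv.compare_and_put(key, current.version, new_value.clone()) {
            return Some(new_value);
        }
        // Someone else updated the key first; reload and try again.
    }
}

/// A toy in-memory backend so the sketch runs end to end.
struct MemKv(Mutex<HashMap<String, Versioned<String>>>);

impl KvBackend<String> for MemKv {
    fn get(&self, key: &str) -> Option<Versioned<String>> {
        self.0.lock().unwrap().get(key).cloned()
    }
    fn compare_and_put(&self, key: &str, expected_version: u64, new: String) -> bool {
        let mut map = self.0.lock().unwrap();
        match map.get_mut(key) {
            Some(v) if v.version == expected_version => {
                *v = Versioned { version: expected_version + 1, value: new };
                true
            }
            _ => false,
        }
    }
}

fn main() {
    let kv = MemKv(Mutex::new(HashMap::from([(
        "table_route".to_string(),
        Versioned { version: 1, value: "region-0 -> datanode-1".to_string() },
    )])));
    let updated = update_with_version(&kv, "table_route", |v| v.replace("datanode-1", "datanode-2"));
    println!("{updated:?}");
}
```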

View File

@@ -16,7 +16,6 @@ mod health;
mod heartbeat;
mod leader;
mod meta;
mod route;
use std::collections::HashMap;
use std::convert::Infallible;
@@ -74,13 +73,6 @@ pub fn make_admin_service(meta_srv: MetaSrv) -> Admin {
},
);
let router = router.route(
"/route",
route::RouteHandler {
kv_store: meta_srv.kv_store(),
},
);
let router = Router::nest("/admin", router);
Admin::new(router)

View File

@@ -1,86 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use api::v1::meta::{RangeRequest, RangeResponse, TableRouteValue};
use common_meta::key::TABLE_ROUTE_PREFIX;
use prost::Message;
use snafu::{OptionExt, ResultExt};
use tonic::codegen::http;
use super::HttpHandler;
use crate::error::Result;
use crate::service::store::kv::KvStoreRef;
use crate::{error, util};
pub struct RouteHandler {
pub kv_store: KvStoreRef,
}
#[async_trait::async_trait]
impl HttpHandler for RouteHandler {
async fn handle(
&self,
_path: &str,
params: &HashMap<String, String>,
) -> Result<http::Response<String>> {
let full_table_name = params
.get("full_table_name")
.map(|full_table_name| full_table_name.replace('.', "-"))
.context(error::MissingRequiredParameterSnafu {
param: "full_table_name",
})?;
let route_key = format!("{}-{}", TABLE_ROUTE_PREFIX, full_table_name).into_bytes();
let range_end = util::get_prefix_end_key(&route_key);
let req = RangeRequest {
key: route_key,
range_end,
keys_only: false,
..Default::default()
};
let resp = self.kv_store.range(req).await?;
let show = pretty_fmt(resp)?;
http::Response::builder()
.status(http::StatusCode::OK)
.body(show)
.context(error::InvalidHttpBodySnafu)
}
}
fn pretty_fmt(response: RangeResponse) -> Result<String> {
let mut show = "".to_string();
for kv in response.kvs.into_iter() {
let route_key = String::from_utf8(kv.key).unwrap();
let route_val =
TableRouteValue::decode(&kv.value[..]).context(error::DecodeTableRouteSnafu)?;
show.push_str("route_key:\n");
show.push_str(&route_key);
show.push('\n');
show.push_str("route_value:\n");
show.push_str(&format!("{:#?}", route_val));
show.push('\n');
}
Ok(show)
}

View File

@@ -21,7 +21,6 @@ common-query = { path = "../common/query" }
common-recordbatch = { path = "../common/recordbatch" }
common-datasource = { path = "../common/datasource" }
common-telemetry = { path = "../common/telemetry" }
common-test-util = { path = "../common/test-util", optional = true }
common-time = { path = "../common/time" }
dashmap = "5.4"
datafusion.workspace = true
@@ -30,7 +29,6 @@ datatypes = { path = "../datatypes" }
futures.workspace = true
key-lock = "0.1"
log-store = { path = "../log-store" }
metrics.workspace = true
object-store = { path = "../object-store" }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
@@ -38,6 +36,7 @@ snafu.workspace = true
storage = { path = "../storage" }
store-api = { path = "../store-api" }
table = { path = "../table" }
common-test-util = { path = "../common/test-util", optional = true }
tokio.workspace = true
[dev-dependencies]

View File

@@ -25,7 +25,3 @@ pub const MITO_CREATE_TABLE_UPDATE_MANIFEST_ELAPSED: &str =
pub const MITO_OPEN_TABLE_ELAPSED: &str = "datanode.mito.open_table";
/// Elapsed time of altering tables
pub const MITO_ALTER_TABLE_ELAPSED: &str = "datanode.mito.alter_table";
/// Elapsed time of insertion
pub const MITO_INSERT_ELAPSED: &str = "datanode.mito.insert";
/// Insert batch size.
pub const MITO_INSERT_BATCH_SIZE: &str = "datanode.mito.insert_batch_size";

View File

@@ -29,7 +29,6 @@ use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::{RecordBatch, RecordBatchStreamAdaptor, SendableRecordBatchStream};
use common_telemetry::{info, logging};
use datatypes::schema::Schema;
use metrics::histogram;
use object_store::ObjectStore;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::manifest::{self, Manifest, ManifestVersion, MetaActionIterator};
@@ -58,8 +57,6 @@ use crate::error::{
};
use crate::manifest::action::*;
use crate::manifest::TableManifest;
use crate::metrics::{MITO_INSERT_BATCH_SIZE, MITO_INSERT_ELAPSED};
#[inline]
fn table_manifest_dir(table_dir: &str) -> String {
assert!(table_dir.ends_with('/'));
@@ -86,8 +83,6 @@ impl<R: Region> Table for MitoTable<R> {
}
async fn insert(&self, request: InsertRequest) -> TableResult<usize> {
let _timer = common_telemetry::timer!(MITO_INSERT_ELAPSED);
if request.columns_values.is_empty() {
return Ok(0);
}
@@ -110,8 +105,6 @@ impl<R: Region> Table for MitoTable<R> {
// columns_values is not empty, it's safe to unwrap
let rows_num = columns_values.values().next().unwrap().len();
histogram!(MITO_INSERT_BATCH_SIZE, rows_num as f64);
logging::trace!(
"Insert into table {} region {} with data: {:?}",
self.table_info().name,

View File

@@ -116,25 +116,13 @@ impl QueryEngineState {
.cloned()
}
/// Register an aggregate function.
///
/// # Panics
/// Will panic if a function with the same name is already registered.
///
/// Panicking consideration: currently the aggregate functions are all statically registered;
/// users cannot define their own aggregate functions on the fly, so we can panic here. If that
/// invariant is broken in the future, we should return an error instead of panicking.
pub fn register_aggregate_function(&self, func: AggregateFunctionMetaRef) {
let name = func.name();
let x = self
.aggregate_functions
// TODO(LFC): Return some error if there exists an aggregate function with the same name.
// Simply overwrite the old value for now.
self.aggregate_functions
.write()
.unwrap()
.insert(name.clone(), func);
assert!(
x.is_none(),
"Already registered aggregate function '{name}'"
);
.insert(func.name(), func);
}
#[inline]
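For context on the behavior change above: `HashMap::insert` returns the previous value for the key, so the removed code asserted that this return was `None` (panicking on duplicate registration), while the new code silently overwrites. A small standalone illustration using plain `std` types (the names here are made up, not the engine's registry):

```rust
use std::collections::HashMap;

fn main() {
    let mut registry: HashMap<String, &'static str> = HashMap::new();

    // First registration: insert returns None, so an assert on the return passes.
    let prev = registry.insert("my_agg".to_string(), "v1");
    assert!(prev.is_none());

    // Registering the same name again: insert overwrites and returns the old value.
    // The removed code asserted this was None (and panicked); the new code ignores it.
    let prev = registry.insert("my_agg".to_string(), "v2");
    assert_eq!(prev, Some("v1"));

    assert_eq!(registry["my_agg"], "v2");
}
```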

View File

@@ -96,6 +96,7 @@ pub async fn show_databases(
stmt: ShowDatabases,
catalog_manager: CatalogManagerRef,
) -> Result<Output> {
// TODO(LFC): support WHERE
ensure!(
matches!(stmt.kind, ShowKind::All | ShowKind::Like(_)),
error::UnsupportedExprSnafu {
@@ -135,6 +136,7 @@ pub async fn show_tables(
catalog_manager: CatalogManagerRef,
query_ctx: QueryContextRef,
) -> Result<Output> {
// TODO(LFC): support WHERE
ensure!(
matches!(stmt.kind, ShowKind::All | ShowKind::Like(_)),
error::UnsupportedExprSnafu {

View File

@@ -5,7 +5,6 @@ edition.workspace = true
license.workspace = true
[features]
pprof = ["dep:common-pprof"]
mem-prof = ["dep:common-mem-prof"]
dashboard = []
@@ -26,7 +25,6 @@ common-error = { path = "../common/error" }
common-grpc = { path = "../common/grpc" }
common-grpc-expr = { path = "../common/grpc-expr" }
common-mem-prof = { path = "../common/mem-prof", optional = true }
common-pprof = { path = "../common/pprof", optional = true }
common-query = { path = "../common/query" }
common-recordbatch = { path = "../common/recordbatch" }
common-runtime = { path = "../common/runtime" }
@@ -73,7 +71,6 @@ snap = "1"
sql = { path = "../sql" }
strum = { version = "0.24", features = ["derive"] }
table = { path = "../table" }
tikv-jemalloc-ctl = { version = "0.5", features = ["use_std"] }
tokio-rustls = "0.24"
tokio-stream = { version = "0.1", features = ["net"] }
tokio.workspace = true

View File

@@ -266,19 +266,6 @@ pub enum Error {
source: tokio::task::JoinError,
location: Location,
},
#[cfg(feature = "pprof")]
#[snafu(display("Failed to dump pprof data, source: {}", source))]
DumpPprof {
#[snafu(backtrace)]
source: common_pprof::Error,
},
#[snafu(display("Failed to update jemalloc metrics, source: {source}, location: {location}"))]
UpdateJemallocMetrics {
source: tikv_jemalloc_ctl::Error,
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -354,11 +341,6 @@ impl ErrorExt for Error {
StatusCode::Unknown
}
}
#[cfg(feature = "pprof")]
DumpPprof { source, .. } => source.status_code(),
UpdateJemallocMetrics { .. } => StatusCode::Internal,
}
}

View File

@@ -15,18 +15,14 @@
//! PrometheusGateway provides a gRPC interface to query Prometheus metrics
//! by PromQL. The behavior is similar to the Prometheus HTTP API.
use std::sync::Arc;
use api::v1::prometheus_gateway_server::PrometheusGateway;
use api::v1::promql_request::Promql;
use api::v1::{PromqlRequest, PromqlResponse, ResponseHeader};
use async_trait::async_trait;
use common_error::prelude::ErrorExt;
use common_telemetry::timer;
use common_time::util::current_time_rfc3339;
use promql_parser::parser::ValueType;
use query::parser::PromQuery;
use session::context::QueryContext;
use snafu::OptionExt;
use tonic::{Request, Response};
@@ -72,9 +68,23 @@ impl PrometheusGateway for PrometheusGatewayService {
};
let query_context = create_query_context(inner.header.as_ref());
let json_response = self
.handle_inner(prom_query, query_context, is_range_query)
.await;
let _timer = timer!(
crate::metrics::METRIC_SERVER_GRPC_PROM_REQUEST_TIMER,
&[(
crate::metrics::METRIC_DB_LABEL,
query_context.get_db_string()
)]
);
let result = self.handler.do_query(&prom_query, query_context).await;
let (metric_name, mut result_type) =
retrieve_metric_name_and_result_type(&prom_query.query).unwrap_or_default();
// range query only returns matrix
if is_range_query {
result_type = Some(ValueType::Matrix)
};
let json_response = PromJsonResponse::from_query_result(result, metric_name, result_type)
.await
.0;
let json_bytes = serde_json::to_string(&json_response).unwrap().into_bytes();
let response = Response::new(PromqlResponse {
@@ -89,34 +99,4 @@ impl PrometheusGatewayService {
pub fn new(handler: PromHandlerRef) -> Self {
Self { handler }
}
async fn handle_inner(
&self,
query: PromQuery,
ctx: Arc<QueryContext>,
is_range_query: bool,
) -> PromJsonResponse {
let _timer = timer!(
crate::metrics::METRIC_SERVER_GRPC_PROM_REQUEST_TIMER,
&[(crate::metrics::METRIC_DB_LABEL, ctx.get_db_string())]
);
let result = self.handler.do_query(&query, ctx).await;
let (metric_name, mut result_type) =
match retrieve_metric_name_and_result_type(&query.query) {
Ok((metric_name, result_type)) => (metric_name.unwrap_or_default(), result_type),
Err(err) => {
return PromJsonResponse::error(err.status_code().to_string(), err.to_string())
.0
}
};
// range query only returns matrix
if is_range_query {
result_type = ValueType::Matrix;
};
PromJsonResponse::from_query_result(result, metric_name, result_type)
.await
.0
}
}

View File

@@ -12,18 +12,18 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod admin;
pub mod authorize;
pub mod handler;
pub mod influxdb;
pub mod mem_prof;
pub mod opentsdb;
mod pprof;
pub mod prometheus;
pub mod script;
mod admin;
#[cfg(feature = "dashboard")]
mod dashboard;
#[cfg(feature = "mem-prof")]
pub mod mem_prof;
use std::net::SocketAddr;
use std::sync::Arc;
@@ -503,6 +503,15 @@ impl HttpServer {
);
}
// mem profiler
#[cfg(feature = "mem-prof")]
{
router = router.nest(
&format!("/{HTTP_API_VERSION}/prof"),
Router::new().route("/mem", routing::get(crate::http::mem_prof::mem_prof)),
);
}
if let Some(metrics_handler) = self.metrics_handler {
router = router.nest("", self.route_metrics(metrics_handler));
}
@@ -547,19 +556,6 @@ impl HttpServer {
HttpAuth::<BoxBody>::new(self.user_provider.clone()),
)),
)
// Handlers for debug, we don't expect a timeout.
.nest(
&format!("/{HTTP_API_VERSION}/prof"),
Router::new()
.route(
"/cpu",
routing::get(pprof::pprof_handler).post(pprof::pprof_handler),
)
.route(
"/mem",
routing::get(mem_prof::mem_prof_handler).post(mem_prof::mem_prof_handler),
),
)
}
fn route_metrics<S>(&self, metrics_handler: MetricsHandler) -> Router<S> {

View File

@@ -19,14 +19,14 @@ use aide::transform::TransformOperation;
use axum::extract::{Json, Query, State};
use axum::{Extension, Form};
use common_error::status_code::StatusCode;
use common_telemetry::{error, timer};
use common_telemetry::timer;
use query::parser::PromQuery;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use session::context::UserInfo;
use crate::http::{ApiState, JsonResponse};
use crate::metrics::{JEMALLOC_COLLECTOR, PROCESS_COLLECTOR};
use crate::metrics::PROCESS_COLLECTOR;
use crate::metrics_handler::MetricsHandler;
#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)]
@@ -137,11 +137,7 @@ pub async fn metrics(
) -> String {
// Collect process metrics.
PROCESS_COLLECTOR.collect();
if let Some(c) = JEMALLOC_COLLECTOR.as_ref() {
if let Err(e) = c.update() {
error!(e; "Failed to update jemalloc metrics");
}
}
state.render()
}

View File

@@ -14,14 +14,13 @@
use axum::http::StatusCode;
use axum::response::IntoResponse;
use snafu::ResultExt;
use crate::error::DumpProfileDataSnafu;
#[cfg(feature = "mem-prof")]
#[axum_macros::debug_handler]
pub async fn mem_prof_handler() -> crate::error::Result<impl IntoResponse> {
use snafu::ResultExt;
use crate::error::DumpProfileDataSnafu;
pub async fn mem_prof() -> crate::error::Result<impl IntoResponse> {
Ok((
StatusCode::OK,
common_mem_prof::dump_profile()
@@ -29,12 +28,3 @@ pub async fn mem_prof_handler() -> crate::error::Result<impl IntoResponse> {
.context(DumpProfileDataSnafu)?,
))
}
#[cfg(not(feature = "mem-prof"))]
#[axum_macros::debug_handler]
pub async fn mem_prof_handler() -> crate::error::Result<impl IntoResponse> {
Ok((
StatusCode::NOT_IMPLEMENTED,
"The 'mem-prof' feature is disabled",
))
}

View File

@@ -1,98 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#[cfg(feature = "pprof")]
pub mod handler {
use std::num::NonZeroI32;
use std::time::Duration;
use axum::extract::Query;
use axum::http::StatusCode;
use axum::response::IntoResponse;
use common_pprof::Profiling;
use common_telemetry::logging;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use crate::error::{DumpPprofSnafu, Result};
/// Output format.
#[derive(Debug, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "snake_case")]
pub enum Output {
/// Google's pprof format report in protobuf.
Proto,
/// Simple text format.
Text,
/// SVG flamegraph.
Flamegraph,
}
#[derive(Serialize, Deserialize, Debug, JsonSchema)]
#[serde(default)]
pub struct PprofQuery {
seconds: u64,
frequency: NonZeroI32,
output: Output,
}
impl Default for PprofQuery {
fn default() -> PprofQuery {
PprofQuery {
seconds: 5,
// Safety: 99 is non zero.
frequency: NonZeroI32::new(99).unwrap(),
output: Output::Proto,
}
}
}
#[axum_macros::debug_handler]
pub async fn pprof_handler(Query(req): Query<PprofQuery>) -> Result<impl IntoResponse> {
logging::info!("start pprof, request: {:?}", req);
let profiling = Profiling::new(Duration::from_secs(req.seconds), req.frequency.into());
let body = match req.output {
Output::Proto => profiling.dump_proto().await.context(DumpPprofSnafu)?,
Output::Text => {
let report = profiling.report().await.context(DumpPprofSnafu)?;
format!("{:?}", report).into_bytes()
}
Output::Flamegraph => profiling.dump_flamegraph().await.context(DumpPprofSnafu)?,
};
logging::info!("finish pprof");
Ok((StatusCode::OK, body))
}
}
#[cfg(not(feature = "pprof"))]
pub mod handler {
use axum::http::StatusCode;
use axum::response::IntoResponse;
use crate::error::Result;
#[axum_macros::debug_handler]
pub async fn pprof_handler() -> Result<impl IntoResponse> {
Ok((
StatusCode::NOT_IMPLEMENTED,
"The 'pprof' feature is disabled",
))
}
}
pub use handler::pprof_handler;

View File

@@ -15,20 +15,12 @@
use std::task::{Context, Poll};
use std::time::Instant;
use common_telemetry::error;
use hyper::Body;
use metrics::gauge;
use metrics_process::Collector;
use once_cell::sync::Lazy;
use snafu::ResultExt;
use tikv_jemalloc_ctl::stats::{allocated_mib, resident_mib};
use tikv_jemalloc_ctl::{epoch, epoch_mib, stats};
use tonic::body::BoxBody;
use tower::{Layer, Service};
use crate::error;
use crate::error::UpdateJemallocMetricsSnafu;
pub(crate) const METRIC_DB_LABEL: &str = "db";
pub(crate) const METRIC_CODE_LABEL: &str = "code";
pub(crate) const METRIC_TYPE_LABEL: &str = "type";
@@ -67,8 +59,6 @@ pub(crate) const METRIC_GRPC_REQUESTS_ELAPSED: &str = "servers.grpc_requests_ela
pub(crate) const METRIC_METHOD_LABEL: &str = "method";
pub(crate) const METRIC_PATH_LABEL: &str = "path";
pub(crate) const METRIC_STATUS_LABEL: &str = "status";
pub(crate) const METRIC_JEMALLOC_RESIDENT: &str = "sys.jemalloc.resident";
pub(crate) const METRIC_JEMALLOC_ALLOCATED: &str = "sys.jemalloc.allocated";
/// Prometheus style process metrics collector.
pub(crate) static PROCESS_COLLECTOR: Lazy<Collector> = Lazy::new(|| {
@@ -78,49 +68,6 @@ pub(crate) static PROCESS_COLLECTOR: Lazy<Collector> = Lazy::new(|| {
collector
});
pub(crate) static JEMALLOC_COLLECTOR: Lazy<Option<JemallocCollector>> = Lazy::new(|| {
let collector = JemallocCollector::try_new()
.map_err(|e| {
error!(e; "Failed to retrieve jemalloc metrics");
e
})
.ok();
collector.map(|c| {
if let Err(e) = c.update() {
error!(e; "Failed to update jemalloc metrics");
};
c
})
});
pub(crate) struct JemallocCollector {
epoch: epoch_mib,
allocated: allocated_mib,
resident: resident_mib,
}
impl JemallocCollector {
pub(crate) fn try_new() -> error::Result<Self> {
let e = epoch::mib().context(UpdateJemallocMetricsSnafu)?;
let allocated = stats::allocated::mib().context(UpdateJemallocMetricsSnafu)?;
let resident = stats::resident::mib().context(UpdateJemallocMetricsSnafu)?;
Ok(Self {
epoch: e,
allocated,
resident,
})
}
pub(crate) fn update(&self) -> error::Result<()> {
self.epoch.advance().context(UpdateJemallocMetricsSnafu)?;
let allocated = self.allocated.read().context(UpdateJemallocMetricsSnafu)?;
let resident = self.resident.read().context(UpdateJemallocMetricsSnafu)?;
gauge!(METRIC_JEMALLOC_ALLOCATED, allocated as f64);
gauge!(METRIC_JEMALLOC_RESIDENT, resident as f64);
Ok(())
}
}
// Based on https://github.com/hyperium/tonic/blob/master/examples/src/tower/server.rs
// See https://github.com/hyperium/tonic/issues/242
/// A metrics middleware.

View File

@@ -16,7 +16,6 @@
//! Inspired by Databend's "[mysql_federated.rs](https://github.com/datafuselabs/databend/blob/ac706bf65845e6895141c96c0a10bad6fdc2d367/src/query/service/src/servers/mysql/mysql_federated.rs)".
use std::collections::HashMap;
use std::env;
use std::sync::Arc;
use common_query::Output;
@@ -31,6 +30,9 @@ use regex::bytes::RegexSet;
use regex::Regex;
use session::context::QueryContextRef;
// TODO(LFC): Include GreptimeDB's version and git commit tag etc.
const MYSQL_VERSION: &str = "8.0.26";
static SELECT_VAR_PATTERN: Lazy<Regex> = Lazy::new(|| Regex::new("(?i)^(SELECT @@(.*))").unwrap());
static MYSQL_CONN_JAVA_PATTERN: Lazy<Regex> =
Lazy::new(|| Regex::new("(?i)^(/\\* mysql-connector-j(.*))").unwrap());
@@ -283,7 +285,7 @@ fn check_others(query: &str, query_ctx: QueryContextRef) -> Option<Output> {
}
let recordbatches = if SELECT_VERSION_PATTERN.is_match(query) {
Some(select_function("version()", &get_version()))
Some(select_function("version()", MYSQL_VERSION))
} else if SELECT_DATABASE_PATTERN.is_match(query) {
let schema = query_ctx.current_schema();
Some(select_function("database()", &schema))
@@ -316,16 +318,8 @@ pub(crate) fn check(query: &str, query_ctx: QueryContextRef) -> Option<Output> {
.or_else(|| check_others(query, query_ctx))
}
// get GreptimeDB's version.
fn get_version() -> String {
format!(
"{}-greptime",
env::var("CARGO_PKG_VERSION").unwrap_or_else(|_| "unknown".to_string()),
)
}
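The federated check path works by matching the raw query text against precompiled regexes and answering matching statements locally with canned values such as MYSQL_VERSION, instead of sending them through the query engine. A stripped-down sketch of that interception, assuming only the regex and once_cell crates already used above:
use once_cell::sync::Lazy;
use regex::Regex;
static SELECT_VERSION: Lazy<Regex> =
    Lazy::new(|| Regex::new(r"(?i)^\s*select\s+version\(\)").unwrap());
// Hypothetical helper: return a canned answer for `select version()`,
// or None to let the query engine handle the statement.
fn intercept(query: &str) -> Option<&'static str> {
    SELECT_VERSION.is_match(query).then_some("8.0.26")
}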
#[cfg(test)]
mod test {
use session::context::QueryContext;
use super::*;
@@ -351,15 +345,13 @@ mod test {
}
let query = "select version()";
let expected = format!(
r#"+----------------+
| version() |
+----------------+
| {}-greptime |
+----------------+"#,
env::var("CARGO_PKG_VERSION").unwrap_or_else(|_| "unknown".to_string())
);
test(query, &expected);
let expected = "\
+-----------+
| version() |
+-----------+
| 8.0.26 |
+-----------+";
test(query, expected);
let query = "SELECT @@version_comment LIMIT 1";
let expected = "\

View File

@@ -88,6 +88,9 @@ impl MysqlInstanceShim {
trace!("Start executing query: '{}'", query);
let start = Instant::now();
// TODO(LFC): Find a better way to deal with these special federated queries:
// `check` uses regex to filter out unsupported statements emitted by MySQL's federated
// components; this is quick and dirty, and there must be a better way to do it.
let output =
if let Some(output) = crate::mysql::federated::check(query, self.session.context()) {
vec![Ok(output)]

View File

@@ -157,6 +157,7 @@ impl MysqlServer {
info!("MySQL connection coming from: {}", stream.peer_addr()?);
io_runtime.spawn(async move {
increment_gauge!(crate::metrics::METRIC_MYSQL_CONNECTIONS, 1.0);
// TODO(LFC): Use `output_stream` to write large MySQL ResultSet to client.
if let Err(e) = Self::do_handle(stream, spawn_ref, spawn_config).await {
// TODO(LFC): Write this error to client as well, in MySQL text protocol.
// Looks like we have to expose opensrv-mysql's `PacketWriter`?

View File

@@ -42,7 +42,7 @@ use schemars::JsonSchema;
use serde::de::{self, MapAccess, Visitor};
use serde::{Deserialize, Serialize};
use session::context::{QueryContext, QueryContextRef};
use snafu::{ensure, Location, OptionExt, ResultExt};
use snafu::{ensure, OptionExt, ResultExt};
use tokio::sync::oneshot::Sender;
use tokio::sync::{oneshot, Mutex};
use tower::ServiceBuilder;
@@ -52,7 +52,7 @@ use tower_http::trace::TraceLayer;
use crate::auth::UserProviderRef;
use crate::error::{
AlreadyStartedSnafu, CollectRecordbatchSnafu, Error, InternalSnafu, InvalidQuerySnafu, Result,
AlreadyStartedSnafu, CollectRecordbatchSnafu, Error, InternalSnafu, NotSupportedSnafu, Result,
StartHttpSnafu, UnexpectedResultSnafu,
};
use crate::http::authorize::HttpAuth;
@@ -97,7 +97,6 @@ impl PromServer {
.route("/query", routing::post(instant_query).get(instant_query))
.route("/query_range", routing::post(range_query).get(range_query))
.route("/labels", routing::post(labels_query).get(labels_query))
.route("/series", routing::post(series_query).get(series_query))
.route(
"/label/:label_name/values",
routing::get(label_values_query),
@@ -192,7 +191,6 @@ pub struct PromData {
pub enum PromResponse {
PromData(PromData),
Labels(Vec<String>),
Series(Vec<HashMap<String, String>>),
LabelValues(Vec<String>),
}
@@ -244,7 +242,7 @@ impl PromJsonResponse {
pub async fn from_query_result(
result: Result<Output>,
metric_name: String,
result_type: ValueType,
result_type: Option<ValueType>,
) -> Json<Self> {
let response: Result<Json<Self>> = try {
let json = match result? {
@@ -271,7 +269,7 @@ impl PromJsonResponse {
json
};
let result_type_string = result_type.to_string();
let result_type_string = result_type.map(|t| t.to_string()).unwrap_or_default();
match response {
Ok(resp) => resp,
@@ -295,7 +293,7 @@ impl PromJsonResponse {
fn record_batches_to_data(
batches: RecordBatches,
metric_name: String,
result_type: ValueType,
result_type: Option<ValueType>,
) -> Result<PromResponse> {
// infer semantic type of each column from schema.
// TODO(ruihang): wish there is a better way to do this.
@@ -390,21 +388,27 @@ impl PromJsonResponse {
.map(|(tags, mut values)| {
let metric = tags.into_iter().collect();
match result_type {
ValueType::Vector | ValueType::Scalar | ValueType::String => Ok(PromSeries {
metric,
value: values.pop(),
..Default::default()
}),
ValueType::Matrix => Ok(PromSeries {
Some(ValueType::Vector) | Some(ValueType::Scalar) | Some(ValueType::String) => {
Ok(PromSeries {
metric,
value: values.pop(),
..Default::default()
})
}
Some(ValueType::Matrix) => Ok(PromSeries {
metric,
values,
..Default::default()
}),
other => NotSupportedSnafu {
feat: format!("PromQL result type {other:?}"),
}
.fail(),
}
})
.collect::<Result<Vec<_>>>()?;
let result_type_string = result_type.to_string();
let result_type_string = result_type.map(|t| t.to_string()).unwrap_or_default();
let data = PromResponse::PromData(PromData {
result_type: result_type_string,
result,
@@ -446,10 +450,8 @@ pub async fn instant_query(
let query_ctx = QueryContext::with(catalog, schema);
let result = handler.do_query(&prom_query, Arc::new(query_ctx)).await;
let (metric_name, result_type) = match retrieve_metric_name_and_result_type(&prom_query.query) {
Ok((metric_name, result_type)) => (metric_name.unwrap_or_default(), result_type),
Err(err) => return PromJsonResponse::error(err.status_code().to_string(), err.to_string()),
};
let (metric_name, result_type) =
retrieve_metric_name_and_result_type(&prom_query.query).unwrap_or_default();
PromJsonResponse::from_query_result(result, metric_name, result_type).await
}
@@ -482,11 +484,9 @@ pub async fn range_query(
let query_ctx = QueryContext::with(catalog, schema);
let result = handler.do_query(&prom_query, Arc::new(query_ctx)).await;
let metric_name = match retrieve_metric_name_and_result_type(&prom_query.query) {
Err(err) => return PromJsonResponse::error(err.status_code().to_string(), err.to_string()),
Ok((metric_name, _)) => metric_name.unwrap_or_default(),
};
PromJsonResponse::from_query_result(result, metric_name, ValueType::Matrix).await
let (metric_name, _) =
retrieve_metric_name_and_result_type(&prom_query.query).unwrap_or_default();
PromJsonResponse::from_query_result(result, metric_name, Some(ValueType::Matrix)).await
}
#[derive(Debug, Default, Serialize, JsonSchema)]
@@ -593,30 +593,6 @@ pub async fn labels_query(
PromJsonResponse::success(PromResponse::Labels(sorted_labels))
}
async fn retrieve_series_from_query_result(
result: Result<Output>,
series: &mut Vec<HashMap<String, String>>,
table_name: &str,
) -> Result<()> {
match result? {
Output::RecordBatches(batches) => {
record_batches_to_series(batches, series, table_name)?;
Ok(())
}
Output::Stream(stream) => {
let batches = RecordBatches::try_collect(stream)
.await
.context(CollectRecordbatchSnafu)?;
record_batches_to_series(batches, series, table_name)?;
Ok(())
}
Output::AffectedRows(_) => Err(Error::UnexpectedResult {
reason: "expected data result, but got affected rows".to_string(),
location: Location::default(),
}),
}
}
/// Retrieve labels name from query result
async fn retrieve_labels_name_from_query_result(
result: Result<Output>,
@@ -641,28 +617,6 @@ async fn retrieve_labels_name_from_query_result(
}
}
fn record_batches_to_series(
batches: RecordBatches,
series: &mut Vec<HashMap<String, String>>,
table_name: &str,
) -> Result<()> {
for batch in batches.iter() {
for row in batch.rows() {
let mut element: HashMap<String, String> = row
.iter()
.enumerate()
.map(|(idx, column)| {
let column_name = batch.schema.column_name_by_index(idx);
(column_name.to_string(), column.to_string())
})
.collect();
element.insert("__name__".to_string(), table_name.to_string());
series.push(element);
}
}
Ok(())
}
/// Retrieve labels name from record batches
fn record_batches_to_labels_name(
batches: RecordBatches,
@@ -721,13 +675,12 @@ fn record_batches_to_labels_name(
pub(crate) fn retrieve_metric_name_and_result_type(
promql: &str,
) -> Result<(Option<String>, ValueType)> {
let promql_expr = promql_parser::parser::parse(promql)
.map_err(|reason| InvalidQuerySnafu { reason }.build())?;
let metric_name = promql_expr_to_metric_name(&promql_expr);
let result_type = promql_expr.value_type();
) -> Option<(String, Option<ValueType>)> {
let promql_expr = promql_parser::parser::parse(promql).ok()?;
let metric_name = promql_expr_to_metric_name(&promql_expr)?;
let result_type = Some(promql_expr.value_type());
Ok((metric_name, result_type))
Some((metric_name, result_type))
}
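With the Option return type, parse failures no longer surface as errors; callers such as instant_query and range_query fall back to defaults via unwrap_or_default(). A condensed, hypothetical sketch of that caller-side behavior, reusing only the promql_parser calls shown above (the metric-name extraction is stubbed out with a placeholder):
use promql_parser::parser;
// On a parse failure this degrades to ("", "") via unwrap_or_default();
// "up" stands in for the real AST-walking metric-name extraction.
fn fallback_name_and_type(promql: &str) -> (String, String) {
    parser::parse(promql)
        .ok()
        .map(|expr| ("up".to_string(), expr.value_type().to_string()))
        .unwrap_or_default()
}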
fn promql_expr_to_metric_name(expr: &PromqlExpr) -> Option<String> {
@@ -850,12 +803,14 @@ async fn retrieve_label_values_from_record_batch(
ConcreteDataType::String(_) => {}
_ => return Ok(()),
}
for batch in batches.iter() {
let label_column = batch
.column(label_col_idx)
.as_any()
.downcast_ref::<StringVector>()
.unwrap();
for row_index in 0..batch.num_rows() {
if let Some(label_value) = label_column.get_data(row_index) {
labels_values.insert(label_value.to_string());
@@ -865,57 +820,3 @@ async fn retrieve_label_values_from_record_batch(
Ok(())
}
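The loop above is the usual downcast-and-iterate pattern for reading concrete values out of a column vector. A small sketch of the same idea in isolation, assuming the datatypes StringVector API used throughout this diff:
use datatypes::prelude::*;
use datatypes::vectors::StringVector;
// Collect the non-null values of a string column into owned Strings.
fn collect_strings(label_column: &StringVector) -> Vec<String> {
    (0..label_column.len())
        .filter_map(|i| label_column.get_data(i).map(|s| s.to_string()))
        .collect()
}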
#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)]
pub struct SeriesQuery {
start: Option<String>,
end: Option<String>,
#[serde(flatten)]
matches: Matches,
db: Option<String>,
}
#[axum_macros::debug_handler]
pub async fn series_query(
State(handler): State<PromHandlerRef>,
Query(params): Query<SeriesQuery>,
Form(form_params): Form<SeriesQuery>,
) -> Json<PromJsonResponse> {
let mut queries: Vec<String> = params.matches.0;
if queries.is_empty() {
queries = form_params.matches.0;
}
if queries.is_empty() {
return PromJsonResponse::error("Unsupported", "match[] parameter is required");
}
let start = params
.start
.or(form_params.start)
.unwrap_or_else(yesterday_rfc3339);
let end = params
.end
.or(form_params.end)
.unwrap_or_else(current_time_rfc3339);
let db = &params.db.unwrap_or(DEFAULT_SCHEMA_NAME.to_string());
let (catalog, schema) = super::parse_catalog_and_schema_from_client_database_name(db);
let query_ctx = Arc::new(QueryContext::with(catalog, schema));
let mut series = Vec::new();
for query in queries {
let table_name = query.clone();
let prom_query = PromQuery {
query,
start: start.clone(),
end: end.clone(),
// TODO: find a better value for step
step: DEFAULT_LOOKBACK_STRING.to_string(),
};
let result = handler.do_query(&prom_query, query_ctx.clone()).await;
if let Err(err) = retrieve_series_from_query_result(result, &mut series, &table_name).await
{
return PromJsonResponse::error(err.status_code().to_string(), err.to_string());
}
}
PromJsonResponse::success(PromResponse::Series(series))
}

View File

@@ -43,6 +43,7 @@ pub trait SqlQueryHandler {
query_ctx: QueryContextRef,
) -> Vec<std::result::Result<Output, Self::Error>>;
// TODO(LFC): revisit this for mysql prepared statement
async fn do_describe(
&self,
stmt: Statement,

View File

@@ -19,7 +19,6 @@ use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use common_telemetry::{logging, timer};
use metrics::counter;
pub use picker::{FlushPicker, PickerConfig};
pub use scheduler::{
FlushHandle, FlushRegionRequest, FlushRequest, FlushScheduler, FlushSchedulerRef,
@@ -33,7 +32,7 @@ use crate::error::Result;
use crate::manifest::action::*;
use crate::manifest::region::RegionManifest;
use crate::memtable::{IterContext, MemtableId, MemtableRef};
use crate::metrics::{FLUSH_BYTES_TOTAL, FLUSH_ELAPSED};
use crate::metrics::FLUSH_ELAPSED;
use crate::region::{RegionWriterRef, SharedDataRef};
use crate::sst::{AccessLayerRef, FileId, FileMeta, Source, SstInfo, WriteOptions};
use crate::wal::Wal;
@@ -298,9 +297,6 @@ impl<S: LogStore> FlushJob<S> {
.flatten()
.collect();
let flush_bytes = metas.iter().map(|f| f.file_size).sum();
counter!(FLUSH_BYTES_TOTAL, flush_bytes);
let file_ids = metas.iter().map(|f| f.file_id).collect::<Vec<_>>();
logging::info!("Successfully flush memtables, region:{region_id}, files: {file_ids:?}");
Ok(metas)

View File

@@ -341,13 +341,12 @@ mod tests {
builder.root(&tmp_dir.path().to_string_lossy());
let object_store = ObjectStore::new(builder).unwrap().finish();
let test_gc_duration = Duration::from_millis(50);
let manifest = RegionManifest::with_checkpointer(
"/manifest/",
object_store,
manifest_compress_type(compress),
None,
Some(test_gc_duration),
Some(Duration::from_millis(50)),
);
manifest.start().await.unwrap();
@@ -493,7 +492,7 @@ mod tests {
);
// wait for gc
tokio::time::sleep(test_gc_duration * 3).await;
tokio::time::sleep(Duration::from_millis(60)).await;
for v in checkpoint_versions {
if v < 4 {

View File

@@ -17,7 +17,6 @@ use store_api::storage::{OpType, SequenceNumber};
use super::MemtableRef;
use crate::error::Result;
use crate::memtable::KeyValues;
use crate::metrics::MEMTABLE_WRITE_ELAPSED;
use crate::write_batch::{Mutation, Payload};
/// Wraps logic of inserting key/values in [WriteBatch] to [Memtable].
@@ -41,8 +40,6 @@ impl Inserter {
/// Won't do schema validation if not configured. Caller (mostly the [`RegionWriter`]) should ensure the
/// schemas of `memtable` are consistent with `payload`'s.
pub fn insert_memtable(&mut self, payload: &Payload, memtable: &MemtableRef) -> Result<()> {
let _timer = common_telemetry::timer!(MEMTABLE_WRITE_ELAPSED);
if payload.is_empty() {
return Ok(());
}

View File

@@ -22,8 +22,6 @@ pub const FLUSH_REQUESTS_TOTAL: &str = "storage.flush.requests_total";
pub const FLUSH_ERRORS_TOTAL: &str = "storage.flush.errors_total";
/// Elapsed time of a flush job.
pub const FLUSH_ELAPSED: &str = "storage.flush.elapsed";
/// Counter of flushed bytes.
pub const FLUSH_BYTES_TOTAL: &str = "storage.flush.bytes_total";
/// Reason to flush.
pub const FLUSH_REASON: &str = "reason";
/// Gauge for open regions
@@ -34,7 +32,3 @@ pub const LOG_STORE_WRITE_ELAPSED: &str = "storage.logstore.write.elapsed";
pub const COMPACT_ELAPSED: &str = "storage.compact.elapsed";
/// Global write buffer size in bytes.
pub const WRITE_BUFFER_BYTES: &str = "storage.write_buffer_bytes";
/// Elapsed time of inserting memtable.
pub const MEMTABLE_WRITE_ELAPSED: &str = "storage.memtable.write.elapsed";
/// Elapsed time of preprocessing write batch.
pub const PREPROCESS_ELAPSED: &str = "storage.write.preprocess.elapsed";

View File

@@ -66,7 +66,7 @@ mod projection;
/// Create metadata of a region with schema: (timestamp, v0).
pub fn new_metadata(region_name: &str) -> RegionMetadata {
let desc = RegionDescBuilder::new(region_name)
.push_field_column(("v0", LogicalTypeId::String, true))
.push_field_column(("v0", LogicalTypeId::Int64, true))
.build();
desc.try_into().unwrap()
}
@@ -94,23 +94,22 @@ impl<S: LogStore> TesterBase<S> {
}
pub async fn close(&self) {
self.region.close(&CloseContext::default()).await.unwrap();
self.region.inner.wal.close().await.unwrap();
}
/// Put without version specified.
///
/// Format of data: (timestamp, v0), timestamp is key, v0 is value.
pub async fn put(&self, data: &[(i64, Option<String>)]) -> WriteResponse {
pub async fn put(&self, data: &[(i64, Option<i64>)]) -> WriteResponse {
self.try_put(data).await.unwrap()
}
/// Put without version specified, returns [`Result<WriteResponse>`]
///
/// Format of data: (timestamp, v0), timestamp is key, v0 is value.
pub async fn try_put(&self, data: &[(i64, Option<String>)]) -> Result<WriteResponse> {
let data: Vec<(TimestampMillisecond, Option<String>)> =
data.iter().map(|(l, r)| ((*l).into(), r.clone())).collect();
pub async fn try_put(&self, data: &[(i64, Option<i64>)]) -> Result<WriteResponse> {
let data: Vec<(TimestampMillisecond, Option<i64>)> =
data.iter().map(|(l, r)| ((*l).into(), *r)).collect();
// Build a batch without version.
let mut batch = new_write_batch_for_test(false);
let put_data = new_put_data(&data);
@@ -120,9 +119,9 @@ impl<S: LogStore> TesterBase<S> {
}
/// Put without version specified directly to inner writer.
pub async fn put_inner(&self, data: &[(i64, Option<String>)]) -> WriteResponse {
let data: Vec<(TimestampMillisecond, Option<String>)> =
data.iter().map(|(l, r)| ((*l).into(), r.clone())).collect();
pub async fn put_inner(&self, data: &[(i64, Option<i64>)]) -> WriteResponse {
let data: Vec<(TimestampMillisecond, Option<i64>)> =
data.iter().map(|(l, r)| ((*l).into(), *r)).collect();
let mut batch = new_write_batch_for_test(false);
let put_data = new_put_data(&data);
batch.put(put_data).unwrap();
@@ -138,7 +137,7 @@ impl<S: LogStore> TesterBase<S> {
}
/// Scan all data.
pub async fn full_scan(&self) -> Vec<(i64, Option<String>)> {
pub async fn full_scan(&self) -> Vec<(i64, Option<i64>)> {
logging::info!("Full scan with ctx {:?}", self.read_ctx);
let snapshot = self.region.snapshot(&self.read_ctx).unwrap();
@@ -187,7 +186,7 @@ impl<S: LogStore> TesterBase<S> {
}
/// Collect data from the reader.
pub async fn collect_reader(&self, mut reader: ChunkReaderImpl) -> Vec<(i64, Option<String>)> {
pub async fn collect_reader(&self, mut reader: ChunkReaderImpl) -> Vec<(i64, Option<i64>)> {
let mut dst = Vec::new();
while let Some(chunk) = reader.next_chunk().await.unwrap() {
let chunk = reader.project_chunk(chunk);
@@ -209,7 +208,7 @@ fn new_write_batch_for_test(enable_version_column: bool) -> WriteBatch {
LogicalTypeId::TimestampMillisecond,
false,
),
("v0", LogicalTypeId::String, true),
("v0", LogicalTypeId::Int64, true),
],
Some(0),
2,
@@ -222,7 +221,7 @@ fn new_write_batch_for_test(enable_version_column: bool) -> WriteBatch {
LogicalTypeId::TimestampMillisecond,
false,
),
("v0", LogicalTypeId::String, true),
("v0", LogicalTypeId::Int64, true),
],
Some(0),
1,
@@ -230,12 +229,12 @@ fn new_write_batch_for_test(enable_version_column: bool) -> WriteBatch {
}
}
fn new_put_data(data: &[(TimestampMillisecond, Option<String>)]) -> HashMap<String, VectorRef> {
fn new_put_data(data: &[(TimestampMillisecond, Option<i64>)]) -> HashMap<String, VectorRef> {
let mut put_data = HashMap::with_capacity(2);
let timestamps =
TimestampMillisecondVector::from_vec(data.iter().map(|v| v.0.into()).collect());
let values = StringVector::from(data.iter().map(|kv| kv.1.clone()).collect::<Vec<_>>());
let values = Int64Vector::from_owned_iterator(data.iter().map(|kv| kv.1));
put_data.insert(
test_util::TIMESTAMP_NAME.to_string(),
@@ -260,7 +259,7 @@ fn new_delete_data(keys: &[TimestampMillisecond]) -> HashMap<String, VectorRef>
delete_data
}
fn append_chunk_to(chunk: &Chunk, dst: &mut Vec<(i64, Option<String>)>) {
fn append_chunk_to(chunk: &Chunk, dst: &mut Vec<(i64, Option<i64>)>) {
assert_eq!(2, chunk.columns.len());
let timestamps = chunk.columns[0]
@@ -269,10 +268,10 @@ fn append_chunk_to(chunk: &Chunk, dst: &mut Vec<(i64, Option<String>)>) {
.unwrap();
let values = chunk.columns[1]
.as_any()
.downcast_ref::<StringVector>()
.downcast_ref::<Int64Vector>()
.unwrap();
for (ts, value) in timestamps.iter_data().zip(values.iter_data()) {
dst.push((ts.unwrap().into_native(), value.map(|s| s.to_string())));
dst.push((ts.unwrap().into_native(), value));
}
}
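Since v0 changed from a string column to Int64, put payloads are now built from Int64Vector instead of StringVector. A minimal sketch of assembling such a payload, assuming the vector constructors shown in this diff; the column names are illustrative:
use std::collections::HashMap;
use std::sync::Arc;
use datatypes::vectors::{Int64Vector, TimestampMillisecondVector, VectorRef};
fn demo_put_data() -> HashMap<String, VectorRef> {
    let mut put_data = HashMap::with_capacity(2);
    // Timestamps are the row keys; values may be null (None).
    let timestamps = TimestampMillisecondVector::from_vec(vec![1000, 1001, 1002]);
    let values = Int64Vector::from_owned_iterator(vec![Some(100), Some(101), None].into_iter());
    put_data.insert("ts".to_string(), Arc::new(timestamps) as VectorRef);
    put_data.insert("v0".to_string(), Arc::new(values) as VectorRef);
    put_data
}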

View File

@@ -18,7 +18,7 @@ use std::sync::Arc;
use common_test_util::temp_dir::create_temp_dir;
use datatypes::prelude::*;
use datatypes::timestamp::TimestampMillisecond;
use datatypes::vectors::{Int64Vector, StringVector, TimestampMillisecondVector, VectorRef};
use datatypes::vectors::{Int64Vector, TimestampMillisecondVector, VectorRef};
use log_store::raft_engine::log_store::RaftEngineLogStore;
use store_api::storage::{
AddColumn, AlterOperation, AlterRequest, Chunk, ChunkReader, ColumnDescriptor,
@@ -53,12 +53,12 @@ struct AlterTester {
struct DataRow {
key: Option<i64>,
ts: TimestampMillisecond,
v0: Option<String>,
v0: Option<i64>,
v1: Option<i64>,
}
impl DataRow {
fn new_with_string(key: Option<i64>, ts: i64, v0: Option<String>, v1: Option<i64>) -> Self {
fn new(key: Option<i64>, ts: i64, v0: Option<i64>, v1: Option<i64>) -> Self {
DataRow {
key,
ts: ts.into(),
@@ -66,10 +66,6 @@ impl DataRow {
v1,
}
}
fn new(key: Option<i64>, ts: i64, v0: Option<i64>, v1: Option<i64>) -> Self {
Self::new_with_string(key, ts, v0.map(|s| s.to_string()), v1)
}
}
fn new_put_data(data: &[DataRow]) -> HashMap<String, VectorRef> {
@@ -80,7 +76,7 @@ fn new_put_data(data: &[DataRow]) -> HashMap<String, VectorRef> {
.map(|v| Some(v.ts.into_native()))
.collect::<Vec<_>>(),
);
let values1 = StringVector::from(data.iter().map(|v| v.v0.clone()).collect::<Vec<_>>());
let values1 = Int64Vector::from(data.iter().map(|kv| kv.v0).collect::<Vec<_>>());
let values2 = Int64Vector::from(data.iter().map(|kv| kv.v1).collect::<Vec<_>>());
put_data.insert("k0".to_string(), Arc::new(keys) as VectorRef);
@@ -162,21 +158,13 @@ impl AlterTester {
/// Put data with initial schema.
async fn put_with_init_schema(&self, data: &[(i64, Option<i64>)]) {
// put of FileTesterBase always use initial schema version.
let data = data
.iter()
.map(|(ts, v0)| (*ts, v0.map(|v| v.to_string())))
.collect::<Vec<_>>();
self.base().put(&data).await;
self.base().put(data).await;
}
/// Put data to inner writer with initial schema.
async fn put_inner_with_init_schema(&self, data: &[(i64, Option<i64>)]) {
let data = data
.iter()
.map(|(ts, v0)| (*ts, v0.map(|v| v.to_string())))
.collect::<Vec<_>>();
// put of FileTesterBase always use initial schema version.
self.base().put_inner(&data).await;
self.base().put_inner(data).await;
}
async fn alter(&self, mut req: AlterRequest) {
@@ -191,7 +179,7 @@ impl AlterTester {
metadata.version()
}
async fn full_scan_with_init_schema(&self) -> Vec<(i64, Option<String>)> {
async fn full_scan_with_init_schema(&self) -> Vec<(i64, Option<i64>)> {
self.base().full_scan().await
}
@@ -231,17 +219,17 @@ fn append_chunk_to(chunk: &Chunk, dst: &mut Vec<DataRow>) {
.unwrap();
let v0_vector = chunk.columns[2]
.as_any()
.downcast_ref::<StringVector>()
.downcast_ref::<Int64Vector>()
.unwrap();
let v1_vector = chunk.columns[3]
.as_any()
.downcast_ref::<Int64Vector>()
.unwrap();
for i in 0..k0_vector.len() {
dst.push(DataRow::new_with_string(
dst.push(DataRow::new(
k0_vector.get_data(i),
ts_vector.get_data(i).unwrap().into(),
v0_vector.get_data(i).map(|s| s.to_string()),
v0_vector.get_data(i),
v1_vector.get_data(i),
));
}

View File

@@ -98,11 +98,11 @@ impl Tester {
self.base.as_mut().unwrap().read_ctx.batch_size = batch_size;
}
async fn put(&self, data: &[(i64, Option<String>)]) -> WriteResponse {
async fn put(&self, data: &[(i64, Option<i64>)]) -> WriteResponse {
self.base().put(data).await
}
async fn full_scan(&self) -> Vec<(i64, Option<String>)> {
async fn full_scan(&self) -> Vec<(i64, Option<i64>)> {
self.base().full_scan().await
}
@@ -122,11 +122,11 @@ async fn test_simple_put_scan() {
let tester = Tester::new(REGION_NAME, store_dir).await;
let data = vec![
(1000, Some(100.to_string())),
(1001, Some(101.to_string())),
(1000, Some(100)),
(1001, Some(101)),
(1002, None),
(1003, Some(103.to_string())),
(1004, Some(104.to_string())),
(1003, Some(103)),
(1004, Some(104)),
];
tester.put(&data).await;
@@ -143,7 +143,7 @@ async fn test_sequence_increase() {
let mut committed_sequence = tester.committed_sequence();
for i in 0..100 {
tester.put(&[(i, Some(1234.to_string()))]).await;
tester.put(&[(i, Some(1234))]).await;
committed_sequence += 1;
assert_eq!(committed_sequence, tester.committed_sequence());
@@ -161,9 +161,9 @@ async fn test_reopen() {
let mut all_data = Vec::new();
// Reopen region multiple times.
for i in 0..5 {
let data = (i, Some(i.to_string()));
tester.put(&[data.clone()]).await;
all_data.push(data.clone());
let data = (i, Some(i));
tester.put(&[data]).await;
all_data.push(data);
let output = tester.full_scan().await;
assert_eq!(all_data, output);
@@ -195,7 +195,7 @@ async fn test_scan_different_batch() {
let store_dir = dir.path().to_str().unwrap();
let mut tester = Tester::new(REGION_NAME, store_dir).await;
let data: Vec<_> = (0..=2000).map(|i| (i, Some(i.to_string()))).collect();
let data: Vec<_> = (0..=2000).map(|i| (i, Some(i))).collect();
for chunk in data.chunks(100) {
tester.put(chunk).await;
@@ -218,11 +218,11 @@ async fn test_put_delete_scan() {
let mut tester = Tester::new(REGION_NAME, store_dir).await;
let data = vec![
(1000, Some(100.to_string())),
(1001, Some(101.to_string())),
(1000, Some(100)),
(1001, Some(101)),
(1002, None),
(1003, None),
(1004, Some(104.to_string())),
(1004, Some(104)),
];
tester.put(&data).await;
@@ -232,11 +232,7 @@ async fn test_put_delete_scan() {
tester.delete(&keys).await;
let output = tester.full_scan().await;
let expect = vec![
(1000, Some(100.to_string())),
(1002, None),
(1004, Some(104.to_string())),
];
let expect = vec![(1000, Some(100)), (1002, None), (1004, Some(104))];
assert_eq!(expect, output);
// Deletion is also persistent.
@@ -252,11 +248,11 @@ async fn test_put_delete_absent_key() {
let mut tester = Tester::new(REGION_NAME, store_dir).await;
let data = vec![
(1000, Some(100.to_string())),
(1001, Some(101.to_string())),
(1000, Some(100)),
(1001, Some(101)),
(1002, None),
(1003, None),
(1004, Some(104.to_string())),
(1004, Some(104)),
];
tester.put(&data).await;
@@ -267,11 +263,7 @@ async fn test_put_delete_absent_key() {
tester.delete(&keys).await;
let output = tester.full_scan().await;
let expect = vec![
(1000, Some(100.to_string())),
(1001, Some(101.to_string())),
(1003, None),
];
let expect = vec![(1000, Some(100)), (1001, Some(101)), (1003, None)];
assert_eq!(expect, output);
// Deletion is also persistent.

View File

@@ -65,19 +65,11 @@ impl CloseTester {
}
async fn put(&self, data: &[(i64, Option<i64>)]) -> WriteResponse {
let data = data
.iter()
.map(|(ts, v0)| (*ts, v0.map(|v| v.to_string())))
.collect::<Vec<_>>();
self.base().put(&data).await
self.base().put(data).await
}
async fn try_put(&self, data: &[(i64, Option<i64>)]) -> Result<WriteResponse, Error> {
let data = data
.iter()
.map(|(ts, v0)| (*ts, v0.map(|v| v.to_string())))
.collect::<Vec<_>>();
self.base().try_put(&data).await
self.base().try_put(data).await
}
async fn try_alter(&self, mut req: AlterRequest) -> Result<(), Error> {

View File

@@ -187,11 +187,7 @@ impl CompactionTester {
}
async fn put(&self, data: &[(i64, Option<i64>)]) -> WriteResponse {
let data = data
.iter()
.map(|(ts, v0)| (*ts, v0.map(|v| v.to_string())))
.collect::<Vec<_>>();
self.base().put(&data).await
self.base().put(data).await
}
async fn flush(&self, wait: Option<bool>) {

View File

@@ -94,14 +94,10 @@ impl FlushTester {
}
async fn put(&self, data: &[(i64, Option<i64>)]) -> WriteResponse {
let data = data
.iter()
.map(|(ts, v0)| (*ts, v0.map(|v| v.to_string())))
.collect::<Vec<_>>();
self.base().put(&data).await
self.base().put(data).await
}
async fn full_scan(&self) -> Vec<(i64, Option<String>)> {
async fn full_scan(&self) -> Vec<(i64, Option<i64>)> {
self.base().full_scan().await
}
@@ -220,7 +216,7 @@ async fn test_flush_empty() {
let sst_dir = format!("{}/{}", store_dir, engine::region_sst_dir("", REGION_NAME));
assert!(!has_parquet_file(&sst_dir));
let expect = vec![(1000, Some(100.to_string())), (2000, Some(200.to_string()))];
let expect = vec![(1000, Some(100)), (2000, Some(200))];
let output = tester.full_scan().await;
assert_eq!(expect, output);
@@ -246,11 +242,7 @@ async fn test_read_after_flush() {
// Put element again.
tester.put(&[(3000, Some(300))]).await;
let expect = vec![
(1000, Some(100.to_string())),
(2000, Some(200.to_string())),
(3000, Some(300.to_string())),
];
let expect = vec![(1000, Some(100)), (2000, Some(200)), (3000, Some(300))];
let output = tester.full_scan().await;
assert_eq!(expect, output);
@@ -292,11 +284,7 @@ async fn test_merge_read_after_flush() {
// Overwrite row (In memtable).
tester.put(&[(2000, Some(203))]).await;
let expect = vec![
(1000, Some(100.to_string())),
(2000, Some(203.to_string())),
(3000, Some(300.to_string())),
];
let expect = vec![(1000, Some(100)), (2000, Some(203)), (3000, Some(300))];
let output = tester.full_scan().await;
assert_eq!(expect, output);

View File

@@ -39,7 +39,7 @@ use crate::manifest::action::{
};
use crate::memtable::{Inserter, MemtableBuilderRef, MemtableId, MemtableRef};
use crate::metadata::RegionMetadataRef;
use crate::metrics::{FLUSH_REASON, FLUSH_REQUESTS_TOTAL, PREPROCESS_ELAPSED};
use crate::metrics::{FLUSH_REASON, FLUSH_REQUESTS_TOTAL};
use crate::proto::wal::WalHeader;
use crate::region::{
CompactContext, RecoverdMetadata, RecoveredMetadataMap, RegionManifest, SharedDataRef,
@@ -670,8 +670,6 @@ impl WriterInner {
&mut self,
writer_ctx: &WriterContext<'_, S>,
) -> Result<()> {
let _timer = common_telemetry::timer!(PREPROCESS_ELAPSED);
let version_control = writer_ctx.version_control();
// Check whether memtable is full or flush should be triggered. We need to do this first since
// switching memtables will clear all mutable memtables.

View File

@@ -281,9 +281,7 @@ impl ParquetReader {
.with_row_groups(pruned_row_groups);
// If a time range row filter can be built, we push it down to reduce the rows to scan.
if let Some(row_filter) =
build_time_range_row_filter(self.time_range, &store_schema, &parquet_schema_desc)
{
if let Some(row_filter) = self.build_time_range_row_filter(&parquet_schema_desc) {
builder = builder.with_row_filter(row_filter);
}
@@ -299,55 +297,59 @@ impl ParquetReader {
ChunkStream::new(self.file_handle.clone(), adapter, Box::pin(chunk_stream))
}
}
/// Builds time range row filter.
fn build_time_range_row_filter(
time_range: TimestampRange,
store_schema: &Arc<StoreSchema>,
schema_desc: &SchemaDescriptor,
) -> Option<RowFilter> {
let ts_col_idx = store_schema.timestamp_index();
/// Builds time range row filter.
fn build_time_range_row_filter(&self, schema_desc: &SchemaDescriptor) -> Option<RowFilter> {
let ts_col_idx = self
.projected_schema
.schema_to_read()
.schema()
.timestamp_index()?;
let ts_col = self
.projected_schema
.schema_to_read()
.schema()
.timestamp_column()?;
let ts_col = store_schema.columns().get(ts_col_idx)?;
let ts_col_unit = match &ts_col.data_type {
ConcreteDataType::Int64(_) => TimeUnit::Millisecond,
ConcreteDataType::Timestamp(ts_type) => ts_type.unit(),
_ => unreachable!(),
};
let ts_col_unit = match &ts_col.desc.data_type {
ConcreteDataType::Int64(_) => TimeUnit::Millisecond,
ConcreteDataType::Timestamp(ts_type) => ts_type.unit(),
_ => unreachable!(),
};
let projection = ProjectionMask::roots(schema_desc, vec![ts_col_idx]);
let projection = ProjectionMask::roots(schema_desc, vec![ts_col_idx]);
// checks if converting the time range unit into the ts col unit will result in a rounding error.
if time_unit_lossy(&self.time_range, ts_col_unit) {
let filter = RowFilter::new(vec![Box::new(PlainTimestampRowFilter::new(
self.time_range,
projection,
))]);
return Some(filter);
}
// checks if converting the time range unit into the ts col unit will result in a rounding error.
if time_unit_lossy(&time_range, ts_col_unit) {
let filter = RowFilter::new(vec![Box::new(PlainTimestampRowFilter::new(
time_range, projection,
))]);
return Some(filter);
// If any of the conversions overflows, we cannot use arrow's computation method; instead
// we resort to a plain filter that compares timestamps with the given range, which is less
// efficient but simpler.
// TODO(hl): If the range is gt_eq/lt, we also use PlainTimestampRowFilter, but these cases
// can also use arrow's gt_eq_scalar/lt_scalar methods.
let row_filter = if let (Some(lower), Some(upper)) = (
self.time_range
.start()
.and_then(|s| s.convert_to(ts_col_unit))
.map(|t| t.value()),
self.time_range
.end()
.and_then(|s| s.convert_to(ts_col_unit))
.map(|t| t.value()),
) {
Box::new(FastTimestampRowFilter::new(projection, lower, upper)) as _
} else {
Box::new(PlainTimestampRowFilter::new(self.time_range, projection)) as _
};
let filter = RowFilter::new(vec![row_filter]);
Some(filter)
}
// If any of the conversions overflows, we cannot use arrow's computation method; instead
// we resort to a plain filter that compares timestamps with the given range, which is less
// efficient but simpler.
// TODO(hl): If the range is gt_eq/lt, we also use PlainTimestampRowFilter, but these cases
// can also use arrow's gt_eq_scalar/lt_scalar methods.
let row_filter = if let (Some(lower), Some(upper)) = (
time_range
.start()
.and_then(|s| s.convert_to(ts_col_unit))
.map(|t| t.value()),
time_range
.end()
.and_then(|s| s.convert_to(ts_col_unit))
.map(|t| t.value()),
) {
Box::new(FastTimestampRowFilter::new(projection, lower, upper)) as _
} else {
Box::new(PlainTimestampRowFilter::new(time_range, projection)) as _
};
let filter = RowFilter::new(vec![row_filter]);
Some(filter)
}
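The comments above capture two decisions: whether converting the query's time range into the column's time unit loses precision (only the plain per-row filter is safe then), and whether both range bounds convert without overflow (the faster arrow-based filter can be used then). A toy illustration of the lossy-unit check, with hypothetical names that are not the actual time_unit_lossy implementation:
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum Unit {
    Second,
    Millisecond,
    Microsecond,
    Nanosecond,
}
// A range expressed in a finer unit than the column's cannot be represented
// exactly after conversion, so the plain per-row filter must be chosen.
fn range_unit_is_lossy(range_unit: Unit, column_unit: Unit) -> bool {
    range_unit > column_unit
}
fn main() {
    assert!(range_unit_is_lossy(Unit::Nanosecond, Unit::Millisecond));
    assert!(!range_unit_is_lossy(Unit::Second, Unit::Millisecond));
}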
fn time_unit_lossy(range: &TimestampRange, ts_col_unit: TimeUnit) -> bool {

View File

@@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::env;
use std::net::SocketAddr;
use std::sync::Arc;
@@ -36,11 +35,7 @@ use datanode::error::{CreateTableSnafu, Result};
use datanode::instance::Instance;
use datanode::sql::SqlHandler;
use datatypes::data_type::ConcreteDataType;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::schema::{ColumnSchema, RawSchema};
use datatypes::vectors::{
Float64VectorBuilder, MutableVector, StringVectorBuilder, TimestampMillisecondVectorBuilder,
};
use frontend::instance::Instance as FeInstance;
use frontend::service_config::{MysqlOptions, PostgresOptions};
use object_store::services::{Azblob, Oss, S3};
@@ -59,7 +54,7 @@ use servers::server::Server;
use servers::Mode;
use snafu::ResultExt;
use table::engine::{EngineContext, TableEngineRef};
use table::requests::{CreateTableRequest, InsertRequest, TableOptions};
use table::requests::{CreateTableRequest, TableOptions};
#[derive(Debug, Eq, PartialEq)]
pub enum StorageType {
@@ -276,7 +271,6 @@ pub async fn create_test_table(
catalog_manager: &CatalogManagerRef,
sql_handler: &SqlHandler,
ts_type: ConcreteDataType,
table_name: &str,
) -> Result<()> {
let column_schemas = vec![
ColumnSchema::new("host", ConcreteDataType::string_datatype(), false),
@@ -284,6 +278,8 @@ pub async fn create_test_table(
ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), true),
ColumnSchema::new("ts", ts_type, true).with_time_index(true),
];
let table_name = "demo";
let table_engine: TableEngineRef = sql_handler
.table_engine_manager()
.engine(MITO_ENGINE)
@@ -331,7 +327,6 @@ pub async fn setup_test_http_app(store_type: StorageType, name: &str) -> (Router
instance.catalog_manager(),
instance.sql_handler(),
ConcreteDataType::timestamp_millisecond_datatype(),
"demo",
)
.await
.unwrap();
@@ -368,7 +363,6 @@ pub async fn setup_test_http_app_with_frontend(
frontend.catalog_manager(),
instance.sql_handler(),
ConcreteDataType::timestamp_millisecond_datatype(),
"demo",
)
.await
.unwrap();
@@ -388,83 +382,25 @@ pub async fn setup_test_http_app_with_frontend(
(app, guard)
}
fn mock_insert_request(host: &str, cpu: f64, memory: f64, ts: i64) -> InsertRequest {
let mut columns_values = HashMap::with_capacity(4);
let mut builder = StringVectorBuilder::with_capacity(1);
builder.push(Some(host));
columns_values.insert("host".to_string(), builder.to_vector());
let mut builder = Float64VectorBuilder::with_capacity(1);
builder.push(Some(cpu));
columns_values.insert("cpu".to_string(), builder.to_vector());
let mut builder = Float64VectorBuilder::with_capacity(1);
builder.push(Some(memory));
columns_values.insert("memory".to_string(), builder.to_vector());
let mut builder = TimestampMillisecondVectorBuilder::with_capacity(1);
builder.push(Some(ts.into()));
columns_values.insert("ts".to_string(), builder.to_vector());
InsertRequest {
catalog_name: common_catalog::consts::DEFAULT_CATALOG_NAME.to_string(),
schema_name: common_catalog::consts::DEFAULT_SCHEMA_NAME.to_string(),
table_name: "demo".to_string(),
columns_values,
region_number: 0,
}
}
pub async fn setup_test_prom_app_with_frontend(
store_type: StorageType,
name: &str,
) -> (Router, TestGuard) {
std::env::set_var("TZ", "UTC");
let (opts, guard) = create_tmp_dir_and_datanode_opts(store_type, name);
let instance = Arc::new(Instance::with_mock_meta_client(&opts).await.unwrap());
let frontend = FeInstance::try_new_standalone(instance.clone())
.await
.unwrap();
instance.start().await.unwrap();
create_test_table(
frontend.catalog_manager(),
instance.sql_handler(),
ConcreteDataType::timestamp_millisecond_datatype(),
"demo",
)
.await
.unwrap();
let demo = frontend
.catalog_manager()
.table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "demo")
.await
.unwrap()
.unwrap();
let _ = demo
.insert(mock_insert_request("host1", 1.1, 2.2, 0))
.await
.unwrap();
let _ = demo
.insert(mock_insert_request("host2", 2.1, 4.3, 600000))
.await
.unwrap();
let http_opts = HttpOptions {
addr: format!("127.0.0.1:{}", ports::get_port()),
..Default::default()
};
let frontend_ref = Arc::new(frontend);
let http_server = HttpServerBuilder::new(http_opts)
.with_sql_handler(ServerSqlQueryHandlerAdaptor::arc(frontend_ref.clone()))
.with_grpc_handler(ServerGrpcQueryHandlerAdaptor::arc(frontend_ref.clone()))
.with_script_handler(frontend_ref.clone())
.with_prom_handler(frontend_ref.clone())
.build();
let prom_server = PromServer::create_server(frontend_ref);
let app = http_server.build(http_server.make_app());
let app = app.merge(prom_server.make_app());
let prom_server = PromServer::create_server(Arc::new(frontend) as _);
let app = prom_server.make_app();
(app, guard)
}

View File

@@ -415,7 +415,7 @@ pub async fn test_prom_gateway_query(store_type: StorageType) {
step: "5s".to_string(),
};
let range_query_request: PromqlRequest = PromqlRequest {
header: Some(header.clone()),
header: Some(header),
promql: Some(Promql::RangeQuery(range_query)),
};
let json_bytes = gateway_client
@@ -458,36 +458,6 @@ pub async fn test_prom_gateway_query(store_type: StorageType) {
};
assert_eq!(range_query_result, expected);
// query nonexistent data
let range_query = PromRangeQuery {
query: "test".to_string(),
start: "1000000000".to_string(),
end: "1000001000".to_string(),
step: "5s".to_string(),
};
let range_query_request: PromqlRequest = PromqlRequest {
header: Some(header),
promql: Some(Promql::RangeQuery(range_query)),
};
let json_bytes = gateway_client
.handle(range_query_request)
.await
.unwrap()
.into_inner()
.body;
let range_query_result = serde_json::from_slice::<PromJsonResponse>(&json_bytes).unwrap();
let expected = PromJsonResponse {
status: "success".to_string(),
data: PromResponse::PromData(PromData {
result_type: "matrix".to_string(),
result: vec![],
}),
error: None,
error_type: None,
warnings: None,
};
assert_eq!(range_query_result, expected);
// clean up
let _ = fe_grpc_server.shutdown().await;
guard.remove_all().await;

View File

@@ -18,7 +18,7 @@ use common_error::status_code::StatusCode as ErrorCode;
use serde_json::json;
use servers::http::handler::HealthResponse;
use servers::http::{JsonOutput, JsonResponse};
use servers::prom::{PromJsonResponse, PromResponse};
use servers::prom::PromJsonResponse;
use tests_integration::test_util::{
setup_test_http_app, setup_test_http_app_with_frontend, setup_test_prom_app_with_frontend,
StorageType,
@@ -315,7 +315,7 @@ pub async fn test_prom_http_api(store_type: StorageType) {
assert_eq!(res.status(), StatusCode::OK);
// labels
let res = client.get("/api/v1/labels?match[]=demo").send().await;
let res = client.get("/api/v1/labels?match[]=up").send().await;
assert_eq!(res.status(), StatusCode::OK);
let res = client
.post("/api/v1/labels?match[]=up")
@@ -323,19 +323,6 @@ pub async fn test_prom_http_api(store_type: StorageType) {
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let res = client
.get("/api/v1/labels?match[]=demo&start=0")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let body = serde_json::from_str::<PromJsonResponse>(&res.text().await).unwrap();
assert_eq!(body.status, "success");
assert_eq!(
body.data,
serde_json::from_value::<PromResponse>(json!(["__name__", "cpu", "host", "memory", "ts"]))
.unwrap()
);
// labels query with multiple match[] params
let res = client
.get("/api/v1/labels?match[]=up&match[]=down")
@@ -349,29 +336,6 @@ pub async fn test_prom_http_api(store_type: StorageType) {
.await;
assert_eq!(res.status(), StatusCode::OK);
// series
let res = client
.get("/api/v1/series?match[]=demo&start=0&end=0")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let body = serde_json::from_str::<PromJsonResponse>(&res.text().await).unwrap();
assert_eq!(body.status, "success");
assert_eq!(
body.data,
serde_json::from_value::<PromResponse>(json!(
[{"__name__" : "demo","ts":"1970-01-01 00:00:00+0000","cpu":"1.1","host":"host1","memory":"2.2"}]
))
.unwrap()
);
let res = client
.post("/api/v1/series?match[]=up&match[]=down")
.header("Content-Type", "application/x-www-form-urlencoded")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
// label values
// should return error if there is no match[]
let res = client.get("/api/v1/label/instance/values").send().await;
@@ -383,16 +347,14 @@ pub async fn test_prom_http_api(store_type: StorageType) {
// single match[]
let res = client
.get("/api/v1/label/host/values?match[]=demo&start=0&end=600")
.get("/api/v1/label/instance/values?match[]=up")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let body = serde_json::from_str::<PromJsonResponse>(&res.text().await).unwrap();
assert_eq!(body.status, "success");
assert_eq!(
body.data,
serde_json::from_value::<PromResponse>(json!(["host1", "host2"])).unwrap()
);
let prom_resp = res.json::<PromJsonResponse>().await;
assert_eq!(prom_resp.status, "success");
assert!(prom_resp.error.is_none());
assert!(prom_resp.error_type.is_none());
// multiple match[]
let res = client