mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-27 00:19:58 +00:00
Compare commits
4 Commits
v0.7.1
...
fix-proto-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
038bc4fe6e | ||
|
|
6d07c422d8 | ||
|
|
6c14ece23f | ||
|
|
89c51d9b87 |
@@ -70,7 +70,7 @@ runs:
|
||||
|
||||
- name: Build greptime binary
|
||||
shell: pwsh
|
||||
run: cargo build --profile ${{ inputs.cargo-profile }} --features ${{ inputs.features }} --target ${{ inputs.arch }} --bin greptime
|
||||
run: cargo build --profile ${{ inputs.cargo-profile }} --features ${{ inputs.features }} --target ${{ inputs.arch }}
|
||||
|
||||
- name: Upload artifacts
|
||||
uses: ./.github/actions/upload-artifacts
|
||||
|
||||
8
.github/workflows/develop.yml
vendored
8
.github/workflows/develop.yml
vendored
@@ -117,7 +117,7 @@ jobs:
|
||||
artifacts-dir: bins
|
||||
version: current
|
||||
|
||||
fuzztest:
|
||||
fuzztest:
|
||||
name: Fuzz Test
|
||||
needs: build
|
||||
runs-on: ubuntu-latest
|
||||
@@ -148,7 +148,7 @@ jobs:
|
||||
- name: Unzip binaries
|
||||
run: tar -xvf ./bins.tar.gz
|
||||
- name: Run GreptimeDB
|
||||
run: |
|
||||
run: |
|
||||
./bins/greptime standalone start&
|
||||
- name: Fuzz Test
|
||||
uses: ./.github/actions/fuzz-test
|
||||
@@ -279,10 +279,6 @@ jobs:
|
||||
with:
|
||||
# Shares cross multiple jobs
|
||||
shared-key: "coverage-test"
|
||||
- name: Docker Cache
|
||||
uses: ScribeMD/docker-cache@0.3.7
|
||||
with:
|
||||
key: docker-${{ runner.os }}-coverage
|
||||
- name: Install latest nextest release
|
||||
uses: taiki-e/install-action@nextest
|
||||
- name: Install cargo-llvm-cov
|
||||
|
||||
149
Cargo.lock
generated
149
Cargo.lock
generated
@@ -207,7 +207,7 @@ checksum = "8f1f8f5a6f3d50d89e3797d7593a50f96bb2aaa20ca0cc7be1fb673232c91d72"
|
||||
|
||||
[[package]]
|
||||
name = "api"
|
||||
version = "0.7.1"
|
||||
version = "0.7.0"
|
||||
dependencies = [
|
||||
"common-base",
|
||||
"common-decimal",
|
||||
@@ -570,6 +570,7 @@ version = "0.3.15"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "942c7cd7ae39e91bde4820d74132e9862e62c2f386c3aa90ccf55949f5bad63a"
|
||||
dependencies = [
|
||||
"brotli",
|
||||
"bzip2",
|
||||
"flate2",
|
||||
"futures-core",
|
||||
@@ -588,7 +589,6 @@ version = "0.4.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bc2d0cfb2a7388d34f590e76686704c494ed7aaceed62ee1ba35cbf363abc2a5"
|
||||
dependencies = [
|
||||
"brotli",
|
||||
"bzip2",
|
||||
"flate2",
|
||||
"futures-core",
|
||||
@@ -695,7 +695,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "auth"
|
||||
version = "0.7.1"
|
||||
version = "0.7.0"
|
||||
dependencies = [
|
||||
"api",
|
||||
"async-trait",
|
||||
@@ -881,7 +881,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "benchmarks"
|
||||
version = "0.7.1"
|
||||
version = "0.7.0"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"chrono",
|
||||
@@ -1248,7 +1248,7 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5"
|
||||
|
||||
[[package]]
|
||||
name = "catalog"
|
||||
version = "0.7.1"
|
||||
version = "0.7.0"
|
||||
dependencies = [
|
||||
"api",
|
||||
"arc-swap",
|
||||
@@ -1558,7 +1558,7 @@ checksum = "702fc72eb24e5a1e48ce58027a675bc24edd52096d5397d4aea7c6dd9eca0bd1"
|
||||
|
||||
[[package]]
|
||||
name = "client"
|
||||
version = "0.7.1"
|
||||
version = "0.7.0"
|
||||
dependencies = [
|
||||
"api",
|
||||
"arc-swap",
|
||||
@@ -1594,7 +1594,7 @@ dependencies = [
|
||||
"session",
|
||||
"snafu",
|
||||
"substrait 0.17.1",
|
||||
"substrait 0.7.1",
|
||||
"substrait 0.7.0",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
"tonic 0.10.2",
|
||||
@@ -1624,7 +1624,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "cmd"
|
||||
version = "0.7.1"
|
||||
version = "0.7.0"
|
||||
dependencies = [
|
||||
"anymap",
|
||||
"async-trait",
|
||||
@@ -1677,7 +1677,7 @@ dependencies = [
|
||||
"session",
|
||||
"snafu",
|
||||
"store-api",
|
||||
"substrait 0.7.1",
|
||||
"substrait 0.7.0",
|
||||
"table",
|
||||
"temp-env",
|
||||
"tikv-jemallocator",
|
||||
@@ -1720,7 +1720,7 @@ checksum = "55b672471b4e9f9e95499ea597ff64941a309b2cdbffcc46f2cc5e2d971fd335"
|
||||
|
||||
[[package]]
|
||||
name = "common-base"
|
||||
version = "0.7.1"
|
||||
version = "0.7.0"
|
||||
dependencies = [
|
||||
"anymap",
|
||||
"bitvec",
|
||||
@@ -1735,7 +1735,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-catalog"
|
||||
version = "0.7.1"
|
||||
version = "0.7.0"
|
||||
dependencies = [
|
||||
"chrono",
|
||||
"common-error",
|
||||
@@ -1746,7 +1746,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-config"
|
||||
version = "0.7.1"
|
||||
version = "0.7.0"
|
||||
dependencies = [
|
||||
"common-base",
|
||||
"humantime-serde",
|
||||
@@ -1757,7 +1757,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-datasource"
|
||||
version = "0.7.1"
|
||||
version = "0.7.0"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"arrow-schema",
|
||||
@@ -1789,7 +1789,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-decimal"
|
||||
version = "0.7.1"
|
||||
version = "0.7.0"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"bigdecimal",
|
||||
@@ -1803,7 +1803,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-error"
|
||||
version = "0.7.1"
|
||||
version = "0.7.0"
|
||||
dependencies = [
|
||||
"snafu",
|
||||
"strum 0.25.0",
|
||||
@@ -1811,7 +1811,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-function"
|
||||
version = "0.7.1"
|
||||
version = "0.7.0"
|
||||
dependencies = [
|
||||
"api",
|
||||
"arc-swap",
|
||||
@@ -1846,7 +1846,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-greptimedb-telemetry"
|
||||
version = "0.7.1"
|
||||
version = "0.7.0"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"common-error",
|
||||
@@ -1865,7 +1865,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-grpc"
|
||||
version = "0.7.1"
|
||||
version = "0.7.0"
|
||||
dependencies = [
|
||||
"api",
|
||||
"arrow-flight",
|
||||
@@ -1895,7 +1895,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-grpc-expr"
|
||||
version = "0.7.1"
|
||||
version = "0.7.0"
|
||||
dependencies = [
|
||||
"api",
|
||||
"async-trait",
|
||||
@@ -1914,7 +1914,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-macro"
|
||||
version = "0.7.1"
|
||||
version = "0.7.0"
|
||||
dependencies = [
|
||||
"arc-swap",
|
||||
"common-query",
|
||||
@@ -1929,7 +1929,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-mem-prof"
|
||||
version = "0.7.1"
|
||||
version = "0.7.0"
|
||||
dependencies = [
|
||||
"common-error",
|
||||
"common-macro",
|
||||
@@ -1942,7 +1942,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-meta"
|
||||
version = "0.7.1"
|
||||
version = "0.7.0"
|
||||
dependencies = [
|
||||
"api",
|
||||
"async-recursion",
|
||||
@@ -1992,11 +1992,11 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-plugins"
|
||||
version = "0.7.1"
|
||||
version = "0.7.0"
|
||||
|
||||
[[package]]
|
||||
name = "common-procedure"
|
||||
version = "0.7.1"
|
||||
version = "0.7.0"
|
||||
dependencies = [
|
||||
"async-stream",
|
||||
"async-trait",
|
||||
@@ -2020,7 +2020,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-procedure-test"
|
||||
version = "0.7.1"
|
||||
version = "0.7.0"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"common-procedure",
|
||||
@@ -2028,7 +2028,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-query"
|
||||
version = "0.7.1"
|
||||
version = "0.7.0"
|
||||
dependencies = [
|
||||
"api",
|
||||
"async-trait",
|
||||
@@ -2051,7 +2051,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-recordbatch"
|
||||
version = "0.7.1"
|
||||
version = "0.7.0"
|
||||
dependencies = [
|
||||
"arc-swap",
|
||||
"common-base",
|
||||
@@ -2071,7 +2071,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-runtime"
|
||||
version = "0.7.1"
|
||||
version = "0.7.0"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"common-error",
|
||||
@@ -2091,7 +2091,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-telemetry"
|
||||
version = "0.7.1"
|
||||
version = "0.7.0"
|
||||
dependencies = [
|
||||
"atty",
|
||||
"backtrace",
|
||||
@@ -2119,7 +2119,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-test-util"
|
||||
version = "0.7.1"
|
||||
version = "0.7.0"
|
||||
dependencies = [
|
||||
"client",
|
||||
"common-query",
|
||||
@@ -2131,7 +2131,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-time"
|
||||
version = "0.7.1"
|
||||
version = "0.7.0"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"chrono",
|
||||
@@ -2147,14 +2147,14 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-version"
|
||||
version = "0.7.1"
|
||||
version = "0.7.0"
|
||||
dependencies = [
|
||||
"build-data",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "common-wal"
|
||||
version = "0.7.1"
|
||||
version = "0.7.0"
|
||||
dependencies = [
|
||||
"common-base",
|
||||
"common-error",
|
||||
@@ -2802,7 +2802,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "datanode"
|
||||
version = "0.7.1"
|
||||
version = "0.7.0"
|
||||
dependencies = [
|
||||
"api",
|
||||
"arrow-flight",
|
||||
@@ -2860,7 +2860,7 @@ dependencies = [
|
||||
"snafu",
|
||||
"sql",
|
||||
"store-api",
|
||||
"substrait 0.7.1",
|
||||
"substrait 0.7.0",
|
||||
"table",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
@@ -2874,7 +2874,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "datatypes"
|
||||
version = "0.7.1"
|
||||
version = "0.7.0"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"arrow-array",
|
||||
@@ -3361,7 +3361,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "file-engine"
|
||||
version = "0.7.1"
|
||||
version = "0.7.0"
|
||||
dependencies = [
|
||||
"api",
|
||||
"async-trait",
|
||||
@@ -3462,7 +3462,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "flow"
|
||||
version = "0.7.1"
|
||||
version = "0.7.0"
|
||||
dependencies = [
|
||||
"api",
|
||||
"bimap",
|
||||
@@ -3519,7 +3519,7 @@ checksum = "6c2141d6d6c8512188a7891b4b01590a45f6dac67afb4f255c4124dbb86d4eaa"
|
||||
|
||||
[[package]]
|
||||
name = "frontend"
|
||||
version = "0.7.1"
|
||||
version = "0.7.0"
|
||||
dependencies = [
|
||||
"api",
|
||||
"arc-swap",
|
||||
@@ -3583,7 +3583,7 @@ dependencies = [
|
||||
"sqlparser 0.38.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=6a93567ae38d42be5c8d08b13c8ff4dde26502ef)",
|
||||
"store-api",
|
||||
"strfmt",
|
||||
"substrait 0.7.1",
|
||||
"substrait 0.7.0",
|
||||
"table",
|
||||
"tokio",
|
||||
"toml 0.8.8",
|
||||
@@ -4352,7 +4352,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "index"
|
||||
version = "0.7.1"
|
||||
version = "0.7.0"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"asynchronous-codec",
|
||||
@@ -4526,12 +4526,11 @@ checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3"
|
||||
|
||||
[[package]]
|
||||
name = "iri-string"
|
||||
version = "0.7.0"
|
||||
version = "0.4.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "21859b667d66a4c1dacd9df0863b3efb65785474255face87f5bca39dd8407c0"
|
||||
checksum = "8f0f7638c1e223529f1bfdc48c8b133b9e0b434094d1d28473161ee48b235f78"
|
||||
dependencies = [
|
||||
"memchr",
|
||||
"serde",
|
||||
"nom",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -4931,7 +4930,7 @@ checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f"
|
||||
|
||||
[[package]]
|
||||
name = "log-store"
|
||||
version = "0.7.1"
|
||||
version = "0.7.0"
|
||||
dependencies = [
|
||||
"async-stream",
|
||||
"async-trait",
|
||||
@@ -5220,7 +5219,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "meta-client"
|
||||
version = "0.7.1"
|
||||
version = "0.7.0"
|
||||
dependencies = [
|
||||
"api",
|
||||
"async-trait",
|
||||
@@ -5250,7 +5249,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "meta-srv"
|
||||
version = "0.7.1"
|
||||
version = "0.7.0"
|
||||
dependencies = [
|
||||
"anymap",
|
||||
"api",
|
||||
@@ -5330,7 +5329,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "metric-engine"
|
||||
version = "0.7.1"
|
||||
version = "0.7.0"
|
||||
dependencies = [
|
||||
"api",
|
||||
"aquamarine",
|
||||
@@ -5402,7 +5401,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "mito2"
|
||||
version = "0.7.1"
|
||||
version = "0.7.0"
|
||||
dependencies = [
|
||||
"anymap",
|
||||
"api",
|
||||
@@ -6016,7 +6015,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "object-store"
|
||||
version = "0.7.1"
|
||||
version = "0.7.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-trait",
|
||||
@@ -6259,7 +6258,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "operator"
|
||||
version = "0.7.1"
|
||||
version = "0.7.0"
|
||||
dependencies = [
|
||||
"api",
|
||||
"async-trait",
|
||||
@@ -6306,7 +6305,7 @@ dependencies = [
|
||||
"sql",
|
||||
"sqlparser 0.38.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=6a93567ae38d42be5c8d08b13c8ff4dde26502ef)",
|
||||
"store-api",
|
||||
"substrait 0.7.1",
|
||||
"substrait 0.7.0",
|
||||
"table",
|
||||
"tokio",
|
||||
"tonic 0.10.2",
|
||||
@@ -6537,7 +6536,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "partition"
|
||||
version = "0.7.1"
|
||||
version = "0.7.0"
|
||||
dependencies = [
|
||||
"api",
|
||||
"async-trait",
|
||||
@@ -6897,7 +6896,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "plugins"
|
||||
version = "0.7.1"
|
||||
version = "0.7.0"
|
||||
dependencies = [
|
||||
"auth",
|
||||
"common-base",
|
||||
@@ -7164,7 +7163,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "promql"
|
||||
version = "0.7.1"
|
||||
version = "0.7.0"
|
||||
dependencies = [
|
||||
"ahash 0.8.6",
|
||||
"async-recursion",
|
||||
@@ -7375,7 +7374,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "puffin"
|
||||
version = "0.7.1"
|
||||
version = "0.7.0"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"bitflags 2.4.1",
|
||||
@@ -7496,7 +7495,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "query"
|
||||
version = "0.7.1"
|
||||
version = "0.7.0"
|
||||
dependencies = [
|
||||
"ahash 0.8.6",
|
||||
"api",
|
||||
@@ -7557,7 +7556,7 @@ dependencies = [
|
||||
"stats-cli",
|
||||
"store-api",
|
||||
"streaming-stats",
|
||||
"substrait 0.7.1",
|
||||
"substrait 0.7.0",
|
||||
"table",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
@@ -8874,7 +8873,7 @@ checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
|
||||
|
||||
[[package]]
|
||||
name = "script"
|
||||
version = "0.7.1"
|
||||
version = "0.7.0"
|
||||
dependencies = [
|
||||
"api",
|
||||
"arc-swap",
|
||||
@@ -9158,7 +9157,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "servers"
|
||||
version = "0.7.1"
|
||||
version = "0.7.0"
|
||||
dependencies = [
|
||||
"aide",
|
||||
"api",
|
||||
@@ -9264,7 +9263,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "session"
|
||||
version = "0.7.1"
|
||||
version = "0.7.0"
|
||||
dependencies = [
|
||||
"api",
|
||||
"arc-swap",
|
||||
@@ -9534,7 +9533,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "sql"
|
||||
version = "0.7.1"
|
||||
version = "0.7.0"
|
||||
dependencies = [
|
||||
"api",
|
||||
"common-base",
|
||||
@@ -9586,7 +9585,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "sqlness-runner"
|
||||
version = "0.7.1"
|
||||
version = "0.7.0"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"clap 4.4.11",
|
||||
@@ -9793,7 +9792,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "store-api"
|
||||
version = "0.7.1"
|
||||
version = "0.7.0"
|
||||
dependencies = [
|
||||
"api",
|
||||
"aquamarine",
|
||||
@@ -9933,7 +9932,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "substrait"
|
||||
version = "0.7.1"
|
||||
version = "0.7.0"
|
||||
dependencies = [
|
||||
"async-recursion",
|
||||
"async-trait",
|
||||
@@ -10106,7 +10105,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "table"
|
||||
version = "0.7.1"
|
||||
version = "0.7.0"
|
||||
dependencies = [
|
||||
"anymap",
|
||||
"async-trait",
|
||||
@@ -10218,7 +10217,7 @@ checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76"
|
||||
|
||||
[[package]]
|
||||
name = "tests-fuzz"
|
||||
version = "0.7.1"
|
||||
version = "0.7.0"
|
||||
dependencies = [
|
||||
"arbitrary",
|
||||
"async-trait",
|
||||
@@ -10247,7 +10246,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "tests-integration"
|
||||
version = "0.7.1"
|
||||
version = "0.7.0"
|
||||
dependencies = [
|
||||
"api",
|
||||
"arrow-flight",
|
||||
@@ -10304,7 +10303,7 @@ dependencies = [
|
||||
"sql",
|
||||
"sqlx",
|
||||
"store-api",
|
||||
"substrait 0.7.1",
|
||||
"substrait 0.7.0",
|
||||
"table",
|
||||
"tempfile",
|
||||
"time",
|
||||
@@ -10863,13 +10862,13 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "tower-http"
|
||||
version = "0.4.4"
|
||||
version = "0.3.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "61c5bb1d698276a2443e5ecfabc1008bf15a36c12e6a7176e7bf089ea9131140"
|
||||
checksum = "f873044bf02dd1e8239e9c1293ea39dad76dc594ec16185d0a1bf31d8dc8d858"
|
||||
dependencies = [
|
||||
"async-compression 0.4.5",
|
||||
"base64 0.21.5",
|
||||
"bitflags 2.4.1",
|
||||
"async-compression 0.3.15",
|
||||
"base64 0.13.1",
|
||||
"bitflags 1.3.2",
|
||||
"bytes",
|
||||
"futures-core",
|
||||
"futures-util",
|
||||
|
||||
@@ -62,7 +62,7 @@ members = [
|
||||
resolver = "2"
|
||||
|
||||
[workspace.package]
|
||||
version = "0.7.1"
|
||||
version = "0.7.0"
|
||||
edition = "2021"
|
||||
license = "Apache-2.0"
|
||||
|
||||
|
||||
@@ -1,50 +0,0 @@
|
||||
# TSBS benchmark - v0.7.0
|
||||
|
||||
## Environment
|
||||
|
||||
### Local
|
||||
| | |
|
||||
| ------ | ---------------------------------- |
|
||||
| CPU | AMD Ryzen 7 7735HS (8 core 3.2GHz) |
|
||||
| Memory | 32GB |
|
||||
| Disk | SOLIDIGM SSDPFKNU010TZ |
|
||||
| OS | Ubuntu 22.04.2 LTS |
|
||||
|
||||
### Amazon EC2
|
||||
|
||||
| | |
|
||||
| ------- | -------------- |
|
||||
| Machine | c5d.2xlarge |
|
||||
| CPU | 8 core |
|
||||
| Memory | 16GB |
|
||||
| Disk | 50GB (GP3) |
|
||||
| OS | Ubuntu 22.04.1 |
|
||||
|
||||
|
||||
## Write performance
|
||||
|
||||
| Environment | Ingest rate (rows/s) |
|
||||
| ------------------ | --------------------- |
|
||||
| Local | 3695814.64 |
|
||||
| EC2 c5d.2xlarge | 2987166.64 |
|
||||
|
||||
|
||||
## Query performance
|
||||
|
||||
| Query type | Local (ms) | EC2 c5d.2xlarge (ms) |
|
||||
| --------------------- | ---------- | ---------------------- |
|
||||
| cpu-max-all-1 | 30.56 | 54.74 |
|
||||
| cpu-max-all-8 | 52.69 | 70.50 |
|
||||
| double-groupby-1 | 664.30 | 1366.63 |
|
||||
| double-groupby-5 | 1391.26 | 2141.71 |
|
||||
| double-groupby-all | 2828.94 | 3389.59 |
|
||||
| groupby-orderby-limit | 718.92 | 1213.90 |
|
||||
| high-cpu-1 | 29.21 | 52.98 |
|
||||
| high-cpu-all | 5514.12 | 7194.91 |
|
||||
| lastpoint | 7571.40 | 9423.41 |
|
||||
| single-groupby-1-1-1 | 19.09 | 7.77 |
|
||||
| single-groupby-1-1-12 | 27.28 | 51.64 |
|
||||
| single-groupby-1-8-1 | 31.85 | 11.64 |
|
||||
| single-groupby-5-1-1 | 16.14 | 9.67 |
|
||||
| single-groupby-5-1-12 | 27.21 | 53.62 |
|
||||
| single-groupby-5-8-1 | 39.62 | 14.96 |
|
||||
@@ -34,14 +34,10 @@ pub struct SequenceBuilder {
|
||||
max: u64,
|
||||
}
|
||||
|
||||
fn seq_name(name: impl AsRef<str>) -> String {
|
||||
format!("{}-{}", SEQ_PREFIX, name.as_ref())
|
||||
}
|
||||
|
||||
impl SequenceBuilder {
|
||||
pub fn new(name: impl AsRef<str>, generator: KvBackendRef) -> Self {
|
||||
Self {
|
||||
name: seq_name(name),
|
||||
name: format!("{}-{}", SEQ_PREFIX, name.as_ref()),
|
||||
initial: 0,
|
||||
step: 1,
|
||||
generator,
|
||||
@@ -142,14 +138,13 @@ impl Inner {
|
||||
pub async fn next_range(&self) -> Result<Range<u64>> {
|
||||
let key = self.name.as_bytes();
|
||||
let mut start = self.next;
|
||||
|
||||
let mut expect = if start == self.initial {
|
||||
vec![]
|
||||
} else {
|
||||
u64::to_le_bytes(start).to_vec()
|
||||
};
|
||||
|
||||
for _ in 0..self.force_quit {
|
||||
let expect = if start == self.initial {
|
||||
vec![]
|
||||
} else {
|
||||
u64::to_le_bytes(start).to_vec()
|
||||
};
|
||||
|
||||
let step = self.step.min(self.max - start);
|
||||
|
||||
ensure!(
|
||||
@@ -172,24 +167,15 @@ impl Inner {
|
||||
|
||||
if !res.success {
|
||||
if let Some(kv) = res.prev_kv {
|
||||
expect = kv.value.clone();
|
||||
|
||||
let v: [u8; 8] = match kv.value.try_into() {
|
||||
Ok(a) => a,
|
||||
Err(v) => {
|
||||
return error::UnexpectedSequenceValueSnafu {
|
||||
err_msg: format!("Not a valid u64 for '{}': {v:?}", self.name),
|
||||
}
|
||||
.fail()
|
||||
let value = kv.value;
|
||||
ensure!(
|
||||
value.len() == std::mem::size_of::<u64>(),
|
||||
error::UnexpectedSequenceValueSnafu {
|
||||
err_msg: format!("key={}, unexpected value={:?}", self.name, value)
|
||||
}
|
||||
};
|
||||
let v = u64::from_le_bytes(v);
|
||||
|
||||
// If the existed value is smaller than the initial, we should start from the initial.
|
||||
start = v.max(self.initial);
|
||||
);
|
||||
start = u64::from_le_bytes(value.try_into().unwrap());
|
||||
} else {
|
||||
expect = vec![];
|
||||
|
||||
start = self.initial;
|
||||
}
|
||||
continue;
|
||||
@@ -211,12 +197,8 @@ impl Inner {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::any::Any;
|
||||
use std::collections::HashSet;
|
||||
use std::sync::Arc;
|
||||
|
||||
use itertools::{Itertools, MinMaxResult};
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
use super::*;
|
||||
use crate::error::Error;
|
||||
use crate::kv_backend::memory::MemoryKvBackend;
|
||||
@@ -227,76 +209,6 @@ mod tests {
|
||||
DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
|
||||
};
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_sequence_with_existed_value() {
|
||||
async fn test(exist: u64, expected: Vec<u64>) {
|
||||
let kv_backend = Arc::new(MemoryKvBackend::default());
|
||||
|
||||
let exist = u64::to_le_bytes(exist);
|
||||
kv_backend
|
||||
.put(PutRequest::new().with_key(seq_name("s")).with_value(exist))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let initial = 100;
|
||||
let seq = SequenceBuilder::new("s", kv_backend)
|
||||
.initial(initial)
|
||||
.build();
|
||||
|
||||
let mut actual = Vec::with_capacity(expected.len());
|
||||
for _ in 0..expected.len() {
|
||||
actual.push(seq.next().await.unwrap());
|
||||
}
|
||||
assert_eq!(actual, expected);
|
||||
}
|
||||
|
||||
// put a value not greater than the "initial", the sequence should start from "initial"
|
||||
test(1, vec![100, 101, 102]).await;
|
||||
test(100, vec![100, 101, 102]).await;
|
||||
|
||||
// put a value greater than the "initial", the sequence should start from the put value
|
||||
test(200, vec![200, 201, 202]).await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
async fn test_sequence_with_contention() {
|
||||
let seq = Arc::new(
|
||||
SequenceBuilder::new("s", Arc::new(MemoryKvBackend::default()))
|
||||
.initial(1024)
|
||||
.build(),
|
||||
);
|
||||
|
||||
let (tx, mut rx) = mpsc::unbounded_channel();
|
||||
// Spawn 10 tasks to concurrently get the next sequence. Each task will get 100 sequences.
|
||||
for _ in 0..10 {
|
||||
tokio::spawn({
|
||||
let seq = seq.clone();
|
||||
let tx = tx.clone();
|
||||
async move {
|
||||
for _ in 0..100 {
|
||||
tx.send(seq.next().await.unwrap()).unwrap()
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// Test that we get 1000 unique sequences, and start from 1024 to 2023.
|
||||
let mut nums = HashSet::new();
|
||||
let mut c = 0;
|
||||
while c < 1000
|
||||
&& let Some(x) = rx.recv().await
|
||||
{
|
||||
nums.insert(x);
|
||||
c += 1;
|
||||
}
|
||||
assert_eq!(nums.len(), 1000);
|
||||
let MinMaxResult::MinMax(min, max) = nums.iter().minmax() else {
|
||||
unreachable!("nums has more than one elements");
|
||||
};
|
||||
assert_eq!(*min, 1024);
|
||||
assert_eq!(*max, 2023);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_sequence() {
|
||||
let kv_backend = Arc::new(MemoryKvBackend::default());
|
||||
|
||||
@@ -73,7 +73,7 @@ tokio-stream = { workspace = true, features = ["net"] }
|
||||
toml.workspace = true
|
||||
tonic.workspace = true
|
||||
tower = { version = "0.4", features = ["full"] }
|
||||
tower-http = { version = "0.4", features = ["full"] }
|
||||
tower-http = { version = "0.3", features = ["full"] }
|
||||
url = "2.3.1"
|
||||
uuid.workspace = true
|
||||
|
||||
|
||||
@@ -26,7 +26,6 @@ use table::predicate::Predicate;
|
||||
|
||||
use crate::error::Result;
|
||||
use crate::flush::WriteBufferManagerRef;
|
||||
use crate::memtable::key_values::KeyValue;
|
||||
pub use crate::memtable::key_values::KeyValues;
|
||||
use crate::memtable::merge_tree::MergeTreeConfig;
|
||||
use crate::metrics::WRITE_BUFFER_BYTES;
|
||||
@@ -34,7 +33,6 @@ use crate::read::Batch;
|
||||
|
||||
pub mod key_values;
|
||||
pub mod merge_tree;
|
||||
pub mod time_partition;
|
||||
pub mod time_series;
|
||||
pub(crate) mod version;
|
||||
|
||||
@@ -84,12 +82,9 @@ pub trait Memtable: Send + Sync + fmt::Debug {
|
||||
/// Returns the id of this memtable.
|
||||
fn id(&self) -> MemtableId;
|
||||
|
||||
/// Writes key values into the memtable.
|
||||
/// Write key values into the memtable.
|
||||
fn write(&self, kvs: &KeyValues) -> Result<()>;
|
||||
|
||||
/// Writes one key value pair into the memtable.
|
||||
fn write_one(&self, key_value: KeyValue) -> Result<()>;
|
||||
|
||||
/// Scans the memtable.
|
||||
/// `projection` selects columns to read, `None` means reading all columns.
|
||||
/// `filters` are the predicates to be pushed down to memtable.
|
||||
|
||||
@@ -71,7 +71,7 @@ impl KeyValues {
|
||||
/// Primary key columns have the same order as region's primary key. Field
|
||||
/// columns are ordered by their position in the region schema (The same order
|
||||
/// as users defined while creating the region).
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
#[derive(Debug)]
|
||||
pub struct KeyValue<'a> {
|
||||
row: &'a Row,
|
||||
schema: &'a Vec<ColumnSchema>,
|
||||
|
||||
@@ -36,7 +36,6 @@ use table::predicate::Predicate;
|
||||
|
||||
use crate::error::Result;
|
||||
use crate::flush::WriteBufferManagerRef;
|
||||
use crate::memtable::key_values::KeyValue;
|
||||
use crate::memtable::merge_tree::metrics::WriteMetrics;
|
||||
use crate::memtable::merge_tree::tree::MergeTree;
|
||||
use crate::memtable::{
|
||||
@@ -128,17 +127,6 @@ impl Memtable for MergeTreeMemtable {
|
||||
res
|
||||
}
|
||||
|
||||
fn write_one(&self, key_value: KeyValue) -> Result<()> {
|
||||
let mut metrics = WriteMetrics::default();
|
||||
let mut pk_buffer = Vec::new();
|
||||
// Ensures the memtable always updates stats.
|
||||
let res = self.tree.write_one(key_value, &mut pk_buffer, &mut metrics);
|
||||
|
||||
self.update_stats(&metrics);
|
||||
|
||||
res
|
||||
}
|
||||
|
||||
fn iter(
|
||||
&self,
|
||||
projection: Option<&[ColumnId]>,
|
||||
@@ -302,14 +290,16 @@ impl MemtableBuilder for MergeTreeMemtableBuilder {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::collections::BTreeSet;
|
||||
|
||||
use common_time::Timestamp;
|
||||
use datafusion_common::{Column, ScalarValue};
|
||||
use datafusion_expr::{BinaryExpr, Expr, Operator};
|
||||
use datatypes::scalars::ScalarVector;
|
||||
use datatypes::vectors::Int64Vector;
|
||||
use datatypes::vectors::{Int64Vector, TimestampMillisecondVector};
|
||||
|
||||
use super::*;
|
||||
use crate::test_util::memtable_util::{self, collect_iter_timestamps};
|
||||
use crate::test_util::memtable_util;
|
||||
|
||||
#[test]
|
||||
fn test_memtable_sorted_input() {
|
||||
@@ -332,10 +322,23 @@ mod tests {
|
||||
let expected_ts = kvs
|
||||
.iter()
|
||||
.map(|kv| kv.timestamp().as_timestamp().unwrap().unwrap().value())
|
||||
.collect::<Vec<_>>();
|
||||
.collect::<BTreeSet<_>>();
|
||||
|
||||
let iter = memtable.iter(None, None).unwrap();
|
||||
let read = collect_iter_timestamps(iter);
|
||||
let read = iter
|
||||
.flat_map(|batch| {
|
||||
batch
|
||||
.unwrap()
|
||||
.timestamps()
|
||||
.as_any()
|
||||
.downcast_ref::<TimestampMillisecondVector>()
|
||||
.unwrap()
|
||||
.iter_data()
|
||||
.collect::<Vec<_>>()
|
||||
.into_iter()
|
||||
})
|
||||
.map(|v| v.unwrap().0.value())
|
||||
.collect::<BTreeSet<_>>();
|
||||
assert_eq!(expected_ts, read);
|
||||
|
||||
let stats = memtable.stats();
|
||||
@@ -383,7 +386,20 @@ mod tests {
|
||||
memtable.write(&kvs).unwrap();
|
||||
|
||||
let iter = memtable.iter(None, None).unwrap();
|
||||
let read = collect_iter_timestamps(iter);
|
||||
let read = iter
|
||||
.flat_map(|batch| {
|
||||
batch
|
||||
.unwrap()
|
||||
.timestamps()
|
||||
.as_any()
|
||||
.downcast_ref::<TimestampMillisecondVector>()
|
||||
.unwrap()
|
||||
.iter_data()
|
||||
.collect::<Vec<_>>()
|
||||
.into_iter()
|
||||
})
|
||||
.map(|v| v.unwrap().0.value())
|
||||
.collect::<Vec<_>>();
|
||||
assert_eq!(vec![0, 1, 2, 3, 4, 5, 6, 7], read);
|
||||
|
||||
let iter = memtable.iter(None, None).unwrap();
|
||||
@@ -498,7 +514,20 @@ mod tests {
|
||||
|
||||
let expect = data.into_iter().map(|x| x.2).collect::<Vec<_>>();
|
||||
let iter = memtable.iter(None, None).unwrap();
|
||||
let read = collect_iter_timestamps(iter);
|
||||
let read = iter
|
||||
.flat_map(|batch| {
|
||||
batch
|
||||
.unwrap()
|
||||
.timestamps()
|
||||
.as_any()
|
||||
.downcast_ref::<TimestampMillisecondVector>()
|
||||
.unwrap()
|
||||
.iter_data()
|
||||
.collect::<Vec<_>>()
|
||||
.into_iter()
|
||||
})
|
||||
.map(|v| v.unwrap().0.value())
|
||||
.collect::<Vec<_>>();
|
||||
assert_eq!(expect, read);
|
||||
}
|
||||
|
||||
@@ -535,7 +564,20 @@ mod tests {
|
||||
let iter = memtable
|
||||
.iter(None, Some(Predicate::new(vec![expr.into()])))
|
||||
.unwrap();
|
||||
let read = collect_iter_timestamps(iter);
|
||||
let read = iter
|
||||
.flat_map(|batch| {
|
||||
batch
|
||||
.unwrap()
|
||||
.timestamps()
|
||||
.as_any()
|
||||
.downcast_ref::<TimestampMillisecondVector>()
|
||||
.unwrap()
|
||||
.iter_data()
|
||||
.collect::<Vec<_>>()
|
||||
.into_iter()
|
||||
})
|
||||
.map(|v| v.unwrap().0.value())
|
||||
.collect::<Vec<_>>();
|
||||
assert_eq!(timestamps, read);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -148,54 +148,6 @@ impl MergeTree {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Write one key value pair into the tree.
|
||||
///
|
||||
/// # Panics
|
||||
/// Panics if the tree is immutable (frozen).
|
||||
pub fn write_one(
|
||||
&self,
|
||||
kv: KeyValue,
|
||||
pk_buffer: &mut Vec<u8>,
|
||||
metrics: &mut WriteMetrics,
|
||||
) -> Result<()> {
|
||||
let has_pk = !self.metadata.primary_key.is_empty();
|
||||
|
||||
ensure!(
|
||||
kv.num_primary_keys() == self.row_codec.num_fields(),
|
||||
PrimaryKeyLengthMismatchSnafu {
|
||||
expect: self.row_codec.num_fields(),
|
||||
actual: kv.num_primary_keys(),
|
||||
}
|
||||
);
|
||||
// Safety: timestamp of kv must be both present and a valid timestamp value.
|
||||
let ts = kv.timestamp().as_timestamp().unwrap().unwrap().value();
|
||||
metrics.min_ts = metrics.min_ts.min(ts);
|
||||
metrics.max_ts = metrics.max_ts.max(ts);
|
||||
metrics.value_bytes += kv.fields().map(|v| v.data_size()).sum::<usize>();
|
||||
|
||||
if !has_pk {
|
||||
// No primary key.
|
||||
return self.write_no_key(kv);
|
||||
}
|
||||
|
||||
// Encode primary key.
|
||||
pk_buffer.clear();
|
||||
if self.is_partitioned {
|
||||
// Use sparse encoder for metric engine.
|
||||
self.sparse_encoder
|
||||
.encode_to_vec(kv.primary_keys(), pk_buffer)?;
|
||||
} else {
|
||||
self.row_codec.encode_to_vec(kv.primary_keys(), pk_buffer)?;
|
||||
}
|
||||
|
||||
// Write rows with
|
||||
self.write_with_key(pk_buffer, kv, metrics)?;
|
||||
|
||||
metrics.value_bytes += std::mem::size_of::<Timestamp>() + std::mem::size_of::<OpType>();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Scans the tree.
|
||||
pub fn read(
|
||||
&self,
|
||||
|
||||
@@ -1,551 +0,0 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//! Partitions memtables by time.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::Duration;
|
||||
|
||||
use common_telemetry::debug;
|
||||
use common_time::timestamp::TimeUnit;
|
||||
use common_time::timestamp_millis::BucketAligned;
|
||||
use common_time::Timestamp;
|
||||
use smallvec::{smallvec, SmallVec};
|
||||
use snafu::OptionExt;
|
||||
use store_api::metadata::RegionMetadataRef;
|
||||
|
||||
use crate::error::{InvalidRequestSnafu, Result};
|
||||
use crate::memtable::key_values::KeyValue;
|
||||
use crate::memtable::version::SmallMemtableVec;
|
||||
use crate::memtable::{KeyValues, MemtableBuilderRef, MemtableId, MemtableRef};
|
||||
|
||||
/// A partition holds rows with timestamps between `[min, max)`.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct TimePartition {
|
||||
/// Memtable of the partition.
|
||||
memtable: MemtableRef,
|
||||
/// Time range of the partition. `None` means there is no time range. The time
|
||||
/// range is `None` if and only if the [TimePartitions::part_duration] is `None`.
|
||||
time_range: Option<PartTimeRange>,
|
||||
}
|
||||
|
||||
impl TimePartition {
|
||||
/// Returns whether the `ts` belongs to the partition.
|
||||
fn contains_timestamp(&self, ts: Timestamp) -> bool {
|
||||
let Some(range) = self.time_range else {
|
||||
return true;
|
||||
};
|
||||
|
||||
range.contains_timestamp(ts)
|
||||
}
|
||||
|
||||
/// Write rows to the part.
|
||||
fn write(&self, kvs: &KeyValues) -> Result<()> {
|
||||
self.memtable.write(kvs)
|
||||
}
|
||||
}
|
||||
|
||||
type PartitionVec = SmallVec<[TimePartition; 2]>;
|
||||
|
||||
/// Partitions.
|
||||
#[derive(Debug)]
|
||||
pub struct TimePartitions {
|
||||
/// Mutable data of partitions.
|
||||
inner: Mutex<PartitionsInner>,
|
||||
/// Duration of a partition.
|
||||
///
|
||||
/// `None` means there is only one partition and the [TimePartition::time_range] is
|
||||
/// also `None`.
|
||||
part_duration: Option<Duration>,
|
||||
/// Metadata of the region.
|
||||
metadata: RegionMetadataRef,
|
||||
/// Builder of memtables.
|
||||
builder: MemtableBuilderRef,
|
||||
}
|
||||
|
||||
pub type TimePartitionsRef = Arc<TimePartitions>;
|
||||
|
||||
impl TimePartitions {
|
||||
/// Returns a new empty partition list with optional duration.
|
||||
pub fn new(
|
||||
metadata: RegionMetadataRef,
|
||||
builder: MemtableBuilderRef,
|
||||
next_memtable_id: MemtableId,
|
||||
part_duration: Option<Duration>,
|
||||
) -> Self {
|
||||
let mut inner = PartitionsInner::new(next_memtable_id);
|
||||
if part_duration.is_none() {
|
||||
// If `part_duration` is None, then we create a partition with `None` time
|
||||
// range so we will write all rows to that partition.
|
||||
let memtable = builder.build(inner.alloc_memtable_id(), &metadata);
|
||||
debug!(
|
||||
"Creates a time partition for all timestamps, region: {}, memtable_id: {}",
|
||||
metadata.region_id,
|
||||
memtable.id(),
|
||||
);
|
||||
let part = TimePartition {
|
||||
memtable,
|
||||
time_range: None,
|
||||
};
|
||||
inner.parts.push(part);
|
||||
}
|
||||
|
||||
Self {
|
||||
inner: Mutex::new(inner),
|
||||
part_duration,
|
||||
metadata,
|
||||
builder,
|
||||
}
|
||||
}
|
||||
|
||||
/// Write key values to memtables.
|
||||
///
|
||||
/// It creates new partitions if necessary.
|
||||
pub fn write(&self, kvs: &KeyValues) -> Result<()> {
|
||||
// Get all parts.
|
||||
let parts = self.list_partitions();
|
||||
|
||||
// Checks whether all rows belongs to a single part. Checks in reverse order as we usually
|
||||
// put to latest part.
|
||||
for part in parts.iter().rev() {
|
||||
let mut all_in_partition = true;
|
||||
for kv in kvs.iter() {
|
||||
// Safety: We checked the schema in the write request.
|
||||
let ts = kv.timestamp().as_timestamp().unwrap().unwrap();
|
||||
if !part.contains_timestamp(ts) {
|
||||
all_in_partition = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if !all_in_partition {
|
||||
continue;
|
||||
}
|
||||
|
||||
// We can write all rows to this part.
|
||||
return part.write(kvs);
|
||||
}
|
||||
|
||||
// Slow path: We have to split kvs by partitions.
|
||||
self.write_multi_parts(kvs, &parts)
|
||||
}
|
||||
|
||||
/// Append memtables in partitions to `memtables`.
|
||||
pub fn list_memtables(&self, memtables: &mut Vec<MemtableRef>) {
|
||||
let inner = self.inner.lock().unwrap();
|
||||
memtables.extend(inner.parts.iter().map(|part| part.memtable.clone()));
|
||||
}
|
||||
|
||||
/// Returns the number of partitions.
|
||||
pub fn num_partitions(&self) -> usize {
|
||||
let inner = self.inner.lock().unwrap();
|
||||
inner.parts.len()
|
||||
}
|
||||
|
||||
/// Returns true if all memtables are empty.
|
||||
pub fn is_empty(&self) -> bool {
|
||||
let inner = self.inner.lock().unwrap();
|
||||
inner.parts.iter().all(|part| part.memtable.is_empty())
|
||||
}
|
||||
|
||||
/// Freezes all memtables.
|
||||
pub fn freeze(&self) -> Result<()> {
|
||||
let inner = self.inner.lock().unwrap();
|
||||
for part in &*inner.parts {
|
||||
part.memtable.freeze()?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Forks latest partition.
|
||||
pub fn fork(&self, metadata: &RegionMetadataRef) -> Self {
|
||||
let mut inner = self.inner.lock().unwrap();
|
||||
let latest_part = inner
|
||||
.parts
|
||||
.iter()
|
||||
.max_by_key(|part| part.time_range.map(|range| range.min_timestamp))
|
||||
.cloned();
|
||||
|
||||
let Some(old_part) = latest_part else {
|
||||
return Self::new(
|
||||
metadata.clone(),
|
||||
self.builder.clone(),
|
||||
inner.next_memtable_id,
|
||||
self.part_duration,
|
||||
);
|
||||
};
|
||||
let memtable = old_part.memtable.fork(inner.alloc_memtable_id(), metadata);
|
||||
let new_part = TimePartition {
|
||||
memtable,
|
||||
time_range: old_part.time_range,
|
||||
};
|
||||
Self {
|
||||
inner: Mutex::new(PartitionsInner::with_partition(
|
||||
new_part,
|
||||
inner.next_memtable_id,
|
||||
)),
|
||||
part_duration: self.part_duration,
|
||||
metadata: metadata.clone(),
|
||||
builder: self.builder.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns partition duration.
|
||||
pub(crate) fn part_duration(&self) -> Option<Duration> {
|
||||
self.part_duration
|
||||
}
|
||||
|
||||
/// Returns memory usage.
|
||||
pub(crate) fn memory_usage(&self) -> usize {
|
||||
let inner = self.inner.lock().unwrap();
|
||||
inner
|
||||
.parts
|
||||
.iter()
|
||||
.map(|part| part.memtable.stats().estimated_bytes)
|
||||
.sum()
|
||||
}
|
||||
|
||||
/// Append memtables in partitions to small vec.
|
||||
pub(crate) fn list_memtables_to_small_vec(&self, memtables: &mut SmallMemtableVec) {
|
||||
let inner = self.inner.lock().unwrap();
|
||||
memtables.extend(inner.parts.iter().map(|part| part.memtable.clone()));
|
||||
}
|
||||
|
||||
/// Returns the next memtable id.
|
||||
pub(crate) fn next_memtable_id(&self) -> MemtableId {
|
||||
let inner = self.inner.lock().unwrap();
|
||||
inner.next_memtable_id
|
||||
}
|
||||
|
||||
/// Returns all partitions.
|
||||
fn list_partitions(&self) -> PartitionVec {
|
||||
let inner = self.inner.lock().unwrap();
|
||||
inner.parts.clone()
|
||||
}
|
||||
|
||||
/// Write to multiple partitions.
|
||||
fn write_multi_parts(&self, kvs: &KeyValues, parts: &PartitionVec) -> Result<()> {
|
||||
// If part duration is `None` then there is always one partition and all rows
|
||||
// will be put in that partition before invoking this method.
|
||||
debug_assert!(self.part_duration.is_some());
|
||||
|
||||
let mut parts_to_write = HashMap::new();
|
||||
let mut missing_parts = HashMap::new();
|
||||
for kv in kvs.iter() {
|
||||
let mut part_found = false;
|
||||
// Safety: We used the timestamp before.
|
||||
let ts = kv.timestamp().as_timestamp().unwrap().unwrap();
|
||||
for part in parts {
|
||||
if part.contains_timestamp(ts) {
|
||||
// Safety: Since part duration is `Some` so all time range should be `Some`.
|
||||
parts_to_write
|
||||
.entry(part.time_range.unwrap().min_timestamp)
|
||||
.or_insert_with(|| PartitionToWrite {
|
||||
partition: part.clone(),
|
||||
key_values: Vec::new(),
|
||||
})
|
||||
.key_values
|
||||
.push(kv);
|
||||
part_found = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if !part_found {
|
||||
// We need to write it to a new part.
|
||||
// Safety: `new()` ensures duration is always Some if we do to this method.
|
||||
let part_duration = self.part_duration.unwrap();
|
||||
let part_start =
|
||||
partition_start_timestamp(ts, part_duration).with_context(|| {
|
||||
InvalidRequestSnafu {
|
||||
region_id: self.metadata.region_id,
|
||||
reason: format!(
|
||||
"timestamp {ts:?} and bucket {part_duration:?} are out of range"
|
||||
),
|
||||
}
|
||||
})?;
|
||||
missing_parts
|
||||
.entry(part_start)
|
||||
.or_insert_with(Vec::new)
|
||||
.push(kv);
|
||||
}
|
||||
}
|
||||
|
||||
// Writes rows to existing parts.
|
||||
for part_to_write in parts_to_write.into_values() {
|
||||
for kv in part_to_write.key_values {
|
||||
part_to_write.partition.memtable.write_one(kv)?;
|
||||
}
|
||||
}
|
||||
|
||||
let part_duration = self.part_duration.unwrap();
|
||||
// Creates new parts and writes to them. Acquires the lock to avoid others create
|
||||
// the same partition.
|
||||
let mut inner = self.inner.lock().unwrap();
|
||||
for (part_start, key_values) in missing_parts {
|
||||
let part_pos = match inner
|
||||
.parts
|
||||
.iter()
|
||||
.position(|part| part.time_range.unwrap().min_timestamp == part_start)
|
||||
{
|
||||
Some(pos) => pos,
|
||||
None => {
|
||||
let range = PartTimeRange::from_start_duration(part_start, part_duration)
|
||||
.with_context(|| InvalidRequestSnafu {
|
||||
region_id: self.metadata.region_id,
|
||||
reason: format!(
|
||||
"Partition time range for {part_start:?} is out of bound, bucket size: {part_duration:?}",
|
||||
),
|
||||
})?;
|
||||
let memtable = self
|
||||
.builder
|
||||
.build(inner.alloc_memtable_id(), &self.metadata);
|
||||
debug!(
|
||||
"Create time partition {:?} for region {}, duration: {:?}, memtable_id: {}, parts_total: {}",
|
||||
range,
|
||||
self.metadata.region_id,
|
||||
part_duration,
|
||||
memtable.id(),
|
||||
inner.parts.len() + 1
|
||||
);
|
||||
let pos = inner.parts.len();
|
||||
inner.parts.push(TimePartition {
|
||||
memtable,
|
||||
time_range: Some(range),
|
||||
});
|
||||
pos
|
||||
}
|
||||
};
|
||||
|
||||
let memtable = &inner.parts[part_pos].memtable;
|
||||
for kv in key_values {
|
||||
memtable.write_one(kv)?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Computes the start timestamp of the partition for `ts`.
|
||||
///
|
||||
/// It always use bucket size in seconds which should fit all timestamp resolution.
|
||||
fn partition_start_timestamp(ts: Timestamp, bucket: Duration) -> Option<Timestamp> {
|
||||
// Safety: We convert it to seconds so it never returns `None`.
|
||||
let ts_sec = ts.convert_to(TimeUnit::Second).unwrap();
|
||||
let bucket_sec: i64 = bucket.as_secs().try_into().ok()?;
|
||||
let start_sec = ts_sec.align_by_bucket(bucket_sec)?;
|
||||
start_sec.convert_to(ts.unit())
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct PartitionsInner {
|
||||
/// All partitions.
|
||||
parts: PartitionVec,
|
||||
/// Next memtable id.
|
||||
next_memtable_id: MemtableId,
|
||||
}
|
||||
|
||||
impl PartitionsInner {
|
||||
fn new(next_memtable_id: MemtableId) -> Self {
|
||||
Self {
|
||||
parts: Default::default(),
|
||||
next_memtable_id,
|
||||
}
|
||||
}
|
||||
|
||||
fn with_partition(part: TimePartition, next_memtable_id: MemtableId) -> Self {
|
||||
Self {
|
||||
parts: smallvec![part],
|
||||
next_memtable_id,
|
||||
}
|
||||
}
|
||||
|
||||
fn alloc_memtable_id(&mut self) -> MemtableId {
|
||||
let id = self.next_memtable_id;
|
||||
self.next_memtable_id += 1;
|
||||
id
|
||||
}
|
||||
}
|
||||
|
||||
/// Time range of a partition.
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
struct PartTimeRange {
|
||||
/// Inclusive min timestamp of rows in the partition.
|
||||
min_timestamp: Timestamp,
|
||||
/// Exclusive max timestamp of rows in the partition.
|
||||
max_timestamp: Timestamp,
|
||||
}
|
||||
|
||||
impl PartTimeRange {
|
||||
fn from_start_duration(start: Timestamp, duration: Duration) -> Option<Self> {
|
||||
let start_sec = start.convert_to(TimeUnit::Second)?;
|
||||
let end_sec = start_sec.add_duration(duration).ok()?;
|
||||
let min_timestamp = start_sec.convert_to(start.unit())?;
|
||||
let max_timestamp = end_sec.convert_to(start.unit())?;
|
||||
|
||||
Some(Self {
|
||||
min_timestamp,
|
||||
max_timestamp,
|
||||
})
|
||||
}
|
||||
|
||||
/// Returns whether the `ts` belongs to the partition.
|
||||
fn contains_timestamp(&self, ts: Timestamp) -> bool {
|
||||
self.min_timestamp <= ts && ts < self.max_timestamp
|
||||
}
|
||||
}
|
||||
|
||||
struct PartitionToWrite<'a> {
|
||||
partition: TimePartition,
|
||||
key_values: Vec<KeyValue<'a>>,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::memtable::merge_tree::MergeTreeMemtableBuilder;
|
||||
use crate::test_util::memtable_util::{self, collect_iter_timestamps};
|
||||
|
||||
#[test]
|
||||
fn test_no_duration() {
|
||||
let metadata = memtable_util::metadata_for_test();
|
||||
let builder = Arc::new(MergeTreeMemtableBuilder::default());
|
||||
let partitions = TimePartitions::new(metadata.clone(), builder, 0, None);
|
||||
assert_eq!(1, partitions.num_partitions());
|
||||
assert!(partitions.is_empty());
|
||||
|
||||
let kvs = memtable_util::build_key_values(
|
||||
&metadata,
|
||||
"hello".to_string(),
|
||||
0,
|
||||
&[1000, 3000, 7000, 5000, 6000],
|
||||
0, // sequence 0, 1, 2, 3, 4
|
||||
);
|
||||
partitions.write(&kvs).unwrap();
|
||||
|
||||
assert_eq!(1, partitions.num_partitions());
|
||||
assert!(!partitions.is_empty());
|
||||
assert!(!partitions.is_empty());
|
||||
let mut memtables = Vec::new();
|
||||
partitions.list_memtables(&mut memtables);
|
||||
|
||||
let iter = memtables[0].iter(None, None).unwrap();
|
||||
let timestamps = collect_iter_timestamps(iter);
|
||||
assert_eq!(&[1000, 3000, 5000, 6000, 7000], ×tamps[..]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_write_single_part() {
|
||||
let metadata = memtable_util::metadata_for_test();
|
||||
let builder = Arc::new(MergeTreeMemtableBuilder::default());
|
||||
let partitions =
|
||||
TimePartitions::new(metadata.clone(), builder, 0, Some(Duration::from_secs(10)));
|
||||
assert_eq!(0, partitions.num_partitions());
|
||||
|
||||
let kvs = memtable_util::build_key_values(
|
||||
&metadata,
|
||||
"hello".to_string(),
|
||||
0,
|
||||
&[5000, 2000, 0],
|
||||
0, // sequence 0, 1, 2
|
||||
);
|
||||
// It should creates a new partition.
|
||||
partitions.write(&kvs).unwrap();
|
||||
assert_eq!(1, partitions.num_partitions());
|
||||
assert!(!partitions.is_empty());
|
||||
|
||||
let kvs = memtable_util::build_key_values(
|
||||
&metadata,
|
||||
"hello".to_string(),
|
||||
0,
|
||||
&[3000, 7000, 4000],
|
||||
3, // sequence 3, 4, 5
|
||||
);
|
||||
// Still writes to the same partition.
|
||||
partitions.write(&kvs).unwrap();
|
||||
assert_eq!(1, partitions.num_partitions());
|
||||
|
||||
let mut memtables = Vec::new();
|
||||
partitions.list_memtables(&mut memtables);
|
||||
let iter = memtables[0].iter(None, None).unwrap();
|
||||
let timestamps = collect_iter_timestamps(iter);
|
||||
assert_eq!(&[0, 2000, 3000, 4000, 5000, 7000], ×tamps[..]);
|
||||
let parts = partitions.list_partitions();
|
||||
assert_eq!(
|
||||
Timestamp::new_millisecond(0),
|
||||
parts[0].time_range.unwrap().min_timestamp
|
||||
);
|
||||
assert_eq!(
|
||||
Timestamp::new_millisecond(10000),
|
||||
parts[0].time_range.unwrap().max_timestamp
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_write_multi_parts() {
|
||||
let metadata = memtable_util::metadata_for_test();
|
||||
let builder = Arc::new(MergeTreeMemtableBuilder::default());
|
||||
let partitions =
|
||||
TimePartitions::new(metadata.clone(), builder, 0, Some(Duration::from_secs(5)));
|
||||
assert_eq!(0, partitions.num_partitions());
|
||||
|
||||
let kvs = memtable_util::build_key_values(
|
||||
&metadata,
|
||||
"hello".to_string(),
|
||||
0,
|
||||
&[2000, 0],
|
||||
0, // sequence 0, 1
|
||||
);
|
||||
// It should creates a new partition.
|
||||
partitions.write(&kvs).unwrap();
|
||||
assert_eq!(1, partitions.num_partitions());
|
||||
assert!(!partitions.is_empty());
|
||||
|
||||
let kvs = memtable_util::build_key_values(
|
||||
&metadata,
|
||||
"hello".to_string(),
|
||||
0,
|
||||
&[3000, 7000, 4000, 5000],
|
||||
2, // sequence 2, 3, 4, 5
|
||||
);
|
||||
// Writes 2 rows to the old partition and 1 row to a new partition.
|
||||
partitions.write(&kvs).unwrap();
|
||||
assert_eq!(2, partitions.num_partitions());
|
||||
|
||||
let parts = partitions.list_partitions();
|
||||
let iter = parts[0].memtable.iter(None, None).unwrap();
|
||||
let timestamps = collect_iter_timestamps(iter);
|
||||
assert_eq!(
|
||||
Timestamp::new_millisecond(0),
|
||||
parts[0].time_range.unwrap().min_timestamp
|
||||
);
|
||||
assert_eq!(
|
||||
Timestamp::new_millisecond(5000),
|
||||
parts[0].time_range.unwrap().max_timestamp
|
||||
);
|
||||
assert_eq!(&[0, 2000, 3000, 4000], ×tamps[..]);
|
||||
let iter = parts[1].memtable.iter(None, None).unwrap();
|
||||
let timestamps = collect_iter_timestamps(iter);
|
||||
assert_eq!(&[5000, 7000], ×tamps[..]);
|
||||
assert_eq!(
|
||||
Timestamp::new_millisecond(5000),
|
||||
parts[1].time_range.unwrap().min_timestamp
|
||||
);
|
||||
assert_eq!(
|
||||
Timestamp::new_millisecond(10000),
|
||||
parts[1].time_range.unwrap().max_timestamp
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -38,7 +38,6 @@ use table::predicate::Predicate;
|
||||
|
||||
use crate::error::{ComputeArrowSnafu, ConvertVectorSnafu, PrimaryKeyLengthMismatchSnafu, Result};
|
||||
use crate::flush::WriteBufferManagerRef;
|
||||
use crate::memtable::key_values::KeyValue;
|
||||
use crate::memtable::{
|
||||
AllocTracker, BoxedBatchIterator, KeyValues, Memtable, MemtableBuilder, MemtableId,
|
||||
MemtableRef, MemtableStats,
|
||||
@@ -111,75 +110,49 @@ impl TimeSeriesMemtable {
|
||||
}
|
||||
|
||||
/// Updates memtable stats.
|
||||
fn update_stats(&self, stats: LocalStats) {
|
||||
self.alloc_tracker.on_allocation(stats.allocated);
|
||||
fn update_stats(&self, request_size: usize, min: i64, max: i64) {
|
||||
self.alloc_tracker.on_allocation(request_size);
|
||||
|
||||
loop {
|
||||
let current_min = self.min_timestamp.load(Ordering::Relaxed);
|
||||
if stats.min_ts >= current_min {
|
||||
if min >= current_min {
|
||||
break;
|
||||
}
|
||||
|
||||
let Err(updated) = self.min_timestamp.compare_exchange(
|
||||
current_min,
|
||||
stats.min_ts,
|
||||
min,
|
||||
Ordering::Relaxed,
|
||||
Ordering::Relaxed,
|
||||
) else {
|
||||
break;
|
||||
};
|
||||
|
||||
if updated == stats.min_ts {
|
||||
if updated == min {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
loop {
|
||||
let current_max = self.max_timestamp.load(Ordering::Relaxed);
|
||||
if stats.max_ts <= current_max {
|
||||
if max <= current_max {
|
||||
break;
|
||||
}
|
||||
|
||||
let Err(updated) = self.max_timestamp.compare_exchange(
|
||||
current_max,
|
||||
stats.max_ts,
|
||||
max,
|
||||
Ordering::Relaxed,
|
||||
Ordering::Relaxed,
|
||||
) else {
|
||||
break;
|
||||
};
|
||||
|
||||
if updated == stats.max_ts {
|
||||
if updated == max {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn write_key_value(&self, kv: KeyValue, stats: &mut LocalStats) -> Result<()> {
|
||||
ensure!(
|
||||
kv.num_primary_keys() == self.row_codec.num_fields(),
|
||||
PrimaryKeyLengthMismatchSnafu {
|
||||
expect: self.row_codec.num_fields(),
|
||||
actual: kv.num_primary_keys()
|
||||
}
|
||||
);
|
||||
let primary_key_encoded = self.row_codec.encode(kv.primary_keys())?;
|
||||
let fields = kv.fields().collect::<Vec<_>>();
|
||||
|
||||
stats.allocated += fields.iter().map(|v| v.data_size()).sum::<usize>();
|
||||
let (series, series_allocated) = self.series_set.get_or_add_series(primary_key_encoded);
|
||||
stats.allocated += series_allocated;
|
||||
|
||||
// safety: timestamp of kv must be both present and a valid timestamp value.
|
||||
let ts = kv.timestamp().as_timestamp().unwrap().unwrap().value();
|
||||
stats.min_ts = stats.min_ts.min(ts);
|
||||
stats.max_ts = stats.max_ts.max(ts);
|
||||
|
||||
let mut guard = series.write().unwrap();
|
||||
guard.push(kv.timestamp(), kv.sequence(), kv.op_type(), fields);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Debug for TimeSeriesMemtable {
|
||||
@@ -194,30 +167,43 @@ impl Memtable for TimeSeriesMemtable {
|
||||
}
|
||||
|
||||
fn write(&self, kvs: &KeyValues) -> Result<()> {
|
||||
let mut local_stats = LocalStats::default();
|
||||
let mut allocated = 0;
|
||||
let mut min_ts = i64::MAX;
|
||||
let mut max_ts = i64::MIN;
|
||||
|
||||
for kv in kvs.iter() {
|
||||
self.write_key_value(kv, &mut local_stats)?;
|
||||
ensure!(
|
||||
kv.num_primary_keys() == self.row_codec.num_fields(),
|
||||
PrimaryKeyLengthMismatchSnafu {
|
||||
expect: self.row_codec.num_fields(),
|
||||
actual: kv.num_primary_keys()
|
||||
}
|
||||
);
|
||||
let primary_key_encoded = self.row_codec.encode(kv.primary_keys())?;
|
||||
let fields = kv.fields().collect::<Vec<_>>();
|
||||
|
||||
allocated += fields.iter().map(|v| v.data_size()).sum::<usize>();
|
||||
let (series, series_allocated) = self.series_set.get_or_add_series(primary_key_encoded);
|
||||
allocated += series_allocated;
|
||||
|
||||
// safety: timestamp of kv must be both present and a valid timestamp value.
|
||||
let ts = kv.timestamp().as_timestamp().unwrap().unwrap().value();
|
||||
min_ts = min_ts.min(ts);
|
||||
max_ts = max_ts.max(ts);
|
||||
|
||||
let mut guard = series.write().unwrap();
|
||||
guard.push(kv.timestamp(), kv.sequence(), kv.op_type(), fields);
|
||||
}
|
||||
local_stats.allocated += kvs.num_rows() * std::mem::size_of::<Timestamp>();
|
||||
local_stats.allocated += kvs.num_rows() * std::mem::size_of::<OpType>();
|
||||
allocated += kvs.num_rows() * std::mem::size_of::<Timestamp>();
|
||||
allocated += kvs.num_rows() * std::mem::size_of::<OpType>();
|
||||
|
||||
// TODO(hl): this maybe inaccurate since for-iteration may return early.
|
||||
// We may lift the primary key length check out of Memtable::write
|
||||
// so that we can ensure writing to memtable will succeed.
|
||||
self.update_stats(local_stats);
|
||||
self.update_stats(allocated, min_ts, max_ts);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn write_one(&self, key_value: KeyValue) -> Result<()> {
|
||||
let mut local_stats = LocalStats::default();
|
||||
let res = self.write_key_value(key_value, &mut local_stats);
|
||||
local_stats.allocated += std::mem::size_of::<Timestamp>() + std::mem::size_of::<OpType>();
|
||||
|
||||
self.update_stats(local_stats);
|
||||
res
|
||||
}
|
||||
|
||||
fn iter(
|
||||
&self,
|
||||
projection: Option<&[ColumnId]>,
|
||||
@@ -281,22 +267,6 @@ impl Memtable for TimeSeriesMemtable {
|
||||
}
|
||||
}
|
||||
|
||||
struct LocalStats {
|
||||
allocated: usize,
|
||||
min_ts: i64,
|
||||
max_ts: i64,
|
||||
}
|
||||
|
||||
impl Default for LocalStats {
|
||||
fn default() -> Self {
|
||||
LocalStats {
|
||||
allocated: 0,
|
||||
min_ts: i64::MAX,
|
||||
max_ts: i64::MIN,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type SeriesRwLockMap = RwLock<BTreeMap<Vec<u8>, Arc<RwLock<Series>>>>;
|
||||
|
||||
struct SeriesSet {
|
||||
|
||||
@@ -20,29 +20,26 @@ use smallvec::SmallVec;
|
||||
use store_api::metadata::RegionMetadataRef;
|
||||
|
||||
use crate::error::Result;
|
||||
use crate::memtable::time_partition::TimePartitionsRef;
|
||||
use crate::memtable::{MemtableId, MemtableRef};
|
||||
|
||||
pub(crate) type SmallMemtableVec = SmallVec<[MemtableRef; 2]>;
|
||||
|
||||
/// A version of current memtables in a region.
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) struct MemtableVersion {
|
||||
/// Mutable memtable.
|
||||
pub(crate) mutable: TimePartitionsRef,
|
||||
pub(crate) mutable: MemtableRef,
|
||||
/// Immutable memtables.
|
||||
///
|
||||
/// We only allow one flush job per region but if a flush job failed, then we
|
||||
/// might need to store more than one immutable memtable on the next time we
|
||||
/// flush the region.
|
||||
immutables: SmallMemtableVec,
|
||||
immutables: SmallVec<[MemtableRef; 2]>,
|
||||
}
|
||||
|
||||
pub(crate) type MemtableVersionRef = Arc<MemtableVersion>;
|
||||
|
||||
impl MemtableVersion {
|
||||
/// Returns a new [MemtableVersion] with specific mutable memtable.
|
||||
pub(crate) fn new(mutable: TimePartitionsRef) -> MemtableVersion {
|
||||
pub(crate) fn new(mutable: MemtableRef) -> MemtableVersion {
|
||||
MemtableVersion {
|
||||
mutable,
|
||||
immutables: SmallVec::new(),
|
||||
@@ -56,8 +53,8 @@ impl MemtableVersion {
|
||||
|
||||
/// Lists mutable and immutable memtables.
|
||||
pub(crate) fn list_memtables(&self) -> Vec<MemtableRef> {
|
||||
let mut mems = Vec::with_capacity(self.immutables.len() + self.mutable.num_partitions());
|
||||
self.mutable.list_memtables(&mut mems);
|
||||
let mut mems = Vec::with_capacity(self.immutables.len() + 1);
|
||||
mems.push(self.mutable.clone());
|
||||
mems.extend_from_slice(&self.immutables);
|
||||
mems
|
||||
}
|
||||
@@ -79,13 +76,15 @@ impl MemtableVersion {
|
||||
// soft limit.
|
||||
self.mutable.freeze()?;
|
||||
// Fork the memtable.
|
||||
let mutable = Arc::new(self.mutable.fork(metadata));
|
||||
let mutable = self.mutable.fork(self.next_memtable_id(), metadata);
|
||||
|
||||
// Pushes the mutable memtable to immutable list.
|
||||
let mut immutables =
|
||||
SmallVec::with_capacity(self.immutables.len() + self.mutable.num_partitions());
|
||||
self.mutable.list_memtables_to_small_vec(&mut immutables);
|
||||
immutables.extend(self.immutables.iter().cloned());
|
||||
let immutables = self
|
||||
.immutables
|
||||
.iter()
|
||||
.cloned()
|
||||
.chain([self.mutable.clone()])
|
||||
.collect();
|
||||
Ok(Some(MemtableVersion {
|
||||
mutable,
|
||||
immutables,
|
||||
@@ -104,7 +103,7 @@ impl MemtableVersion {
|
||||
|
||||
/// Returns the memory usage of the mutable memtable.
|
||||
pub(crate) fn mutable_usage(&self) -> usize {
|
||||
self.mutable.memory_usage()
|
||||
self.mutable.stats().estimated_bytes
|
||||
}
|
||||
|
||||
/// Returns the memory usage of the immutable memtables.
|
||||
@@ -122,4 +121,9 @@ impl MemtableVersion {
|
||||
pub(crate) fn is_empty(&self) -> bool {
|
||||
self.mutable.is_empty() && self.immutables.is_empty()
|
||||
}
|
||||
|
||||
/// Returns the next memtable id.
|
||||
pub(crate) fn next_memtable_id(&self) -> MemtableId {
|
||||
self.mutable.id() + 1
|
||||
}
|
||||
}
|
||||
|
||||
@@ -37,7 +37,6 @@ use crate::error::{
|
||||
};
|
||||
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
|
||||
use crate::manifest::storage::manifest_compress_type;
|
||||
use crate::memtable::time_partition::TimePartitions;
|
||||
use crate::memtable::MemtableBuilderRef;
|
||||
use crate::region::options::RegionOptions;
|
||||
use crate::region::version::{VersionBuilder, VersionControl, VersionControlRef};
|
||||
@@ -170,13 +169,7 @@ impl RegionOpener {
|
||||
RegionManifestManager::new(metadata.clone(), region_manifest_options).await?;
|
||||
|
||||
// Initial memtable id is 0.
|
||||
let part_duration = options.compaction.time_window();
|
||||
let mutable = Arc::new(TimePartitions::new(
|
||||
metadata.clone(),
|
||||
self.memtable_builder,
|
||||
0,
|
||||
part_duration,
|
||||
));
|
||||
let mutable = self.memtable_builder.build(0, &metadata);
|
||||
|
||||
debug!("Create region {} with options: {:?}", region_id, options);
|
||||
|
||||
@@ -272,13 +265,7 @@ impl RegionOpener {
|
||||
self.cache_manager.clone(),
|
||||
));
|
||||
// Initial memtable id is 0.
|
||||
let part_duration = region_options.compaction.time_window();
|
||||
let mutable = Arc::new(TimePartitions::new(
|
||||
metadata.clone(),
|
||||
self.memtable_builder.clone(),
|
||||
0,
|
||||
part_duration,
|
||||
));
|
||||
let mutable = self.memtable_builder.build(0, &metadata);
|
||||
let version = VersionBuilder::new(metadata, mutable)
|
||||
.add_files(file_purger.clone(), manifest.files.values().cloned())
|
||||
.flushed_entry_id(manifest.flushed_entry_id)
|
||||
|
||||
@@ -94,14 +94,6 @@ pub enum CompactionOptions {
|
||||
Twcs(TwcsOptions),
|
||||
}
|
||||
|
||||
impl CompactionOptions {
|
||||
pub(crate) fn time_window(&self) -> Option<Duration> {
|
||||
match self {
|
||||
CompactionOptions::Twcs(opts) => opts.time_window,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for CompactionOptions {
|
||||
fn default() -> Self {
|
||||
Self::Twcs(TwcsOptions::default())
|
||||
|
||||
@@ -31,9 +31,8 @@ use store_api::storage::SequenceNumber;
|
||||
|
||||
use crate::error::Result;
|
||||
use crate::manifest::action::RegionEdit;
|
||||
use crate::memtable::time_partition::{TimePartitions, TimePartitionsRef};
|
||||
use crate::memtable::version::{MemtableVersion, MemtableVersionRef};
|
||||
use crate::memtable::{MemtableBuilderRef, MemtableId};
|
||||
use crate::memtable::{MemtableBuilderRef, MemtableId, MemtableRef};
|
||||
use crate::region::options::RegionOptions;
|
||||
use crate::sst::file::FileMeta;
|
||||
use crate::sst::file_purger::FilePurgerRef;
|
||||
@@ -123,14 +122,8 @@ impl VersionControl {
|
||||
/// Mark all opened files as deleted and set the delete marker in [VersionControlData]
|
||||
pub(crate) fn mark_dropped(&self, memtable_builder: &MemtableBuilderRef) {
|
||||
let version = self.current().version;
|
||||
let part_duration = version.memtables.mutable.part_duration();
|
||||
let next_memtable_id = version.memtables.mutable.next_memtable_id();
|
||||
let new_mutable = Arc::new(TimePartitions::new(
|
||||
version.metadata.clone(),
|
||||
memtable_builder.clone(),
|
||||
next_memtable_id,
|
||||
part_duration,
|
||||
));
|
||||
let new_mutable =
|
||||
memtable_builder.build(version.memtables.next_memtable_id(), &version.metadata);
|
||||
|
||||
let mut data = self.data.write().unwrap();
|
||||
data.is_dropped = true;
|
||||
@@ -147,14 +140,7 @@ impl VersionControl {
|
||||
/// new schema. Memtables of the version must be empty.
|
||||
pub(crate) fn alter_schema(&self, metadata: RegionMetadataRef, builder: &MemtableBuilderRef) {
|
||||
let version = self.current().version;
|
||||
let part_duration = version.memtables.mutable.part_duration();
|
||||
let next_memtable_id = version.memtables.mutable.next_memtable_id();
|
||||
let new_mutable = Arc::new(TimePartitions::new(
|
||||
metadata.clone(),
|
||||
builder.clone(),
|
||||
next_memtable_id,
|
||||
part_duration,
|
||||
));
|
||||
let new_mutable = builder.build(version.memtables.next_memtable_id(), &metadata);
|
||||
debug_assert!(version.memtables.mutable.is_empty());
|
||||
debug_assert!(version.memtables.immutables().is_empty());
|
||||
let new_version = Arc::new(
|
||||
@@ -177,14 +163,8 @@ impl VersionControl {
|
||||
) {
|
||||
let version = self.current().version;
|
||||
|
||||
let part_duration = version.memtables.mutable.part_duration();
|
||||
let next_memtable_id = version.memtables.mutable.next_memtable_id();
|
||||
let new_mutable = Arc::new(TimePartitions::new(
|
||||
version.metadata.clone(),
|
||||
memtable_builder.clone(),
|
||||
next_memtable_id,
|
||||
part_duration,
|
||||
));
|
||||
let new_mutable =
|
||||
memtable_builder.build(version.memtables.next_memtable_id(), &version.metadata);
|
||||
let new_version = Arc::new(
|
||||
VersionBuilder::new(version.metadata.clone(), new_mutable)
|
||||
.flushed_entry_id(truncated_entry_id)
|
||||
@@ -262,7 +242,7 @@ pub(crate) struct VersionBuilder {
|
||||
|
||||
impl VersionBuilder {
|
||||
/// Returns a new builder.
|
||||
pub(crate) fn new(metadata: RegionMetadataRef, mutable: TimePartitionsRef) -> Self {
|
||||
pub(crate) fn new(metadata: RegionMetadataRef, mutable: MemtableRef) -> Self {
|
||||
VersionBuilder {
|
||||
metadata,
|
||||
memtables: Arc::new(MemtableVersion::new(mutable)),
|
||||
|
||||
@@ -21,9 +21,7 @@ use api::v1::value::ValueData;
|
||||
use api::v1::{Row, Rows, SemanticType};
|
||||
use datatypes::arrow::array::UInt64Array;
|
||||
use datatypes::data_type::ConcreteDataType;
|
||||
use datatypes::scalars::ScalarVector;
|
||||
use datatypes::schema::ColumnSchema;
|
||||
use datatypes::vectors::TimestampMillisecondVector;
|
||||
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder, RegionMetadataRef};
|
||||
use store_api::storage::{ColumnId, RegionId, SequenceNumber};
|
||||
use table::predicate::Predicate;
|
||||
@@ -60,10 +58,6 @@ impl Memtable for EmptyMemtable {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn write_one(&self, _key_value: KeyValue) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn iter(
|
||||
&self,
|
||||
_projection: Option<&[ColumnId]>,
|
||||
@@ -309,20 +303,3 @@ pub(crate) fn encode_key_by_kv(key_value: &KeyValue) -> Vec<u8> {
|
||||
]);
|
||||
row_codec.encode(key_value.primary_keys()).unwrap()
|
||||
}
|
||||
|
||||
/// Collects timestamps from the batch iter.
|
||||
pub(crate) fn collect_iter_timestamps(iter: BoxedBatchIterator) -> Vec<i64> {
|
||||
iter.flat_map(|batch| {
|
||||
batch
|
||||
.unwrap()
|
||||
.timestamps()
|
||||
.as_any()
|
||||
.downcast_ref::<TimestampMillisecondVector>()
|
||||
.unwrap()
|
||||
.iter_data()
|
||||
.collect::<Vec<_>>()
|
||||
.into_iter()
|
||||
})
|
||||
.map(|v| v.unwrap().0.value())
|
||||
.collect()
|
||||
}
|
||||
|
||||
@@ -25,7 +25,7 @@ use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder}
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
use crate::manifest::action::RegionEdit;
|
||||
use crate::memtable::time_partition::TimePartitions;
|
||||
use crate::memtable::MemtableBuilder;
|
||||
use crate::region::version::{Version, VersionBuilder, VersionControl};
|
||||
use crate::sst::file::{FileId, FileMeta};
|
||||
use crate::sst::file_purger::FilePurgerRef;
|
||||
@@ -101,12 +101,7 @@ impl VersionControlBuilder {
|
||||
|
||||
pub(crate) fn build_version(&self) -> Version {
|
||||
let metadata = Arc::new(self.metadata.clone());
|
||||
let mutable = Arc::new(TimePartitions::new(
|
||||
metadata.clone(),
|
||||
self.memtable_builder.clone(),
|
||||
0,
|
||||
None,
|
||||
));
|
||||
let mutable = self.memtable_builder.build(0, &metadata);
|
||||
VersionBuilder::new(metadata, mutable)
|
||||
.add_files(self.file_purger.clone(), self.files.values().cloned())
|
||||
.build()
|
||||
|
||||
@@ -103,7 +103,7 @@ tokio-stream = { workspace = true, features = ["net"] }
|
||||
tonic.workspace = true
|
||||
tonic-reflection = "0.10"
|
||||
tower = { version = "0.4", features = ["full"] }
|
||||
tower-http = { version = "0.4", features = ["full"] }
|
||||
tower-http = { version = "0.3", features = ["full"] }
|
||||
urlencoding = "2.1"
|
||||
|
||||
[target.'cfg(not(windows))'.dependencies]
|
||||
|
||||
@@ -271,25 +271,18 @@ pub enum Error {
|
||||
#[snafu(display("Not found influx http authorization info"))]
|
||||
NotFoundInfluxAuth {},
|
||||
|
||||
#[snafu(display("Unsupported http auth scheme, name: {}", name))]
|
||||
UnsupportedAuthScheme { name: String },
|
||||
|
||||
#[snafu(display("Invalid visibility ASCII chars"))]
|
||||
InvalidAuthHeaderInvisibleASCII {
|
||||
InvisibleASCII {
|
||||
#[snafu(source)]
|
||||
error: hyper::header::ToStrError,
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Invalid utf-8 value"))]
|
||||
InvalidAuthHeaderInvalidUtf8Value {
|
||||
#[snafu(source)]
|
||||
error: FromUtf8Error,
|
||||
location: Location,
|
||||
},
|
||||
#[snafu(display("Unsupported http auth scheme, name: {}", name))]
|
||||
UnsupportedAuthScheme { name: String },
|
||||
|
||||
#[snafu(display("Invalid http authorization header"))]
|
||||
InvalidAuthHeader { location: Location },
|
||||
InvalidAuthorizationHeader { location: Location },
|
||||
|
||||
#[snafu(display("Invalid base64 value"))]
|
||||
InvalidBase64Value {
|
||||
@@ -527,17 +520,16 @@ impl ErrorExt for Error {
|
||||
DescribeStatement { source } => source.status_code(),
|
||||
|
||||
NotFoundAuthHeader { .. } | NotFoundInfluxAuth { .. } => StatusCode::AuthHeaderNotFound,
|
||||
InvalidAuthHeaderInvisibleASCII { .. }
|
||||
InvisibleASCII { .. }
|
||||
| UnsupportedAuthScheme { .. }
|
||||
| InvalidAuthHeader { .. }
|
||||
| InvalidAuthorizationHeader { .. }
|
||||
| InvalidBase64Value { .. }
|
||||
| InvalidAuthHeaderInvalidUtf8Value { .. } => StatusCode::InvalidAuthHeader,
|
||||
| InvalidUtf8Value { .. } => StatusCode::InvalidAuthHeader,
|
||||
|
||||
DatabaseNotFound { .. } => StatusCode::DatabaseNotFound,
|
||||
#[cfg(feature = "mem-prof")]
|
||||
DumpProfileData { source, .. } => source.status_code(),
|
||||
|
||||
InvalidUtf8Value { .. } | InvalidFlushArgument { .. } => StatusCode::InvalidArguments,
|
||||
InvalidFlushArgument { .. } => StatusCode::InvalidArguments,
|
||||
|
||||
ReplacePreparedStmtParams { source, .. }
|
||||
| GetPreparedStmtParams { source, .. }
|
||||
@@ -613,7 +605,9 @@ macro_rules! define_into_tonic_status {
|
||||
fn from(err: $Error) -> Self {
|
||||
use tonic::codegen::http::{HeaderMap, HeaderValue};
|
||||
use tonic::metadata::MetadataMap;
|
||||
use $crate::http::header::constants::GREPTIME_DB_HEADER_ERROR_CODE;
|
||||
use $crate::http::header::constants::{
|
||||
GREPTIME_DB_HEADER_ERROR_CODE, GREPTIME_DB_HEADER_ERROR_MSG,
|
||||
};
|
||||
|
||||
let mut headers = HeaderMap::<HeaderValue>::with_capacity(2);
|
||||
|
||||
@@ -626,6 +620,10 @@ macro_rules! define_into_tonic_status {
|
||||
);
|
||||
let root_error = err.output_msg();
|
||||
|
||||
if let Ok(err_msg) = HeaderValue::from_bytes(root_error.as_bytes()) {
|
||||
let _ = headers.insert(GREPTIME_DB_HEADER_ERROR_MSG, err_msg);
|
||||
}
|
||||
|
||||
let metadata = MetadataMap::from_headers(headers);
|
||||
tonic::Status::with_metadata(
|
||||
$crate::error::status_to_tonic_code(status_code),
|
||||
|
||||
@@ -45,7 +45,6 @@ use tokio::sync::oneshot::{self, Sender};
|
||||
use tokio::sync::Mutex;
|
||||
use tower::timeout::TimeoutLayer;
|
||||
use tower::ServiceBuilder;
|
||||
use tower_http::decompression::RequestDecompressionLayer;
|
||||
use tower_http::trace::TraceLayer;
|
||||
|
||||
use self::authorize::AuthState;
|
||||
@@ -699,11 +698,6 @@ impl HttpServer {
|
||||
Router::new()
|
||||
.route("/write", routing::post(influxdb_write_v1))
|
||||
.route("/api/v2/write", routing::post(influxdb_write_v2))
|
||||
.layer(
|
||||
ServiceBuilder::new()
|
||||
.layer(HandleErrorLayer::new(handle_error))
|
||||
.layer(RequestDecompressionLayer::new()),
|
||||
)
|
||||
.route("/ping", routing::get(influxdb_ping))
|
||||
.route("/health", routing::get(influxdb_health))
|
||||
.with_state(influxdb_handler)
|
||||
|
||||
@@ -35,7 +35,7 @@ use snafu::{ensure, OptionExt, ResultExt};
|
||||
use super::header::{GreptimeDbName, GREPTIME_TIMEZONE_HEADER_NAME};
|
||||
use super::{ResponseFormat, PUBLIC_APIS};
|
||||
use crate::error::{
|
||||
self, InvalidAuthHeaderInvisibleASCIISnafu, InvalidAuthHeaderSnafu, InvalidParameterSnafu,
|
||||
self, InvalidAuthorizationHeaderSnafu, InvalidParameterSnafu, InvisibleASCIISnafu,
|
||||
NotFoundInfluxAuthSnafu, Result, UnsupportedAuthSchemeSnafu, UrlDecodeSnafu,
|
||||
};
|
||||
use crate::http::error_result::ErrorResponse;
|
||||
@@ -174,13 +174,15 @@ fn get_influxdb_credentials<B>(request: &Request<B>) -> Result<Option<(Username,
|
||||
// try header
|
||||
let (auth_scheme, credential) = header
|
||||
.to_str()
|
||||
.context(InvalidAuthHeaderInvisibleASCIISnafu)?
|
||||
.context(InvisibleASCIISnafu)?
|
||||
.split_once(' ')
|
||||
.context(InvalidAuthHeaderSnafu)?;
|
||||
.context(InvalidAuthorizationHeaderSnafu)?;
|
||||
|
||||
let (username, password) = match auth_scheme.to_lowercase().as_str() {
|
||||
"token" => {
|
||||
let (u, p) = credential.split_once(':').context(InvalidAuthHeaderSnafu)?;
|
||||
let (u, p) = credential
|
||||
.split_once(':')
|
||||
.context(InvalidAuthorizationHeaderSnafu)?;
|
||||
(u.to_string(), p.to_string().into())
|
||||
}
|
||||
"basic" => decode_basic(credential)?,
|
||||
@@ -235,10 +237,13 @@ impl TryFrom<&str> for AuthScheme {
|
||||
type Error = error::Error;
|
||||
|
||||
fn try_from(value: &str) -> Result<Self> {
|
||||
let (scheme, encoded_credentials) =
|
||||
value.split_once(' ').context(InvalidAuthHeaderSnafu)?;
|
||||
|
||||
ensure!(!encoded_credentials.contains(' '), InvalidAuthHeaderSnafu);
|
||||
let (scheme, encoded_credentials) = value
|
||||
.split_once(' ')
|
||||
.context(InvalidAuthorizationHeaderSnafu)?;
|
||||
ensure!(
|
||||
!encoded_credentials.contains(' '),
|
||||
InvalidAuthorizationHeaderSnafu
|
||||
);
|
||||
|
||||
match scheme.to_lowercase().as_str() {
|
||||
"basic" => decode_basic(encoded_credentials)
|
||||
@@ -256,7 +261,7 @@ fn auth_header<B>(req: &Request<B>) -> Result<AuthScheme> {
|
||||
.get(http::header::AUTHORIZATION)
|
||||
.context(error::NotFoundAuthHeaderSnafu)?
|
||||
.to_str()
|
||||
.context(InvalidAuthHeaderInvisibleASCIISnafu)?;
|
||||
.context(InvisibleASCIISnafu)?;
|
||||
|
||||
auth_header.try_into()
|
||||
}
|
||||
@@ -265,14 +270,13 @@ fn decode_basic(credential: Credential) -> Result<(Username, Password)> {
|
||||
let decoded = BASE64_STANDARD
|
||||
.decode(credential)
|
||||
.context(error::InvalidBase64ValueSnafu)?;
|
||||
let as_utf8 =
|
||||
String::from_utf8(decoded).context(error::InvalidAuthHeaderInvalidUtf8ValueSnafu)?;
|
||||
let as_utf8 = String::from_utf8(decoded).context(error::InvalidUtf8ValueSnafu)?;
|
||||
|
||||
if let Some((user_id, password)) = as_utf8.split_once(':') {
|
||||
return Ok((user_id.to_string(), password.to_string().into()));
|
||||
}
|
||||
|
||||
InvalidAuthHeaderSnafu {}.fail()
|
||||
InvalidAuthorizationHeaderSnafu {}.fail()
|
||||
}
|
||||
|
||||
fn need_auth<B>(req: &Request<B>) -> bool {
|
||||
@@ -391,7 +395,10 @@ mod tests {
|
||||
|
||||
let wrong_req = mock_http_request(Some("Basic dXNlcm5hbWU6 cGFzc3dvcmQ="), None).unwrap();
|
||||
let res = auth_header(&wrong_req);
|
||||
assert_matches!(res.err(), Some(error::Error::InvalidAuthHeader { .. }));
|
||||
assert_matches!(
|
||||
res.err(),
|
||||
Some(error::Error::InvalidAuthorizationHeader { .. })
|
||||
);
|
||||
|
||||
let wrong_req = mock_http_request(Some("Digest dXNlcm5hbWU6cGFzc3dvcmQ="), None).unwrap();
|
||||
let res = auth_header(&wrong_req);
|
||||
|
||||
@@ -21,7 +21,7 @@ use common_telemetry::logging::{debug, error};
|
||||
use schemars::JsonSchema;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::http::header::constants::GREPTIME_DB_HEADER_ERROR_CODE;
|
||||
use crate::http::header::constants::{GREPTIME_DB_HEADER_ERROR_CODE, GREPTIME_DB_HEADER_ERROR_MSG};
|
||||
use crate::http::header::{GREPTIME_DB_HEADER_EXECUTION_TIME, GREPTIME_DB_HEADER_FORMAT};
|
||||
use crate::http::ResponseFormat;
|
||||
|
||||
@@ -78,10 +78,15 @@ impl IntoResponse for ErrorResponse {
|
||||
fn into_response(self) -> Response {
|
||||
let ty = self.ty.as_str();
|
||||
let code = self.code;
|
||||
let msg = self.error.clone();
|
||||
let execution_time = self.execution_time_ms;
|
||||
let mut resp = Json(self).into_response();
|
||||
resp.headers_mut()
|
||||
.insert(GREPTIME_DB_HEADER_ERROR_CODE, HeaderValue::from(code));
|
||||
resp.headers_mut().insert(
|
||||
GREPTIME_DB_HEADER_ERROR_MSG,
|
||||
HeaderValue::from_str(&msg).expect("malformed error msg"),
|
||||
);
|
||||
resp.headers_mut()
|
||||
.insert(&GREPTIME_DB_HEADER_FORMAT, HeaderValue::from_static(ty));
|
||||
resp.headers_mut().insert(
|
||||
|
||||
@@ -36,6 +36,7 @@ pub mod constants {
|
||||
pub const GREPTIME_DB_HEADER_NAME: &str = "x-greptime-db-name";
|
||||
pub const GREPTIME_TIMEZONE_HEADER_NAME: &str = "x-greptime-timezone";
|
||||
pub const GREPTIME_DB_HEADER_ERROR_CODE: &str = common_error::GREPTIME_DB_HEADER_ERROR_CODE;
|
||||
pub const GREPTIME_DB_HEADER_ERROR_MSG: &str = common_error::GREPTIME_DB_HEADER_ERROR_MSG;
|
||||
}
|
||||
|
||||
pub static GREPTIME_DB_HEADER_FORMAT: HeaderName =
|
||||
|
||||
Reference in New Issue
Block a user