Compare commits

..

17 Commits

Author SHA1 Message Date
Zhenchi
f66803622d chore: bump version to 0.14.3
Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
2025-05-23 20:23:23 +08:00
Ruihang Xia
e7774437b8 fix: require input ordering in series divide plan (#6148)
* require input ordering in series divide plan

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add sqlness case

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* finilise

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-05-23 20:23:23 +08:00
Ruihang Xia
c272b25456 feat: support altering multiple logical table in one remote write request (#6137)
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-05-23 20:23:23 +08:00
discord9
724b802018 chore: invalid table flow mapping cache (#6135)
* chore: invalid table flow mapping

* chore: exists

* fix: invalid all related keys in kv cache when drop flow&refactor: per review

* fix: flow not found status code

* chore: rm unused error code

* chore: stuff

* chore: unused
2025-05-23 20:23:23 +08:00
Ruihang Xia
f3ca5f5d7f feat: accommodate default column name with pre-created table schema (#6126)
* refactor: prepare_mocked_backend

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* modify request in place

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* apply to influx line protocol

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix typo

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* return on empty alter expr list

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* expose to other write paths

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-05-23 20:23:23 +08:00
Ruihang Xia
6c672b96bf fix: update promql-parser for regex anchor fix (#6117)
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-05-23 20:23:23 +08:00
discord9
83018d6670 fix: flow update use proper update (#6108)
* fix: flow update use proper update

* refactor: per review

* fix: flow cache

* chore: per copilot review

* refactor: rm flow node id

* refactor: per review

* chore: per review

* refactor: per review

* chore: per review
2025-05-23 20:23:23 +08:00
discord9
69f1cbd484 fix(flow): flow task run interval (#6100)
* fix: always check for shutdown signal in flow
chore: correct log msg for flows that shouldn't exist
feat: use time window size/2 as sleep interval

* chore: better slower query refresh time

* chore

* refactor: per review
2025-05-23 20:23:23 +08:00
discord9
e1dad69648 fix: flownode chose fe randomly&not starve lock (#6077)
* fix: choose frontend randomly

* docs: update comment

* chore: more logs

* fix: ignore inserts until recovering flow is done

* chore: resolve TODO

* fix: rm unused code&set done in correct location

* refactor: speed up create flow
2025-05-23 20:23:23 +08:00
Ruihang Xia
6c976bc737 feat: don't hide atomic write dir (#6109)
* feat: don't hidden atomic write dir

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* compatible code

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* Update src/mito2/src/access_layer.rs

Co-authored-by: Yingwen <realevenyag@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Yingwen <realevenyag@gmail.com>
2025-05-23 20:23:23 +08:00
jeremyhi
b20c1ac797 chore: reduce unnecessary txns in alter operations (#6133) 2025-05-23 20:23:23 +08:00
Yingwen
d7cfb741a5 fix: clean files under the atomic write dir on failure (#6112)
* fix: remove files under atomic dir on failure

* fix: clean atomic dir on download failure

* chore: update comment

* fix: clean if failed to write without write cache

* feat: add a TempFileCleaner to clean files on failure

* chore: after merge fix

* chore: more fix

---------

Co-authored-by: discord9 <55937128+discord9@users.noreply.github.com>
Co-authored-by: discord9 <discord9@163.com>
2025-05-23 20:23:23 +08:00
Weny Xu
1b3efef15c fix: append noop entry when auto topic creation is disabled (#6092)
* feat: improve topic management and add stale records cleanup

* fix: fix unit tests

* chore: apply suggestions from CR

* chore: apply suggestions from CR
2025-05-23 20:23:23 +08:00
Yingwen
1ca2dbd240 fix: reset tags when creating an empty metric in prom call (#6056)
* Revert "chore: remove debug logs"

This reverts commit f73f3a7373c83db974d8ed80cb47f5f87317b490.

* chore: more logs

* fix: reset tags and fields

* test: add binary time fn test

* chore: remove logs

* test: sort result
2025-05-23 20:23:23 +08:00
Ning Sun
d596dba240 fix: ident value in set search_path (#6153)
* fix: ident value in set search_path

* refactor: remove unneeded clone
2025-05-23 20:23:23 +08:00
discord9
5c9cbb5f4c chore: bump version to 0.14.2 (#6032)
* chore: only retry when retry-able in flow (#5987)

* chore: only retry when retry-able

* chore: revert dbg change

* refactor: per review

* fix: check for available frontend first

* docs: more explain&longer timeout&feat: more retry at every level&try send select 1

* fix: use `sql` method for "SELECT 1"

* fix: also put recover flows in spawned task and a dead loop

* test: update transient error in flow rebuild test

* chore: sleep after sqlness sleep

* chore: add a warning

* chore: wait even more time after reboot

* fix: sanitize_connection_string (#6012)

* fix: disable recursion limit in prost (#6010)

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* ci: fix the bugs of release-dev-builder-images and add update-dev-builder-image-tag (#6009)

* fix: the dev-builder release job is not triggered by merged event

* ci: add update-dev-builder-image-tag

* fix: always create mito engine (#6018)

* fix: force streaming mode for instant source table (#6031)

* fix: force streaming mode for instant source table

* tests: sqlness test&refactor: get table

* refactor: per review

* chore: bump version to 0.14.2

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: jeremyhi <jiachun_feng@proton.me>
Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: zyy17 <zyylsxm@gmail.com>
Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com>
2025-05-01 09:20:01 -07:00
Zhenchi
e2df38d0d1 chore: bump version to 0.14.1 (#6006)
* feat: remove own greatest fn (#5994)

* fix: prune primary key with multiple columns may use default value as statistics (#5996)

* test: incorrect test result when filtering pk with multiple columns

* fix: prune non first tag correctly

Distinguish no column and no stats and only use default value when no
column

* test: update test result

* refactor: rename test file

* test: add test for null filter

* fix: use StatValues for null counts

* test: drop table

* test: fix unstable flow test

* fix: check if memtable is empty by stats (#5989)

fix/checking-memtable-empty-and-stats:
 - **Refactor timestamp updates**: Simplified timestamp range updates in `PartitionTreeMemtable` and `TimeSeriesMemtable` by replacing `update_timestamp_range` with `fetch_max` and `fetch_min` methods for `max_timestamp` and `min_timestamp`.
   - Affected files: `partition_tree.rs`, `time_series.rs`

 - **Remove unused code**: Deleted the `update_timestamp_range` method from `WriteMetrics` and removed unnecessary imports.
   - Affected file: `stats.rs`

 - **Optimize memtable filtering**: Streamlined the check for empty memtables in `ScanRegion` by directly using `time_range`.
   - Affected file: `scan_region.rs`

* chore: bump version to 0.14.1

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
Co-authored-by: dennis zhuang <killme2008@gmail.com>
Co-authored-by: Yingwen <realevenyag@gmail.com>
Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com>
2025-04-28 07:39:49 +00:00
316 changed files with 5395 additions and 8382 deletions

View File

@@ -10,17 +10,17 @@ set -e
function create_version() {
# Read from envrionment variables.
if [ -z "$GITHUB_EVENT_NAME" ]; then
echo "GITHUB_EVENT_NAME is empty" >&2
echo "GITHUB_EVENT_NAME is empty"
exit 1
fi
if [ -z "$NEXT_RELEASE_VERSION" ]; then
echo "NEXT_RELEASE_VERSION is empty, use version from Cargo.toml" >&2
export NEXT_RELEASE_VERSION=$(grep '^version = ' Cargo.toml | cut -d '"' -f 2 | head -n 1)
echo "NEXT_RELEASE_VERSION is empty"
exit 1
fi
if [ -z "$NIGHTLY_RELEASE_PREFIX" ]; then
echo "NIGHTLY_RELEASE_PREFIX is empty" >&2
echo "NIGHTLY_RELEASE_PREFIX is empty"
exit 1
fi
@@ -35,7 +35,7 @@ function create_version() {
# It will be like 'dev-2023080819-f0e7216c'.
if [ "$NEXT_RELEASE_VERSION" = dev ]; then
if [ -z "$COMMIT_SHA" ]; then
echo "COMMIT_SHA is empty in dev build" >&2
echo "COMMIT_SHA is empty in dev build"
exit 1
fi
echo "dev-$(date "+%Y%m%d-%s")-$(echo "$COMMIT_SHA" | cut -c1-8)"
@@ -45,7 +45,7 @@ function create_version() {
# Note: Only output 'version=xxx' to stdout when everything is ok, so that it can be used in GitHub Actions Outputs.
if [ "$GITHUB_EVENT_NAME" = push ]; then
if [ -z "$GITHUB_REF_NAME" ]; then
echo "GITHUB_REF_NAME is empty in push event" >&2
echo "GITHUB_REF_NAME is empty in push event"
exit 1
fi
echo "$GITHUB_REF_NAME"
@@ -54,7 +54,7 @@ function create_version() {
elif [ "$GITHUB_EVENT_NAME" = schedule ]; then
echo "$NEXT_RELEASE_VERSION-$NIGHTLY_RELEASE_PREFIX-$(date "+%Y%m%d")"
else
echo "Unsupported GITHUB_EVENT_NAME: $GITHUB_EVENT_NAME" >&2
echo "Unsupported GITHUB_EVENT_NAME: $GITHUB_EVENT_NAME"
exit 1
fi
}

View File

@@ -22,7 +22,6 @@ concurrency:
jobs:
check-typos-and-docs:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
name: Check typos and docs
runs-on: ubuntu-latest
steps:
@@ -37,7 +36,6 @@ jobs:
|| (echo "'config/config.md' is not up-to-date, please run 'make config-docs'." && exit 1)
license-header-check:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
runs-on: ubuntu-latest
name: Check License Header
steps:
@@ -47,7 +45,6 @@ jobs:
- uses: korandoru/hawkeye@v5
check:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
name: Check
runs-on: ${{ matrix.os }}
strategy:
@@ -74,7 +71,6 @@ jobs:
run: cargo check --locked --workspace --all-targets
toml:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
name: Toml Check
runs-on: ubuntu-latest
timeout-minutes: 60
@@ -89,7 +85,6 @@ jobs:
run: taplo format --check
build:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
name: Build GreptimeDB binaries
runs-on: ${{ matrix.os }}
strategy:
@@ -132,7 +127,6 @@ jobs:
version: current
fuzztest:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
name: Fuzz Test
needs: build
runs-on: ubuntu-latest
@@ -189,7 +183,6 @@ jobs:
max-total-time: 120
unstable-fuzztest:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
name: Unstable Fuzz Test
needs: build-greptime-ci
runs-on: ubuntu-latest
@@ -251,7 +244,6 @@ jobs:
retention-days: 3
build-greptime-ci:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
name: Build GreptimeDB binary (profile-CI)
runs-on: ${{ matrix.os }}
strategy:
@@ -293,7 +285,6 @@ jobs:
version: current
distributed-fuzztest:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
name: Fuzz Test (Distributed, ${{ matrix.mode.name }}, ${{ matrix.target }})
runs-on: ubuntu-latest
needs: build-greptime-ci
@@ -425,7 +416,6 @@ jobs:
docker system prune -f
distributed-fuzztest-with-chaos:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
name: Fuzz Test with Chaos (Distributed, ${{ matrix.mode.name }}, ${{ matrix.target }})
runs-on: ubuntu-latest
needs: build-greptime-ci
@@ -573,7 +563,6 @@ jobs:
docker system prune -f
sqlness:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
name: Sqlness Test (${{ matrix.mode.name }})
needs: build
runs-on: ${{ matrix.os }}
@@ -620,7 +609,6 @@ jobs:
retention-days: 3
fmt:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
name: Rustfmt
runs-on: ubuntu-latest
timeout-minutes: 60
@@ -638,7 +626,6 @@ jobs:
run: make fmt-check
clippy:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
name: Clippy
runs-on: ubuntu-latest
timeout-minutes: 60
@@ -664,7 +651,6 @@ jobs:
run: make clippy
conflict-check:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
name: Check for conflict
runs-on: ubuntu-latest
steps:
@@ -675,7 +661,7 @@ jobs:
uses: olivernybroe/action-conflict-finder@v4.0
test:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' && github.event_name != 'merge_group' }}
if: github.event_name != 'merge_group'
runs-on: ubuntu-22.04-arm
timeout-minutes: 60
needs: [conflict-check, clippy, fmt]
@@ -727,7 +713,7 @@ jobs:
UNITTEST_LOG_DIR: "__unittest_logs"
coverage:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' && github.event_name == 'merge_group' }}
if: github.event_name == 'merge_group'
runs-on: ubuntu-22.04-8-cores
timeout-minutes: 60
steps:
@@ -787,7 +773,6 @@ jobs:
verbose: true
# compat:
# if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
# name: Compatibility Test
# needs: build
# runs-on: ubuntu-22.04

View File

@@ -117,16 +117,16 @@ jobs:
name: Run clean build on Linux
runs-on: ubuntu-latest
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
timeout-minutes: 45
timeout-minutes: 60
steps:
- uses: actions/checkout@v4
with:
fetch-depth: 0
persist-credentials: false
- uses: cachix/install-nix-action@v31
- uses: cachix/install-nix-action@v27
with:
nix_path: nixpkgs=channel:nixos-24.11
- run: nix develop --command cargo build --bin greptime
- run: nix develop --command cargo build
check-status:
name: Check status

View File

@@ -90,6 +90,8 @@ env:
# The scheduled version is '${{ env.NEXT_RELEASE_VERSION }}-nightly-YYYYMMDD', like v0.2.0-nigthly-20230313;
NIGHTLY_RELEASE_PREFIX: nightly
# Note: The NEXT_RELEASE_VERSION should be modified manually by every formal release.
NEXT_RELEASE_VERSION: v0.14.0
jobs:
allocate-runners:
@@ -133,6 +135,7 @@ jobs:
env:
GITHUB_EVENT_NAME: ${{ github.event_name }}
GITHUB_REF_NAME: ${{ github.ref_name }}
NEXT_RELEASE_VERSION: ${{ env.NEXT_RELEASE_VERSION }}
NIGHTLY_RELEASE_PREFIX: ${{ env.NIGHTLY_RELEASE_PREFIX }}
- name: Allocate linux-amd64 runner
@@ -464,29 +467,6 @@ jobs:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
DOCS_REPO_TOKEN: ${{ secrets.DOCS_REPO_TOKEN }}
bump-website-version:
name: Bump website version
if: ${{ github.event_name == 'push' || github.event_name == 'schedule' }}
needs: [allocate-runners]
runs-on: ubuntu-latest
# Permission reference: https://docs.github.com/en/actions/using-jobs/assigning-permissions-to-jobs
permissions:
issues: write # Allows the action to create issues for cyborg.
contents: write # Allows the action to create a release.
steps:
- uses: actions/checkout@v4
with:
fetch-depth: 0
persist-credentials: false
- uses: ./.github/actions/setup-cyborg
- name: Bump website version
working-directory: cyborg
run: pnpm tsx bin/bump-website-version.ts
env:
VERSION: ${{ needs.allocate-runners.outputs.version }}
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
WEBSITE_REPO_TOKEN: ${{ secrets.WEBSITE_REPO_TOKEN }}
notification:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' && (github.event_name == 'push' || github.event_name == 'schedule') && always() }}
name: Send notification to Greptime team

1
.gitignore vendored
View File

@@ -28,7 +28,6 @@ debug/
# Logs
**/__unittest_logs
logs/
!grafana/dashboards/logs/
# cpython's generated python byte code
**/__pycache__/

970
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -68,16 +68,15 @@ members = [
resolver = "2"
[workspace.package]
version = "0.15.0"
version = "0.14.3"
edition = "2021"
license = "Apache-2.0"
[workspace.lints]
clippy.print_stdout = "warn"
clippy.print_stderr = "warn"
clippy.dbg_macro = "warn"
clippy.implicit_clone = "warn"
clippy.result_large_err = "allow"
clippy.large_enum_variant = "allow"
clippy.doc_overindented_list_items = "allow"
rust.unknown_lints = "deny"
rust.unexpected_cfgs = { level = "warn", check-cfg = ['cfg(tokio_unstable)'] }
@@ -130,7 +129,7 @@ etcd-client = "0.14"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "17a3550751c8b1e02ec16be40101d5f24dc255c3" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "4d4136692fe7fbbd509ebc8c902f6afcc0ce61e4" }
hex = "0.4"
http = "1"
humantime = "2.1"
@@ -162,7 +161,9 @@ parquet = { version = "54.2", default-features = false, features = ["arrow", "as
paste = "1.0"
pin-project = "1.0"
prometheus = { version = "0.13.3", features = ["process"] }
promql-parser = { version = "0.5.1", features = ["ser"] }
promql-parser = { git = "https://github.com/GreptimeTeam/promql-parser.git", rev = "0410e8b459dda7cb222ce9596f8bf3971bd07bd2", features = [
"ser",
] }
prost = { version = "0.13", features = ["no-recursion-limit"] }
raft-engine = { version = "0.4.1", default-features = false }
rand = "0.9"

View File

@@ -8,7 +8,7 @@ CARGO_BUILD_OPTS := --locked
IMAGE_REGISTRY ?= docker.io
IMAGE_NAMESPACE ?= greptime
IMAGE_TAG ?= latest
DEV_BUILDER_IMAGE_TAG ?= 2025-04-15-1a517ec8-20250428023155
DEV_BUILDER_IMAGE_TAG ?= 2024-12-25-a71b93dd-20250305072908
BUILDX_MULTI_PLATFORM_BUILD ?= false
BUILDX_BUILDER_NAME ?= gtbuilder
BASE_IMAGE ?= ubuntu

195
README.md
View File

@@ -8,8 +8,6 @@
<h2 align="center">Real-Time & Cloud-Native Observability Database<br/>for metrics, logs, and traces</h2>
> Delivers sub-second querying at PB scale and exceptional cost efficiency from edge to cloud.
<div align="center">
<h3 align="center">
<a href="https://greptime.com/product/cloud">GreptimeCloud</a> |
@@ -51,77 +49,74 @@
</div>
- [Introduction](#introduction)
- [⭐ Key Features](#features)
- [Quick Comparison](#quick-comparison)
- [Architecture](#architecture)
- [Try GreptimeDB](#try-greptimedb)
- [**Features: Why GreptimeDB**](#why-greptimedb)
- [Architecture](https://docs.greptime.com/contributor-guide/overview/#architecture)
- [Try it for free](#try-greptimedb)
- [Getting Started](#getting-started)
- [Build From Source](#build-from-source)
- [Tools & Extensions](#tools--extensions)
- [Project Status](#project-status)
- [Community](#community)
- [Join the community](#community)
- [Contributing](#contributing)
- [Tools & Extensions](#tools--extensions)
- [License](#license)
- [Commercial Support](#commercial-support)
- [Contributing](#contributing)
- [Acknowledgement](#acknowledgement)
## Introduction
**GreptimeDB** is an open-source, cloud-native database purpose-built for the unified collection and analysis of observability data (metrics, logs, and traces). Whether youre operating on the edge, in the cloud, or across hybrid environments, GreptimeDB empowers real-time insights at massive scale — all in one system.
**GreptimeDB** is an open-source, cloud-native, unified & cost-effective observability database for **Metrics**, **Logs**, and **Traces**. You can gain real-time insights from Edge to Cloud at Any Scale.
## Features
## News
| Feature | Description |
| --------- | ----------- |
| [Unified Observability Data](https://docs.greptime.com/user-guide/concepts/why-greptimedb) | Store metrics, logs, and traces as timestamped, contextual wide events. Query via [SQL](https://docs.greptime.com/user-guide/query-data/sql), [PromQL](https://docs.greptime.com/user-guide/query-data/promql), and [streaming](https://docs.greptime.com/user-guide/flow-computation/overview). |
| [High Performance & Cost Effective](https://docs.greptime.com/user-guide/manage-data/data-index) | Written in Rust, with a distributed query engine, [rich indexing](https://docs.greptime.com/user-guide/manage-data/data-index), and optimized columnar storage, delivering sub-second responses at PB scale. |
| [Cloud-Native Architecture](https://docs.greptime.com/user-guide/concepts/architecture) | Designed for [Kubernetes](https://docs.greptime.com/user-guide/deployments/deploy-on-kubernetes/greptimedb-operator-management), with compute/storage separation, native object storage (AWS S3, Azure Blob, etc.) and seamless cross-cloud access. |
| [Developer-Friendly](https://docs.greptime.com/user-guide/protocols/overview) | Access via SQL/PromQL interfaces, REST API, MySQL/PostgreSQL protocols, and popular ingestion [protocols](https://docs.greptime.com/user-guide/protocols/overview). |
| [Flexible Deployment](https://docs.greptime.com/user-guide/deployments/overview) | Deploy anywhere: edge (including ARM/[Android](https://docs.greptime.com/user-guide/deployments/run-on-android)) or cloud, with unified APIs and efficient data sync. |
**[GreptimeDB tops JSONBench's billion-record cold run test!](https://greptime.com/blogs/2025-03-18-jsonbench-greptimedb-performance)**
Learn more in [Why GreptimeDB](https://docs.greptime.com/user-guide/concepts/why-greptimedb) and [Observability 2.0 and the Database for It](https://greptime.com/blogs/2025-04-25-greptimedb-observability2-new-database).
## Why GreptimeDB
## Quick Comparison
Our core developers have been building observability data platforms for years. Based on our best practices, GreptimeDB was born to give you:
| Feature | GreptimeDB | Traditional TSDB | Log Stores |
|----------------------------------|-----------------------|--------------------|-----------------|
| Data Types | Metrics, Logs, Traces | Metrics only | Logs only |
| Query Language | SQL, PromQL, Streaming| Custom/PromQL | Custom/DSL |
| Deployment | Edge + Cloud | Cloud/On-prem | Mostly central |
| Indexing & Performance | PB-Scale, Sub-second | Varies | Varies |
| Integration | REST, SQL, Common protocols | Varies | Varies |
* **Unified Processing of Observability Data**
**Performance:**
* [GreptimeDB tops JSONBench's billion-record cold run test!](https://greptime.com/blogs/2025-03-18-jsonbench-greptimedb-performance)
* [TSBS Benchmark](https://github.com/GreptimeTeam/greptimedb/tree/main/docs/benchmarks/tsbs)
A unified database that treats metrics, logs, and traces as timestamped wide events with context, supporting [SQL](https://docs.greptime.com/user-guide/query-data/sql)/[PromQL](https://docs.greptime.com/user-guide/query-data/promql) queries and [stream processing](https://docs.greptime.com/user-guide/flow-computation/overview) to simplify complex data stacks.
Read [more benchmark reports](https://docs.greptime.com/user-guide/concepts/features-that-you-concern#how-is-greptimedbs-performance-compared-to-other-solutions).
* **High Performance and Cost-effective**
## Architecture
Written in Rust, combines a distributed query engine with [rich indexing](https://docs.greptime.com/user-guide/manage-data/data-index) (inverted, fulltext, skip data, and vector) and optimized columnar storage to deliver sub-second responses on petabyte-scale data and high-cost efficiency.
* Read the [architecture](https://docs.greptime.com/contributor-guide/overview/#architecture) document.
* [DeepWiki](https://deepwiki.com/GreptimeTeam/greptimedb/1-overview) provides an in-depth look at GreptimeDB:
<img alt="GreptimeDB System Overview" src="docs/architecture.png">
* **Cloud-native Distributed Database**
Built for [Kubernetes](https://docs.greptime.com/user-guide/deployments/deploy-on-kubernetes/greptimedb-operator-management). GreptimeDB achieves seamless scalability with its [cloud-native architecture](https://docs.greptime.com/user-guide/concepts/architecture) of separated compute and storage, built on object storage (AWS S3, Azure Blob Storage, etc.) while enabling cross-cloud deployment through a unified data access layer.
* **Developer-Friendly**
Access standardized SQL/PromQL interfaces through built-in web dashboard, REST API, and MySQL/PostgreSQL protocols. Supports widely adopted data ingestion [protocols](https://docs.greptime.com/user-guide/protocols/overview) for seamless migration and integration.
* **Flexible Deployment Options**
Deploy GreptimeDB anywhere from ARM-based edge devices to cloud environments with unified APIs and bandwidth-efficient data synchronization. Query edge and cloud data seamlessly through identical APIs. [Learn how to run on Android](https://docs.greptime.com/user-guide/deployments/run-on-android/).
For more detailed info please read [Why GreptimeDB](https://docs.greptime.com/user-guide/concepts/why-greptimedb).
## Try GreptimeDB
### 1. [Live Demo](https://greptime.com/playground)
Experience GreptimeDB directly in your browser.
Try out the features of GreptimeDB right from your browser.
### 2. [GreptimeCloud](https://console.greptime.cloud/)
Start instantly with a free cluster.
### 3. Docker (Local Quickstart)
### 3. Docker Image
To install GreptimeDB locally, the recommended way is via Docker:
```shell
docker pull greptime/greptimedb
```
Start a GreptimeDB container with:
```shell
docker run -p 127.0.0.1:4000-4003:4000-4003 \
-v "$(pwd)/greptimedb:/greptimedb_data" \
-v "$(pwd)/greptimedb:./greptimedb_data" \
--name greptime --rm \
greptime/greptimedb:latest standalone start \
--http-addr 0.0.0.0:4000 \
@@ -129,90 +124,114 @@ docker run -p 127.0.0.1:4000-4003:4000-4003 \
--mysql-addr 0.0.0.0:4002 \
--postgres-addr 0.0.0.0:4003
```
Dashboard: [http://localhost:4000/dashboard](http://localhost:4000/dashboard)
[Full Install Guide](https://docs.greptime.com/getting-started/installation/overview)
**Troubleshooting:**
* Cannot connect to the database? Ensure that ports `4000`, `4001`, `4002`, and `4003` are not blocked by a firewall or used by other services.
* Failed to start? Check the container logs with `docker logs greptime` for further details.
Access the dashboard via `http://localhost:4000/dashboard`.
Read more about [Installation](https://docs.greptime.com/getting-started/installation/overview) on docs.
## Getting Started
- [Quickstart](https://docs.greptime.com/getting-started/quick-start)
- [User Guide](https://docs.greptime.com/user-guide/overview)
- [Demo Scenes](https://github.com/GreptimeTeam/demo-scene)
- [FAQ](https://docs.greptime.com/faq-and-others/faq)
* [Quickstart](https://docs.greptime.com/getting-started/quick-start)
* [User Guide](https://docs.greptime.com/user-guide/overview)
* [Demos](https://github.com/GreptimeTeam/demo-scene)
* [FAQ](https://docs.greptime.com/faq-and-others/faq)
## Build From Source
## Build
Check the prerequisite:
**Prerequisites:**
* [Rust toolchain](https://www.rust-lang.org/tools/install) (nightly)
* [Protobuf compiler](https://grpc.io/docs/protoc-installation/) (>= 3.15)
* C/C++ building essentials, including `gcc`/`g++`/`autoconf` and glibc library (eg. `libc6-dev` on Ubuntu and `glibc-devel` on Fedora)
* Python toolchain (optional): Required only if using some test scripts.
**Build and Run:**
```bash
Build GreptimeDB binary:
```shell
make
```
Run a standalone server:
```shell
cargo run -- standalone start
```
## Tools & Extensions
- **Kubernetes:** [GreptimeDB Operator](https://github.com/GrepTimeTeam/greptimedb-operator)
- **Helm Charts:** [Greptime Helm Charts](https://github.com/GreptimeTeam/helm-charts)
- **Dashboard:** [Web UI](https://github.com/GreptimeTeam/dashboard)
- **SDKs/Ingester:** [Go](https://github.com/GreptimeTeam/greptimedb-ingester-go), [Java](https://github.com/GreptimeTeam/greptimedb-ingester-java), [C++](https://github.com/GreptimeTeam/greptimedb-ingester-cpp), [Erlang](https://github.com/GreptimeTeam/greptimedb-ingester-erl), [Rust](https://github.com/GreptimeTeam/greptimedb-ingester-rust), [JS](https://github.com/GreptimeTeam/greptimedb-ingester-js)
- **Grafana**: [Official Dashboard](https://github.com/GreptimeTeam/greptimedb/blob/main/grafana/README.md)
### Kubernetes
- [GreptimeDB Operator](https://github.com/GrepTimeTeam/greptimedb-operator)
### Dashboard
- [The dashboard UI for GreptimeDB](https://github.com/GreptimeTeam/dashboard)
### SDK
- [GreptimeDB Go Ingester](https://github.com/GreptimeTeam/greptimedb-ingester-go)
- [GreptimeDB Java Ingester](https://github.com/GreptimeTeam/greptimedb-ingester-java)
- [GreptimeDB C++ Ingester](https://github.com/GreptimeTeam/greptimedb-ingester-cpp)
- [GreptimeDB Erlang Ingester](https://github.com/GreptimeTeam/greptimedb-ingester-erl)
- [GreptimeDB Rust Ingester](https://github.com/GreptimeTeam/greptimedb-ingester-rust)
- [GreptimeDB JavaScript Ingester](https://github.com/GreptimeTeam/greptimedb-ingester-js)
### Grafana Dashboard
Our official Grafana dashboard for monitoring GreptimeDB is available at [grafana](grafana/README.md) directory.
## Project Status
> **Status:** Beta.
> **GA (v1.0):** Targeted for mid 2025.
GreptimeDB is currently in Beta. We are targeting GA (General Availability) with v1.0 release by Early 2025.
- Being used in production by early adopters
- Stable, actively maintained, with regular releases ([version info](https://docs.greptime.com/nightly/reference/about-greptimedb-version))
- Suitable for evaluation and pilot deployments
While in Beta, GreptimeDB is already:
* Being used in production by early adopters
* Actively maintained with regular releases, [about version number](https://docs.greptime.com/nightly/reference/about-greptimedb-version)
* Suitable for testing and evaluation
For production use, we recommend using the latest stable release.
[![Star History Chart](https://api.star-history.com/svg?repos=GreptimeTeam/GreptimeDB&type=Date)](https://www.star-history.com/#GreptimeTeam/GreptimeDB&Date)
If you find this project useful, a ⭐ would mean a lot to us!
<img alt="Known Users" src="https://greptime.com/logo/img/users.png"/>
## Community
We invite you to engage and contribute!
Our core team is thrilled to see you participate in any ways you like. When you are stuck, try to
ask for help by filling an issue with a detailed description of what you were trying to do
and what went wrong. If you have any questions or if you would like to get involved in our
community, please check out:
- [Slack](https://greptime.com/slack)
- [Discussions](https://github.com/GreptimeTeam/greptimedb/discussions)
- [Official Website](https://greptime.com/)
- [Blog](https://greptime.com/blogs/)
- [LinkedIn](https://www.linkedin.com/company/greptime/)
- [Twitter](https://twitter.com/greptime)
- GreptimeDB Community on [Slack](https://greptime.com/slack)
- GreptimeDB [GitHub Discussions forum](https://github.com/GreptimeTeam/greptimedb/discussions)
- Greptime official [website](https://greptime.com)
## License
In addition, you may:
GreptimeDB is licensed under the [Apache License 2.0](https://apache.org/licenses/LICENSE-2.0.txt).
- View our official [Blog](https://greptime.com/blogs/)
- Connect us with [Linkedin](https://www.linkedin.com/company/greptime/)
- Follow us on [Twitter](https://twitter.com/greptime)
## Commercial Support
Running GreptimeDB in your organization?
We offer enterprise add-ons, services, training, and consulting.
[Contact us](https://greptime.com/contactus) for details.
If you are running GreptimeDB OSS in your organization, we offer additional
enterprise add-ons, installation services, training, and consulting. [Contact
us](https://greptime.com/contactus) and we will reach out to you with more
detail of our commercial license.
## License
GreptimeDB uses the [Apache License 2.0](https://apache.org/licenses/LICENSE-2.0.txt) to strike a balance between
open contributions and allowing you to use the software however you want.
## Contributing
- Read our [Contribution Guidelines](https://github.com/GreptimeTeam/greptimedb/blob/main/CONTRIBUTING.md).
- Explore [Internal Concepts](https://docs.greptime.com/contributor-guide/overview.html) and [DeepWiki](https://deepwiki.com/GreptimeTeam/greptimedb).
- Pick up a [good first issue](https://github.com/GreptimeTeam/greptimedb/issues?q=is%3Aissue+is%3Aopen+label%3A%22good+first+issue%22) and join the #contributors [Slack](https://greptime.com/slack) channel.
Please refer to [contribution guidelines](CONTRIBUTING.md) and [internal concepts docs](https://docs.greptime.com/contributor-guide/overview.html) for more information.
## Acknowledgement
Special thanks to all contributors! See [AUTHORS.md](https://github.com/GreptimeTeam/greptimedb/blob/main/AUTHOR.md).
Special thanks to all the contributors who have propelled GreptimeDB forward. For a complete list of contributors, please refer to [AUTHOR.md](AUTHOR.md).
- Uses [Apache Arrow™](https://arrow.apache.org/) (memory model)
- [Apache Parquet](https://parquet.apache.org/) (file storage)
- [Apache Arrow DataFusion](https://arrow.apache.org/datafusion/) (query engine)
- [Apache OpenDAL™](https://opendal.apache.org/) (data access abstraction)
- [etcd](https://etcd.io/) (meta service)
- GreptimeDB uses [Apache Arrow™](https://arrow.apache.org/) as the memory model and [Apache Parquet™](https://parquet.apache.org/) as the persistent file format.
- GreptimeDB's query engine is powered by [Apache Arrow DataFusion](https://arrow.apache.org/datafusion/).
- [Apache OpenDAL](https://opendal.apache.org) gives GreptimeDB a very general and elegant data access abstraction layer.
- GreptimeDB's meta service is based on [etcd](https://etcd.io/).
<img alt="Known Users" src="https://greptime.com/logo/img/users.png"/>

View File

@@ -1,57 +0,0 @@
/*
* Copyright 2023 Greptime Team
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import * as core from "@actions/core";
import {obtainClient} from "@/common";
async function triggerWorkflow(workflowId: string, version: string) {
const websiteClient = obtainClient("WEBSITE_REPO_TOKEN")
try {
await websiteClient.rest.actions.createWorkflowDispatch({
owner: "GreptimeTeam",
repo: "website",
workflow_id: workflowId,
ref: "main",
inputs: {
version,
},
});
console.log(`Successfully triggered ${workflowId} workflow with version ${version}`);
} catch (error) {
core.setFailed(`Failed to trigger workflow: ${error.message}`);
}
}
const version = process.env.VERSION;
if (!version) {
core.setFailed("VERSION environment variable is required");
process.exit(1);
}
// Remove 'v' prefix if exists
const cleanVersion = version.startsWith('v') ? version.slice(1) : version;
if (cleanVersion.includes('nightly')) {
console.log('Nightly version detected, skipping workflow trigger.');
process.exit(0);
}
try {
triggerWorkflow('bump-patch-version.yml', cleanVersion);
} catch (error) {
core.setFailed(`Error processing version: ${error.message}`);
process.exit(1);
}

Binary file not shown.

Before

Width:  |  Height:  |  Size: 173 KiB

18
flake.lock generated
View File

@@ -8,11 +8,11 @@
"rust-analyzer-src": "rust-analyzer-src"
},
"locked": {
"lastModified": 1745735608,
"narHash": "sha256-L0jzm815XBFfF2wCFmR+M1CF+beIEFj6SxlqVKF59Ec=",
"lastModified": 1737613896,
"narHash": "sha256-ldqXIglq74C7yKMFUzrS9xMT/EVs26vZpOD68Sh7OcU=",
"owner": "nix-community",
"repo": "fenix",
"rev": "c39a78eba6ed2a022cc3218db90d485077101496",
"rev": "303a062fdd8e89f233db05868468975d17855d80",
"type": "github"
},
"original": {
@@ -41,11 +41,11 @@
},
"nixpkgs": {
"locked": {
"lastModified": 1745487689,
"narHash": "sha256-FQoi3R0NjQeBAsEOo49b5tbDPcJSMWc3QhhaIi9eddw=",
"lastModified": 1737569578,
"narHash": "sha256-6qY0pk2QmUtBT9Mywdvif0i/CLVgpCjMUn6g9vB+f3M=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "5630cf13cceac06cefe9fc607e8dfa8fb342dde3",
"rev": "47addd76727f42d351590c905d9d1905ca895b82",
"type": "github"
},
"original": {
@@ -65,11 +65,11 @@
"rust-analyzer-src": {
"flake": false,
"locked": {
"lastModified": 1745694049,
"narHash": "sha256-fxvRYH/tS7hGQeg9zCVh5RBcSWT+JGJet7RA8Ss+rC0=",
"lastModified": 1737581772,
"narHash": "sha256-t1P2Pe3FAX9TlJsCZbmJ3wn+C4qr6aSMypAOu8WNsN0=",
"owner": "rust-lang",
"repo": "rust-analyzer",
"rev": "d8887c0758bbd2d5f752d5bd405d4491e90e7ed6",
"rev": "582af7ee9c8d84f5d534272fc7de9f292bd849be",
"type": "github"
},
"original": {

View File

@@ -21,7 +21,7 @@
lib = nixpkgs.lib;
rustToolchain = fenix.packages.${system}.fromToolchainName {
name = (lib.importTOML ./rust-toolchain.toml).toolchain.channel;
sha256 = "sha256-arzEYlWLGGYeOhECHpBxQd2joZ4rPKV3qLNnZ+eql6A=";
sha256 = "sha256-f/CVA1EC61EWbh0SjaRNhLL0Ypx2ObupbzigZp8NmL4=";
};
in
{

View File

@@ -2,63 +2,30 @@
## Overview
This repository contains Grafana dashboards for visualizing metrics and logs of GreptimeDB instances running in either cluster or standalone mode. **The Grafana version should be greater than 9.0**.
This repository maintains the Grafana dashboards for GreptimeDB. It has two types of dashboards:
We highly recommend using the self-monitoring feature provided by [GreptimeDB Operator](https://github.com/GrepTimeTeam/greptimedb-operator) to automatically collect metrics and logs from your GreptimeDB instances and store them in a dedicated GreptimeDB instance.
- `cluster/dashboard.json`: The Grafana dashboard for the GreptimeDB cluster. Read the [dashboard.md](./dashboards/cluster/dashboard.md) for more details.
- `standalone/dashboard.json`: The Grafana dashboard for the standalone GreptimeDB instance. **It's generated from the `cluster/dashboard.json` by removing the instance filter through the `make dashboards` command**. Read the [dashboard.md](./dashboards/standalone/dashboard.md) for more details.
- **Metrics Dashboards**
As the rapid development of GreptimeDB, the metrics may be changed, and please feel free to submit your feedback and/or contribution to this dashboard 🤗
- `dashboards/metrics/cluster/dashboard.json`: The Grafana dashboard for the GreptimeDB cluster. Read the [dashboard.md](./dashboards/metrics/cluster/dashboard.md) for more details.
- `dashboards/metrics/standalone/dashboard.json`: The Grafana dashboard for the standalone GreptimeDB instance. **It's generated from the `cluster/dashboard.json` by removing the instance filter through the `make dashboards` command**. Read the [dashboard.md](./dashboards/metrics/standalone/dashboard.md) for more details.
**NOTE**:
- **Logs Dashboard**
- The Grafana version should be greater than 9.0.
The `dashboards/logs/dashboard.json` provides a comprehensive Grafana dashboard for visualizing GreptimeDB logs. To utilize this dashboard effectively, you need to collect logs in JSON format from your GreptimeDB instances and store them in a dedicated GreptimeDB instance.
- If you want to modify the dashboards, you only need to modify the `cluster/dashboard.json` and run the `make dashboards` command to generate the `standalone/dashboard.json` and other related files.
For proper integration, the logs table must adhere to the following schema design with the table name `_gt_logs`:
To maintain the dashboards easily, we use the [`dac`](https://github.com/zyy17/dac) tool to generate the intermediate dashboards and markdown documents:
```sql
CREATE TABLE IF NOT EXISTS `_gt_logs` (
`pod_ip` STRING NULL,
`namespace` STRING NULL,
`cluster` STRING NULL,
`file` STRING NULL,
`module_path` STRING NULL,
`level` STRING NULL,
`target` STRING NULL,
`role` STRING NULL,
`pod` STRING NULL SKIPPING INDEX WITH(granularity = '10240', type = 'BLOOM'),
`message` STRING NULL FULLTEXT INDEX WITH(analyzer = 'English', backend = 'bloom', case_sensitive = 'false'),
`err` STRING NULL FULLTEXT INDEX WITH(analyzer = 'English', backend = 'bloom', case_sensitive = 'false'),
`timestamp` TIMESTAMP(9) NOT NULL,
TIME INDEX (`timestamp`),
PRIMARY KEY (`level`, `target`, `role`)
)
ENGINE=mito
WITH (
append_mode = 'true'
)
```
## Development
As GreptimeDB evolves rapidly, metrics may change over time. We welcome your feedback and contributions to improve these dashboards 🤗
To modify the metrics dashboards, simply edit the `dashboards/metrics/cluster/dashboard.json` file and run the `make dashboards` command. This will automatically generate the updated `dashboards/metrics/standalone/dashboard.json` and other related files.
For easier dashboard maintenance, we utilize the [`dac`](https://github.com/zyy17/dac) tool to generate human-readable intermediate dashboards and documentation:
- `dashboards/metrics/cluster/dashboard.yaml`: The intermediate dashboard file for the GreptimeDB cluster.
- `dashboards/metrics/standalone/dashboard.yaml`: The intermediate dashboard file for standalone GreptimeDB instances.
- `cluster/dashboard.yaml`: The intermediate dashboard for the GreptimeDB cluster.
- `standalone/dashboard.yaml`: The intermediate dashboard for the standalone GreptimeDB instance.
## Data Sources
The following data sources are used to fetch metrics and logs:
There are two data sources for the dashboards to fetch the metrics:
- **`${metrics}`**: Prometheus data source for providing the GreptimeDB metrics.
- **`${logs}`**: MySQL data source for providing the GreptimeDB logs.
- **`${information_schema}`**: MySQL data source for providing the information schema of the current instance and used for the `overview` panel. It is the MySQL port of the current monitored instance.
- **Prometheus**: Expose the metrics of GreptimeDB.
- **Information Schema**: It is the MySQL port of the current monitored instance. The `overview` dashboard will use this datasource to show the information schema of the current instance.
## Instance Filters
@@ -76,9 +43,9 @@ And the legend will be like: `[{{instance}}]-[{{ pod }}]`.
## Deployment
### (Recommended) Helm Chart
### Helm
If you use the [Helm Chart](https://github.com/GreptimeTeam/helm-charts) to deploy a GreptimeDB cluster, you can enable self-monitoring by setting the following values in your Helm chart:
If you use the Helm [chart](https://github.com/GreptimeTeam/helm-charts) to deploy a GreptimeDB cluster, you can enable self-monitoring by setting the following values in your Helm chart:
- `monitoring.enabled=true`: Deploys a standalone GreptimeDB instance dedicated to monitoring the cluster;
- `grafana.enabled=true`: Deploys Grafana and automatically imports the monitoring dashboard;
@@ -118,5 +85,5 @@ The standalone GreptimeDB instance will collect metrics from your cluster, and t
3. **Import the dashboards based on your deployment scenario**
- **Cluster**: Import the `dashboards/metrics/cluster/dashboard.json` dashboard.
- **Standalone**: Import the `dashboards/metrics/standalone/dashboard.json` dashboard.
- **Cluster**: Import the `cluster/dashboard.json` dashboard.
- **Standalone**: Import the `standalone/dashboard.json` dashboard.

View File

@@ -1,292 +0,0 @@
{
"annotations": {
"list": [
{
"builtIn": 1,
"datasource": {
"type": "grafana",
"uid": "-- Grafana --"
},
"enable": true,
"hide": true,
"iconColor": "rgba(0, 211, 255, 1)",
"name": "Annotations & Alerts",
"type": "dashboard"
}
]
},
"editable": true,
"fiscalYearStartMonth": 0,
"graphTooltip": 0,
"id": 12,
"links": [],
"panels": [
{
"datasource": {
"default": false,
"type": "mysql",
"uid": "${datasource}"
},
"fieldConfig": {
"defaults": {},
"overrides": []
},
"gridPos": {
"h": 20,
"w": 24,
"x": 0,
"y": 0
},
"id": 1,
"options": {
"dedupStrategy": "none",
"enableInfiniteScrolling": true,
"enableLogDetails": true,
"prettifyLogMessage": false,
"showCommonLabels": false,
"showLabels": false,
"showTime": true,
"sortOrder": "Descending",
"wrapLogMessage": false
},
"pluginVersion": "11.6.0",
"targets": [
{
"dataset": "greptime_private",
"datasource": {
"type": "mysql",
"uid": "${datasource}"
},
"editorMode": "code",
"format": "table",
"rawQuery": true,
"rawSql": "SELECT `timestamp`, CONCAT('[', `level`, ']', ' ', '<', `target`, '>', ' ', `message`),\n `role`,\n `pod`,\n `pod_ip`,\n `namespace`,\n `cluster`,\n `err`,\n `file`,\n `module_path`\nFROM\n `_gt_logs`\nWHERE\n (\n \"$level\" = \"'all'\"\n OR `level` IN ($level)\n ) \n AND (\n \"$role\" = \"'all'\"\n OR `role` IN ($role)\n )\n AND (\n \"$pod\" = \"\"\n OR `pod` = '$pod'\n )\n AND (\n \"$target\" = \"\"\n OR `target` = '$target'\n )\n AND (\n \"$search\" = \"\"\n OR matches_term(`message`, '$search')\n )\n AND (\n \"$exclude\" = \"\"\n OR NOT matches_term(`message`, '$exclude')\n )\n AND $__timeFilter(`timestamp`)\nORDER BY `timestamp` DESC\nLIMIT $limit;\n",
"refId": "A",
"sql": {
"columns": [
{
"parameters": [],
"type": "function"
}
],
"groupBy": [
{
"property": {
"type": "string"
},
"type": "groupBy"
}
],
"limit": 50
}
}
],
"title": "Logs",
"type": "logs"
}
],
"preload": false,
"refresh": "",
"schemaVersion": 41,
"tags": [],
"templating": {
"list": [
{
"current": {
"text": "logs",
"value": "P98F38F12DB221A8C"
},
"includeAll": false,
"name": "datasource",
"options": [],
"query": "mysql",
"refresh": 1,
"regex": "",
"type": "datasource"
},
{
"allValue": "'all'",
"current": {
"text": [
"$__all"
],
"value": [
"$__all"
]
},
"includeAll": true,
"label": "level",
"multi": true,
"name": "level",
"options": [
{
"selected": false,
"text": "INFO",
"value": "INFO"
},
{
"selected": false,
"text": "ERROR",
"value": "ERROR"
},
{
"selected": false,
"text": "WARN",
"value": "WARN"
},
{
"selected": false,
"text": "DEBUG",
"value": "DEBUG"
},
{
"selected": false,
"text": "TRACE",
"value": "TRACE"
}
],
"query": "INFO,ERROR,WARN,DEBUG,TRACE",
"type": "custom"
},
{
"allValue": "'all'",
"current": {
"text": [
"$__all"
],
"value": [
"$__all"
]
},
"includeAll": true,
"label": "role",
"multi": true,
"name": "role",
"options": [
{
"selected": false,
"text": "datanode",
"value": "datanode"
},
{
"selected": false,
"text": "frontend",
"value": "frontend"
},
{
"selected": false,
"text": "meta",
"value": "meta"
}
],
"query": "datanode,frontend,meta",
"type": "custom"
},
{
"current": {
"text": "",
"value": ""
},
"label": "pod",
"name": "pod",
"options": [
{
"selected": true,
"text": "",
"value": ""
}
],
"query": "",
"type": "textbox"
},
{
"current": {
"text": "",
"value": ""
},
"label": "target",
"name": "target",
"options": [
{
"selected": true,
"text": "",
"value": ""
}
],
"query": "",
"type": "textbox"
},
{
"current": {
"text": "",
"value": ""
},
"label": "search",
"name": "search",
"options": [
{
"selected": true,
"text": "",
"value": ""
}
],
"query": "",
"type": "textbox"
},
{
"current": {
"text": "",
"value": ""
},
"label": "exclude",
"name": "exclude",
"options": [
{
"selected": true,
"text": "",
"value": ""
}
],
"query": "",
"type": "textbox"
},
{
"current": {
"text": "2000",
"value": "2000"
},
"includeAll": false,
"label": "limit",
"name": "limit",
"options": [
{
"selected": true,
"text": "2000",
"value": "2000"
},
{
"selected": false,
"text": "5000",
"value": "5000"
},
{
"selected": false,
"text": "8000",
"value": "8000"
}
],
"query": "2000,5000,8000",
"type": "custom"
}
]
},
"time": {
"from": "now-6h",
"to": "now"
},
"timepicker": {},
"timezone": "browser",
"title": "GreptimeDB Logs",
"uid": "edx5veo4rd3wge2",
"version": 1
}

View File

@@ -1,6 +1,6 @@
#!/usr/bin/env bash
DASHBOARD_DIR=${1:-grafana/dashboards/metrics}
DASHBOARD_DIR=${1:-grafana/dashboards}
check_dashboard_description() {
for dashboard in $(find $DASHBOARD_DIR -name "*.json"); do
@@ -25,7 +25,7 @@ check_dashboard_description() {
check_dashboards_generation() {
./grafana/scripts/gen-dashboards.sh
if [[ -n "$(git diff --name-only grafana/dashboards/metrics)" ]]; then
if [[ -n "$(git diff --name-only grafana/dashboards)" ]]; then
echo "Error: The dashboards are not generated correctly. You should execute the `make dashboards` command."
exit 1
fi

View File

@@ -1,7 +1,7 @@
#! /usr/bin/env bash
CLUSTER_DASHBOARD_DIR=${1:-grafana/dashboards/metrics/cluster}
STANDALONE_DASHBOARD_DIR=${2:-grafana/dashboards/metrics/standalone}
CLUSTER_DASHBOARD_DIR=${1:-grafana/dashboards/cluster}
STANDALONE_DASHBOARD_DIR=${2:-grafana/dashboards/standalone}
DAC_IMAGE=ghcr.io/zyy17/dac:20250423-522bd35
remove_instance_filters() {

View File

@@ -1,2 +1,2 @@
[toolchain]
channel = "nightly-2025-04-15"
channel = "nightly-2024-12-25"

View File

@@ -1050,7 +1050,7 @@ pub fn value_to_grpc_value(value: Value) -> GrpcValue {
Value::Int64(v) => Some(ValueData::I64Value(v)),
Value::Float32(v) => Some(ValueData::F32Value(*v)),
Value::Float64(v) => Some(ValueData::F64Value(*v)),
Value::String(v) => Some(ValueData::StringValue(v.into_string())),
Value::String(v) => Some(ValueData::StringValue(v.as_utf8().to_string())),
Value::Binary(v) => Some(ValueData::BinaryValue(v.to_vec())),
Value::Date(v) => Some(ValueData::DateValue(v.val())),
Value::Timestamp(v) => Some(match v.unit() {

View File

@@ -36,7 +36,7 @@ pub fn userinfo_by_name(username: Option<String>) -> UserInfoRef {
}
pub fn user_provider_from_option(opt: &String) -> Result<UserProviderRef> {
let (name, content) = opt.split_once(':').with_context(|| InvalidConfigSnafu {
let (name, content) = opt.split_once(':').context(InvalidConfigSnafu {
value: opt.to_string(),
msg: "UserProviderOption must be in format `<option>:<value>`",
})?;
@@ -57,24 +57,6 @@ pub fn user_provider_from_option(opt: &String) -> Result<UserProviderRef> {
}
}
pub fn static_user_provider_from_option(opt: &String) -> Result<StaticUserProvider> {
let (name, content) = opt.split_once(':').with_context(|| InvalidConfigSnafu {
value: opt.to_string(),
msg: "UserProviderOption must be in format `<option>:<value>`",
})?;
match name {
STATIC_USER_PROVIDER => {
let provider = StaticUserProvider::new(content)?;
Ok(provider)
}
_ => InvalidConfigSnafu {
value: name.to_string(),
msg: format!("Invalid UserProviderOption, expect only {STATIC_USER_PROVIDER}"),
}
.fail(),
}
}
type Username<'a> = &'a str;
type HostOrIp<'a> = &'a str;

View File

@@ -38,14 +38,6 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to convert to utf8"))]
FromUtf8 {
#[snafu(source)]
error: std::string::FromUtf8Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Authentication source failure"))]
AuthBackend {
#[snafu(implicit)]
@@ -93,7 +85,7 @@ impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
match self {
Error::InvalidConfig { .. } => StatusCode::InvalidArguments,
Error::IllegalParam { .. } | Error::FromUtf8 { .. } => StatusCode::InvalidArguments,
Error::IllegalParam { .. } => StatusCode::InvalidArguments,
Error::FileWatch { .. } => StatusCode::InvalidArguments,
Error::InternalState { .. } => StatusCode::Unexpected,
Error::Io { .. } => StatusCode::StorageUnavailable,

View File

@@ -22,12 +22,10 @@ mod user_provider;
pub mod tests;
pub use common::{
auth_mysql, static_user_provider_from_option, user_provider_from_option, userinfo_by_name,
HashedPassword, Identity, Password,
auth_mysql, user_provider_from_option, userinfo_by_name, HashedPassword, Identity, Password,
};
pub use permission::{PermissionChecker, PermissionReq, PermissionResp};
pub use user_info::UserInfo;
pub use user_provider::static_user_provider::StaticUserProvider;
pub use user_provider::UserProvider;
/// pub type alias

View File

@@ -15,15 +15,15 @@
use std::collections::HashMap;
use async_trait::async_trait;
use snafu::{OptionExt, ResultExt};
use snafu::OptionExt;
use crate::error::{FromUtf8Snafu, InvalidConfigSnafu, Result};
use crate::error::{InvalidConfigSnafu, Result};
use crate::user_provider::{authenticate_with_credential, load_credential_from_file};
use crate::{Identity, Password, UserInfoRef, UserProvider};
pub(crate) const STATIC_USER_PROVIDER: &str = "static_user_provider";
pub struct StaticUserProvider {
pub(crate) struct StaticUserProvider {
users: HashMap<String, Vec<u8>>,
}
@@ -60,18 +60,6 @@ impl StaticUserProvider {
.fail(),
}
}
/// Return a random username/password pair
/// This is useful for invoking from other components in the cluster
pub fn get_one_user_pwd(&self) -> Result<(String, String)> {
let kv = self.users.iter().next().context(InvalidConfigSnafu {
value: "",
msg: "Expect at least one pair of username and password",
})?;
let username = kv.0;
let pwd = String::from_utf8(kv.1.clone()).context(FromUtf8Snafu)?;
Ok((username.clone(), pwd))
}
}
#[async_trait]

View File

@@ -84,6 +84,12 @@ mod tests {
let key1 = "3178510";
let key2 = "4215648";
// have collision
assert_eq!(
oid_map.hasher.hash_one(key1) as u32,
oid_map.hasher.hash_one(key2) as u32
);
// insert them into oid_map
let oid1 = oid_map.get_oid(key1);
let oid2 = oid_map.get_oid(key2);

View File

@@ -51,6 +51,7 @@ opendal = { version = "0.51.1", features = [
query.workspace = true
rand.workspace = true
reqwest.workspace = true
rustyline = "10.1"
serde.workspace = true
serde_json.workspace = true
servers.workspace = true

154
src/cli/src/cmd.rs Normal file
View File

@@ -0,0 +1,154 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::error::{Error, InvalidReplCommandSnafu, Result};
/// Represents the parsed command from the user (which may be over many lines)
#[derive(Debug, PartialEq)]
pub(crate) enum ReplCommand {
Help,
UseDatabase { db_name: String },
Sql { sql: String },
Exit,
}
impl TryFrom<&str> for ReplCommand {
type Error = Error;
fn try_from(input: &str) -> Result<Self> {
let input = input.trim();
if input.is_empty() {
return InvalidReplCommandSnafu {
reason: "No command specified".to_string(),
}
.fail();
}
// If line ends with ';', it must be treated as a complete input.
// However, the opposite is not true.
let input_is_completed = input.ends_with(';');
let input = input.strip_suffix(';').map(|x| x.trim()).unwrap_or(input);
let lowercase = input.to_lowercase();
match lowercase.as_str() {
"help" => Ok(Self::Help),
"exit" | "quit" => Ok(Self::Exit),
_ => match input.split_once(' ') {
Some((maybe_use, database)) if maybe_use.to_lowercase() == "use" => {
Ok(Self::UseDatabase {
db_name: database.trim().to_string(),
})
}
// Any valid SQL must contains at least one whitespace.
Some(_) if input_is_completed => Ok(Self::Sql {
sql: input.to_string(),
}),
_ => InvalidReplCommandSnafu {
reason: format!("unknown command '{input}', maybe input is not completed"),
}
.fail(),
},
}
}
}
impl ReplCommand {
pub fn help() -> &'static str {
r#"
Available commands (case insensitive):
- 'help': print this help
- 'exit' or 'quit': exit the REPL
- 'use <your database name>': switch to another database/schema context
- Other typed in text will be treated as SQL.
You can enter new line while typing, just remember to end it with ';'.
"#
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::error::Error::InvalidReplCommand;
#[test]
fn test_from_str() {
fn test_ok(s: &str, expected: ReplCommand) {
let actual: ReplCommand = s.try_into().unwrap();
assert_eq!(expected, actual, "'{}'", s);
}
fn test_err(s: &str) {
let result: Result<ReplCommand> = s.try_into();
assert!(matches!(result, Err(InvalidReplCommand { .. })))
}
test_err("");
test_err(" ");
test_err("\t");
test_ok("help", ReplCommand::Help);
test_ok("help", ReplCommand::Help);
test_ok(" help", ReplCommand::Help);
test_ok(" help ", ReplCommand::Help);
test_ok(" HELP ", ReplCommand::Help);
test_ok(" Help; ", ReplCommand::Help);
test_ok(" help ; ", ReplCommand::Help);
test_ok("exit", ReplCommand::Exit);
test_ok("exit;", ReplCommand::Exit);
test_ok("exit ;", ReplCommand::Exit);
test_ok("EXIT", ReplCommand::Exit);
test_ok("quit", ReplCommand::Exit);
test_ok("quit;", ReplCommand::Exit);
test_ok("quit ;", ReplCommand::Exit);
test_ok("QUIT", ReplCommand::Exit);
test_ok(
"use Foo",
ReplCommand::UseDatabase {
db_name: "Foo".to_string(),
},
);
test_ok(
" use Foo ; ",
ReplCommand::UseDatabase {
db_name: "Foo".to_string(),
},
);
// ensure that database name is case sensitive
test_ok(
" use FOO ; ",
ReplCommand::UseDatabase {
db_name: "FOO".to_string(),
},
);
// ensure that we aren't messing with capitalization
test_ok(
"SELECT * from foo;",
ReplCommand::Sql {
sql: "SELECT * from foo".to_string(),
},
);
// Input line (that don't belong to any other cases above) must ends with ';' to make it a valid SQL.
test_err("insert blah");
test_ok(
"insert blah;",
ReplCommand::Sql {
sql: "insert blah".to_string(),
},
);
}
}

View File

@@ -101,6 +101,9 @@ pub enum Error {
error: reqwest::Error,
},
#[snafu(display("Invalid REPL command: {reason}"))]
InvalidReplCommand { reason: String },
#[snafu(display("Failed to parse SQL: {}", sql))]
ParseSql {
sql: String,
@@ -251,6 +254,7 @@ impl ErrorExt for Error {
Error::MissingConfig { .. }
| Error::LoadLayeredConfig { .. }
| Error::IllegalConfig { .. }
| Error::InvalidReplCommand { .. }
| Error::InitTimezone { .. }
| Error::ConnectEtcd { .. }
| Error::CreateDir { .. }

112
src/cli/src/helper.rs Normal file
View File

@@ -0,0 +1,112 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::borrow::Cow;
use rustyline::completion::Completer;
use rustyline::highlight::{Highlighter, MatchingBracketHighlighter};
use rustyline::hint::{Hinter, HistoryHinter};
use rustyline::validate::{ValidationContext, ValidationResult, Validator};
use crate::cmd::ReplCommand;
pub(crate) struct RustylineHelper {
hinter: HistoryHinter,
highlighter: MatchingBracketHighlighter,
}
impl Default for RustylineHelper {
fn default() -> Self {
Self {
hinter: HistoryHinter {},
highlighter: MatchingBracketHighlighter::default(),
}
}
}
impl rustyline::Helper for RustylineHelper {}
impl Validator for RustylineHelper {
fn validate(&self, ctx: &mut ValidationContext<'_>) -> rustyline::Result<ValidationResult> {
let input = ctx.input();
match ReplCommand::try_from(input) {
Ok(_) => Ok(ValidationResult::Valid(None)),
Err(e) => {
if input.trim_end().ends_with(';') {
// If line ends with ';', it HAS to be a valid command.
Ok(ValidationResult::Invalid(Some(e.to_string())))
} else {
Ok(ValidationResult::Incomplete)
}
}
}
}
}
impl Hinter for RustylineHelper {
type Hint = String;
fn hint(&self, line: &str, pos: usize, ctx: &rustyline::Context<'_>) -> Option<Self::Hint> {
self.hinter.hint(line, pos, ctx)
}
}
impl Highlighter for RustylineHelper {
fn highlight<'l>(&self, line: &'l str, pos: usize) -> Cow<'l, str> {
self.highlighter.highlight(line, pos)
}
fn highlight_prompt<'b, 's: 'b, 'p: 'b>(
&'s self,
prompt: &'p str,
default: bool,
) -> Cow<'b, str> {
self.highlighter.highlight_prompt(prompt, default)
}
fn highlight_hint<'h>(&self, hint: &'h str) -> Cow<'h, str> {
use nu_ansi_term::Style;
Cow::Owned(Style::new().dimmed().paint(hint).to_string())
}
fn highlight_candidate<'c>(
&self,
candidate: &'c str,
completion: rustyline::CompletionType,
) -> Cow<'c, str> {
self.highlighter.highlight_candidate(candidate, completion)
}
fn highlight_char(&self, line: &str, pos: usize) -> bool {
self.highlighter.highlight_char(line, pos)
}
}
impl Completer for RustylineHelper {
type Candidate = String;
fn complete(
&self,
line: &str,
pos: usize,
ctx: &rustyline::Context<'_>,
) -> rustyline::Result<(usize, Vec<Self::Candidate>)> {
// If there is a hint, use that as the auto-complete when user hits `tab`
if let Some(hint) = self.hinter.hint(line, pos, ctx) {
Ok((pos, vec![hint]))
} else {
Ok((0, vec![]))
}
}
}

View File

@@ -13,9 +13,15 @@
// limitations under the License.
mod bench;
mod database;
pub mod error;
// Wait for https://github.com/GreptimeTeam/greptimedb/issues/2373
#[allow(unused)]
mod cmd;
mod export;
mod helper;
// Wait for https://github.com/GreptimeTeam/greptimedb/issues/2373
mod database;
mod import;
use async_trait::async_trait;

View File

@@ -42,11 +42,11 @@ use futures::future;
use futures_util::{Stream, StreamExt, TryStreamExt};
use prost::Message;
use snafu::{ensure, ResultExt};
use tonic::metadata::{AsciiMetadataKey, AsciiMetadataValue, MetadataMap, MetadataValue};
use tonic::metadata::{AsciiMetadataKey, MetadataValue};
use tonic::transport::Channel;
use crate::error::{
ConvertFlightDataSnafu, Error, FlightGetSnafu, IllegalFlightMessagesSnafu,
ConvertFlightDataSnafu, Error, FlightGetSnafu, IllegalFlightMessagesSnafu, InvalidAsciiSnafu,
InvalidTonicMetadataValueSnafu, ServerSnafu,
};
use crate::{from_grpc_response, Client, Result};
@@ -165,27 +165,26 @@ impl Database {
let mut request = tonic::Request::new(request);
let metadata = request.metadata_mut();
Self::put_hints(metadata, hints)?;
for (key, value) in hints {
let key = AsciiMetadataKey::from_bytes(format!("x-greptime-hint-{}", key).as_bytes())
.map_err(|_| {
InvalidAsciiSnafu {
value: key.to_string(),
}
.build()
})?;
let value = value.parse().map_err(|_| {
InvalidAsciiSnafu {
value: value.to_string(),
}
.build()
})?;
metadata.insert(key, value);
}
let response = client.handle(request).await?.into_inner();
from_grpc_response(response)
}
fn put_hints(metadata: &mut MetadataMap, hints: &[(&str, &str)]) -> Result<()> {
let Some(value) = hints
.iter()
.map(|(k, v)| format!("{}={}", k, v))
.reduce(|a, b| format!("{},{}", a, b))
else {
return Ok(());
};
let key = AsciiMetadataKey::from_static("x-greptime-hints");
let value = AsciiMetadataValue::from_str(&value).context(InvalidTonicMetadataValueSnafu)?;
metadata.insert(key, value);
Ok(())
}
pub async fn handle(&self, request: Request) -> Result<u32> {
let mut client = make_database_client(&self.client)?.inner;
let request = self.to_rpc_request(request);
@@ -243,49 +242,39 @@ impl Database {
where
S: AsRef<str>,
{
self.sql_with_hint(sql, &[]).await
}
pub async fn sql_with_hint<S>(&self, sql: S, hints: &[(&str, &str)]) -> Result<Output>
where
S: AsRef<str>,
{
let request = Request::Query(QueryRequest {
self.do_get(Request::Query(QueryRequest {
query: Some(Query::Sql(sql.as_ref().to_string())),
});
self.do_get(request, hints).await
}))
.await
}
pub async fn logical_plan(&self, logical_plan: Vec<u8>) -> Result<Output> {
let request = Request::Query(QueryRequest {
self.do_get(Request::Query(QueryRequest {
query: Some(Query::LogicalPlan(logical_plan)),
});
self.do_get(request, &[]).await
}))
.await
}
pub async fn create(&self, expr: CreateTableExpr) -> Result<Output> {
let request = Request::Ddl(DdlRequest {
self.do_get(Request::Ddl(DdlRequest {
expr: Some(DdlExpr::CreateTable(expr)),
});
self.do_get(request, &[]).await
}))
.await
}
pub async fn alter(&self, expr: AlterTableExpr) -> Result<Output> {
let request = Request::Ddl(DdlRequest {
self.do_get(Request::Ddl(DdlRequest {
expr: Some(DdlExpr::AlterTable(expr)),
});
self.do_get(request, &[]).await
}))
.await
}
async fn do_get(&self, request: Request, hints: &[(&str, &str)]) -> Result<Output> {
async fn do_get(&self, request: Request) -> Result<Output> {
let request = self.to_rpc_request(request);
let request = Ticket {
ticket: request.encode_to_vec().into(),
};
let mut request = tonic::Request::new(request);
Self::put_hints(request.metadata_mut(), hints)?;
let mut client = self.client.make_flight_client()?;
let response = client.mut_inner().do_get(request).await.or_else(|e| {

View File

@@ -110,6 +110,13 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to parse ascii string: {}", value))]
InvalidAscii {
value: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid Tonic metadata value"))]
InvalidTonicMetadataValue {
#[snafu(source)]
@@ -136,7 +143,10 @@ impl ErrorExt for Error {
| Error::ConvertFlightData { source, .. }
| Error::CreateTlsChannel { source, .. } => source.status_code(),
Error::IllegalGrpcClientState { .. } => StatusCode::Unexpected,
Error::InvalidTonicMetadataValue { .. } => StatusCode::InvalidArguments,
Error::InvalidAscii { .. } | Error::InvalidTonicMetadataValue { .. } => {
StatusCode::InvalidArguments
}
}
}

View File

@@ -15,11 +15,9 @@
#![doc = include_str!("../../../../README.md")]
use clap::{Parser, Subcommand};
use cmd::datanode::builder::InstanceBuilder;
use cmd::error::{InitTlsProviderSnafu, Result};
use cmd::options::GlobalOptions;
use cmd::{cli, datanode, flownode, frontend, metasrv, standalone, App};
use common_base::Plugins;
use common_version::version;
use servers::install_ring_crypto_provider;
@@ -104,10 +102,10 @@ async fn main_body() -> Result<()> {
async fn start(cli: Command) -> Result<()> {
match cli.subcmd {
SubCommand::Datanode(cmd) => {
let opts = cmd.load_options(&cli.global_options)?;
let plugins = Plugins::new();
let builder = InstanceBuilder::try_new_with_init(opts, plugins).await?;
cmd.build_with(builder).await?.run().await
cmd.build(cmd.load_options(&cli.global_options)?)
.await?
.run()
.await
}
SubCommand::Flownode(cmd) => {
cmd.build(cmd.load_options(&cli.global_options)?)

View File

@@ -58,7 +58,7 @@ impl App for Instance {
false
}
async fn stop(&mut self) -> Result<()> {
async fn stop(&self) -> Result<()> {
Ok(())
}
}

View File

@@ -12,27 +12,33 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod builder;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use cache::build_datanode_cache_registry;
use catalog::kvbackend::MetaKvBackend;
use clap::Parser;
use common_base::Plugins;
use common_config::Configurable;
use common_meta::cache::LayeredCacheRegistryBuilder;
use common_telemetry::logging::TracingOptions;
use common_telemetry::{info, warn};
use common_version::{short_version, version};
use common_wal::config::DatanodeWalConfig;
use datanode::datanode::Datanode;
use meta_client::MetaClientOptions;
use snafu::{ensure, ResultExt};
use datanode::datanode::{Datanode, DatanodeBuilder};
use datanode::service::DatanodeServiceBuilder;
use meta_client::{MetaClientOptions, MetaClientType};
use servers::Mode;
use snafu::{ensure, OptionExt, ResultExt};
use tracing_appender::non_blocking::WorkerGuard;
use crate::datanode::builder::InstanceBuilder;
use crate::error::{
LoadLayeredConfigSnafu, MissingConfigSnafu, Result, ShutdownDatanodeSnafu, StartDatanodeSnafu,
LoadLayeredConfigSnafu, MetaClientInitSnafu, MissingConfigSnafu, Result, ShutdownDatanodeSnafu,
StartDatanodeSnafu,
};
use crate::options::{GlobalOptions, GreptimeOptions};
use crate::App;
use crate::{log_versions, App};
pub const APP_NAME: &str = "greptime-datanode";
@@ -77,7 +83,7 @@ impl App for Instance {
self.datanode.start().await.context(StartDatanodeSnafu)
}
async fn stop(&mut self) -> Result<()> {
async fn stop(&self) -> Result<()> {
self.datanode
.shutdown()
.await
@@ -92,8 +98,8 @@ pub struct Command {
}
impl Command {
pub async fn build_with(&self, builder: InstanceBuilder) -> Result<Instance> {
self.subcmd.build_with(builder).await
pub async fn build(&self, opts: DatanodeOptions) -> Result<Instance> {
self.subcmd.build(opts).await
}
pub fn load_options(&self, global_options: &GlobalOptions) -> Result<DatanodeOptions> {
@@ -109,12 +115,9 @@ enum SubCommand {
}
impl SubCommand {
async fn build_with(&self, builder: InstanceBuilder) -> Result<Instance> {
async fn build(&self, opts: DatanodeOptions) -> Result<Instance> {
match self {
SubCommand::Start(cmd) => {
info!("Building datanode with {:#?}", cmd);
builder.build().await
}
SubCommand::Start(cmd) => cmd.build(opts).await,
}
}
}
@@ -260,6 +263,74 @@ impl StartCommand {
Ok(())
}
async fn build(&self, opts: DatanodeOptions) -> Result<Instance> {
common_runtime::init_global_runtimes(&opts.runtime);
let guard = common_telemetry::init_global_logging(
APP_NAME,
&opts.component.logging,
&opts.component.tracing,
opts.component.node_id.map(|x| x.to_string()),
);
log_versions(version(), short_version(), APP_NAME);
info!("Datanode start command: {:#?}", self);
info!("Datanode options: {:#?}", opts);
let plugin_opts = opts.plugins;
let mut opts = opts.component;
opts.grpc.detect_server_addr();
let mut plugins = Plugins::new();
plugins::setup_datanode_plugins(&mut plugins, &plugin_opts, &opts)
.await
.context(StartDatanodeSnafu)?;
let member_id = opts
.node_id
.context(MissingConfigSnafu { msg: "'node_id'" })?;
let meta_config = opts.meta_client.as_ref().context(MissingConfigSnafu {
msg: "'meta_client_options'",
})?;
let meta_client = meta_client::create_meta_client(
MetaClientType::Datanode { member_id },
meta_config,
None,
)
.await
.context(MetaClientInitSnafu)?;
let meta_backend = Arc::new(MetaKvBackend {
client: meta_client.clone(),
});
// Builds cache registry for datanode.
let layered_cache_registry = Arc::new(
LayeredCacheRegistryBuilder::default()
.add_cache_registry(build_datanode_cache_registry(meta_backend.clone()))
.build(),
);
let mut datanode = DatanodeBuilder::new(opts.clone(), plugins, Mode::Distributed)
.with_meta_client(meta_client)
.with_kv_backend(meta_backend)
.with_cache_registry(layered_cache_registry)
.build()
.await
.context(StartDatanodeSnafu)?;
let services = DatanodeServiceBuilder::new(&opts)
.with_default_grpc_server(&datanode.region_server())
.enable_http_service()
.build()
.await
.context(StartDatanodeSnafu)?;
datanode.setup_services(services);
Ok(Instance::new(datanode, guard))
}
}
#[cfg(test)]
@@ -281,6 +352,7 @@ mod tests {
common_telemetry::init_default_ut_logging();
let mut file = create_named_temp_file();
let toml_str = r#"
mode = "distributed"
enable_memory_catalog = false
node_id = 42
@@ -307,6 +379,7 @@ mod tests {
fn test_read_from_config_file() {
let mut file = create_named_temp_file();
let toml_str = r#"
mode = "distributed"
enable_memory_catalog = false
node_id = 42
@@ -472,6 +545,7 @@ mod tests {
fn test_config_precedence_order() {
let mut file = create_named_temp_file();
let toml_str = r#"
mode = "distributed"
enable_memory_catalog = false
node_id = 42
rpc_addr = "127.0.0.1:3001"

View File

@@ -1,137 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use cache::build_datanode_cache_registry;
use catalog::kvbackend::MetaKvBackend;
use common_base::Plugins;
use common_meta::cache::LayeredCacheRegistryBuilder;
use common_telemetry::info;
use common_version::{short_version, version};
use datanode::datanode::DatanodeBuilder;
use datanode::service::DatanodeServiceBuilder;
use meta_client::MetaClientType;
use snafu::{OptionExt, ResultExt};
use tracing_appender::non_blocking::WorkerGuard;
use crate::datanode::{DatanodeOptions, Instance, APP_NAME};
use crate::error::{MetaClientInitSnafu, MissingConfigSnafu, Result, StartDatanodeSnafu};
use crate::log_versions;
/// Builder for Datanode instance.
pub struct InstanceBuilder {
guard: Vec<WorkerGuard>,
opts: DatanodeOptions,
datanode_builder: DatanodeBuilder,
}
impl InstanceBuilder {
/// Try to create a new [InstanceBuilder], and do some initialization work like allocating
/// runtime resources, setting up global logging and plugins, etc.
pub async fn try_new_with_init(
mut opts: DatanodeOptions,
mut plugins: Plugins,
) -> Result<Self> {
let guard = Self::init(&mut opts, &mut plugins).await?;
let datanode_builder = Self::datanode_builder(&opts, plugins).await?;
Ok(Self {
guard,
opts,
datanode_builder,
})
}
async fn init(opts: &mut DatanodeOptions, plugins: &mut Plugins) -> Result<Vec<WorkerGuard>> {
common_runtime::init_global_runtimes(&opts.runtime);
let dn_opts = &mut opts.component;
let guard = common_telemetry::init_global_logging(
APP_NAME,
&dn_opts.logging,
&dn_opts.tracing,
dn_opts.node_id.map(|x| x.to_string()),
);
log_versions(version(), short_version(), APP_NAME);
plugins::setup_datanode_plugins(plugins, &opts.plugins, dn_opts)
.await
.context(StartDatanodeSnafu)?;
dn_opts.grpc.detect_server_addr();
info!("Initialized Datanode instance with {:#?}", opts);
Ok(guard)
}
async fn datanode_builder(opts: &DatanodeOptions, plugins: Plugins) -> Result<DatanodeBuilder> {
let dn_opts = &opts.component;
let member_id = dn_opts
.node_id
.context(MissingConfigSnafu { msg: "'node_id'" })?;
let meta_client_options = dn_opts.meta_client.as_ref().context(MissingConfigSnafu {
msg: "meta client options",
})?;
let client = meta_client::create_meta_client(
MetaClientType::Datanode { member_id },
meta_client_options,
Some(&plugins),
)
.await
.context(MetaClientInitSnafu)?;
let backend = Arc::new(MetaKvBackend {
client: client.clone(),
});
let mut builder = DatanodeBuilder::new(dn_opts.clone(), plugins.clone(), backend.clone());
let registry = Arc::new(
LayeredCacheRegistryBuilder::default()
.add_cache_registry(build_datanode_cache_registry(backend))
.build(),
);
builder
.with_cache_registry(registry)
.with_meta_client(client.clone());
Ok(builder)
}
/// Get the mutable builder for Datanode, in case you want to change some fields before the
/// final construction.
pub fn mut_datanode_builder(&mut self) -> &mut DatanodeBuilder {
&mut self.datanode_builder
}
/// Try to build the Datanode instance.
pub async fn build(self) -> Result<Instance> {
let mut datanode = self
.datanode_builder
.build()
.await
.context(StartDatanodeSnafu)?;
let services = DatanodeServiceBuilder::new(&self.opts.component)
.with_default_grpc_server(&datanode.region_server())
.enable_http_service()
.build()
.context(StartDatanodeSnafu)?;
datanode.setup_services(services);
Ok(Instance::new(datanode, self.guard))
}
}

View File

@@ -177,6 +177,9 @@ pub enum Error {
source: meta_srv::error::Error,
},
#[snafu(display("Invalid REPL command: {reason}"))]
InvalidReplCommand { reason: String },
#[snafu(display("Failed to parse SQL: {}", sql))]
ParseSql {
sql: String,
@@ -328,6 +331,7 @@ impl ErrorExt for Error {
Error::MissingConfig { .. }
| Error::LoadLayeredConfig { .. }
| Error::IllegalConfig { .. }
| Error::InvalidReplCommand { .. }
| Error::InitTimezone { .. }
| Error::ConnectEtcd { .. }
| Error::CreateDir { .. }

View File

@@ -33,8 +33,7 @@ use common_telemetry::info;
use common_telemetry::logging::TracingOptions;
use common_version::{short_version, version};
use flow::{
get_flow_auth_options, FlownodeBuilder, FlownodeInstance, FlownodeServiceBuilder,
FrontendClient, FrontendInvoker,
FlownodeBuilder, FlownodeInstance, FlownodeServiceBuilder, FrontendClient, FrontendInvoker,
};
use meta_client::{MetaClientOptions, MetaClientType};
use snafu::{ensure, OptionExt, ResultExt};
@@ -83,14 +82,10 @@ impl App for Instance {
}
async fn start(&mut self) -> Result<()> {
plugins::start_flownode_plugins(self.flownode.flow_engine().plugins().clone())
.await
.context(StartFlownodeSnafu)?;
self.flownode.start().await.context(StartFlownodeSnafu)
}
async fn stop(&mut self) -> Result<()> {
async fn stop(&self) -> Result<()> {
self.flownode
.shutdown()
.await
@@ -156,9 +151,6 @@ struct StartCommand {
/// HTTP request timeout in seconds.
#[clap(long)]
http_timeout: Option<u64>,
/// User Provider cfg, for auth, currently only support static user provider
#[clap(long)]
user_provider: Option<String>,
}
impl StartCommand {
@@ -222,10 +214,6 @@ impl StartCommand {
opts.http.timeout = Duration::from_secs(http_timeout);
}
if let Some(user_provider) = &self.user_provider {
opts.user_provider = Some(user_provider.clone());
}
ensure!(
opts.node_id.is_some(),
MissingConfigSnafu {
@@ -250,15 +238,9 @@ impl StartCommand {
info!("Flownode start command: {:#?}", self);
info!("Flownode options: {:#?}", opts);
let plugin_opts = opts.plugins;
let mut opts = opts.component;
opts.grpc.detect_server_addr();
let mut plugins = Plugins::new();
plugins::setup_flownode_plugins(&mut plugins, &plugin_opts, &opts)
.await
.context(StartFlownodeSnafu)?;
let member_id = opts
.node_id
.context(MissingConfigSnafu { msg: "'node_id'" })?;
@@ -333,12 +315,10 @@ impl StartCommand {
);
let flow_metadata_manager = Arc::new(FlowMetadataManager::new(cached_meta_backend.clone()));
let flow_auth_header = get_flow_auth_options(&opts).context(StartFlownodeSnafu)?;
let frontend_client =
FrontendClient::from_meta_client(meta_client.clone(), flow_auth_header);
let frontend_client = FrontendClient::from_meta_client(meta_client.clone());
let flownode_builder = FlownodeBuilder::new(
opts.clone(),
plugins,
Plugins::new(),
table_metadata_manager,
catalog_manager.clone(),
flow_metadata_manager,
@@ -351,6 +331,7 @@ impl StartCommand {
.with_grpc_server(flownode.flownode_server().clone())
.enable_http_service()
.build()
.await
.context(StartFlownodeSnafu)?;
flownode.setup_services(services);
let flownode = flownode;

View File

@@ -89,7 +89,7 @@ impl App for Instance {
.context(error::StartFrontendSnafu)
}
async fn stop(&mut self) -> Result<()> {
async fn stop(&self) -> Result<()> {
self.frontend
.shutdown()
.await
@@ -382,6 +382,7 @@ impl StartCommand {
let servers = Services::new(opts, instance.clone(), plugins)
.build()
.await
.context(error::StartFrontendSnafu)?;
let frontend = Frontend {
@@ -447,6 +448,8 @@ mod tests {
fn test_read_from_config_file() {
let mut file = create_named_temp_file();
let toml_str = r#"
mode = "distributed"
[http]
addr = "127.0.0.1:4000"
timeout = "0s"
@@ -535,6 +538,8 @@ mod tests {
fn test_config_precedence_order() {
let mut file = create_named_temp_file();
let toml_str = r#"
mode = "distributed"
[http]
addr = "127.0.0.1:4000"

View File

@@ -74,7 +74,7 @@ pub trait App: Send {
true
}
async fn stop(&mut self) -> Result<()>;
async fn stop(&self) -> Result<()>;
async fn run(&mut self) -> Result<()> {
info!("Starting app: {}", self.name());

View File

@@ -69,7 +69,7 @@ impl App for Instance {
self.instance.start().await.context(StartMetaServerSnafu)
}
async fn stop(&mut self) -> Result<()> {
async fn stop(&self) -> Result<()> {
self.instance
.shutdown()
.await

View File

@@ -75,6 +75,7 @@ use servers::export_metrics::{ExportMetricsOption, ExportMetricsTask};
use servers::grpc::GrpcOptions;
use servers::http::HttpOptions;
use servers::tls::{TlsMode, TlsOption};
use servers::Mode;
use snafu::ResultExt;
use tokio::sync::RwLock;
use tracing_appender::non_blocking::WorkerGuard;
@@ -255,8 +256,8 @@ pub struct Instance {
impl Instance {
/// Find the socket addr of a server by its `name`.
pub fn server_addr(&self, name: &str) -> Option<SocketAddr> {
self.frontend.server_handlers().addr(name)
pub async fn server_addr(&self, name: &str) -> Option<SocketAddr> {
self.frontend.server_handlers().addr(name).await
}
}
@@ -293,7 +294,7 @@ impl App for Instance {
Ok(())
}
async fn stop(&mut self) -> Result<()> {
async fn stop(&self) -> Result<()> {
self.frontend
.shutdown()
.await
@@ -496,9 +497,12 @@ impl StartCommand {
.build(),
);
let mut builder = DatanodeBuilder::new(dn_opts, plugins.clone(), kv_backend.clone());
builder.with_cache_registry(layered_cache_registry.clone());
let datanode = builder.build().await.context(error::StartDatanodeSnafu)?;
let datanode = DatanodeBuilder::new(dn_opts, plugins.clone(), Mode::Standalone)
.with_kv_backend(kv_backend.clone())
.with_cache_registry(layered_cache_registry.clone())
.build()
.await
.context(error::StartDatanodeSnafu)?;
let information_extension = Arc::new(StandaloneInformationExtension::new(
datanode.region_server(),
@@ -630,6 +634,7 @@ impl StartCommand {
let servers = Services::new(opts, fe_instance.clone(), plugins)
.build()
.await
.context(error::StartFrontendSnafu)?;
let frontend = Frontend {
@@ -853,6 +858,8 @@ mod tests {
fn test_read_from_config_file() {
let mut file = create_named_temp_file();
let toml_str = r#"
mode = "distributed"
enable_memory_catalog = true
[wal]
@@ -983,6 +990,8 @@ mod tests {
fn test_config_precedence_order() {
let mut file = create_named_temp_file();
let toml_str = r#"
mode = "standalone"
[http]
addr = "127.0.0.1:4000"

View File

@@ -111,9 +111,11 @@ mod tests {
use serde::{Deserialize, Serialize};
use super::*;
use crate::Mode;
#[derive(Debug, Serialize, Deserialize, Default)]
#[derive(Debug, Serialize, Deserialize)]
struct TestDatanodeConfig {
mode: Mode,
node_id: Option<u64>,
logging: LoggingOptions,
meta_client: Option<MetaClientOptions>,
@@ -121,6 +123,19 @@ mod tests {
storage: StorageConfig,
}
impl Default for TestDatanodeConfig {
fn default() -> Self {
Self {
mode: Mode::Distributed,
node_id: None,
logging: LoggingOptions::default(),
meta_client: None,
wal: DatanodeWalConfig::default(),
storage: StorageConfig::default(),
}
}
}
impl Configurable for TestDatanodeConfig {
fn env_list_keys() -> Option<&'static [&'static str]> {
Some(&["meta_client.metasrv_addrs"])
@@ -131,6 +146,7 @@ mod tests {
fn test_load_layered_options() {
let mut file = create_named_temp_file();
let toml_str = r#"
mode = "distributed"
enable_memory_catalog = false
rpc_addr = "127.0.0.1:3001"
rpc_hostname = "127.0.0.1"

View File

@@ -26,6 +26,16 @@ pub fn metadata_store_dir(store_dir: &str) -> String {
format!("{store_dir}/metadata")
}
/// The Server running mode
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Copy)]
#[serde(rename_all = "lowercase")]
pub enum Mode {
// The single process mode.
Standalone,
// The distributed cluster mode.
Distributed,
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct KvBackendConfig {

View File

@@ -13,7 +13,7 @@ default = ["geo"]
geo = ["geohash", "h3o", "s2", "wkt", "geo-types", "dep:geo"]
[dependencies]
ahash.workspace = true
ahash = "0.8"
api.workspace = true
arc-swap = "1.0"
async-trait.workspace = true

View File

@@ -19,4 +19,4 @@ mod uddsketch_state;
pub use geo_path::{GeoPathAccumulator, GEO_PATH_NAME};
pub(crate) use hll::HllStateType;
pub use hll::{HllState, HLL_MERGE_NAME, HLL_NAME};
pub use uddsketch_state::{UddSketchState, UDDSKETCH_MERGE_NAME, UDDSKETCH_STATE_NAME};
pub use uddsketch_state::{UddSketchState, UDDSKETCH_STATE_NAME};

View File

@@ -31,28 +31,23 @@ use datafusion::physical_plan::expressions::Literal;
use datafusion::prelude::create_udaf;
use datatypes::arrow::array::ArrayRef;
use datatypes::arrow::datatypes::{DataType, Float64Type};
use serde::{Deserialize, Serialize};
use uddsketch::{SketchHashKey, UDDSketch};
pub const UDDSKETCH_STATE_NAME: &str = "uddsketch_state";
pub const UDDSKETCH_MERGE_NAME: &str = "uddsketch_merge";
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug)]
pub struct UddSketchState {
uddsketch: UDDSketch,
error_rate: f64,
}
impl UddSketchState {
pub fn new(bucket_size: u64, error_rate: f64) -> Self {
Self {
uddsketch: UDDSketch::new(bucket_size, error_rate),
error_rate,
}
}
pub fn state_udf_impl() -> AggregateUDF {
pub fn udf_impl() -> AggregateUDF {
create_udaf(
UDDSKETCH_STATE_NAME,
vec![DataType::Int64, DataType::Float64, DataType::Float64],
@@ -66,55 +61,18 @@ impl UddSketchState {
)
}
/// Create a UDF for the `uddsketch_merge` function.
///
/// `uddsketch_merge` accepts bucket size, error rate, and a binary column of states generated by `uddsketch_state`
/// and merges them into a single state.
///
/// The bucket size and error rate must be the same as the original state.
pub fn merge_udf_impl() -> AggregateUDF {
create_udaf(
UDDSKETCH_MERGE_NAME,
vec![DataType::Int64, DataType::Float64, DataType::Binary],
Arc::new(DataType::Binary),
Volatility::Immutable,
Arc::new(|args| {
let (bucket_size, error_rate) = downcast_accumulator_args(args)?;
Ok(Box::new(UddSketchState::new(bucket_size, error_rate)))
}),
Arc::new(vec![DataType::Binary]),
)
}
fn update(&mut self, value: f64) {
self.uddsketch.add_value(value);
}
fn merge(&mut self, raw: &[u8]) -> DfResult<()> {
if let Ok(uddsketch) = bincode::deserialize::<Self>(raw) {
if uddsketch.uddsketch.count() != 0 {
if self.uddsketch.max_allowed_buckets() != uddsketch.uddsketch.max_allowed_buckets()
|| (self.error_rate - uddsketch.error_rate).abs() >= 1e-9
{
return Err(DataFusionError::Plan(format!(
"Merging UDDSketch with different parameters: arguments={:?} vs actual input={:?}",
(
self.uddsketch.max_allowed_buckets(),
self.error_rate
),
(uddsketch.uddsketch.max_allowed_buckets(), uddsketch.error_rate)
)));
}
self.uddsketch.merge_sketch(&uddsketch.uddsketch);
fn merge(&mut self, raw: &[u8]) {
if let Ok(uddsketch) = bincode::deserialize::<UDDSketch>(raw) {
if uddsketch.count() != 0 {
self.uddsketch.merge_sketch(&uddsketch);
}
} else {
trace!("Warning: Failed to deserialize UDDSketch from {:?}", raw);
return Err(DataFusionError::Plan(
"Failed to deserialize UDDSketch from binary".to_string(),
));
}
Ok(())
}
}
@@ -155,21 +113,9 @@ fn downcast_accumulator_args(args: AccumulatorArgs) -> DfResult<(u64, f64)> {
impl DfAccumulator for UddSketchState {
fn update_batch(&mut self, values: &[ArrayRef]) -> DfResult<()> {
let array = &values[2]; // the third column is data value
match array.data_type() {
DataType::Float64 => {
let f64_array = as_primitive_array::<Float64Type>(array)?;
for v in f64_array.iter().flatten() {
self.update(v);
}
}
// meaning instantiate as `uddsketch_merge`
DataType::Binary => self.merge_batch(&[array.clone()])?,
_ => {
return not_impl_err!(
"UDDSketch functions do not support data type: {}",
array.data_type()
)
}
let f64_array = as_primitive_array::<Float64Type>(array)?;
for v in f64_array.iter().flatten() {
self.update(v);
}
Ok(())
@@ -177,7 +123,7 @@ impl DfAccumulator for UddSketchState {
fn evaluate(&mut self) -> DfResult<ScalarValue> {
Ok(ScalarValue::Binary(Some(
bincode::serialize(&self).map_err(|e| {
bincode::serialize(&self.uddsketch).map_err(|e| {
DataFusionError::Internal(format!("Failed to serialize UDDSketch: {}", e))
})?,
)))
@@ -204,7 +150,7 @@ impl DfAccumulator for UddSketchState {
fn state(&mut self) -> DfResult<Vec<ScalarValue>> {
Ok(vec![ScalarValue::Binary(Some(
bincode::serialize(&self).map_err(|e| {
bincode::serialize(&self.uddsketch).map_err(|e| {
DataFusionError::Internal(format!("Failed to serialize UDDSketch: {}", e))
})?,
))])
@@ -214,7 +160,7 @@ impl DfAccumulator for UddSketchState {
let array = &states[0];
let binary_array = as_binary_array(array)?;
for v in binary_array.iter().flatten() {
self.merge(v)?;
self.merge(v);
}
Ok(())
@@ -236,8 +182,8 @@ mod tests {
let result = state.evaluate().unwrap();
if let ScalarValue::Binary(Some(bytes)) = result {
let deserialized: UddSketchState = bincode::deserialize(&bytes).unwrap();
assert_eq!(deserialized.uddsketch.count(), 3);
let deserialized: UDDSketch = bincode::deserialize(&bytes).unwrap();
assert_eq!(deserialized.count(), 3);
} else {
panic!("Expected binary scalar value");
}
@@ -255,15 +201,13 @@ mod tests {
// Create new state and merge the serialized data
let mut new_state = UddSketchState::new(10, 0.01);
if let ScalarValue::Binary(Some(bytes)) = &serialized {
new_state.merge(bytes).unwrap();
new_state.merge(bytes);
// Verify the merged state matches original by comparing deserialized values
let original_sketch: UddSketchState = bincode::deserialize(bytes).unwrap();
let original_sketch = original_sketch.uddsketch;
let original_sketch: UDDSketch = bincode::deserialize(bytes).unwrap();
let new_result = new_state.evaluate().unwrap();
if let ScalarValue::Binary(Some(new_bytes)) = new_result {
let new_sketch: UddSketchState = bincode::deserialize(&new_bytes).unwrap();
let new_sketch = new_sketch.uddsketch;
let new_sketch: UDDSketch = bincode::deserialize(&new_bytes).unwrap();
assert_eq!(original_sketch.count(), new_sketch.count());
assert_eq!(original_sketch.sum(), new_sketch.sum());
assert_eq!(original_sketch.mean(), new_sketch.mean());
@@ -300,8 +244,7 @@ mod tests {
let result = state.evaluate().unwrap();
if let ScalarValue::Binary(Some(bytes)) = result {
let deserialized: UddSketchState = bincode::deserialize(&bytes).unwrap();
let deserialized = deserialized.uddsketch;
let deserialized: UDDSketch = bincode::deserialize(&bytes).unwrap();
assert_eq!(deserialized.count(), 3);
} else {
panic!("Expected binary scalar value");
@@ -330,8 +273,7 @@ mod tests {
let result = merged_state.evaluate().unwrap();
if let ScalarValue::Binary(Some(bytes)) = result {
let deserialized: UddSketchState = bincode::deserialize(&bytes).unwrap();
let deserialized = deserialized.uddsketch;
let deserialized: UDDSketch = bincode::deserialize(&bytes).unwrap();
assert_eq!(deserialized.count(), 2);
} else {
panic!("Expected binary scalar value");

View File

@@ -12,9 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt;
use std::iter::repeat_n;
use std::sync::Arc;
use std::{fmt, iter};
use common_query::error::{InvalidFuncArgsSnafu, Result};
use common_query::prelude::Volatility;
@@ -127,10 +126,9 @@ impl Function for MatchesTermFunction {
let term = term_column.get_ref(0).as_string().unwrap();
match term {
None => {
return Ok(Arc::new(BooleanVector::from_iter(repeat_n(
None,
text_column.len(),
))));
return Ok(Arc::new(BooleanVector::from_iter(
iter::repeat(None).take(text_column.len()),
)));
}
Some(term) => Some(MatchesTermFinder::new(term)),
}
@@ -219,7 +217,7 @@ impl MatchesTermFinder {
}
let mut pos = 0;
while let Some(found_pos) = self.finder.find(&text.as_bytes()[pos..]) {
while let Some(found_pos) = self.finder.find(text[pos..].as_bytes()) {
let actual_pos = pos + found_pos;
let prev_ok = self.starts_with_non_alnum

View File

@@ -192,10 +192,6 @@ impl FlightDecoder {
}
}
}
pub fn schema(&self) -> Option<&SchemaRef> {
self.schema.as_ref()
}
}
pub fn flight_messages_to_recordbatches(messages: Vec<FlightMessage>) -> Result<RecordBatches> {

View File

@@ -24,21 +24,39 @@ use crate::cache::{CacheContainer, Initializer};
use crate::error::Result;
use crate::instruction::{CacheIdent, CreateFlow, DropFlow};
use crate::key::flow::{TableFlowManager, TableFlowManagerRef};
use crate::key::{FlowId, FlowPartitionId};
use crate::kv_backend::KvBackendRef;
use crate::peer::Peer;
use crate::FlownodeId;
type FlownodeSet = Arc<HashMap<FlownodeId, Peer>>;
/// Flow id&flow partition key
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct FlowIdent {
pub flow_id: FlowId,
pub partition_id: FlowPartitionId,
}
impl FlowIdent {
pub fn new(flow_id: FlowId, partition_id: FlowPartitionId) -> Self {
Self {
flow_id,
partition_id,
}
}
}
/// cache for TableFlowManager, the table_id part is in the outer cache
/// include flownode_id, flow_id, partition_id mapping to Peer
type FlownodeFlowSet = Arc<HashMap<FlowIdent, Peer>>;
pub type TableFlownodeSetCacheRef = Arc<TableFlownodeSetCache>;
/// [TableFlownodeSetCache] caches the [TableId] to [FlownodeSet] mapping.
pub type TableFlownodeSetCache = CacheContainer<TableId, FlownodeSet, CacheIdent>;
pub type TableFlownodeSetCache = CacheContainer<TableId, FlownodeFlowSet, CacheIdent>;
/// Constructs a [TableFlownodeSetCache].
pub fn new_table_flownode_set_cache(
name: String,
cache: Cache<TableId, FlownodeSet>,
cache: Cache<TableId, FlownodeFlowSet>,
kv_backend: KvBackendRef,
) -> TableFlownodeSetCache {
let table_flow_manager = Arc::new(TableFlowManager::new(kv_backend));
@@ -47,7 +65,7 @@ pub fn new_table_flownode_set_cache(
CacheContainer::new(name, cache, Box::new(invalidator), init, filter)
}
fn init_factory(table_flow_manager: TableFlowManagerRef) -> Initializer<TableId, FlownodeSet> {
fn init_factory(table_flow_manager: TableFlowManagerRef) -> Initializer<TableId, FlownodeFlowSet> {
Arc::new(move |&table_id| {
let table_flow_manager = table_flow_manager.clone();
Box::pin(async move {
@@ -57,7 +75,12 @@ fn init_factory(table_flow_manager: TableFlowManagerRef) -> Initializer<TableId,
.map(|flows| {
flows
.into_iter()
.map(|(key, value)| (key.flownode_id(), value.peer))
.map(|(key, value)| {
(
FlowIdent::new(key.flow_id(), key.partition_id()),
value.peer,
)
})
.collect::<HashMap<_, _>>()
})
// We must cache the `HashSet` even if it's empty,
@@ -71,26 +94,33 @@ fn init_factory(table_flow_manager: TableFlowManagerRef) -> Initializer<TableId,
}
async fn handle_create_flow(
cache: &Cache<TableId, FlownodeSet>,
cache: &Cache<TableId, FlownodeFlowSet>,
CreateFlow {
flow_id,
source_table_ids,
flownodes: flownode_peers,
partition_to_peer_mapping: flow_part2nodes,
}: &CreateFlow,
) {
for table_id in source_table_ids {
let entry = cache.entry(*table_id);
entry
.and_compute_with(
async |entry: Option<moka::Entry<u32, Arc<HashMap<u64, _>>>>| match entry {
async |entry: Option<moka::Entry<u32, FlownodeFlowSet>>| match entry {
Some(entry) => {
let mut map = entry.into_value().as_ref().clone();
map.extend(flownode_peers.iter().map(|peer| (peer.id, peer.clone())));
map.extend(
flow_part2nodes.iter().map(|(part, peer)| {
(FlowIdent::new(*flow_id, *part), peer.clone())
}),
);
Op::Put(Arc::new(map))
}
None => Op::Put(Arc::new(HashMap::from_iter(
flownode_peers.iter().map(|peer| (peer.id, peer.clone())),
))),
None => {
Op::Put(Arc::new(HashMap::from_iter(flow_part2nodes.iter().map(
|(part, peer)| (FlowIdent::new(*flow_id, *part), peer.clone()),
))))
}
},
)
.await;
@@ -98,21 +128,23 @@ async fn handle_create_flow(
}
async fn handle_drop_flow(
cache: &Cache<TableId, FlownodeSet>,
cache: &Cache<TableId, FlownodeFlowSet>,
DropFlow {
flow_id,
source_table_ids,
flownode_ids,
flow_part2node_id,
}: &DropFlow,
) {
for table_id in source_table_ids {
let entry = cache.entry(*table_id);
entry
.and_compute_with(
async |entry: Option<moka::Entry<u32, Arc<HashMap<u64, _>>>>| match entry {
async |entry: Option<moka::Entry<u32, FlownodeFlowSet>>| match entry {
Some(entry) => {
let mut set = entry.into_value().as_ref().clone();
for flownode_id in flownode_ids {
set.remove(flownode_id);
for (part, _node) in flow_part2node_id {
let key = FlowIdent::new(*flow_id, *part);
set.remove(&key);
}
Op::Put(Arc::new(set))
@@ -128,7 +160,7 @@ async fn handle_drop_flow(
}
fn invalidator<'a>(
cache: &'a Cache<TableId, FlownodeSet>,
cache: &'a Cache<TableId, FlownodeFlowSet>,
ident: &'a CacheIdent,
) -> BoxFuture<'a, Result<()>> {
Box::pin(async move {
@@ -154,7 +186,7 @@ mod tests {
use moka::future::CacheBuilder;
use table::table_name::TableName;
use crate::cache::flow::table_flownode::new_table_flownode_set_cache;
use crate::cache::flow::table_flownode::{new_table_flownode_set_cache, FlowIdent};
use crate::instruction::{CacheIdent, CreateFlow, DropFlow};
use crate::key::flow::flow_info::FlowInfoValue;
use crate::key::flow::flow_route::FlowRouteValue;
@@ -214,12 +246,16 @@ mod tests {
let set = cache.get(1024).await.unwrap().unwrap();
assert_eq!(
set.as_ref().clone(),
HashMap::from_iter((1..=3).map(|i| { (i, Peer::empty(i),) }))
HashMap::from_iter(
(1..=3).map(|i| { (FlowIdent::new(1024, (i - 1) as u32), Peer::empty(i),) })
)
);
let set = cache.get(1025).await.unwrap().unwrap();
assert_eq!(
set.as_ref().clone(),
HashMap::from_iter((1..=3).map(|i| { (i, Peer::empty(i),) }))
HashMap::from_iter(
(1..=3).map(|i| { (FlowIdent::new(1024, (i - 1) as u32), Peer::empty(i),) })
)
);
let result = cache.get(1026).await.unwrap().unwrap();
assert_eq!(result.len(), 0);
@@ -231,8 +267,9 @@ mod tests {
let cache = CacheBuilder::new(128).build();
let cache = new_table_flownode_set_cache("test".to_string(), cache, mem_kv);
let ident = vec![CacheIdent::CreateFlow(CreateFlow {
flow_id: 2001,
source_table_ids: vec![1024, 1025],
flownodes: (1..=5).map(Peer::empty).collect(),
partition_to_peer_mapping: (1..=5).map(|i| (i as u32, Peer::empty(i + 1))).collect(),
})];
cache.invalidate(&ident).await.unwrap();
let set = cache.get(1024).await.unwrap().unwrap();
@@ -241,6 +278,54 @@ mod tests {
assert_eq!(set.len(), 5);
}
#[tokio::test]
async fn test_replace_flow() {
let mem_kv = Arc::new(MemoryKvBackend::default());
let cache = CacheBuilder::new(128).build();
let cache = new_table_flownode_set_cache("test".to_string(), cache, mem_kv);
let ident = vec![CacheIdent::CreateFlow(CreateFlow {
flow_id: 2001,
source_table_ids: vec![1024, 1025],
partition_to_peer_mapping: (1..=5).map(|i| (i as u32, Peer::empty(i + 1))).collect(),
})];
cache.invalidate(&ident).await.unwrap();
let set = cache.get(1024).await.unwrap().unwrap();
assert_eq!(set.len(), 5);
let set = cache.get(1025).await.unwrap().unwrap();
assert_eq!(set.len(), 5);
let drop_then_create_flow = vec![
CacheIdent::DropFlow(DropFlow {
flow_id: 2001,
source_table_ids: vec![1024, 1025],
flow_part2node_id: (1..=5).map(|i| (i as u32, i + 1)).collect(),
}),
CacheIdent::CreateFlow(CreateFlow {
flow_id: 2001,
source_table_ids: vec![1026, 1027],
partition_to_peer_mapping: (11..=15)
.map(|i| (i as u32, Peer::empty(i + 1)))
.collect(),
}),
CacheIdent::FlowId(2001),
];
cache.invalidate(&drop_then_create_flow).await.unwrap();
let set = cache.get(1024).await.unwrap().unwrap();
assert!(set.is_empty());
let expected = HashMap::from_iter(
(11..=15).map(|i| (FlowIdent::new(2001, i as u32), Peer::empty(i + 1))),
);
let set = cache.get(1026).await.unwrap().unwrap();
assert_eq!(set.as_ref().clone(), expected);
let set = cache.get(1027).await.unwrap().unwrap();
assert_eq!(set.as_ref().clone(), expected);
}
#[tokio::test]
async fn test_drop_flow() {
let mem_kv = Arc::new(MemoryKvBackend::default());
@@ -248,34 +333,57 @@ mod tests {
let cache = new_table_flownode_set_cache("test".to_string(), cache, mem_kv);
let ident = vec![
CacheIdent::CreateFlow(CreateFlow {
flow_id: 2001,
source_table_ids: vec![1024, 1025],
flownodes: (1..=5).map(Peer::empty).collect(),
partition_to_peer_mapping: (1..=5)
.map(|i| (i as u32, Peer::empty(i + 1)))
.collect(),
}),
CacheIdent::CreateFlow(CreateFlow {
flow_id: 2002,
source_table_ids: vec![1024, 1025],
flownodes: (11..=12).map(Peer::empty).collect(),
partition_to_peer_mapping: (11..=12)
.map(|i| (i as u32, Peer::empty(i + 1)))
.collect(),
}),
// same flownode that hold multiple flows
CacheIdent::CreateFlow(CreateFlow {
flow_id: 2003,
source_table_ids: vec![1024, 1025],
partition_to_peer_mapping: (1..=5)
.map(|i| (i as u32, Peer::empty(i + 1)))
.collect(),
}),
];
cache.invalidate(&ident).await.unwrap();
let set = cache.get(1024).await.unwrap().unwrap();
assert_eq!(set.len(), 7);
assert_eq!(set.len(), 12);
let set = cache.get(1025).await.unwrap().unwrap();
assert_eq!(set.len(), 7);
assert_eq!(set.len(), 12);
let ident = vec![CacheIdent::DropFlow(DropFlow {
flow_id: 2001,
source_table_ids: vec![1024, 1025],
flownode_ids: vec![1, 2, 3, 4, 5],
flow_part2node_id: (1..=5).map(|i| (i as u32, i + 1)).collect(),
})];
cache.invalidate(&ident).await.unwrap();
let set = cache.get(1024).await.unwrap().unwrap();
assert_eq!(
set.as_ref().clone(),
HashMap::from_iter((11..=12).map(|i| { (i, Peer::empty(i),) }))
HashMap::from_iter(
(11..=12)
.map(|i| (FlowIdent::new(2002, i as u32), Peer::empty(i + 1)))
.chain((1..=5).map(|i| (FlowIdent::new(2003, i as u32), Peer::empty(i + 1))))
)
);
let set = cache.get(1025).await.unwrap().unwrap();
assert_eq!(
set.as_ref().clone(),
HashMap::from_iter((11..=12).map(|i| { (i, Peer::empty(i),) }))
HashMap::from_iter(
(11..=12)
.map(|i| (FlowIdent::new(2002, i as u32), Peer::empty(i + 1)))
.chain((1..=5).map(|i| (FlowIdent::new(2003, i as u32), Peer::empty(i + 1))))
)
);
}
}

View File

@@ -16,9 +16,12 @@ use std::sync::Arc;
use crate::error::Result;
use crate::flow_name::FlowName;
use crate::instruction::CacheIdent;
use crate::instruction::{CacheIdent, DropFlow};
use crate::key::flow::flow_info::FlowInfoKey;
use crate::key::flow::flow_name::FlowNameKey;
use crate::key::flow::flow_route::FlowRouteKey;
use crate::key::flow::flownode_flow::FlownodeFlowKey;
use crate::key::flow::table_flow::TableFlowKey;
use crate::key::schema_name::SchemaNameKey;
use crate::key::table_info::TableInfoKey;
use crate::key::table_name::TableNameKey;
@@ -89,9 +92,40 @@ where
let key: SchemaNameKey = schema_name.into();
self.invalidate_key(&key.to_bytes()).await;
}
CacheIdent::CreateFlow(_) | CacheIdent::DropFlow(_) => {
CacheIdent::CreateFlow(_) => {
// Do nothing
}
CacheIdent::DropFlow(DropFlow {
flow_id,
source_table_ids,
flow_part2node_id,
}) => {
// invalidate flow route/flownode flow/table flow
let mut keys = Vec::with_capacity(
source_table_ids.len() * flow_part2node_id.len()
+ flow_part2node_id.len() * 2,
);
for table_id in source_table_ids {
for (partition_id, node_id) in flow_part2node_id {
let key =
TableFlowKey::new(*table_id, *node_id, *flow_id, *partition_id)
.to_bytes();
keys.push(key);
}
}
for (partition_id, node_id) in flow_part2node_id {
let key =
FlownodeFlowKey::new(*node_id, *flow_id, *partition_id).to_bytes();
keys.push(key);
let key = FlowRouteKey::new(*flow_id, *partition_id).to_bytes();
keys.push(key);
}
for key in keys {
self.invalidate_key(&key).await;
}
}
CacheIdent::FlowName(FlowName {
catalog_name,
flow_name,

View File

@@ -39,7 +39,7 @@ use crate::cache_invalidator::Context;
use crate::ddl::utils::{add_peer_context_if_needed, handle_retry_error};
use crate::ddl::DdlContext;
use crate::error::{self, Result, UnexpectedSnafu};
use crate::instruction::{CacheIdent, CreateFlow};
use crate::instruction::{CacheIdent, CreateFlow, DropFlow};
use crate::key::flow::flow_info::FlowInfoValue;
use crate::key::flow::flow_route::FlowRouteValue;
use crate::key::table_name::TableNameKey;
@@ -70,6 +70,7 @@ impl CreateFlowProcedure {
query_context,
state: CreateFlowState::Prepare,
prev_flow_info_value: None,
did_replace: false,
flow_type: None,
},
}
@@ -224,6 +225,7 @@ impl CreateFlowProcedure {
.update_flow_metadata(flow_id, prev_flow_value, &flow_info, flow_routes)
.await?;
info!("Replaced flow metadata for flow {flow_id}");
self.data.did_replace = true;
} else {
self.context
.flow_metadata_manager
@@ -240,22 +242,43 @@ impl CreateFlowProcedure {
debug_assert!(self.data.state == CreateFlowState::InvalidateFlowCache);
// Safety: The flow id must be allocated.
let flow_id = self.data.flow_id.unwrap();
let did_replace = self.data.did_replace;
let ctx = Context {
subject: Some("Invalidate flow cache by creating flow".to_string()),
};
let mut caches = vec![];
// if did replaced, invalidate the flow cache with drop the old flow
if did_replace {
let old_flow_info = self.data.prev_flow_info_value.as_ref().unwrap();
// only drop flow is needed, since flow name haven't changed, and flow id already invalidated below
caches.extend([CacheIdent::DropFlow(DropFlow {
flow_id,
source_table_ids: old_flow_info.source_table_ids.clone(),
flow_part2node_id: old_flow_info.flownode_ids().clone().into_iter().collect(),
})]);
}
let (_flow_info, flow_routes) = (&self.data).into();
let flow_part2peers = flow_routes
.into_iter()
.map(|(part_id, route)| (part_id, route.peer))
.collect();
caches.extend([
CacheIdent::CreateFlow(CreateFlow {
flow_id,
source_table_ids: self.data.source_table_ids.clone(),
partition_to_peer_mapping: flow_part2peers,
}),
CacheIdent::FlowId(flow_id),
]);
self.context
.cache_invalidator
.invalidate(
&ctx,
&[
CacheIdent::CreateFlow(CreateFlow {
source_table_ids: self.data.source_table_ids.clone(),
flownodes: self.data.peers.clone(),
}),
CacheIdent::FlowId(flow_id),
],
)
.invalidate(&ctx, &caches)
.await?;
Ok(Status::done_with_output(flow_id))
@@ -377,6 +400,10 @@ pub struct CreateFlowData {
/// For verify if prev value is consistent when need to update flow metadata.
/// only set when `or_replace` is true.
pub(crate) prev_flow_info_value: Option<DeserializedValueWithBytes<FlowInfoValue>>,
/// Only set to true when replace actually happened.
/// This is used to determine whether to invalidate the cache.
#[serde(default)]
pub(crate) did_replace: bool,
pub(crate) flow_type: Option<FlowType>,
}

View File

@@ -13,6 +13,7 @@
// limitations under the License.
mod metadata;
use api::v1::flow::{flow_request, DropRequest, FlowRequest};
use async_trait::async_trait;
use common_catalog::format_full_flow_name;
@@ -153,6 +154,12 @@ impl DropFlowProcedure {
};
let flow_info_value = self.data.flow_info_value.as_ref().unwrap();
let flow_part2nodes = flow_info_value
.flownode_ids()
.clone()
.into_iter()
.collect::<Vec<_>>();
self.context
.cache_invalidator
.invalidate(
@@ -164,8 +171,9 @@ impl DropFlowProcedure {
flow_name: flow_info_value.flow_name.to_string(),
}),
CacheIdent::DropFlow(DropFlow {
flow_id,
source_table_ids: flow_info_value.source_table_ids.clone(),
flownode_ids: flow_info_value.flownode_ids.values().cloned().collect(),
flow_part2node_id: flow_part2nodes,
}),
],
)

View File

@@ -514,11 +514,25 @@ pub enum Error {
},
#[snafu(display(
"Failed to build a Kafka partition client, topic: {}, partition: {}",
"Failed to get a Kafka partition client, topic: {}, partition: {}",
topic,
partition
))]
BuildKafkaPartitionClient {
KafkaPartitionClient {
topic: String,
partition: i32,
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: rskafka::client::error::Error,
},
#[snafu(display(
"Failed to get offset from Kafka, topic: {}, partition: {}",
topic,
partition
))]
KafkaGetOffset {
topic: String,
partition: i32,
#[snafu(implicit)]
@@ -843,7 +857,7 @@ impl ErrorExt for Error {
| EncodeWalOptions { .. }
| BuildKafkaClient { .. }
| BuildKafkaCtrlClient { .. }
| BuildKafkaPartitionClient { .. }
| KafkaPartitionClient { .. }
| ResolveKafkaEndpoint { .. }
| ProduceRecord { .. }
| CreateKafkaWalTopic { .. }
@@ -852,7 +866,8 @@ impl ErrorExt for Error {
| ProcedureOutput { .. }
| FromUtf8 { .. }
| MetadataCorruption { .. }
| ParseWalOptions { .. } => StatusCode::Unexpected,
| ParseWalOptions { .. }
| KafkaGetOffset { .. } => StatusCode::Unexpected,
SendMessage { .. } | GetKvCache { .. } | CacheNotGet { .. } => StatusCode::Internal,

View File

@@ -24,7 +24,7 @@ use table::table_name::TableName;
use crate::flow_name::FlowName;
use crate::key::schema_name::SchemaName;
use crate::key::FlowId;
use crate::key::{FlowId, FlowPartitionId};
use crate::peer::Peer;
use crate::{DatanodeId, FlownodeId};
@@ -184,14 +184,19 @@ pub enum CacheIdent {
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct CreateFlow {
/// The unique identifier for the flow.
pub flow_id: FlowId,
pub source_table_ids: Vec<TableId>,
pub flownodes: Vec<Peer>,
/// Mapping of flow partition to peer information
pub partition_to_peer_mapping: Vec<(FlowPartitionId, Peer)>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct DropFlow {
pub flow_id: FlowId,
pub source_table_ids: Vec<TableId>,
pub flownode_ids: Vec<FlownodeId>,
/// Mapping of flow partition to flownode id
pub flow_part2node_id: Vec<(FlowPartitionId, FlownodeId)>,
}
/// Flushes a batch of regions.
@@ -217,9 +222,7 @@ pub enum Instruction {
/// Invalidates batch cache.
InvalidateCaches(Vec<CacheIdent>),
/// Flushes regions.
FlushRegions(FlushRegions),
/// Flushes a single region.
FlushRegion(RegionId),
FlushRegion(FlushRegions),
}
/// The reply of [UpgradeRegion].
@@ -250,7 +253,6 @@ pub enum InstructionReply {
CloseRegion(SimpleReply),
UpgradeRegion(UpgradeRegionReply),
DowngradeRegion(DowngradeRegionReply),
FlushRegion(SimpleReply),
}
impl Display for InstructionReply {
@@ -262,7 +264,6 @@ impl Display for InstructionReply {
Self::DowngradeRegion(reply) => {
write!(f, "InstructionReply::DowngradeRegion({})", reply)
}
Self::FlushRegion(reply) => write!(f, "InstructionReply::FlushRegion({})", reply),
}
}
}

View File

@@ -112,7 +112,7 @@ pub mod test_utils;
mod tombstone;
pub mod topic_name;
pub mod topic_region;
pub mod txn_helper;
pub(crate) mod txn_helper;
pub mod view_info;
use std::collections::{BTreeMap, HashMap, HashSet};

View File

@@ -256,6 +256,11 @@ impl DatanodeTableManager {
})?
.and_then(|r| DatanodeTableValue::try_from_raw_value(&r.value))?
.region_info;
// If the region options are the same, we don't need to update it.
if region_info.region_options == new_region_options {
return Ok(Txn::new());
}
// substitute region options only.
region_info.region_options = new_region_options;

View File

@@ -45,7 +45,7 @@ use crate::kv_backend::KvBackendRef;
use crate::rpc::store::BatchDeleteRequest;
/// The key of `__flow/` scope.
#[derive(Debug, PartialEq)]
#[derive(Debug, Clone, PartialEq)]
pub struct FlowScoped<T> {
inner: T,
}
@@ -246,27 +246,32 @@ impl FlowMetadataManager {
new_flow_info: &FlowInfoValue,
flow_routes: Vec<(FlowPartitionId, FlowRouteValue)>,
) -> Result<()> {
let (create_flow_flow_name_txn, on_create_flow_flow_name_failure) =
let (update_flow_flow_name_txn, on_create_flow_flow_name_failure) =
self.flow_name_manager.build_update_txn(
&new_flow_info.catalog_name,
&new_flow_info.flow_name,
flow_id,
)?;
let (create_flow_txn, on_create_flow_failure) =
let (update_flow_txn, on_create_flow_failure) =
self.flow_info_manager
.build_update_txn(flow_id, current_flow_info, new_flow_info)?;
let create_flow_routes_txn = self
.flow_route_manager
.build_create_txn(flow_id, flow_routes.clone())?;
let create_flownode_flow_txn = self
.flownode_flow_manager
.build_create_txn(flow_id, new_flow_info.flownode_ids().clone());
let create_table_flow_txn = self.table_flow_manager.build_create_txn(
let update_flow_routes_txn = self.flow_route_manager.build_update_txn(
flow_id,
current_flow_info,
flow_routes.clone(),
)?;
let update_flownode_flow_txn = self.flownode_flow_manager.build_update_txn(
flow_id,
current_flow_info,
new_flow_info.flownode_ids().clone(),
);
let update_table_flow_txn = self.table_flow_manager.build_update_txn(
flow_id,
current_flow_info,
flow_routes
.into_iter()
.map(|(partition_id, route)| (partition_id, TableFlowValue { peer: route.peer }))
@@ -275,11 +280,11 @@ impl FlowMetadataManager {
)?;
let txn = Txn::merge_all(vec![
create_flow_flow_name_txn,
create_flow_txn,
create_flow_routes_txn,
create_flownode_flow_txn,
create_table_flow_txn,
update_flow_flow_name_txn,
update_flow_txn,
update_flow_routes_txn,
update_flownode_flow_txn,
update_table_flow_txn,
]);
info!(
"Creating flow {}.{}({}), with {} txn operations",
@@ -783,6 +788,141 @@ mod tests {
}
}
#[tokio::test]
async fn test_update_flow_metadata_diff_flownode() {
let mem_kv = Arc::new(MemoryKvBackend::default());
let flow_metadata_manager = FlowMetadataManager::new(mem_kv.clone());
let flow_id = 10;
let flow_value = test_flow_info_value(
"flow",
[(0u32, 1u64), (1u32, 2u64)].into(),
vec![1024, 1025, 1026],
);
let flow_routes = vec![
(
0u32,
FlowRouteValue {
peer: Peer::empty(1),
},
),
(
1,
FlowRouteValue {
peer: Peer::empty(2),
},
),
];
flow_metadata_manager
.create_flow_metadata(flow_id, flow_value.clone(), flow_routes.clone())
.await
.unwrap();
let new_flow_value = {
let mut tmp = flow_value.clone();
tmp.raw_sql = "new".to_string();
// move to different flownodes
tmp.flownode_ids = [(0, 3u64), (1, 4u64)].into();
tmp
};
let new_flow_routes = vec![
(
0u32,
FlowRouteValue {
peer: Peer::empty(3),
},
),
(
1,
FlowRouteValue {
peer: Peer::empty(4),
},
),
];
// Update flow instead
flow_metadata_manager
.update_flow_metadata(
flow_id,
&DeserializedValueWithBytes::from_inner(flow_value.clone()),
&new_flow_value,
new_flow_routes.clone(),
)
.await
.unwrap();
let got = flow_metadata_manager
.flow_info_manager()
.get(flow_id)
.await
.unwrap()
.unwrap();
let routes = flow_metadata_manager
.flow_route_manager()
.routes(flow_id)
.await
.unwrap();
assert_eq!(
routes,
vec![
(
FlowRouteKey::new(flow_id, 0),
FlowRouteValue {
peer: Peer::empty(3),
},
),
(
FlowRouteKey::new(flow_id, 1),
FlowRouteValue {
peer: Peer::empty(4),
},
),
]
);
assert_eq!(got, new_flow_value);
let flows = flow_metadata_manager
.flownode_flow_manager()
.flows(1)
.try_collect::<Vec<_>>()
.await
.unwrap();
// should moved to different flownode
assert_eq!(flows, vec![]);
let flows = flow_metadata_manager
.flownode_flow_manager()
.flows(3)
.try_collect::<Vec<_>>()
.await
.unwrap();
assert_eq!(flows, vec![(flow_id, 0)]);
for table_id in [1024, 1025, 1026] {
let nodes = flow_metadata_manager
.table_flow_manager()
.flows(table_id)
.await
.unwrap();
assert_eq!(
nodes,
vec![
(
TableFlowKey::new(table_id, 3, flow_id, 0),
TableFlowValue {
peer: Peer::empty(3)
}
),
(
TableFlowKey::new(table_id, 4, flow_id, 1),
TableFlowValue {
peer: Peer::empty(4)
}
)
]
);
}
}
#[tokio::test]
async fn test_update_flow_metadata_flow_replace_diff_id_err() {
let mem_kv = Arc::new(MemoryKvBackend::default());

View File

@@ -153,6 +153,15 @@ impl FlowInfoValue {
&self.flownode_ids
}
/// Insert a new flownode id for a partition.
pub fn insert_flownode_id(
&mut self,
partition: FlowPartitionId,
node: FlownodeId,
) -> Option<FlownodeId> {
self.flownode_ids.insert(partition, node)
}
/// Returns the `source_table`.
pub fn source_table_ids(&self) -> &[TableId] {
&self.source_table_ids
@@ -272,10 +281,11 @@ impl FlowInfoManager {
let raw_value = new_flow_value.try_as_raw_value()?;
let prev_value = current_flow_value.get_raw_bytes();
let txn = Txn::new()
.when(vec![
Compare::new(key.clone(), CompareOp::NotEqual, None),
Compare::new(key.clone(), CompareOp::Equal, Some(prev_value)),
])
.when(vec![Compare::new(
key.clone(),
CompareOp::Equal,
Some(prev_value),
)])
.and_then(vec![TxnOp::Put(key.clone(), raw_value)])
.or_else(vec![TxnOp::Get(key.clone())]);

View File

@@ -19,9 +19,12 @@ use serde::{Deserialize, Serialize};
use snafu::OptionExt;
use crate::error::{self, Result};
use crate::key::flow::flow_info::FlowInfoValue;
use crate::key::flow::{flownode_addr_helper, FlowScoped};
use crate::key::node_address::NodeAddressKey;
use crate::key::{BytesAdapter, FlowId, FlowPartitionId, MetadataKey, MetadataValue};
use crate::key::{
BytesAdapter, DeserializedValueWithBytes, FlowId, FlowPartitionId, MetadataKey, MetadataValue,
};
use crate::kv_backend::txn::{Txn, TxnOp};
use crate::kv_backend::KvBackendRef;
use crate::peer::Peer;
@@ -39,7 +42,7 @@ lazy_static! {
/// The key stores the route info of the flow.
///
/// The layout: `__flow/route/{flow_id}/{partition_id}`.
#[derive(Debug, PartialEq)]
#[derive(Debug, Clone, PartialEq)]
pub struct FlowRouteKey(FlowScoped<FlowRouteKeyInner>);
impl FlowRouteKey {
@@ -142,6 +145,12 @@ pub struct FlowRouteValue {
pub(crate) peer: Peer,
}
impl From<Peer> for FlowRouteValue {
fn from(peer: Peer) -> Self {
Self { peer }
}
}
impl FlowRouteValue {
/// Returns the `peer`.
pub fn peer(&self) -> &Peer {
@@ -204,6 +213,33 @@ impl FlowRouteManager {
Ok(Txn::new().and_then(txns))
}
/// Builds a update flow routes transaction.
///
/// Puts `__flow/route/{flow_id}/{partition_id}` keys.
/// Also removes `__flow/route/{flow_id}/{old_partition_id}` keys.
pub(crate) fn build_update_txn<I: IntoIterator<Item = (FlowPartitionId, FlowRouteValue)>>(
&self,
flow_id: FlowId,
current_flow_info: &DeserializedValueWithBytes<FlowInfoValue>,
flow_routes: I,
) -> Result<Txn> {
let del_txns = current_flow_info
.flownode_ids()
.iter()
.map(|(partition_id, _)| {
let key = FlowRouteKey::new(flow_id, *partition_id).to_bytes();
Ok(TxnOp::Delete(key))
});
let put_txns = flow_routes.into_iter().map(|(partition_id, route)| {
let key = FlowRouteKey::new(flow_id, partition_id).to_bytes();
Ok(TxnOp::Put(key, route.try_as_raw_value()?))
});
let txns = del_txns.chain(put_txns).collect::<Result<Vec<_>>>()?;
Ok(Txn::new().and_then(txns))
}
async fn remap_flow_route_addresses(
&self,
flow_routes: &mut [(FlowRouteKey, FlowRouteValue)],

View File

@@ -19,8 +19,9 @@ use regex::Regex;
use snafu::OptionExt;
use crate::error::{self, Result};
use crate::key::flow::flow_info::FlowInfoValue;
use crate::key::flow::FlowScoped;
use crate::key::{BytesAdapter, FlowId, FlowPartitionId, MetadataKey};
use crate::key::{BytesAdapter, DeserializedValueWithBytes, FlowId, FlowPartitionId, MetadataKey};
use crate::kv_backend::txn::{Txn, TxnOp};
use crate::kv_backend::KvBackendRef;
use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
@@ -165,6 +166,17 @@ impl FlownodeFlowManager {
Self { kv_backend }
}
/// Whether given flow exist on this flownode.
pub async fn exists(
&self,
flownode_id: FlownodeId,
flow_id: FlowId,
partition_id: FlowPartitionId,
) -> Result<bool> {
let key = FlownodeFlowKey::new(flownode_id, flow_id, partition_id).to_bytes();
Ok(self.kv_backend.get(&key).await?.is_some())
}
/// Retrieves all [FlowId] and [FlowPartitionId]s of the specified `flownode_id`.
pub fn flows(
&self,
@@ -202,6 +214,33 @@ impl FlownodeFlowManager {
Txn::new().and_then(txns)
}
/// Builds a update flownode flow transaction.
///
/// Puts `__flownode_flow/{flownode_id}/{flow_id}/{partition_id}` keys.
/// Remove the old `__flownode_flow/{old_flownode_id}/{flow_id}/{old_partition_id}` keys.
pub(crate) fn build_update_txn<I: IntoIterator<Item = (FlowPartitionId, FlownodeId)>>(
&self,
flow_id: FlowId,
current_flow_info: &DeserializedValueWithBytes<FlowInfoValue>,
flownode_ids: I,
) -> Txn {
let del_txns =
current_flow_info
.flownode_ids()
.iter()
.map(|(partition_id, flownode_id)| {
let key = FlownodeFlowKey::new(*flownode_id, flow_id, *partition_id).to_bytes();
TxnOp::Delete(key)
});
let put_txns = flownode_ids.into_iter().map(|(partition_id, flownode_id)| {
let key = FlownodeFlowKey::new(flownode_id, flow_id, partition_id).to_bytes();
TxnOp::Put(key, vec![])
});
let txns = del_txns.chain(put_txns).collect::<Vec<_>>();
Txn::new().and_then(txns)
}
}
#[cfg(test)]

View File

@@ -22,9 +22,12 @@ use snafu::OptionExt;
use table::metadata::TableId;
use crate::error::{self, Result};
use crate::key::flow::flow_info::FlowInfoValue;
use crate::key::flow::{flownode_addr_helper, FlowScoped};
use crate::key::node_address::NodeAddressKey;
use crate::key::{BytesAdapter, FlowId, FlowPartitionId, MetadataKey, MetadataValue};
use crate::key::{
BytesAdapter, DeserializedValueWithBytes, FlowId, FlowPartitionId, MetadataKey, MetadataValue,
};
use crate::kv_backend::txn::{Txn, TxnOp};
use crate::kv_backend::KvBackendRef;
use crate::peer::Peer;
@@ -215,7 +218,7 @@ impl TableFlowManager {
/// Builds a create table flow transaction.
///
/// Puts `__flow/source_table/{table_id}/{node_id}/{partition_id}` keys.
/// Puts `__flow/source_table/{table_id}/{node_id}/{flow_id}/{partition_id}` keys.
pub fn build_create_txn(
&self,
flow_id: FlowId,
@@ -239,6 +242,44 @@ impl TableFlowManager {
Ok(Txn::new().and_then(txns))
}
/// Builds a update table flow transaction.
///
/// Puts `__flow/source_table/{table_id}/{node_id}/{flow_id}/{partition_id}` keys,
/// Also remove previous
/// `__flow/source_table/{table_id}/{old_node_id}/{flow_id}/{partition_id}` keys.
pub fn build_update_txn(
&self,
flow_id: FlowId,
current_flow_info: &DeserializedValueWithBytes<FlowInfoValue>,
table_flow_values: Vec<(FlowPartitionId, TableFlowValue)>,
source_table_ids: &[TableId],
) -> Result<Txn> {
let mut txns = Vec::with_capacity(2 * source_table_ids.len() * table_flow_values.len());
// first remove the old keys
for (part_id, node_id) in current_flow_info.flownode_ids() {
for source_table_id in current_flow_info.source_table_ids() {
txns.push(TxnOp::Delete(
TableFlowKey::new(*source_table_id, *node_id, flow_id, *part_id).to_bytes(),
));
}
}
for (partition_id, table_flow_value) in table_flow_values {
let flownode_id = table_flow_value.peer.id;
let value = table_flow_value.try_as_raw_value()?;
for source_table_id in source_table_ids {
txns.push(TxnOp::Put(
TableFlowKey::new(*source_table_id, flownode_id, flow_id, partition_id)
.to_bytes(),
value.clone(),
));
}
}
Ok(Txn::new().and_then(txns))
}
async fn remap_table_flow_addresses(
&self,
table_flows: &mut [(TableFlowKey, TableFlowValue)],

View File

@@ -478,11 +478,10 @@ impl TableRouteStorage {
))
}
// TODO(LFC): restore its original visibility after some test utility codes are refined
/// Builds a update table route transaction,
/// it expected the remote value equals the `current_table_route_value`.
/// It retrieves the latest value if the comparing failed.
pub fn build_update_txn(
pub(crate) fn build_update_txn(
&self,
table_id: TableId,
current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,

View File

@@ -25,7 +25,7 @@ pub struct TxnOpGetResponseSet(Vec<KeyValue>);
impl TxnOpGetResponseSet {
/// Returns a filter to consume a [KeyValue] where the key equals `key`.
pub fn filter(key: Vec<u8>) -> impl FnMut(&mut TxnOpGetResponseSet) -> Option<Vec<u8>> {
pub(crate) fn filter(key: Vec<u8>) -> impl FnMut(&mut TxnOpGetResponseSet) -> Option<Vec<u8>> {
move |set| {
let pos = set.0.iter().position(|kv| kv.key == key);
match pos {
@@ -36,7 +36,7 @@ impl TxnOpGetResponseSet {
}
/// Returns a decoder to decode bytes to `DeserializedValueWithBytes<T>`.
pub fn decode_with<F, T>(
pub(crate) fn decode_with<F, T>(
mut f: F,
) -> impl FnMut(&mut TxnOpGetResponseSet) -> Result<Option<DeserializedValueWithBytes<T>>>
where

View File

@@ -15,6 +15,8 @@
#![feature(assert_matches)]
#![feature(btree_extract_if)]
#![feature(let_chains)]
#![feature(extract_if)]
#![feature(hash_extract_if)]
pub mod cache;
pub mod cache_invalidator;

View File

@@ -121,8 +121,8 @@ pub enum FlowNameLock {
}
impl FlowNameLock {
pub fn new(catalog: &str, flow_name: &str) -> Self {
Self::Write(format!("{catalog}.{flow_name}"))
pub fn new(catalog: &str, table: &str) -> Self {
Self::Write(format!("{catalog}.{table}"))
}
}

View File

@@ -176,12 +176,15 @@ impl TableRoute {
})?
.into();
let leader_peer = peers.get(region_route.leader_peer_index as usize).cloned();
let leader_peer = peers
.get(region_route.leader_peer_index as usize)
.cloned()
.map(Into::into);
let follower_peers = region_route
.follower_peer_indexes
.into_iter()
.filter_map(|x| peers.get(x as usize).cloned())
.filter_map(|x| peers.get(x as usize).cloned().map(Into::into))
.collect::<Vec<_>>();
region_routes.push(RegionRoute {

View File

@@ -20,6 +20,8 @@ use api::v1::region::{InsertRequests, RegionRequest};
pub use common_base::AffectedRows;
use common_query::request::QueryRequest;
use common_recordbatch::SendableRecordBatchStream;
use common_wal::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig};
use common_wal::config::kafka::MetasrvKafkaConfig;
use crate::cache_invalidator::DummyCacheInvalidator;
use crate::ddl::flow_meta::FlowMetadataAllocator;
@@ -37,7 +39,8 @@ use crate::peer::{Peer, PeerLookupService};
use crate::region_keeper::MemoryRegionKeeper;
use crate::region_registry::LeaderRegionRegistry;
use crate::sequence::SequenceBuilder;
use crate::wal_options_allocator::WalOptionsAllocator;
use crate::wal_options_allocator::topic_pool::KafkaTopicPool;
use crate::wal_options_allocator::{build_kafka_topic_creator, WalOptionsAllocator};
use crate::{DatanodeId, FlownodeId};
#[async_trait::async_trait]
@@ -199,3 +202,34 @@ impl PeerLookupService for NoopPeerLookupService {
Ok(Some(Peer::empty(id)))
}
}
/// Create a kafka topic pool for testing.
pub async fn test_kafka_topic_pool(
broker_endpoints: Vec<String>,
num_topics: usize,
auto_create_topics: bool,
topic_name_prefix: Option<&str>,
) -> KafkaTopicPool {
let mut config = MetasrvKafkaConfig {
connection: KafkaConnectionConfig {
broker_endpoints,
..Default::default()
},
kafka_topic: KafkaTopicConfig {
num_topics,
..Default::default()
},
auto_create_topics,
..Default::default()
};
if let Some(prefix) = topic_name_prefix {
config.kafka_topic.topic_name_prefix = prefix.to_string();
}
let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
let topic_creator = build_kafka_topic_creator(&config.connection, &config.kafka_topic)
.await
.unwrap();
KafkaTopicPool::new(&config, kv_backend, topic_creator)
}

View File

@@ -112,7 +112,9 @@ pub async fn build_wal_options_allocator(
NAME_PATTERN_REGEX.is_match(prefix),
InvalidTopicNamePrefixSnafu { prefix }
);
let topic_creator = build_kafka_topic_creator(kafka_config).await?;
let topic_creator =
build_kafka_topic_creator(&kafka_config.connection, &kafka_config.kafka_topic)
.await?;
let topic_pool = KafkaTopicPool::new(kafka_config, kv_backend, topic_creator);
Ok(WalOptionsAllocator::Kafka(topic_pool))
}
@@ -151,13 +153,16 @@ pub fn prepare_wal_options(
mod tests {
use std::assert_matches::assert_matches;
use common_wal::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig};
use common_wal::config::kafka::common::KafkaTopicConfig;
use common_wal::config::kafka::MetasrvKafkaConfig;
use common_wal::test_util::run_test_with_kafka_wal;
use common_wal::maybe_skip_kafka_integration_test;
use common_wal::test_util::get_kafka_endpoints;
use super::*;
use crate::error::Error;
use crate::kv_backend::memory::MemoryKvBackend;
use crate::test_util::test_kafka_topic_pool;
use crate::wal_options_allocator::selector::RoundRobinTopicSelector;
// Tests that the wal options allocator could successfully allocate raft-engine wal options.
#[tokio::test]
@@ -197,55 +202,42 @@ mod tests {
assert_matches!(got, Error::InvalidTopicNamePrefix { .. });
}
// Tests that the wal options allocator could successfully allocate Kafka wal options.
#[tokio::test]
async fn test_allocator_with_kafka() {
run_test_with_kafka_wal(|broker_endpoints| {
Box::pin(async {
let topics = (0..256)
.map(|i| format!("test_allocator_with_kafka_{}_{}", i, uuid::Uuid::new_v4()))
.collect::<Vec<_>>();
// Creates a topic manager.
let kafka_topic = KafkaTopicConfig {
replication_factor: broker_endpoints.len() as i16,
..Default::default()
};
let config = MetasrvKafkaConfig {
connection: KafkaConnectionConfig {
broker_endpoints,
..Default::default()
},
kafka_topic,
..Default::default()
};
let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
let topic_creator = build_kafka_topic_creator(&config).await.unwrap();
let mut topic_pool = KafkaTopicPool::new(&config, kv_backend, topic_creator);
topic_pool.topics.clone_from(&topics);
topic_pool.selector = Arc::new(selector::RoundRobinTopicSelector::default());
// Creates an options allocator.
let allocator = WalOptionsAllocator::Kafka(topic_pool);
allocator.start().await.unwrap();
let num_regions = 32;
let regions = (0..num_regions).collect::<Vec<_>>();
let got = allocate_region_wal_options(regions.clone(), &allocator, false).unwrap();
// Check the allocated wal options contain the expected topics.
let expected = (0..num_regions)
.map(|i| {
let options = WalOptions::Kafka(KafkaWalOptions {
topic: topics[i as usize].clone(),
});
(i, serde_json::to_string(&options).unwrap())
})
.collect::<HashMap<_, _>>();
assert_eq!(got, expected);
})
})
async fn test_allocator_with_kafka_allocate_wal_options() {
common_telemetry::init_default_ut_logging();
maybe_skip_kafka_integration_test!();
let num_topics = 5;
let mut topic_pool = test_kafka_topic_pool(
get_kafka_endpoints(),
num_topics,
true,
Some("test_allocator_with_kafka"),
)
.await;
topic_pool.selector = Arc::new(RoundRobinTopicSelector::default());
let topics = topic_pool.topics.clone();
// clean up the topics before test
let topic_creator = topic_pool.topic_creator();
topic_creator.delete_topics(&topics).await.unwrap();
// Creates an options allocator.
let allocator = WalOptionsAllocator::Kafka(topic_pool);
allocator.start().await.unwrap();
let num_regions = 3;
let regions = (0..num_regions).collect::<Vec<_>>();
let got = allocate_region_wal_options(regions.clone(), &allocator, false).unwrap();
// Check the allocated wal options contain the expected topics.
let expected = (0..num_regions)
.map(|i| {
let options = WalOptions::Kafka(KafkaWalOptions {
topic: topics[i as usize].clone(),
});
(i, serde_json::to_string(&options).unwrap())
})
.collect::<HashMap<_, _>>();
assert_eq!(got, expected);
}
#[tokio::test]

View File

@@ -12,20 +12,21 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_telemetry::{error, info};
use common_wal::config::kafka::common::DEFAULT_BACKOFF_CONFIG;
use common_wal::config::kafka::MetasrvKafkaConfig;
use common_telemetry::{debug, error, info};
use common_wal::config::kafka::common::{
KafkaConnectionConfig, KafkaTopicConfig, DEFAULT_BACKOFF_CONFIG,
};
use rskafka::client::error::Error as RsKafkaError;
use rskafka::client::error::ProtocolError::TopicAlreadyExists;
use rskafka::client::partition::{Compression, UnknownTopicHandling};
use rskafka::client::partition::{Compression, OffsetAt, PartitionClient, UnknownTopicHandling};
use rskafka::client::{Client, ClientBuilder};
use rskafka::record::Record;
use snafu::ResultExt;
use crate::error::{
BuildKafkaClientSnafu, BuildKafkaCtrlClientSnafu, BuildKafkaPartitionClientSnafu,
CreateKafkaWalTopicSnafu, ProduceRecordSnafu, ResolveKafkaEndpointSnafu, Result,
TlsConfigSnafu,
BuildKafkaClientSnafu, BuildKafkaCtrlClientSnafu, CreateKafkaWalTopicSnafu,
KafkaGetOffsetSnafu, KafkaPartitionClientSnafu, ProduceRecordSnafu, ResolveKafkaEndpointSnafu,
Result, TlsConfigSnafu,
};
// Each topic only has one partition for now.
@@ -70,21 +71,47 @@ impl KafkaTopicCreator {
info!("The topic {} already exists", topic);
Ok(())
} else {
error!("Failed to create a topic {}, error {:?}", topic, e);
error!(e; "Failed to create a topic {}", topic);
Err(e).context(CreateKafkaWalTopicSnafu)
}
}
}
}
async fn append_noop_record(&self, topic: &String, client: &Client) -> Result<()> {
let partition_client = client
async fn prepare_topic(&self, topic: &String) -> Result<()> {
let partition_client = self.partition_client(topic).await?;
self.append_noop_record(topic, &partition_client).await?;
Ok(())
}
/// Creates a [PartitionClient] for the given topic.
async fn partition_client(&self, topic: &str) -> Result<PartitionClient> {
self.client
.partition_client(topic, DEFAULT_PARTITION, UnknownTopicHandling::Retry)
.await
.context(BuildKafkaPartitionClientSnafu {
.context(KafkaPartitionClientSnafu {
topic,
partition: DEFAULT_PARTITION,
})
}
/// Appends a noop record to the topic.
/// It only appends a noop record if the topic is empty.
async fn append_noop_record(
&self,
topic: &String,
partition_client: &PartitionClient,
) -> Result<()> {
let end_offset = partition_client
.get_offset(OffsetAt::Latest)
.await
.context(KafkaGetOffsetSnafu {
topic: topic.to_string(),
partition: DEFAULT_PARTITION,
})?;
if end_offset > 0 {
return Ok(());
}
partition_client
.produce(
@@ -98,22 +125,28 @@ impl KafkaTopicCreator {
)
.await
.context(ProduceRecordSnafu { topic })?;
debug!("Appended a noop record to topic {}", topic);
Ok(())
}
/// Creates topics in Kafka.
pub async fn create_topics(&self, topics: &[String]) -> Result<()> {
let tasks = topics
.iter()
.map(|topic| async { self.create_topic(topic, &self.client).await })
.collect::<Vec<_>>();
futures::future::try_join_all(tasks).await.map(|_| ())
}
/// Prepares topics in Kafka.
/// 1. Creates missing topics.
/// 2. Appends a noop record to each topic.
pub async fn prepare_topics(&self, topics: &[&String]) -> Result<()> {
///
/// It appends a noop record to each topic if the topic is empty.
pub async fn prepare_topics(&self, topics: &[String]) -> Result<()> {
// Try to create missing topics.
let tasks = topics
.iter()
.map(|topic| async {
self.create_topic(topic, &self.client).await?;
self.append_noop_record(topic, &self.client).await?;
Ok(())
})
.map(|topic| async { self.prepare_topic(topic).await })
.collect::<Vec<_>>();
futures::future::try_join_all(tasks).await.map(|_| ())
}
@@ -129,34 +162,244 @@ impl KafkaTopicCreator {
}
}
#[cfg(test)]
impl KafkaTopicCreator {
pub async fn delete_topics(&self, topics: &[String]) -> Result<()> {
let tasks = topics
.iter()
.map(|topic| async { self.delete_topic(topic, &self.client).await })
.collect::<Vec<_>>();
futures::future::try_join_all(tasks).await.map(|_| ())
}
async fn delete_topic(&self, topic: &String, client: &Client) -> Result<()> {
let controller = client
.controller_client()
.context(BuildKafkaCtrlClientSnafu)?;
match controller.delete_topic(topic, 10).await {
Ok(_) => {
info!("Successfully deleted topic {}", topic);
Ok(())
}
Err(e) => {
if Self::is_unknown_topic_err(&e) {
info!("The topic {} does not exist", topic);
Ok(())
} else {
panic!("Failed to delete a topic {}, error: {}", topic, e);
}
}
}
}
fn is_unknown_topic_err(e: &RsKafkaError) -> bool {
matches!(
e,
&RsKafkaError::ServerError {
protocol_error: rskafka::client::error::ProtocolError::UnknownTopicOrPartition,
..
}
)
}
pub async fn get_partition_client(&self, topic: &str) -> PartitionClient {
self.partition_client(topic).await.unwrap()
}
}
/// Builds a kafka [Client](rskafka::client::Client).
pub async fn build_kafka_client(config: &MetasrvKafkaConfig) -> Result<Client> {
pub async fn build_kafka_client(connection: &KafkaConnectionConfig) -> Result<Client> {
// Builds an kafka controller client for creating topics.
let broker_endpoints = common_wal::resolve_to_ipv4(&config.connection.broker_endpoints)
let broker_endpoints = common_wal::resolve_to_ipv4(&connection.broker_endpoints)
.await
.context(ResolveKafkaEndpointSnafu)?;
let mut builder = ClientBuilder::new(broker_endpoints).backoff_config(DEFAULT_BACKOFF_CONFIG);
if let Some(sasl) = &config.connection.sasl {
if let Some(sasl) = &connection.sasl {
builder = builder.sasl_config(sasl.config.clone().into_sasl_config());
};
if let Some(tls) = &config.connection.tls {
if let Some(tls) = &connection.tls {
builder = builder.tls_config(tls.to_tls_config().await.context(TlsConfigSnafu)?)
};
builder
.build()
.await
.with_context(|_| BuildKafkaClientSnafu {
broker_endpoints: config.connection.broker_endpoints.clone(),
broker_endpoints: connection.broker_endpoints.clone(),
})
}
/// Builds a [KafkaTopicCreator].
pub async fn build_kafka_topic_creator(config: &MetasrvKafkaConfig) -> Result<KafkaTopicCreator> {
let client = build_kafka_client(config).await?;
pub async fn build_kafka_topic_creator(
connection: &KafkaConnectionConfig,
kafka_topic: &KafkaTopicConfig,
) -> Result<KafkaTopicCreator> {
let client = build_kafka_client(connection).await?;
Ok(KafkaTopicCreator {
client,
num_partitions: config.kafka_topic.num_partitions,
replication_factor: config.kafka_topic.replication_factor,
create_topic_timeout: config.kafka_topic.create_topic_timeout.as_millis() as i32,
num_partitions: kafka_topic.num_partitions,
replication_factor: kafka_topic.replication_factor,
create_topic_timeout: kafka_topic.create_topic_timeout.as_millis() as i32,
})
}
#[cfg(test)]
mod tests {
use common_wal::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig};
use common_wal::maybe_skip_kafka_integration_test;
use common_wal::test_util::get_kafka_endpoints;
use super::*;
async fn test_topic_creator(broker_endpoints: Vec<String>) -> KafkaTopicCreator {
let connection = KafkaConnectionConfig {
broker_endpoints,
..Default::default()
};
let kafka_topic = KafkaTopicConfig::default();
build_kafka_topic_creator(&connection, &kafka_topic)
.await
.unwrap()
}
async fn append_records(partition_client: &PartitionClient, num_records: usize) -> Result<()> {
for i in 0..num_records {
partition_client
.produce(
vec![Record {
key: Some(b"test".to_vec()),
value: Some(format!("test {}", i).as_bytes().to_vec()),
timestamp: chrono::Utc::now(),
headers: Default::default(),
}],
Compression::Lz4,
)
.await
.unwrap();
}
Ok(())
}
#[tokio::test]
async fn test_append_noop_record_to_empty_topic() {
common_telemetry::init_default_ut_logging();
maybe_skip_kafka_integration_test!();
let prefix = "append_noop_record_to_empty_topic";
let creator = test_topic_creator(get_kafka_endpoints()).await;
let topic = format!("{}{}", prefix, "0");
// Clean up the topics before test
creator.delete_topics(&[topic.to_string()]).await.unwrap();
creator.create_topics(&[topic.to_string()]).await.unwrap();
let partition_client = creator.partition_client(&topic).await.unwrap();
let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
assert_eq!(end_offset, 0);
// The topic is not empty, so no noop record is appended.
creator
.append_noop_record(&topic, &partition_client)
.await
.unwrap();
let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
assert_eq!(end_offset, 1);
}
#[tokio::test]
async fn test_append_noop_record_to_non_empty_topic() {
common_telemetry::init_default_ut_logging();
maybe_skip_kafka_integration_test!();
let prefix = "append_noop_record_to_non_empty_topic";
let creator = test_topic_creator(get_kafka_endpoints()).await;
let topic = format!("{}{}", prefix, "0");
// Clean up the topics before test
creator.delete_topics(&[topic.to_string()]).await.unwrap();
creator.create_topics(&[topic.to_string()]).await.unwrap();
let partition_client = creator.partition_client(&topic).await.unwrap();
append_records(&partition_client, 2).await.unwrap();
let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
assert_eq!(end_offset, 2);
// The topic is not empty, so no noop record is appended.
creator
.append_noop_record(&topic, &partition_client)
.await
.unwrap();
let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
assert_eq!(end_offset, 2);
}
#[tokio::test]
async fn test_create_topic() {
common_telemetry::init_default_ut_logging();
maybe_skip_kafka_integration_test!();
let prefix = "create_topic";
let creator = test_topic_creator(get_kafka_endpoints()).await;
let topic = format!("{}{}", prefix, "0");
// Clean up the topics before test
creator.delete_topics(&[topic.to_string()]).await.unwrap();
creator.create_topics(&[topic.to_string()]).await.unwrap();
// Should be ok
creator.create_topics(&[topic.to_string()]).await.unwrap();
let partition_client = creator.partition_client(&topic).await.unwrap();
let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
assert_eq!(end_offset, 0);
}
#[tokio::test]
async fn test_prepare_topic() {
common_telemetry::init_default_ut_logging();
maybe_skip_kafka_integration_test!();
let prefix = "prepare_topic";
let creator = test_topic_creator(get_kafka_endpoints()).await;
let topic = format!("{}{}", prefix, "0");
// Clean up the topics before test
creator.delete_topics(&[topic.to_string()]).await.unwrap();
creator.create_topics(&[topic.to_string()]).await.unwrap();
creator.prepare_topic(&topic).await.unwrap();
let partition_client = creator.partition_client(&topic).await.unwrap();
let start_offset = partition_client
.get_offset(OffsetAt::Earliest)
.await
.unwrap();
assert_eq!(start_offset, 0);
let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
assert_eq!(end_offset, 1);
}
#[tokio::test]
async fn test_prepare_topic_with_stale_records_without_pruning() {
common_telemetry::init_default_ut_logging();
maybe_skip_kafka_integration_test!();
let prefix = "prepare_topic_with_stale_records_without_pruning";
let creator = test_topic_creator(get_kafka_endpoints()).await;
let topic = format!("{}{}", prefix, "0");
// Clean up the topics before test
creator.delete_topics(&[topic.to_string()]).await.unwrap();
creator.create_topics(&[topic.to_string()]).await.unwrap();
let partition_client = creator.partition_client(&topic).await.unwrap();
append_records(&partition_client, 10).await.unwrap();
creator.prepare_topic(&topic).await.unwrap();
let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
assert_eq!(end_offset, 10);
let start_offset = partition_client
.get_offset(OffsetAt::Earliest)
.await
.unwrap();
assert_eq!(start_offset, 0);
}
}

View File

@@ -40,24 +40,21 @@ impl KafkaTopicManager {
Ok(topics)
}
/// Restores topics from the key-value backend. and returns the topics that are not stored in kvbackend.
pub async fn get_topics_to_create<'a>(
&self,
all_topics: &'a [String],
) -> Result<Vec<&'a String>> {
/// Returns the topics that are not prepared.
pub async fn unprepare_topics(&self, all_topics: &[String]) -> Result<Vec<String>> {
let existing_topics = self.restore_topics().await?;
let existing_topic_set = existing_topics.iter().collect::<HashSet<_>>();
let mut topics_to_create = Vec::with_capacity(all_topics.len());
for topic in all_topics {
if !existing_topic_set.contains(topic) {
topics_to_create.push(topic);
topics_to_create.push(topic.to_string());
}
}
Ok(topics_to_create)
}
/// Persists topics into the key-value backend.
pub async fn persist_topics(&self, topics: &[String]) -> Result<()> {
/// Persists prepared topics into the key-value backend.
pub async fn persist_prepared_topics(&self, topics: &[String]) -> Result<()> {
self.topic_name_manager
.batch_put(
topics
@@ -70,6 +67,14 @@ impl KafkaTopicManager {
}
}
#[cfg(test)]
impl KafkaTopicManager {
/// Lists all topics in the key-value backend.
pub async fn list_topics(&self) -> Result<Vec<String>> {
self.topic_name_manager.range().await
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
@@ -90,11 +95,11 @@ mod tests {
// No legacy topics.
let mut topics_to_be_created = topic_kvbackend_manager
.get_topics_to_create(&all_topics)
.unprepare_topics(&all_topics)
.await
.unwrap();
topics_to_be_created.sort();
let mut expected = all_topics.iter().collect::<Vec<_>>();
let mut expected = all_topics.clone();
expected.sort();
assert_eq!(expected, topics_to_be_created);
@@ -109,7 +114,7 @@ mod tests {
assert!(res.prev_kv.is_none());
let topics_to_be_created = topic_kvbackend_manager
.get_topics_to_create(&all_topics)
.unprepare_topics(&all_topics)
.await
.unwrap();
assert!(topics_to_be_created.is_empty());
@@ -144,21 +149,21 @@ mod tests {
let topic_kvbackend_manager = KafkaTopicManager::new(kv_backend);
let mut topics_to_be_created = topic_kvbackend_manager
.get_topics_to_create(&all_topics)
.unprepare_topics(&all_topics)
.await
.unwrap();
topics_to_be_created.sort();
let mut expected = all_topics.iter().collect::<Vec<_>>();
let mut expected = all_topics.clone();
expected.sort();
assert_eq!(expected, topics_to_be_created);
// Persists topics to kv backend.
topic_kvbackend_manager
.persist_topics(&all_topics)
.persist_prepared_topics(&all_topics)
.await
.unwrap();
let topics_to_be_created = topic_kvbackend_manager
.get_topics_to_create(&all_topics)
.unprepare_topics(&all_topics)
.await
.unwrap();
assert!(topics_to_be_created.is_empty());

View File

@@ -15,6 +15,7 @@
use std::fmt::{self, Formatter};
use std::sync::Arc;
use common_telemetry::info;
use common_wal::config::kafka::MetasrvKafkaConfig;
use common_wal::TopicSelectorType;
use snafu::ensure;
@@ -77,27 +78,35 @@ impl KafkaTopicPool {
}
/// Tries to activate the topic manager when metasrv becomes the leader.
///
/// First tries to restore persisted topics from the kv backend.
/// If not enough topics retrieved, it will try to contact the Kafka cluster and request creating more topics.
/// If there are unprepared topics (topics that exist in the configuration but not in the kv backend),
/// it will create these topics in Kafka if `auto_create_topics` is enabled.
///
/// Then it prepares all unprepared topics by appending a noop record if the topic is empty,
/// and persists them in the kv backend for future use.
pub async fn activate(&self) -> Result<()> {
if !self.auto_create_topics {
return Ok(());
}
let num_topics = self.topics.len();
ensure!(num_topics > 0, InvalidNumTopicsSnafu { num_topics });
let topics_to_be_created = self
.topic_manager
.get_topics_to_create(&self.topics)
.await?;
let unprepared_topics = self.topic_manager.unprepare_topics(&self.topics).await?;
if !topics_to_be_created.is_empty() {
if !unprepared_topics.is_empty() {
if self.auto_create_topics {
info!("Creating {} topics", unprepared_topics.len());
self.topic_creator.create_topics(&unprepared_topics).await?;
} else {
info!("Auto create topics is disabled, skipping topic creation.");
}
self.topic_creator
.prepare_topics(&topics_to_be_created)
.prepare_topics(&unprepared_topics)
.await?;
self.topic_manager
.persist_prepared_topics(&unprepared_topics)
.await?;
self.topic_manager.persist_topics(&self.topics).await?;
}
info!("Activated topic pool with {} topics", self.topics.len());
Ok(())
}
@@ -114,77 +123,147 @@ impl KafkaTopicPool {
}
}
#[cfg(test)]
impl KafkaTopicPool {
pub(crate) fn topic_manager(&self) -> &KafkaTopicManager {
&self.topic_manager
}
pub(crate) fn topic_creator(&self) -> &KafkaTopicCreator {
&self.topic_creator
}
}
#[cfg(test)]
mod tests {
use common_wal::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig};
use common_wal::test_util::run_test_with_kafka_wal;
use std::assert_matches::assert_matches;
use common_wal::maybe_skip_kafka_integration_test;
use common_wal::test_util::get_kafka_endpoints;
use super::*;
use crate::kv_backend::memory::MemoryKvBackend;
use crate::wal_options_allocator::topic_creator::build_kafka_topic_creator;
use crate::error::Error;
use crate::test_util::test_kafka_topic_pool;
use crate::wal_options_allocator::selector::RoundRobinTopicSelector;
#[tokio::test]
async fn test_pool_invalid_number_topics_err() {
common_telemetry::init_default_ut_logging();
maybe_skip_kafka_integration_test!();
let endpoints = get_kafka_endpoints();
let pool = test_kafka_topic_pool(endpoints.clone(), 0, false, None).await;
let err = pool.activate().await.unwrap_err();
assert_matches!(err, Error::InvalidNumTopics { .. });
let pool = test_kafka_topic_pool(endpoints, 0, true, None).await;
let err = pool.activate().await.unwrap_err();
assert_matches!(err, Error::InvalidNumTopics { .. });
}
#[tokio::test]
async fn test_pool_activate_unknown_topics_err() {
common_telemetry::init_default_ut_logging();
maybe_skip_kafka_integration_test!();
let pool =
test_kafka_topic_pool(get_kafka_endpoints(), 1, false, Some("unknown_topic")).await;
let err = pool.activate().await.unwrap_err();
assert_matches!(err, Error::KafkaPartitionClient { .. });
}
#[tokio::test]
async fn test_pool_activate() {
common_telemetry::init_default_ut_logging();
maybe_skip_kafka_integration_test!();
let pool =
test_kafka_topic_pool(get_kafka_endpoints(), 2, true, Some("pool_activate")).await;
// clean up the topics before test
let topic_creator = pool.topic_creator();
topic_creator.delete_topics(&pool.topics).await.unwrap();
let topic_manager = pool.topic_manager();
pool.activate().await.unwrap();
let topics = topic_manager.list_topics().await.unwrap();
assert_eq!(topics.len(), 2);
}
#[tokio::test]
async fn test_pool_activate_with_existing_topics() {
common_telemetry::init_default_ut_logging();
maybe_skip_kafka_integration_test!();
let prefix = "pool_activate_with_existing_topics";
let pool = test_kafka_topic_pool(get_kafka_endpoints(), 2, true, Some(prefix)).await;
let topic_creator = pool.topic_creator();
topic_creator.delete_topics(&pool.topics).await.unwrap();
let topic_manager = pool.topic_manager();
// persists one topic info, then pool.activate() will create new topics that not persisted.
topic_manager
.persist_prepared_topics(&pool.topics[0..1])
.await
.unwrap();
pool.activate().await.unwrap();
let topics = topic_manager.list_topics().await.unwrap();
assert_eq!(topics.len(), 2);
let client = pool.topic_creator().client();
let topics = client
.list_topics()
.await
.unwrap()
.into_iter()
.filter(|t| t.name.starts_with(prefix))
.collect::<Vec<_>>();
assert_eq!(topics.len(), 1);
}
/// Tests that the topic manager could allocate topics correctly.
#[tokio::test]
async fn test_alloc_topics() {
run_test_with_kafka_wal(|broker_endpoints| {
Box::pin(async {
// Constructs topics that should be created.
let topics = (0..256)
.map(|i| format!("test_alloc_topics_{}_{}", i, uuid::Uuid::new_v4()))
.collect::<Vec<_>>();
// Creates a topic manager.
let kafka_topic = KafkaTopicConfig {
replication_factor: broker_endpoints.len() as i16,
..Default::default()
};
let config = MetasrvKafkaConfig {
connection: KafkaConnectionConfig {
broker_endpoints,
..Default::default()
},
kafka_topic,
..Default::default()
};
let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
let topic_creator = build_kafka_topic_creator(&config).await.unwrap();
let mut topic_pool = KafkaTopicPool::new(&config, kv_backend, topic_creator);
// Replaces the default topic pool with the constructed topics.
topic_pool.topics.clone_from(&topics);
// Replaces the default selector with a round-robin selector without shuffled.
topic_pool.selector = Arc::new(RoundRobinTopicSelector::default());
topic_pool.activate().await.unwrap();
// Selects exactly the number of `num_topics` topics one by one.
let got = (0..topics.len())
.map(|_| topic_pool.select().unwrap())
.cloned()
.collect::<Vec<_>>();
assert_eq!(got, topics);
// Selects exactly the number of `num_topics` topics in a batching manner.
let got = topic_pool
.select_batch(topics.len())
.unwrap()
.into_iter()
.map(ToString::to_string)
.collect::<Vec<_>>();
assert_eq!(got, topics);
// Selects more than the number of `num_topics` topics.
let got = topic_pool
.select_batch(2 * topics.len())
.unwrap()
.into_iter()
.map(ToString::to_string)
.collect::<Vec<_>>();
let expected = vec![topics.clone(); 2]
.into_iter()
.flatten()
.collect::<Vec<_>>();
assert_eq!(got, expected);
})
})
common_telemetry::init_default_ut_logging();
maybe_skip_kafka_integration_test!();
let num_topics = 5;
let mut topic_pool = test_kafka_topic_pool(
get_kafka_endpoints(),
num_topics,
true,
Some("test_allocator_with_kafka"),
)
.await;
topic_pool.selector = Arc::new(RoundRobinTopicSelector::default());
let topics = topic_pool.topics.clone();
// clean up the topics before test
let topic_creator = topic_pool.topic_creator();
topic_creator.delete_topics(&topics).await.unwrap();
// Selects exactly the number of `num_topics` topics one by one.
let got = (0..topics.len())
.map(|_| topic_pool.select().unwrap())
.cloned()
.collect::<Vec<_>>();
assert_eq!(got, topics);
// Selects exactly the number of `num_topics` topics in a batching manner.
let got = topic_pool
.select_batch(topics.len())
.unwrap()
.into_iter()
.map(ToString::to_string)
.collect::<Vec<_>>();
assert_eq!(got, topics);
// Selects more than the number of `num_topics` topics.
let got = topic_pool
.select_batch(2 * topics.len())
.unwrap()
.into_iter()
.map(ToString::to_string)
.collect::<Vec<_>>();
let expected = vec![topics.clone(); 2]
.into_iter()
.flatten()
.collect::<Vec<_>>();
assert_eq!(got, expected);
}
}

View File

@@ -18,13 +18,11 @@ use std::collections::HashMap;
use std::sync::Arc;
use async_trait::async_trait;
use common_procedure::local::{acquire_dynamic_key_lock, DynamicKeyLockGuard};
use common_procedure::rwlock::KeyRwLock;
use common_procedure::store::poison_store::PoisonStore;
use common_procedure::test_util::InMemoryPoisonStore;
use common_procedure::{
Context, ContextProvider, Output, PoisonKey, Procedure, ProcedureId, ProcedureState,
ProcedureWithId, Result, Status, StringKey,
ProcedureWithId, Result, Status,
};
/// A Mock [ContextProvider].
@@ -32,7 +30,6 @@ use common_procedure::{
pub struct MockContextProvider {
states: HashMap<ProcedureId, ProcedureState>,
poison_manager: InMemoryPoisonStore,
dynamic_key_lock: Arc<KeyRwLock<String>>,
}
impl MockContextProvider {
@@ -41,7 +38,6 @@ impl MockContextProvider {
MockContextProvider {
states,
poison_manager: InMemoryPoisonStore::default(),
dynamic_key_lock: Arc::new(KeyRwLock::new()),
}
}
@@ -62,10 +58,6 @@ impl ContextProvider for MockContextProvider {
.try_put_poison(key.to_string(), procedure_id.to_string())
.await
}
async fn acquire_lock(&self, key: &StringKey) -> DynamicKeyLockGuard {
acquire_dynamic_key_lock(&self.dynamic_key_lock, key).await
}
}
/// Executes a procedure until it returns [Status::Done].

View File

@@ -20,7 +20,6 @@ pub mod error;
pub mod local;
pub mod options;
mod procedure;
pub mod rwlock;
pub mod store;
pub mod watcher;
@@ -29,8 +28,8 @@ pub mod test_util;
pub use crate::error::{Error, Result};
pub use crate::procedure::{
BoxedProcedure, BoxedProcedureLoader, Context, ContextProvider, ContextProviderRef, LockKey,
Output, ParseIdError, PoisonKey, PoisonKeys, Procedure, ProcedureId, ProcedureInfo,
ProcedureManager, ProcedureManagerRef, ProcedureState, ProcedureWithId, Status, StringKey,
BoxedProcedure, BoxedProcedureLoader, Context, ContextProvider, LockKey, Output, ParseIdError,
PoisonKey, PoisonKeys, Procedure, ProcedureId, ProcedureInfo, ProcedureManager,
ProcedureManagerRef, ProcedureState, ProcedureWithId, Status, StringKey,
};
pub use crate::watcher::Watcher;

View File

@@ -13,6 +13,7 @@
// limitations under the License.
mod runner;
mod rwlock;
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet, VecDeque};
@@ -29,6 +30,7 @@ use snafu::{ensure, OptionExt, ResultExt};
use tokio::sync::watch::{self, Receiver, Sender};
use tokio::sync::{Mutex as TokioMutex, Notify};
use self::rwlock::KeyRwLock;
use crate::error::{
self, DuplicateProcedureSnafu, Error, LoaderConflictSnafu, ManagerNotStartSnafu,
PoisonKeyNotDefinedSnafu, ProcedureNotFoundSnafu, Result, StartRemoveOutdatedMetaTaskSnafu,
@@ -36,12 +38,11 @@ use crate::error::{
};
use crate::local::runner::Runner;
use crate::procedure::{BoxedProcedureLoader, InitProcedureState, PoisonKeys, ProcedureInfo};
use crate::rwlock::{KeyRwLock, OwnedKeyRwLockGuard};
use crate::store::poison_store::PoisonStoreRef;
use crate::store::{ProcedureMessage, ProcedureMessages, ProcedureStore, StateStoreRef};
use crate::{
BoxedProcedure, ContextProvider, LockKey, PoisonKey, ProcedureId, ProcedureManager,
ProcedureState, ProcedureWithId, StringKey, Watcher,
ProcedureState, ProcedureWithId, Watcher,
};
/// The expired time of a procedure's metadata.
@@ -156,80 +157,12 @@ struct LoadedProcedure {
step: u32,
}
/// The dynamic lock for procedure execution.
///
/// Unlike the procedure-level locks, these locks are acquired dynamically by the procedure
/// during execution. They are only held when the procedure specifically needs these keys
/// and are released as soon as the procedure no longer needs them.
/// This allows for more fine-grained concurrency control during procedure execution.
pub(crate) type DynamicKeyLock = Arc<KeyRwLock<String>>;
/// Acquires a dynamic key lock for the given key.
///
/// This function takes a reference to the dynamic key lock and a pointer to the key.
/// It then matches the key type and acquires the appropriate lock.
pub async fn acquire_dynamic_key_lock(
lock: &DynamicKeyLock,
key: &StringKey,
) -> DynamicKeyLockGuard {
match key {
StringKey::Share(key) => {
let guard = lock.read(key.to_string()).await;
DynamicKeyLockGuard {
guard: Some(OwnedKeyRwLockGuard::from(guard)),
key: key.to_string(),
lock: lock.clone(),
}
}
StringKey::Exclusive(key) => {
let guard = lock.write(key.to_string()).await;
DynamicKeyLockGuard {
guard: Some(OwnedKeyRwLockGuard::from(guard)),
key: key.to_string(),
lock: lock.clone(),
}
}
}
}
/// A guard for the dynamic key lock.
///
/// This guard is used to release the lock when the procedure no longer needs it.
/// It also ensures that the lock is cleaned up when the guard is dropped.
pub struct DynamicKeyLockGuard {
guard: Option<OwnedKeyRwLockGuard>,
key: String,
lock: DynamicKeyLock,
}
impl Drop for DynamicKeyLockGuard {
fn drop(&mut self) {
if let Some(guard) = self.guard.take() {
drop(guard);
}
self.lock.clean_keys(&[self.key.to_string()]);
}
}
/// Shared context of the manager.
pub(crate) struct ManagerContext {
/// Procedure loaders. The key is the type name of the procedure which the loader returns.
loaders: Mutex<HashMap<String, BoxedProcedureLoader>>,
/// The key lock for the procedure.
///
/// The lock keys are defined in `Procedure::lock_key()`.
/// These locks are acquired before the procedure starts and released after the procedure finishes.
/// They ensure exclusive access to resources throughout the entire procedure lifecycle.
key_lock: KeyRwLock<String>,
/// The dynamic lock for procedure execution.
///
/// Unlike the procedure-level locks, these locks are acquired dynamically by the procedure
/// during execution. They are only held when the procedure specifically needs these keys
/// and are released as soon as the procedure no longer needs them.
/// This allows for more fine-grained concurrency control during procedure execution.
dynamic_key_lock: DynamicKeyLock,
/// Procedures in the manager.
procedures: RwLock<HashMap<ProcedureId, ProcedureMetaRef>>,
/// Running procedures.
running_procedures: Mutex<HashSet<ProcedureId>>,
/// Ids and finished time of finished procedures.
finished_procedures: Mutex<VecDeque<(ProcedureId, Instant)>>,
@@ -266,10 +199,6 @@ impl ContextProvider for ManagerContext {
let procedure_id = procedure_id.to_string();
self.poison_manager.try_put_poison(key, procedure_id).await
}
async fn acquire_lock(&self, key: &StringKey) -> DynamicKeyLockGuard {
acquire_dynamic_key_lock(&self.dynamic_key_lock, key).await
}
}
impl ManagerContext {
@@ -277,7 +206,6 @@ impl ManagerContext {
fn new(poison_manager: PoisonStoreRef) -> ManagerContext {
ManagerContext {
key_lock: KeyRwLock::new(),
dynamic_key_lock: Arc::new(KeyRwLock::new()),
loaders: Mutex::new(HashMap::new()),
procedures: RwLock::new(HashMap::new()),
running_procedures: Mutex::new(HashSet::new()),

View File

@@ -23,9 +23,9 @@ use snafu::ResultExt;
use tokio::time;
use crate::error::{self, ProcedurePanicSnafu, Result, RollbackTimesExceededSnafu};
use crate::local::rwlock::OwnedKeyRwLockGuard;
use crate::local::{ManagerContext, ProcedureMeta, ProcedureMetaRef};
use crate::procedure::{Output, StringKey};
use crate::rwlock::OwnedKeyRwLockGuard;
use crate::store::{ProcedureMessage, ProcedureStore};
use crate::{
BoxedProcedure, Context, Error, Procedure, ProcedureId, ProcedureState, ProcedureWithId, Status,
@@ -581,7 +581,6 @@ impl Runner {
#[cfg(test)]
mod tests {
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use async_trait::async_trait;
@@ -589,14 +588,13 @@ mod tests {
use common_error::mock::MockError;
use common_error::status_code::StatusCode;
use common_test_util::temp_dir::create_temp_dir;
use futures::future::join_all;
use futures_util::future::BoxFuture;
use futures_util::FutureExt;
use object_store::{EntryMode, ObjectStore};
use tokio::sync::mpsc;
use super::*;
use crate::local::{test_util, DynamicKeyLockGuard};
use crate::local::test_util;
use crate::procedure::PoisonKeys;
use crate::store::proc_path;
use crate::test_util::InMemoryPoisonStore;
@@ -668,10 +666,6 @@ mod tests {
) -> Result<()> {
unimplemented!()
}
async fn acquire_lock(&self, _key: &StringKey) -> DynamicKeyLockGuard {
unimplemented!()
}
}
Context {
@@ -1680,66 +1674,4 @@ mod tests {
// If the procedure is poisoned, the poison key shouldn't be deleted.
assert_eq!(procedure_id, ROOT_ID);
}
fn test_procedure_with_dynamic_lock(
shared_atomic_value: Arc<AtomicU64>,
id: u64,
) -> (BoxedProcedure, Arc<ProcedureMeta>) {
let exec_fn = move |ctx: Context| {
let moved_shared_atomic_value = shared_atomic_value.clone();
let moved_ctx = ctx.clone();
async move {
debug!("Acquiring write lock, id: {}", id);
let key = StringKey::Exclusive("test_lock".to_string());
let guard = moved_ctx.provider.acquire_lock(&key).await;
debug!("Acquired write lock, id: {}", id);
let millis = rand::rng().random_range(10..=50);
tokio::time::sleep(Duration::from_millis(millis)).await;
let value = moved_shared_atomic_value.load(Ordering::Relaxed);
moved_shared_atomic_value.store(value + 1, Ordering::Relaxed);
debug!("Dropping write lock, id: {}", id);
drop(guard);
Ok(Status::done())
}
.boxed()
};
let adapter = ProcedureAdapter {
data: "dynamic_lock".to_string(),
lock_key: LockKey::new_exclusive([]),
poison_keys: PoisonKeys::new([]),
exec_fn,
rollback_fn: None,
};
let meta = adapter.new_meta(ROOT_ID);
(Box::new(adapter), meta)
}
#[tokio::test(flavor = "multi_thread")]
async fn test_execute_with_dynamic_lock() {
common_telemetry::init_default_ut_logging();
let shared_atomic_value = Arc::new(AtomicU64::new(0));
let (procedure1, meta1) = test_procedure_with_dynamic_lock(shared_atomic_value.clone(), 1);
let (procedure2, meta2) = test_procedure_with_dynamic_lock(shared_atomic_value.clone(), 2);
let dir = create_temp_dir("dynamic_lock");
let object_store = test_util::new_object_store(&dir);
let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
let mut runner1 = new_runner(meta1.clone(), procedure1, procedure_store.clone());
let mut runner2 = new_runner(meta2.clone(), procedure2, procedure_store.clone());
let ctx1 = context_with_provider(
meta1.id,
runner1.manager_ctx.clone() as Arc<dyn ContextProvider>,
);
let ctx2 = context_with_provider(
meta2.id,
// use same manager ctx as runner1
runner1.manager_ctx.clone() as Arc<dyn ContextProvider>,
);
let tasks = [runner1.execute_once(&ctx1), runner2.execute_once(&ctx2)];
join_all(tasks).await;
assert_eq!(shared_atomic_value.load(Ordering::Relaxed), 2);
}
}

View File

@@ -18,18 +18,8 @@ use std::sync::{Arc, Mutex};
use tokio::sync::{OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock};
/// A guard that owns a read or write lock on a key.
///
/// This enum wraps either a read or write lock guard obtained from a `KeyRwLock`.
/// The guard is automatically released when it is dropped.
pub enum OwnedKeyRwLockGuard {
/// Represents a shared read lock on a key.
/// Multiple read locks can be held simultaneously for the same key.
Read { _guard: OwnedRwLockReadGuard<()> },
/// Represents an exclusive write lock on a key.
/// Only one write lock can be held at a time for a given key,
/// and no read locks can be held simultaneously with a write lock.
Write { _guard: OwnedRwLockWriteGuard<()> },
}
@@ -46,7 +36,7 @@ impl From<OwnedRwLockWriteGuard<()>> for OwnedKeyRwLockGuard {
}
/// Locks based on a key, allowing other keys to lock independently.
#[derive(Debug, Default)]
#[derive(Debug)]
pub struct KeyRwLock<K> {
/// The inner map of locks for specific keys.
inner: Mutex<HashMap<K, Arc<RwLock<()>>>>,

View File

@@ -25,7 +25,6 @@ use snafu::{ResultExt, Snafu};
use uuid::Uuid;
use crate::error::{self, Error, Result};
use crate::local::DynamicKeyLockGuard;
use crate::watcher::Watcher;
pub type Output = Arc<dyn Any + Send + Sync>;
@@ -145,9 +144,6 @@ pub trait ContextProvider: Send + Sync {
/// This method is used to mark a resource as being operated on by a procedure.
/// If the poison key already exists with a different value, the operation will fail.
async fn try_put_poison(&self, key: &PoisonKey, procedure_id: ProcedureId) -> Result<()>;
/// Acquires a key lock for the procedure.
async fn acquire_lock(&self, key: &StringKey) -> DynamicKeyLockGuard;
}
/// Reference-counted pointer to [ContextProvider].

View File

@@ -24,7 +24,7 @@ use datatypes::prelude::*;
use datatypes::vectors::{Helper as VectorHelper, VectorRef};
use snafu::ResultExt;
use crate::error::{self, FromScalarValueSnafu, IntoVectorSnafu, Result};
use crate::error::{self, Error, FromScalarValueSnafu, IntoVectorSnafu, Result};
use crate::prelude::*;
pub type AggregateFunctionCreatorRef = Arc<dyn AggregateFunctionCreator>;
@@ -166,7 +166,8 @@ impl DfAccumulator for DfAccumulatorAdaptor {
let output_type = self.creator.output_type()?;
let scalar_value = value
.try_to_scalar_value(&output_type)
.context(error::ToScalarValueSnafu)?;
.context(error::ToScalarValueSnafu)
.map_err(Error::from)?;
Ok(scalar_value)
}

View File

@@ -17,7 +17,6 @@ use datafusion_expr::LogicalPlan;
use store_api::storage::RegionId;
/// The query request to be handled by the RegionServer (Datanode).
#[derive(Clone, Debug)]
pub struct QueryRequest {
/// The header of this request. Often to store some context of the query. None means all to defaults.
pub header: Option<RegionRequestHeader>,

View File

@@ -23,11 +23,16 @@ use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
/// The default backoff config for kafka client.
///
/// If the operation fails, the client will retry 3 times.
/// The backoff time is 100ms, 300ms, 900ms.
pub const DEFAULT_BACKOFF_CONFIG: BackoffConfig = BackoffConfig {
init_backoff: Duration::from_millis(100),
max_backoff: Duration::from_secs(10),
base: 2.0,
deadline: Some(Duration::from_secs(120)),
max_backoff: Duration::from_secs(1),
base: 3.0,
// The deadline shouldn't be too long,
// otherwise the client will block the worker loop for a long time.
deadline: Some(Duration::from_secs(3)),
};
/// Default interval for auto WAL pruning.

View File

@@ -31,3 +31,33 @@ where
test(endpoints).await
}
/// Get the kafka endpoints from the environment variable `GT_KAFKA_ENDPOINTS`.
///
/// The format of the environment variable is:
/// ```
/// GT_KAFKA_ENDPOINTS=localhost:9092,localhost:9093
/// ```
pub fn get_kafka_endpoints() -> Vec<String> {
let endpoints = std::env::var("GT_KAFKA_ENDPOINTS").unwrap();
endpoints
.split(',')
.map(|s| s.trim().to_string())
.collect::<Vec<_>>()
}
#[macro_export]
/// Skip the test if the environment variable `GT_KAFKA_ENDPOINTS` is not set.
///
/// The format of the environment variable is:
/// ```
/// GT_KAFKA_ENDPOINTS=localhost:9092,localhost:9093
/// ```
macro_rules! maybe_skip_kafka_integration_test {
() => {
if std::env::var("GT_KAFKA_ENDPOINTS").is_err() {
common_telemetry::warn!("The endpoints is empty, skipping the test");
return;
}
};
}

View File

@@ -43,10 +43,10 @@ use mito2::config::MitoConfig;
use mito2::engine::MitoEngine;
use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef};
use object_store::util::normalize_dir;
use query::dummy_catalog::TableProviderFactoryRef;
use query::QueryEngineFactory;
use servers::export_metrics::ExportMetricsTask;
use servers::server::ServerHandlers;
use servers::Mode;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::path_utils::{region_dir, WAL_DIR};
use store_api::region_engine::{RegionEngineRef, RegionRole};
@@ -58,8 +58,8 @@ use tokio::sync::Notify;
use crate::config::{DatanodeOptions, RegionEngineConfig, StorageConfig};
use crate::error::{
self, BuildMetricEngineSnafu, BuildMitoEngineSnafu, CreateDirSnafu, GetMetadataSnafu,
MissingCacheSnafu, MissingNodeIdSnafu, OpenLogStoreSnafu, Result, ShutdownInstanceSnafu,
ShutdownServerSnafu, StartServerSnafu,
MissingCacheSnafu, MissingKvBackendSnafu, MissingNodeIdSnafu, OpenLogStoreSnafu, Result,
ShutdownInstanceSnafu, ShutdownServerSnafu, StartServerSnafu,
};
use crate::event_listener::{
new_region_server_event_channel, NoopRegionServerEventListener, RegionServerEventListenerRef,
@@ -129,7 +129,7 @@ impl Datanode {
self.services = services;
}
pub async fn shutdown(&mut self) -> Result<()> {
pub async fn shutdown(&self) -> Result<()> {
self.services
.shutdown_all()
.await
@@ -157,49 +157,50 @@ impl Datanode {
pub struct DatanodeBuilder {
opts: DatanodeOptions,
table_provider_factory: Option<TableProviderFactoryRef>,
mode: Mode,
plugins: Plugins,
meta_client: Option<MetaClientRef>,
kv_backend: KvBackendRef,
kv_backend: Option<KvBackendRef>,
cache_registry: Option<Arc<LayeredCacheRegistry>>,
}
impl DatanodeBuilder {
pub fn new(opts: DatanodeOptions, plugins: Plugins, kv_backend: KvBackendRef) -> Self {
/// `kv_backend` is optional. If absent, the builder will try to build one
/// by using the given `opts`
pub fn new(opts: DatanodeOptions, plugins: Plugins, mode: Mode) -> Self {
Self {
opts,
table_provider_factory: None,
mode,
plugins,
meta_client: None,
kv_backend,
kv_backend: None,
cache_registry: None,
}
}
pub fn options(&self) -> &DatanodeOptions {
&self.opts
pub fn with_meta_client(self, meta_client: MetaClientRef) -> Self {
Self {
meta_client: Some(meta_client),
..self
}
}
pub fn with_meta_client(&mut self, client: MetaClientRef) -> &mut Self {
self.meta_client = Some(client);
self
pub fn with_cache_registry(self, cache_registry: Arc<LayeredCacheRegistry>) -> Self {
Self {
cache_registry: Some(cache_registry),
..self
}
}
pub fn with_cache_registry(&mut self, registry: Arc<LayeredCacheRegistry>) -> &mut Self {
self.cache_registry = Some(registry);
self
}
pub fn kv_backend(&self) -> &KvBackendRef {
&self.kv_backend
}
pub fn with_table_provider_factory(&mut self, factory: TableProviderFactoryRef) -> &mut Self {
self.table_provider_factory = Some(factory);
self
pub fn with_kv_backend(self, kv_backend: KvBackendRef) -> Self {
Self {
kv_backend: Some(kv_backend),
..self
}
}
pub async fn build(mut self) -> Result<Datanode> {
let mode = &self.mode;
let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?;
let meta_client = self.meta_client.take();
@@ -209,6 +210,8 @@ impl DatanodeBuilder {
// writable upon open.
let controlled_by_metasrv = meta_client.is_some();
let kv_backend = self.kv_backend.take().context(MissingKvBackendSnafu)?;
// build and initialize region server
let (region_event_listener, region_event_receiver) = if controlled_by_metasrv {
let (tx, rx) = new_region_server_event_channel();
@@ -230,7 +233,7 @@ impl DatanodeBuilder {
.new_region_server(schema_metadata_manager, region_event_listener)
.await?;
let datanode_table_manager = DatanodeTableManager::new(self.kv_backend.clone());
let datanode_table_manager = DatanodeTableManager::new(kv_backend.clone());
let table_values = datanode_table_manager
.tables(node_id)
.try_collect::<Vec<_>>()
@@ -270,18 +273,19 @@ impl DatanodeBuilder {
None
};
let is_standalone = heartbeat_task.is_none();
let greptimedb_telemetry_task = get_greptimedb_telemetry_task(
Some(self.opts.storage.data_home.clone()),
is_standalone && self.opts.enable_telemetry,
mode,
self.opts.enable_telemetry,
)
.await;
let leases_notifier = if self.opts.require_lease_before_startup && !is_standalone {
Some(Arc::new(Notify::new()))
} else {
None
};
let leases_notifier =
if self.opts.require_lease_before_startup && matches!(mode, Mode::Distributed) {
Some(Arc::new(Notify::new()))
} else {
None
};
let export_metrics_task =
ExportMetricsTask::try_new(&self.opts.export_metrics, Some(&self.plugins))
@@ -359,11 +363,7 @@ impl DatanodeBuilder {
);
let query_engine = query_engine_factory.query_engine();
let table_provider_factory = self
.table_provider_factory
.clone()
.unwrap_or_else(|| Arc::new(DummyTableProviderFactory));
let table_provider_factory = Arc::new(DummyTableProviderFactory);
let mut region_server = RegionServer::with_table_provider(
query_engine,
common_runtime::global_runtime(),
@@ -635,6 +635,7 @@ mod tests {
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::KvBackendRef;
use mito2::engine::MITO_ENGINE_NAME;
use servers::Mode;
use store_api::region_request::RegionRequest;
use store_api::storage::RegionId;
@@ -670,19 +671,19 @@ mod tests {
let kv_backend = Arc::new(MemoryKvBackend::new());
let layered_cache_registry = Arc::new(
LayeredCacheRegistryBuilder::default()
.add_cache_registry(build_datanode_cache_registry(kv_backend.clone()))
.add_cache_registry(build_datanode_cache_registry(kv_backend))
.build(),
);
let mut builder = DatanodeBuilder::new(
let builder = DatanodeBuilder::new(
DatanodeOptions {
node_id: Some(0),
..Default::default()
},
Plugins::default(),
kv_backend,
);
builder.with_cache_registry(layered_cache_registry);
Mode::Standalone,
)
.with_cache_registry(layered_cache_registry);
let kv = Arc::new(MemoryKvBackend::default()) as _;
setup_table_datanode(&kv).await;

View File

@@ -150,6 +150,12 @@ pub enum Error {
location: Location,
},
#[snafu(display("Expect KvBackend but not found"))]
MissingKvBackend {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid SQL, error: {}", msg))]
InvalidSql { msg: String },
@@ -420,6 +426,7 @@ impl ErrorExt for Error {
| MissingRequiredField { .. }
| RegionEngineNotFound { .. }
| ParseAddr { .. }
| MissingKvBackend { .. }
| TomlFormat { .. } => StatusCode::InvalidArguments,
PayloadNotExist { .. }

View File

@@ -20,6 +20,7 @@ use common_greptimedb_telemetry::{
default_get_uuid, Collector, GreptimeDBTelemetry, GreptimeDBTelemetryTask,
Mode as VersionReporterMode, TELEMETRY_INTERVAL,
};
use servers::Mode;
struct StandaloneGreptimeDBTelemetryCollector {
uuid: Option<String>,
@@ -54,6 +55,7 @@ impl Collector for StandaloneGreptimeDBTelemetryCollector {
pub async fn get_greptimedb_telemetry_task(
working_home: Option<String>,
mode: &Mode,
enable: bool,
) -> Arc<GreptimeDBTelemetryTask> {
if !enable || cfg!(test) || cfg!(debug_assertions) {
@@ -62,14 +64,19 @@ pub async fn get_greptimedb_telemetry_task(
// Always enable.
let should_report = Arc::new(AtomicBool::new(true));
let uuid = default_get_uuid(&working_home);
Arc::new(GreptimeDBTelemetryTask::enable(
TELEMETRY_INTERVAL,
Box::new(GreptimeDBTelemetry::new(
working_home,
Box::new(StandaloneGreptimeDBTelemetryCollector { uuid, retry: 0 }),
should_report.clone(),
match mode {
Mode::Standalone => Arc::new(GreptimeDBTelemetryTask::enable(
TELEMETRY_INTERVAL,
Box::new(GreptimeDBTelemetry::new(
working_home.clone(),
Box::new(StandaloneGreptimeDBTelemetryCollector {
uuid: default_get_uuid(&working_home),
retry: 0,
}),
should_report.clone(),
)),
should_report,
)),
should_report,
))
Mode::Distributed => Arc::new(GreptimeDBTelemetryTask::disable()),
}
}

View File

@@ -39,7 +39,6 @@ pub struct RegionHeartbeatResponseHandler {
region_server: RegionServer,
catchup_tasks: TaskTracker<()>,
downgrade_tasks: TaskTracker<()>,
flush_tasks: TaskTracker<()>,
}
/// Handler of the instruction.
@@ -51,7 +50,6 @@ pub struct HandlerContext {
region_server: RegionServer,
catchup_tasks: TaskTracker<()>,
downgrade_tasks: TaskTracker<()>,
flush_tasks: TaskTracker<()>,
}
impl HandlerContext {
@@ -65,7 +63,6 @@ impl HandlerContext {
region_server,
catchup_tasks: TaskTracker::new(),
downgrade_tasks: TaskTracker::new(),
flush_tasks: TaskTracker::new(),
}
}
}
@@ -77,7 +74,6 @@ impl RegionHeartbeatResponseHandler {
region_server,
catchup_tasks: TaskTracker::new(),
downgrade_tasks: TaskTracker::new(),
flush_tasks: TaskTracker::new(),
}
}
@@ -99,11 +95,8 @@ impl RegionHeartbeatResponseHandler {
handler_context.handle_upgrade_region_instruction(upgrade_region)
})),
Instruction::InvalidateCaches(_) => InvalidHeartbeatResponseSnafu.fail(),
Instruction::FlushRegions(flush_regions) => Ok(Box::new(move |handler_context| {
handler_context.handle_flush_regions_instruction(flush_regions)
})),
Instruction::FlushRegion(flush_region) => Ok(Box::new(move |handler_context| {
handler_context.handle_flush_region_instruction(flush_region)
Instruction::FlushRegion(flush_regions) => Ok(Box::new(move |handler_context| {
handler_context.handle_flush_region_instruction(flush_regions)
})),
}
}
@@ -118,7 +111,6 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler {
| Some((_, Instruction::CloseRegion { .. }))
| Some((_, Instruction::DowngradeRegion { .. }))
| Some((_, Instruction::UpgradeRegion { .. }))
| Some((_, Instruction::FlushRegion { .. }))
)
}
@@ -132,14 +124,12 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler {
let region_server = self.region_server.clone();
let catchup_tasks = self.catchup_tasks.clone();
let downgrade_tasks = self.downgrade_tasks.clone();
let flush_tasks = self.flush_tasks.clone();
let handler = Self::build_handler(instruction)?;
let _handle = common_runtime::spawn_global(async move {
let reply = handler(HandlerContext {
region_server,
catchup_tasks,
downgrade_tasks,
flush_tasks,
})
.await;

View File

@@ -12,17 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_meta::instruction::{FlushRegions, InstructionReply, SimpleReply};
use common_meta::instruction::{FlushRegions, InstructionReply};
use common_telemetry::warn;
use futures_util::future::BoxFuture;
use store_api::region_request::{RegionFlushRequest, RegionRequest};
use store_api::storage::RegionId;
use crate::error;
use crate::heartbeat::handler::HandlerContext;
impl HandlerContext {
pub(crate) fn handle_flush_regions_instruction(
pub(crate) fn handle_flush_region_instruction(
self,
flush_regions: FlushRegions,
) -> BoxFuture<'static, Option<InstructionReply>> {
@@ -50,59 +49,6 @@ impl HandlerContext {
None
})
}
pub(crate) fn handle_flush_region_instruction(
self,
region_id: RegionId,
) -> BoxFuture<'static, Option<InstructionReply>> {
Box::pin(async move {
let Some(writable) = self.region_server.is_region_leader(region_id) else {
return Some(InstructionReply::FlushRegion(SimpleReply {
result: false,
error: Some("Region is not leader".to_string()),
}));
};
if !writable {
return Some(InstructionReply::FlushRegion(SimpleReply {
result: false,
error: Some("Region is not writable".to_string()),
}));
}
let region_server_moved = self.region_server.clone();
let register_result = self
.flush_tasks
.try_register(
region_id,
Box::pin(async move {
let request = RegionRequest::Flush(RegionFlushRequest {
row_group_size: None,
});
region_server_moved
.handle_request(region_id, request)
.await?;
Ok(())
}),
)
.await;
if register_result.is_busy() {
warn!("Another flush task is running for the region: {region_id}");
}
let mut watcher = register_result.into_watcher();
let result = self.flush_tasks.wait_until_finish(&mut watcher).await;
match result {
Ok(()) => Some(InstructionReply::FlushRegion(SimpleReply {
result: true,
error: None,
})),
Err(err) => Some(InstructionReply::FlushRegion(SimpleReply {
result: false,
error: Some(format!("{err:?}")),
})),
}
})
}
}
#[cfg(test)]
@@ -138,7 +84,7 @@ mod tests {
let reply = handler_context
.clone()
.handle_flush_regions_instruction(FlushRegions {
.handle_flush_region_instruction(FlushRegions {
region_ids: region_ids.clone(),
})
.await;
@@ -148,7 +94,7 @@ mod tests {
flushed_region_ids.write().unwrap().clear();
let not_found_region_ids = (0..2).map(|i| RegionId::new(2048, i)).collect::<Vec<_>>();
let reply = handler_context
.handle_flush_regions_instruction(FlushRegions {
.handle_flush_region_instruction(FlushRegions {
region_ids: not_found_region_ids.clone(),
})
.await;

View File

@@ -144,11 +144,6 @@ impl<T: Send + Sync + Clone + 'static> TaskTracker<T> {
}
}
/// Waits for a [RegisterResult] and returns a [WaitResult].
pub(crate) async fn wait_until_finish(&self, watcher: &mut TaskWatcher<T>) -> Result<T> {
wait(watcher).await
}
/// Tries to register a new async task, returns [RegisterResult::Busy] if previous task is running.
pub(crate) async fn try_register(
&self,

View File

@@ -62,7 +62,7 @@ impl<'a> DatanodeServiceBuilder<'a> {
}
}
pub fn build(mut self) -> Result<ServerHandlers> {
pub async fn build(mut self) -> Result<ServerHandlers> {
let handlers = ServerHandlers::default();
if let Some(grpc_server) = self.grpc_server.take() {
@@ -70,7 +70,7 @@ impl<'a> DatanodeServiceBuilder<'a> {
addr: &self.opts.grpc.bind_addr,
})?;
let handler: ServerHandler = (Box::new(grpc_server), addr);
handlers.insert(handler);
handlers.insert(handler).await;
}
if self.enable_http_service {
@@ -82,7 +82,7 @@ impl<'a> DatanodeServiceBuilder<'a> {
addr: &self.opts.http.addr,
})?;
let handler: ServerHandler = (Box::new(http_server), addr);
handlers.insert(handler);
handlers.insert(handler).await;
}
Ok(handlers)

View File

@@ -25,6 +25,7 @@ use std::sync::Arc;
use std::time::Duration;
use common_telemetry::{info, warn};
use mito2::access_layer::{ATOMIC_WRITE_DIR, OLD_ATOMIC_WRITE_DIR};
use object_store::layers::{LruCacheLayer, RetryInterceptor, RetryLayer};
use object_store::services::Fs;
use object_store::util::{join_dir, normalize_dir, with_instrument_layers};
@@ -168,9 +169,13 @@ async fn build_cache_layer(
if let Some(path) = cache_path.as_ref()
&& !path.trim().is_empty()
{
let atomic_temp_dir = join_dir(path, ".tmp/");
let atomic_temp_dir = join_dir(path, ATOMIC_WRITE_DIR);
clean_temp_dir(&atomic_temp_dir)?;
// Compatible code. Remove this after a major release.
let old_atomic_temp_dir = join_dir(path, OLD_ATOMIC_WRITE_DIR);
clean_temp_dir(&old_atomic_temp_dir)?;
let cache_store = Fs::default()
.root(path)
.atomic_write_dir(&atomic_temp_dir)

View File

@@ -15,6 +15,7 @@
use std::{fs, path};
use common_telemetry::info;
use mito2::access_layer::{ATOMIC_WRITE_DIR, OLD_ATOMIC_WRITE_DIR};
use object_store::services::Fs;
use object_store::util::join_dir;
use object_store::ObjectStore;
@@ -33,9 +34,13 @@ pub async fn new_fs_object_store(
.context(error::CreateDirSnafu { dir: data_home })?;
info!("The file storage home is: {}", data_home);
let atomic_write_dir = join_dir(data_home, ".tmp/");
let atomic_write_dir = join_dir(data_home, ATOMIC_WRITE_DIR);
store::clean_temp_dir(&atomic_write_dir)?;
// Compatible code. Remove this after a major release.
let old_atomic_temp_dir = join_dir(data_home, OLD_ATOMIC_WRITE_DIR);
store::clean_temp_dir(&old_atomic_temp_dir)?;
let builder = Fs::default()
.root(data_home)
.atomic_write_dir(&atomic_write_dir);

View File

@@ -20,7 +20,6 @@ use snafu::{ensure, ResultExt};
use crate::data_type::{ConcreteDataType, DataType};
use crate::error::{self, Result};
use crate::types::cast;
use crate::value::Value;
use crate::vectors::operations::VectorOp;
use crate::vectors::{TimestampMillisecondVector, VectorRef};
@@ -179,18 +178,6 @@ impl ColumnDefaultConstraint {
}
}
/// Cast default value to given type
pub fn cast_to_datatype(&self, data_type: &ConcreteDataType) -> Result<Self> {
match self {
ColumnDefaultConstraint::Value(v) => Ok(Self::Value(cast(v.clone(), data_type)?)),
ColumnDefaultConstraint::Function(expr) => match &expr[..] {
// no need to cast, since function always require a data_type when need to create default value
CURRENT_TIMESTAMP | CURRENT_TIMESTAMP_FN | NOW_FN => Ok(self.clone()),
_ => error::UnsupportedDefaultExprSnafu { expr }.fail(),
},
}
}
/// Only create default vector if it's impure, i.e., it's a function.
///
/// This helps to delay creating constant default values to mito engine while also keeps impure default have consistent values
@@ -266,10 +253,9 @@ fn create_current_timestamp_vector(
data_type: &ConcreteDataType,
num_rows: usize,
) -> Result<VectorRef> {
let current_timestamp_vector = TimestampMillisecondVector::from_values(std::iter::repeat_n(
util::current_time_millis(),
num_rows,
));
let current_timestamp_vector = TimestampMillisecondVector::from_values(
std::iter::repeat(util::current_time_millis()).take(num_rows),
);
if data_type.is_timestamp() {
current_timestamp_vector.cast(data_type)
} else {

View File

@@ -198,7 +198,8 @@ impl fmt::Debug for ConstantVector {
impl Serializable for ConstantVector {
fn serialize_to_json(&self) -> Result<Vec<serde_json::Value>> {
std::iter::repeat_n(self.get(0), self.len())
std::iter::repeat(self.get(0))
.take(self.len())
.map(serde_json::Value::try_from)
.collect::<serde_json::Result<_>>()
.context(SerializeSnafu)

View File

@@ -412,7 +412,7 @@ pub(crate) fn replicate_decimal128(
// Safety: std::iter::Repeat and std::iter::Take implement TrustedLen.
builder
.mutable_array
.append_trusted_len_iter(std::iter::repeat_n(data, repeat_times));
.append_trusted_len_iter(std::iter::repeat(data).take(repeat_times));
}
}
None => {

View File

@@ -120,7 +120,9 @@ impl fmt::Debug for NullVector {
impl Serializable for NullVector {
fn serialize_to_json(&self) -> Result<Vec<serde_json::Value>> {
Ok(std::iter::repeat_n(serde_json::Value::Null, self.len()).collect())
Ok(std::iter::repeat(serde_json::Value::Null)
.take(self.len())
.collect())
}
}

Some files were not shown because too many files have changed in this diff Show More