Compare commits


1 Commit

Author SHA1 Message Date
discord9
77e340270e Downgrade rust-toolchain.toml
DO NOT MERGE
2025-04-29 17:52:43 +08:00
223 changed files with 2911 additions and 7101 deletions

View File

@@ -22,7 +22,6 @@ concurrency:
jobs:
check-typos-and-docs:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
name: Check typos and docs
runs-on: ubuntu-latest
steps:
@@ -37,7 +36,6 @@ jobs:
|| (echo "'config/config.md' is not up-to-date, please run 'make config-docs'." && exit 1)
license-header-check:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
runs-on: ubuntu-latest
name: Check License Header
steps:
@@ -47,7 +45,6 @@ jobs:
- uses: korandoru/hawkeye@v5
check:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
name: Check
runs-on: ${{ matrix.os }}
strategy:
@@ -74,7 +71,6 @@ jobs:
run: cargo check --locked --workspace --all-targets
toml:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
name: Toml Check
runs-on: ubuntu-latest
timeout-minutes: 60
@@ -89,7 +85,6 @@ jobs:
run: taplo format --check
build:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
name: Build GreptimeDB binaries
runs-on: ${{ matrix.os }}
strategy:
@@ -132,7 +127,6 @@ jobs:
version: current
fuzztest:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
name: Fuzz Test
needs: build
runs-on: ubuntu-latest
@@ -189,7 +183,6 @@ jobs:
max-total-time: 120
unstable-fuzztest:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
name: Unstable Fuzz Test
needs: build-greptime-ci
runs-on: ubuntu-latest
@@ -251,7 +244,6 @@ jobs:
retention-days: 3
build-greptime-ci:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
name: Build GreptimeDB binary (profile-CI)
runs-on: ${{ matrix.os }}
strategy:
@@ -293,7 +285,6 @@ jobs:
version: current
distributed-fuzztest:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
name: Fuzz Test (Distributed, ${{ matrix.mode.name }}, ${{ matrix.target }})
runs-on: ubuntu-latest
needs: build-greptime-ci
@@ -425,7 +416,6 @@ jobs:
docker system prune -f
distributed-fuzztest-with-chaos:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
name: Fuzz Test with Chaos (Distributed, ${{ matrix.mode.name }}, ${{ matrix.target }})
runs-on: ubuntu-latest
needs: build-greptime-ci
@@ -573,7 +563,6 @@ jobs:
docker system prune -f
sqlness:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
name: Sqlness Test (${{ matrix.mode.name }})
needs: build
runs-on: ${{ matrix.os }}
@@ -620,7 +609,6 @@ jobs:
retention-days: 3
fmt:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
name: Rustfmt
runs-on: ubuntu-latest
timeout-minutes: 60
@@ -638,7 +626,6 @@ jobs:
run: make fmt-check
clippy:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
name: Clippy
runs-on: ubuntu-latest
timeout-minutes: 60
@@ -664,7 +651,6 @@ jobs:
run: make clippy
conflict-check:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
name: Check for conflict
runs-on: ubuntu-latest
steps:
@@ -675,7 +661,7 @@ jobs:
uses: olivernybroe/action-conflict-finder@v4.0
test:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' && github.event_name != 'merge_group' }}
if: github.event_name != 'merge_group'
runs-on: ubuntu-22.04-arm
timeout-minutes: 60
needs: [conflict-check, clippy, fmt]
@@ -727,7 +713,7 @@ jobs:
UNITTEST_LOG_DIR: "__unittest_logs"
coverage:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' && github.event_name == 'merge_group' }}
if: github.event_name == 'merge_group'
runs-on: ubuntu-22.04-8-cores
timeout-minutes: 60
steps:
@@ -787,7 +773,6 @@ jobs:
verbose: true
# compat:
# if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
# name: Compatibility Test
# needs: build
# runs-on: ubuntu-22.04

View File

@@ -117,16 +117,16 @@ jobs:
name: Run clean build on Linux
runs-on: ubuntu-latest
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
timeout-minutes: 45
timeout-minutes: 60
steps:
- uses: actions/checkout@v4
with:
fetch-depth: 0
persist-credentials: false
- uses: cachix/install-nix-action@v31
- uses: cachix/install-nix-action@v27
with:
nix_path: nixpkgs=channel:nixos-24.11
- run: nix develop --command cargo build --bin greptime
- run: nix develop --command cargo build
check-status:
name: Check status

View File

@@ -464,29 +464,6 @@ jobs:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
DOCS_REPO_TOKEN: ${{ secrets.DOCS_REPO_TOKEN }}
bump-website-version:
name: Bump website version
if: ${{ github.event_name == 'push' || github.event_name == 'schedule' }}
needs: [allocate-runners]
runs-on: ubuntu-latest
# Permission reference: https://docs.github.com/en/actions/using-jobs/assigning-permissions-to-jobs
permissions:
issues: write # Allows the action to create issues for cyborg.
contents: write # Allows the action to create a release.
steps:
- uses: actions/checkout@v4
with:
fetch-depth: 0
persist-credentials: false
- uses: ./.github/actions/setup-cyborg
- name: Bump website version
working-directory: cyborg
run: pnpm tsx bin/bump-website-version.ts
env:
VERSION: ${{ needs.allocate-runners.outputs.version }}
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
WEBSITE_REPO_TOKEN: ${{ secrets.WEBSITE_REPO_TOKEN }}
notification:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' && (github.event_name == 'push' || github.event_name == 'schedule') && always() }}
name: Send notification to Greptime team

.gitignore (vendored, 1 line changed)
View File

@@ -28,7 +28,6 @@ debug/
# Logs
**/__unittest_logs
logs/
!grafana/dashboards/logs/
# cpython's generated python byte code
**/__pycache__/

Cargo.lock (generated, 796 lines changed)

File diff suppressed because it is too large.

View File

@@ -130,7 +130,7 @@ etcd-client = "0.14"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "17a3550751c8b1e02ec16be40101d5f24dc255c3" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "e82b0158cd38d4021edb4e4c0ae77f999051e62f" }
hex = "0.4"
http = "1"
humantime = "2.1"

README.md (195 lines changed)
View File

@@ -8,8 +8,6 @@
<h2 align="center">Real-Time & Cloud-Native Observability Database<br/>for metrics, logs, and traces</h2>
> Delivers sub-second querying at PB scale and exceptional cost efficiency from edge to cloud.
<div align="center">
<h3 align="center">
<a href="https://greptime.com/product/cloud">GreptimeCloud</a> |
@@ -51,77 +49,74 @@
</div>
- [Introduction](#introduction)
- [⭐ Key Features](#features)
- [Quick Comparison](#quick-comparison)
- [Architecture](#architecture)
- [Try GreptimeDB](#try-greptimedb)
- [**Features: Why GreptimeDB**](#why-greptimedb)
- [Architecture](https://docs.greptime.com/contributor-guide/overview/#architecture)
- [Try it for free](#try-greptimedb)
- [Getting Started](#getting-started)
- [Build From Source](#build-from-source)
- [Tools & Extensions](#tools--extensions)
- [Project Status](#project-status)
- [Community](#community)
- [Join the community](#community)
- [Contributing](#contributing)
- [Tools & Extensions](#tools--extensions)
- [License](#license)
- [Commercial Support](#commercial-support)
- [Contributing](#contributing)
- [Acknowledgement](#acknowledgement)
## Introduction
**GreptimeDB** is an open-source, cloud-native database purpose-built for the unified collection and analysis of observability data (metrics, logs, and traces). Whether you're operating on the edge, in the cloud, or across hybrid environments, GreptimeDB empowers real-time insights at massive scale — all in one system.
**GreptimeDB** is an open-source, cloud-native, unified & cost-effective observability database for **Metrics**, **Logs**, and **Traces**. You can gain real-time insights from Edge to Cloud at Any Scale.
## Features
## News
| Feature | Description |
| --------- | ----------- |
| [Unified Observability Data](https://docs.greptime.com/user-guide/concepts/why-greptimedb) | Store metrics, logs, and traces as timestamped, contextual wide events. Query via [SQL](https://docs.greptime.com/user-guide/query-data/sql), [PromQL](https://docs.greptime.com/user-guide/query-data/promql), and [streaming](https://docs.greptime.com/user-guide/flow-computation/overview). |
| [High Performance & Cost Effective](https://docs.greptime.com/user-guide/manage-data/data-index) | Written in Rust, with a distributed query engine, [rich indexing](https://docs.greptime.com/user-guide/manage-data/data-index), and optimized columnar storage, delivering sub-second responses at PB scale. |
| [Cloud-Native Architecture](https://docs.greptime.com/user-guide/concepts/architecture) | Designed for [Kubernetes](https://docs.greptime.com/user-guide/deployments/deploy-on-kubernetes/greptimedb-operator-management), with compute/storage separation, native object storage (AWS S3, Azure Blob, etc.) and seamless cross-cloud access. |
| [Developer-Friendly](https://docs.greptime.com/user-guide/protocols/overview) | Access via SQL/PromQL interfaces, REST API, MySQL/PostgreSQL protocols, and popular ingestion [protocols](https://docs.greptime.com/user-guide/protocols/overview). |
| [Flexible Deployment](https://docs.greptime.com/user-guide/deployments/overview) | Deploy anywhere: edge (including ARM/[Android](https://docs.greptime.com/user-guide/deployments/run-on-android)) or cloud, with unified APIs and efficient data sync. |
**[GreptimeDB tops JSONBench's billion-record cold run test!](https://greptime.com/blogs/2025-03-18-jsonbench-greptimedb-performance)**
Learn more in [Why GreptimeDB](https://docs.greptime.com/user-guide/concepts/why-greptimedb) and [Observability 2.0 and the Database for It](https://greptime.com/blogs/2025-04-25-greptimedb-observability2-new-database).
## Why GreptimeDB
## Quick Comparison
Our core developers have been building observability data platforms for years. Based on our best practices, GreptimeDB was born to give you:
| Feature | GreptimeDB | Traditional TSDB | Log Stores |
|----------------------------------|-----------------------|--------------------|-----------------|
| Data Types | Metrics, Logs, Traces | Metrics only | Logs only |
| Query Language | SQL, PromQL, Streaming| Custom/PromQL | Custom/DSL |
| Deployment | Edge + Cloud | Cloud/On-prem | Mostly central |
| Indexing & Performance | PB-Scale, Sub-second | Varies | Varies |
| Integration | REST, SQL, Common protocols | Varies | Varies |
* **Unified Processing of Observability Data**
**Performance:**
* [GreptimeDB tops JSONBench's billion-record cold run test!](https://greptime.com/blogs/2025-03-18-jsonbench-greptimedb-performance)
* [TSBS Benchmark](https://github.com/GreptimeTeam/greptimedb/tree/main/docs/benchmarks/tsbs)
A unified database that treats metrics, logs, and traces as timestamped wide events with context, supporting [SQL](https://docs.greptime.com/user-guide/query-data/sql)/[PromQL](https://docs.greptime.com/user-guide/query-data/promql) queries and [stream processing](https://docs.greptime.com/user-guide/flow-computation/overview) to simplify complex data stacks.
Read [more benchmark reports](https://docs.greptime.com/user-guide/concepts/features-that-you-concern#how-is-greptimedbs-performance-compared-to-other-solutions).
* **High Performance and Cost-effective**
## Architecture
Written in Rust, it combines a distributed query engine with [rich indexing](https://docs.greptime.com/user-guide/manage-data/data-index) (inverted, fulltext, skip data, and vector) and optimized columnar storage to deliver sub-second responses on petabyte-scale data with high cost-efficiency.
* Read the [architecture](https://docs.greptime.com/contributor-guide/overview/#architecture) document.
* [DeepWiki](https://deepwiki.com/GreptimeTeam/greptimedb/1-overview) provides an in-depth look at GreptimeDB:
<img alt="GreptimeDB System Overview" src="docs/architecture.png">
* **Cloud-native Distributed Database**
Built for [Kubernetes](https://docs.greptime.com/user-guide/deployments/deploy-on-kubernetes/greptimedb-operator-management). GreptimeDB achieves seamless scalability with its [cloud-native architecture](https://docs.greptime.com/user-guide/concepts/architecture) of separated compute and storage, built on object storage (AWS S3, Azure Blob Storage, etc.) while enabling cross-cloud deployment through a unified data access layer.
* **Developer-Friendly**
Access standardized SQL/PromQL interfaces through built-in web dashboard, REST API, and MySQL/PostgreSQL protocols. Supports widely adopted data ingestion [protocols](https://docs.greptime.com/user-guide/protocols/overview) for seamless migration and integration.
* **Flexible Deployment Options**
Deploy GreptimeDB anywhere from ARM-based edge devices to cloud environments with unified APIs and bandwidth-efficient data synchronization. Query edge and cloud data seamlessly through identical APIs. [Learn how to run on Android](https://docs.greptime.com/user-guide/deployments/run-on-android/).
For more detailed info please read [Why GreptimeDB](https://docs.greptime.com/user-guide/concepts/why-greptimedb).
## Try GreptimeDB
### 1. [Live Demo](https://greptime.com/playground)
Experience GreptimeDB directly in your browser.
Try out the features of GreptimeDB right from your browser.
### 2. [GreptimeCloud](https://console.greptime.cloud/)
Start instantly with a free cluster.
### 3. Docker (Local Quickstart)
### 3. Docker Image
To install GreptimeDB locally, the recommended way is via Docker:
```shell
docker pull greptime/greptimedb
```
Start a GreptimeDB container with:
```shell
docker run -p 127.0.0.1:4000-4003:4000-4003 \
-v "$(pwd)/greptimedb:/greptimedb_data" \
-v "$(pwd)/greptimedb:./greptimedb_data" \
--name greptime --rm \
greptime/greptimedb:latest standalone start \
--http-addr 0.0.0.0:4000 \
@@ -129,90 +124,114 @@ docker run -p 127.0.0.1:4000-4003:4000-4003 \
--mysql-addr 0.0.0.0:4002 \
--postgres-addr 0.0.0.0:4003
```
Dashboard: [http://localhost:4000/dashboard](http://localhost:4000/dashboard)
[Full Install Guide](https://docs.greptime.com/getting-started/installation/overview)
**Troubleshooting:**
* Cannot connect to the database? Ensure that ports `4000`, `4001`, `4002`, and `4003` are not blocked by a firewall or used by other services.
* Failed to start? Check the container logs with `docker logs greptime` for further details.
Access the dashboard via `http://localhost:4000/dashboard`.
Read more about [Installation](https://docs.greptime.com/getting-started/installation/overview) on docs.
## Getting Started
- [Quickstart](https://docs.greptime.com/getting-started/quick-start)
- [User Guide](https://docs.greptime.com/user-guide/overview)
- [Demo Scenes](https://github.com/GreptimeTeam/demo-scene)
- [FAQ](https://docs.greptime.com/faq-and-others/faq)
* [Quickstart](https://docs.greptime.com/getting-started/quick-start)
* [User Guide](https://docs.greptime.com/user-guide/overview)
* [Demos](https://github.com/GreptimeTeam/demo-scene)
* [FAQ](https://docs.greptime.com/faq-and-others/faq)
## Build From Source
## Build
Check the prerequisite:
**Prerequisites:**
* [Rust toolchain](https://www.rust-lang.org/tools/install) (nightly)
* [Protobuf compiler](https://grpc.io/docs/protoc-installation/) (>= 3.15)
* C/C++ build essentials, including `gcc`/`g++`/`autoconf` and the glibc library (e.g. `libc6-dev` on Ubuntu and `glibc-devel` on Fedora)
* Python toolchain (optional): Required only if using some test scripts.
**Build and Run:**
Build GreptimeDB binary:
```shell
make
```
Run a standalone server:
```shell
cargo run -- standalone start
```
## Tools & Extensions
- **Kubernetes:** [GreptimeDB Operator](https://github.com/GrepTimeTeam/greptimedb-operator)
- **Helm Charts:** [Greptime Helm Charts](https://github.com/GreptimeTeam/helm-charts)
- **Dashboard:** [Web UI](https://github.com/GreptimeTeam/dashboard)
- **SDKs/Ingester:** [Go](https://github.com/GreptimeTeam/greptimedb-ingester-go), [Java](https://github.com/GreptimeTeam/greptimedb-ingester-java), [C++](https://github.com/GreptimeTeam/greptimedb-ingester-cpp), [Erlang](https://github.com/GreptimeTeam/greptimedb-ingester-erl), [Rust](https://github.com/GreptimeTeam/greptimedb-ingester-rust), [JS](https://github.com/GreptimeTeam/greptimedb-ingester-js)
- **Grafana**: [Official Dashboard](https://github.com/GreptimeTeam/greptimedb/blob/main/grafana/README.md)
### Kubernetes
- [GreptimeDB Operator](https://github.com/GrepTimeTeam/greptimedb-operator)
### Dashboard
- [The dashboard UI for GreptimeDB](https://github.com/GreptimeTeam/dashboard)
### SDK
- [GreptimeDB Go Ingester](https://github.com/GreptimeTeam/greptimedb-ingester-go)
- [GreptimeDB Java Ingester](https://github.com/GreptimeTeam/greptimedb-ingester-java)
- [GreptimeDB C++ Ingester](https://github.com/GreptimeTeam/greptimedb-ingester-cpp)
- [GreptimeDB Erlang Ingester](https://github.com/GreptimeTeam/greptimedb-ingester-erl)
- [GreptimeDB Rust Ingester](https://github.com/GreptimeTeam/greptimedb-ingester-rust)
- [GreptimeDB JavaScript Ingester](https://github.com/GreptimeTeam/greptimedb-ingester-js)
### Grafana Dashboard
Our official Grafana dashboard for monitoring GreptimeDB is available at [grafana](grafana/README.md) directory.
## Project Status
> **Status:** Beta.
> **GA (v1.0):** Targeted for mid 2025.
GreptimeDB is currently in Beta. We are targeting GA (General Availability) with v1.0 release by Early 2025.
- Being used in production by early adopters
- Stable, actively maintained, with regular releases ([version info](https://docs.greptime.com/nightly/reference/about-greptimedb-version))
- Suitable for evaluation and pilot deployments
While in Beta, GreptimeDB is already:
* Being used in production by early adopters
* Actively maintained with regular releases, [about version number](https://docs.greptime.com/nightly/reference/about-greptimedb-version)
* Suitable for testing and evaluation
For production use, we recommend using the latest stable release.
[![Star History Chart](https://api.star-history.com/svg?repos=GreptimeTeam/GreptimeDB&type=Date)](https://www.star-history.com/#GreptimeTeam/GreptimeDB&Date)
If you find this project useful, a ⭐ would mean a lot to us!
<img alt="Known Users" src="https://greptime.com/logo/img/users.png"/>
## Community
We invite you to engage and contribute!
Our core team is thrilled to see you participate in any way you like. When you are stuck, ask for help by filing an issue with a detailed description of what you were trying to do and what went wrong. If you have any questions or would like to get involved in our community, please check out:
- [Slack](https://greptime.com/slack)
- [Discussions](https://github.com/GreptimeTeam/greptimedb/discussions)
- [Official Website](https://greptime.com/)
- [Blog](https://greptime.com/blogs/)
- [LinkedIn](https://www.linkedin.com/company/greptime/)
- [Twitter](https://twitter.com/greptime)
- GreptimeDB Community on [Slack](https://greptime.com/slack)
- GreptimeDB [GitHub Discussions forum](https://github.com/GreptimeTeam/greptimedb/discussions)
- Greptime official [website](https://greptime.com)
## License
In addition, you may:
GreptimeDB is licensed under the [Apache License 2.0](https://apache.org/licenses/LICENSE-2.0.txt).
- View our official [Blog](https://greptime.com/blogs/)
- Connect us with [Linkedin](https://www.linkedin.com/company/greptime/)
- Follow us on [Twitter](https://twitter.com/greptime)
## Commercial Support
Running GreptimeDB in your organization?
We offer enterprise add-ons, services, training, and consulting.
[Contact us](https://greptime.com/contactus) for details.
If you are running GreptimeDB OSS in your organization, we offer additional
enterprise add-ons, installation services, training, and consulting. [Contact
us](https://greptime.com/contactus) and we will reach out to you with more
details of our commercial license.
## License
GreptimeDB uses the [Apache License 2.0](https://apache.org/licenses/LICENSE-2.0.txt) to strike a balance between
open contributions and allowing you to use the software however you want.
## Contributing
- Read our [Contribution Guidelines](https://github.com/GreptimeTeam/greptimedb/blob/main/CONTRIBUTING.md).
- Explore [Internal Concepts](https://docs.greptime.com/contributor-guide/overview.html) and [DeepWiki](https://deepwiki.com/GreptimeTeam/greptimedb).
- Pick up a [good first issue](https://github.com/GreptimeTeam/greptimedb/issues?q=is%3Aissue+is%3Aopen+label%3A%22good+first+issue%22) and join the #contributors [Slack](https://greptime.com/slack) channel.
Please refer to [contribution guidelines](CONTRIBUTING.md) and [internal concepts docs](https://docs.greptime.com/contributor-guide/overview.html) for more information.
## Acknowledgement
Special thanks to all contributors! See [AUTHOR.md](https://github.com/GreptimeTeam/greptimedb/blob/main/AUTHOR.md).
Special thanks to all the contributors who have propelled GreptimeDB forward. For a complete list of contributors, please refer to [AUTHOR.md](AUTHOR.md).
- Uses [Apache Arrow™](https://arrow.apache.org/) (memory model)
- [Apache Parquet](https://parquet.apache.org/) (file storage)
- [Apache Arrow DataFusion](https://arrow.apache.org/datafusion/) (query engine)
- [Apache OpenDAL™](https://opendal.apache.org/) (data access abstraction)
- [etcd](https://etcd.io/) (meta service)
- GreptimeDB uses [Apache Arrow™](https://arrow.apache.org/) as the memory model and [Apache Parquet™](https://parquet.apache.org/) as the persistent file format.
- GreptimeDB's query engine is powered by [Apache Arrow DataFusion](https://arrow.apache.org/datafusion/).
- [Apache OpenDAL](https://opendal.apache.org) gives GreptimeDB a very general and elegant data access abstraction layer.
- GreptimeDB's meta service is based on [etcd](https://etcd.io/).
<img alt="Known Users" src="https://greptime.com/logo/img/users.png"/>

View File

@@ -1,57 +0,0 @@
/*
* Copyright 2023 Greptime Team
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import * as core from "@actions/core";
import {obtainClient} from "@/common";
async function triggerWorkflow(workflowId: string, version: string) {
const websiteClient = obtainClient("WEBSITE_REPO_TOKEN")
try {
await websiteClient.rest.actions.createWorkflowDispatch({
owner: "GreptimeTeam",
repo: "website",
workflow_id: workflowId,
ref: "main",
inputs: {
version,
},
});
console.log(`Successfully triggered ${workflowId} workflow with version ${version}`);
} catch (error) {
core.setFailed(`Failed to trigger workflow: ${error.message}`);
}
}
const version = process.env.VERSION;
if (!version) {
core.setFailed("VERSION environment variable is required");
process.exit(1);
}
// Remove 'v' prefix if exists
const cleanVersion = version.startsWith('v') ? version.slice(1) : version;
if (cleanVersion.includes('nightly')) {
console.log('Nightly version detected, skipping workflow trigger.');
process.exit(0);
}
try {
triggerWorkflow('bump-patch-version.yml', cleanVersion);
} catch (error) {
core.setFailed(`Error processing version: ${error.message}`);
process.exit(1);
}

Binary file not shown (before: 173 KiB)

View File

@@ -2,63 +2,30 @@
## Overview
This repository contains Grafana dashboards for visualizing metrics and logs of GreptimeDB instances running in either cluster or standalone mode. **The Grafana version should be greater than 9.0**.
This repository maintains the Grafana dashboards for GreptimeDB. It has two types of dashboards:
We highly recommend using the self-monitoring feature provided by [GreptimeDB Operator](https://github.com/GrepTimeTeam/greptimedb-operator) to automatically collect metrics and logs from your GreptimeDB instances and store them in a dedicated GreptimeDB instance.
- `cluster/dashboard.json`: The Grafana dashboard for the GreptimeDB cluster. Read the [dashboard.md](./dashboards/cluster/dashboard.md) for more details.
- `standalone/dashboard.json`: The Grafana dashboard for the standalone GreptimeDB instance. **It's generated from the `cluster/dashboard.json` by removing the instance filter through the `make dashboards` command**. Read the [dashboard.md](./dashboards/standalone/dashboard.md) for more details.
- **Metrics Dashboards**
As GreptimeDB is developing rapidly, the metrics may change; please feel free to submit your feedback and/or contributions to this dashboard 🤗
- `dashboards/metrics/cluster/dashboard.json`: The Grafana dashboard for the GreptimeDB cluster. Read the [dashboard.md](./dashboards/metrics/cluster/dashboard.md) for more details.
- `dashboards/metrics/standalone/dashboard.json`: The Grafana dashboard for the standalone GreptimeDB instance. **It's generated from the `cluster/dashboard.json` by removing the instance filter through the `make dashboards` command**. Read the [dashboard.md](./dashboards/metrics/standalone/dashboard.md) for more details.
**NOTE**:
- **Logs Dashboard**
- The Grafana version should be greater than 9.0.
The `dashboards/logs/dashboard.json` provides a comprehensive Grafana dashboard for visualizing GreptimeDB logs. To utilize this dashboard effectively, you need to collect logs in JSON format from your GreptimeDB instances and store them in a dedicated GreptimeDB instance.
- If you want to modify the dashboards, you only need to modify the `cluster/dashboard.json` and run the `make dashboards` command to generate the `standalone/dashboard.json` and other related files.
For proper integration, the logs table must adhere to the following schema design with the table name `_gt_logs`:
To maintain the dashboards easily, we use the [`dac`](https://github.com/zyy17/dac) tool to generate the intermediate dashboards and markdown documents:
```sql
CREATE TABLE IF NOT EXISTS `_gt_logs` (
`pod_ip` STRING NULL,
`namespace` STRING NULL,
`cluster` STRING NULL,
`file` STRING NULL,
`module_path` STRING NULL,
`level` STRING NULL,
`target` STRING NULL,
`role` STRING NULL,
`pod` STRING NULL SKIPPING INDEX WITH(granularity = '10240', type = 'BLOOM'),
`message` STRING NULL FULLTEXT INDEX WITH(analyzer = 'English', backend = 'bloom', case_sensitive = 'false'),
`err` STRING NULL FULLTEXT INDEX WITH(analyzer = 'English', backend = 'bloom', case_sensitive = 'false'),
`timestamp` TIMESTAMP(9) NOT NULL,
TIME INDEX (`timestamp`),
PRIMARY KEY (`level`, `target`, `role`)
)
ENGINE=mito
WITH (
append_mode = 'true'
)
```
## Development
As GreptimeDB evolves rapidly, metrics may change over time. We welcome your feedback and contributions to improve these dashboards 🤗
To modify the metrics dashboards, simply edit the `dashboards/metrics/cluster/dashboard.json` file and run the `make dashboards` command. This will automatically generate the updated `dashboards/metrics/standalone/dashboard.json` and other related files.
For easier dashboard maintenance, we utilize the [`dac`](https://github.com/zyy17/dac) tool to generate human-readable intermediate dashboards and documentation:
- `dashboards/metrics/cluster/dashboard.yaml`: The intermediate dashboard file for the GreptimeDB cluster.
- `dashboards/metrics/standalone/dashboard.yaml`: The intermediate dashboard file for standalone GreptimeDB instances.
- `cluster/dashboard.yaml`: The intermediate dashboard for the GreptimeDB cluster.
- `standalone/dashboard.yaml`: The intermediate dashboard for the standalone GreptimeDB instance.
## Data Sources
The following data sources are used to fetch metrics and logs:
There are two data sources for the dashboards to fetch the metrics:
- **`${metrics}`**: Prometheus data source for providing the GreptimeDB metrics.
- **`${logs}`**: MySQL data source for providing the GreptimeDB logs.
- **`${information_schema}`**: MySQL data source providing the information schema of the current instance; used by the `overview` panel. It is the MySQL port of the currently monitored instance.
- **Prometheus**: Expose the metrics of GreptimeDB.
- **Information Schema**: It is the MySQL port of the current monitored instance. The `overview` dashboard will use this datasource to show the information schema of the current instance.
## Instance Filters
@@ -76,9 +43,9 @@ And the legend will be like: `[{{instance}}]-[{{ pod }}]`.
## Deployment
### (Recommended) Helm Chart
### Helm
If you use the [Helm Chart](https://github.com/GreptimeTeam/helm-charts) to deploy a GreptimeDB cluster, you can enable self-monitoring by setting the following values in your Helm chart:
If you use the Helm [chart](https://github.com/GreptimeTeam/helm-charts) to deploy a GreptimeDB cluster, you can enable self-monitoring by setting the following values in your Helm chart:
- `monitoring.enabled=true`: Deploys a standalone GreptimeDB instance dedicated to monitoring the cluster;
- `grafana.enabled=true`: Deploys Grafana and automatically imports the monitoring dashboard;
@@ -118,5 +85,5 @@ The standalone GreptimeDB instance will collect metrics from your cluster, and t
3. **Import the dashboards based on your deployment scenario**
- **Cluster**: Import the `dashboards/metrics/cluster/dashboard.json` dashboard.
- **Standalone**: Import the `dashboards/metrics/standalone/dashboard.json` dashboard.
- **Cluster**: Import the `cluster/dashboard.json` dashboard.
- **Standalone**: Import the `standalone/dashboard.json` dashboard.

View File

@@ -1,292 +0,0 @@
{
"annotations": {
"list": [
{
"builtIn": 1,
"datasource": {
"type": "grafana",
"uid": "-- Grafana --"
},
"enable": true,
"hide": true,
"iconColor": "rgba(0, 211, 255, 1)",
"name": "Annotations & Alerts",
"type": "dashboard"
}
]
},
"editable": true,
"fiscalYearStartMonth": 0,
"graphTooltip": 0,
"id": 12,
"links": [],
"panels": [
{
"datasource": {
"default": false,
"type": "mysql",
"uid": "${datasource}"
},
"fieldConfig": {
"defaults": {},
"overrides": []
},
"gridPos": {
"h": 20,
"w": 24,
"x": 0,
"y": 0
},
"id": 1,
"options": {
"dedupStrategy": "none",
"enableInfiniteScrolling": true,
"enableLogDetails": true,
"prettifyLogMessage": false,
"showCommonLabels": false,
"showLabels": false,
"showTime": true,
"sortOrder": "Descending",
"wrapLogMessage": false
},
"pluginVersion": "11.6.0",
"targets": [
{
"dataset": "greptime_private",
"datasource": {
"type": "mysql",
"uid": "${datasource}"
},
"editorMode": "code",
"format": "table",
"rawQuery": true,
"rawSql": "SELECT `timestamp`, CONCAT('[', `level`, ']', ' ', '<', `target`, '>', ' ', `message`),\n `role`,\n `pod`,\n `pod_ip`,\n `namespace`,\n `cluster`,\n `err`,\n `file`,\n `module_path`\nFROM\n `_gt_logs`\nWHERE\n (\n \"$level\" = \"'all'\"\n OR `level` IN ($level)\n ) \n AND (\n \"$role\" = \"'all'\"\n OR `role` IN ($role)\n )\n AND (\n \"$pod\" = \"\"\n OR `pod` = '$pod'\n )\n AND (\n \"$target\" = \"\"\n OR `target` = '$target'\n )\n AND (\n \"$search\" = \"\"\n OR matches_term(`message`, '$search')\n )\n AND (\n \"$exclude\" = \"\"\n OR NOT matches_term(`message`, '$exclude')\n )\n AND $__timeFilter(`timestamp`)\nORDER BY `timestamp` DESC\nLIMIT $limit;\n",
"refId": "A",
"sql": {
"columns": [
{
"parameters": [],
"type": "function"
}
],
"groupBy": [
{
"property": {
"type": "string"
},
"type": "groupBy"
}
],
"limit": 50
}
}
],
"title": "Logs",
"type": "logs"
}
],
"preload": false,
"refresh": "",
"schemaVersion": 41,
"tags": [],
"templating": {
"list": [
{
"current": {
"text": "logs",
"value": "P98F38F12DB221A8C"
},
"includeAll": false,
"name": "datasource",
"options": [],
"query": "mysql",
"refresh": 1,
"regex": "",
"type": "datasource"
},
{
"allValue": "'all'",
"current": {
"text": [
"$__all"
],
"value": [
"$__all"
]
},
"includeAll": true,
"label": "level",
"multi": true,
"name": "level",
"options": [
{
"selected": false,
"text": "INFO",
"value": "INFO"
},
{
"selected": false,
"text": "ERROR",
"value": "ERROR"
},
{
"selected": false,
"text": "WARN",
"value": "WARN"
},
{
"selected": false,
"text": "DEBUG",
"value": "DEBUG"
},
{
"selected": false,
"text": "TRACE",
"value": "TRACE"
}
],
"query": "INFO,ERROR,WARN,DEBUG,TRACE",
"type": "custom"
},
{
"allValue": "'all'",
"current": {
"text": [
"$__all"
],
"value": [
"$__all"
]
},
"includeAll": true,
"label": "role",
"multi": true,
"name": "role",
"options": [
{
"selected": false,
"text": "datanode",
"value": "datanode"
},
{
"selected": false,
"text": "frontend",
"value": "frontend"
},
{
"selected": false,
"text": "meta",
"value": "meta"
}
],
"query": "datanode,frontend,meta",
"type": "custom"
},
{
"current": {
"text": "",
"value": ""
},
"label": "pod",
"name": "pod",
"options": [
{
"selected": true,
"text": "",
"value": ""
}
],
"query": "",
"type": "textbox"
},
{
"current": {
"text": "",
"value": ""
},
"label": "target",
"name": "target",
"options": [
{
"selected": true,
"text": "",
"value": ""
}
],
"query": "",
"type": "textbox"
},
{
"current": {
"text": "",
"value": ""
},
"label": "search",
"name": "search",
"options": [
{
"selected": true,
"text": "",
"value": ""
}
],
"query": "",
"type": "textbox"
},
{
"current": {
"text": "",
"value": ""
},
"label": "exclude",
"name": "exclude",
"options": [
{
"selected": true,
"text": "",
"value": ""
}
],
"query": "",
"type": "textbox"
},
{
"current": {
"text": "2000",
"value": "2000"
},
"includeAll": false,
"label": "limit",
"name": "limit",
"options": [
{
"selected": true,
"text": "2000",
"value": "2000"
},
{
"selected": false,
"text": "5000",
"value": "5000"
},
{
"selected": false,
"text": "8000",
"value": "8000"
}
],
"query": "2000,5000,8000",
"type": "custom"
}
]
},
"time": {
"from": "now-6h",
"to": "now"
},
"timepicker": {},
"timezone": "browser",
"title": "GreptimeDB Logs",
"uid": "edx5veo4rd3wge2",
"version": 1
}

View File

@@ -1,6 +1,6 @@
#!/usr/bin/env bash
DASHBOARD_DIR=${1:-grafana/dashboards/metrics}
DASHBOARD_DIR=${1:-grafana/dashboards}
check_dashboard_description() {
for dashboard in $(find $DASHBOARD_DIR -name "*.json"); do
@@ -25,7 +25,7 @@ check_dashboard_description() {
check_dashboards_generation() {
./grafana/scripts/gen-dashboards.sh
if [[ -n "$(git diff --name-only grafana/dashboards/metrics)" ]]; then
if [[ -n "$(git diff --name-only grafana/dashboards)" ]]; then
echo "Error: The dashboards are not generated correctly. You should execute the `make dashboards` command."
exit 1
fi

View File

@@ -1,7 +1,7 @@
#! /usr/bin/env bash
CLUSTER_DASHBOARD_DIR=${1:-grafana/dashboards/metrics/cluster}
STANDALONE_DASHBOARD_DIR=${2:-grafana/dashboards/metrics/standalone}
CLUSTER_DASHBOARD_DIR=${1:-grafana/dashboards/cluster}
STANDALONE_DASHBOARD_DIR=${2:-grafana/dashboards/standalone}
DAC_IMAGE=ghcr.io/zyy17/dac:20250423-522bd35
remove_instance_filters() {

View File

@@ -1,2 +1,2 @@
[toolchain]
channel = "nightly-2025-04-15"
channel = "nightly-2024-12-25"

View File

@@ -1050,7 +1050,7 @@ pub fn value_to_grpc_value(value: Value) -> GrpcValue {
Value::Int64(v) => Some(ValueData::I64Value(v)),
Value::Float32(v) => Some(ValueData::F32Value(*v)),
Value::Float64(v) => Some(ValueData::F64Value(*v)),
Value::String(v) => Some(ValueData::StringValue(v.into_string())),
Value::String(v) => Some(ValueData::StringValue(v.as_utf8().to_string())),
Value::Binary(v) => Some(ValueData::BinaryValue(v.to_vec())),
Value::Date(v) => Some(ValueData::DateValue(v.val())),
Value::Timestamp(v) => Some(match v.unit() {
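
The one-line change above is the whole of this file's diff: converting `Value::String` into a gRPC value via `v.as_utf8().to_string()` instead of `v.into_string()`. As a rough sketch of the difference, using a stand-in `StringBytes` type (the real type's internals are not shown in the diff, so they are an assumption here): `into_string` consumes the value and can reuse its buffer, while `as_utf8().to_string()` borrows and then copies.

```rust
// Stand-in for the type behind `Value::String` (hypothetical internals).
struct StringBytes(Vec<u8>);

impl StringBytes {
    /// Borrow the bytes as &str; assumes UTF-8 was validated at construction.
    fn as_utf8(&self) -> &str {
        std::str::from_utf8(&self.0).expect("StringBytes holds valid UTF-8")
    }

    /// Consume self and reuse the underlying allocation, avoiding a copy.
    fn into_string(self) -> String {
        String::from_utf8(self.0).expect("StringBytes holds valid UTF-8")
    }
}

fn main() {
    let v = StringBytes(b"greptime".to_vec());
    assert_eq!(v.as_utf8().to_string(), "greptime"); // copies the bytes
    let v = StringBytes(b"greptime".to_vec());
    assert_eq!(v.into_string(), "greptime"); // moves the buffer
}
```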

View File

@@ -36,7 +36,7 @@ pub fn userinfo_by_name(username: Option<String>) -> UserInfoRef {
}
pub fn user_provider_from_option(opt: &String) -> Result<UserProviderRef> {
let (name, content) = opt.split_once(':').with_context(|| InvalidConfigSnafu {
let (name, content) = opt.split_once(':').context(InvalidConfigSnafu {
value: opt.to_string(),
msg: "UserProviderOption must be in format `<option>:<value>`",
})?;
@@ -57,24 +57,6 @@ pub fn user_provider_from_option(opt: &String) -> Result<UserProviderRef> {
}
}
pub fn static_user_provider_from_option(opt: &String) -> Result<StaticUserProvider> {
let (name, content) = opt.split_once(':').with_context(|| InvalidConfigSnafu {
value: opt.to_string(),
msg: "UserProviderOption must be in format `<option>:<value>`",
})?;
match name {
STATIC_USER_PROVIDER => {
let provider = StaticUserProvider::new(content)?;
Ok(provider)
}
_ => InvalidConfigSnafu {
value: name.to_string(),
msg: format!("Invalid UserProviderOption, expect only {STATIC_USER_PROVIDER}"),
}
.fail(),
}
}
type Username<'a> = &'a str;
type HostOrIp<'a> = &'a str;
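
The first hunk above swaps `with_context` for `context` on the `Option` returned by `split_once`. A minimal sketch of what that trades off, using a simplified stand-in for the crate's `InvalidConfig` error (the error type below is an assumption, not the crate's real definition):

```rust
use snafu::{OptionExt, Snafu};

/// Simplified stand-in for the crate's `InvalidConfig` error.
#[derive(Debug, Snafu)]
#[snafu(display("Invalid config value: {value}"))]
struct InvalidConfigError {
    value: String,
}

/// Eager form: `opt.to_string()` allocates on every call, success or failure.
fn split_eager(opt: &str) -> Result<(&str, &str), InvalidConfigError> {
    opt.split_once(':').context(InvalidConfigSnafu {
        value: opt.to_string(),
    })
}

/// Lazy form: the closure (and its allocation) only runs on the `None` path.
fn split_lazy(opt: &str) -> Result<(&str, &str), InvalidConfigError> {
    opt.split_once(':').with_context(|| InvalidConfigSnafu {
        value: opt.to_string(),
    })
}
```

For a cold error path the two behave the same; the eager form just pays the `to_string()` allocation unconditionally, which is presumably acceptable here since config parsing runs once at startup.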

View File

@@ -38,14 +38,6 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to convert to utf8"))]
FromUtf8 {
#[snafu(source)]
error: std::string::FromUtf8Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Authentication source failure"))]
AuthBackend {
#[snafu(implicit)]
@@ -93,7 +85,7 @@ impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
match self {
Error::InvalidConfig { .. } => StatusCode::InvalidArguments,
Error::IllegalParam { .. } | Error::FromUtf8 { .. } => StatusCode::InvalidArguments,
Error::IllegalParam { .. } => StatusCode::InvalidArguments,
Error::FileWatch { .. } => StatusCode::InvalidArguments,
Error::InternalState { .. } => StatusCode::Unexpected,
Error::Io { .. } => StatusCode::StorageUnavailable,

View File

@@ -22,12 +22,10 @@ mod user_provider;
pub mod tests;
pub use common::{
auth_mysql, static_user_provider_from_option, user_provider_from_option, userinfo_by_name,
HashedPassword, Identity, Password,
auth_mysql, user_provider_from_option, userinfo_by_name, HashedPassword, Identity, Password,
};
pub use permission::{PermissionChecker, PermissionReq, PermissionResp};
pub use user_info::UserInfo;
pub use user_provider::static_user_provider::StaticUserProvider;
pub use user_provider::UserProvider;
/// pub type alias

View File

@@ -15,15 +15,15 @@
use std::collections::HashMap;
use async_trait::async_trait;
use snafu::{OptionExt, ResultExt};
use snafu::OptionExt;
use crate::error::{FromUtf8Snafu, InvalidConfigSnafu, Result};
use crate::error::{InvalidConfigSnafu, Result};
use crate::user_provider::{authenticate_with_credential, load_credential_from_file};
use crate::{Identity, Password, UserInfoRef, UserProvider};
pub(crate) const STATIC_USER_PROVIDER: &str = "static_user_provider";
pub struct StaticUserProvider {
pub(crate) struct StaticUserProvider {
users: HashMap<String, Vec<u8>>,
}
@@ -60,18 +60,6 @@ impl StaticUserProvider {
.fail(),
}
}
/// Return a random username/password pair
/// This is useful for invoking from other components in the cluster
pub fn get_one_user_pwd(&self) -> Result<(String, String)> {
let kv = self.users.iter().next().context(InvalidConfigSnafu {
value: "",
msg: "Expect at least one pair of username and password",
})?;
let username = kv.0;
let pwd = String::from_utf8(kv.1.clone()).context(FromUtf8Snafu)?;
Ok((username.clone(), pwd))
}
}
#[async_trait]

View File

@@ -51,6 +51,7 @@ opendal = { version = "0.51.1", features = [
query.workspace = true
rand.workspace = true
reqwest.workspace = true
rustyline = "10.1"
serde.workspace = true
serde_json.workspace = true
servers.workspace = true

src/cli/src/cmd.rs (new file, 154 lines)
View File

@@ -0,0 +1,154 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::error::{Error, InvalidReplCommandSnafu, Result};
/// Represents the parsed command from the user (which may be over many lines)
#[derive(Debug, PartialEq)]
pub(crate) enum ReplCommand {
Help,
UseDatabase { db_name: String },
Sql { sql: String },
Exit,
}
impl TryFrom<&str> for ReplCommand {
type Error = Error;
fn try_from(input: &str) -> Result<Self> {
let input = input.trim();
if input.is_empty() {
return InvalidReplCommandSnafu {
reason: "No command specified".to_string(),
}
.fail();
}
// If line ends with ';', it must be treated as a complete input.
// However, the opposite is not true.
let input_is_completed = input.ends_with(';');
let input = input.strip_suffix(';').map(|x| x.trim()).unwrap_or(input);
let lowercase = input.to_lowercase();
match lowercase.as_str() {
"help" => Ok(Self::Help),
"exit" | "quit" => Ok(Self::Exit),
_ => match input.split_once(' ') {
Some((maybe_use, database)) if maybe_use.to_lowercase() == "use" => {
Ok(Self::UseDatabase {
db_name: database.trim().to_string(),
})
}
// Any valid SQL must contain at least one whitespace.
Some(_) if input_is_completed => Ok(Self::Sql {
sql: input.to_string(),
}),
_ => InvalidReplCommandSnafu {
reason: format!("unknown command '{input}', maybe input is not completed"),
}
.fail(),
},
}
}
}
impl ReplCommand {
pub fn help() -> &'static str {
r#"
Available commands (case insensitive):
- 'help': print this help
- 'exit' or 'quit': exit the REPL
- 'use <your database name>': switch to another database/schema context
- Other typed in text will be treated as SQL.
You can enter new lines while typing; just remember to end your input with ';'.
"#
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::error::Error::InvalidReplCommand;
#[test]
fn test_from_str() {
fn test_ok(s: &str, expected: ReplCommand) {
let actual: ReplCommand = s.try_into().unwrap();
assert_eq!(expected, actual, "'{}'", s);
}
fn test_err(s: &str) {
let result: Result<ReplCommand> = s.try_into();
assert!(matches!(result, Err(InvalidReplCommand { .. })))
}
test_err("");
test_err(" ");
test_err("\t");
test_ok("help", ReplCommand::Help);
test_ok("help", ReplCommand::Help);
test_ok(" help", ReplCommand::Help);
test_ok(" help ", ReplCommand::Help);
test_ok(" HELP ", ReplCommand::Help);
test_ok(" Help; ", ReplCommand::Help);
test_ok(" help ; ", ReplCommand::Help);
test_ok("exit", ReplCommand::Exit);
test_ok("exit;", ReplCommand::Exit);
test_ok("exit ;", ReplCommand::Exit);
test_ok("EXIT", ReplCommand::Exit);
test_ok("quit", ReplCommand::Exit);
test_ok("quit;", ReplCommand::Exit);
test_ok("quit ;", ReplCommand::Exit);
test_ok("QUIT", ReplCommand::Exit);
test_ok(
"use Foo",
ReplCommand::UseDatabase {
db_name: "Foo".to_string(),
},
);
test_ok(
" use Foo ; ",
ReplCommand::UseDatabase {
db_name: "Foo".to_string(),
},
);
// ensure that database name is case sensitive
test_ok(
" use FOO ; ",
ReplCommand::UseDatabase {
db_name: "FOO".to_string(),
},
);
// ensure that we aren't messing with capitalization
test_ok(
"SELECT * from foo;",
ReplCommand::Sql {
sql: "SELECT * from foo".to_string(),
},
);
// Input lines (that don't match any of the cases above) must end with ';' to be treated as valid SQL.
test_err("insert blah");
test_ok(
"insert blah;",
ReplCommand::Sql {
sql: "insert blah".to_string(),
},
);
}
}

View File

@@ -101,6 +101,9 @@ pub enum Error {
error: reqwest::Error,
},
#[snafu(display("Invalid REPL command: {reason}"))]
InvalidReplCommand { reason: String },
#[snafu(display("Failed to parse SQL: {}", sql))]
ParseSql {
sql: String,
@@ -251,6 +254,7 @@ impl ErrorExt for Error {
Error::MissingConfig { .. }
| Error::LoadLayeredConfig { .. }
| Error::IllegalConfig { .. }
| Error::InvalidReplCommand { .. }
| Error::InitTimezone { .. }
| Error::ConnectEtcd { .. }
| Error::CreateDir { .. }

src/cli/src/helper.rs (new file, 112 lines)
View File

@@ -0,0 +1,112 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::borrow::Cow;
use rustyline::completion::Completer;
use rustyline::highlight::{Highlighter, MatchingBracketHighlighter};
use rustyline::hint::{Hinter, HistoryHinter};
use rustyline::validate::{ValidationContext, ValidationResult, Validator};
use crate::cmd::ReplCommand;
pub(crate) struct RustylineHelper {
hinter: HistoryHinter,
highlighter: MatchingBracketHighlighter,
}
impl Default for RustylineHelper {
fn default() -> Self {
Self {
hinter: HistoryHinter {},
highlighter: MatchingBracketHighlighter::default(),
}
}
}
impl rustyline::Helper for RustylineHelper {}
impl Validator for RustylineHelper {
fn validate(&self, ctx: &mut ValidationContext<'_>) -> rustyline::Result<ValidationResult> {
let input = ctx.input();
match ReplCommand::try_from(input) {
Ok(_) => Ok(ValidationResult::Valid(None)),
Err(e) => {
if input.trim_end().ends_with(';') {
// If line ends with ';', it HAS to be a valid command.
Ok(ValidationResult::Invalid(Some(e.to_string())))
} else {
Ok(ValidationResult::Incomplete)
}
}
}
}
}
impl Hinter for RustylineHelper {
type Hint = String;
fn hint(&self, line: &str, pos: usize, ctx: &rustyline::Context<'_>) -> Option<Self::Hint> {
self.hinter.hint(line, pos, ctx)
}
}
impl Highlighter for RustylineHelper {
fn highlight<'l>(&self, line: &'l str, pos: usize) -> Cow<'l, str> {
self.highlighter.highlight(line, pos)
}
fn highlight_prompt<'b, 's: 'b, 'p: 'b>(
&'s self,
prompt: &'p str,
default: bool,
) -> Cow<'b, str> {
self.highlighter.highlight_prompt(prompt, default)
}
fn highlight_hint<'h>(&self, hint: &'h str) -> Cow<'h, str> {
use nu_ansi_term::Style;
Cow::Owned(Style::new().dimmed().paint(hint).to_string())
}
fn highlight_candidate<'c>(
&self,
candidate: &'c str,
completion: rustyline::CompletionType,
) -> Cow<'c, str> {
self.highlighter.highlight_candidate(candidate, completion)
}
fn highlight_char(&self, line: &str, pos: usize) -> bool {
self.highlighter.highlight_char(line, pos)
}
}
impl Completer for RustylineHelper {
type Candidate = String;
fn complete(
&self,
line: &str,
pos: usize,
ctx: &rustyline::Context<'_>,
) -> rustyline::Result<(usize, Vec<Self::Candidate>)> {
// If there is a hint, use it as the auto-completion when the user hits `tab`
if let Some(hint) = self.hinter.hint(line, pos, ctx) {
Ok((pos, vec![hint]))
} else {
Ok((0, vec![]))
}
}
}
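
The new `helper.rs` wires validation, history hints, and bracket highlighting into rustyline, with the validator deferring to `ReplCommand::try_from` so multi-line input stays open until it parses. For context, a minimal sketch of how such a helper might drive a REPL loop (the wiring below is an illustration under rustyline 10's API, not necessarily how `src/cli` actually uses it):

```rust
use rustyline::error::ReadlineError;
use rustyline::Editor;

fn repl() -> rustyline::Result<()> {
    let mut rl = Editor::new()?;
    // With the helper installed, the validator keeps multi-line input open
    // until it parses as a complete command (e.g. SQL terminated by ';').
    rl.set_helper(Some(RustylineHelper::default()));
    loop {
        match rl.readline("greptime> ") {
            Ok(line) => {
                rl.add_history_entry(line.as_str());
                match ReplCommand::try_from(line.as_str()) {
                    Ok(ReplCommand::Exit) => break,
                    Ok(cmd) => println!("{cmd:?}"),
                    Err(e) => eprintln!("{e}"),
                }
            }
            Err(ReadlineError::Interrupted) | Err(ReadlineError::Eof) => break,
            Err(e) => return Err(e),
        }
    }
    Ok(())
}
```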

View File

@@ -13,9 +13,15 @@
// limitations under the License.
mod bench;
mod database;
pub mod error;
// Wait for https://github.com/GreptimeTeam/greptimedb/issues/2373
#[allow(unused)]
mod cmd;
mod export;
mod helper;
// Wait for https://github.com/GreptimeTeam/greptimedb/issues/2373
mod database;
mod import;
use async_trait::async_trait;

View File

@@ -42,11 +42,11 @@ use futures::future;
use futures_util::{Stream, StreamExt, TryStreamExt};
use prost::Message;
use snafu::{ensure, ResultExt};
use tonic::metadata::{AsciiMetadataKey, AsciiMetadataValue, MetadataMap, MetadataValue};
use tonic::metadata::{AsciiMetadataKey, MetadataValue};
use tonic::transport::Channel;
use crate::error::{
ConvertFlightDataSnafu, Error, FlightGetSnafu, IllegalFlightMessagesSnafu,
ConvertFlightDataSnafu, Error, FlightGetSnafu, IllegalFlightMessagesSnafu, InvalidAsciiSnafu,
InvalidTonicMetadataValueSnafu, ServerSnafu,
};
use crate::{from_grpc_response, Client, Result};
@@ -165,27 +165,26 @@ impl Database {
let mut request = tonic::Request::new(request);
let metadata = request.metadata_mut();
Self::put_hints(metadata, hints)?;
for (key, value) in hints {
let key = AsciiMetadataKey::from_bytes(format!("x-greptime-hint-{}", key).as_bytes())
.map_err(|_| {
InvalidAsciiSnafu {
value: key.to_string(),
}
.build()
})?;
let value = value.parse().map_err(|_| {
InvalidAsciiSnafu {
value: value.to_string(),
}
.build()
})?;
metadata.insert(key, value);
}
let response = client.handle(request).await?.into_inner();
from_grpc_response(response)
}
fn put_hints(metadata: &mut MetadataMap, hints: &[(&str, &str)]) -> Result<()> {
let Some(value) = hints
.iter()
.map(|(k, v)| format!("{}={}", k, v))
.reduce(|a, b| format!("{},{}", a, b))
else {
return Ok(());
};
let key = AsciiMetadataKey::from_static("x-greptime-hints");
let value = AsciiMetadataValue::from_str(&value).context(InvalidTonicMetadataValueSnafu)?;
metadata.insert(key, value);
Ok(())
}
pub async fn handle(&self, request: Request) -> Result<u32> {
let mut client = make_database_client(&self.client)?.inner;
let request = self.to_rpc_request(request);
@@ -243,49 +242,39 @@ impl Database {
where
S: AsRef<str>,
{
self.sql_with_hint(sql, &[]).await
}
pub async fn sql_with_hint<S>(&self, sql: S, hints: &[(&str, &str)]) -> Result<Output>
where
S: AsRef<str>,
{
let request = Request::Query(QueryRequest {
self.do_get(Request::Query(QueryRequest {
query: Some(Query::Sql(sql.as_ref().to_string())),
});
self.do_get(request, hints).await
}))
.await
}
pub async fn logical_plan(&self, logical_plan: Vec<u8>) -> Result<Output> {
let request = Request::Query(QueryRequest {
self.do_get(Request::Query(QueryRequest {
query: Some(Query::LogicalPlan(logical_plan)),
});
self.do_get(request, &[]).await
}))
.await
}
pub async fn create(&self, expr: CreateTableExpr) -> Result<Output> {
let request = Request::Ddl(DdlRequest {
self.do_get(Request::Ddl(DdlRequest {
expr: Some(DdlExpr::CreateTable(expr)),
});
self.do_get(request, &[]).await
}))
.await
}
pub async fn alter(&self, expr: AlterTableExpr) -> Result<Output> {
let request = Request::Ddl(DdlRequest {
self.do_get(Request::Ddl(DdlRequest {
expr: Some(DdlExpr::AlterTable(expr)),
});
self.do_get(request, &[]).await
}))
.await
}
async fn do_get(&self, request: Request, hints: &[(&str, &str)]) -> Result<Output> {
async fn do_get(&self, request: Request) -> Result<Output> {
let request = self.to_rpc_request(request);
let request = Ticket {
ticket: request.encode_to_vec().into(),
};
let mut request = tonic::Request::new(request);
Self::put_hints(request.metadata_mut(), hints)?;
let mut client = self.client.make_flight_client()?;
let response = client.mut_inner().do_get(request).await.or_else(|e| {

View File

@@ -110,6 +110,13 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to parse ascii string: {}", value))]
InvalidAscii {
value: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid Tonic metadata value"))]
InvalidTonicMetadataValue {
#[snafu(source)]
@@ -136,7 +143,10 @@ impl ErrorExt for Error {
| Error::ConvertFlightData { source, .. }
| Error::CreateTlsChannel { source, .. } => source.status_code(),
Error::IllegalGrpcClientState { .. } => StatusCode::Unexpected,
Error::InvalidTonicMetadataValue { .. } => StatusCode::InvalidArguments,
Error::InvalidAscii { .. } | Error::InvalidTonicMetadataValue { .. } => {
StatusCode::InvalidArguments
}
}
}

View File

@@ -15,11 +15,9 @@
#![doc = include_str!("../../../../README.md")]
use clap::{Parser, Subcommand};
use cmd::datanode::builder::InstanceBuilder;
use cmd::error::{InitTlsProviderSnafu, Result};
use cmd::options::GlobalOptions;
use cmd::{cli, datanode, flownode, frontend, metasrv, standalone, App};
use common_base::Plugins;
use common_version::version;
use servers::install_ring_crypto_provider;
@@ -104,10 +102,10 @@ async fn main_body() -> Result<()> {
async fn start(cli: Command) -> Result<()> {
match cli.subcmd {
SubCommand::Datanode(cmd) => {
let opts = cmd.load_options(&cli.global_options)?;
let plugins = Plugins::new();
let builder = InstanceBuilder::try_new_with_init(opts, plugins).await?;
cmd.build_with(builder).await?.run().await
cmd.build(cmd.load_options(&cli.global_options)?)
.await?
.run()
.await
}
SubCommand::Flownode(cmd) => {
cmd.build(cmd.load_options(&cli.global_options)?)

View File

@@ -58,7 +58,7 @@ impl App for Instance {
false
}
async fn stop(&mut self) -> Result<()> {
async fn stop(&self) -> Result<()> {
Ok(())
}
}

View File

@@ -12,27 +12,33 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod builder;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use cache::build_datanode_cache_registry;
use catalog::kvbackend::MetaKvBackend;
use clap::Parser;
use common_base::Plugins;
use common_config::Configurable;
use common_meta::cache::LayeredCacheRegistryBuilder;
use common_telemetry::logging::TracingOptions;
use common_telemetry::{info, warn};
use common_version::{short_version, version};
use common_wal::config::DatanodeWalConfig;
use datanode::datanode::Datanode;
use meta_client::MetaClientOptions;
use snafu::{ensure, ResultExt};
use datanode::datanode::{Datanode, DatanodeBuilder};
use datanode::service::DatanodeServiceBuilder;
use meta_client::{MetaClientOptions, MetaClientType};
use servers::Mode;
use snafu::{ensure, OptionExt, ResultExt};
use tracing_appender::non_blocking::WorkerGuard;
use crate::datanode::builder::InstanceBuilder;
use crate::error::{
LoadLayeredConfigSnafu, MissingConfigSnafu, Result, ShutdownDatanodeSnafu, StartDatanodeSnafu,
LoadLayeredConfigSnafu, MetaClientInitSnafu, MissingConfigSnafu, Result, ShutdownDatanodeSnafu,
StartDatanodeSnafu,
};
use crate::options::{GlobalOptions, GreptimeOptions};
use crate::App;
use crate::{log_versions, App};
pub const APP_NAME: &str = "greptime-datanode";
@@ -77,7 +83,7 @@ impl App for Instance {
self.datanode.start().await.context(StartDatanodeSnafu)
}
async fn stop(&mut self) -> Result<()> {
async fn stop(&self) -> Result<()> {
self.datanode
.shutdown()
.await
@@ -92,8 +98,8 @@ pub struct Command {
}
impl Command {
pub async fn build_with(&self, builder: InstanceBuilder) -> Result<Instance> {
self.subcmd.build_with(builder).await
pub async fn build(&self, opts: DatanodeOptions) -> Result<Instance> {
self.subcmd.build(opts).await
}
pub fn load_options(&self, global_options: &GlobalOptions) -> Result<DatanodeOptions> {
@@ -109,12 +115,9 @@ enum SubCommand {
}
impl SubCommand {
async fn build_with(&self, builder: InstanceBuilder) -> Result<Instance> {
async fn build(&self, opts: DatanodeOptions) -> Result<Instance> {
match self {
SubCommand::Start(cmd) => {
info!("Building datanode with {:#?}", cmd);
builder.build().await
}
SubCommand::Start(cmd) => cmd.build(opts).await,
}
}
}
@@ -260,6 +263,74 @@ impl StartCommand {
Ok(())
}
async fn build(&self, opts: DatanodeOptions) -> Result<Instance> {
common_runtime::init_global_runtimes(&opts.runtime);
let guard = common_telemetry::init_global_logging(
APP_NAME,
&opts.component.logging,
&opts.component.tracing,
opts.component.node_id.map(|x| x.to_string()),
);
log_versions(version(), short_version(), APP_NAME);
info!("Datanode start command: {:#?}", self);
info!("Datanode options: {:#?}", opts);
let plugin_opts = opts.plugins;
let mut opts = opts.component;
opts.grpc.detect_server_addr();
let mut plugins = Plugins::new();
plugins::setup_datanode_plugins(&mut plugins, &plugin_opts, &opts)
.await
.context(StartDatanodeSnafu)?;
let member_id = opts
.node_id
.context(MissingConfigSnafu { msg: "'node_id'" })?;
let meta_config = opts.meta_client.as_ref().context(MissingConfigSnafu {
msg: "'meta_client_options'",
})?;
let meta_client = meta_client::create_meta_client(
MetaClientType::Datanode { member_id },
meta_config,
None,
)
.await
.context(MetaClientInitSnafu)?;
let meta_backend = Arc::new(MetaKvBackend {
client: meta_client.clone(),
});
// Builds cache registry for datanode.
let layered_cache_registry = Arc::new(
LayeredCacheRegistryBuilder::default()
.add_cache_registry(build_datanode_cache_registry(meta_backend.clone()))
.build(),
);
let mut datanode = DatanodeBuilder::new(opts.clone(), plugins, Mode::Distributed)
.with_meta_client(meta_client)
.with_kv_backend(meta_backend)
.with_cache_registry(layered_cache_registry)
.build()
.await
.context(StartDatanodeSnafu)?;
let services = DatanodeServiceBuilder::new(&opts)
.with_default_grpc_server(&datanode.region_server())
.enable_http_service()
.build()
.await
.context(StartDatanodeSnafu)?;
datanode.setup_services(services);
Ok(Instance::new(datanode, guard))
}
}
#[cfg(test)]
@@ -281,6 +352,7 @@ mod tests {
common_telemetry::init_default_ut_logging();
let mut file = create_named_temp_file();
let toml_str = r#"
mode = "distributed"
enable_memory_catalog = false
node_id = 42
@@ -307,6 +379,7 @@ mod tests {
fn test_read_from_config_file() {
let mut file = create_named_temp_file();
let toml_str = r#"
mode = "distributed"
enable_memory_catalog = false
node_id = 42
@@ -472,6 +545,7 @@ mod tests {
fn test_config_precedence_order() {
let mut file = create_named_temp_file();
let toml_str = r#"
mode = "distributed"
enable_memory_catalog = false
node_id = 42
rpc_addr = "127.0.0.1:3001"

View File

@@ -1,137 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use cache::build_datanode_cache_registry;
use catalog::kvbackend::MetaKvBackend;
use common_base::Plugins;
use common_meta::cache::LayeredCacheRegistryBuilder;
use common_telemetry::info;
use common_version::{short_version, version};
use datanode::datanode::DatanodeBuilder;
use datanode::service::DatanodeServiceBuilder;
use meta_client::MetaClientType;
use snafu::{OptionExt, ResultExt};
use tracing_appender::non_blocking::WorkerGuard;
use crate::datanode::{DatanodeOptions, Instance, APP_NAME};
use crate::error::{MetaClientInitSnafu, MissingConfigSnafu, Result, StartDatanodeSnafu};
use crate::log_versions;
/// Builder for Datanode instance.
pub struct InstanceBuilder {
guard: Vec<WorkerGuard>,
opts: DatanodeOptions,
datanode_builder: DatanodeBuilder,
}
impl InstanceBuilder {
/// Try to create a new [InstanceBuilder], and do some initialization work like allocating
/// runtime resources, setting up global logging and plugins, etc.
pub async fn try_new_with_init(
mut opts: DatanodeOptions,
mut plugins: Plugins,
) -> Result<Self> {
let guard = Self::init(&mut opts, &mut plugins).await?;
let datanode_builder = Self::datanode_builder(&opts, plugins).await?;
Ok(Self {
guard,
opts,
datanode_builder,
})
}
async fn init(opts: &mut DatanodeOptions, plugins: &mut Plugins) -> Result<Vec<WorkerGuard>> {
common_runtime::init_global_runtimes(&opts.runtime);
let dn_opts = &mut opts.component;
let guard = common_telemetry::init_global_logging(
APP_NAME,
&dn_opts.logging,
&dn_opts.tracing,
dn_opts.node_id.map(|x| x.to_string()),
);
log_versions(version(), short_version(), APP_NAME);
plugins::setup_datanode_plugins(plugins, &opts.plugins, dn_opts)
.await
.context(StartDatanodeSnafu)?;
dn_opts.grpc.detect_server_addr();
info!("Initialized Datanode instance with {:#?}", opts);
Ok(guard)
}
async fn datanode_builder(opts: &DatanodeOptions, plugins: Plugins) -> Result<DatanodeBuilder> {
let dn_opts = &opts.component;
let member_id = dn_opts
.node_id
.context(MissingConfigSnafu { msg: "'node_id'" })?;
let meta_client_options = dn_opts.meta_client.as_ref().context(MissingConfigSnafu {
msg: "meta client options",
})?;
let client = meta_client::create_meta_client(
MetaClientType::Datanode { member_id },
meta_client_options,
Some(&plugins),
)
.await
.context(MetaClientInitSnafu)?;
let backend = Arc::new(MetaKvBackend {
client: client.clone(),
});
let mut builder = DatanodeBuilder::new(dn_opts.clone(), plugins.clone(), backend.clone());
let registry = Arc::new(
LayeredCacheRegistryBuilder::default()
.add_cache_registry(build_datanode_cache_registry(backend))
.build(),
);
builder
.with_cache_registry(registry)
.with_meta_client(client.clone());
Ok(builder)
}
/// Get the mutable builder for Datanode, in case you want to change some fields before the
/// final construction.
pub fn mut_datanode_builder(&mut self) -> &mut DatanodeBuilder {
&mut self.datanode_builder
}
/// Try to build the Datanode instance.
pub async fn build(self) -> Result<Instance> {
let mut datanode = self
.datanode_builder
.build()
.await
.context(StartDatanodeSnafu)?;
let services = DatanodeServiceBuilder::new(&self.opts.component)
.with_default_grpc_server(&datanode.region_server())
.enable_http_service()
.build()
.context(StartDatanodeSnafu)?;
datanode.setup_services(services);
Ok(Instance::new(datanode, self.guard))
}
}
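// For reference, a minimal sketch of how this (now removed) builder was
// driven; the wrapper fn is illustrative, assuming `App` is in scope:
async fn start_datanode(opts: DatanodeOptions, plugins: Plugins) -> Result<()> {
    let mut builder = InstanceBuilder::try_new_with_init(opts, plugins).await?;
    let _ = builder.mut_datanode_builder(); // optionally tweak before building
    let mut instance = builder.build().await?;
    instance.start().await?;
    instance.stop().await
}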

View File

@@ -177,6 +177,9 @@ pub enum Error {
source: meta_srv::error::Error,
},
#[snafu(display("Invalid REPL command: {reason}"))]
InvalidReplCommand { reason: String },
#[snafu(display("Failed to parse SQL: {}", sql))]
ParseSql {
sql: String,
@@ -328,6 +331,7 @@ impl ErrorExt for Error {
Error::MissingConfig { .. }
| Error::LoadLayeredConfig { .. }
| Error::IllegalConfig { .. }
| Error::InvalidReplCommand { .. }
| Error::InitTimezone { .. }
| Error::ConnectEtcd { .. }
| Error::CreateDir { .. }

View File

@@ -33,8 +33,7 @@ use common_telemetry::info;
use common_telemetry::logging::TracingOptions;
use common_version::{short_version, version};
use flow::{
get_flow_auth_options, FlownodeBuilder, FlownodeInstance, FlownodeServiceBuilder,
FrontendClient, FrontendInvoker,
FlownodeBuilder, FlownodeInstance, FlownodeServiceBuilder, FrontendClient, FrontendInvoker,
};
use meta_client::{MetaClientOptions, MetaClientType};
use snafu::{ensure, OptionExt, ResultExt};
@@ -83,14 +82,10 @@ impl App for Instance {
}
async fn start(&mut self) -> Result<()> {
plugins::start_flownode_plugins(self.flownode.flow_engine().plugins().clone())
.await
.context(StartFlownodeSnafu)?;
self.flownode.start().await.context(StartFlownodeSnafu)
}
async fn stop(&mut self) -> Result<()> {
async fn stop(&self) -> Result<()> {
self.flownode
.shutdown()
.await
@@ -156,9 +151,6 @@ struct StartCommand {
/// HTTP request timeout in seconds.
#[clap(long)]
http_timeout: Option<u64>,
/// User provider config for auth; currently only the static user provider is supported.

#[clap(long)]
user_provider: Option<String>,
}
impl StartCommand {
@@ -222,10 +214,6 @@ impl StartCommand {
opts.http.timeout = Duration::from_secs(http_timeout);
}
if let Some(user_provider) = &self.user_provider {
opts.user_provider = Some(user_provider.clone());
}
ensure!(
opts.node_id.is_some(),
MissingConfigSnafu {
@@ -250,15 +238,9 @@ impl StartCommand {
info!("Flownode start command: {:#?}", self);
info!("Flownode options: {:#?}", opts);
let plugin_opts = opts.plugins;
let mut opts = opts.component;
opts.grpc.detect_server_addr();
let mut plugins = Plugins::new();
plugins::setup_flownode_plugins(&mut plugins, &plugin_opts, &opts)
.await
.context(StartFlownodeSnafu)?;
let member_id = opts
.node_id
.context(MissingConfigSnafu { msg: "'node_id'" })?;
@@ -333,12 +315,10 @@ impl StartCommand {
);
let flow_metadata_manager = Arc::new(FlowMetadataManager::new(cached_meta_backend.clone()));
let flow_auth_header = get_flow_auth_options(&opts).context(StartFlownodeSnafu)?;
let frontend_client =
FrontendClient::from_meta_client(meta_client.clone(), flow_auth_header);
let frontend_client = FrontendClient::from_meta_client(meta_client.clone());
let flownode_builder = FlownodeBuilder::new(
opts.clone(),
plugins,
Plugins::new(),
table_metadata_manager,
catalog_manager.clone(),
flow_metadata_manager,
@@ -351,6 +331,7 @@ impl StartCommand {
.with_grpc_server(flownode.flownode_server().clone())
.enable_http_service()
.build()
.await
.context(StartFlownodeSnafu)?;
flownode.setup_services(services);
let flownode = flownode;

View File

@@ -89,7 +89,7 @@ impl App for Instance {
.context(error::StartFrontendSnafu)
}
async fn stop(&mut self) -> Result<()> {
async fn stop(&self) -> Result<()> {
self.frontend
.shutdown()
.await
@@ -382,6 +382,7 @@ impl StartCommand {
let servers = Services::new(opts, instance.clone(), plugins)
.build()
.await
.context(error::StartFrontendSnafu)?;
let frontend = Frontend {
@@ -447,6 +448,8 @@ mod tests {
fn test_read_from_config_file() {
let mut file = create_named_temp_file();
let toml_str = r#"
mode = "distributed"
[http]
addr = "127.0.0.1:4000"
timeout = "0s"
@@ -535,6 +538,8 @@ mod tests {
fn test_config_precedence_order() {
let mut file = create_named_temp_file();
let toml_str = r#"
mode = "distributed"
[http]
addr = "127.0.0.1:4000"

View File

@@ -74,7 +74,7 @@ pub trait App: Send {
true
}
async fn stop(&mut self) -> Result<()>;
async fn stop(&self) -> Result<()>;
async fn run(&mut self) -> Result<()> {
info!("Starting app: {}", self.name());

View File

@@ -69,7 +69,7 @@ impl App for Instance {
self.instance.start().await.context(StartMetaServerSnafu)
}
async fn stop(&mut self) -> Result<()> {
async fn stop(&self) -> Result<()> {
self.instance
.shutdown()
.await

View File

@@ -75,6 +75,7 @@ use servers::export_metrics::{ExportMetricsOption, ExportMetricsTask};
use servers::grpc::GrpcOptions;
use servers::http::HttpOptions;
use servers::tls::{TlsMode, TlsOption};
use servers::Mode;
use snafu::ResultExt;
use tokio::sync::RwLock;
use tracing_appender::non_blocking::WorkerGuard;
@@ -255,8 +256,8 @@ pub struct Instance {
impl Instance {
/// Find the socket addr of a server by its `name`.
pub fn server_addr(&self, name: &str) -> Option<SocketAddr> {
self.frontend.server_handlers().addr(name)
pub async fn server_addr(&self, name: &str) -> Option<SocketAddr> {
self.frontend.server_handlers().addr(name).await
}
}
@@ -293,7 +294,7 @@ impl App for Instance {
Ok(())
}
async fn stop(&mut self) -> Result<()> {
async fn stop(&self) -> Result<()> {
self.frontend
.shutdown()
.await
@@ -496,9 +497,12 @@ impl StartCommand {
.build(),
);
let mut builder = DatanodeBuilder::new(dn_opts, plugins.clone(), kv_backend.clone());
builder.with_cache_registry(layered_cache_registry.clone());
let datanode = builder.build().await.context(error::StartDatanodeSnafu)?;
let datanode = DatanodeBuilder::new(dn_opts, plugins.clone(), Mode::Standalone)
.with_kv_backend(kv_backend.clone())
.with_cache_registry(layered_cache_registry.clone())
.build()
.await
.context(error::StartDatanodeSnafu)?;
let information_extension = Arc::new(StandaloneInformationExtension::new(
datanode.region_server(),
@@ -630,6 +634,7 @@ impl StartCommand {
let servers = Services::new(opts, fe_instance.clone(), plugins)
.build()
.await
.context(error::StartFrontendSnafu)?;
let frontend = Frontend {
@@ -853,6 +858,8 @@ mod tests {
fn test_read_from_config_file() {
let mut file = create_named_temp_file();
let toml_str = r#"
mode = "distributed"
enable_memory_catalog = true
[wal]
@@ -983,6 +990,8 @@ mod tests {
fn test_config_precedence_order() {
let mut file = create_named_temp_file();
let toml_str = r#"
mode = "standalone"
[http]
addr = "127.0.0.1:4000"

View File

@@ -111,9 +111,11 @@ mod tests {
use serde::{Deserialize, Serialize};
use super::*;
use crate::Mode;
#[derive(Debug, Serialize, Deserialize, Default)]
#[derive(Debug, Serialize, Deserialize)]
struct TestDatanodeConfig {
mode: Mode,
node_id: Option<u64>,
logging: LoggingOptions,
meta_client: Option<MetaClientOptions>,
@@ -121,6 +123,19 @@ mod tests {
storage: StorageConfig,
}
impl Default for TestDatanodeConfig {
fn default() -> Self {
Self {
mode: Mode::Distributed,
node_id: None,
logging: LoggingOptions::default(),
meta_client: None,
wal: DatanodeWalConfig::default(),
storage: StorageConfig::default(),
}
}
}
impl Configurable for TestDatanodeConfig {
fn env_list_keys() -> Option<&'static [&'static str]> {
Some(&["meta_client.metasrv_addrs"])
@@ -131,6 +146,7 @@ mod tests {
fn test_load_layered_options() {
let mut file = create_named_temp_file();
let toml_str = r#"
mode = "distributed"
enable_memory_catalog = false
rpc_addr = "127.0.0.1:3001"
rpc_hostname = "127.0.0.1"

View File

@@ -26,6 +26,16 @@ pub fn metadata_store_dir(store_dir: &str) -> String {
format!("{store_dir}/metadata")
}
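// Grounded in the body above: the helper nests a `metadata` directory under
// the given store dir.
#[test]
fn metadata_dir_is_nested() {
    assert_eq!(metadata_store_dir("/data/greptimedb"), "/data/greptimedb/metadata");
}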
/// The Server running mode
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Copy)]
#[serde(rename_all = "lowercase")]
pub enum Mode {
// The single process mode.
Standalone,
// The distributed cluster mode.
Distributed,
}
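// A quick round-trip sketch of the lowercase (de)serialization this enum
// restores, mirroring the `mode = "..."` lines re-added to the test configs;
// the `Conf` wrapper and the use of the `toml` crate are illustrative:
#[derive(serde::Deserialize)]
struct Conf {
    mode: Mode,
}
#[test]
fn mode_round_trips_lowercase() {
    let conf: Conf = toml::from_str(r#"mode = "standalone""#).unwrap();
    assert!(matches!(conf.mode, Mode::Standalone));
}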
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct KvBackendConfig {

View File

@@ -13,7 +13,7 @@ default = ["geo"]
geo = ["geohash", "h3o", "s2", "wkt", "geo-types", "dep:geo"]
[dependencies]
ahash.workspace = true
ahash = "0.8"
api.workspace = true
arc-swap = "1.0"
async-trait.workspace = true

View File

@@ -192,10 +192,6 @@ impl FlightDecoder {
}
}
}
pub fn schema(&self) -> Option<&SchemaRef> {
self.schema.as_ref()
}
}
pub fn flight_messages_to_recordbatches(messages: Vec<FlightMessage>) -> Result<RecordBatches> {

View File

@@ -18,5 +18,4 @@ pub mod flight;
pub mod precision;
pub mod select;
pub use arrow_flight::FlightData;
pub use error::Error;

View File

@@ -478,11 +478,10 @@ impl TableRouteStorage {
))
}
// TODO(LFC): restore its original visibility after some test utility code is refined
/// Builds an update-table-route transaction;
/// it expects the remote value to equal the `current_table_route_value`.
/// It retrieves the latest value if the comparison fails.
pub fn build_update_txn(
pub(crate) fn build_update_txn(
&self,
table_id: TableId,
current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,

View File

@@ -121,8 +121,8 @@ pub enum FlowNameLock {
}
impl FlowNameLock {
pub fn new(catalog: &str, flow_name: &str) -> Self {
Self::Write(format!("{catalog}.{flow_name}"))
pub fn new(catalog: &str, table: &str) -> Self {
Self::Write(format!("{catalog}.{table}"))
}
}
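// Grounded in the constructor above: the lock key is `<catalog>.<flow_name>`,
// always taken as a write (exclusive) lock.
let lock = FlowNameLock::new("greptime", "my_flow");
// => FlowNameLock::Write("greptime.my_flow".to_string())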

View File

@@ -18,13 +18,11 @@ use std::collections::HashMap;
use std::sync::Arc;
use async_trait::async_trait;
use common_procedure::local::{acquire_dynamic_key_lock, DynamicKeyLockGuard};
use common_procedure::rwlock::KeyRwLock;
use common_procedure::store::poison_store::PoisonStore;
use common_procedure::test_util::InMemoryPoisonStore;
use common_procedure::{
Context, ContextProvider, Output, PoisonKey, Procedure, ProcedureId, ProcedureState,
ProcedureWithId, Result, Status, StringKey,
ProcedureWithId, Result, Status,
};
/// A Mock [ContextProvider].
@@ -32,7 +30,6 @@ use common_procedure::{
pub struct MockContextProvider {
states: HashMap<ProcedureId, ProcedureState>,
poison_manager: InMemoryPoisonStore,
dynamic_key_lock: Arc<KeyRwLock<String>>,
}
impl MockContextProvider {
@@ -41,7 +38,6 @@ impl MockContextProvider {
MockContextProvider {
states,
poison_manager: InMemoryPoisonStore::default(),
dynamic_key_lock: Arc::new(KeyRwLock::new()),
}
}
@@ -62,10 +58,6 @@ impl ContextProvider for MockContextProvider {
.try_put_poison(key.to_string(), procedure_id.to_string())
.await
}
async fn acquire_lock(&self, key: &StringKey) -> DynamicKeyLockGuard {
acquire_dynamic_key_lock(&self.dynamic_key_lock, key).await
}
}
/// Executes a procedure until it returns [Status::Done].

View File

@@ -20,7 +20,6 @@ pub mod error;
pub mod local;
pub mod options;
mod procedure;
pub mod rwlock;
pub mod store;
pub mod watcher;
@@ -29,8 +28,8 @@ pub mod test_util;
pub use crate::error::{Error, Result};
pub use crate::procedure::{
BoxedProcedure, BoxedProcedureLoader, Context, ContextProvider, ContextProviderRef, LockKey,
Output, ParseIdError, PoisonKey, PoisonKeys, Procedure, ProcedureId, ProcedureInfo,
ProcedureManager, ProcedureManagerRef, ProcedureState, ProcedureWithId, Status, StringKey,
BoxedProcedure, BoxedProcedureLoader, Context, ContextProvider, LockKey, Output, ParseIdError,
PoisonKey, PoisonKeys, Procedure, ProcedureId, ProcedureInfo, ProcedureManager,
ProcedureManagerRef, ProcedureState, ProcedureWithId, Status, StringKey,
};
pub use crate::watcher::Watcher;

View File

@@ -13,6 +13,7 @@
// limitations under the License.
mod runner;
mod rwlock;
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet, VecDeque};
@@ -29,6 +30,7 @@ use snafu::{ensure, OptionExt, ResultExt};
use tokio::sync::watch::{self, Receiver, Sender};
use tokio::sync::{Mutex as TokioMutex, Notify};
use self::rwlock::KeyRwLock;
use crate::error::{
self, DuplicateProcedureSnafu, Error, LoaderConflictSnafu, ManagerNotStartSnafu,
PoisonKeyNotDefinedSnafu, ProcedureNotFoundSnafu, Result, StartRemoveOutdatedMetaTaskSnafu,
@@ -36,12 +38,11 @@ use crate::error::{
};
use crate::local::runner::Runner;
use crate::procedure::{BoxedProcedureLoader, InitProcedureState, PoisonKeys, ProcedureInfo};
use crate::rwlock::{KeyRwLock, OwnedKeyRwLockGuard};
use crate::store::poison_store::PoisonStoreRef;
use crate::store::{ProcedureMessage, ProcedureMessages, ProcedureStore, StateStoreRef};
use crate::{
BoxedProcedure, ContextProvider, LockKey, PoisonKey, ProcedureId, ProcedureManager,
ProcedureState, ProcedureWithId, StringKey, Watcher,
ProcedureState, ProcedureWithId, Watcher,
};
/// The expired time of a procedure's metadata.
@@ -156,80 +157,12 @@ struct LoadedProcedure {
step: u32,
}
/// The dynamic lock for procedure execution.
///
/// Unlike the procedure-level locks, these locks are acquired dynamically by the procedure
/// during execution. They are only held when the procedure specifically needs these keys
/// and are released as soon as the procedure no longer needs them.
/// This allows for more fine-grained concurrency control during procedure execution.
pub(crate) type DynamicKeyLock = Arc<KeyRwLock<String>>;
/// Acquires a dynamic key lock for the given key.
///
/// This function takes a reference to the dynamic key lock and a reference to the key.
/// It then matches the key type and acquires the appropriate lock.
pub async fn acquire_dynamic_key_lock(
lock: &DynamicKeyLock,
key: &StringKey,
) -> DynamicKeyLockGuard {
match key {
StringKey::Share(key) => {
let guard = lock.read(key.to_string()).await;
DynamicKeyLockGuard {
guard: Some(OwnedKeyRwLockGuard::from(guard)),
key: key.to_string(),
lock: lock.clone(),
}
}
StringKey::Exclusive(key) => {
let guard = lock.write(key.to_string()).await;
DynamicKeyLockGuard {
guard: Some(OwnedKeyRwLockGuard::from(guard)),
key: key.to_string(),
lock: lock.clone(),
}
}
}
}
/// A guard for the dynamic key lock.
///
/// This guard is used to release the lock when the procedure no longer needs it.
/// It also ensures that the lock is cleaned up when the guard is dropped.
pub struct DynamicKeyLockGuard {
guard: Option<OwnedKeyRwLockGuard>,
key: String,
lock: DynamicKeyLock,
}
impl Drop for DynamicKeyLockGuard {
fn drop(&mut self) {
if let Some(guard) = self.guard.take() {
drop(guard);
}
self.lock.clean_keys(&[self.key.to_string()]);
}
}
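// Condensed usage sketch of the API being removed here, mirroring the test
// deleted further below (`ctx` is a procedure `Context`):
let key = StringKey::Exclusive("table/1024".to_string());
let guard = ctx.provider.acquire_lock(&key).await;
// ... touch the shared resource while holding the exclusive lock ...
drop(guard); // releases the lock and cleans up the key entry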
/// Shared context of the manager.
pub(crate) struct ManagerContext {
/// Procedure loaders. The key is the type name of the procedure which the loader returns.
loaders: Mutex<HashMap<String, BoxedProcedureLoader>>,
/// The key lock for the procedure.
///
/// The lock keys are defined in `Procedure::lock_key()`.
/// These locks are acquired before the procedure starts and released after the procedure finishes.
/// They ensure exclusive access to resources throughout the entire procedure lifecycle.
key_lock: KeyRwLock<String>,
/// The dynamic lock for procedure execution.
///
/// Unlike the procedure-level locks, these locks are acquired dynamically by the procedure
/// during execution. They are only held when the procedure specifically needs these keys
/// and are released as soon as the procedure no longer needs them.
/// This allows for more fine-grained concurrency control during procedure execution.
dynamic_key_lock: DynamicKeyLock,
/// Procedures in the manager.
procedures: RwLock<HashMap<ProcedureId, ProcedureMetaRef>>,
/// Running procedures.
running_procedures: Mutex<HashSet<ProcedureId>>,
/// Ids and finished time of finished procedures.
finished_procedures: Mutex<VecDeque<(ProcedureId, Instant)>>,
@@ -266,10 +199,6 @@ impl ContextProvider for ManagerContext {
let procedure_id = procedure_id.to_string();
self.poison_manager.try_put_poison(key, procedure_id).await
}
async fn acquire_lock(&self, key: &StringKey) -> DynamicKeyLockGuard {
acquire_dynamic_key_lock(&self.dynamic_key_lock, key).await
}
}
impl ManagerContext {
@@ -277,7 +206,6 @@ impl ManagerContext {
fn new(poison_manager: PoisonStoreRef) -> ManagerContext {
ManagerContext {
key_lock: KeyRwLock::new(),
dynamic_key_lock: Arc::new(KeyRwLock::new()),
loaders: Mutex::new(HashMap::new()),
procedures: RwLock::new(HashMap::new()),
running_procedures: Mutex::new(HashSet::new()),

View File

@@ -23,9 +23,9 @@ use snafu::ResultExt;
use tokio::time;
use crate::error::{self, ProcedurePanicSnafu, Result, RollbackTimesExceededSnafu};
use crate::local::rwlock::OwnedKeyRwLockGuard;
use crate::local::{ManagerContext, ProcedureMeta, ProcedureMetaRef};
use crate::procedure::{Output, StringKey};
use crate::rwlock::OwnedKeyRwLockGuard;
use crate::store::{ProcedureMessage, ProcedureStore};
use crate::{
BoxedProcedure, Context, Error, Procedure, ProcedureId, ProcedureState, ProcedureWithId, Status,
@@ -581,7 +581,6 @@ impl Runner {
#[cfg(test)]
mod tests {
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use async_trait::async_trait;
@@ -589,14 +588,13 @@ mod tests {
use common_error::mock::MockError;
use common_error::status_code::StatusCode;
use common_test_util::temp_dir::create_temp_dir;
use futures::future::join_all;
use futures_util::future::BoxFuture;
use futures_util::FutureExt;
use object_store::{EntryMode, ObjectStore};
use tokio::sync::mpsc;
use super::*;
use crate::local::{test_util, DynamicKeyLockGuard};
use crate::local::test_util;
use crate::procedure::PoisonKeys;
use crate::store::proc_path;
use crate::test_util::InMemoryPoisonStore;
@@ -668,10 +666,6 @@ mod tests {
) -> Result<()> {
unimplemented!()
}
async fn acquire_lock(&self, _key: &StringKey) -> DynamicKeyLockGuard {
unimplemented!()
}
}
Context {
@@ -1680,66 +1674,4 @@ mod tests {
// If the procedure is poisoned, the poison key shouldn't be deleted.
assert_eq!(procedure_id, ROOT_ID);
}
fn test_procedure_with_dynamic_lock(
shared_atomic_value: Arc<AtomicU64>,
id: u64,
) -> (BoxedProcedure, Arc<ProcedureMeta>) {
let exec_fn = move |ctx: Context| {
let moved_shared_atomic_value = shared_atomic_value.clone();
let moved_ctx = ctx.clone();
async move {
debug!("Acquiring write lock, id: {}", id);
let key = StringKey::Exclusive("test_lock".to_string());
let guard = moved_ctx.provider.acquire_lock(&key).await;
debug!("Acquired write lock, id: {}", id);
let millis = rand::rng().random_range(10..=50);
tokio::time::sleep(Duration::from_millis(millis)).await;
let value = moved_shared_atomic_value.load(Ordering::Relaxed);
moved_shared_atomic_value.store(value + 1, Ordering::Relaxed);
debug!("Dropping write lock, id: {}", id);
drop(guard);
Ok(Status::done())
}
.boxed()
};
let adapter = ProcedureAdapter {
data: "dynamic_lock".to_string(),
lock_key: LockKey::new_exclusive([]),
poison_keys: PoisonKeys::new([]),
exec_fn,
rollback_fn: None,
};
let meta = adapter.new_meta(ROOT_ID);
(Box::new(adapter), meta)
}
#[tokio::test(flavor = "multi_thread")]
async fn test_execute_with_dynamic_lock() {
common_telemetry::init_default_ut_logging();
let shared_atomic_value = Arc::new(AtomicU64::new(0));
let (procedure1, meta1) = test_procedure_with_dynamic_lock(shared_atomic_value.clone(), 1);
let (procedure2, meta2) = test_procedure_with_dynamic_lock(shared_atomic_value.clone(), 2);
let dir = create_temp_dir("dynamic_lock");
let object_store = test_util::new_object_store(&dir);
let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
let mut runner1 = new_runner(meta1.clone(), procedure1, procedure_store.clone());
let mut runner2 = new_runner(meta2.clone(), procedure2, procedure_store.clone());
let ctx1 = context_with_provider(
meta1.id,
runner1.manager_ctx.clone() as Arc<dyn ContextProvider>,
);
let ctx2 = context_with_provider(
meta2.id,
// use same manager ctx as runner1
runner1.manager_ctx.clone() as Arc<dyn ContextProvider>,
);
let tasks = [runner1.execute_once(&ctx1), runner2.execute_once(&ctx2)];
join_all(tasks).await;
assert_eq!(shared_atomic_value.load(Ordering::Relaxed), 2);
}
}

View File

@@ -18,18 +18,8 @@ use std::sync::{Arc, Mutex};
use tokio::sync::{OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock};
/// A guard that owns a read or write lock on a key.
///
/// This enum wraps either a read or write lock guard obtained from a `KeyRwLock`.
/// The guard is automatically released when it is dropped.
pub enum OwnedKeyRwLockGuard {
/// Represents a shared read lock on a key.
/// Multiple read locks can be held simultaneously for the same key.
Read { _guard: OwnedRwLockReadGuard<()> },
/// Represents an exclusive write lock on a key.
/// Only one write lock can be held at a time for a given key,
/// and no read locks can be held simultaneously with a write lock.
Write { _guard: OwnedRwLockWriteGuard<()> },
}
@@ -46,7 +36,7 @@ impl From<OwnedRwLockWriteGuard<()>> for OwnedKeyRwLockGuard {
}
/// Locks based on a key, allowing other keys to lock independently.
#[derive(Debug, Default)]
#[derive(Debug)]
pub struct KeyRwLock<K> {
/// The inner map of locks for specific keys.
inner: Mutex<HashMap<K, Arc<RwLock<()>>>>,
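// A short sketch of the per-key semantics behind the guards above
// (`read`/`write` are the async locking methods; key values are illustrative):
let lock: KeyRwLock<String> = KeyRwLock::new();
{
    let _w = lock.write("region/1".to_string()).await; // exclusive per key
    let _other = lock.write("region/2".to_string()).await; // independent key, does not block
}
// once the writer is dropped, multiple readers can share the same key
let _r1 = lock.read("region/1".to_string()).await;
let _r2 = lock.read("region/1".to_string()).await;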

View File

@@ -25,7 +25,6 @@ use snafu::{ResultExt, Snafu};
use uuid::Uuid;
use crate::error::{self, Error, Result};
use crate::local::DynamicKeyLockGuard;
use crate::watcher::Watcher;
pub type Output = Arc<dyn Any + Send + Sync>;
@@ -145,9 +144,6 @@ pub trait ContextProvider: Send + Sync {
/// This method is used to mark a resource as being operated on by a procedure.
/// If the poison key already exists with a different value, the operation will fail.
async fn try_put_poison(&self, key: &PoisonKey, procedure_id: ProcedureId) -> Result<()>;
/// Acquires a key lock for the procedure.
async fn acquire_lock(&self, key: &StringKey) -> DynamicKeyLockGuard;
}
/// Reference-counted pointer to [ContextProvider].

View File

@@ -17,7 +17,6 @@ use datafusion_expr::LogicalPlan;
use store_api::storage::RegionId;
/// The query request to be handled by the RegionServer (Datanode).
#[derive(Clone, Debug)]
pub struct QueryRequest {
/// The header of this request. Often used to carry some context of the query. `None` means use defaults for everything.
pub header: Option<RegionRequestHeader>,

View File

@@ -43,10 +43,10 @@ use mito2::config::MitoConfig;
use mito2::engine::MitoEngine;
use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef};
use object_store::util::normalize_dir;
use query::dummy_catalog::TableProviderFactoryRef;
use query::QueryEngineFactory;
use servers::export_metrics::ExportMetricsTask;
use servers::server::ServerHandlers;
use servers::Mode;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::path_utils::{region_dir, WAL_DIR};
use store_api::region_engine::{RegionEngineRef, RegionRole};
@@ -58,8 +58,8 @@ use tokio::sync::Notify;
use crate::config::{DatanodeOptions, RegionEngineConfig, StorageConfig};
use crate::error::{
self, BuildMetricEngineSnafu, BuildMitoEngineSnafu, CreateDirSnafu, GetMetadataSnafu,
MissingCacheSnafu, MissingNodeIdSnafu, OpenLogStoreSnafu, Result, ShutdownInstanceSnafu,
ShutdownServerSnafu, StartServerSnafu,
MissingCacheSnafu, MissingKvBackendSnafu, MissingNodeIdSnafu, OpenLogStoreSnafu, Result,
ShutdownInstanceSnafu, ShutdownServerSnafu, StartServerSnafu,
};
use crate::event_listener::{
new_region_server_event_channel, NoopRegionServerEventListener, RegionServerEventListenerRef,
@@ -129,7 +129,7 @@ impl Datanode {
self.services = services;
}
pub async fn shutdown(&mut self) -> Result<()> {
pub async fn shutdown(&self) -> Result<()> {
self.services
.shutdown_all()
.await
@@ -157,49 +157,50 @@ impl Datanode {
pub struct DatanodeBuilder {
opts: DatanodeOptions,
table_provider_factory: Option<TableProviderFactoryRef>,
mode: Mode,
plugins: Plugins,
meta_client: Option<MetaClientRef>,
kv_backend: KvBackendRef,
kv_backend: Option<KvBackendRef>,
cache_registry: Option<Arc<LayeredCacheRegistry>>,
}
impl DatanodeBuilder {
pub fn new(opts: DatanodeOptions, plugins: Plugins, kv_backend: KvBackendRef) -> Self {
/// `kv_backend` is optional. If absent, the builder will try to build one
/// by using the given `opts`.
pub fn new(opts: DatanodeOptions, plugins: Plugins, mode: Mode) -> Self {
Self {
opts,
table_provider_factory: None,
mode,
plugins,
meta_client: None,
kv_backend,
kv_backend: None,
cache_registry: None,
}
}
pub fn options(&self) -> &DatanodeOptions {
&self.opts
pub fn with_meta_client(self, meta_client: MetaClientRef) -> Self {
Self {
meta_client: Some(meta_client),
..self
}
}
pub fn with_meta_client(&mut self, client: MetaClientRef) -> &mut Self {
self.meta_client = Some(client);
self
pub fn with_cache_registry(self, cache_registry: Arc<LayeredCacheRegistry>) -> Self {
Self {
cache_registry: Some(cache_registry),
..self
}
}
pub fn with_cache_registry(&mut self, registry: Arc<LayeredCacheRegistry>) -> &mut Self {
self.cache_registry = Some(registry);
self
}
pub fn kv_backend(&self) -> &KvBackendRef {
&self.kv_backend
}
pub fn with_table_provider_factory(&mut self, factory: TableProviderFactoryRef) -> &mut Self {
self.table_provider_factory = Some(factory);
self
pub fn with_kv_backend(self, kv_backend: KvBackendRef) -> Self {
Self {
kv_backend: Some(kv_backend),
..self
}
}
pub async fn build(mut self) -> Result<Datanode> {
let mode = &self.mode;
let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?;
let meta_client = self.meta_client.take();
@@ -209,6 +210,8 @@ impl DatanodeBuilder {
// writable upon open.
let controlled_by_metasrv = meta_client.is_some();
let kv_backend = self.kv_backend.take().context(MissingKvBackendSnafu)?;
// build and initialize region server
let (region_event_listener, region_event_receiver) = if controlled_by_metasrv {
let (tx, rx) = new_region_server_event_channel();
@@ -230,7 +233,7 @@ impl DatanodeBuilder {
.new_region_server(schema_metadata_manager, region_event_listener)
.await?;
let datanode_table_manager = DatanodeTableManager::new(self.kv_backend.clone());
let datanode_table_manager = DatanodeTableManager::new(kv_backend.clone());
let table_values = datanode_table_manager
.tables(node_id)
.try_collect::<Vec<_>>()
@@ -270,18 +273,19 @@ impl DatanodeBuilder {
None
};
let is_standalone = heartbeat_task.is_none();
let greptimedb_telemetry_task = get_greptimedb_telemetry_task(
Some(self.opts.storage.data_home.clone()),
is_standalone && self.opts.enable_telemetry,
mode,
self.opts.enable_telemetry,
)
.await;
let leases_notifier = if self.opts.require_lease_before_startup && !is_standalone {
Some(Arc::new(Notify::new()))
} else {
None
};
let leases_notifier =
if self.opts.require_lease_before_startup && matches!(mode, Mode::Distributed) {
Some(Arc::new(Notify::new()))
} else {
None
};
let export_metrics_task =
ExportMetricsTask::try_new(&self.opts.export_metrics, Some(&self.plugins))
@@ -359,11 +363,7 @@ impl DatanodeBuilder {
);
let query_engine = query_engine_factory.query_engine();
let table_provider_factory = self
.table_provider_factory
.clone()
.unwrap_or_else(|| Arc::new(DummyTableProviderFactory));
let table_provider_factory = Arc::new(DummyTableProviderFactory);
let mut region_server = RegionServer::with_table_provider(
query_engine,
common_runtime::global_runtime(),
@@ -398,46 +398,45 @@ impl DatanodeBuilder {
schema_metadata_manager: SchemaMetadataManagerRef,
plugins: Plugins,
) -> Result<Vec<RegionEngineRef>> {
let mut metric_engine_config = metric_engine::config::EngineConfig::default();
let mut mito_engine_config = MitoConfig::default();
let mut file_engine_config = file_engine::config::EngineConfig::default();
let mut engines = vec![];
let mut metric_engine_config = opts.region_engine.iter().find_map(|c| match c {
RegionEngineConfig::Metric(config) => Some(config.clone()),
_ => None,
});
for engine in &opts.region_engine {
match engine {
RegionEngineConfig::Mito(config) => {
mito_engine_config = config.clone();
let mito_engine = Self::build_mito_engine(
opts,
object_store_manager.clone(),
config.clone(),
schema_metadata_manager.clone(),
plugins.clone(),
)
.await?;
let metric_engine = MetricEngine::try_new(
mito_engine.clone(),
metric_engine_config.take().unwrap_or_default(),
)
.context(BuildMetricEngineSnafu)?;
engines.push(Arc::new(mito_engine) as _);
engines.push(Arc::new(metric_engine) as _);
}
RegionEngineConfig::File(config) => {
file_engine_config = config.clone();
let engine = FileRegionEngine::new(
config.clone(),
object_store_manager.default_object_store().clone(), // TODO: implement custom storage for file engine
);
engines.push(Arc::new(engine) as _);
}
RegionEngineConfig::Metric(metric_config) => {
metric_engine_config = metric_config.clone();
RegionEngineConfig::Metric(_) => {
// Already handled in `build_mito_engine`.
}
}
}
let mito_engine = Self::build_mito_engine(
opts,
object_store_manager.clone(),
mito_engine_config,
schema_metadata_manager.clone(),
plugins.clone(),
)
.await?;
let metric_engine = MetricEngine::try_new(mito_engine.clone(), metric_engine_config)
.context(BuildMetricEngineSnafu)?;
let file_engine = FileRegionEngine::new(
file_engine_config,
object_store_manager.default_object_store().clone(), // TODO: implement custom storage for file engine
);
Ok(vec![
Arc::new(mito_engine) as _,
Arc::new(metric_engine) as _,
Arc::new(file_engine) as _,
])
Ok(engines)
}
/// Builds [MitoEngine] according to options.
@@ -635,6 +634,7 @@ mod tests {
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::KvBackendRef;
use mito2::engine::MITO_ENGINE_NAME;
use servers::Mode;
use store_api::region_request::RegionRequest;
use store_api::storage::RegionId;
@@ -670,19 +670,19 @@ mod tests {
let kv_backend = Arc::new(MemoryKvBackend::new());
let layered_cache_registry = Arc::new(
LayeredCacheRegistryBuilder::default()
.add_cache_registry(build_datanode_cache_registry(kv_backend.clone()))
.add_cache_registry(build_datanode_cache_registry(kv_backend))
.build(),
);
let mut builder = DatanodeBuilder::new(
let builder = DatanodeBuilder::new(
DatanodeOptions {
node_id: Some(0),
..Default::default()
},
Plugins::default(),
kv_backend,
);
builder.with_cache_registry(layered_cache_registry);
Mode::Standalone,
)
.with_cache_registry(layered_cache_registry);
let kv = Arc::new(MemoryKvBackend::default()) as _;
setup_table_datanode(&kv).await;

View File

@@ -150,6 +150,12 @@ pub enum Error {
location: Location,
},
#[snafu(display("Expect KvBackend but not found"))]
MissingKvBackend {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid SQL, error: {}", msg))]
InvalidSql { msg: String },
@@ -420,6 +426,7 @@ impl ErrorExt for Error {
| MissingRequiredField { .. }
| RegionEngineNotFound { .. }
| ParseAddr { .. }
| MissingKvBackend { .. }
| TomlFormat { .. } => StatusCode::InvalidArguments,
PayloadNotExist { .. }

View File

@@ -20,6 +20,7 @@ use common_greptimedb_telemetry::{
default_get_uuid, Collector, GreptimeDBTelemetry, GreptimeDBTelemetryTask,
Mode as VersionReporterMode, TELEMETRY_INTERVAL,
};
use servers::Mode;
struct StandaloneGreptimeDBTelemetryCollector {
uuid: Option<String>,
@@ -54,6 +55,7 @@ impl Collector for StandaloneGreptimeDBTelemetryCollector {
pub async fn get_greptimedb_telemetry_task(
working_home: Option<String>,
mode: &Mode,
enable: bool,
) -> Arc<GreptimeDBTelemetryTask> {
if !enable || cfg!(test) || cfg!(debug_assertions) {
@@ -62,14 +64,19 @@ pub async fn get_greptimedb_telemetry_task(
// Always enable.
let should_report = Arc::new(AtomicBool::new(true));
let uuid = default_get_uuid(&working_home);
Arc::new(GreptimeDBTelemetryTask::enable(
TELEMETRY_INTERVAL,
Box::new(GreptimeDBTelemetry::new(
working_home,
Box::new(StandaloneGreptimeDBTelemetryCollector { uuid, retry: 0 }),
should_report.clone(),
match mode {
Mode::Standalone => Arc::new(GreptimeDBTelemetryTask::enable(
TELEMETRY_INTERVAL,
Box::new(GreptimeDBTelemetry::new(
working_home.clone(),
Box::new(StandaloneGreptimeDBTelemetryCollector {
uuid: default_get_uuid(&working_home),
retry: 0,
}),
should_report.clone(),
)),
should_report,
)),
should_report,
))
Mode::Distributed => Arc::new(GreptimeDBTelemetryTask::disable()),
}
}
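// With the match restored, telemetry is gated on the mode itself rather than
// an `is_standalone` flag computed by the caller; e.g. (the `data_home`
// binding is illustrative):
let task = get_greptimedb_telemetry_task(Some(data_home), &Mode::Distributed, true).await;
// distributed nodes get `GreptimeDBTelemetryTask::disable()`; only
// `Mode::Standalone` spawns the periodic reporter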

View File

@@ -62,7 +62,7 @@ impl<'a> DatanodeServiceBuilder<'a> {
}
}
pub fn build(mut self) -> Result<ServerHandlers> {
pub async fn build(mut self) -> Result<ServerHandlers> {
let handlers = ServerHandlers::default();
if let Some(grpc_server) = self.grpc_server.take() {
@@ -70,7 +70,7 @@ impl<'a> DatanodeServiceBuilder<'a> {
addr: &self.opts.grpc.bind_addr,
})?;
let handler: ServerHandler = (Box::new(grpc_server), addr);
handlers.insert(handler);
handlers.insert(handler).await;
}
if self.enable_http_service {
@@ -82,7 +82,7 @@ impl<'a> DatanodeServiceBuilder<'a> {
addr: &self.opts.http.addr,
})?;
let handler: ServerHandler = (Box::new(http_server), addr);
handlers.insert(handler);
handlers.insert(handler).await;
}
Ok(handlers)

View File

@@ -20,7 +20,6 @@ use snafu::{ensure, ResultExt};
use crate::data_type::{ConcreteDataType, DataType};
use crate::error::{self, Result};
use crate::types::cast;
use crate::value::Value;
use crate::vectors::operations::VectorOp;
use crate::vectors::{TimestampMillisecondVector, VectorRef};
@@ -179,18 +178,6 @@ impl ColumnDefaultConstraint {
}
}
/// Cast default value to given type
pub fn cast_to_datatype(&self, data_type: &ConcreteDataType) -> Result<Self> {
match self {
ColumnDefaultConstraint::Value(v) => Ok(Self::Value(cast(v.clone(), data_type)?)),
ColumnDefaultConstraint::Function(expr) => match &expr[..] {
// no need to cast, since function always require a data_type when need to create default value
CURRENT_TIMESTAMP | CURRENT_TIMESTAMP_FN | NOW_FN => Ok(self.clone()),
_ => error::UnsupportedDefaultExprSnafu { expr }.fail(),
},
}
}
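// Restated contract of the removed method, with illustrative values
// (constructor and type names assumed from `datatypes`):
let v = ColumnDefaultConstraint::Value(Value::Int32(1));
let v = v.cast_to_datatype(&ConcreteDataType::int64_datatype())?; // constants are cast: Value(Int64(1))
// function defaults such as CURRENT_TIMESTAMP pass through unchanged,
// and any other function expression fails with UnsupportedDefaultExpr
let f = ColumnDefaultConstraint::Function("current_timestamp".to_string());
let f = f.cast_to_datatype(&ConcreteDataType::timestamp_millisecond_datatype())?;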
/// Only creates the default vector if it's impure, i.e., a function.
///
/// This helps delay creating constant default values until the mito engine, while also keeping impure defaults consistent.

View File

@@ -166,7 +166,7 @@ impl ScalarVector for StringVector {
}
pub struct StringVectorBuilder {
pub mutable_array: MutableStringArray,
mutable_array: MutableStringArray,
}
impl MutableVector for StringVectorBuilder {

View File

@@ -13,11 +13,9 @@ arrow.workspace = true
arrow-schema.workspace = true
async-recursion = "1.0"
async-trait.workspace = true
auth.workspace = true
bytes.workspace = true
cache.workspace = true
catalog.workspace = true
chrono.workspace = true
client.workspace = true
common-base.workspace = true
common-config.workspace = true
@@ -41,13 +39,16 @@ datafusion-expr.workspace = true
datafusion-physical-expr.workspace = true
datafusion-substrait.workspace = true
datatypes.workspace = true
dfir_rs = { version = "0.13.0", default-features = false }
enum-as-inner = "0.6.0"
enum_dispatch = "0.3"
futures.workspace = true
get-size2 = "0.1.2"
greptime-proto.workspace = true
# This fork of hydroflow simply keeps our dependency in our org and pins the version;
# otherwise it is the same as the upstream repo
chrono.workspace = true
http.workspace = true
hydroflow = { git = "https://github.com/GreptimeTeam/hydroflow.git", branch = "main" }
itertools.workspace = true
lazy_static.workspace = true
meta-client.workspace = true
@@ -59,7 +60,6 @@ partition.workspace = true
prometheus.workspace = true
prost.workspace = true
query.workspace = true
rand.workspace = true
serde.workspace = true
servers.workspace = true
session.workspace = true

View File

@@ -107,7 +107,6 @@ pub struct FlownodeOptions {
pub tracing: TracingOptions,
pub heartbeat: HeartbeatOptions,
pub query: QueryOptions,
pub user_provider: Option<String>,
}
impl Default for FlownodeOptions {
@@ -122,7 +121,6 @@ impl Default for FlownodeOptions {
tracing: TracingOptions::default(),
heartbeat: HeartbeatOptions::default(),
query: QueryOptions::default(),
user_provider: None,
}
}
}

View File

@@ -21,7 +21,6 @@ use api::v1::flow::{
};
use api::v1::region::InsertRequests;
use catalog::CatalogManager;
use common_base::Plugins;
use common_error::ext::BoxedError;
use common_meta::ddl::create_flow::FlowType;
use common_meta::error::Result as MetaResult;
@@ -64,7 +63,6 @@ pub struct FlowDualEngine {
flow_metadata_manager: Arc<FlowMetadataManager>,
catalog_manager: Arc<dyn CatalogManager>,
check_task: tokio::sync::Mutex<Option<ConsistentCheckTask>>,
plugins: Plugins,
}
impl FlowDualEngine {
@@ -73,7 +71,6 @@ impl FlowDualEngine {
batching_engine: Arc<BatchingEngine>,
flow_metadata_manager: Arc<FlowMetadataManager>,
catalog_manager: Arc<dyn CatalogManager>,
plugins: Plugins,
) -> Self {
Self {
streaming_engine,
@@ -82,14 +79,9 @@ impl FlowDualEngine {
flow_metadata_manager,
catalog_manager,
check_task: Mutex::new(None),
plugins,
}
}
pub fn plugins(&self) -> &Plugins {
&self.plugins
}
/// Determine if the engine is in distributed mode
pub fn is_distributed(&self) -> bool {
self.streaming_engine.node_id.is_some()

View File

@@ -19,8 +19,8 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use common_telemetry::info;
use dfir_rs::scheduled::graph::Dfir;
use enum_as_inner::EnumAsInner;
use hydroflow::scheduled::graph::Hydroflow;
use snafu::ensure;
use tokio::sync::{broadcast, mpsc, oneshot, Mutex};
@@ -49,9 +49,9 @@ pub fn create_worker<'a>() -> (WorkerHandle, Worker<'a>) {
(worker_handle, worker)
}
/// ActiveDataflowState is a wrapper around `Dfir` and `DataflowState`
/// ActiveDataflowState is a wrapper around `Hydroflow` and `DataflowState`
pub(crate) struct ActiveDataflowState<'subgraph> {
df: Dfir<'subgraph>,
df: Hydroflow<'subgraph>,
state: DataflowState,
err_collector: ErrCollector,
}
@@ -59,7 +59,7 @@ pub(crate) struct ActiveDataflowState<'subgraph> {
impl std::fmt::Debug for ActiveDataflowState<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ActiveDataflowState")
.field("df", &"<Dfir>")
.field("df", &"<Hydroflow>")
.field("state", &self.state)
.field("err_collector", &self.err_collector)
.finish()
@@ -69,7 +69,7 @@ impl std::fmt::Debug for ActiveDataflowState<'_> {
impl Default for ActiveDataflowState<'_> {
fn default() -> Self {
ActiveDataflowState {
df: Dfir::new(),
df: Hydroflow::new(),
state: DataflowState::default(),
err_collector: ErrCollector::default(),
}

View File

@@ -303,7 +303,7 @@ impl BatchingEngine {
})
.transpose()?;
debug!(
info!(
"Flow id={}, found time window expr={}",
flow_id,
phy_expr

View File

@@ -27,9 +27,8 @@ use common_meta::peer::Peer;
use common_meta::rpc::store::RangeRequest;
use common_query::Output;
use common_telemetry::warn;
use itertools::Itertools;
use meta_client::client::MetaClient;
use rand::rng;
use rand::seq::SliceRandom;
use servers::query_handler::grpc::GrpcQueryHandler;
use session::context::{QueryContextBuilder, QueryContextRef};
use snafu::{OptionExt, ResultExt};
@@ -39,7 +38,7 @@ use crate::batching_mode::{
GRPC_MAX_RETRIES,
};
use crate::error::{ExternalSnafu, InvalidRequestSnafu, NoAvailableFrontendSnafu, UnexpectedSnafu};
use crate::{Error, FlowAuthHeader};
use crate::Error;
/// Just like [`GrpcQueryHandler`] but use BoxedError
///
@@ -82,7 +81,6 @@ pub enum FrontendClient {
Distributed {
meta_client: Arc<MetaClient>,
chnl_mgr: ChannelManager,
auth: Option<FlowAuthHeader>,
},
Standalone {
/// for the sake of simplicity still use grpc even in standalone mode
@@ -103,8 +101,7 @@ impl FrontendClient {
)
}
pub fn from_meta_client(meta_client: Arc<MetaClient>, auth: Option<FlowAuthHeader>) -> Self {
common_telemetry::info!("Frontend client build with auth={:?}", auth);
pub fn from_meta_client(meta_client: Arc<MetaClient>) -> Self {
Self::Distributed {
meta_client,
chnl_mgr: {
@@ -113,7 +110,6 @@ impl FrontendClient {
.timeout(DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT);
ChannelManager::with_config(cfg)
},
auth,
}
}
@@ -190,7 +186,6 @@ impl FrontendClient {
let Self::Distributed {
meta_client: _,
chnl_mgr,
auth,
} = self
else {
return UnexpectedSnafu {
@@ -202,17 +197,17 @@ impl FrontendClient {
let mut interval = tokio::time::interval(GRPC_CONN_TIMEOUT);
interval.tick().await;
for retry in 0..GRPC_MAX_RETRIES {
let mut frontends = self.scan_for_frontend().await?;
let frontends = self.scan_for_frontend().await?;
let now_in_ms = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_millis() as i64;
// shuffle the frontends to avoid always picking the same one
frontends.shuffle(&mut rng());
// found node with maximum last_activity_ts
for (_, node_info) in frontends
.iter()
.sorted_by_key(|(_, node_info)| node_info.last_activity_ts)
.rev()
// filter out frontends that have been down for more than 1 min
.filter(|(_, node_info)| {
node_info.last_activity_ts + FRONTEND_ACTIVITY_TIMEOUT.as_millis() as i64
@@ -221,13 +216,7 @@ impl FrontendClient {
{
let addr = &node_info.peer.addr;
let client = Client::with_manager_and_urls(chnl_mgr.clone(), vec![addr.clone()]);
let database = {
let mut db = Database::new(catalog, schema, client);
if let Some(auth) = auth {
db.set_auth(auth.auth().clone());
}
db
};
let database = Database::new(catalog, schema, client);
let db = DatabaseWithPeer::new(database, node_info.peer.clone());
match db.try_select_one().await {
Ok(_) => return Ok(db),
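// The restored chain prefers the most recently active frontend instead of
// shuffling; the policy in isolation (plain data, illustrative timestamps):
let (now_ms, stale_after_ms): (i64, i64) = (1_000_000, 60_000);
let frontends = vec![("fe-a", 990_000i64), ("fe-b", 900_000), ("fe-c", 1_000)];
let pick = frontends
    .iter()
    .sorted_by_key(|(_, ts)| *ts)
    .rev()
    .find(|(_, ts)| ts + stale_after_ms >= now_ms);
assert_eq!(pick.map(|(name, _)| *name), Some("fe-a"));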

View File

@@ -192,7 +192,7 @@ impl BatchingTask {
frontend_client: &Arc<FrontendClient>,
) -> Result<Option<(u32, Duration)>, Error> {
if let Some(new_query) = self.gen_insert_plan(engine).await? {
debug!("Generate new query: {}", new_query);
debug!("Generate new query: {:#?}", new_query);
self.execute_logical_plan(frontend_client, &new_query).await
} else {
debug!("Generate no query");

View File

@@ -138,12 +138,9 @@ impl TreeNodeVisitor<'_> for FindGroupByFinalName {
fn f_down(&mut self, node: &Self::Node) -> datafusion_common::Result<TreeNodeRecursion> {
if let LogicalPlan::Aggregate(aggregate) = node {
self.group_exprs = Some(aggregate.group_expr.iter().cloned().collect());
debug!(
"FindGroupByFinalName: Get Group by exprs from Aggregate: {:?}",
self.group_exprs
);
debug!("Group by exprs: {:?}", self.group_exprs);
} else if let LogicalPlan::Distinct(distinct) = node {
debug!("FindGroupByFinalName: Distinct: {}", node);
debug!("Distinct: {:#?}", distinct);
match distinct {
Distinct::All(input) => {
if let LogicalPlan::TableScan(table_scan) = &**input {
@@ -165,10 +162,7 @@ impl TreeNodeVisitor<'_> for FindGroupByFinalName {
self.group_exprs = Some(distinct_on.on_expr.iter().cloned().collect())
}
}
debug!(
"FindGroupByFinalName: Get Group by exprs from Distinct: {:?}",
self.group_exprs
);
debug!("Group by exprs: {:?}", self.group_exprs);
}
Ok(TreeNodeRecursion::Continue)

View File

@@ -18,9 +18,9 @@
use std::collections::BTreeMap;
use dfir_rs::scheduled::graph::Dfir;
use dfir_rs::scheduled::graph_ext::GraphExt;
use dfir_rs::scheduled::port::{PortCtx, SEND};
use hydroflow::scheduled::graph::Hydroflow;
use hydroflow::scheduled::graph_ext::GraphExt;
use hydroflow::scheduled::port::{PortCtx, SEND};
use itertools::Itertools;
use snafu::OptionExt;
@@ -38,7 +38,7 @@ mod src_sink;
/// The Context for build a Operator with id of `GlobalId`
pub struct Context<'referred, 'df> {
pub id: GlobalId,
pub df: &'referred mut Dfir<'df>,
pub df: &'referred mut Hydroflow<'df>,
pub compute_state: &'referred mut DataflowState,
/// a list of all collections being used in the operator
///
@@ -361,16 +361,16 @@ mod test {
use std::cell::RefCell;
use std::rc::Rc;
use dfir_rs::scheduled::graph::Dfir;
use dfir_rs::scheduled::graph_ext::GraphExt;
use dfir_rs::scheduled::handoff::VecHandoff;
use hydroflow::scheduled::graph::Hydroflow;
use hydroflow::scheduled::graph_ext::GraphExt;
use hydroflow::scheduled::handoff::VecHandoff;
use pretty_assertions::assert_eq;
use super::*;
use crate::repr::Row;
pub fn run_and_check(
state: &mut DataflowState,
df: &mut Dfir,
df: &mut Hydroflow,
time_range: std::ops::Range<i64>,
expected: BTreeMap<i64, Vec<DiffRow>>,
output: Rc<RefCell<Vec<DiffRow>>>,
@@ -416,7 +416,7 @@ mod test {
}
pub fn harness_test_ctx<'r, 'h>(
df: &'r mut Dfir<'h>,
df: &'r mut Hydroflow<'h>,
state: &'r mut DataflowState,
) -> Context<'r, 'h> {
let err_collector = state.get_err_collector();
@@ -436,7 +436,7 @@ mod test {
/// that is, it only emits once, not multiple times
#[test]
fn test_render_constant() {
let mut df = Dfir::new();
let mut df = Hydroflow::new();
let mut state = DataflowState::default();
let mut ctx = harness_test_ctx(&mut df, &mut state);
@@ -473,7 +473,7 @@ mod test {
/// a simple example to show how to use source and sink
#[test]
fn example_source_sink() {
let mut df = Dfir::new();
let mut df = Hydroflow::new();
let (send_port, recv_port) = df.make_edge::<_, VecHandoff<i32>>("test_handoff");
df.add_subgraph_source("test_handoff_source", send_port, move |_ctx, send| {
for i in 0..10 {
@@ -498,8 +498,8 @@ mod test {
#[test]
fn test_tee_auto_schedule() {
use dfir_rs::scheduled::handoff::TeeingHandoff as Toff;
let mut df = Dfir::new();
use hydroflow::scheduled::handoff::TeeingHandoff as Toff;
let mut df = Hydroflow::new();
let (send_port, recv_port) = df.make_edge::<_, Toff<i32>>("test_handoff");
let source = df.add_subgraph_source("test_handoff_source", send_port, move |_ctx, send| {
for i in 0..10 {

View File

@@ -14,8 +14,8 @@
use std::collections::BTreeMap;
use dfir_rs::scheduled::graph_ext::GraphExt;
use dfir_rs::scheduled::port::{PortCtx, SEND};
use hydroflow::scheduled::graph_ext::GraphExt;
use hydroflow::scheduled::port::{PortCtx, SEND};
use itertools::Itertools;
use snafu::OptionExt;
@@ -256,7 +256,7 @@ fn eval_mfp_core(
mod test {
use datatypes::data_type::ConcreteDataType;
use dfir_rs::scheduled::graph::Dfir;
use hydroflow::scheduled::graph::Hydroflow;
use super::*;
use crate::compute::render::test::{get_output_handle, harness_test_ctx, run_and_check};
@@ -269,7 +269,7 @@ mod test {
/// namely: if mfp operator can schedule a delete at the correct time
#[test]
fn test_render_mfp_with_temporal() {
let mut df = Dfir::new();
let mut df = Hydroflow::new();
let mut state = DataflowState::default();
let mut ctx = harness_test_ctx(&mut df, &mut state);
@@ -348,7 +348,7 @@ mod test {
/// that is it filter the rows correctly
#[test]
fn test_render_mfp() {
let mut df = Dfir::new();
let mut df = Hydroflow::new();
let mut state = DataflowState::default();
let mut ctx = harness_test_ctx(&mut df, &mut state);
@@ -388,7 +388,7 @@ mod test {
/// test if mfp operator can run multiple times within same tick
#[test]
fn test_render_mfp_multiple_times() {
let mut df = Dfir::new();
let mut df = Hydroflow::new();
let mut state = DataflowState::default();
let mut ctx = harness_test_ctx(&mut df, &mut state);

View File

@@ -22,7 +22,7 @@ use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::DataType;
use datatypes::value::{ListValue, Value};
use datatypes::vectors::{BooleanVector, NullVector};
use dfir_rs::scheduled::graph_ext::GraphExt;
use hydroflow::scheduled::graph_ext::GraphExt;
use itertools::Itertools;
use snafu::{ensure, OptionExt, ResultExt};
@@ -1212,7 +1212,7 @@ mod test {
use common_time::Timestamp;
use datatypes::data_type::{ConcreteDataType, ConcreteDataType as CDT};
use dfir_rs::scheduled::graph::Dfir;
use hydroflow::scheduled::graph::Hydroflow;
use super::*;
use crate::compute::render::test::{get_output_handle, harness_test_ctx, run_and_check};
@@ -1228,7 +1228,7 @@ mod test {
/// expected: sum(number), window_start, window_end
#[test]
fn test_tumble_group_by() {
let mut df = Dfir::new();
let mut df = Hydroflow::new();
let mut state = DataflowState::default();
let mut ctx = harness_test_ctx(&mut df, &mut state);
const START: i64 = 1625097600000;
@@ -1389,7 +1389,7 @@ mod test {
/// select avg(number) from number;
#[test]
fn test_avg_eval() {
let mut df = Dfir::new();
let mut df = Hydroflow::new();
let mut state = DataflowState::default();
let mut ctx = harness_test_ctx(&mut df, &mut state);
@@ -1500,7 +1500,7 @@ mod test {
/// | col | Int64 |
#[test]
fn test_basic_distinct() {
let mut df = Dfir::new();
let mut df = Hydroflow::new();
let mut state = DataflowState::default();
let mut ctx = harness_test_ctx(&mut df, &mut state);
@@ -1556,7 +1556,7 @@ mod test {
/// | col | Int64 |
#[test]
fn test_basic_batch_reduce_accum() {
let mut df = Dfir::new();
let mut df = Hydroflow::new();
let mut state = DataflowState::default();
let now = state.current_time_ref();
let mut ctx = harness_test_ctx(&mut df, &mut state);
@@ -1662,7 +1662,7 @@ mod test {
/// | col | Int64 |
#[test]
fn test_basic_reduce_accum() {
let mut df = Dfir::new();
let mut df = Hydroflow::new();
let mut state = DataflowState::default();
let mut ctx = harness_test_ctx(&mut df, &mut state);
@@ -1739,7 +1739,7 @@ mod test {
/// this test include even more insert/delete case to cover all case for eval_distinct_core
#[test]
fn test_delete_reduce_distinct_accum() {
let mut df = Dfir::new();
let mut df = Hydroflow::new();
let mut state = DataflowState::default();
let mut ctx = harness_test_ctx(&mut df, &mut state);
@@ -1818,7 +1818,7 @@ mod test {
/// this test include insert and delete which should cover all case for eval_distinct_core
#[test]
fn test_basic_reduce_distinct_accum() {
let mut df = Dfir::new();
let mut df = Hydroflow::new();
let mut state = DataflowState::default();
let mut ctx = harness_test_ctx(&mut df, &mut state);
@@ -1896,7 +1896,7 @@ mod test {
/// | col | Int64 |
#[test]
fn test_composite_reduce_distinct_accum() {
let mut df = Dfir::new();
let mut df = Hydroflow::new();
let mut state = DataflowState::default();
let mut ctx = harness_test_ctx(&mut df, &mut state);

View File

@@ -17,7 +17,7 @@
use std::collections::BTreeMap;
use common_telemetry::{debug, trace};
use dfir_rs::scheduled::graph_ext::GraphExt;
use hydroflow::scheduled::graph_ext::GraphExt;
use itertools::Itertools;
use snafu::OptionExt;
use tokio::sync::broadcast::error::TryRecvError;

View File

@@ -16,16 +16,16 @@ use std::cell::RefCell;
use std::collections::{BTreeMap, VecDeque};
use std::rc::Rc;
use dfir_rs::scheduled::graph::Dfir;
use dfir_rs::scheduled::SubgraphId;
use get_size2::GetSize;
use hydroflow::scheduled::graph::Hydroflow;
use hydroflow::scheduled::SubgraphId;
use crate::compute::types::ErrCollector;
use crate::repr::{self, Timestamp};
use crate::utils::{ArrangeHandler, Arrangement};
/// input/output of a dataflow
/// One `ComputeState` manages the input/output/schedule of one `Dfir`
/// One `ComputeState` manages the input/output/schedule of one `Hydroflow`
#[derive(Debug, Default)]
pub struct DataflowState {
/// it is important to use a deque to maintain the order of subgraph here
@@ -38,7 +38,7 @@ pub struct DataflowState {
/// This means it is also the current time the temporal filter uses to get the correct current result
as_of: Rc<RefCell<Timestamp>>,
/// error collector local to this `ComputeState`,
/// useful for distinguishing errors from different `Dfir`
/// useful for distinguishing errors from different `Hydroflow`
err_collector: ErrCollector,
/// saves all arrangements used in this dataflow; since there is usually no delete operation,
/// we can just keep track of all used arrangements and schedule subgraphs when they need to be updated
@@ -65,7 +65,7 @@ impl DataflowState {
/// schedules all subgraphs that need to run with time <= `as_of` and calls run_available()
///
/// returns true if any subgraph actually executed
pub fn run_available_with_schedule(&mut self, df: &mut Dfir) -> bool {
pub fn run_available_with_schedule(&mut self, df: &mut Hydroflow) -> bool {
// first split keys <= as_of into another map
let mut before = self
.schedule_subgraph

View File

@@ -18,10 +18,10 @@ use std::rc::Rc;
use std::sync::Arc;
use common_error::ext::ErrorExt;
use dfir_rs::scheduled::graph::Dfir;
use dfir_rs::scheduled::handoff::TeeingHandoff;
use dfir_rs::scheduled::port::RecvPort;
use dfir_rs::scheduled::SubgraphId;
use hydroflow::scheduled::graph::Hydroflow;
use hydroflow::scheduled::handoff::TeeingHandoff;
use hydroflow::scheduled::port::RecvPort;
use hydroflow::scheduled::SubgraphId;
use itertools::Itertools;
use tokio::sync::Mutex;
@@ -46,7 +46,7 @@ impl<T: 'static + Clone> Collection<T> {
/// clones a collection; requires a mutable reference to the hydroflow instance
///
/// Note: it must be the same hydroflow instance that this collection was created from
pub fn clone(&self, df: &mut Dfir) -> Self {
pub fn clone(&self, df: &mut Hydroflow) -> Self {
Collection {
stream: self.stream.tee(df),
}
@@ -151,7 +151,7 @@ impl<T: 'static> CollectionBundle<T> {
}
impl<T: 'static + Clone> CollectionBundle<T> {
pub fn clone(&self, df: &mut Dfir) -> Self {
pub fn clone(&self, df: &mut Hydroflow) -> Self {
Self {
collection: self.collection.clone(df),
arranged: self

View File

@@ -25,42 +25,6 @@ use crate::Error;
pub type FlowId = u64;
pub type TableName = [String; 3];
#[derive(Clone)]
pub struct FlowAuthHeader {
auth_schema: api::v1::auth_header::AuthScheme,
}
impl std::fmt::Debug for FlowAuthHeader {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self.auth() {
api::v1::auth_header::AuthScheme::Basic(basic) => f
.debug_struct("Basic")
.field("username", &basic.username)
.field("password", &"<RETRACTED>")
.finish(),
api::v1::auth_header::AuthScheme::Token(_) => f
.debug_struct("Token")
.field("token", &"<RETRACTED>")
.finish(),
}
}
}
impl FlowAuthHeader {
pub fn from_user_pwd(username: &str, pwd: &str) -> Self {
Self {
auth_schema: api::v1::auth_header::AuthScheme::Basic(api::v1::Basic {
username: username.to_string(),
password: pwd.to_string(),
}),
}
}
pub fn auth(&self) -> &api::v1::auth_header::AuthScheme {
&self.auth_schema
}
}
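// The removed header kept credentials out of logs by construction; e.g.
// (illustrative values):
let header = FlowAuthHeader::from_user_pwd("admin", "s3cret");
// Debug prints: Basic { username: "admin", password: "<RETRACTED>" }
println!("{:?}", header);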
/// The arguments to create a flow
#[derive(Debug, Clone)]
pub struct CreateFlowArgs {

View File

@@ -152,9 +152,6 @@ pub enum Error {
location: Location,
},
#[snafu(display("Invalid auth config"))]
IllegalAuthConfig { source: auth::error::Error },
#[snafu(display("Flow plan error: {reason}"))]
Plan {
reason: String,
@@ -333,10 +330,9 @@ impl ErrorExt for Error {
}
Self::MetaClientInit { source, .. } => source.status_code(),
Self::InvalidQuery { .. }
| Self::InvalidRequest { .. }
| Self::ParseAddr { .. }
| Self::IllegalAuthConfig { .. } => StatusCode::InvalidArguments,
Self::InvalidQuery { .. } | Self::InvalidRequest { .. } | Self::ParseAddr { .. } => {
StatusCode::InvalidArguments
}
Error::SubstraitEncodeLogicalPlan { source, .. } => source.status_code(),

View File

@@ -21,7 +21,7 @@ use common_error::ext::BoxedError;
use datatypes::prelude::{ConcreteDataType, DataType};
use datatypes::value::Value;
use datatypes::vectors::{BooleanVector, Helper, VectorRef};
use dfir_rs::lattices::cc_traits::Iter;
use hydroflow::lattices::cc_traits::Iter;
use itertools::Itertools;
use snafu::{ensure, OptionExt, ResultExt};

View File

@@ -60,7 +60,7 @@ pub enum GenericFn {
Mul,
Div,
Mod,
// variadic func
// varadic func
And,
Or,
// unmaterialized func

View File

@@ -45,10 +45,8 @@ mod test_utils;
pub use adapter::{FlowConfig, FlowStreamingEngineRef, FlownodeOptions, StreamingEngine};
pub use batching_mode::frontend_client::{FrontendClient, GrpcQueryHandlerWithBoxedError};
pub use engine::FlowAuthHeader;
pub(crate) use engine::{CreateFlowArgs, FlowId, TableName};
pub use error::{Error, Result};
pub use server::{
get_flow_auth_options, FlownodeBuilder, FlownodeInstance, FlownodeServer,
FlownodeServiceBuilder, FrontendInvoker,
FlownodeBuilder, FlownodeInstance, FlownodeServer, FlownodeServiceBuilder, FrontendInvoker,
};

View File

@@ -57,16 +57,13 @@ use crate::batching_mode::engine::BatchingEngine;
use crate::engine::FlowEngine;
use crate::error::{
to_status_with_last_err, CacheRequiredSnafu, CreateFlowSnafu, ExternalSnafu, FlowNotFoundSnafu,
IllegalAuthConfigSnafu, ListFlowsSnafu, ParseAddrSnafu, ShutdownServerSnafu, StartServerSnafu,
UnexpectedSnafu,
ListFlowsSnafu, ParseAddrSnafu, ShutdownServerSnafu, StartServerSnafu, UnexpectedSnafu,
};
use crate::heartbeat::HeartbeatTask;
use crate::metrics::{METRIC_FLOW_PROCESSING_TIME, METRIC_FLOW_ROWS};
use crate::transform::register_function_to_query_engine;
use crate::utils::{SizeReportSender, StateReportHandler};
use crate::{
CreateFlowArgs, Error, FlowAuthHeader, FlownodeOptions, FrontendClient, StreamingEngine,
};
use crate::{CreateFlowArgs, Error, FlownodeOptions, FrontendClient, StreamingEngine};
pub const FLOW_NODE_SERVER_NAME: &str = "FLOW_NODE_SERVER";
/// Wraps the flow node manager to avoid the orphan rule with Arc<...>
@@ -234,10 +231,10 @@ impl servers::server::Server for FlownodeServer {
Ok(())
}
async fn start(&mut self, addr: SocketAddr) -> Result<(), servers::error::Error> {
async fn start(&self, addr: SocketAddr) -> Result<SocketAddr, servers::error::Error> {
let mut rx_server = self.inner.server_shutdown_tx.lock().await.subscribe();
let incoming = {
let (incoming, addr) = {
let listener = TcpListener::bind(addr)
.await
.context(TcpBindSnafu { addr })?;
@@ -246,7 +243,7 @@ impl servers::server::Server for FlownodeServer {
TcpIncoming::from_listener(listener, true, None).context(TcpIncomingSnafu)?;
info!("flow server is bound to {}", addr);
incoming
(incoming, addr)
};
let builder = tonic::transport::Server::builder().add_service(self.create_flow_service());
@@ -258,7 +255,7 @@ impl servers::server::Server for FlownodeServer {
.context(StartGrpcSnafu);
});
Ok(())
Ok(addr)
}
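The signature change from returning `Ok(())` to `Ok(addr)` matters mostly for ephemeral ports: a caller that binds `:0` can only learn the real port from the listener. A minimal sketch of that pattern using plain tokio (not the servers crate's trait):

```rust
use std::net::SocketAddr;
use tokio::net::TcpListener;

/// Bind to the requested address and report the address actually bound.
async fn bind_and_report(addr: SocketAddr) -> std::io::Result<SocketAddr> {
    let listener = TcpListener::bind(addr).await?;
    // With an input port of 0 the OS picks a free port; only the listener
    // knows which one, hence the changed return type.
    listener.local_addr()
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let bound = bind_and_report("127.0.0.1:0".parse().unwrap()).await?;
    println!("flow server is bound to {bound}");
    Ok(())
}
```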
fn name(&self) -> &str {
@@ -285,7 +282,7 @@ impl FlownodeInstance {
Ok(())
}
pub async fn shutdown(&mut self) -> Result<(), Error> {
pub async fn shutdown(&self) -> Result<(), crate::Error> {
self.services
.shutdown_all()
.await
@@ -313,21 +310,6 @@ impl FlownodeInstance {
}
}
pub fn get_flow_auth_options(fn_opts: &FlownodeOptions) -> Result<Option<FlowAuthHeader>, Error> {
if let Some(user_provider) = fn_opts.user_provider.as_ref() {
let static_provider = auth::static_user_provider_from_option(user_provider)
.context(IllegalAuthConfigSnafu)?;
let (usr, pwd) = static_provider
.get_one_user_pwd()
.context(IllegalAuthConfigSnafu)?;
let auth_header = FlowAuthHeader::from_user_pwd(&usr, &pwd);
return Ok(Some(auth_header));
}
Ok(None)
}
/// [`FlownodeInstance`] Builder
pub struct FlownodeBuilder {
opts: FlownodeOptions,
@@ -401,7 +383,6 @@ impl FlownodeBuilder {
batching,
self.flow_metadata_manager.clone(),
self.catalog_manager.clone(),
self.plugins.clone(),
);
let server = FlownodeServer::new(FlowService::new(Arc::new(dual)));
@@ -410,7 +391,7 @@ impl FlownodeBuilder {
let instance = FlownodeInstance {
flownode_server: server,
services: ServerHandlers::default(),
services: ServerHandlers::new(),
heartbeat_task,
};
Ok(instance)
@@ -591,14 +572,14 @@ impl<'a> FlownodeServiceBuilder<'a> {
}
}
pub fn build(mut self) -> Result<ServerHandlers, Error> {
pub async fn build(mut self) -> Result<ServerHandlers, Error> {
let handlers = ServerHandlers::default();
if let Some(grpc_server) = self.grpc_server.take() {
let addr: SocketAddr = self.opts.grpc.bind_addr.parse().context(ParseAddrSnafu {
addr: &self.opts.grpc.bind_addr,
})?;
let handler: ServerHandler = (Box::new(grpc_server), addr);
handlers.insert(handler);
handlers.insert(handler).await;
}
if self.enable_http_service {
@@ -609,7 +590,7 @@ impl<'a> FlownodeServiceBuilder<'a> {
addr: &self.opts.http.addr,
})?;
let handler: ServerHandler = (Box::new(http_server), addr);
handlers.insert(handler);
handlers.insert(handler).await;
}
Ok(handlers)
}
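`build` turns `async` here because `handlers.insert(...)` now awaits. A plausible reason, sketched under the assumption that `ServerHandlers` guards its registry with a `tokio::sync::Mutex` (its actual definition is not part of this diff):

```rust
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::sync::Mutex;

/// Illustrative stand-in for ServerHandlers: an async-locked registry.
#[derive(Default, Clone)]
struct Handlers {
    inner: Arc<Mutex<HashMap<String, SocketAddr>>>,
}

impl Handlers {
    /// Taking the tokio mutex requires `.await`, which is what forces
    /// every caller (including `build`) to become async.
    async fn insert(&self, name: &str, addr: SocketAddr) {
        self.inner.lock().await.insert(name.to_string(), addr);
    }
}

#[tokio::main]
async fn main() {
    let handlers = Handlers::default();
    handlers.insert("grpc", "127.0.0.1:4001".parse().unwrap()).await;
    assert_eq!(handlers.inner.lock().await.len(), 1);
}
```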

View File

@@ -106,7 +106,7 @@ pub struct Frontend {
}
impl Frontend {
pub async fn start(&mut self) -> Result<()> {
pub async fn start(&self) -> Result<()> {
if let Some(t) = &self.heartbeat_task {
t.start().await?;
}
@@ -128,7 +128,7 @@ impl Frontend {
.context(error::StartServerSnafu)
}
pub async fn shutdown(&mut self) -> Result<()> {
pub async fn shutdown(&self) -> Result<()> {
self.servers
.shutdown_all()
.await

View File

@@ -24,18 +24,15 @@ use api::v1::{
use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use common_base::AffectedRows;
use common_grpc::flight::FlightDecoder;
use common_grpc::FlightData;
use common_query::logical_plan::add_insert_to_logical_plan;
use common_query::Output;
use common_telemetry::tracing::{self};
use query::parser::PromQuery;
use servers::interceptor::{GrpcQueryInterceptor, GrpcQueryInterceptorRef};
use servers::query_handler::grpc::GrpcQueryHandler;
use servers::query_handler::grpc::{GrpcQueryHandler, RawRecordBatch};
use servers::query_handler::sql::SqlQueryHandler;
use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt};
use table::metadata::TableId;
use table::table_name::TableName;
use crate::error::{
@@ -233,35 +230,29 @@ impl GrpcQueryHandler for Instance {
async fn put_record_batch(
&self,
table: &TableName,
table_id: &mut Option<TableId>,
decoder: &mut FlightDecoder,
data: FlightData,
record_batch: RawRecordBatch,
) -> Result<AffectedRows> {
let table_id = if let Some(table_id) = table_id {
*table_id
} else {
let table = self
.catalog_manager()
.table(
&table.catalog_name,
&table.schema_name,
&table.table_name,
None,
)
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: table.to_string(),
})?;
let id = table.table_info().table_id();
*table_id = Some(id);
id
};
self.inserter
.handle_bulk_insert(table_id, decoder, data)
let _table = self
.catalog_manager()
.table(
&table.catalog_name,
&table.schema_name,
&table.table_name,
None,
)
.await
.context(TableOperationSnafu)
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: table.to_string(),
})?;
// TODO(LFC): Implement it.
common_telemetry::debug!(
"calling put_record_batch with table: {:?} and record_batch size: {}",
table,
record_batch.len()
);
Ok(record_batch.len())
}
}
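For context on the removed lines: the deleted fast path cached the resolved `TableId` in a caller-provided `Option`, so only the first record batch paid the catalog lookup. A standalone sketch of that memoization shape (`resolve` stands in for the catalog-manager call; names are illustrative):

```rust
/// Resolve the table id once, remember it in a caller-owned Option,
/// and skip the lookup on subsequent batches.
async fn cached_table_id(
    cache: &mut Option<u32>,
    resolve: impl std::future::Future<Output = u32>,
) -> u32 {
    match cache {
        Some(id) => *id, // hot path: no catalog round trip
        None => {
            let id = resolve.await;
            *cache = Some(id);
            id
        }
    }
}

#[tokio::main]
async fn main() {
    let mut cache = None;
    assert_eq!(cached_table_id(&mut cache, async { 1024 }).await, 1024);
    assert_eq!(cache, Some(1024)); // a second call would hit the cache
}
```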

View File

@@ -17,7 +17,7 @@ use std::sync::Arc;
use auth::UserProviderRef;
use common_base::Plugins;
use common_config::Configurable;
use common_config::{Configurable, Mode};
use servers::error::Error as ServerError;
use servers::grpc::builder::GrpcServerBuilder;
use servers::grpc::greptime_handler::GreptimeRequestHandler;
@@ -143,10 +143,15 @@ where
let user_provider = self.plugins.get::<UserProviderRef>();
// Determine whether it is Standalone or Distributed mode based on whether the meta client is configured.
let runtime = if opts.meta_client.is_none() {
Some(builder.runtime().clone())
let mode = if opts.meta_client.is_none() {
Mode::Standalone
} else {
None
Mode::Distributed
};
let runtime = match mode {
Mode::Standalone => Some(builder.runtime().clone()),
_ => None,
};
let greptime_request_handler = GreptimeRequestHandler::new(
@@ -179,7 +184,7 @@ where
Ok(http_server)
}
pub fn build(mut self) -> Result<ServerHandlers> {
pub async fn build(mut self) -> Result<ServerHandlers> {
let opts = self.opts.clone();
let instance = self.instance.clone();
@@ -194,7 +199,7 @@ where
// Always init GRPC server
let grpc_addr = parse_addr(&opts.grpc.bind_addr)?;
let grpc_server = self.build_grpc_server(&opts)?;
handlers.insert((Box::new(grpc_server), grpc_addr));
handlers.insert((Box::new(grpc_server), grpc_addr)).await;
}
{
@@ -202,7 +207,7 @@ where
let http_options = &opts.http;
let http_addr = parse_addr(&http_options.addr)?;
let http_server = self.build_http_server(&opts, toml)?;
handlers.insert((Box::new(http_server), http_addr));
handlers.insert((Box::new(http_server), http_addr)).await;
}
if opts.mysql.enable {
@@ -230,7 +235,7 @@ where
opts.reject_no_database.unwrap_or(false),
)),
);
handlers.insert((mysql_server, mysql_addr));
handlers.insert((mysql_server, mysql_addr)).await;
}
if opts.postgres.enable {
@@ -253,7 +258,7 @@ where
user_provider.clone(),
)) as Box<dyn Server>;
handlers.insert((pg_server, pg_addr));
handlers.insert((pg_server, pg_addr)).await;
}
Ok(handlers)

View File

@@ -79,7 +79,7 @@ use crate::{error, Result};
pub struct MetasrvInstance {
metasrv: Arc<Metasrv>,
http_server: HttpServer,
httpsrv: Arc<HttpServer>,
opts: MetasrvOptions,
@@ -96,11 +96,12 @@ impl MetasrvInstance {
plugins: Plugins,
metasrv: Metasrv,
) -> Result<MetasrvInstance> {
let http_server = HttpServerBuilder::new(opts.http.clone())
.with_metrics_handler(MetricsHandler)
.with_greptime_config_options(opts.to_toml().context(error::TomlFormatSnafu)?)
.build();
let httpsrv = Arc::new(
HttpServerBuilder::new(opts.http.clone())
.with_metrics_handler(MetricsHandler)
.with_greptime_config_options(opts.to_toml().context(error::TomlFormatSnafu)?)
.build(),
);
let metasrv = Arc::new(metasrv);
// put metasrv into plugins for later use
plugins.insert::<Arc<Metasrv>>(metasrv.clone());
@@ -108,7 +109,7 @@ impl MetasrvInstance {
.context(error::InitExportMetricsTaskSnafu)?;
Ok(MetasrvInstance {
metasrv,
http_server,
httpsrv,
opts,
signal_sender: None,
plugins,
@@ -137,9 +138,10 @@ impl MetasrvInstance {
addr: &self.opts.http.addr,
})?;
let http_srv = async {
self.http_server
self.httpsrv
.start(addr)
.await
.map(|_| ())
.context(error::StartHttpSnafu)
};
future::try_join(metasrv, http_srv).await?;
@@ -154,11 +156,11 @@ impl MetasrvInstance {
.context(error::SendShutdownSignalSnafu)?;
}
self.metasrv.shutdown().await?;
self.http_server
self.httpsrv
.shutdown()
.await
.context(error::ShutdownServerSnafu {
server: self.http_server.name(),
server: self.httpsrv.name(),
})?;
Ok(())
}
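The switch from a plain `HttpServer` field to `Arc<HttpServer>` lets the start path and the shutdown path hold handles to the same server without fighting the borrow checker. A small sketch of the sharing pattern, with a stand-in server type:

```rust
use std::sync::Arc;

/// Illustrative stand-in for the HTTP server; not the servers crate's type.
struct FakeServer;

impl FakeServer {
    async fn start(&self) { /* bind and serve */ }
    async fn shutdown(&self) { /* stop accepting connections */ }
}

#[tokio::main]
async fn main() {
    let srv = Arc::new(FakeServer);
    let runner = {
        let srv = srv.clone();
        // One handle drives the server in a background task...
        tokio::spawn(async move { srv.start().await })
    };
    // ...while another handle can shut the same instance down later.
    srv.shutdown().await;
    runner.await.unwrap();
}
```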

View File

@@ -336,15 +336,6 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to downgrade region leader, region: {}", region_id))]
DowngradeLeader {
region_id: RegionId,
#[snafu(implicit)]
location: Location,
#[snafu(source)]
source: BoxedError,
},
#[snafu(display("Region's leader peer changed: {}", msg))]
LeaderPeerChanged {
msg: String,
@@ -965,7 +956,7 @@ impl ErrorExt for Error {
Error::StartTelemetryTask { source, .. } => source.status_code(),
Error::NextSequence { source, .. } => source.status_code(),
Error::DowngradeLeader { source, .. } => source.status_code(),
Error::RegisterProcedureLoader { source, .. } => source.status_code(),
Error::SubmitDdlTask { source, .. } => source.status_code(),
Error::ConvertProtoData { source, .. }

View File

@@ -113,88 +113,3 @@ async fn put_into_memory_store(ctx: &mut Context, key: Vec<u8>, value: Vec<u8>,
warn!(err; "Failed to update lease KV, peer: {peer:?}");
}
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::sync::Arc;
use api::v1::meta::RequestHeader;
use common_meta::cache_invalidator::DummyCacheInvalidator;
use common_meta::datanode::Stat;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::region_registry::LeaderRegionRegistry;
use common_meta::sequence::SequenceBuilder;
use super::*;
use crate::cluster::MetaPeerClientBuilder;
use crate::handler::{HeartbeatMailbox, Pushers};
use crate::lease::find_datanode_lease_value;
use crate::service::store::cached_kv::LeaderCachedKvBackend;
#[tokio::test]
async fn test_put_into_memory_store() {
let in_memory = Arc::new(MemoryKvBackend::new());
let kv_backend = Arc::new(MemoryKvBackend::new());
let leader_cached_kv_backend = Arc::new(LeaderCachedKvBackend::with_always_leader(
kv_backend.clone(),
));
let seq = SequenceBuilder::new("test_seq", kv_backend.clone()).build();
let mailbox = HeartbeatMailbox::create(Pushers::default(), seq);
let meta_peer_client = MetaPeerClientBuilder::default()
.election(None)
.in_memory(in_memory.clone())
.build()
.map(Arc::new)
// Safety: all required fields set at initialization
.unwrap();
let ctx = Context {
server_addr: "127.0.0.1:0000".to_string(),
in_memory,
kv_backend: kv_backend.clone(),
leader_cached_kv_backend,
meta_peer_client,
mailbox,
election: None,
is_infancy: false,
table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())),
cache_invalidator: Arc::new(DummyCacheInvalidator),
leader_region_registry: Arc::new(LeaderRegionRegistry::new()),
};
let handler = DatanodeKeepLeaseHandler;
handle_request_many_times(ctx.clone(), &handler, 1).await;
let lease_value = find_datanode_lease_value(1, &ctx.in_memory)
.await
.unwrap()
.unwrap();
assert_eq!(lease_value.node_addr, "127.0.0.1:1");
assert!(lease_value.timestamp_millis != 0);
}
async fn handle_request_many_times(
mut ctx: Context,
handler: &DatanodeKeepLeaseHandler,
loop_times: i32,
) {
let req = HeartbeatRequest {
header: Some(RequestHeader::new(1, Role::Datanode, HashMap::new())),
peer: Some(Peer::new(1, "127.0.0.1:1")),
..Default::default()
};
for i in 1..=loop_times {
let mut acc = HeartbeatAccumulator {
stat: Some(Stat {
id: 101,
region_num: i as _,
..Default::default()
}),
..Default::default()
};
handler.handle(&req, &mut ctx, &mut acc).await.unwrap();
}
}
}

View File

@@ -19,15 +19,13 @@ use api::v1::meta::{HeartbeatRequest, RegionLease, Role};
use async_trait::async_trait;
use common_meta::key::TableMetadataManagerRef;
use common_meta::region_keeper::MemoryRegionKeeperRef;
use store_api::region_engine::GrantedRegion;
use store_api::region_engine::{GrantedRegion, RegionRole};
use store_api::storage::RegionId;
use crate::error::Result;
use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
use crate::metasrv::Context;
use crate::region::lease_keeper::{
RegionLeaseInfo, RegionLeaseKeeperRef, RenewRegionLeasesResponse,
};
use crate::region::lease_keeper::{RegionLeaseKeeperRef, RenewRegionLeasesResponse};
use crate::region::RegionLeaseKeeper;
pub struct RegionLeaseHandler {
@@ -42,7 +40,7 @@ pub trait CustomizedRegionLeaseRenewer: Send + Sync {
fn renew(
&self,
ctx: &mut Context,
regions: HashMap<RegionId, RegionLeaseInfo>,
regions: HashMap<RegionId, RegionRole>,
) -> Vec<GrantedRegion>;
}
@@ -100,9 +98,7 @@ impl HeartbeatHandler for RegionLeaseHandler {
} else {
renewed
.into_iter()
.map(|(region_id, region_lease_info)| {
GrantedRegion::new(region_id, region_lease_info.role).into()
})
.map(|(region_id, region_role)| GrantedRegion::new(region_id, region_role).into())
.collect::<Vec<_>>()
};

View File

@@ -16,14 +16,14 @@ use std::collections::HashMap;
use std::hash::Hash;
use common_error::ext::BoxedError;
use common_meta::kv_backend::{KvBackend, ResettableKvBackendRef};
use common_meta::kv_backend::KvBackend;
use common_meta::peer::{Peer, PeerLookupService};
use common_meta::{util, DatanodeId, FlownodeId};
use common_time::util as time_util;
use snafu::ResultExt;
use crate::cluster::MetaPeerClientRef;
use crate::error::{Error, KvBackendSnafu, Result};
use crate::error::{Error, Result};
use crate::key::{DatanodeLeaseKey, FlownodeLeaseKey, LeaseValue};
fn build_lease_filter(lease_secs: u64) -> impl Fn(&LeaseValue) -> bool {
@@ -33,28 +33,6 @@ fn build_lease_filter(lease_secs: u64) -> impl Fn(&LeaseValue) -> bool {
}
}
/// Returns the lease value of the given datanode id; returns `None` if the datanode is not found.
pub async fn find_datanode_lease_value(
datanode_id: DatanodeId,
in_memory_key: &ResettableKvBackendRef,
) -> Result<Option<LeaseValue>> {
let lease_key = DatanodeLeaseKey {
node_id: datanode_id,
};
let lease_key_bytes: Vec<u8> = lease_key.try_into()?;
let Some(kv) = in_memory_key
.get(&lease_key_bytes)
.await
.context(KvBackendSnafu)?
else {
return Ok(None);
};
let lease_value: LeaseValue = kv.value.try_into()?;
Ok(Some(lease_value))
}
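The removed helper above follows a typed key/value round trip: serialize `DatanodeLeaseKey` to bytes, fetch the raw entry from the in-memory backend, then deserialize the bytes into a `LeaseValue`. A serde-free toy version of the same shape (encoding and names are illustrative):

```rust
use std::collections::HashMap;

/// Look up a lease timestamp for a node id in a raw byte store.
fn find_lease(store: &HashMap<Vec<u8>, Vec<u8>>, node_id: u64) -> Option<u64> {
    let key = format!("lease-{node_id}").into_bytes(); // stand-in for DatanodeLeaseKey -> bytes
    let raw = store.get(&key)?; // None when the datanode has no lease entry
    let mut buf = [0u8; 8];
    buf.copy_from_slice(raw); // toy decoding; values here are always 8 bytes
    Some(u64::from_le_bytes(buf)) // stand-in for bytes -> LeaseValue
}

fn main() {
    let mut store = HashMap::new();
    store.insert(b"lease-1".to_vec(), 1_700_000_000_000u64.to_le_bytes().to_vec());
    assert_eq!(find_lease(&store, 1), Some(1_700_000_000_000));
    assert_eq!(find_lease(&store, 2), None);
}
```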
/// look up a [`Peer`] given a [`ClusterId`] and [`DatanodeId`]; only returns it if the peer is alive within the given `lease_secs`
pub async fn lookup_datanode_peer(
datanode_id: DatanodeId,

View File

@@ -311,7 +311,6 @@ impl MetasrvBuilder {
let region_migration_manager = Arc::new(RegionMigrationManager::new(
procedure_manager.clone(),
DefaultContextFactory::new(
in_memory.clone(),
table_metadata_manager.clone(),
memory_region_keeper.clone(),
region_failure_detector_controller.clone(),

View File

@@ -37,8 +37,7 @@ use common_meta::key::datanode_table::{DatanodeTableKey, DatanodeTableValue};
use common_meta::key::table_info::TableInfoValue;
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
use common_meta::kv_backend::ResettableKvBackendRef;
use common_meta::lock_key::{CatalogLock, RegionLock, SchemaLock};
use common_meta::lock_key::{CatalogLock, RegionLock, SchemaLock, TableLock};
use common_meta::peer::Peer;
use common_meta::region_keeper::{MemoryRegionKeeperRef, OperatingRegionGuard};
use common_procedure::error::{
@@ -98,6 +97,9 @@ impl PersistentContext {
let lock_key = vec![
CatalogLock::Read(&self.catalog).into(),
SchemaLock::read(&self.catalog, &self.schema).into(),
// The optimistic updating of table route is not working very well,
// so we need to use the write lock here.
TableLock::Write(region_id.table_id()).into(),
RegionLock::Write(region_id).into(),
];
@@ -267,7 +269,6 @@ pub trait ContextFactory {
#[derive(Clone)]
pub struct DefaultContextFactory {
volatile_ctx: VolatileContext,
in_memory_key: ResettableKvBackendRef,
table_metadata_manager: TableMetadataManagerRef,
opening_region_keeper: MemoryRegionKeeperRef,
region_failure_detector_controller: RegionFailureDetectorControllerRef,
@@ -279,7 +280,6 @@ pub struct DefaultContextFactory {
impl DefaultContextFactory {
/// Returns an [`DefaultContextFactory`].
pub fn new(
in_memory_key: ResettableKvBackendRef,
table_metadata_manager: TableMetadataManagerRef,
opening_region_keeper: MemoryRegionKeeperRef,
region_failure_detector_controller: RegionFailureDetectorControllerRef,
@@ -289,7 +289,6 @@ impl DefaultContextFactory {
) -> Self {
Self {
volatile_ctx: VolatileContext::default(),
in_memory_key,
table_metadata_manager,
opening_region_keeper,
region_failure_detector_controller,
@@ -305,7 +304,6 @@ impl ContextFactory for DefaultContextFactory {
Context {
persistent_ctx,
volatile_ctx: self.volatile_ctx,
in_memory: self.in_memory_key,
table_metadata_manager: self.table_metadata_manager,
opening_region_keeper: self.opening_region_keeper,
region_failure_detector_controller: self.region_failure_detector_controller,
@@ -320,7 +318,6 @@ impl ContextFactory for DefaultContextFactory {
pub struct Context {
persistent_ctx: PersistentContext,
volatile_ctx: VolatileContext,
in_memory: ResettableKvBackendRef,
table_metadata_manager: TableMetadataManagerRef,
opening_region_keeper: MemoryRegionKeeperRef,
region_failure_detector_controller: RegionFailureDetectorControllerRef,
@@ -423,8 +420,8 @@ impl Context {
/// Notifies the RegionSupervisor to deregister failure detectors.
///
/// The original failure detectors won't be removed once the procedure is triggered.
/// We need to deregister the failure detectors for the original region once the procedure finishes.
/// The original failure detectors were removed once the procedure was triggered.
/// However, the `from_peer` may still send heartbeats containing the failed region.
pub async fn deregister_failure_detectors(&self) {
let datanode_id = self.persistent_ctx.from_peer.id;
let region_id = self.persistent_ctx.region_id;
@@ -434,19 +431,6 @@ impl Context {
.await;
}
/// Notifies the RegionSupervisor to deregister failure detectors for the candidate region on the destination peer.
///
/// The candidate region may be created on the destination peer,
/// so we need to deregister the failure detectors for the candidate region if the procedure is aborted.
pub async fn deregister_failure_detectors_for_candidate_region(&self) {
let to_peer_id = self.persistent_ctx.to_peer.id;
let region_id = self.persistent_ctx.region_id;
self.region_failure_detector_controller
.deregister_failure_detectors(vec![(to_peer_id, region_id)])
.await;
}
/// Removes the `table_route` of [VolatileContext], returns true if any.
pub fn remove_table_route_value(&mut self) -> bool {
let value = self.volatile_ctx.table_route.take();
@@ -547,11 +531,7 @@ pub(crate) trait State: Sync + Send + Debug {
}
/// Yields the next [State] and [Status].
async fn next(
&mut self,
ctx: &mut Context,
procedure_ctx: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)>;
async fn next(&mut self, ctx: &mut Context) -> Result<(Box<dyn State>, Status)>;
/// Returns as [Any](std::any::Any).
fn as_any(&self) -> &dyn Any;
@@ -686,45 +666,37 @@ impl Procedure for RegionMigrationProcedure {
true
}
async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult<Status> {
async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
let state = &mut self.state;
let name = state.name();
let _timer = METRIC_META_REGION_MIGRATION_EXECUTE
.with_label_values(&[name])
.start_timer();
match state.next(&mut self.context, ctx).await {
Ok((next, status)) => {
*state = next;
Ok(status)
let (next, status) = state.next(&mut self.context).await.map_err(|e| {
if e.is_retryable() {
METRIC_META_REGION_MIGRATION_ERROR
.with_label_values(&[name, "retryable"])
.inc();
ProcedureError::retry_later(e)
} else {
error!(
e;
"Region migration procedure failed, region_id: {}, from_peer: {}, to_peer: {}, {}",
self.context.region_id(),
self.context.persistent_ctx.from_peer,
self.context.persistent_ctx.to_peer,
self.context.volatile_ctx.metrics,
);
METRIC_META_REGION_MIGRATION_ERROR
.with_label_values(&[name, "external"])
.inc();
ProcedureError::external(e)
}
Err(e) => {
if e.is_retryable() {
METRIC_META_REGION_MIGRATION_ERROR
.with_label_values(&[name, "retryable"])
.inc();
Err(ProcedureError::retry_later(e))
} else {
// Consumes the opening region guard before deregistering the failure detectors.
self.context.volatile_ctx.opening_region_guard.take();
self.context
.deregister_failure_detectors_for_candidate_region()
.await;
error!(
e;
"Region migration procedure failed, region_id: {}, from_peer: {}, to_peer: {}, {}",
self.context.region_id(),
self.context.persistent_ctx.from_peer,
self.context.persistent_ctx.to_peer,
self.context.volatile_ctx.metrics,
);
METRIC_META_REGION_MIGRATION_ERROR
.with_label_values(&[name, "external"])
.inc();
Err(ProcedureError::external(e))
}
}
}
})?;
*state = next;
Ok(status)
}
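The rewrite above replaces the `match` on `state.next(...)` with `map_err` + `?`, so the happy path reads straight through: retryable errors become `retry_later` and everything else is a terminal `external` failure. A condensed, self-contained sketch of that control flow (illustrative types, not `common_procedure`'s):

```rust
#[derive(Debug)]
struct StateError {
    retryable: bool,
}

#[derive(Debug)]
enum ProcErr {
    RetryLater(StateError),
    External(StateError),
}

/// Classify a state-transition error, then continue with `?` on success.
fn step(result: Result<&'static str, StateError>) -> Result<&'static str, ProcErr> {
    let next = result.map_err(|e| {
        if e.retryable {
            ProcErr::RetryLater(e) // the framework re-drives the state later
        } else {
            ProcErr::External(e) // terminal failure, the procedure aborts
        }
    })?;
    Ok(next)
}

fn main() {
    assert!(matches!(
        step(Err(StateError { retryable: true })),
        Err(ProcErr::RetryLater(_))
    ));
    assert_eq!(step(Ok("next-state")).unwrap(), "next-state");
}
```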
fn dump(&self) -> ProcedureResult<String> {
@@ -813,11 +785,7 @@ mod tests {
#[async_trait::async_trait]
#[typetag::serde]
impl State for MockState {
async fn next(
&mut self,
_ctx: &mut Context,
_procedure_ctx: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)> {
async fn next(&mut self, _ctx: &mut Context) -> Result<(Box<dyn State>, Status)> {
Ok((Box::new(MockState), Status::done()))
}

View File

@@ -20,7 +20,7 @@ use common_meta::distributed_time_constants::REGION_LEASE_SECS;
use common_meta::instruction::{Instruction, InstructionReply, SimpleReply};
use common_meta::key::datanode_table::RegionInfo;
use common_meta::RegionIdent;
use common_procedure::{Context as ProcedureContext, Status};
use common_procedure::Status;
use common_telemetry::{info, warn};
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
@@ -40,11 +40,7 @@ pub struct CloseDowngradedRegion;
#[async_trait::async_trait]
#[typetag::serde]
impl State for CloseDowngradedRegion {
async fn next(
&mut self,
ctx: &mut Context,
_procedure_ctx: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)> {
async fn next(&mut self, ctx: &mut Context) -> Result<(Box<dyn State>, Status)> {
if let Err(err) = self.close_downgraded_leader_region(ctx).await {
let downgrade_leader_datanode = &ctx.persistent_ctx.from_peer;
let region_id = ctx.region_id();

View File

@@ -16,21 +16,18 @@ use std::any::Any;
use std::time::Duration;
use api::v1::meta::MailboxMessage;
use common_error::ext::BoxedError;
use common_meta::distributed_time_constants::REGION_LEASE_SECS;
use common_meta::instruction::{
DowngradeRegion, DowngradeRegionReply, Instruction, InstructionReply,
};
use common_procedure::{Context as ProcedureContext, Status};
use common_procedure::Status;
use common_telemetry::{error, info, warn};
use common_time::util::current_time_millis;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use tokio::time::{sleep, Instant};
use crate::error::{self, Result};
use crate::handler::HeartbeatMailbox;
use crate::lease::find_datanode_lease_value;
use crate::procedure::region_migration::update_metadata::UpdateMetadata;
use crate::procedure::region_migration::upgrade_candidate_region::UpgradeCandidateRegion;
use crate::procedure::region_migration::{Context, State};
@@ -56,11 +53,7 @@ impl Default for DowngradeLeaderRegion {
#[async_trait::async_trait]
#[typetag::serde]
impl State for DowngradeLeaderRegion {
async fn next(
&mut self,
ctx: &mut Context,
_procedure_ctx: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)> {
async fn next(&mut self, ctx: &mut Context) -> Result<(Box<dyn State>, Status)> {
let now = Instant::now();
// Ensures the `leader_region_lease_deadline` exists after recovering.
ctx.volatile_ctx
@@ -69,32 +62,19 @@ impl State for DowngradeLeaderRegion {
match self.downgrade_region_with_retry(ctx).await {
Ok(_) => {
// Do nothing
info!(
"Downgraded region leader success, region: {}",
ctx.persistent_ctx.region_id
);
}
Err(error::Error::ExceededDeadline { .. }) => {
info!(
"Downgrade region leader exceeded deadline, region: {}",
ctx.persistent_ctx.region_id
);
// Rollbacks the metadata if procedure is timeout
return Ok((Box::new(UpdateMetadata::Rollback), Status::executing(false)));
}
Err(err) => {
error!(err; "Occurs non-retryable error, region: {}", ctx.persistent_ctx.region_id);
error!(err; "Occurs non-retryable error");
if let Some(deadline) = ctx.volatile_ctx.leader_region_lease_deadline.as_ref() {
info!(
"Running into the downgrade region leader slow path, region: {}, sleep until {:?}",
ctx.persistent_ctx.region_id, deadline
"Running into the downgrade leader slow path, sleep until {:?}",
deadline
);
tokio::time::sleep_until(*deadline).await;
} else {
warn!(
"Leader region lease deadline is not set, region: {}",
ctx.persistent_ctx.region_id
);
}
}
}
@@ -236,61 +216,6 @@ impl DowngradeLeaderRegion {
}
}
async fn update_leader_region_lease_deadline(&self, ctx: &mut Context) {
let leader = &ctx.persistent_ctx.from_peer;
let last_connection_at = match find_datanode_lease_value(leader.id, &ctx.in_memory).await {
Ok(lease_value) => lease_value.map(|lease_value| lease_value.timestamp_millis),
Err(err) => {
error!(err; "Failed to find datanode lease value for datanode: {}, during region migration, region: {}", leader, ctx.persistent_ctx.region_id);
return;
}
};
if let Some(last_connection_at) = last_connection_at {
let now = current_time_millis();
let elapsed = now - last_connection_at;
let region_lease = Duration::from_secs(REGION_LEASE_SECS);
// It's safe to update the region leader lease deadline here because:
// 1. The old region leader has already been marked as downgraded in metadata,
// which means any attempts to renew its lease will be rejected.
// 2. The pusher disconnect time record only gets removed when the datanode (from_peer)
// establishes a new heartbeat connection stream.
if elapsed >= (REGION_LEASE_SECS * 1000) as i64 {
ctx.volatile_ctx.reset_leader_region_lease_deadline();
info!(
"Datanode {}({}) has been disconnected for longer than the region lease period ({:?}), reset leader region lease deadline to None, region: {}",
leader,
last_connection_at,
region_lease,
ctx.persistent_ctx.region_id
);
} else if elapsed > 0 {
// `now - last_connection_at` < REGION_LEASE_SECS * 1000
let lease_timeout =
region_lease - Duration::from_millis((now - last_connection_at) as u64);
ctx.volatile_ctx.reset_leader_region_lease_deadline();
ctx.volatile_ctx
.set_leader_region_lease_deadline(lease_timeout);
info!(
"Datanode {}({}) last connected {:?} ago, updated leader region lease deadline to {:?}, region: {}",
leader, last_connection_at, elapsed, ctx.volatile_ctx.leader_region_lease_deadline, ctx.persistent_ctx.region_id
);
} else {
warn!(
"Datanode {} has invalid last connection timestamp: {} (which is after current time: {}), region: {}",
leader, last_connection_at, now, ctx.persistent_ctx.region_id
)
}
} else {
warn!(
"Failed to find last connection time for datanode {}, unable to update region lease deadline, region: {}",
leader, ctx.persistent_ctx.region_id
)
}
}
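The helper removed above encodes a small piece of arithmetic worth spelling out: if the datanode has been silent for a full lease period the deadline is dropped entirely, otherwise only the remaining lease time is waited out. A condensed sketch of the three branches (the lease constant here is illustrative; the real one lives in common_meta):

```rust
use std::time::Duration;

const REGION_LEASE_SECS: u64 = 120; // illustrative value

/// Map the time since the last heartbeat to a remaining wait, if any.
fn remaining_lease(now_ms: i64, last_connection_at_ms: i64) -> Option<Duration> {
    let elapsed = now_ms - last_connection_at_ms;
    let lease_ms = (REGION_LEASE_SECS * 1000) as i64;
    if elapsed >= lease_ms {
        None // disconnected for at least one full lease: nothing left to wait for
    } else if elapsed > 0 {
        Some(Duration::from_millis((lease_ms - elapsed) as u64)) // wait out the remainder
    } else {
        None // last connection is "in the future": clock skew, treat as invalid
    }
}

fn main() {
    // Connected 30s ago under a 120s lease: wait the remaining 90s.
    assert_eq!(remaining_lease(130_000, 100_000), Some(Duration::from_secs(90)));
    // Silent for 200s: the lease has already lapsed.
    assert_eq!(remaining_lease(300_000, 100_000), None);
}
```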
/// Downgrades a leader region.
///
/// Fast path:
@@ -311,20 +236,13 @@ impl DowngradeLeaderRegion {
retry += 1;
// Throws the error immediately if the procedure exceeded the deadline.
if matches!(err, error::Error::ExceededDeadline { .. }) {
error!(err; "Failed to downgrade region leader, region: {}, exceeded deadline", ctx.persistent_ctx.region_id);
return Err(err);
} else if matches!(err, error::Error::PusherNotFound { .. }) {
// Throws the error immediately if the datanode is unreachable.
error!(err; "Failed to downgrade region leader, region: {}, datanode({}) is unreachable(PusherNotFound)", ctx.persistent_ctx.region_id, ctx.persistent_ctx.from_peer.id);
self.update_leader_region_lease_deadline(ctx).await;
return Err(err);
} else if err.is_retryable() && retry < self.optimistic_retry {
error!(err; "Failed to downgrade region leader, region: {}, retry later", ctx.persistent_ctx.region_id);
error!("Failed to downgrade region, error: {err:?}, retry later");
sleep(self.retry_initial_interval).await;
} else {
return Err(BoxedError::new(err)).context(error::DowngradeLeaderSnafu {
region_id: ctx.persistent_ctx.region_id,
})?;
error!("Failed to downgrade region, error: {err:?}");
break;
}
} else {
ctx.update_operations_elapsed(timer);
@@ -352,7 +270,7 @@ mod tests {
use super::*;
use crate::error::Error;
use crate::procedure::region_migration::test_util::{new_procedure_context, TestingEnv};
use crate::procedure::region_migration::test_util::TestingEnv;
use crate::procedure::region_migration::{ContextFactory, PersistentContext};
use crate::procedure::test_util::{
new_close_region_reply, new_downgrade_region_reply, send_mock_reply,
@@ -622,11 +540,7 @@ mod tests {
ctx.volatile_ctx
.set_leader_region_lease_deadline(Duration::from_secs(5));
let expected_deadline = ctx.volatile_ctx.leader_region_lease_deadline.unwrap();
let err = state
.downgrade_region_with_retry(&mut ctx)
.await
.unwrap_err();
assert_matches!(err, error::Error::DowngradeLeader { .. });
state.downgrade_region_with_retry(&mut ctx).await.unwrap();
assert_eq!(ctx.volatile_ctx.leader_region_last_entry_id, None);
// Should remain no change.
assert_eq!(
@@ -658,8 +572,7 @@ mod tests {
});
let timer = Instant::now();
let procedure_ctx = new_procedure_context();
let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
let (next, _) = state.next(&mut ctx).await.unwrap();
let elapsed = timer.elapsed().as_secs();
assert!(elapsed < REGION_LEASE_SECS / 2);
assert_eq!(ctx.volatile_ctx.leader_region_last_entry_id, Some(1));
@@ -693,8 +606,7 @@ mod tests {
send_mock_reply(mailbox, rx, |id| {
Ok(new_downgrade_region_reply(id, None, true, None))
});
let procedure_ctx = new_procedure_context();
let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
let (next, _) = state.next(&mut ctx).await.unwrap();
let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
assert_matches!(update_metadata, UpdateMetadata::Rollback);
}

View File

@@ -16,7 +16,7 @@ use std::any::Any;
use api::v1::meta::MailboxMessage;
use common_meta::instruction::{Instruction, InstructionReply, SimpleReply};
use common_procedure::{Context as ProcedureContext, Status};
use common_procedure::Status;
use common_telemetry::{info, warn};
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
@@ -37,11 +37,7 @@ pub struct PreFlushRegion;
#[async_trait::async_trait]
#[typetag::serde]
impl State for PreFlushRegion {
async fn next(
&mut self,
ctx: &mut Context,
_procedure_ctx: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)> {
async fn next(&mut self, ctx: &mut Context) -> Result<(Box<dyn State>, Status)> {
let timer = Instant::now();
self.flush_region(ctx).await?;
ctx.update_flush_leader_region_elapsed(timer);
@@ -167,7 +163,7 @@ mod tests {
use store_api::storage::RegionId;
use super::*;
use crate::procedure::region_migration::test_util::{self, new_procedure_context, TestingEnv};
use crate::procedure::region_migration::test_util::{self, TestingEnv};
use crate::procedure::region_migration::{ContextFactory, PersistentContext};
use crate::procedure::test_util::{
new_close_region_reply, new_flush_region_reply, send_mock_reply,
@@ -281,8 +277,7 @@ mod tests {
.insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
.await;
send_mock_reply(mailbox, rx, |id| Ok(new_flush_region_reply(id, true, None)));
let procedure_ctx = new_procedure_context();
let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
let (next, _) = state.next(&mut ctx).await.unwrap();
let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
assert_matches!(update_metadata, UpdateMetadata::Downgrade);

View File

@@ -14,7 +14,7 @@
use std::any::Any;
use common_procedure::{Context as ProcedureContext, Status};
use common_procedure::Status;
use common_telemetry::warn;
use serde::{Deserialize, Serialize};
@@ -38,11 +38,7 @@ impl RegionMigrationAbort {
#[async_trait::async_trait]
#[typetag::serde]
impl State for RegionMigrationAbort {
async fn next(
&mut self,
ctx: &mut Context,
_procedure_ctx: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)> {
async fn next(&mut self, ctx: &mut Context) -> Result<(Box<dyn State>, Status)> {
warn!(
"Region migration is aborted: {}, region_id: {}, from_peer: {}, to_peer: {}, {}",
self.reason,

View File

@@ -14,7 +14,7 @@
use std::any::Any;
use common_procedure::{Context as ProcedureContext, Status};
use common_procedure::Status;
use serde::{Deserialize, Serialize};
use crate::error::Result;
@@ -26,11 +26,7 @@ pub struct RegionMigrationEnd;
#[async_trait::async_trait]
#[typetag::serde]
impl State for RegionMigrationEnd {
async fn next(
&mut self,
_: &mut Context,
_: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)> {
async fn next(&mut self, _: &mut Context) -> Result<(Box<dyn State>, Status)> {
Ok((Box::new(RegionMigrationEnd), Status::done()))
}

View File

@@ -16,8 +16,7 @@ use std::any::Any;
use common_meta::peer::Peer;
use common_meta::rpc::router::RegionRoute;
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::info;
use common_procedure::Status;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
@@ -49,27 +48,15 @@ impl State for RegionMigrationStart {
/// If the candidate region has been opened on `to_peer`, go to the [UpdateMetadata::Downgrade] state.
///
/// Otherwise go to the [OpenCandidateRegion] state.
async fn next(
&mut self,
ctx: &mut Context,
_procedure_ctx: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)> {
async fn next(&mut self, ctx: &mut Context) -> Result<(Box<dyn State>, Status)> {
let region_id = ctx.persistent_ctx.region_id;
let region_route = self.retrieve_region_route(ctx, region_id).await?;
let to_peer = &ctx.persistent_ctx.to_peer;
let from_peer = &ctx.persistent_ctx.from_peer;
if self.has_migrated(&region_route, to_peer)? {
info!(
"Region has been migrated, region: {:?}, to_peer: {:?}",
region_route.region.id, to_peer
);
Ok((Box::new(RegionMigrationEnd), Status::done()))
} else if self.invalid_leader_peer(&region_route, from_peer)? {
info!(
"Abort region migration, region:{:?}, unexpected leader peer: {:?}, expected: {:?}",
region_route.region.id, region_route.leader_peer, from_peer,
);
Ok((
Box::new(RegionMigrationAbort::new(&format!(
"Invalid region leader peer: {from_peer:?}, expected: {:?}",
@@ -161,7 +148,7 @@ impl RegionMigrationStart {
fn has_migrated(&self, region_route: &RegionRoute, to_peer: &Peer) -> Result<bool> {
let region_id = region_route.region.id;
let region_migrated = region_route
let region_opened = region_route
.leader_peer
.as_ref()
.context(error::UnexpectedSnafu {
@@ -169,7 +156,8 @@ impl RegionMigrationStart {
})?
.id
== to_peer.id;
Ok(region_migrated)
Ok(region_opened)
}
}
@@ -184,7 +172,7 @@ mod tests {
use super::*;
use crate::error::Error;
use crate::procedure::region_migration::test_util::{self, new_procedure_context, TestingEnv};
use crate::procedure::region_migration::test_util::{self, TestingEnv};
use crate::procedure::region_migration::update_metadata::UpdateMetadata;
use crate::procedure::region_migration::{ContextFactory, PersistentContext};
@@ -260,8 +248,8 @@ mod tests {
env.create_physical_table_metadata(table_info, region_routes)
.await;
let procedure_ctx = new_procedure_context();
let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
let (next, _) = state.next(&mut ctx).await.unwrap();
let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
@@ -291,8 +279,8 @@ mod tests {
env.create_physical_table_metadata(table_info, region_routes)
.await;
let procedure_ctx = new_procedure_context();
let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
let (next, _) = state.next(&mut ctx).await.unwrap();
let _ = next.as_any().downcast_ref::<RegionMigrationEnd>().unwrap();
}
@@ -317,8 +305,8 @@ mod tests {
env.create_physical_table_metadata(table_info, region_routes)
.await;
let procedure_ctx = new_procedure_context();
let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
let (next, _) = state.next(&mut ctx).await.unwrap();
let _ = next.as_any().downcast_ref::<OpenCandidateRegion>().unwrap();
}
@@ -342,8 +330,8 @@ mod tests {
env.create_physical_table_metadata(table_info, region_routes)
.await;
let procedure_ctx = new_procedure_context();
let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
let (next, _) = state.next(&mut ctx).await.unwrap();
let _ = next
.as_any()

View File

@@ -20,7 +20,7 @@ use common_meta::distributed_time_constants::REGION_LEASE_SECS;
use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, SimpleReply};
use common_meta::key::datanode_table::RegionInfo;
use common_meta::RegionIdent;
use common_procedure::{Context as ProcedureContext, Status};
use common_procedure::Status;
use common_telemetry::info;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
@@ -41,11 +41,7 @@ pub struct OpenCandidateRegion;
#[async_trait::async_trait]
#[typetag::serde]
impl State for OpenCandidateRegion {
async fn next(
&mut self,
ctx: &mut Context,
_procedure_ctx: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)> {
async fn next(&mut self, ctx: &mut Context) -> Result<(Box<dyn State>, Status)> {
let instruction = self.build_open_region_instruction(ctx).await?;
let now = Instant::now();
self.open_candidate_region(ctx, instruction).await?;
@@ -204,7 +200,7 @@ mod tests {
use super::*;
use crate::error::Error;
use crate::procedure::region_migration::test_util::{self, new_procedure_context, TestingEnv};
use crate::procedure::region_migration::test_util::{self, TestingEnv};
use crate::procedure::region_migration::{ContextFactory, PersistentContext};
use crate::procedure::test_util::{
new_close_region_reply, new_open_region_reply, send_mock_reply,
@@ -438,8 +434,8 @@ mod tests {
.await;
send_mock_reply(mailbox, rx, |id| Ok(new_open_region_reply(id, true, None)));
let procedure_ctx = new_procedure_context();
let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
let (next, _) = state.next(&mut ctx).await.unwrap();
let vc = ctx.volatile_ctx;
assert_eq!(
vc.opening_region_guard.unwrap().info(),

View File

@@ -121,7 +121,6 @@ impl TestingEnv {
table_metadata_manager: self.table_metadata_manager.clone(),
opening_region_keeper: self.opening_region_keeper.clone(),
volatile_ctx: Default::default(),
in_memory_key: Arc::new(MemoryKvBackend::default()),
mailbox: self.mailbox_ctx.mailbox().clone(),
server_addr: self.server_addr.to_string(),
region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
@@ -263,8 +262,7 @@ impl ProcedureMigrationTestSuite {
}
debug!("suite test: {name} invoking next");
let procedure_ctx = new_procedure_context();
let result = self.state.next(&mut self.context, &procedure_ctx).await;
let result = self.state.next(&mut self.context).await;
match assertion {
Assertion::Simple(state_assert, status_assert) => {
@@ -565,11 +563,3 @@ fn test_merge_mailbox_messages() {
unreachable!()
}
}
/// Returns a new [ProcedureContext].
pub fn new_procedure_context() -> ProcedureContext {
ProcedureContext {
procedure_id: ProcedureId::random(),
provider: Arc::new(MockContextProvider::default()),
}
}

Some files were not shown because too many files have changed in this diff.