Compare commits

...

7 Commits

Author SHA1 Message Date
discord9
0443042f14 feat: better metrics 2025-06-09 20:28:29 +08:00
discord9
1786190235 feat(exp): adjust_flow admin function 2025-06-09 17:02:42 +08:00
discord9
705b2007cf feat: flownode to frontend load balance with guess 2025-06-09 16:34:19 +08:00
dennis zhuang
8d2c1b7f6a ci: refactor bump downstream versions worflow and adds demo-scene (#6171)
* ci: refactor bump downstream versions and adds demo-scene

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* chore: rename

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* chore: style

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* fix: forgot DEMO_REPO_TOKEN

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* fix: demo repo name

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

---------

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>
2025-06-09 03:19:00 +00:00
Lei, HUANG
c50e84095e feat: disable compression for do_get API (#6254)
* feat/disable-flight-compression:
 ### Commit Summary

 - **Add Compression Control in Flight Encoder**: Introduced a new method `with_compression_disabled` in `FlightEncoder` to allow encoding without compression in `flight.rs`.
 - **Update Flight Stream Initialization**: Modified `FlightRecordBatchStream` to use the new `FlightEncoder::with_compression_disabled` method for initializing the encoder in `stream.rs`.

* feat/disable-flight-compression:
 Remove Unused Import in `flight.rs`

 - Removed the unused import `write_message` from `flight.rs` to clean up the codebase.

* feat/disable-flight-compression:
 ### Disable Compression in Flight Encoder

 - Updated `tests-integration/src/grpc/flight.rs` to use `FlightEncoder::with_compression_disabled()` instead of `FlightEncoder::default()` for encoding `FlightMessage::Schema` and `FlightMessage::RecordBatch`. This change disables compression in the Flight encoder for these operations.

Signed-off-by: Lei, HUANG <lhuang@greptime.com>

* Signed-off-by: Lei, HUANG <lhuang@greptime.com>

* disable flight client compression

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Lei, HUANG <lhuang@greptime.com>
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
2025-06-09 03:02:28 +00:00
Weny Xu
d3d233257d feat: add some metasrv metrics to grafana dashboard (#6264)
* feat: add metasrv dashboard panels

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
2025-06-09 02:41:00 +00:00
zyy17
fdf32a8f46 refactor: respect data_home as root data home directory (#6050)
* refactor: initialize logging dir by using data_home

* chore: remove tail '/' for dir name
2025-06-09 02:31:21 +00:00
47 changed files with 10325 additions and 7893 deletions

View File

@@ -441,8 +441,8 @@ jobs:
aws-region: ${{ vars.EC2_RUNNER_REGION }}
github-token: ${{ secrets.GH_PERSONAL_ACCESS_TOKEN }}
bump-doc-version:
name: Bump doc version
bump-downstream-repo-versions:
name: Bump downstream repo versions
if: ${{ github.event_name == 'push' || github.event_name == 'schedule' }}
needs: [allocate-runners, publish-github-release]
runs-on: ubuntu-latest
@@ -456,36 +456,16 @@ jobs:
fetch-depth: 0
persist-credentials: false
- uses: ./.github/actions/setup-cyborg
- name: Bump doc version
- name: Bump downstream repo versions
working-directory: cyborg
run: pnpm tsx bin/bump-doc-version.ts
env:
VERSION: ${{ needs.allocate-runners.outputs.version }}
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
DOCS_REPO_TOKEN: ${{ secrets.DOCS_REPO_TOKEN }}
bump-website-version:
name: Bump website version
if: ${{ github.ref_type == 'tag' && !contains(github.ref_name, 'nightly') && github.event_name != 'schedule' }}
needs: [allocate-runners, publish-github-release]
runs-on: ubuntu-latest
# Permission reference: https://docs.github.com/en/actions/using-jobs/assigning-permissions-to-jobs
permissions:
issues: write # Allows the action to create issues for cyborg.
contents: write # Allows the action to create a release.
steps:
- uses: actions/checkout@v4
with:
fetch-depth: 0
persist-credentials: false
- uses: ./.github/actions/setup-cyborg
- name: Bump website version
working-directory: cyborg
run: pnpm tsx bin/bump-website-version.ts
run: pnpm tsx bin/bump-versions.ts
env:
TARGET_REPOS: website,docs,demo
VERSION: ${{ needs.allocate-runners.outputs.version }}
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
WEBSITE_REPO_TOKEN: ${{ secrets.WEBSITE_REPO_TOKEN }}
DOCS_REPO_TOKEN: ${{ secrets.DOCS_REPO_TOKEN }}
DEMO_REPO_TOKEN: ${{ secrets.DEMO_REPO_TOKEN }}
bump-helm-charts-version:
name: Bump helm charts version

2
Cargo.lock generated
View File

@@ -5133,7 +5133,7 @@ dependencies = [
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=454c52634c3bac27de10bf0d85d5533eed1cf03f#454c52634c3bac27de10bf0d85d5533eed1cf03f"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=2dca1dc67862d7b410838aef81232274c019b3f6#2dca1dc67862d7b410838aef81232274c019b3f6"
dependencies = [
"prost 0.13.5",
"serde",

View File

@@ -133,7 +133,7 @@ etcd-client = "0.14"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "454c52634c3bac27de10bf0d85d5533eed1cf03f" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "2dca1dc67862d7b410838aef81232274c019b3f6" }
hex = "0.4"
http = "1"
humantime = "2.1"

View File

@@ -100,7 +100,7 @@
| `query` | -- | -- | The query engine options. |
| `query.parallelism` | Integer | `0` | Parallelism of the query engine.<br/>Default to 0, which means the number of CPU cores. |
| `storage` | -- | -- | The data storage options. |
| `storage.data_home` | String | `./greptimedb_data/` | The working home directory. |
| `storage.data_home` | String | `./greptimedb_data` | The working home directory. |
| `storage.type` | String | `File` | The storage type used to store the data.<br/>- `File`: the data is stored in the local file system.<br/>- `S3`: the data is stored in the S3 object storage.<br/>- `Gcs`: the data is stored in the Google Cloud Storage.<br/>- `Azblob`: the data is stored in the Azure Blob Storage.<br/>- `Oss`: the data is stored in the Aliyun OSS. |
| `storage.cache_path` | String | Unset | Read cache configuration for object storage such as 'S3' etc, it's configured by default when using object storage. It is recommended to configure it when using object storage for better performance.<br/>A local file directory, defaults to `{data_home}`. An empty string means disabling. |
| `storage.cache_capacity` | String | Unset | The local file cache capacity in bytes. If your disk space is sufficient, it is recommended to set it larger. |
@@ -314,7 +314,7 @@
| Key | Type | Default | Descriptions |
| --- | -----| ------- | ----------- |
| `data_home` | String | `./greptimedb_data/metasrv/` | The working home directory. |
| `data_home` | String | `./greptimedb_data` | The working home directory. |
| `bind_addr` | String | `127.0.0.1:3002` | The bind address of metasrv. |
| `server_addr` | String | `127.0.0.1:3002` | The communication server address for the frontend and datanode to connect to metasrv.<br/>If left empty or unset, the server will automatically use the IP address of the first network interface<br/>on the host, with the same port number as the one specified in `bind_addr`. |
| `store_addrs` | Array | -- | Store server address default to etcd store.<br/>For postgres store, the format is:<br/>"password=password dbname=postgres user=postgres host=localhost port=5432"<br/>For etcd store, the format is:<br/>"127.0.0.1:2379" |
@@ -446,7 +446,7 @@
| `query` | -- | -- | The query engine options. |
| `query.parallelism` | Integer | `0` | Parallelism of the query engine.<br/>Default to 0, which means the number of CPU cores. |
| `storage` | -- | -- | The data storage options. |
| `storage.data_home` | String | `./greptimedb_data/` | The working home directory. |
| `storage.data_home` | String | `./greptimedb_data` | The working home directory. |
| `storage.type` | String | `File` | The storage type used to store the data.<br/>- `File`: the data is stored in the local file system.<br/>- `S3`: the data is stored in the S3 object storage.<br/>- `Gcs`: the data is stored in the Google Cloud Storage.<br/>- `Azblob`: the data is stored in the Azure Blob Storage.<br/>- `Oss`: the data is stored in the Aliyun OSS. |
| `storage.cache_path` | String | Unset | Read cache configuration for object storage such as 'S3' etc, it's configured by default when using object storage. It is recommended to configure it when using object storage for better performance.<br/>A local file directory, defaults to `{data_home}`. An empty string means disabling. |
| `storage.cache_capacity` | String | Unset | The local file cache capacity in bytes. If your disk space is sufficient, it is recommended to set it larger. |

View File

@@ -252,7 +252,7 @@ parallelism = 0
## The data storage options.
[storage]
## The working home directory.
data_home = "./greptimedb_data/"
data_home = "./greptimedb_data"
## The storage type used to store the data.
## - `File`: the data is stored in the local file system.

View File

@@ -1,5 +1,5 @@
## The working home directory.
data_home = "./greptimedb_data/metasrv/"
data_home = "./greptimedb_data"
## The bind address of metasrv.
bind_addr = "127.0.0.1:3002"

View File

@@ -350,7 +350,7 @@ parallelism = 0
## The data storage options.
[storage]
## The working home directory.
data_home = "./greptimedb_data/"
data_home = "./greptimedb_data"
## The storage type used to store the data.
## - `File`: the data is stored in the local file system.

View File

@@ -1,75 +0,0 @@
/*
* Copyright 2023 Greptime Team
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import * as core from "@actions/core";
import {obtainClient} from "@/common";
async function triggerWorkflow(workflowId: string, version: string) {
const docsClient = obtainClient("DOCS_REPO_TOKEN")
try {
await docsClient.rest.actions.createWorkflowDispatch({
owner: "GreptimeTeam",
repo: "docs",
workflow_id: workflowId,
ref: "main",
inputs: {
version,
},
});
console.log(`Successfully triggered ${workflowId} workflow with version ${version}`);
} catch (error) {
core.setFailed(`Failed to trigger workflow: ${error.message}`);
}
}
function determineWorkflow(version: string): [string, string] {
// Check if it's a nightly version
if (version.includes('nightly')) {
return ['bump-nightly-version.yml', version];
}
const parts = version.split('.');
if (parts.length !== 3) {
throw new Error('Invalid version format');
}
// If patch version (last number) is 0, it's a major version
// Return only major.minor version
if (parts[2] === '0') {
return ['bump-version.yml', `${parts[0]}.${parts[1]}`];
}
// Otherwise it's a patch version, use full version
return ['bump-patch-version.yml', version];
}
const version = process.env.VERSION;
if (!version) {
core.setFailed("VERSION environment variable is required");
process.exit(1);
}
// Remove 'v' prefix if exists
const cleanVersion = version.startsWith('v') ? version.slice(1) : version;
try {
const [workflowId, apiVersion] = determineWorkflow(cleanVersion);
triggerWorkflow(workflowId, apiVersion);
} catch (error) {
core.setFailed(`Error processing version: ${error.message}`);
process.exit(1);
}

156
cyborg/bin/bump-versions.ts Normal file
View File

@@ -0,0 +1,156 @@
/*
* Copyright 2023 Greptime Team
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import * as core from "@actions/core";
import {obtainClient} from "@/common";
interface RepoConfig {
tokenEnv: string;
repo: string;
workflowLogic: (version: string) => [string, string] | null;
}
const REPO_CONFIGS: Record<string, RepoConfig> = {
website: {
tokenEnv: "WEBSITE_REPO_TOKEN",
repo: "website",
workflowLogic: (version: string) => {
// Skip nightly versions for website
if (version.includes('nightly')) {
console.log('Nightly version detected for website, skipping workflow trigger.');
return null;
}
return ['bump-patch-version.yml', version];
}
},
demo: {
tokenEnv: "DEMO_REPO_TOKEN",
repo: "demo-scene",
workflowLogic: (version: string) => {
// Skip nightly versions for demo
if (version.includes('nightly')) {
console.log('Nightly version detected for demo, skipping workflow trigger.');
return null;
}
return ['bump-patch-version.yml', version];
}
},
docs: {
tokenEnv: "DOCS_REPO_TOKEN",
repo: "docs",
workflowLogic: (version: string) => {
// Check if it's a nightly version
if (version.includes('nightly')) {
return ['bump-nightly-version.yml', version];
}
const parts = version.split('.');
if (parts.length !== 3) {
throw new Error('Invalid version format');
}
// If patch version (last number) is 0, it's a major version
// Return only major.minor version
if (parts[2] === '0') {
return ['bump-version.yml', `${parts[0]}.${parts[1]}`];
}
// Otherwise it's a patch version, use full version
return ['bump-patch-version.yml', version];
}
}
};
async function triggerWorkflow(repoConfig: RepoConfig, workflowId: string, version: string) {
const client = obtainClient(repoConfig.tokenEnv);
try {
await client.rest.actions.createWorkflowDispatch({
owner: "GreptimeTeam",
repo: repoConfig.repo,
workflow_id: workflowId,
ref: "main",
inputs: {
version,
},
});
console.log(`Successfully triggered ${workflowId} workflow for ${repoConfig.repo} with version ${version}`);
} catch (error) {
core.setFailed(`Failed to trigger workflow for ${repoConfig.repo}: ${error.message}`);
throw error;
}
}
async function processRepo(repoName: string, version: string) {
const repoConfig = REPO_CONFIGS[repoName];
if (!repoConfig) {
throw new Error(`Unknown repository: ${repoName}`);
}
try {
const workflowResult = repoConfig.workflowLogic(version);
if (workflowResult === null) {
// Skip this repo (e.g., nightly version for website)
return;
}
const [workflowId, apiVersion] = workflowResult;
await triggerWorkflow(repoConfig, workflowId, apiVersion);
} catch (error) {
core.setFailed(`Error processing ${repoName} with version ${version}: ${error.message}`);
throw error;
}
}
async function main() {
const version = process.env.VERSION;
if (!version) {
core.setFailed("VERSION environment variable is required");
process.exit(1);
}
// Remove 'v' prefix if exists
const cleanVersion = version.startsWith('v') ? version.slice(1) : version;
// Get target repositories from environment variable
// Default to both if not specified
const targetRepos = process.env.TARGET_REPOS?.split(',').map(repo => repo.trim()) || ['website', 'docs'];
console.log(`Processing version ${cleanVersion} for repositories: ${targetRepos.join(', ')}`);
const errors: string[] = [];
// Process each repository
for (const repo of targetRepos) {
try {
await processRepo(repo, cleanVersion);
} catch (error) {
errors.push(`${repo}: ${error.message}`);
}
}
if (errors.length > 0) {
core.setFailed(`Failed to process some repositories: ${errors.join('; ')}`);
process.exit(1);
}
console.log('All repositories processed successfully');
}
// Execute main function
main().catch((error) => {
core.setFailed(`Unexpected error: ${error.message}`);
process.exit(1);
});

View File

@@ -1,57 +0,0 @@
/*
* Copyright 2023 Greptime Team
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import * as core from "@actions/core";
import {obtainClient} from "@/common";
async function triggerWorkflow(workflowId: string, version: string) {
const websiteClient = obtainClient("WEBSITE_REPO_TOKEN")
try {
await websiteClient.rest.actions.createWorkflowDispatch({
owner: "GreptimeTeam",
repo: "website",
workflow_id: workflowId,
ref: "main",
inputs: {
version,
},
});
console.log(`Successfully triggered ${workflowId} workflow with version ${version}`);
} catch (error) {
core.setFailed(`Failed to trigger workflow: ${error.message}`);
}
}
const version = process.env.VERSION;
if (!version) {
core.setFailed("VERSION environment variable is required");
process.exit(1);
}
// Remove 'v' prefix if exists
const cleanVersion = version.startsWith('v') ? version.slice(1) : version;
if (cleanVersion.includes('nightly')) {
console.log('Nightly version detected, skipping workflow trigger.');
process.exit(0);
}
try {
triggerWorkflow('bump-patch-version.yml', cleanVersion);
} catch (error) {
core.setFailed(`Error processing version: ${error.message}`);
process.exit(1);
}

File diff suppressed because it is too large Load Diff

View File

@@ -60,7 +60,7 @@
| Read Stage P99 per Instance | `histogram_quantile(0.99, sum by(instance, pod, le, stage) (rate(greptime_mito_read_stage_elapsed_bucket{instance=~"$datanode"}[$__rate_interval])))` | `timeseries` | Read Stage P99 per Instance. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]` |
| Write Stage P99 per Instance | `histogram_quantile(0.99, sum by(instance, pod, le, stage) (rate(greptime_mito_write_stage_elapsed_bucket{instance=~"$datanode"}[$__rate_interval])))` | `timeseries` | Write Stage P99 per Instance. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]` |
| Compaction OPS per Instance | `sum by(instance, pod) (rate(greptime_mito_compaction_total_elapsed_count{instance=~"$datanode"}[$__rate_interval]))` | `timeseries` | Compaction OPS per Instance. | `prometheus` | `ops` | `[{{ instance }}]-[{{pod}}]` |
| Compaction Elapsed Time per Instance by Stage | `histogram_quantile(0.99, sum by(instance, pod, le, stage) (rate(greptime_mito_compaction_stage_elapsed_bucket{instance=~"$datanode"}[$__rate_interval])))`<br/>`sum by(instance, pod, le, stage) (rate(greptime_mito_compaction_stage_elapsed_sum{instance=~"$datanode"}[$__rate_interval]))/sum by(instance, pod, le, stage) (rate(greptime_mito_compaction_stage_elapsed_count{instance=~"$datanode"}[$__rate_interval]))` | `timeseries` | Compaction latency by stage | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-p99` |
| Compaction Elapsed Time per Instance by Stage | `histogram_quantile(0.99, sum by(instance, pod, le, stage) (rate(greptime_mito_compaction_stage_elapsed_bucket{instance=~"$datanode"}[$__rate_interval])))`<br/>`sum by(instance, pod, stage) (rate(greptime_mito_compaction_stage_elapsed_sum{instance=~"$datanode"}[$__rate_interval]))/sum by(instance, pod, stage) (rate(greptime_mito_compaction_stage_elapsed_count{instance=~"$datanode"}[$__rate_interval]))` | `timeseries` | Compaction latency by stage | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-p99` |
| Compaction P99 per Instance | `histogram_quantile(0.99, sum by(instance, pod, le,stage) (rate(greptime_mito_compaction_total_elapsed_bucket{instance=~"$datanode"}[$__rate_interval])))` | `timeseries` | Compaction P99 per Instance. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-compaction` |
| WAL write size | `histogram_quantile(0.95, sum by(le,instance, pod) (rate(raft_engine_write_size_bucket[$__rate_interval])))`<br/>`histogram_quantile(0.99, sum by(le,instance,pod) (rate(raft_engine_write_size_bucket[$__rate_interval])))`<br/>`sum by (instance, pod)(rate(raft_engine_write_size_sum[$__rate_interval]))` | `timeseries` | Write-ahead logs write size as bytes. This chart includes stats of p95 and p99 size by instance, total WAL write rate. | `prometheus` | `bytes` | `[{{instance}}]-[{{pod}}]-req-size-p95` |
| Cached Bytes per Instance | `greptime_mito_cache_bytes{instance=~"$datanode"}` | `timeseries` | Cached Bytes per Instance. | `prometheus` | `decbytes` | `[{{instance}}]-[{{pod}}]-[{{type}}]` |
@@ -69,7 +69,7 @@
| Log Store op duration seconds | `histogram_quantile(0.99, sum by(le,logstore,optype,instance, pod) (rate(greptime_logstore_op_elapsed_bucket[$__rate_interval])))` | `timeseries` | Write-ahead log operations latency at p99 | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{logstore}}]-[{{optype}}]-p99` |
| Inflight Flush | `greptime_mito_inflight_flush_count` | `timeseries` | Ongoing flush task count | `prometheus` | `none` | `[{{instance}}]-[{{pod}}]` |
| Compaction Input/Output Bytes | `sum by(instance, pod) (greptime_mito_compaction_input_bytes)`<br/>`sum by(instance, pod) (greptime_mito_compaction_output_bytes)` | `timeseries` | Compaction oinput output bytes | `prometheus` | `bytes` | `[{{instance}}]-[{{pod}}]-input` |
| Region Worker Handle Bulk Insert Requests | `histogram_quantile(0.95, sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_bucket[$__rate_interval])))`<br/>`sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_sum[$__rate_interval]))/sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_count[$__rate_interval]))` | `timeseries` | Per-stage elapsed time for region worker to handle bulk insert region requests. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-P95` |
| Region Worker Handle Bulk Insert Requests | `histogram_quantile(0.95, sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_bucket[$__rate_interval])))`<br/>`sum by(instance, stage, pod) (rate(greptime_region_worker_handle_write_sum[$__rate_interval]))/sum by(instance, stage, pod) (rate(greptime_region_worker_handle_write_count[$__rate_interval]))` | `timeseries` | Per-stage elapsed time for region worker to handle bulk insert region requests. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-P95` |
| Region Worker Convert Requests | `histogram_quantile(0.95, sum by(le, instance, stage, pod) (rate(greptime_datanode_convert_region_request_bucket[$__rate_interval])))`<br/>`sum by(le,instance, stage, pod) (rate(greptime_datanode_convert_region_request_sum[$__rate_interval]))/sum by(le,instance, stage, pod) (rate(greptime_datanode_convert_region_request_count[$__rate_interval]))` | `timeseries` | Per-stage elapsed time for region worker to decode requests. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-P95` |
# OpenDAL
| Title | Query | Type | Description | Datasource | Unit | Legend Format |
@@ -88,9 +88,19 @@
# Metasrv
| Title | Query | Type | Description | Datasource | Unit | Legend Format |
| --- | --- | --- | --- | --- | --- | --- |
| Region migration datanode | `greptime_meta_region_migration_stat{datanode_type="src"}`<br/>`greptime_meta_region_migration_stat{datanode_type="desc"}` | `state-timeline` | Counter of region migration by source and destination | `prometheus` | `none` | `from-datanode-{{datanode_id}}` |
| Region migration error | `greptime_meta_region_migration_error` | `timeseries` | Counter of region migration error | `prometheus` | `none` | `__auto` |
| Datanode load | `greptime_datanode_load` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `none` | `__auto` |
| Region migration datanode | `greptime_meta_region_migration_stat{datanode_type="src"}`<br/>`greptime_meta_region_migration_stat{datanode_type="desc"}` | `status-history` | Counter of region migration by source and destination | `prometheus` | -- | `from-datanode-{{datanode_id}}` |
| Region migration error | `greptime_meta_region_migration_error` | `timeseries` | Counter of region migration error | `prometheus` | `none` | `{{pod}}-{{state}}-{{error_type}}` |
| Datanode load | `greptime_datanode_load` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `binBps` | `Datanode-{{datanode_id}}-writeload` |
| Rate of SQL Executions (RDS) | `rate(greptime_meta_rds_pg_sql_execute_elapsed_ms_count[$__rate_interval])` | `timeseries` | Displays the rate of SQL executions processed by the Meta service using the RDS backend. | `prometheus` | `none` | `{{pod}} {{op}} {{type}} {{result}} ` |
| SQL Execution Latency (RDS) | `histogram_quantile(0.90, sum by(pod, op, type, result, le) (rate(greptime_meta_rds_pg_sql_execute_elapsed_ms_bucket[$__rate_interval])))` | `timeseries` | Measures the response time of SQL executions via the RDS backend. | `prometheus` | `ms` | `{{pod}} {{op}} {{type}} {{result}} p90` |
| Handler Execution Latency | `histogram_quantile(0.90, sum by(pod, le, name) (
rate(greptime_meta_handler_execute_bucket[$__rate_interval])
))` | `timeseries` | Shows latency of Meta handlers by pod and handler name, useful for monitoring handler performance and detecting latency spikes.<br/> | `prometheus` | `s` | `{{pod}} {{name}} p90` |
| Heartbeat Packet Size | `histogram_quantile(0.9, sum by(pod, le) (greptime_meta_heartbeat_stat_memory_size_bucket))` | `timeseries` | Shows p90 heartbeat message sizes, helping track network usage and identify anomalies in heartbeat payload.<br/> | `prometheus` | `bytes` | `{{pod}}` |
| Meta Heartbeat Receive Rate | `rate(greptime_meta_heartbeat_rate[$__rate_interval])` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `s` | `{{pod}}` |
| Meta KV Ops Latency | `histogram_quantile(0.99, sum by(pod, le, op, target) (greptime_meta_kv_request_elapsed_bucket))` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `s` | `{{pod}}-{{op}} p99` |
| Rate of meta KV Ops | `rate(greptime_meta_kv_request_elapsed_count[$__rate_interval])` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `none` | `{{pod}}-{{op}} p99` |
| DDL Latency | `histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_tables_bucket))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_table))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_view))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_flow))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_drop_table))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_alter_table))` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `s` | `CreateLogicalTables-{{step}} p90` |
# Flownode
| Title | Query | Type | Description | Datasource | Unit | Legend Format |
| --- | --- | --- | --- | --- | --- | --- |

View File

@@ -497,7 +497,7 @@ groups:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-p99'
- expr: sum by(instance, pod, le, stage) (rate(greptime_mito_compaction_stage_elapsed_sum{instance=~"$datanode"}[$__rate_interval]))/sum by(instance, pod, le, stage) (rate(greptime_mito_compaction_stage_elapsed_count{instance=~"$datanode"}[$__rate_interval]))
- expr: sum by(instance, pod, stage) (rate(greptime_mito_compaction_stage_elapsed_sum{instance=~"$datanode"}[$__rate_interval]))/sum by(instance, pod, stage) (rate(greptime_mito_compaction_stage_elapsed_count{instance=~"$datanode"}[$__rate_interval]))
datasource:
type: prometheus
uid: ${metrics}
@@ -607,7 +607,7 @@ groups:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-P95'
- expr: sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_sum[$__rate_interval]))/sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_count[$__rate_interval]))
- expr: sum by(instance, stage, pod) (rate(greptime_region_worker_handle_write_sum[$__rate_interval]))/sum by(instance, stage, pod) (rate(greptime_region_worker_handle_write_count[$__rate_interval]))
datasource:
type: prometheus
uid: ${metrics}
@@ -741,9 +741,8 @@ groups:
- title: Metasrv
panels:
- title: Region migration datanode
type: state-timeline
type: status-history
description: Counter of region migration by source and destination
unit: none
queries:
- expr: greptime_meta_region_migration_stat{datanode_type="src"}
datasource:
@@ -764,17 +763,127 @@ groups:
datasource:
type: prometheus
uid: ${metrics}
legendFormat: __auto
legendFormat: '{{pod}}-{{state}}-{{error_type}}'
- title: Datanode load
type: timeseries
description: Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads.
unit: none
unit: binBps
queries:
- expr: greptime_datanode_load
datasource:
type: prometheus
uid: ${metrics}
legendFormat: __auto
legendFormat: Datanode-{{datanode_id}}-writeload
- title: Rate of SQL Executions (RDS)
type: timeseries
description: Displays the rate of SQL executions processed by the Meta service using the RDS backend.
unit: none
queries:
- expr: rate(greptime_meta_rds_pg_sql_execute_elapsed_ms_count[$__rate_interval])
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '{{pod}} {{op}} {{type}} {{result}} '
- title: SQL Execution Latency (RDS)
type: timeseries
description: 'Measures the response time of SQL executions via the RDS backend. '
unit: ms
queries:
- expr: histogram_quantile(0.90, sum by(pod, op, type, result, le) (rate(greptime_meta_rds_pg_sql_execute_elapsed_ms_bucket[$__rate_interval])))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '{{pod}} {{op}} {{type}} {{result}} p90'
- title: Handler Execution Latency
type: timeseries
description: |
Shows latency of Meta handlers by pod and handler name, useful for monitoring handler performance and detecting latency spikes.
unit: s
queries:
- expr: |-
histogram_quantile(0.90, sum by(pod, le, name) (
rate(greptime_meta_handler_execute_bucket[$__rate_interval])
))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '{{pod}} {{name}} p90'
- title: Heartbeat Packet Size
type: timeseries
description: |
Shows p90 heartbeat message sizes, helping track network usage and identify anomalies in heartbeat payload.
unit: bytes
queries:
- expr: histogram_quantile(0.9, sum by(pod, le) (greptime_meta_heartbeat_stat_memory_size_bucket))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '{{pod}}'
- title: Meta Heartbeat Receive Rate
type: timeseries
description: Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads.
unit: s
queries:
- expr: rate(greptime_meta_heartbeat_rate[$__rate_interval])
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '{{pod}}'
- title: Meta KV Ops Latency
type: timeseries
description: Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads.
unit: s
queries:
- expr: histogram_quantile(0.99, sum by(pod, le, op, target) (greptime_meta_kv_request_elapsed_bucket))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '{{pod}}-{{op}} p99'
- title: Rate of meta KV Ops
type: timeseries
description: Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads.
unit: none
queries:
- expr: rate(greptime_meta_kv_request_elapsed_count[$__rate_interval])
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '{{pod}}-{{op}} p99'
- title: DDL Latency
type: timeseries
description: Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads.
unit: s
queries:
- expr: histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_tables_bucket))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: CreateLogicalTables-{{step}} p90
- expr: histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_table))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: CreateTable-{{step}} p90
- expr: histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_view))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: CreateView-{{step}} p90
- expr: histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_flow))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: CreateFlow-{{step}} p90
- expr: histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_drop_table))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: DropTable-{{step}} p90
- expr: histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_alter_table))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: AlterTable-{{step}} p90
- title: Flownode
panels:
- title: Flow Ingest / Output Rate

File diff suppressed because it is too large Load Diff

View File

@@ -60,7 +60,7 @@
| Read Stage P99 per Instance | `histogram_quantile(0.99, sum by(instance, pod, le, stage) (rate(greptime_mito_read_stage_elapsed_bucket{}[$__rate_interval])))` | `timeseries` | Read Stage P99 per Instance. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]` |
| Write Stage P99 per Instance | `histogram_quantile(0.99, sum by(instance, pod, le, stage) (rate(greptime_mito_write_stage_elapsed_bucket{}[$__rate_interval])))` | `timeseries` | Write Stage P99 per Instance. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]` |
| Compaction OPS per Instance | `sum by(instance, pod) (rate(greptime_mito_compaction_total_elapsed_count{}[$__rate_interval]))` | `timeseries` | Compaction OPS per Instance. | `prometheus` | `ops` | `[{{ instance }}]-[{{pod}}]` |
| Compaction Elapsed Time per Instance by Stage | `histogram_quantile(0.99, sum by(instance, pod, le, stage) (rate(greptime_mito_compaction_stage_elapsed_bucket{}[$__rate_interval])))`<br/>`sum by(instance, pod, le, stage) (rate(greptime_mito_compaction_stage_elapsed_sum{}[$__rate_interval]))/sum by(instance, pod, le, stage) (rate(greptime_mito_compaction_stage_elapsed_count{}[$__rate_interval]))` | `timeseries` | Compaction latency by stage | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-p99` |
| Compaction Elapsed Time per Instance by Stage | `histogram_quantile(0.99, sum by(instance, pod, le, stage) (rate(greptime_mito_compaction_stage_elapsed_bucket{}[$__rate_interval])))`<br/>`sum by(instance, pod, stage) (rate(greptime_mito_compaction_stage_elapsed_sum{}[$__rate_interval]))/sum by(instance, pod, stage) (rate(greptime_mito_compaction_stage_elapsed_count{}[$__rate_interval]))` | `timeseries` | Compaction latency by stage | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-p99` |
| Compaction P99 per Instance | `histogram_quantile(0.99, sum by(instance, pod, le,stage) (rate(greptime_mito_compaction_total_elapsed_bucket{}[$__rate_interval])))` | `timeseries` | Compaction P99 per Instance. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-compaction` |
| WAL write size | `histogram_quantile(0.95, sum by(le,instance, pod) (rate(raft_engine_write_size_bucket[$__rate_interval])))`<br/>`histogram_quantile(0.99, sum by(le,instance,pod) (rate(raft_engine_write_size_bucket[$__rate_interval])))`<br/>`sum by (instance, pod)(rate(raft_engine_write_size_sum[$__rate_interval]))` | `timeseries` | Write-ahead logs write size as bytes. This chart includes stats of p95 and p99 size by instance, total WAL write rate. | `prometheus` | `bytes` | `[{{instance}}]-[{{pod}}]-req-size-p95` |
| Cached Bytes per Instance | `greptime_mito_cache_bytes{}` | `timeseries` | Cached Bytes per Instance. | `prometheus` | `decbytes` | `[{{instance}}]-[{{pod}}]-[{{type}}]` |
@@ -69,7 +69,7 @@
| Log Store op duration seconds | `histogram_quantile(0.99, sum by(le,logstore,optype,instance, pod) (rate(greptime_logstore_op_elapsed_bucket[$__rate_interval])))` | `timeseries` | Write-ahead log operations latency at p99 | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{logstore}}]-[{{optype}}]-p99` |
| Inflight Flush | `greptime_mito_inflight_flush_count` | `timeseries` | Ongoing flush task count | `prometheus` | `none` | `[{{instance}}]-[{{pod}}]` |
| Compaction Input/Output Bytes | `sum by(instance, pod) (greptime_mito_compaction_input_bytes)`<br/>`sum by(instance, pod) (greptime_mito_compaction_output_bytes)` | `timeseries` | Compaction oinput output bytes | `prometheus` | `bytes` | `[{{instance}}]-[{{pod}}]-input` |
| Region Worker Handle Bulk Insert Requests | `histogram_quantile(0.95, sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_bucket[$__rate_interval])))`<br/>`sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_sum[$__rate_interval]))/sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_count[$__rate_interval]))` | `timeseries` | Per-stage elapsed time for region worker to handle bulk insert region requests. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-P95` |
| Region Worker Handle Bulk Insert Requests | `histogram_quantile(0.95, sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_bucket[$__rate_interval])))`<br/>`sum by(instance, stage, pod) (rate(greptime_region_worker_handle_write_sum[$__rate_interval]))/sum by(instance, stage, pod) (rate(greptime_region_worker_handle_write_count[$__rate_interval]))` | `timeseries` | Per-stage elapsed time for region worker to handle bulk insert region requests. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-P95` |
| Region Worker Convert Requests | `histogram_quantile(0.95, sum by(le, instance, stage, pod) (rate(greptime_datanode_convert_region_request_bucket[$__rate_interval])))`<br/>`sum by(le,instance, stage, pod) (rate(greptime_datanode_convert_region_request_sum[$__rate_interval]))/sum by(le,instance, stage, pod) (rate(greptime_datanode_convert_region_request_count[$__rate_interval]))` | `timeseries` | Per-stage elapsed time for region worker to decode requests. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-P95` |
# OpenDAL
| Title | Query | Type | Description | Datasource | Unit | Legend Format |
@@ -88,9 +88,19 @@
# Metasrv
| Title | Query | Type | Description | Datasource | Unit | Legend Format |
| --- | --- | --- | --- | --- | --- | --- |
| Region migration datanode | `greptime_meta_region_migration_stat{datanode_type="src"}`<br/>`greptime_meta_region_migration_stat{datanode_type="desc"}` | `state-timeline` | Counter of region migration by source and destination | `prometheus` | `none` | `from-datanode-{{datanode_id}}` |
| Region migration error | `greptime_meta_region_migration_error` | `timeseries` | Counter of region migration error | `prometheus` | `none` | `__auto` |
| Datanode load | `greptime_datanode_load` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `none` | `__auto` |
| Region migration datanode | `greptime_meta_region_migration_stat{datanode_type="src"}`<br/>`greptime_meta_region_migration_stat{datanode_type="desc"}` | `status-history` | Counter of region migration by source and destination | `prometheus` | -- | `from-datanode-{{datanode_id}}` |
| Region migration error | `greptime_meta_region_migration_error` | `timeseries` | Counter of region migration error | `prometheus` | `none` | `{{pod}}-{{state}}-{{error_type}}` |
| Datanode load | `greptime_datanode_load` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `binBps` | `Datanode-{{datanode_id}}-writeload` |
| Rate of SQL Executions (RDS) | `rate(greptime_meta_rds_pg_sql_execute_elapsed_ms_count[$__rate_interval])` | `timeseries` | Displays the rate of SQL executions processed by the Meta service using the RDS backend. | `prometheus` | `none` | `{{pod}} {{op}} {{type}} {{result}} ` |
| SQL Execution Latency (RDS) | `histogram_quantile(0.90, sum by(pod, op, type, result, le) (rate(greptime_meta_rds_pg_sql_execute_elapsed_ms_bucket[$__rate_interval])))` | `timeseries` | Measures the response time of SQL executions via the RDS backend. | `prometheus` | `ms` | `{{pod}} {{op}} {{type}} {{result}} p90` |
| Handler Execution Latency | `histogram_quantile(0.90, sum by(pod, le, name) (
rate(greptime_meta_handler_execute_bucket[$__rate_interval])
))` | `timeseries` | Shows latency of Meta handlers by pod and handler name, useful for monitoring handler performance and detecting latency spikes.<br/> | `prometheus` | `s` | `{{pod}} {{name}} p90` |
| Heartbeat Packet Size | `histogram_quantile(0.9, sum by(pod, le) (greptime_meta_heartbeat_stat_memory_size_bucket))` | `timeseries` | Shows p90 heartbeat message sizes, helping track network usage and identify anomalies in heartbeat payload.<br/> | `prometheus` | `bytes` | `{{pod}}` |
| Meta Heartbeat Receive Rate | `rate(greptime_meta_heartbeat_rate[$__rate_interval])` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `s` | `{{pod}}` |
| Meta KV Ops Latency | `histogram_quantile(0.99, sum by(pod, le, op, target) (greptime_meta_kv_request_elapsed_bucket))` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `s` | `{{pod}}-{{op}} p99` |
| Rate of meta KV Ops | `rate(greptime_meta_kv_request_elapsed_count[$__rate_interval])` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `none` | `{{pod}}-{{op}} p99` |
| DDL Latency | `histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_tables_bucket))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_table))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_view))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_flow))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_drop_table))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_alter_table))` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `s` | `CreateLogicalTables-{{step}} p90` |
# Flownode
| Title | Query | Type | Description | Datasource | Unit | Legend Format |
| --- | --- | --- | --- | --- | --- | --- |

View File

@@ -497,7 +497,7 @@ groups:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-p99'
- expr: sum by(instance, pod, le, stage) (rate(greptime_mito_compaction_stage_elapsed_sum{}[$__rate_interval]))/sum by(instance, pod, le, stage) (rate(greptime_mito_compaction_stage_elapsed_count{}[$__rate_interval]))
- expr: sum by(instance, pod, stage) (rate(greptime_mito_compaction_stage_elapsed_sum{}[$__rate_interval]))/sum by(instance, pod, stage) (rate(greptime_mito_compaction_stage_elapsed_count{}[$__rate_interval]))
datasource:
type: prometheus
uid: ${metrics}
@@ -607,7 +607,7 @@ groups:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-P95'
- expr: sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_sum[$__rate_interval]))/sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_count[$__rate_interval]))
- expr: sum by(instance, stage, pod) (rate(greptime_region_worker_handle_write_sum[$__rate_interval]))/sum by(instance, stage, pod) (rate(greptime_region_worker_handle_write_count[$__rate_interval]))
datasource:
type: prometheus
uid: ${metrics}
@@ -741,9 +741,8 @@ groups:
- title: Metasrv
panels:
- title: Region migration datanode
type: state-timeline
type: status-history
description: Counter of region migration by source and destination
unit: none
queries:
- expr: greptime_meta_region_migration_stat{datanode_type="src"}
datasource:
@@ -764,17 +763,127 @@ groups:
datasource:
type: prometheus
uid: ${metrics}
legendFormat: __auto
legendFormat: '{{pod}}-{{state}}-{{error_type}}'
- title: Datanode load
type: timeseries
description: Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads.
unit: none
unit: binBps
queries:
- expr: greptime_datanode_load
datasource:
type: prometheus
uid: ${metrics}
legendFormat: __auto
legendFormat: Datanode-{{datanode_id}}-writeload
- title: Rate of SQL Executions (RDS)
type: timeseries
description: Displays the rate of SQL executions processed by the Meta service using the RDS backend.
unit: none
queries:
- expr: rate(greptime_meta_rds_pg_sql_execute_elapsed_ms_count[$__rate_interval])
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '{{pod}} {{op}} {{type}} {{result}} '
- title: SQL Execution Latency (RDS)
type: timeseries
description: 'Measures the response time of SQL executions via the RDS backend. '
unit: ms
queries:
- expr: histogram_quantile(0.90, sum by(pod, op, type, result, le) (rate(greptime_meta_rds_pg_sql_execute_elapsed_ms_bucket[$__rate_interval])))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '{{pod}} {{op}} {{type}} {{result}} p90'
- title: Handler Execution Latency
type: timeseries
description: |
Shows latency of Meta handlers by pod and handler name, useful for monitoring handler performance and detecting latency spikes.
unit: s
queries:
- expr: |-
histogram_quantile(0.90, sum by(pod, le, name) (
rate(greptime_meta_handler_execute_bucket[$__rate_interval])
))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '{{pod}} {{name}} p90'
- title: Heartbeat Packet Size
type: timeseries
description: |
Shows p90 heartbeat message sizes, helping track network usage and identify anomalies in heartbeat payload.
unit: bytes
queries:
- expr: histogram_quantile(0.9, sum by(pod, le) (greptime_meta_heartbeat_stat_memory_size_bucket))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '{{pod}}'
- title: Meta Heartbeat Receive Rate
type: timeseries
description: Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads.
unit: s
queries:
- expr: rate(greptime_meta_heartbeat_rate[$__rate_interval])
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '{{pod}}'
- title: Meta KV Ops Latency
type: timeseries
description: Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads.
unit: s
queries:
- expr: histogram_quantile(0.99, sum by(pod, le, op, target) (greptime_meta_kv_request_elapsed_bucket))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '{{pod}}-{{op}} p99'
- title: Rate of meta KV Ops
type: timeseries
description: Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads.
unit: none
queries:
- expr: rate(greptime_meta_kv_request_elapsed_count[$__rate_interval])
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '{{pod}}-{{op}} p99'
- title: DDL Latency
type: timeseries
description: Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads.
unit: s
queries:
- expr: histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_tables_bucket))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: CreateLogicalTables-{{step}} p90
- expr: histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_table))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: CreateTable-{{step}} p90
- expr: histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_view))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: CreateView-{{step}} p90
- expr: histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_flow))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: CreateFlow-{{step}} p90
- expr: histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_drop_table))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: DropTable-{{step}} p90
- expr: histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_alter_table))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: AlterTable-{{step}} p90
- title: Flownode
panels:
- title: Flow Ingest / Output Rate

View File

@@ -167,9 +167,7 @@ impl Client {
let client = FlightServiceClient::new(channel)
.max_decoding_message_size(self.max_grpc_recv_message_size())
.max_encoding_message_size(self.max_grpc_send_message_size())
.accept_compressed(CompressionEncoding::Zstd)
.send_compressed(CompressionEncoding::Zstd);
.max_encoding_message_size(self.max_grpc_send_message_size());
Ok(FlightClient { addr, client })
}
@@ -178,9 +176,7 @@ impl Client {
let (addr, channel) = self.find_channel()?;
let client = PbRegionClient::new(channel)
.max_decoding_message_size(self.max_grpc_recv_message_size())
.max_encoding_message_size(self.max_grpc_send_message_size())
.accept_compressed(CompressionEncoding::Zstd)
.send_compressed(CompressionEncoding::Zstd);
.max_encoding_message_size(self.max_grpc_send_message_size());
Ok((addr, client))
}

View File

@@ -14,12 +14,13 @@
pub mod builder;
use std::path::Path;
use std::time::Duration;
use async_trait::async_trait;
use clap::Parser;
use common_config::Configurable;
use common_telemetry::logging::TracingOptions;
use common_telemetry::logging::{TracingOptions, DEFAULT_LOGGING_DIR};
use common_telemetry::{info, warn};
use common_wal::config::DatanodeWalConfig;
use datanode::datanode::Datanode;
@@ -248,6 +249,14 @@ impl StartCommand {
raft_engine_config.dir.replace(wal_dir.clone());
}
// If the logging dir is not set, use the default logs dir in the data home.
if opts.logging.dir.is_empty() {
opts.logging.dir = Path::new(&opts.storage.data_home)
.join(DEFAULT_LOGGING_DIR)
.to_string_lossy()
.to_string();
}
if let Some(http_addr) = &self.http_addr {
opts.http.addr.clone_from(http_addr);
}

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
@@ -21,7 +22,7 @@ use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManager, MetaKv
use clap::Parser;
use client::client_manager::NodeClients;
use common_base::Plugins;
use common_config::Configurable;
use common_config::{Configurable, DEFAULT_DATA_HOME};
use common_grpc::channel_manager::ChannelConfig;
use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
@@ -30,7 +31,7 @@ use common_meta::heartbeat::handler::HandlerGroupExecutor;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::TableMetadataManager;
use common_telemetry::info;
use common_telemetry::logging::TracingOptions;
use common_telemetry::logging::{TracingOptions, DEFAULT_LOGGING_DIR};
use common_version::{short_version, version};
use flow::{
get_flow_auth_options, FlownodeBuilder, FlownodeInstance, FlownodeServiceBuilder,
@@ -186,6 +187,14 @@ impl StartCommand {
opts.logging.dir.clone_from(dir);
}
// If the logging dir is not set, use the default logs dir in the data home.
if opts.logging.dir.is_empty() {
opts.logging.dir = Path::new(DEFAULT_DATA_HOME)
.join(DEFAULT_LOGGING_DIR)
.to_string_lossy()
.to_string();
}
if global_options.log_level.is_some() {
opts.logging.level.clone_from(&global_options.log_level);
}

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
@@ -22,14 +23,14 @@ use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManager, MetaKv
use clap::Parser;
use client::client_manager::NodeClients;
use common_base::Plugins;
use common_config::Configurable;
use common_config::{Configurable, DEFAULT_DATA_HOME};
use common_grpc::channel_manager::ChannelConfig;
use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
use common_meta::heartbeat::handler::HandlerGroupExecutor;
use common_telemetry::info;
use common_telemetry::logging::TracingOptions;
use common_telemetry::logging::{TracingOptions, DEFAULT_LOGGING_DIR};
use common_time::timezone::set_default_timezone;
use common_version::{short_version, version};
use frontend::frontend::Frontend;
@@ -194,6 +195,14 @@ impl StartCommand {
opts.logging.dir.clone_from(dir);
}
// If the logging dir is not set, use the default logs dir in the data home.
if opts.logging.dir.is_empty() {
opts.logging.dir = Path::new(DEFAULT_DATA_HOME)
.join(DEFAULT_LOGGING_DIR)
.to_string_lossy()
.to_string();
}
if global_options.log_level.is_some() {
opts.logging.level.clone_from(&global_options.log_level);
}

View File

@@ -13,6 +13,7 @@
// limitations under the License.
use std::fmt;
use std::path::Path;
use std::time::Duration;
use async_trait::async_trait;
@@ -20,7 +21,7 @@ use clap::Parser;
use common_base::Plugins;
use common_config::Configurable;
use common_telemetry::info;
use common_telemetry::logging::TracingOptions;
use common_telemetry::logging::{TracingOptions, DEFAULT_LOGGING_DIR};
use common_version::{short_version, version};
use meta_srv::bootstrap::MetasrvInstance;
use meta_srv::metasrv::BackendImpl;
@@ -274,6 +275,14 @@ impl StartCommand {
opts.data_home.clone_from(data_home);
}
// If the logging dir is not set, use the default logs dir in the data home.
if opts.logging.dir.is_empty() {
opts.logging.dir = Path::new(&opts.data_home)
.join(DEFAULT_LOGGING_DIR)
.to_string_lossy()
.to_string();
}
if !self.store_key_prefix.is_empty() {
opts.store_key_prefix.clone_from(&self.store_key_prefix)
}

View File

@@ -13,6 +13,7 @@
// limitations under the License.
use std::net::SocketAddr;
use std::path::Path;
use std::sync::Arc;
use std::{fs, path};
@@ -49,7 +50,9 @@ use common_meta::sequence::SequenceBuilder;
use common_meta::wal_options_allocator::{build_wal_options_allocator, WalOptionsAllocatorRef};
use common_procedure::{ProcedureInfo, ProcedureManagerRef};
use common_telemetry::info;
use common_telemetry::logging::{LoggingOptions, SlowQueryOptions, TracingOptions};
use common_telemetry::logging::{
LoggingOptions, SlowQueryOptions, TracingOptions, DEFAULT_LOGGING_DIR,
};
use common_time::timezone::set_default_timezone;
use common_version::{short_version, version};
use common_wal::config::DatanodeWalConfig;
@@ -407,6 +410,14 @@ impl StartCommand {
opts.storage.data_home.clone_from(data_home);
}
// If the logging dir is not set, use the default logs dir in the data home.
if opts.logging.dir.is_empty() {
opts.logging.dir = Path::new(&opts.storage.data_home)
.join(DEFAULT_LOGGING_DIR)
.to_string_lossy()
.to_string();
}
if let Some(addr) = &self.rpc_bind_addr {
// frontend grpc addr conflict with datanode default grpc addr
let datanode_grpc_addr = DatanodeOptions::default().grpc.bind_addr;

View File

@@ -12,13 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::path::Path;
use std::time::Duration;
use cmd::options::GreptimeOptions;
use cmd::standalone::StandaloneOptions;
use common_config::Configurable;
use common_config::{Configurable, DEFAULT_DATA_HOME};
use common_options::datanode::{ClientOptions, DatanodeClientOptions};
use common_telemetry::logging::{LoggingOptions, DEFAULT_OTLP_ENDPOINT};
use common_telemetry::logging::{LoggingOptions, DEFAULT_LOGGING_DIR, DEFAULT_OTLP_ENDPOINT};
use common_wal::config::raft_engine::RaftEngineConfig;
use common_wal::config::DatanodeWalConfig;
use datanode::config::{DatanodeOptions, RegionEngineConfig, StorageConfig};
@@ -32,6 +33,7 @@ use mito2::config::MitoConfig;
use servers::export_metrics::ExportMetricsOption;
use servers::grpc::GrpcOptions;
use servers::http::HttpOptions;
use store_api::path_utils::WAL_DIR;
#[allow(deprecated)]
#[test]
@@ -56,13 +58,18 @@ fn test_load_datanode_example_config() {
metadata_cache_tti: Duration::from_secs(300),
}),
wal: DatanodeWalConfig::RaftEngine(RaftEngineConfig {
dir: Some("./greptimedb_data/wal".to_string()),
dir: Some(
Path::new(DEFAULT_DATA_HOME)
.join(WAL_DIR)
.to_string_lossy()
.to_string(),
),
sync_period: Some(Duration::from_secs(10)),
recovery_parallelism: 2,
..Default::default()
}),
storage: StorageConfig {
data_home: "./greptimedb_data/".to_string(),
data_home: DEFAULT_DATA_HOME.to_string(),
..Default::default()
},
region_engine: vec![
@@ -79,6 +86,10 @@ fn test_load_datanode_example_config() {
],
logging: LoggingOptions {
level: Some("info".to_string()),
dir: Path::new(DEFAULT_DATA_HOME)
.join(DEFAULT_LOGGING_DIR)
.to_string_lossy()
.to_string(),
otlp_endpoint: Some(DEFAULT_OTLP_ENDPOINT.to_string()),
tracing_sample_ratio: Some(Default::default()),
..Default::default()
@@ -121,6 +132,10 @@ fn test_load_frontend_example_config() {
}),
logging: LoggingOptions {
level: Some("info".to_string()),
dir: Path::new(DEFAULT_DATA_HOME)
.join(DEFAULT_LOGGING_DIR)
.to_string_lossy()
.to_string(),
otlp_endpoint: Some(DEFAULT_OTLP_ENDPOINT.to_string()),
tracing_sample_ratio: Some(Default::default()),
..Default::default()
@@ -160,10 +175,13 @@ fn test_load_metasrv_example_config() {
let expected = GreptimeOptions::<MetasrvOptions> {
component: MetasrvOptions {
selector: SelectorType::default(),
data_home: "./greptimedb_data/metasrv/".to_string(),
data_home: DEFAULT_DATA_HOME.to_string(),
server_addr: "127.0.0.1:3002".to_string(),
logging: LoggingOptions {
dir: "./greptimedb_data/logs".to_string(),
dir: Path::new(DEFAULT_DATA_HOME)
.join(DEFAULT_LOGGING_DIR)
.to_string_lossy()
.to_string(),
level: Some("info".to_string()),
otlp_endpoint: Some(DEFAULT_OTLP_ENDPOINT.to_string()),
tracing_sample_ratio: Some(Default::default()),
@@ -198,7 +216,12 @@ fn test_load_standalone_example_config() {
component: StandaloneOptions {
default_timezone: Some("UTC".to_string()),
wal: DatanodeWalConfig::RaftEngine(RaftEngineConfig {
dir: Some("./greptimedb_data/wal".to_string()),
dir: Some(
Path::new(DEFAULT_DATA_HOME)
.join(WAL_DIR)
.to_string_lossy()
.to_string(),
),
sync_period: Some(Duration::from_secs(10)),
recovery_parallelism: 2,
..Default::default()
@@ -216,11 +239,15 @@ fn test_load_standalone_example_config() {
}),
],
storage: StorageConfig {
data_home: "./greptimedb_data/".to_string(),
data_home: DEFAULT_DATA_HOME.to_string(),
..Default::default()
},
logging: LoggingOptions {
level: Some("info".to_string()),
dir: Path::new(DEFAULT_DATA_HOME)
.join(DEFAULT_LOGGING_DIR)
.to_string_lossy()
.to_string(),
otlp_endpoint: Some(DEFAULT_OTLP_ENDPOINT.to_string()),
tracing_sample_ratio: Some(Default::default()),
..Default::default()

View File

@@ -26,6 +26,9 @@ pub fn metadata_store_dir(store_dir: &str) -> String {
format!("{store_dir}/metadata")
}
/// The default data home directory.
pub const DEFAULT_DATA_HOME: &str = "./greptimedb_data";
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct KvBackendConfig {

View File

@@ -0,0 +1,90 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_macro::admin_fn;
use common_query::error::{
InvalidFuncArgsSnafu, MissingFlowServiceHandlerSnafu, Result, UnsupportedInputDataTypeSnafu,
};
use common_query::prelude::Signature;
use datafusion::logical_expr::Volatility;
use datatypes::value::{Value, ValueRef};
use session::context::QueryContextRef;
use snafu::ensure;
use store_api::storage::ConcreteDataType;
use crate::handlers::FlowServiceHandlerRef;
use crate::helper::parse_catalog_flow;
fn adjust_signature() -> Signature {
Signature::exact(
vec![
ConcreteDataType::string_datatype(), // flow name
ConcreteDataType::uint64_datatype(), // min_run_interval in seconds
ConcreteDataType::uint64_datatype(), // max filter number per query
],
Volatility::Immutable,
)
}
#[admin_fn(
name = AdjustFlowFunction,
display_name = adjust_flow,
sig_fn = adjust_signature,
ret = uint64
)]
pub(crate) async fn adjust_flow(
flow_service_handler: &FlowServiceHandlerRef,
query_ctx: &QueryContextRef,
params: &[ValueRef<'_>],
) -> Result<Value> {
ensure!(
params.len() == 3,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect 3, have: {}",
params.len()
),
}
);
let (flow_name, min_run_interval, max_filter_num) = match (params[0], params[1], params[2]) {
(
ValueRef::String(flow_name),
ValueRef::UInt64(min_run_interval),
ValueRef::UInt64(max_filter_num),
) => (flow_name, min_run_interval, max_filter_num),
_ => {
return UnsupportedInputDataTypeSnafu {
function: "adjust_flow",
datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
}
.fail();
}
};
let (catalog_name, flow_name) = parse_catalog_flow(flow_name, query_ctx)?;
let res = flow_service_handler
.adjust(
&catalog_name,
&flow_name,
min_run_interval,
max_filter_num as usize,
query_ctx.clone(),
)
.await?;
let affected_rows = res.affected_rows;
Ok(Value::from(affected_rows))
}

View File

@@ -26,6 +26,7 @@ use flush_compact_table::{CompactTableFunction, FlushTableFunction};
use migrate_region::MigrateRegionFunction;
use remove_region_follower::RemoveRegionFollowerFunction;
use crate::adjust_flow::AdjustFlowFunction;
use crate::flush_flow::FlushFlowFunction;
use crate::function_registry::FunctionRegistry;
@@ -43,5 +44,6 @@ impl AdminFunction {
registry.register_async(Arc::new(FlushTableFunction));
registry.register_async(Arc::new(CompactTableFunction));
registry.register_async(Arc::new(FlushFlowFunction));
registry.register_async(Arc::new(AdjustFlowFunction));
}
}

View File

@@ -12,21 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_error::ext::BoxedError;
use common_macro::admin_fn;
use common_query::error::{
ExecuteSnafu, InvalidFuncArgsSnafu, MissingFlowServiceHandlerSnafu, Result,
UnsupportedInputDataTypeSnafu,
InvalidFuncArgsSnafu, MissingFlowServiceHandlerSnafu, Result, UnsupportedInputDataTypeSnafu,
};
use common_query::prelude::Signature;
use datafusion::logical_expr::Volatility;
use datatypes::value::{Value, ValueRef};
use session::context::QueryContextRef;
use snafu::{ensure, ResultExt};
use sql::parser::ParserContext;
use snafu::ensure;
use store_api::storage::ConcreteDataType;
use crate::handlers::FlowServiceHandlerRef;
use crate::helper::parse_catalog_flow;
fn flush_signature() -> Signature {
Signature::uniform(
@@ -47,20 +45,6 @@ pub(crate) async fn flush_flow(
query_ctx: &QueryContextRef,
params: &[ValueRef<'_>],
) -> Result<Value> {
let (catalog_name, flow_name) = parse_flush_flow(params, query_ctx)?;
let res = flow_service_handler
.flush(&catalog_name, &flow_name, query_ctx.clone())
.await?;
let affected_rows = res.affected_rows;
Ok(Value::from(affected_rows))
}
fn parse_flush_flow(
params: &[ValueRef<'_>],
query_ctx: &QueryContextRef,
) -> Result<(String, String)> {
ensure!(
params.len() == 1,
InvalidFuncArgsSnafu {
@@ -70,7 +54,6 @@ fn parse_flush_flow(
),
}
);
let ValueRef::String(flow_name) = params[0] else {
return UnsupportedInputDataTypeSnafu {
function: "flush_flow",
@@ -78,27 +61,14 @@ fn parse_flush_flow(
}
.fail();
};
let obj_name = ParserContext::parse_table_name(flow_name, query_ctx.sql_dialect())
.map_err(BoxedError::new)
.context(ExecuteSnafu)?;
let (catalog_name, flow_name) = parse_catalog_flow(flow_name, query_ctx)?;
let (catalog_name, flow_name) = match &obj_name.0[..] {
[flow_name] => (
query_ctx.current_catalog().to_string(),
flow_name.value.clone(),
),
[catalog, flow_name] => (catalog.value.clone(), flow_name.value.clone()),
_ => {
return InvalidFuncArgsSnafu {
err_msg: format!(
"expect flow name to be <catalog>.<flow-name> or <flow-name>, actual: {}",
obj_name
),
}
.fail()
}
};
Ok((catalog_name, flow_name))
let res = flow_service_handler
.flush(&catalog_name, &flow_name, query_ctx.clone())
.await?;
let affected_rows = res.affected_rows;
Ok(Value::from(affected_rows))
}
#[cfg(test)]
@@ -154,10 +124,7 @@ mod test {
("catalog.flow_name", ("catalog", "flow_name")),
];
for (input, expected) in testcases.iter() {
let args = vec![*input];
let args = args.into_iter().map(ValueRef::String).collect::<Vec<_>>();
let result = parse_flush_flow(&args, &QueryContext::arc()).unwrap();
let result = parse_catalog_flow(input, &QueryContext::arc()).unwrap();
assert_eq!(*expected, (result.0.as_str(), result.1.as_str()));
}
}

View File

@@ -87,6 +87,15 @@ pub trait FlowServiceHandler: Send + Sync {
flow: &str,
ctx: QueryContextRef,
) -> Result<api::v1::flow::FlowResponse>;
async fn adjust(
&self,
catalog: &str,
flow: &str,
min_run_interval_secs: u64,
max_filter_num_per_query: usize,
ctx: QueryContextRef,
) -> Result<api::v1::flow::FlowResponse>;
}
pub type TableMutationHandlerRef = Arc<dyn TableMutationHandler>;

View File

@@ -12,12 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_query::error::{InvalidInputTypeSnafu, Result};
use common_error::ext::BoxedError;
use common_query::error::{ExecuteSnafu, InvalidFuncArgsSnafu, InvalidInputTypeSnafu, Result};
use common_query::prelude::{Signature, TypeSignature, Volatility};
use datatypes::prelude::ConcreteDataType;
use datatypes::types::cast::cast;
use datatypes::value::ValueRef;
use session::context::QueryContextRef;
use snafu::ResultExt;
use sql::parser::ParserContext;
/// Create a function signature with oneof signatures of interleaving two arguments.
pub fn one_of_sigs2(args1: Vec<ConcreteDataType>, args2: Vec<ConcreteDataType>) -> Signature {
@@ -43,3 +46,30 @@ pub fn cast_u64(value: &ValueRef) -> Result<Option<u64>> {
})
.map(|v| v.as_u64())
}
pub fn parse_catalog_flow(
flow_name: &str,
query_ctx: &QueryContextRef,
) -> Result<(String, String)> {
let obj_name = ParserContext::parse_table_name(flow_name, query_ctx.sql_dialect())
.map_err(BoxedError::new)
.context(ExecuteSnafu)?;
let (catalog_name, flow_name) = match &obj_name.0[..] {
[flow_name] => (
query_ctx.current_catalog().to_string(),
flow_name.value.clone(),
),
[catalog, flow_name] => (catalog.value.clone(), flow_name.value.clone()),
_ => {
return InvalidFuncArgsSnafu {
err_msg: format!(
"expect flow name to be <catalog>.<flow-name> or <flow-name>, actual: {}",
obj_name
),
}
.fail()
}
};
Ok((catalog_name, flow_name))
}

View File

@@ -15,6 +15,7 @@
#![feature(let_chains)]
#![feature(try_blocks)]
mod adjust_flow;
mod admin;
mod flush_flow;
mod macros;

View File

@@ -148,6 +148,17 @@ impl FunctionState {
) -> Result<api::v1::flow::FlowResponse> {
todo!()
}
async fn adjust(
&self,
_catalog: &str,
_flow: &str,
_min_run_interval_secs: u64,
_max_filter_num_per_query: usize,
_ctx: QueryContextRef,
) -> Result<api::v1::flow::FlowResponse> {
todo!()
}
}
Self {

View File

@@ -64,6 +64,18 @@ impl Default for FlightEncoder {
}
impl FlightEncoder {
pub fn with_compression_disabled() -> Self {
let write_options = writer::IpcWriteOptions::default()
.try_with_compression(None)
.unwrap();
Self {
write_options,
data_gen: writer::IpcDataGenerator::default(),
dictionary_tracker: writer::DictionaryTracker::new(false),
}
}
pub fn encode(&mut self, flight_message: FlightMessage) -> FlightData {
match flight_message {
FlightMessage::Schema(schema) => SchemaAsIpc::new(&schema, &self.write_options).into(),

View File

@@ -37,6 +37,9 @@ use crate::tracing_sampler::{create_sampler, TracingSampleOptions};
pub const DEFAULT_OTLP_ENDPOINT: &str = "http://localhost:4317";
/// The default logs directory.
pub const DEFAULT_LOGGING_DIR: &str = "logs";
// Handle for reloading log level
pub static RELOAD_HANDLE: OnceCell<tracing_subscriber::reload::Handle<Targets, Registry>> =
OnceCell::new();
@@ -133,7 +136,8 @@ impl Eq for LoggingOptions {}
impl Default for LoggingOptions {
fn default() -> Self {
Self {
dir: "./greptimedb_data/logs".to_string(),
// The directory path will be configured at application startup, typically using the data home directory as a base.
dir: "".to_string(),
level: None,
log_format: LogFormat::Text,
enable_otlp_tracing: false,

View File

@@ -18,7 +18,7 @@ use core::time::Duration;
use common_base::readable_size::ReadableSize;
use common_base::secrets::{ExposeSecret, SecretString};
use common_config::Configurable;
use common_config::{Configurable, DEFAULT_DATA_HOME};
pub use common_procedure::options::ProcedureConfig;
use common_telemetry::logging::{LoggingOptions, TracingOptions};
use common_wal::config::DatanodeWalConfig;
@@ -36,9 +36,6 @@ use servers::http::HttpOptions;
pub const DEFAULT_OBJECT_STORE_CACHE_SIZE: ReadableSize = ReadableSize::gb(5);
/// Default data home in file storage
const DEFAULT_DATA_HOME: &str = "./greptimedb_data";
/// Object storage config
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type")]

View File

@@ -61,6 +61,7 @@ prost.workspace = true
query.workspace = true
rand.workspace = true
serde.workspace = true
serde_json.workspace = true
servers.workspace = true
session.workspace = true
smallvec.workspace = true

View File

@@ -18,7 +18,7 @@ use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use api::v1::flow::{
flow_request, CreateRequest, DropRequest, FlowRequest, FlowResponse, FlushFlow,
flow_request, AdjustFlow, CreateRequest, DropRequest, FlowRequest, FlowResponse, FlushFlow,
};
use api::v1::region::InsertRequests;
use catalog::CatalogManager;
@@ -32,6 +32,7 @@ use common_telemetry::{error, info, trace, warn};
use datatypes::value::Value;
use futures::TryStreamExt;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use session::context::QueryContextBuilder;
use snafu::{ensure, IntoError, OptionExt, ResultExt};
use store_api::storage::{RegionId, TableId};
@@ -809,6 +810,25 @@ impl common_meta::node_manager::Flownode for FlowDualEngine {
..Default::default()
})
}
Some(flow_request::Body::Adjust(AdjustFlow { flow_id, options })) => {
#[derive(Debug, Serialize, Deserialize)]
struct Options {
min_run_interval_secs: u64,
max_filter_num_per_query: usize,
}
let options: Options = serde_json::from_str(&options).with_context(|_| {
common_meta::error::DeserializeFromJsonSnafu { input: options }
})?;
self.batching_engine
.adjust_flow(
flow_id.unwrap().id as u64,
options.min_run_interval_secs,
options.max_filter_num_per_query,
)
.await
.map_err(to_meta_err(snafu::location!()))?;
Ok(Default::default())
}
other => common_meta::error::InvalidFlowRequestBodySnafu { body: other }.fail(),
}
}
@@ -841,93 +861,6 @@ fn to_meta_err(
}
}
#[async_trait::async_trait]
impl common_meta::node_manager::Flownode for StreamingEngine {
async fn handle(&self, request: FlowRequest) -> MetaResult<FlowResponse> {
let query_ctx = request
.header
.and_then(|h| h.query_context)
.map(|ctx| ctx.into());
match request.body {
Some(flow_request::Body::Create(CreateRequest {
flow_id: Some(task_id),
source_table_ids,
sink_table_name: Some(sink_table_name),
create_if_not_exists,
expire_after,
comment,
sql,
flow_options,
or_replace,
})) => {
let source_table_ids = source_table_ids.into_iter().map(|id| id.id).collect_vec();
let sink_table_name = [
sink_table_name.catalog_name,
sink_table_name.schema_name,
sink_table_name.table_name,
];
let expire_after = expire_after.map(|e| e.value);
let args = CreateFlowArgs {
flow_id: task_id.id as u64,
sink_table_name,
source_table_ids,
create_if_not_exists,
or_replace,
expire_after,
comment: Some(comment),
sql: sql.clone(),
flow_options,
query_ctx,
};
let ret = self
.create_flow(args)
.await
.map_err(BoxedError::new)
.with_context(|_| CreateFlowSnafu { sql: sql.clone() })
.map_err(to_meta_err(snafu::location!()))?;
METRIC_FLOW_TASK_COUNT.inc();
Ok(FlowResponse {
affected_flows: ret
.map(|id| greptime_proto::v1::FlowId { id: id as u32 })
.into_iter()
.collect_vec(),
..Default::default()
})
}
Some(flow_request::Body::Drop(DropRequest {
flow_id: Some(flow_id),
})) => {
self.remove_flow(flow_id.id as u64)
.await
.map_err(to_meta_err(snafu::location!()))?;
METRIC_FLOW_TASK_COUNT.dec();
Ok(Default::default())
}
Some(flow_request::Body::Flush(FlushFlow {
flow_id: Some(flow_id),
})) => {
let row = self
.flush_flow_inner(flow_id.id as u64)
.await
.map_err(to_meta_err(snafu::location!()))?;
Ok(FlowResponse {
affected_flows: vec![flow_id],
affected_rows: row as u64,
..Default::default()
})
}
other => common_meta::error::InvalidFlowRequestBodySnafu { body: other }.fail(),
}
}
async fn handle_inserts(&self, request: InsertRequests) -> MetaResult<FlowResponse> {
self.handle_inserts_inner(request)
.await
.map(|_| Default::default())
.map_err(to_meta_err(snafu::location!()))
}
}
impl FlowEngine for StreamingEngine {
async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
self.create_flow_inner(args).await

View File

@@ -388,6 +388,20 @@ impl BatchingEngine {
pub async fn flow_exist_inner(&self, flow_id: FlowId) -> bool {
self.tasks.read().await.contains_key(&flow_id)
}
pub async fn adjust_flow(
&self,
flow_id: FlowId,
min_run_interval_secs: u64,
max_filter_num_per_query: usize,
) -> Result<(), Error> {
let task = self.tasks.read().await.get(&flow_id).cloned();
let task = task.with_context(|| FlowNotFoundSnafu { id: flow_id })?;
debug!("Adjusting flow {flow_id} with min_run_interval_secs={} and max_filter_num_per_query={}", min_run_interval_secs, max_filter_num_per_query);
task.adjust(min_run_interval_secs, max_filter_num_per_query);
Ok(())
}
}
impl FlowEngine for BatchingEngine {

View File

@@ -14,8 +14,9 @@
//! Frontend client to run flow as batching task which is time-window-aware normal query triggered every tick set by user
use std::sync::{Arc, Weak};
use std::time::SystemTime;
use std::collections::HashMap;
use std::sync::{Arc, Mutex, Weak};
use std::time::{Duration, Instant, SystemTime};
use api::v1::greptime_request::Request;
use api::v1::CreateTableExpr;
@@ -26,20 +27,21 @@ use common_meta::cluster::{NodeInfo, NodeInfoKey, Role};
use common_meta::peer::Peer;
use common_meta::rpc::store::RangeRequest;
use common_query::Output;
use common_telemetry::warn;
use common_telemetry::{debug, warn};
use itertools::Itertools;
use meta_client::client::MetaClient;
use rand::rng;
use rand::seq::SliceRandom;
use servers::query_handler::grpc::GrpcQueryHandler;
use session::context::{QueryContextBuilder, QueryContextRef};
use snafu::{OptionExt, ResultExt};
use crate::batching_mode::task::BatchingTask;
use crate::batching_mode::{
DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT, FRONTEND_ACTIVITY_TIMEOUT, GRPC_CONN_TIMEOUT,
GRPC_MAX_RETRIES,
};
use crate::error::{ExternalSnafu, InvalidRequestSnafu, NoAvailableFrontendSnafu, UnexpectedSnafu};
use crate::{Error, FlowAuthHeader};
use crate::metrics::METRIC_FLOW_BATCHING_ENGINE_GUESS_FE_LOAD;
use crate::{Error, FlowAuthHeader, FlowId};
/// Just like [`GrpcQueryHandler`] but use BoxedError
///
@@ -74,6 +76,105 @@ impl<
type HandlerMutable = Arc<std::sync::Mutex<Option<Weak<dyn GrpcQueryHandlerWithBoxedError>>>>;
/// Statistics about running query on this frontend from flownode
#[derive(Debug, Default, Clone)]
struct FrontendStat {
/// The query for flow id has been running since this timestamp
since: HashMap<FlowId, Instant>,
/// The average query time for each flow id
/// This is used to calculate the average query time for each flow id
past_query_avg: HashMap<FlowId, (usize, Duration)>,
}
#[derive(Debug, Default, Clone)]
pub struct FrontendStats {
/// The statistics for each flow id
stats: Arc<Mutex<HashMap<String, FrontendStat>>>,
}
impl FrontendStats {
pub fn observe(&self, frontend_addr: &str, flow_id: FlowId) -> FrontendStatsGuard {
let mut stats = self.stats.lock().expect("Failed to lock frontend stats");
let stat = stats.entry(frontend_addr.to_string()).or_default();
stat.since.insert(flow_id, Instant::now());
FrontendStatsGuard {
stats: self.stats.clone(),
frontend_addr: frontend_addr.to_string(),
cur: flow_id,
}
}
/// return frontend addrs sorted by load, from lightest to heaviest
/// The load is calculated as the total average query time for each flow id plus running query's total running time elapsed
pub fn sort_by_load(&self) -> Vec<String> {
let stats = self.stats.lock().expect("Failed to lock frontend stats");
let fe_load_factor = stats
.iter()
.map(|(node_addr, stat)| {
// total expected avg running time for all currently running queries
let total_expect_avg_run_time = stat
.since
.keys()
.map(|f| {
let (count, total_duration) =
stat.past_query_avg.get(f).unwrap_or(&(0, Duration::ZERO));
if *count == 0 {
0.0
} else {
total_duration.as_secs_f64() / *count as f64
}
})
.sum::<f64>();
let total_cur_running_time = stat
.since
.values()
.map(|since| since.elapsed().as_secs_f64())
.sum::<f64>();
(
node_addr.to_string(),
total_expect_avg_run_time + total_cur_running_time,
)
})
.sorted_by(|(_, load_a), (_, load_b)| {
load_a
.partial_cmp(load_b)
.unwrap_or(std::cmp::Ordering::Equal)
})
.collect::<Vec<_>>();
debug!("Frontend load factor: {:?}", fe_load_factor);
for (node_addr, load) in &fe_load_factor {
METRIC_FLOW_BATCHING_ENGINE_GUESS_FE_LOAD
.with_label_values(&[&node_addr.to_string()])
.observe(*load);
}
fe_load_factor
.into_iter()
.map(|(addr, _)| addr)
.collect::<Vec<_>>()
}
}
pub struct FrontendStatsGuard {
stats: Arc<Mutex<HashMap<String, FrontendStat>>>,
frontend_addr: String,
cur: FlowId,
}
impl Drop for FrontendStatsGuard {
fn drop(&mut self) {
let mut stats = self.stats.lock().expect("Failed to lock frontend stats");
if let Some(stat) = stats.get_mut(&self.frontend_addr) {
if let Some(since) = stat.since.remove(&self.cur) {
let elapsed = since.elapsed();
let (count, total_duration) = stat.past_query_avg.entry(self.cur).or_default();
*count += 1;
*total_duration += elapsed;
}
}
}
}
/// A simple frontend client able to execute sql using grpc protocol
///
/// This is for computation-heavy query which need to offload computation to frontend, lifting the load from flownode
@@ -83,6 +184,7 @@ pub enum FrontendClient {
meta_client: Arc<MetaClient>,
chnl_mgr: ChannelManager,
auth: Option<FlowAuthHeader>,
fe_stats: FrontendStats,
},
Standalone {
/// for the sake of simplicity still use grpc even in standalone mode
@@ -114,6 +216,7 @@ impl FrontendClient {
ChannelManager::with_config(cfg)
},
auth,
fe_stats: Default::default(),
}
}
@@ -183,7 +286,7 @@ impl FrontendClient {
/// Get the frontend with recent enough(less than 1 minute from now) `last_activity_ts`
/// and is able to process query
async fn get_random_active_frontend(
pub(crate) async fn get_random_active_frontend(
&self,
catalog: &str,
schema: &str,
@@ -192,6 +295,7 @@ impl FrontendClient {
meta_client: _,
chnl_mgr,
auth,
fe_stats,
} = self
else {
return UnexpectedSnafu {
@@ -208,8 +312,21 @@ impl FrontendClient {
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_millis() as i64;
// shuffle the frontends to avoid always pick the same one
frontends.shuffle(&mut rng());
let node_addrs_by_load = fe_stats.sort_by_load();
// index+1 to load order asc, so that the lightest node has load 1 and non-existent node has load 0
let addr2load = node_addrs_by_load
.iter()
.enumerate()
.map(|(i, id)| (id.clone(), i + 1))
.collect::<HashMap<_, _>>();
// sort frontends by load, from lightest to heaviest
frontends.sort_by(|(_, a), (_, b)| {
// if not even in stats, treat as 0 load since never been queried
let load_a = addr2load.get(&a.peer.addr).unwrap_or(&0);
let load_b = addr2load.get(&b.peer.addr).unwrap_or(&0);
load_a.cmp(load_b)
});
debug!("Frontend nodes sorted by load: {:?}", frontends);
// found node with maximum last_activity_ts
for (_, node_info) in frontends
@@ -257,6 +374,7 @@ impl FrontendClient {
create: CreateTableExpr,
catalog: &str,
schema: &str,
task: Option<&BatchingTask>,
) -> Result<u32, Error> {
self.handle(
Request::Ddl(api::v1::DdlRequest {
@@ -264,7 +382,8 @@ impl FrontendClient {
}),
catalog,
schema,
&mut None,
None,
task,
)
.await
}
@@ -275,15 +394,31 @@ impl FrontendClient {
req: api::v1::greptime_request::Request,
catalog: &str,
schema: &str,
peer_desc: &mut Option<PeerDesc>,
use_peer: Option<Peer>,
task: Option<&BatchingTask>,
) -> Result<u32, Error> {
match self {
FrontendClient::Distributed { .. } => {
let db = self.get_random_active_frontend(catalog, schema).await?;
FrontendClient::Distributed {
fe_stats, chnl_mgr, ..
} => {
let db = if let Some(peer) = use_peer {
DatabaseWithPeer::new(
Database::new(
catalog,
schema,
Client::with_manager_and_urls(
chnl_mgr.clone(),
vec![peer.addr.clone()],
),
),
peer,
)
} else {
self.get_random_active_frontend(catalog, schema).await?
};
*peer_desc = Some(PeerDesc::Dist {
peer: db.peer.clone(),
});
let flow_id = task.map(|t| t.config.flow_id).unwrap_or_default();
let _guard = fe_stats.observe(&db.peer.addr, flow_id);
db.database
.handle_with_retry(req.clone(), GRPC_MAX_RETRIES)

View File

@@ -32,6 +32,7 @@ use crate::batching_mode::MIN_REFRESH_DURATION;
use crate::error::{DatatypesSnafu, InternalSnafu, TimeSnafu, UnexpectedSnafu};
use crate::metrics::{
METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME_RANGE, METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT,
METRIC_FLOW_BATCHING_ENGINE_STALLED_QUERY_WINDOW_CNT,
};
use crate::{Error, FlowId};
@@ -52,6 +53,13 @@ pub struct TaskState {
pub(crate) shutdown_rx: oneshot::Receiver<()>,
/// Task handle
pub(crate) task_handle: Option<tokio::task::JoinHandle<()>>,
/// Slow Query metrics update task handle
pub(crate) slow_query_metric_task: Option<tokio::task::JoinHandle<()>>,
/// min run interval in seconds
pub(crate) min_run_interval: Option<u64>,
/// max filter number per query
pub(crate) max_filter_num: Option<usize>,
}
impl TaskState {
pub fn new(query_ctx: QueryContextRef, shutdown_rx: oneshot::Receiver<()>) -> Self {
@@ -63,6 +71,9 @@ impl TaskState {
exec_state: ExecState::Idle,
shutdown_rx,
task_handle: None,
slow_query_metric_task: None,
min_run_interval: None,
max_filter_num: None,
}
}
@@ -87,20 +98,17 @@ impl TaskState {
pub fn get_next_start_query_time(
&self,
flow_id: FlowId,
time_window_size: &Option<Duration>,
_time_window_size: &Option<Duration>,
max_timeout: Option<Duration>,
) -> Instant {
let last_duration = max_timeout
let next_duration = max_timeout
.unwrap_or(self.last_query_duration)
.min(self.last_query_duration)
.max(MIN_REFRESH_DURATION);
let next_duration = time_window_size
.map(|t| {
let half = t / 2;
half.max(last_duration)
})
.unwrap_or(last_duration);
.max(
self.min_run_interval
.map(Duration::from_secs)
.unwrap_or(MIN_REFRESH_DURATION),
);
// if have dirty time window, execute immediately to clean dirty time window
if self.dirty_time_windows.windows.is_empty() {
@@ -243,9 +251,19 @@ impl DirtyTimeWindows {
};
METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT
.with_label_values(&[flow_id.to_string().as_str()])
.with_label_values(&[
flow_id.to_string().as_str(),
format!("{}", window_size).as_str(),
])
.observe(first_nth.len() as f64);
METRIC_FLOW_BATCHING_ENGINE_STALLED_QUERY_WINDOW_CNT
.with_label_values(&[
flow_id.to_string().as_str(),
format!("{}", window_size).as_str(),
])
.observe(self.windows.len() as f64);
let full_time_range = first_nth
.iter()
.fold(chrono::Duration::zero(), |acc, (start, end)| {
@@ -257,7 +275,10 @@ impl DirtyTimeWindows {
})
.num_seconds() as f64;
METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME_RANGE
.with_label_values(&[flow_id.to_string().as_str()])
.with_label_values(&[
flow_id.to_string().as_str(),
format!("{}", window_size).as_str(),
])
.observe(full_time_range);
let mut expr_lst = vec![];

View File

@@ -61,7 +61,8 @@ use crate::error::{
SubstraitEncodeLogicalPlanSnafu, UnexpectedSnafu,
};
use crate::metrics::{
METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME, METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY,
METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME, METRIC_FLOW_BATCHING_ENGINE_REAL_TIME_SLOW_QUERY_CNT,
METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY,
};
use crate::{Error, FlowId};
@@ -81,6 +82,14 @@ pub struct TaskConfig {
query_type: QueryType,
}
impl TaskConfig {
pub fn time_window_size(&self) -> Option<Duration> {
self.time_window_expr
.as_ref()
.and_then(|expr| *expr.time_window_size())
}
}
fn determine_query_type(query: &str, query_ctx: &QueryContextRef) -> Result<QueryType, Error> {
let stmts =
ParserContext::create_with_dialect(query, query_ctx.sql_dialect(), ParseOptions::default())
@@ -144,6 +153,12 @@ impl BatchingTask {
})
}
pub fn adjust(&self, min_run_interval_secs: u64, max_filter_num_per_query: usize) {
let mut state = self.state.write().unwrap();
state.min_run_interval = Some(min_run_interval_secs);
state.max_filter_num = Some(max_filter_num_per_query);
}
/// mark time window range (now - expire_after, now) as dirty (or (0, now) if expire_after not set)
///
/// useful for flush_flow to flush dirty time windows range
@@ -280,7 +295,7 @@ impl BatchingTask {
let catalog = &self.config.sink_table_name[0];
let schema = &self.config.sink_table_name[1];
frontend_client
.create(expr.clone(), catalog, schema)
.create(expr.clone(), catalog, schema, Some(self))
.await?;
Ok(())
}
@@ -328,11 +343,53 @@ impl BatchingTask {
})?;
let plan = expanded_plan;
let mut peer_desc = None;
let db = frontend_client
.get_random_active_frontend(catalog, schema)
.await?;
let peer_desc = db.peer.clone();
let (tx, mut rx) = oneshot::channel();
let peer_inner = peer_desc.clone();
let window_size_pretty = format!(
"{}s",
self.config.time_window_size().unwrap_or_default().as_secs()
);
let inner_window_size_pretty = window_size_pretty.clone();
let flow_id = self.config.flow_id;
let slow_query_metric_task = tokio::task::spawn(async move {
tokio::time::sleep(SLOW_QUERY_THRESHOLD).await;
METRIC_FLOW_BATCHING_ENGINE_REAL_TIME_SLOW_QUERY_CNT
.with_label_values(&[
flow_id.to_string().as_str(),
&peer_inner.to_string(),
inner_window_size_pretty.as_str(),
])
.add(1.0);
while rx.try_recv() == Err(TryRecvError::Empty) {
// sleep for a while before next update
tokio::time::sleep(MIN_REFRESH_DURATION).await;
}
METRIC_FLOW_BATCHING_ENGINE_REAL_TIME_SLOW_QUERY_CNT
.with_label_values(&[
flow_id.to_string().as_str(),
&peer_inner.to_string(),
inner_window_size_pretty.as_str(),
])
.sub(1.0);
});
self.state.write().unwrap().slow_query_metric_task = Some(slow_query_metric_task);
let res = {
let _timer = METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME
.with_label_values(&[flow_id.to_string().as_str()])
.with_label_values(&[
flow_id.to_string().as_str(),
format!(
"{}s",
self.config.time_window_size().unwrap_or_default().as_secs()
)
.as_str(),
])
.start_timer();
// hack and special handling the insert logical plan
@@ -361,10 +418,12 @@ impl BatchingTask {
};
frontend_client
.handle(req, catalog, schema, &mut peer_desc)
.handle(req, catalog, schema, Some(db.peer), Some(self))
.await
};
// signaling the slow query metric task to stop
let _ = tx.send(());
let elapsed = instant.elapsed();
if let Ok(affected_rows) = &res {
debug!(
@@ -387,7 +446,12 @@ impl BatchingTask {
METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY
.with_label_values(&[
flow_id.to_string().as_str(),
&peer_desc.unwrap_or_default().to_string(),
&peer_desc.to_string(),
format!(
"{}s",
self.config.time_window_size().unwrap_or_default().as_secs()
)
.as_str(),
])
.observe(elapsed.as_secs_f64());
}
@@ -580,19 +644,20 @@ impl BatchingTask {
),
})?;
let expr = self
.state
.write()
.unwrap()
.dirty_time_windows
.gen_filter_exprs(
let expr = {
let mut state = self.state.write().unwrap();
let max_window_cnt = state
.max_filter_num
.unwrap_or(DirtyTimeWindows::MAX_FILTER_NUM);
state.dirty_time_windows.gen_filter_exprs(
&col_name,
Some(l),
window_size,
DirtyTimeWindows::MAX_FILTER_NUM,
max_window_cnt,
self.config.flow_id,
Some(self),
)?;
)?
};
debug!(
"Flow id={:?}, Generated filter expr: {:?}",

View File

@@ -31,22 +31,37 @@ lazy_static! {
pub static ref METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME: HistogramVec = register_histogram_vec!(
"greptime_flow_batching_engine_query_time_secs",
"flow batching engine query time(seconds)",
&["flow_id"],
&["flow_id", "time_window_granularity"],
vec![0.0, 5., 10., 20., 40., 80., 160., 320., 640.,]
)
.unwrap();
pub static ref METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY: HistogramVec = register_histogram_vec!(
"greptime_flow_batching_engine_slow_query_secs",
"flow batching engine slow query(seconds)",
&["flow_id", "peer"],
"flow batching engine slow query(seconds), updated after query finished",
&["flow_id", "peer", "time_window_granularity"],
vec![60., 2. * 60., 3. * 60., 5. * 60., 10. * 60.]
)
.unwrap();
pub static ref METRIC_FLOW_BATCHING_ENGINE_REAL_TIME_SLOW_QUERY_CNT: GaugeVec =
register_gauge_vec!(
"greptime_flow_batching_engine_real_time_slow_query_number",
"flow batching engine real time slow query number, updated in real time",
&["flow_id", "peer", "time_window_granularity"],
)
.unwrap();
pub static ref METRIC_FLOW_BATCHING_ENGINE_STALLED_QUERY_WINDOW_CNT: HistogramVec =
register_histogram_vec!(
"greptime_flow_batching_engine_stalled_query_window_cnt",
"flow batching engine stalled query time window count",
&["flow_id", "time_window_granularity"],
vec![0.0, 5., 10., 20., 40.]
)
.unwrap();
pub static ref METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT: HistogramVec =
register_histogram_vec!(
"greptime_flow_batching_engine_query_window_cnt",
"flow batching engine query time window count",
&["flow_id"],
&["flow_id", "time_window_granularity"],
vec![0.0, 5., 10., 20., 40.]
)
.unwrap();
@@ -54,7 +69,15 @@ lazy_static! {
register_histogram_vec!(
"greptime_flow_batching_engine_query_time_range_secs",
"flow batching engine query time range(seconds)",
&["flow_id"],
&["flow_id", "time_window_granularity"],
vec![60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.]
)
.unwrap();
pub static ref METRIC_FLOW_BATCHING_ENGINE_GUESS_FE_LOAD: HistogramVec =
register_histogram_vec!(
"greptime_flow_batching_engine_guess_fe_load",
"flow batching engine guessed frontend load",
&["fe_addr"],
vec![60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.]
)
.unwrap();

View File

@@ -22,7 +22,7 @@ use std::time::Duration;
use clap::ValueEnum;
use common_base::readable_size::ReadableSize;
use common_base::Plugins;
use common_config::Configurable;
use common_config::{Configurable, DEFAULT_DATA_HOME};
use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::ddl::ProcedureExecutorRef;
@@ -73,7 +73,7 @@ use crate::state::{become_follower, become_leader, StateRef};
pub const TABLE_ID_SEQ: &str = "table_id";
pub const FLOW_ID_SEQ: &str = "flow_id";
pub const METASRV_HOME: &str = "./greptimedb_data/metasrv";
pub const METASRV_DATA_DIR: &str = "metasrv";
// The datastores that implements metadata kvbackend.
#[derive(Clone, Debug, PartialEq, Serialize, Default, Deserialize, ValueEnum)]
@@ -217,10 +217,7 @@ impl Default for MetasrvOptions {
enable_region_failover: false,
allow_region_failover_on_local_wal: false,
http: HttpOptions::default(),
logging: LoggingOptions {
dir: format!("{METASRV_HOME}/logs"),
..Default::default()
},
logging: LoggingOptions::default(),
procedure: ProcedureConfig {
max_retry_times: 12,
retry_delay: Duration::from_millis(500),
@@ -232,7 +229,7 @@ impl Default for MetasrvOptions {
failure_detector: PhiAccrualFailureDetectorOptions::default(),
datanode: DatanodeClientOptions::default(),
enable_telemetry: true,
data_home: METASRV_HOME.to_string(),
data_home: DEFAULT_DATA_HOME.to_string(),
wal: MetasrvWalConfig::default(),
export_metrics: ExportMetricsOption::default(),
store_key_prefix: String::new(),

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::path::Path;
use std::sync::atomic::AtomicBool;
use std::sync::{Arc, Mutex, RwLock};
@@ -55,7 +56,7 @@ use crate::handler::{HeartbeatHandlerGroupBuilder, HeartbeatMailbox, Pushers};
use crate::lease::MetaPeerLookupService;
use crate::metasrv::{
ElectionRef, Metasrv, MetasrvInfo, MetasrvOptions, RegionStatAwareSelectorRef, SelectTarget,
SelectorContext, SelectorRef, FLOW_ID_SEQ, TABLE_ID_SEQ,
SelectorContext, SelectorRef, FLOW_ID_SEQ, METASRV_DATA_DIR, TABLE_ID_SEQ,
};
use crate::procedure::region_migration::manager::RegionMigrationManager;
use crate::procedure::region_migration::DefaultContextFactory;
@@ -436,7 +437,10 @@ impl MetasrvBuilder {
};
let enable_telemetry = options.enable_telemetry;
let metasrv_home = options.data_home.to_string();
let metasrv_home = Path::new(&options.data_home)
.join(METASRV_DATA_DIR)
.to_string_lossy()
.to_string();
Ok(Metasrv {
state,

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::flow::FlowRequestHeader;
use api::v1::flow::{AdjustFlow, FlowRequestHeader};
use async_trait::async_trait;
use common_error::ext::BoxedError;
use common_function::handlers::FlowServiceHandler;
@@ -22,6 +22,7 @@ use common_query::error::Result;
use common_telemetry::tracing_context::TracingContext;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use serde_json::json;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
@@ -57,9 +58,96 @@ impl FlowServiceHandler for FlowServiceOperator {
) -> Result<api::v1::flow::FlowResponse> {
self.flush_inner(catalog, flow, ctx).await
}
async fn adjust(
&self,
catalog: &str,
flow: &str,
min_run_interval_secs: u64,
max_filter_num_per_query: usize,
ctx: QueryContextRef,
) -> Result<api::v1::flow::FlowResponse> {
self.adjust_inner(
catalog,
flow,
min_run_interval_secs,
max_filter_num_per_query,
ctx,
)
.await
}
}
impl FlowServiceOperator {
async fn adjust_inner(
&self,
catalog: &str,
flow: &str,
min_run_interval_secs: u64,
max_filter_num_per_query: usize,
ctx: QueryContextRef,
) -> Result<api::v1::flow::FlowResponse> {
let id = self
.flow_metadata_manager
.flow_name_manager()
.get(catalog, flow)
.await
.map_err(BoxedError::new)
.context(common_query::error::ExecuteSnafu)?
.context(common_meta::error::FlowNotFoundSnafu {
flow_name: format!("{}.{}", catalog, flow),
})
.map_err(BoxedError::new)
.context(common_query::error::ExecuteSnafu)?
.flow_id();
let all_flownode_peers = self
.flow_metadata_manager
.flow_route_manager()
.routes(id)
.await
.map_err(BoxedError::new)
.context(common_query::error::ExecuteSnafu)?;
// order of flownodes doesn't matter here
let all_flow_nodes = FuturesUnordered::from_iter(
all_flownode_peers
.iter()
.map(|(_key, peer)| self.node_manager.flownode(peer.peer())),
)
.collect::<Vec<_>>()
.await;
// TODO(discord9): use proper type for flow options
let options = json!({
"min_run_interval_secs": min_run_interval_secs,
"max_filter_num_per_query": max_filter_num_per_query,
});
for node in all_flow_nodes {
let _res = {
use api::v1::flow::{flow_request, FlowRequest};
let flush_req = FlowRequest {
header: Some(FlowRequestHeader {
tracing_context: TracingContext::from_current_span().to_w3c(),
query_context: Some(
common_meta::rpc::ddl::QueryContext::from(ctx.clone()).into(),
),
}),
body: Some(flow_request::Body::Adjust(AdjustFlow {
flow_id: Some(api::v1::FlowId { id }),
options: options.to_string(),
})),
};
node.handle(flush_req)
.await
.map_err(BoxedError::new)
.context(common_query::error::ExecuteSnafu)?
};
}
Ok(Default::default())
}
/// Flush the flownodes according to the flow id.
async fn flush_inner(
&self,

View File

@@ -52,7 +52,7 @@ impl FlightRecordBatchStream {
rx,
join_handle,
done: false,
encoder: FlightEncoder::default(),
encoder: FlightEncoder::with_compression_disabled(),
}
}

View File

@@ -18,7 +18,7 @@
use crate::storage::{RegionId, RegionNumber, TableId};
/// WAL dir for local file storage
pub const WAL_DIR: &str = "wal/";
pub const WAL_DIR: &str = "wal";
/// Data dir for table engines
pub const DATA_DIR: &str = "data/";

View File

@@ -139,7 +139,8 @@ mod test {
let schema = record_batches[0].schema.arrow_schema().clone();
let stream = futures::stream::once(async move {
let mut schema_data = FlightEncoder::default().encode(FlightMessage::Schema(schema));
let mut schema_data =
FlightEncoder::with_compression_disabled().encode(FlightMessage::Schema(schema));
let metadata = DoPutMetadata::new(0);
schema_data.app_metadata = serde_json::to_vec(&metadata).unwrap().into();
// first message in "DoPut" stream should carry table name in flight descriptor
@@ -154,7 +155,7 @@ mod test {
tokio_stream::iter(record_batches)
.enumerate()
.map(|(i, x)| {
let mut encoder = FlightEncoder::default();
let mut encoder = FlightEncoder::with_compression_disabled();
let message = FlightMessage::RecordBatch(x.into_df_record_batch());
let mut data = encoder.encode(message);
let metadata = DoPutMetadata::new((i + 1) as i64);