Mirror of https://github.com/GreptimeTeam/greptimedb.git, synced 2025-12-22 22:20:02 +00:00.

Compare commits: flow/lb_fe...flow/add_a (14 commits)
Commits (SHA1):
- 0443042f14
- 1786190235
- 705b2007cf
- 8d2c1b7f6a
- c50e84095e
- d3d233257d
- fdf32a8f46
- 69870e2762
- f9f4ac1dca
- 99e56af98c
- 538b5abaae
- a2b3ad77df
- 0eb9e97f79
- 06b1627da5
@@ -64,11 +64,11 @@ inputs:
  upload-max-retry-times:
    description: Max retry times for uploading artifacts to S3
    required: false
    default: "20"
    default: "30"
  upload-retry-timeout:
    description: Timeout for uploading artifacts to S3
    required: false
    default: "30" # minutes
    default: "120" # minutes
runs:
  using: composite
  steps:
.github/workflows/release.yml (vendored, 34 changes)
@@ -441,8 +441,8 @@ jobs:
          aws-region: ${{ vars.EC2_RUNNER_REGION }}
          github-token: ${{ secrets.GH_PERSONAL_ACCESS_TOKEN }}

  bump-doc-version:
    name: Bump doc version
  bump-downstream-repo-versions:
    name: Bump downstream repo versions
    if: ${{ github.event_name == 'push' || github.event_name == 'schedule' }}
    needs: [allocate-runners, publish-github-release]
    runs-on: ubuntu-latest
@@ -456,36 +456,16 @@ jobs:
          fetch-depth: 0
          persist-credentials: false
      - uses: ./.github/actions/setup-cyborg
      - name: Bump doc version
      - name: Bump downstream repo versions
        working-directory: cyborg
        run: pnpm tsx bin/bump-doc-version.ts
        env:
          VERSION: ${{ needs.allocate-runners.outputs.version }}
          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
          DOCS_REPO_TOKEN: ${{ secrets.DOCS_REPO_TOKEN }}

  bump-website-version:
    name: Bump website version
    if: ${{ github.ref_type == 'tag' && !contains(github.ref_name, 'nightly') && github.event_name != 'schedule' }}
    needs: [allocate-runners, publish-github-release]
    runs-on: ubuntu-latest
    # Permission reference: https://docs.github.com/en/actions/using-jobs/assigning-permissions-to-jobs
    permissions:
      issues: write # Allows the action to create issues for cyborg.
      contents: write # Allows the action to create a release.
    steps:
      - uses: actions/checkout@v4
        with:
          fetch-depth: 0
          persist-credentials: false
      - uses: ./.github/actions/setup-cyborg
      - name: Bump website version
        working-directory: cyborg
        run: pnpm tsx bin/bump-website-version.ts
        run: pnpm tsx bin/bump-versions.ts
        env:
          TARGET_REPOS: website,docs,demo
          VERSION: ${{ needs.allocate-runners.outputs.version }}
          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
          WEBSITE_REPO_TOKEN: ${{ secrets.WEBSITE_REPO_TOKEN }}
          DOCS_REPO_TOKEN: ${{ secrets.DOCS_REPO_TOKEN }}
          DEMO_REPO_TOKEN: ${{ secrets.DEMO_REPO_TOKEN }}

  bump-helm-charts-version:
    name: Bump helm charts version
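The renamed job hands everything to cyborg/bin/bump-versions.ts (added later in this compare) through environment variables. Below is a minimal, illustrative TypeScript sketch of that contract, not part of the workflow itself: the env names VERSION and TARGET_REPOS and the per-repo token names come from the job definition above, while the helper `parseJobEnv` and the `TOKEN_ENV` map are hypothetical.

```typescript
// Illustrative sketch of the env contract between the release job and bump-versions.ts.
// TOKEN_ENV mirrors the secrets the job exports for each downstream repo.
const TOKEN_ENV: Record<string, string> = {
  website: "WEBSITE_REPO_TOKEN",
  docs: "DOCS_REPO_TOKEN",
  demo: "DEMO_REPO_TOKEN",
};

function parseJobEnv(env: Record<string, string | undefined>) {
  const version = env.VERSION;
  if (!version) throw new Error("VERSION environment variable is required");
  // The script strips a leading 'v' (e.g. "v0.15.2" -> "0.15.2").
  const cleanVersion = version.startsWith("v") ? version.slice(1) : version;
  // TARGET_REPOS is a comma-separated list; the script defaults to website,docs when unset.
  const repos = (env.TARGET_REPOS ?? "website,docs").split(",").map((r) => r.trim());
  return { cleanVersion, repos, tokens: repos.map((r) => TOKEN_ENV[r]) };
}

// Example input shaped like the job env above (the version value is hypothetical).
console.log(parseJobEnv({ VERSION: "v0.15.2", TARGET_REPOS: "website,docs,demo" }));
```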
Cargo.lock (generated, 780 changes)
File diff suppressed because it is too large.
@@ -30,6 +30,7 @@ members = [
    "src/common/recordbatch",
    "src/common/runtime",
    "src/common/session",
    "src/common/stat",
    "src/common/substrait",
    "src/common/telemetry",
    "src/common/test-util",
@@ -132,7 +133,7 @@ etcd-client = "0.14"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "454c52634c3bac27de10bf0d85d5533eed1cf03f" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "2dca1dc67862d7b410838aef81232274c019b3f6" }
hex = "0.4"
http = "1"
humantime = "2.1"
@@ -148,6 +149,7 @@ meter-core = { git = "https://github.com/GreptimeTeam/greptime-meter.git", rev =
mockall = "0.13"
moka = "0.12"
nalgebra = "0.33"
nix = { version = "0.30.1", default-features = false, features = ["event", "fs", "process"] }
notify = "8.0"
num_cpus = "1.16"
object_store_opendal = "0.50"
@@ -287,6 +289,7 @@ query = { path = "src/query" }
servers = { path = "src/servers" }
session = { path = "src/session" }
sql = { path = "src/sql" }
stat = { path = "src/common/stat" }
store-api = { path = "src/store-api" }
substrait = { path = "src/common/substrait" }
table = { path = "src/table" }
@@ -100,7 +100,7 @@
| `query` | -- | -- | The query engine options. |
| `query.parallelism` | Integer | `0` | Parallelism of the query engine.<br/>Default to 0, which means the number of CPU cores. |
| `storage` | -- | -- | The data storage options. |
| `storage.data_home` | String | `./greptimedb_data/` | The working home directory. |
| `storage.data_home` | String | `./greptimedb_data` | The working home directory. |
| `storage.type` | String | `File` | The storage type used to store the data.<br/>- `File`: the data is stored in the local file system.<br/>- `S3`: the data is stored in the S3 object storage.<br/>- `Gcs`: the data is stored in the Google Cloud Storage.<br/>- `Azblob`: the data is stored in the Azure Blob Storage.<br/>- `Oss`: the data is stored in the Aliyun OSS. |
| `storage.cache_path` | String | Unset | Read cache configuration for object storage such as 'S3' etc, it's configured by default when using object storage. It is recommended to configure it when using object storage for better performance.<br/>A local file directory, defaults to `{data_home}`. An empty string means disabling. |
| `storage.cache_capacity` | String | Unset | The local file cache capacity in bytes. If your disk space is sufficient, it is recommended to set it larger. |
@@ -314,7 +314,7 @@

| Key | Type | Default | Descriptions |
| --- | -----| ------- | ----------- |
| `data_home` | String | `./greptimedb_data/metasrv/` | The working home directory. |
| `data_home` | String | `./greptimedb_data` | The working home directory. |
| `bind_addr` | String | `127.0.0.1:3002` | The bind address of metasrv. |
| `server_addr` | String | `127.0.0.1:3002` | The communication server address for the frontend and datanode to connect to metasrv.<br/>If left empty or unset, the server will automatically use the IP address of the first network interface<br/>on the host, with the same port number as the one specified in `bind_addr`. |
| `store_addrs` | Array | -- | Store server address default to etcd store.<br/>For postgres store, the format is:<br/>"password=password dbname=postgres user=postgres host=localhost port=5432"<br/>For etcd store, the format is:<br/>"127.0.0.1:2379" |
@@ -446,7 +446,7 @@
| `query` | -- | -- | The query engine options. |
| `query.parallelism` | Integer | `0` | Parallelism of the query engine.<br/>Default to 0, which means the number of CPU cores. |
| `storage` | -- | -- | The data storage options. |
| `storage.data_home` | String | `./greptimedb_data/` | The working home directory. |
| `storage.data_home` | String | `./greptimedb_data` | The working home directory. |
| `storage.type` | String | `File` | The storage type used to store the data.<br/>- `File`: the data is stored in the local file system.<br/>- `S3`: the data is stored in the S3 object storage.<br/>- `Gcs`: the data is stored in the Google Cloud Storage.<br/>- `Azblob`: the data is stored in the Azure Blob Storage.<br/>- `Oss`: the data is stored in the Aliyun OSS. |
| `storage.cache_path` | String | Unset | Read cache configuration for object storage such as 'S3' etc, it's configured by default when using object storage. It is recommended to configure it when using object storage for better performance.<br/>A local file directory, defaults to `{data_home}`. An empty string means disabling. |
| `storage.cache_capacity` | String | Unset | The local file cache capacity in bytes. If your disk space is sufficient, it is recommended to set it larger. |
@@ -252,7 +252,7 @@ parallelism = 0
## The data storage options.
[storage]
## The working home directory.
data_home = "./greptimedb_data/"
data_home = "./greptimedb_data"

## The storage type used to store the data.
## - `File`: the data is stored in the local file system.

@@ -1,5 +1,5 @@
## The working home directory.
data_home = "./greptimedb_data/metasrv/"
data_home = "./greptimedb_data"

## The bind address of metasrv.
bind_addr = "127.0.0.1:3002"

@@ -350,7 +350,7 @@ parallelism = 0
## The data storage options.
[storage]
## The working home directory.
data_home = "./greptimedb_data/"
data_home = "./greptimedb_data"

## The storage type used to store the data.
## - `File`: the data is stored in the local file system.
@@ -1,75 +0,0 @@
/*
 * Copyright 2023 Greptime Team
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import * as core from "@actions/core";
import {obtainClient} from "@/common";

async function triggerWorkflow(workflowId: string, version: string) {
    const docsClient = obtainClient("DOCS_REPO_TOKEN")
    try {
        await docsClient.rest.actions.createWorkflowDispatch({
            owner: "GreptimeTeam",
            repo: "docs",
            workflow_id: workflowId,
            ref: "main",
            inputs: {
                version,
            },
        });
        console.log(`Successfully triggered ${workflowId} workflow with version ${version}`);
    } catch (error) {
        core.setFailed(`Failed to trigger workflow: ${error.message}`);
    }
}

function determineWorkflow(version: string): [string, string] {
    // Check if it's a nightly version
    if (version.includes('nightly')) {
        return ['bump-nightly-version.yml', version];
    }

    const parts = version.split('.');

    if (parts.length !== 3) {
        throw new Error('Invalid version format');
    }

    // If patch version (last number) is 0, it's a major version
    // Return only major.minor version
    if (parts[2] === '0') {
        return ['bump-version.yml', `${parts[0]}.${parts[1]}`];
    }

    // Otherwise it's a patch version, use full version
    return ['bump-patch-version.yml', version];
}

const version = process.env.VERSION;
if (!version) {
    core.setFailed("VERSION environment variable is required");
    process.exit(1);
}

// Remove 'v' prefix if exists
const cleanVersion = version.startsWith('v') ? version.slice(1) : version;

try {
    const [workflowId, apiVersion] = determineWorkflow(cleanVersion);
    triggerWorkflow(workflowId, apiVersion);
} catch (error) {
    core.setFailed(`Error processing version: ${error.message}`);
    process.exit(1);
}
cyborg/bin/bump-versions.ts (new file, 156 lines)
@@ -0,0 +1,156 @@
/*
 * Copyright 2023 Greptime Team
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import * as core from "@actions/core";
import {obtainClient} from "@/common";

interface RepoConfig {
    tokenEnv: string;
    repo: string;
    workflowLogic: (version: string) => [string, string] | null;
}

const REPO_CONFIGS: Record<string, RepoConfig> = {
    website: {
        tokenEnv: "WEBSITE_REPO_TOKEN",
        repo: "website",
        workflowLogic: (version: string) => {
            // Skip nightly versions for website
            if (version.includes('nightly')) {
                console.log('Nightly version detected for website, skipping workflow trigger.');
                return null;
            }
            return ['bump-patch-version.yml', version];
        }
    },
    demo: {
        tokenEnv: "DEMO_REPO_TOKEN",
        repo: "demo-scene",
        workflowLogic: (version: string) => {
            // Skip nightly versions for demo
            if (version.includes('nightly')) {
                console.log('Nightly version detected for demo, skipping workflow trigger.');
                return null;
            }
            return ['bump-patch-version.yml', version];
        }
    },
    docs: {
        tokenEnv: "DOCS_REPO_TOKEN",
        repo: "docs",
        workflowLogic: (version: string) => {
            // Check if it's a nightly version
            if (version.includes('nightly')) {
                return ['bump-nightly-version.yml', version];
            }

            const parts = version.split('.');
            if (parts.length !== 3) {
                throw new Error('Invalid version format');
            }

            // If patch version (last number) is 0, it's a major version
            // Return only major.minor version
            if (parts[2] === '0') {
                return ['bump-version.yml', `${parts[0]}.${parts[1]}`];
            }

            // Otherwise it's a patch version, use full version
            return ['bump-patch-version.yml', version];
        }
    }
};

async function triggerWorkflow(repoConfig: RepoConfig, workflowId: string, version: string) {
    const client = obtainClient(repoConfig.tokenEnv);
    try {
        await client.rest.actions.createWorkflowDispatch({
            owner: "GreptimeTeam",
            repo: repoConfig.repo,
            workflow_id: workflowId,
            ref: "main",
            inputs: {
                version,
            },
        });
        console.log(`Successfully triggered ${workflowId} workflow for ${repoConfig.repo} with version ${version}`);
    } catch (error) {
        core.setFailed(`Failed to trigger workflow for ${repoConfig.repo}: ${error.message}`);
        throw error;
    }
}

async function processRepo(repoName: string, version: string) {
    const repoConfig = REPO_CONFIGS[repoName];
    if (!repoConfig) {
        throw new Error(`Unknown repository: ${repoName}`);
    }

    try {
        const workflowResult = repoConfig.workflowLogic(version);
        if (workflowResult === null) {
            // Skip this repo (e.g., nightly version for website)
            return;
        }

        const [workflowId, apiVersion] = workflowResult;
        await triggerWorkflow(repoConfig, workflowId, apiVersion);
    } catch (error) {
        core.setFailed(`Error processing ${repoName} with version ${version}: ${error.message}`);
        throw error;
    }
}

async function main() {
    const version = process.env.VERSION;
    if (!version) {
        core.setFailed("VERSION environment variable is required");
        process.exit(1);
    }

    // Remove 'v' prefix if exists
    const cleanVersion = version.startsWith('v') ? version.slice(1) : version;

    // Get target repositories from environment variable
    // Default to both if not specified
    const targetRepos = process.env.TARGET_REPOS?.split(',').map(repo => repo.trim()) || ['website', 'docs'];

    console.log(`Processing version ${cleanVersion} for repositories: ${targetRepos.join(', ')}`);

    const errors: string[] = [];

    // Process each repository
    for (const repo of targetRepos) {
        try {
            await processRepo(repo, cleanVersion);
        } catch (error) {
            errors.push(`${repo}: ${error.message}`);
        }
    }

    if (errors.length > 0) {
        core.setFailed(`Failed to process some repositories: ${errors.join('; ')}`);
        process.exit(1);
    }

    console.log('All repositories processed successfully');
}

// Execute main function
main().catch((error) => {
    core.setFailed(`Unexpected error: ${error.message}`);
    process.exit(1);
});
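For reference, the docs branch of the new script maps a release version to a workflow exactly as the deleted bump-doc-version.ts did. A small standalone sketch that restates `REPO_CONFIGS.docs.workflowLogic` (the function name `docsWorkflowFor` and the sample versions below are illustrative only):

```typescript
// Restatement of REPO_CONFIGS.docs.workflowLogic:
// nightly -> bump-nightly-version.yml; x.y.0 -> bump-version.yml with "x.y";
// any other x.y.z -> bump-patch-version.yml with the full version.
function docsWorkflowFor(version: string): [string, string] {
  if (version.includes("nightly")) return ["bump-nightly-version.yml", version];
  const parts = version.split(".");
  if (parts.length !== 3) throw new Error("Invalid version format");
  if (parts[2] === "0") return ["bump-version.yml", `${parts[0]}.${parts[1]}`];
  return ["bump-patch-version.yml", version];
}

console.log(docsWorkflowFor("0.15.0"));         // ["bump-version.yml", "0.15"]
console.log(docsWorkflowFor("0.15.2"));         // ["bump-patch-version.yml", "0.15.2"]
console.log(docsWorkflowFor("0.16.0-nightly")); // ["bump-nightly-version.yml", "0.16.0-nightly"]
```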
@@ -1,57 +0,0 @@
/*
 * Copyright 2023 Greptime Team
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import * as core from "@actions/core";
import {obtainClient} from "@/common";

async function triggerWorkflow(workflowId: string, version: string) {
    const websiteClient = obtainClient("WEBSITE_REPO_TOKEN")
    try {
        await websiteClient.rest.actions.createWorkflowDispatch({
            owner: "GreptimeTeam",
            repo: "website",
            workflow_id: workflowId,
            ref: "main",
            inputs: {
                version,
            },
        });
        console.log(`Successfully triggered ${workflowId} workflow with version ${version}`);
    } catch (error) {
        core.setFailed(`Failed to trigger workflow: ${error.message}`);
    }
}

const version = process.env.VERSION;
if (!version) {
    core.setFailed("VERSION environment variable is required");
    process.exit(1);
}

// Remove 'v' prefix if exists
const cleanVersion = version.startsWith('v') ? version.slice(1) : version;

if (cleanVersion.includes('nightly')) {
    console.log('Nightly version detected, skipping workflow trigger.');
    process.exit(0);
}

try {
    triggerWorkflow('bump-patch-version.yml', cleanVersion);
} catch (error) {
    core.setFailed(`Error processing version: ${error.message}`);
    process.exit(1);
}
File diff suppressed because it is too large
@@ -60,7 +60,7 @@
|
||||
| Read Stage P99 per Instance | `histogram_quantile(0.99, sum by(instance, pod, le, stage) (rate(greptime_mito_read_stage_elapsed_bucket{instance=~"$datanode"}[$__rate_interval])))` | `timeseries` | Read Stage P99 per Instance. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]` |
|
||||
| Write Stage P99 per Instance | `histogram_quantile(0.99, sum by(instance, pod, le, stage) (rate(greptime_mito_write_stage_elapsed_bucket{instance=~"$datanode"}[$__rate_interval])))` | `timeseries` | Write Stage P99 per Instance. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]` |
|
||||
| Compaction OPS per Instance | `sum by(instance, pod) (rate(greptime_mito_compaction_total_elapsed_count{instance=~"$datanode"}[$__rate_interval]))` | `timeseries` | Compaction OPS per Instance. | `prometheus` | `ops` | `[{{ instance }}]-[{{pod}}]` |
|
||||
| Compaction Elapsed Time per Instance by Stage | `histogram_quantile(0.99, sum by(instance, pod, le, stage) (rate(greptime_mito_compaction_stage_elapsed_bucket{instance=~"$datanode"}[$__rate_interval])))`<br/>`sum by(instance, pod, le, stage) (rate(greptime_mito_compaction_stage_elapsed_sum{instance=~"$datanode"}[$__rate_interval]))/sum by(instance, pod, le, stage) (rate(greptime_mito_compaction_stage_elapsed_count{instance=~"$datanode"}[$__rate_interval]))` | `timeseries` | Compaction latency by stage | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-p99` |
|
||||
| Compaction Elapsed Time per Instance by Stage | `histogram_quantile(0.99, sum by(instance, pod, le, stage) (rate(greptime_mito_compaction_stage_elapsed_bucket{instance=~"$datanode"}[$__rate_interval])))`<br/>`sum by(instance, pod, stage) (rate(greptime_mito_compaction_stage_elapsed_sum{instance=~"$datanode"}[$__rate_interval]))/sum by(instance, pod, stage) (rate(greptime_mito_compaction_stage_elapsed_count{instance=~"$datanode"}[$__rate_interval]))` | `timeseries` | Compaction latency by stage | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-p99` |
|
||||
| Compaction P99 per Instance | `histogram_quantile(0.99, sum by(instance, pod, le,stage) (rate(greptime_mito_compaction_total_elapsed_bucket{instance=~"$datanode"}[$__rate_interval])))` | `timeseries` | Compaction P99 per Instance. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-compaction` |
|
||||
| WAL write size | `histogram_quantile(0.95, sum by(le,instance, pod) (rate(raft_engine_write_size_bucket[$__rate_interval])))`<br/>`histogram_quantile(0.99, sum by(le,instance,pod) (rate(raft_engine_write_size_bucket[$__rate_interval])))`<br/>`sum by (instance, pod)(rate(raft_engine_write_size_sum[$__rate_interval]))` | `timeseries` | Write-ahead logs write size as bytes. This chart includes stats of p95 and p99 size by instance, total WAL write rate. | `prometheus` | `bytes` | `[{{instance}}]-[{{pod}}]-req-size-p95` |
|
||||
| Cached Bytes per Instance | `greptime_mito_cache_bytes{instance=~"$datanode"}` | `timeseries` | Cached Bytes per Instance. | `prometheus` | `decbytes` | `[{{instance}}]-[{{pod}}]-[{{type}}]` |
|
||||
@@ -69,7 +69,7 @@
|
||||
| Log Store op duration seconds | `histogram_quantile(0.99, sum by(le,logstore,optype,instance, pod) (rate(greptime_logstore_op_elapsed_bucket[$__rate_interval])))` | `timeseries` | Write-ahead log operations latency at p99 | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{logstore}}]-[{{optype}}]-p99` |
|
||||
| Inflight Flush | `greptime_mito_inflight_flush_count` | `timeseries` | Ongoing flush task count | `prometheus` | `none` | `[{{instance}}]-[{{pod}}]` |
|
||||
| Compaction Input/Output Bytes | `sum by(instance, pod) (greptime_mito_compaction_input_bytes)`<br/>`sum by(instance, pod) (greptime_mito_compaction_output_bytes)` | `timeseries` | Compaction oinput output bytes | `prometheus` | `bytes` | `[{{instance}}]-[{{pod}}]-input` |
|
||||
| Region Worker Handle Bulk Insert Requests | `histogram_quantile(0.95, sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_bucket[$__rate_interval])))`<br/>`sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_sum[$__rate_interval]))/sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_count[$__rate_interval]))` | `timeseries` | Per-stage elapsed time for region worker to handle bulk insert region requests. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-P95` |
|
||||
| Region Worker Handle Bulk Insert Requests | `histogram_quantile(0.95, sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_bucket[$__rate_interval])))`<br/>`sum by(instance, stage, pod) (rate(greptime_region_worker_handle_write_sum[$__rate_interval]))/sum by(instance, stage, pod) (rate(greptime_region_worker_handle_write_count[$__rate_interval]))` | `timeseries` | Per-stage elapsed time for region worker to handle bulk insert region requests. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-P95` |
|
||||
| Region Worker Convert Requests | `histogram_quantile(0.95, sum by(le, instance, stage, pod) (rate(greptime_datanode_convert_region_request_bucket[$__rate_interval])))`<br/>`sum by(le,instance, stage, pod) (rate(greptime_datanode_convert_region_request_sum[$__rate_interval]))/sum by(le,instance, stage, pod) (rate(greptime_datanode_convert_region_request_count[$__rate_interval]))` | `timeseries` | Per-stage elapsed time for region worker to decode requests. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-P95` |
|
||||
# OpenDAL
|
||||
| Title | Query | Type | Description | Datasource | Unit | Legend Format |
|
||||
@@ -88,9 +88,19 @@
|
||||
# Metasrv
|
||||
| Title | Query | Type | Description | Datasource | Unit | Legend Format |
|
||||
| --- | --- | --- | --- | --- | --- | --- |
|
||||
| Region migration datanode | `greptime_meta_region_migration_stat{datanode_type="src"}`<br/>`greptime_meta_region_migration_stat{datanode_type="desc"}` | `state-timeline` | Counter of region migration by source and destination | `prometheus` | `none` | `from-datanode-{{datanode_id}}` |
|
||||
| Region migration error | `greptime_meta_region_migration_error` | `timeseries` | Counter of region migration error | `prometheus` | `none` | `__auto` |
|
||||
| Datanode load | `greptime_datanode_load` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `none` | `__auto` |
|
||||
| Region migration datanode | `greptime_meta_region_migration_stat{datanode_type="src"}`<br/>`greptime_meta_region_migration_stat{datanode_type="desc"}` | `status-history` | Counter of region migration by source and destination | `prometheus` | -- | `from-datanode-{{datanode_id}}` |
|
||||
| Region migration error | `greptime_meta_region_migration_error` | `timeseries` | Counter of region migration error | `prometheus` | `none` | `{{pod}}-{{state}}-{{error_type}}` |
|
||||
| Datanode load | `greptime_datanode_load` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `binBps` | `Datanode-{{datanode_id}}-writeload` |
|
||||
| Rate of SQL Executions (RDS) | `rate(greptime_meta_rds_pg_sql_execute_elapsed_ms_count[$__rate_interval])` | `timeseries` | Displays the rate of SQL executions processed by the Meta service using the RDS backend. | `prometheus` | `none` | `{{pod}} {{op}} {{type}} {{result}} ` |
|
||||
| SQL Execution Latency (RDS) | `histogram_quantile(0.90, sum by(pod, op, type, result, le) (rate(greptime_meta_rds_pg_sql_execute_elapsed_ms_bucket[$__rate_interval])))` | `timeseries` | Measures the response time of SQL executions via the RDS backend. | `prometheus` | `ms` | `{{pod}} {{op}} {{type}} {{result}} p90` |
|
||||
| Handler Execution Latency | `histogram_quantile(0.90, sum by(pod, le, name) (
|
||||
rate(greptime_meta_handler_execute_bucket[$__rate_interval])
|
||||
))` | `timeseries` | Shows latency of Meta handlers by pod and handler name, useful for monitoring handler performance and detecting latency spikes.<br/> | `prometheus` | `s` | `{{pod}} {{name}} p90` |
|
||||
| Heartbeat Packet Size | `histogram_quantile(0.9, sum by(pod, le) (greptime_meta_heartbeat_stat_memory_size_bucket))` | `timeseries` | Shows p90 heartbeat message sizes, helping track network usage and identify anomalies in heartbeat payload.<br/> | `prometheus` | `bytes` | `{{pod}}` |
|
||||
| Meta Heartbeat Receive Rate | `rate(greptime_meta_heartbeat_rate[$__rate_interval])` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `s` | `{{pod}}` |
|
||||
| Meta KV Ops Latency | `histogram_quantile(0.99, sum by(pod, le, op, target) (greptime_meta_kv_request_elapsed_bucket))` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `s` | `{{pod}}-{{op}} p99` |
|
||||
| Rate of meta KV Ops | `rate(greptime_meta_kv_request_elapsed_count[$__rate_interval])` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `none` | `{{pod}}-{{op}} p99` |
|
||||
| DDL Latency | `histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_tables_bucket))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_table))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_view))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_flow))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_drop_table))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_alter_table))` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `s` | `CreateLogicalTables-{{step}} p90` |
|
||||
# Flownode
|
||||
| Title | Query | Type | Description | Datasource | Unit | Legend Format |
|
||||
| --- | --- | --- | --- | --- | --- | --- |
|
||||
|
||||
@@ -497,7 +497,7 @@ groups:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-p99'
|
||||
- expr: sum by(instance, pod, le, stage) (rate(greptime_mito_compaction_stage_elapsed_sum{instance=~"$datanode"}[$__rate_interval]))/sum by(instance, pod, le, stage) (rate(greptime_mito_compaction_stage_elapsed_count{instance=~"$datanode"}[$__rate_interval]))
|
||||
- expr: sum by(instance, pod, stage) (rate(greptime_mito_compaction_stage_elapsed_sum{instance=~"$datanode"}[$__rate_interval]))/sum by(instance, pod, stage) (rate(greptime_mito_compaction_stage_elapsed_count{instance=~"$datanode"}[$__rate_interval]))
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
@@ -607,7 +607,7 @@ groups:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-P95'
|
||||
- expr: sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_sum[$__rate_interval]))/sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_count[$__rate_interval]))
|
||||
- expr: sum by(instance, stage, pod) (rate(greptime_region_worker_handle_write_sum[$__rate_interval]))/sum by(instance, stage, pod) (rate(greptime_region_worker_handle_write_count[$__rate_interval]))
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
@@ -741,9 +741,8 @@ groups:
|
||||
- title: Metasrv
|
||||
panels:
|
||||
- title: Region migration datanode
|
||||
type: state-timeline
|
||||
type: status-history
|
||||
description: Counter of region migration by source and destination
|
||||
unit: none
|
||||
queries:
|
||||
- expr: greptime_meta_region_migration_stat{datanode_type="src"}
|
||||
datasource:
|
||||
@@ -764,17 +763,127 @@ groups:
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: __auto
|
||||
legendFormat: '{{pod}}-{{state}}-{{error_type}}'
|
||||
- title: Datanode load
|
||||
type: timeseries
|
||||
description: Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads.
|
||||
unit: none
|
||||
unit: binBps
|
||||
queries:
|
||||
- expr: greptime_datanode_load
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: __auto
|
||||
legendFormat: Datanode-{{datanode_id}}-writeload
|
||||
- title: Rate of SQL Executions (RDS)
|
||||
type: timeseries
|
||||
description: Displays the rate of SQL executions processed by the Meta service using the RDS backend.
|
||||
unit: none
|
||||
queries:
|
||||
- expr: rate(greptime_meta_rds_pg_sql_execute_elapsed_ms_count[$__rate_interval])
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: '{{pod}} {{op}} {{type}} {{result}} '
|
||||
- title: SQL Execution Latency (RDS)
|
||||
type: timeseries
|
||||
description: 'Measures the response time of SQL executions via the RDS backend. '
|
||||
unit: ms
|
||||
queries:
|
||||
- expr: histogram_quantile(0.90, sum by(pod, op, type, result, le) (rate(greptime_meta_rds_pg_sql_execute_elapsed_ms_bucket[$__rate_interval])))
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: '{{pod}} {{op}} {{type}} {{result}} p90'
|
||||
- title: Handler Execution Latency
|
||||
type: timeseries
|
||||
description: |
|
||||
Shows latency of Meta handlers by pod and handler name, useful for monitoring handler performance and detecting latency spikes.
|
||||
unit: s
|
||||
queries:
|
||||
- expr: |-
|
||||
histogram_quantile(0.90, sum by(pod, le, name) (
|
||||
rate(greptime_meta_handler_execute_bucket[$__rate_interval])
|
||||
))
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: '{{pod}} {{name}} p90'
|
||||
- title: Heartbeat Packet Size
|
||||
type: timeseries
|
||||
description: |
|
||||
Shows p90 heartbeat message sizes, helping track network usage and identify anomalies in heartbeat payload.
|
||||
unit: bytes
|
||||
queries:
|
||||
- expr: histogram_quantile(0.9, sum by(pod, le) (greptime_meta_heartbeat_stat_memory_size_bucket))
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: '{{pod}}'
|
||||
- title: Meta Heartbeat Receive Rate
|
||||
type: timeseries
|
||||
description: Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads.
|
||||
unit: s
|
||||
queries:
|
||||
- expr: rate(greptime_meta_heartbeat_rate[$__rate_interval])
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: '{{pod}}'
|
||||
- title: Meta KV Ops Latency
|
||||
type: timeseries
|
||||
description: Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads.
|
||||
unit: s
|
||||
queries:
|
||||
- expr: histogram_quantile(0.99, sum by(pod, le, op, target) (greptime_meta_kv_request_elapsed_bucket))
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: '{{pod}}-{{op}} p99'
|
||||
- title: Rate of meta KV Ops
|
||||
type: timeseries
|
||||
description: Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads.
|
||||
unit: none
|
||||
queries:
|
||||
- expr: rate(greptime_meta_kv_request_elapsed_count[$__rate_interval])
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: '{{pod}}-{{op}} p99'
|
||||
- title: DDL Latency
|
||||
type: timeseries
|
||||
description: Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads.
|
||||
unit: s
|
||||
queries:
|
||||
- expr: histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_tables_bucket))
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: CreateLogicalTables-{{step}} p90
|
||||
- expr: histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_table))
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: CreateTable-{{step}} p90
|
||||
- expr: histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_view))
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: CreateView-{{step}} p90
|
||||
- expr: histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_flow))
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: CreateFlow-{{step}} p90
|
||||
- expr: histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_drop_table))
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: DropTable-{{step}} p90
|
||||
- expr: histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_alter_table))
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: AlterTable-{{step}} p90
|
||||
- title: Flownode
|
||||
panels:
|
||||
- title: Flow Ingest / Output Rate
|
||||
|
||||
File diff suppressed because it is too large
@@ -60,7 +60,7 @@
|
||||
| Read Stage P99 per Instance | `histogram_quantile(0.99, sum by(instance, pod, le, stage) (rate(greptime_mito_read_stage_elapsed_bucket{}[$__rate_interval])))` | `timeseries` | Read Stage P99 per Instance. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]` |
|
||||
| Write Stage P99 per Instance | `histogram_quantile(0.99, sum by(instance, pod, le, stage) (rate(greptime_mito_write_stage_elapsed_bucket{}[$__rate_interval])))` | `timeseries` | Write Stage P99 per Instance. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]` |
|
||||
| Compaction OPS per Instance | `sum by(instance, pod) (rate(greptime_mito_compaction_total_elapsed_count{}[$__rate_interval]))` | `timeseries` | Compaction OPS per Instance. | `prometheus` | `ops` | `[{{ instance }}]-[{{pod}}]` |
|
||||
| Compaction Elapsed Time per Instance by Stage | `histogram_quantile(0.99, sum by(instance, pod, le, stage) (rate(greptime_mito_compaction_stage_elapsed_bucket{}[$__rate_interval])))`<br/>`sum by(instance, pod, le, stage) (rate(greptime_mito_compaction_stage_elapsed_sum{}[$__rate_interval]))/sum by(instance, pod, le, stage) (rate(greptime_mito_compaction_stage_elapsed_count{}[$__rate_interval]))` | `timeseries` | Compaction latency by stage | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-p99` |
|
||||
| Compaction Elapsed Time per Instance by Stage | `histogram_quantile(0.99, sum by(instance, pod, le, stage) (rate(greptime_mito_compaction_stage_elapsed_bucket{}[$__rate_interval])))`<br/>`sum by(instance, pod, stage) (rate(greptime_mito_compaction_stage_elapsed_sum{}[$__rate_interval]))/sum by(instance, pod, stage) (rate(greptime_mito_compaction_stage_elapsed_count{}[$__rate_interval]))` | `timeseries` | Compaction latency by stage | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-p99` |
|
||||
| Compaction P99 per Instance | `histogram_quantile(0.99, sum by(instance, pod, le,stage) (rate(greptime_mito_compaction_total_elapsed_bucket{}[$__rate_interval])))` | `timeseries` | Compaction P99 per Instance. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-compaction` |
|
||||
| WAL write size | `histogram_quantile(0.95, sum by(le,instance, pod) (rate(raft_engine_write_size_bucket[$__rate_interval])))`<br/>`histogram_quantile(0.99, sum by(le,instance,pod) (rate(raft_engine_write_size_bucket[$__rate_interval])))`<br/>`sum by (instance, pod)(rate(raft_engine_write_size_sum[$__rate_interval]))` | `timeseries` | Write-ahead logs write size as bytes. This chart includes stats of p95 and p99 size by instance, total WAL write rate. | `prometheus` | `bytes` | `[{{instance}}]-[{{pod}}]-req-size-p95` |
|
||||
| Cached Bytes per Instance | `greptime_mito_cache_bytes{}` | `timeseries` | Cached Bytes per Instance. | `prometheus` | `decbytes` | `[{{instance}}]-[{{pod}}]-[{{type}}]` |
|
||||
@@ -69,7 +69,7 @@
|
||||
| Log Store op duration seconds | `histogram_quantile(0.99, sum by(le,logstore,optype,instance, pod) (rate(greptime_logstore_op_elapsed_bucket[$__rate_interval])))` | `timeseries` | Write-ahead log operations latency at p99 | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{logstore}}]-[{{optype}}]-p99` |
|
||||
| Inflight Flush | `greptime_mito_inflight_flush_count` | `timeseries` | Ongoing flush task count | `prometheus` | `none` | `[{{instance}}]-[{{pod}}]` |
|
||||
| Compaction Input/Output Bytes | `sum by(instance, pod) (greptime_mito_compaction_input_bytes)`<br/>`sum by(instance, pod) (greptime_mito_compaction_output_bytes)` | `timeseries` | Compaction oinput output bytes | `prometheus` | `bytes` | `[{{instance}}]-[{{pod}}]-input` |
|
||||
| Region Worker Handle Bulk Insert Requests | `histogram_quantile(0.95, sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_bucket[$__rate_interval])))`<br/>`sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_sum[$__rate_interval]))/sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_count[$__rate_interval]))` | `timeseries` | Per-stage elapsed time for region worker to handle bulk insert region requests. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-P95` |
|
||||
| Region Worker Handle Bulk Insert Requests | `histogram_quantile(0.95, sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_bucket[$__rate_interval])))`<br/>`sum by(instance, stage, pod) (rate(greptime_region_worker_handle_write_sum[$__rate_interval]))/sum by(instance, stage, pod) (rate(greptime_region_worker_handle_write_count[$__rate_interval]))` | `timeseries` | Per-stage elapsed time for region worker to handle bulk insert region requests. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-P95` |
|
||||
| Region Worker Convert Requests | `histogram_quantile(0.95, sum by(le, instance, stage, pod) (rate(greptime_datanode_convert_region_request_bucket[$__rate_interval])))`<br/>`sum by(le,instance, stage, pod) (rate(greptime_datanode_convert_region_request_sum[$__rate_interval]))/sum by(le,instance, stage, pod) (rate(greptime_datanode_convert_region_request_count[$__rate_interval]))` | `timeseries` | Per-stage elapsed time for region worker to decode requests. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-P95` |
|
||||
# OpenDAL
|
||||
| Title | Query | Type | Description | Datasource | Unit | Legend Format |
|
||||
@@ -88,9 +88,19 @@
|
||||
# Metasrv
|
||||
| Title | Query | Type | Description | Datasource | Unit | Legend Format |
|
||||
| --- | --- | --- | --- | --- | --- | --- |
|
||||
| Region migration datanode | `greptime_meta_region_migration_stat{datanode_type="src"}`<br/>`greptime_meta_region_migration_stat{datanode_type="desc"}` | `state-timeline` | Counter of region migration by source and destination | `prometheus` | `none` | `from-datanode-{{datanode_id}}` |
|
||||
| Region migration error | `greptime_meta_region_migration_error` | `timeseries` | Counter of region migration error | `prometheus` | `none` | `__auto` |
|
||||
| Datanode load | `greptime_datanode_load` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `none` | `__auto` |
|
||||
| Region migration datanode | `greptime_meta_region_migration_stat{datanode_type="src"}`<br/>`greptime_meta_region_migration_stat{datanode_type="desc"}` | `status-history` | Counter of region migration by source and destination | `prometheus` | -- | `from-datanode-{{datanode_id}}` |
|
||||
| Region migration error | `greptime_meta_region_migration_error` | `timeseries` | Counter of region migration error | `prometheus` | `none` | `{{pod}}-{{state}}-{{error_type}}` |
|
||||
| Datanode load | `greptime_datanode_load` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `binBps` | `Datanode-{{datanode_id}}-writeload` |
|
||||
| Rate of SQL Executions (RDS) | `rate(greptime_meta_rds_pg_sql_execute_elapsed_ms_count[$__rate_interval])` | `timeseries` | Displays the rate of SQL executions processed by the Meta service using the RDS backend. | `prometheus` | `none` | `{{pod}} {{op}} {{type}} {{result}} ` |
|
||||
| SQL Execution Latency (RDS) | `histogram_quantile(0.90, sum by(pod, op, type, result, le) (rate(greptime_meta_rds_pg_sql_execute_elapsed_ms_bucket[$__rate_interval])))` | `timeseries` | Measures the response time of SQL executions via the RDS backend. | `prometheus` | `ms` | `{{pod}} {{op}} {{type}} {{result}} p90` |
|
||||
| Handler Execution Latency | `histogram_quantile(0.90, sum by(pod, le, name) (
|
||||
rate(greptime_meta_handler_execute_bucket[$__rate_interval])
|
||||
))` | `timeseries` | Shows latency of Meta handlers by pod and handler name, useful for monitoring handler performance and detecting latency spikes.<br/> | `prometheus` | `s` | `{{pod}} {{name}} p90` |
|
||||
| Heartbeat Packet Size | `histogram_quantile(0.9, sum by(pod, le) (greptime_meta_heartbeat_stat_memory_size_bucket))` | `timeseries` | Shows p90 heartbeat message sizes, helping track network usage and identify anomalies in heartbeat payload.<br/> | `prometheus` | `bytes` | `{{pod}}` |
|
||||
| Meta Heartbeat Receive Rate | `rate(greptime_meta_heartbeat_rate[$__rate_interval])` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `s` | `{{pod}}` |
|
||||
| Meta KV Ops Latency | `histogram_quantile(0.99, sum by(pod, le, op, target) (greptime_meta_kv_request_elapsed_bucket))` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `s` | `{{pod}}-{{op}} p99` |
|
||||
| Rate of meta KV Ops | `rate(greptime_meta_kv_request_elapsed_count[$__rate_interval])` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `none` | `{{pod}}-{{op}} p99` |
|
||||
| DDL Latency | `histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_tables_bucket))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_table))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_view))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_flow))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_drop_table))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_alter_table))` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `s` | `CreateLogicalTables-{{step}} p90` |
|
||||
# Flownode
|
||||
| Title | Query | Type | Description | Datasource | Unit | Legend Format |
|
||||
| --- | --- | --- | --- | --- | --- | --- |
|
||||
|
||||
@@ -497,7 +497,7 @@ groups:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-p99'
|
||||
- expr: sum by(instance, pod, le, stage) (rate(greptime_mito_compaction_stage_elapsed_sum{}[$__rate_interval]))/sum by(instance, pod, le, stage) (rate(greptime_mito_compaction_stage_elapsed_count{}[$__rate_interval]))
|
||||
- expr: sum by(instance, pod, stage) (rate(greptime_mito_compaction_stage_elapsed_sum{}[$__rate_interval]))/sum by(instance, pod, stage) (rate(greptime_mito_compaction_stage_elapsed_count{}[$__rate_interval]))
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
@@ -607,7 +607,7 @@ groups:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-P95'
|
||||
- expr: sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_sum[$__rate_interval]))/sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_count[$__rate_interval]))
|
||||
- expr: sum by(instance, stage, pod) (rate(greptime_region_worker_handle_write_sum[$__rate_interval]))/sum by(instance, stage, pod) (rate(greptime_region_worker_handle_write_count[$__rate_interval]))
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
@@ -741,9 +741,8 @@ groups:
|
||||
- title: Metasrv
|
||||
panels:
|
||||
- title: Region migration datanode
|
||||
type: state-timeline
|
||||
type: status-history
|
||||
description: Counter of region migration by source and destination
|
||||
unit: none
|
||||
queries:
|
||||
- expr: greptime_meta_region_migration_stat{datanode_type="src"}
|
||||
datasource:
|
||||
@@ -764,17 +763,127 @@ groups:
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: __auto
|
||||
legendFormat: '{{pod}}-{{state}}-{{error_type}}'
|
||||
- title: Datanode load
|
||||
type: timeseries
|
||||
description: Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads.
|
||||
unit: none
|
||||
unit: binBps
|
||||
queries:
|
||||
- expr: greptime_datanode_load
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: __auto
|
||||
legendFormat: Datanode-{{datanode_id}}-writeload
|
||||
- title: Rate of SQL Executions (RDS)
|
||||
type: timeseries
|
||||
description: Displays the rate of SQL executions processed by the Meta service using the RDS backend.
|
||||
unit: none
|
||||
queries:
|
||||
- expr: rate(greptime_meta_rds_pg_sql_execute_elapsed_ms_count[$__rate_interval])
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: '{{pod}} {{op}} {{type}} {{result}} '
|
||||
- title: SQL Execution Latency (RDS)
|
||||
type: timeseries
|
||||
description: 'Measures the response time of SQL executions via the RDS backend. '
|
||||
unit: ms
|
||||
queries:
|
||||
- expr: histogram_quantile(0.90, sum by(pod, op, type, result, le) (rate(greptime_meta_rds_pg_sql_execute_elapsed_ms_bucket[$__rate_interval])))
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: '{{pod}} {{op}} {{type}} {{result}} p90'
|
||||
- title: Handler Execution Latency
|
||||
type: timeseries
|
||||
description: |
|
||||
Shows latency of Meta handlers by pod and handler name, useful for monitoring handler performance and detecting latency spikes.
|
||||
unit: s
|
||||
queries:
|
||||
- expr: |-
|
||||
histogram_quantile(0.90, sum by(pod, le, name) (
|
||||
rate(greptime_meta_handler_execute_bucket[$__rate_interval])
|
||||
))
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: '{{pod}} {{name}} p90'
|
||||
- title: Heartbeat Packet Size
|
||||
type: timeseries
|
||||
description: |
|
||||
Shows p90 heartbeat message sizes, helping track network usage and identify anomalies in heartbeat payload.
|
||||
unit: bytes
|
||||
queries:
|
||||
- expr: histogram_quantile(0.9, sum by(pod, le) (greptime_meta_heartbeat_stat_memory_size_bucket))
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: '{{pod}}'
|
||||
- title: Meta Heartbeat Receive Rate
|
||||
type: timeseries
|
||||
description: Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads.
|
||||
unit: s
|
||||
queries:
|
||||
- expr: rate(greptime_meta_heartbeat_rate[$__rate_interval])
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: '{{pod}}'
|
||||
- title: Meta KV Ops Latency
|
||||
type: timeseries
|
||||
description: Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads.
|
||||
unit: s
|
||||
queries:
|
||||
- expr: histogram_quantile(0.99, sum by(pod, le, op, target) (greptime_meta_kv_request_elapsed_bucket))
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: '{{pod}}-{{op}} p99'
|
||||
- title: Rate of meta KV Ops
|
||||
type: timeseries
|
||||
description: Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads.
|
||||
unit: none
|
||||
queries:
|
||||
- expr: rate(greptime_meta_kv_request_elapsed_count[$__rate_interval])
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: '{{pod}}-{{op}} p99'
|
||||
- title: DDL Latency
|
||||
type: timeseries
|
||||
description: Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads.
|
||||
unit: s
|
||||
queries:
|
||||
- expr: histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_tables_bucket))
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: CreateLogicalTables-{{step}} p90
|
||||
- expr: histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_table))
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: CreateTable-{{step}} p90
|
||||
- expr: histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_view))
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: CreateView-{{step}} p90
|
||||
- expr: histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_flow))
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: CreateFlow-{{step}} p90
|
||||
- expr: histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_drop_table))
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: DropTable-{{step}} p90
|
||||
- expr: histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_alter_table))
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: AlterTable-{{step}} p90
|
||||
- title: Flownode
|
||||
panels:
|
||||
- title: Flow Ingest / Output Rate
|
||||
|
||||
@@ -19,9 +19,11 @@ use std::time::Duration;

use async_trait::async_trait;
use clap::{Parser, ValueEnum};
use common_base::secrets::{ExposeSecret, SecretString};
use common_error::ext::BoxedError;
use common_telemetry::{debug, error, info};
use object_store::layers::LoggingLayer;
use object_store::services::Oss;
use object_store::{services, ObjectStore};
use serde_json::Value;
use snafu::{OptionExt, ResultExt};
@@ -110,15 +112,15 @@ pub struct ExportCommand {
    #[clap(long)]
    s3: bool,

    /// if both `s3_ddl_local_dir` and `s3` are set, `s3_ddl_local_dir` will be only used for
    /// exported SQL files, and the data will be exported to s3.
    /// if both `ddl_local_dir` and remote storage (s3/oss) are set, `ddl_local_dir` will be only used for
    /// exported SQL files, and the data will be exported to remote storage.
    ///
    /// Note that `s3_ddl_local_dir` export sql files to **LOCAL** file system, this is useful if export client don't have
    /// direct access to s3.
    /// Note that `ddl_local_dir` export sql files to **LOCAL** file system, this is useful if export client don't have
    /// direct access to remote storage.
    ///
    /// if `s3` is set but `s3_ddl_local_dir` is not set, both SQL&data will be exported to s3.
    /// if remote storage is set but `ddl_local_dir` is not set, both SQL&data will be exported to remote storage.
    #[clap(long)]
    s3_ddl_local_dir: Option<String>,
    ddl_local_dir: Option<String>,

    /// The s3 bucket name
    /// if s3 is set, this is required
@@ -149,6 +151,30 @@ pub struct ExportCommand {
    /// if s3 is set, this is required
    #[clap(long)]
    s3_region: Option<String>,

    /// if export data to oss
    #[clap(long)]
    oss: bool,

    /// The oss bucket name
    /// if oss is set, this is required
    #[clap(long)]
    oss_bucket: Option<String>,

    /// The oss endpoint
    /// if oss is set, this is required
    #[clap(long)]
    oss_endpoint: Option<String>,

    /// The oss access key id
    /// if oss is set, this is required
    #[clap(long)]
    oss_access_key_id: Option<String>,

    /// The oss access key secret
    /// if oss is set, this is required
    #[clap(long)]
    oss_access_key_secret: Option<String>,
}

impl ExportCommand {
@@ -162,7 +188,7 @@ impl ExportCommand {
        {
            return Err(BoxedError::new(S3ConfigNotSetSnafu {}.build()));
        }
        if !self.s3 && self.output_dir.is_none() {
        if !self.s3 && !self.oss && self.output_dir.is_none() {
            return Err(BoxedError::new(OutputDirNotSetSnafu {}.build()));
        }
        let (catalog, schema) =
@@ -187,13 +213,32 @@ impl ExportCommand {
            start_time: self.start_time.clone(),
            end_time: self.end_time.clone(),
            s3: self.s3,
            s3_ddl_local_dir: self.s3_ddl_local_dir.clone(),
            ddl_local_dir: self.ddl_local_dir.clone(),
            s3_bucket: self.s3_bucket.clone(),
            s3_root: self.s3_root.clone(),
            s3_endpoint: self.s3_endpoint.clone(),
            s3_access_key: self.s3_access_key.clone(),
            s3_secret_key: self.s3_secret_key.clone(),
            // Wrap sensitive values in SecretString
            s3_access_key: self
                .s3_access_key
                .as_ref()
                .map(|k| SecretString::from(k.clone())),
            s3_secret_key: self
                .s3_secret_key
                .as_ref()
                .map(|k| SecretString::from(k.clone())),
            s3_region: self.s3_region.clone(),
            oss: self.oss,
            oss_bucket: self.oss_bucket.clone(),
            oss_endpoint: self.oss_endpoint.clone(),
            // Wrap sensitive values in SecretString
            oss_access_key_id: self
                .oss_access_key_id
                .as_ref()
                .map(|k| SecretString::from(k.clone())),
            oss_access_key_secret: self
                .oss_access_key_secret
                .as_ref()
                .map(|k| SecretString::from(k.clone())),
        }))
    }
}
@@ -209,23 +254,30 @@ pub struct Export {
    start_time: Option<String>,
    end_time: Option<String>,
    s3: bool,
    s3_ddl_local_dir: Option<String>,
    ddl_local_dir: Option<String>,
    s3_bucket: Option<String>,
    s3_root: Option<String>,
    s3_endpoint: Option<String>,
    s3_access_key: Option<String>,
    s3_secret_key: Option<String>,
    // Changed to SecretString for sensitive data
    s3_access_key: Option<SecretString>,
    s3_secret_key: Option<SecretString>,
    s3_region: Option<String>,
    oss: bool,
    oss_bucket: Option<String>,
    oss_endpoint: Option<String>,
    // Changed to SecretString for sensitive data
    oss_access_key_id: Option<SecretString>,
    oss_access_key_secret: Option<SecretString>,
}

impl Export {
    fn catalog_path(&self) -> PathBuf {
        if self.s3 {
        if self.s3 || self.oss {
            PathBuf::from(&self.catalog)
        } else if let Some(dir) = &self.output_dir {
            PathBuf::from(dir).join(&self.catalog)
        } else {
            unreachable!("catalog_path: output_dir must be set when not using s3")
            unreachable!("catalog_path: output_dir must be set when not using remote storage")
        }
    }

@@ -427,7 +479,7 @@ impl Export {
            .await?;

        // Create directory if needed for file system storage
        if !export_self.s3 {
        if !export_self.s3 && !export_self.oss {
            let db_dir = format!("{}/{}/", export_self.catalog, schema);
            operator.create_dir(&db_dir).await.context(OpenDalSnafu)?;
}
|
||||
@@ -473,6 +525,8 @@ impl Export {
|
||||
async fn build_operator(&self) -> Result<ObjectStore> {
|
||||
if self.s3 {
|
||||
self.build_s3_operator().await
|
||||
} else if self.oss {
|
||||
self.build_oss_operator().await
|
||||
} else {
|
||||
self.build_fs_operator().await
|
||||
}
|
||||
@@ -480,9 +534,8 @@ impl Export {
|
||||
|
||||
/// build operator with preference for file system
|
||||
async fn build_prefer_fs_operator(&self) -> Result<ObjectStore> {
|
||||
// is under s3 mode and s3_ddl_dir is set, use it as root
|
||||
if self.s3 && self.s3_ddl_local_dir.is_some() {
|
||||
let root = self.s3_ddl_local_dir.as_ref().unwrap().clone();
|
||||
if (self.s3 || self.oss) && self.ddl_local_dir.is_some() {
|
||||
let root = self.ddl_local_dir.as_ref().unwrap().clone();
|
||||
let op = ObjectStore::new(services::Fs::default().root(&root))
|
||||
.context(OpenDalSnafu)?
|
||||
.layer(LoggingLayer::default())
|
||||
@@ -490,6 +543,8 @@ impl Export {
|
||||
Ok(op)
|
||||
} else if self.s3 {
|
||||
self.build_s3_operator().await
|
||||
} else if self.oss {
|
||||
self.build_oss_operator().await
|
||||
} else {
|
||||
self.build_fs_operator().await
|
||||
}
|
||||
@@ -515,11 +570,35 @@ impl Export {
|
||||
}
|
||||
|
||||
if let Some(key_id) = self.s3_access_key.as_ref() {
|
||||
builder = builder.access_key_id(key_id);
|
||||
builder = builder.access_key_id(key_id.expose_secret());
|
||||
}
|
||||
|
||||
if let Some(secret_key) = self.s3_secret_key.as_ref() {
|
||||
builder = builder.secret_access_key(secret_key);
|
||||
builder = builder.secret_access_key(secret_key.expose_secret());
|
||||
}
|
||||
|
||||
let op = ObjectStore::new(builder)
|
||||
.context(OpenDalSnafu)?
|
||||
.layer(LoggingLayer::default())
|
||||
.finish();
|
||||
Ok(op)
|
||||
}
|
||||
|
||||
async fn build_oss_operator(&self) -> Result<ObjectStore> {
|
||||
let mut builder = Oss::default()
|
||||
.bucket(self.oss_bucket.as_ref().expect("oss_bucket must be set"))
|
||||
.endpoint(
|
||||
self.oss_endpoint
|
||||
.as_ref()
|
||||
.expect("oss_endpoint must be set"),
|
||||
);
|
||||
|
||||
// Use expose_secret() to access the actual secret value
|
||||
if let Some(key_id) = self.oss_access_key_id.as_ref() {
|
||||
builder = builder.access_key_id(key_id.expose_secret());
|
||||
}
|
||||
if let Some(secret_key) = self.oss_access_key_secret.as_ref() {
|
||||
builder = builder.access_key_secret(secret_key.expose_secret());
|
||||
}
|
||||
|
||||
let op = ObjectStore::new(builder)
|
||||
@@ -562,8 +641,8 @@ impl Export {
|
||||
tasks.push(async move {
|
||||
let _permit = semaphore_moved.acquire().await.unwrap();
|
||||
|
||||
// Create directory if not using S3
|
||||
if !export_self.s3 {
|
||||
// Create directory if not using remote storage
|
||||
if !export_self.s3 && !export_self.oss {
|
||||
let db_dir = format!("{}/{}/", export_self.catalog, schema);
|
||||
operator.create_dir(&db_dir).await.context(OpenDalSnafu)?;
|
||||
}
|
||||
@@ -575,7 +654,11 @@ impl Export {
|
||||
r#"COPY DATABASE "{}"."{}" TO '{}' WITH ({}){};"#,
|
||||
export_self.catalog, schema, path, with_options_clone, connection_part
|
||||
);
|
||||
info!("Executing sql: {sql}");
|
||||
|
||||
// Log SQL command but mask sensitive information
|
||||
let safe_sql = export_self.mask_sensitive_sql(&sql);
|
||||
info!("Executing sql: {}", safe_sql);
|
||||
|
||||
export_self.database_client.sql_in_public(&sql).await?;
|
||||
info!(
|
||||
"Finished exporting {}.{} data to {}",
|
||||
@@ -615,6 +698,29 @@ impl Export {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Mask sensitive information in SQL commands for safe logging
|
||||
fn mask_sensitive_sql(&self, sql: &str) -> String {
|
||||
let mut masked_sql = sql.to_string();
|
||||
|
||||
// Mask S3 credentials
|
||||
if let Some(access_key) = &self.s3_access_key {
|
||||
masked_sql = masked_sql.replace(access_key.expose_secret(), "[REDACTED]");
|
||||
}
|
||||
if let Some(secret_key) = &self.s3_secret_key {
|
||||
masked_sql = masked_sql.replace(secret_key.expose_secret(), "[REDACTED]");
|
||||
}
|
||||
|
||||
// Mask OSS credentials
|
||||
if let Some(access_key_id) = &self.oss_access_key_id {
|
||||
masked_sql = masked_sql.replace(access_key_id.expose_secret(), "[REDACTED]");
|
||||
}
|
||||
if let Some(access_key_secret) = &self.oss_access_key_secret {
|
||||
masked_sql = masked_sql.replace(access_key_secret.expose_secret(), "[REDACTED]");
|
||||
}
|
||||
|
||||
masked_sql
|
||||
}
|
||||
|
||||
fn get_file_path(&self, schema: &str, file_name: &str) -> String {
|
||||
format!("{}/{}/{}", self.catalog, schema, file_name)
|
||||
}
|
||||
@@ -631,6 +737,13 @@ impl Export {
|
||||
},
|
||||
file_path
|
||||
)
|
||||
} else if self.oss {
|
||||
format!(
|
||||
"oss://{}/{}/{}",
|
||||
self.oss_bucket.as_ref().unwrap_or(&String::new()),
|
||||
self.catalog,
|
||||
file_path
|
||||
)
|
||||
} else {
|
||||
format!(
|
||||
"{}/{}",
|
||||
@@ -675,15 +788,36 @@ impl Export {
|
||||
};
|
||||
|
||||
// Safety: All s3 options are required
|
||||
// Use expose_secret() to access the actual secret values
|
||||
let connection_options = format!(
|
||||
"ACCESS_KEY_ID='{}', SECRET_ACCESS_KEY='{}', REGION='{}'{}",
|
||||
self.s3_access_key.as_ref().unwrap(),
|
||||
self.s3_secret_key.as_ref().unwrap(),
|
||||
self.s3_access_key.as_ref().unwrap().expose_secret(),
|
||||
self.s3_secret_key.as_ref().unwrap().expose_secret(),
|
||||
self.s3_region.as_ref().unwrap(),
|
||||
endpoint_option
|
||||
);
|
||||
|
||||
(s3_path, format!(" CONNECTION ({})", connection_options))
|
||||
} else if self.oss {
|
||||
let oss_path = format!(
|
||||
"oss://{}/{}/{}/",
|
||||
self.oss_bucket.as_ref().unwrap(),
|
||||
self.catalog,
|
||||
schema
|
||||
);
|
||||
let endpoint_option = if let Some(endpoint) = self.oss_endpoint.as_ref() {
|
||||
format!(", ENDPOINT='{}'", endpoint)
|
||||
} else {
|
||||
String::new()
|
||||
};
|
||||
|
||||
let connection_options = format!(
|
||||
"ACCESS_KEY_ID='{}', ACCESS_KEY_SECRET='{}'{}",
|
||||
self.oss_access_key_id.as_ref().unwrap().expose_secret(),
|
||||
self.oss_access_key_secret.as_ref().unwrap().expose_secret(),
|
||||
endpoint_option
|
||||
);
|
||||
(oss_path, format!(" CONNECTION ({})", connection_options))
|
||||
} else {
|
||||
(
|
||||
self.catalog_path()
|
||||
|
||||
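The SecretString handling in the export path above is the key point of this hunk: credentials stay opaque in any Debug dump of the command or options, are only revealed at the two places that need the raw value (building the object store and rendering the COPY ... CONNECTION clause), and the same exposed value is what mask_sensitive_sql redacts before logging. A minimal sketch of that flow, reusing the workspace's common_base::secrets re-export; the literal key is illustrative:

```rust
use common_base::secrets::{ExposeSecret, SecretString};

fn main() {
    // Wrapping the raw key keeps it out of any `{:?}` dump of the surrounding struct.
    let access_key = SecretString::from("hypothetical-access-key".to_string());

    // Revealing the value is always an explicit, greppable call.
    let sql = format!(
        "COPY DATABASE ... CONNECTION (ACCESS_KEY_ID='{}')",
        access_key.expose_secret()
    );

    // Masking before logging replaces exactly that exposed value.
    let safe_sql = sql.replace(access_key.expose_secret(), "[REDACTED]");
    assert_eq!(
        safe_sql,
        "COPY DATABASE ... CONNECTION (ACCESS_KEY_ID='[REDACTED]')"
    );
}
```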
@@ -167,9 +167,7 @@ impl Client {
|
||||
|
||||
let client = FlightServiceClient::new(channel)
|
||||
.max_decoding_message_size(self.max_grpc_recv_message_size())
|
||||
.max_encoding_message_size(self.max_grpc_send_message_size())
|
||||
.accept_compressed(CompressionEncoding::Zstd)
|
||||
.send_compressed(CompressionEncoding::Zstd);
|
||||
.max_encoding_message_size(self.max_grpc_send_message_size());
|
||||
|
||||
Ok(FlightClient { addr, client })
|
||||
}
|
||||
@@ -178,9 +176,7 @@ impl Client {
|
||||
let (addr, channel) = self.find_channel()?;
|
||||
let client = PbRegionClient::new(channel)
|
||||
.max_decoding_message_size(self.max_grpc_recv_message_size())
|
||||
.max_encoding_message_size(self.max_grpc_send_message_size())
|
||||
.accept_compressed(CompressionEncoding::Zstd)
|
||||
.send_compressed(CompressionEncoding::Zstd);
|
||||
.max_encoding_message_size(self.max_grpc_send_message_size());
|
||||
Ok((addr, client))
|
||||
}
|
||||
|
||||
|
||||
@@ -80,6 +80,7 @@ servers.workspace = true
|
||||
session.workspace = true
|
||||
similar-asserts.workspace = true
|
||||
snafu.workspace = true
|
||||
stat.workspace = true
|
||||
store-api.workspace = true
|
||||
substrait.workspace = true
|
||||
table.workspace = true
|
||||
|
||||
@@ -14,12 +14,13 @@
|
||||
|
||||
pub mod builder;
|
||||
|
||||
use std::path::Path;
|
||||
use std::time::Duration;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use clap::Parser;
|
||||
use common_config::Configurable;
|
||||
use common_telemetry::logging::TracingOptions;
|
||||
use common_telemetry::logging::{TracingOptions, DEFAULT_LOGGING_DIR};
|
||||
use common_telemetry::{info, warn};
|
||||
use common_wal::config::DatanodeWalConfig;
|
||||
use datanode::datanode::Datanode;
|
||||
@@ -248,6 +249,14 @@ impl StartCommand {
|
||||
raft_engine_config.dir.replace(wal_dir.clone());
|
||||
}
|
||||
|
||||
// If the logging dir is not set, use the default logs dir in the data home.
|
||||
if opts.logging.dir.is_empty() {
|
||||
opts.logging.dir = Path::new(&opts.storage.data_home)
|
||||
.join(DEFAULT_LOGGING_DIR)
|
||||
.to_string_lossy()
|
||||
.to_string();
|
||||
}
|
||||
|
||||
if let Some(http_addr) = &self.http_addr {
|
||||
opts.http.addr.clone_from(http_addr);
|
||||
}
|
||||
|
||||
@@ -28,7 +28,7 @@ use tracing_appender::non_blocking::WorkerGuard;
|
||||
|
||||
use crate::datanode::{DatanodeOptions, Instance, APP_NAME};
|
||||
use crate::error::{MetaClientInitSnafu, MissingConfigSnafu, Result, StartDatanodeSnafu};
|
||||
use crate::log_versions;
|
||||
use crate::{create_resource_limit_metrics, log_versions};
|
||||
|
||||
/// Builder for Datanode instance.
|
||||
pub struct InstanceBuilder {
|
||||
@@ -68,6 +68,7 @@ impl InstanceBuilder {
|
||||
);
|
||||
|
||||
log_versions(version(), short_version(), APP_NAME);
|
||||
create_resource_limit_metrics(APP_NAME);
|
||||
|
||||
plugins::setup_datanode_plugins(plugins, &opts.plugins, dn_opts)
|
||||
.await
|
||||
|
||||
@@ -12,6 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
@@ -21,7 +22,7 @@ use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManager, MetaKv
|
||||
use clap::Parser;
|
||||
use client::client_manager::NodeClients;
|
||||
use common_base::Plugins;
|
||||
use common_config::Configurable;
|
||||
use common_config::{Configurable, DEFAULT_DATA_HOME};
|
||||
use common_grpc::channel_manager::ChannelConfig;
|
||||
use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
|
||||
use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
|
||||
@@ -30,7 +31,7 @@ use common_meta::heartbeat::handler::HandlerGroupExecutor;
|
||||
use common_meta::key::flow::FlowMetadataManager;
|
||||
use common_meta::key::TableMetadataManager;
|
||||
use common_telemetry::info;
|
||||
use common_telemetry::logging::TracingOptions;
|
||||
use common_telemetry::logging::{TracingOptions, DEFAULT_LOGGING_DIR};
|
||||
use common_version::{short_version, version};
|
||||
use flow::{
|
||||
get_flow_auth_options, FlownodeBuilder, FlownodeInstance, FlownodeServiceBuilder,
|
||||
@@ -45,7 +46,7 @@ use crate::error::{
|
||||
MissingConfigSnafu, Result, ShutdownFlownodeSnafu, StartFlownodeSnafu,
|
||||
};
|
||||
use crate::options::{GlobalOptions, GreptimeOptions};
|
||||
use crate::{log_versions, App};
|
||||
use crate::{create_resource_limit_metrics, log_versions, App};
|
||||
|
||||
pub const APP_NAME: &str = "greptime-flownode";
|
||||
|
||||
@@ -186,6 +187,14 @@ impl StartCommand {
|
||||
opts.logging.dir.clone_from(dir);
|
||||
}
|
||||
|
||||
// If the logging dir is not set, use the default logs dir in the data home.
|
||||
if opts.logging.dir.is_empty() {
|
||||
opts.logging.dir = Path::new(DEFAULT_DATA_HOME)
|
||||
.join(DEFAULT_LOGGING_DIR)
|
||||
.to_string_lossy()
|
||||
.to_string();
|
||||
}
|
||||
|
||||
if global_options.log_level.is_some() {
|
||||
opts.logging.level.clone_from(&global_options.log_level);
|
||||
}
|
||||
@@ -246,7 +255,9 @@ impl StartCommand {
|
||||
opts.component.node_id.map(|x| x.to_string()),
|
||||
None,
|
||||
);
|
||||
|
||||
log_versions(version(), short_version(), APP_NAME);
|
||||
create_resource_limit_metrics(APP_NAME);
|
||||
|
||||
info!("Flownode start command: {:#?}", self);
|
||||
info!("Flownode options: {:#?}", opts);
|
||||
|
||||
@@ -12,6 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
@@ -22,14 +23,14 @@ use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManager, MetaKv
|
||||
use clap::Parser;
|
||||
use client::client_manager::NodeClients;
|
||||
use common_base::Plugins;
|
||||
use common_config::Configurable;
|
||||
use common_config::{Configurable, DEFAULT_DATA_HOME};
|
||||
use common_grpc::channel_manager::ChannelConfig;
|
||||
use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
|
||||
use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
|
||||
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
|
||||
use common_meta::heartbeat::handler::HandlerGroupExecutor;
|
||||
use common_telemetry::info;
|
||||
use common_telemetry::logging::TracingOptions;
|
||||
use common_telemetry::logging::{TracingOptions, DEFAULT_LOGGING_DIR};
|
||||
use common_time::timezone::set_default_timezone;
|
||||
use common_version::{short_version, version};
|
||||
use frontend::frontend::Frontend;
|
||||
@@ -44,7 +45,7 @@ use tracing_appender::non_blocking::WorkerGuard;
|
||||
|
||||
use crate::error::{self, Result};
|
||||
use crate::options::{GlobalOptions, GreptimeOptions};
|
||||
use crate::{log_versions, App};
|
||||
use crate::{create_resource_limit_metrics, log_versions, App};
|
||||
|
||||
type FrontendOptions = GreptimeOptions<frontend::frontend::FrontendOptions>;
|
||||
|
||||
@@ -194,6 +195,14 @@ impl StartCommand {
|
||||
opts.logging.dir.clone_from(dir);
|
||||
}
|
||||
|
||||
// If the logging dir is not set, use the default logs dir in the data home.
|
||||
if opts.logging.dir.is_empty() {
|
||||
opts.logging.dir = Path::new(DEFAULT_DATA_HOME)
|
||||
.join(DEFAULT_LOGGING_DIR)
|
||||
.to_string_lossy()
|
||||
.to_string();
|
||||
}
|
||||
|
||||
if global_options.log_level.is_some() {
|
||||
opts.logging.level.clone_from(&global_options.log_level);
|
||||
}
|
||||
@@ -270,7 +279,9 @@ impl StartCommand {
|
||||
opts.component.node_id.clone(),
|
||||
opts.component.slow_query.as_ref(),
|
||||
);
|
||||
|
||||
log_versions(version(), short_version(), APP_NAME);
|
||||
create_resource_limit_metrics(APP_NAME);
|
||||
|
||||
info!("Frontend start command: {:#?}", self);
|
||||
info!("Frontend options: {:#?}", opts);
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
|
||||
use async_trait::async_trait;
|
||||
use common_telemetry::{error, info};
|
||||
use stat::{get_cpu_limit, get_memory_limit};
|
||||
|
||||
use crate::error::Result;
|
||||
|
||||
@@ -31,6 +32,12 @@ pub mod standalone;
|
||||
lazy_static::lazy_static! {
|
||||
static ref APP_VERSION: prometheus::IntGaugeVec =
|
||||
prometheus::register_int_gauge_vec!("greptime_app_version", "app version", &["version", "short_version", "app"]).unwrap();
|
||||
|
||||
static ref CPU_LIMIT: prometheus::IntGaugeVec =
|
||||
prometheus::register_int_gauge_vec!("greptime_cpu_limit_in_millicores", "cpu limit in millicores", &["app"]).unwrap();
|
||||
|
||||
static ref MEMORY_LIMIT: prometheus::IntGaugeVec =
|
||||
prometheus::register_int_gauge_vec!("greptime_memory_limit_in_bytes", "memory limit in bytes", &["app"]).unwrap();
|
||||
}
|
||||
|
||||
/// wait for the close signal, for unix platform it's SIGINT or SIGTERM
|
||||
@@ -114,6 +121,24 @@ pub fn log_versions(version: &str, short_version: &str, app: &str) {
|
||||
log_env_flags();
|
||||
}
|
||||
|
||||
pub fn create_resource_limit_metrics(app: &str) {
|
||||
if let Some(cpu_limit) = get_cpu_limit() {
|
||||
info!(
|
||||
"GreptimeDB start with cpu limit in millicores: {}",
|
||||
cpu_limit
|
||||
);
|
||||
CPU_LIMIT.with_label_values(&[app]).set(cpu_limit);
|
||||
}
|
||||
|
||||
if let Some(memory_limit) = get_memory_limit() {
|
||||
info!(
|
||||
"GreptimeDB start with memory limit in bytes: {}",
|
||||
memory_limit
|
||||
);
|
||||
MEMORY_LIMIT.with_label_values(&[app]).set(memory_limit);
|
||||
}
|
||||
}
|
||||
|
||||
fn log_env_flags() {
|
||||
info!("command line arguments");
|
||||
for argument in std::env::args() {
|
||||
|
||||
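For reference, the two new gauges follow the usual prometheus-crate pattern: a labelled IntGaugeVec registered once and set at startup. A standalone sketch with an illustrative metric name (the real names above are greptime_cpu_limit_in_millicores and greptime_memory_limit_in_bytes, labelled by app):

```rust
use prometheus::{register_int_gauge_vec, Encoder, TextEncoder};

fn main() {
    // Mirrors CPU_LIMIT above; the metric name here is illustrative.
    let cpu_limit = register_int_gauge_vec!(
        "example_cpu_limit_in_millicores",
        "cpu limit in millicores",
        &["app"]
    )
    .unwrap();

    // Set once at startup with the detected limit; the stat crate reports -1 when unbounded.
    cpu_limit.with_label_values(&["greptime-standalone"]).set(4000);

    // Anything scraping the default registry now sees the value.
    let mut buf = Vec::new();
    TextEncoder::new()
        .encode(&prometheus::gather(), &mut buf)
        .unwrap();
    println!("{}", String::from_utf8(buf).unwrap());
}
```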
@@ -13,6 +13,7 @@
|
||||
// limitations under the License.
|
||||
|
||||
use std::fmt;
|
||||
use std::path::Path;
|
||||
use std::time::Duration;
|
||||
|
||||
use async_trait::async_trait;
|
||||
@@ -20,7 +21,7 @@ use clap::Parser;
|
||||
use common_base::Plugins;
|
||||
use common_config::Configurable;
|
||||
use common_telemetry::info;
|
||||
use common_telemetry::logging::TracingOptions;
|
||||
use common_telemetry::logging::{TracingOptions, DEFAULT_LOGGING_DIR};
|
||||
use common_version::{short_version, version};
|
||||
use meta_srv::bootstrap::MetasrvInstance;
|
||||
use meta_srv::metasrv::BackendImpl;
|
||||
@@ -29,7 +30,7 @@ use tracing_appender::non_blocking::WorkerGuard;
|
||||
|
||||
use crate::error::{self, LoadLayeredConfigSnafu, Result, StartMetaServerSnafu};
|
||||
use crate::options::{GlobalOptions, GreptimeOptions};
|
||||
use crate::{log_versions, App};
|
||||
use crate::{create_resource_limit_metrics, log_versions, App};
|
||||
|
||||
type MetasrvOptions = GreptimeOptions<meta_srv::metasrv::MetasrvOptions>;
|
||||
|
||||
@@ -274,6 +275,14 @@ impl StartCommand {
|
||||
opts.data_home.clone_from(data_home);
|
||||
}
|
||||
|
||||
// If the logging dir is not set, use the default logs dir in the data home.
|
||||
if opts.logging.dir.is_empty() {
|
||||
opts.logging.dir = Path::new(&opts.data_home)
|
||||
.join(DEFAULT_LOGGING_DIR)
|
||||
.to_string_lossy()
|
||||
.to_string();
|
||||
}
|
||||
|
||||
if !self.store_key_prefix.is_empty() {
|
||||
opts.store_key_prefix.clone_from(&self.store_key_prefix)
|
||||
}
|
||||
@@ -302,7 +311,9 @@ impl StartCommand {
|
||||
None,
|
||||
None,
|
||||
);
|
||||
|
||||
log_versions(version(), short_version(), APP_NAME);
|
||||
create_resource_limit_metrics(APP_NAME);
|
||||
|
||||
info!("Metasrv start command: {:#?}", self);
|
||||
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
// limitations under the License.
|
||||
|
||||
use std::net::SocketAddr;
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
use std::{fs, path};
|
||||
|
||||
@@ -49,7 +50,9 @@ use common_meta::sequence::SequenceBuilder;
|
||||
use common_meta::wal_options_allocator::{build_wal_options_allocator, WalOptionsAllocatorRef};
|
||||
use common_procedure::{ProcedureInfo, ProcedureManagerRef};
|
||||
use common_telemetry::info;
|
||||
use common_telemetry::logging::{LoggingOptions, SlowQueryOptions, TracingOptions};
|
||||
use common_telemetry::logging::{
|
||||
LoggingOptions, SlowQueryOptions, TracingOptions, DEFAULT_LOGGING_DIR,
|
||||
};
|
||||
use common_time::timezone::set_default_timezone;
|
||||
use common_version::{short_version, version};
|
||||
use common_wal::config::DatanodeWalConfig;
|
||||
@@ -83,7 +86,7 @@ use tracing_appender::non_blocking::WorkerGuard;
|
||||
|
||||
use crate::error::{Result, StartFlownodeSnafu};
|
||||
use crate::options::{GlobalOptions, GreptimeOptions};
|
||||
use crate::{error, log_versions, App};
|
||||
use crate::{create_resource_limit_metrics, error, log_versions, App};
|
||||
|
||||
pub const APP_NAME: &str = "greptime-standalone";
|
||||
|
||||
@@ -407,6 +410,14 @@ impl StartCommand {
|
||||
opts.storage.data_home.clone_from(data_home);
|
||||
}
|
||||
|
||||
// If the logging dir is not set, use the default logs dir in the data home.
|
||||
if opts.logging.dir.is_empty() {
|
||||
opts.logging.dir = Path::new(&opts.storage.data_home)
|
||||
.join(DEFAULT_LOGGING_DIR)
|
||||
.to_string_lossy()
|
||||
.to_string();
|
||||
}
|
||||
|
||||
if let Some(addr) = &self.rpc_bind_addr {
|
||||
// frontend grpc addr conflict with datanode default grpc addr
|
||||
let datanode_grpc_addr = DatanodeOptions::default().grpc.bind_addr;
|
||||
@@ -457,7 +468,9 @@ impl StartCommand {
|
||||
None,
|
||||
opts.component.slow_query.as_ref(),
|
||||
);
|
||||
|
||||
log_versions(version(), short_version(), APP_NAME);
|
||||
create_resource_limit_metrics(APP_NAME);
|
||||
|
||||
info!("Standalone start command: {:#?}", self);
|
||||
info!("Standalone options: {opts:#?}");
|
||||
|
||||
@@ -12,13 +12,14 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::path::Path;
|
||||
use std::time::Duration;
|
||||
|
||||
use cmd::options::GreptimeOptions;
|
||||
use cmd::standalone::StandaloneOptions;
|
||||
use common_config::Configurable;
|
||||
use common_config::{Configurable, DEFAULT_DATA_HOME};
|
||||
use common_options::datanode::{ClientOptions, DatanodeClientOptions};
|
||||
use common_telemetry::logging::{LoggingOptions, DEFAULT_OTLP_ENDPOINT};
|
||||
use common_telemetry::logging::{LoggingOptions, DEFAULT_LOGGING_DIR, DEFAULT_OTLP_ENDPOINT};
|
||||
use common_wal::config::raft_engine::RaftEngineConfig;
|
||||
use common_wal::config::DatanodeWalConfig;
|
||||
use datanode::config::{DatanodeOptions, RegionEngineConfig, StorageConfig};
|
||||
@@ -32,6 +33,7 @@ use mito2::config::MitoConfig;
|
||||
use servers::export_metrics::ExportMetricsOption;
|
||||
use servers::grpc::GrpcOptions;
|
||||
use servers::http::HttpOptions;
|
||||
use store_api::path_utils::WAL_DIR;
|
||||
|
||||
#[allow(deprecated)]
|
||||
#[test]
|
||||
@@ -56,13 +58,18 @@ fn test_load_datanode_example_config() {
|
||||
metadata_cache_tti: Duration::from_secs(300),
|
||||
}),
|
||||
wal: DatanodeWalConfig::RaftEngine(RaftEngineConfig {
|
||||
dir: Some("./greptimedb_data/wal".to_string()),
|
||||
dir: Some(
|
||||
Path::new(DEFAULT_DATA_HOME)
|
||||
.join(WAL_DIR)
|
||||
.to_string_lossy()
|
||||
.to_string(),
|
||||
),
|
||||
sync_period: Some(Duration::from_secs(10)),
|
||||
recovery_parallelism: 2,
|
||||
..Default::default()
|
||||
}),
|
||||
storage: StorageConfig {
|
||||
data_home: "./greptimedb_data/".to_string(),
|
||||
data_home: DEFAULT_DATA_HOME.to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
region_engine: vec![
|
||||
@@ -79,6 +86,10 @@ fn test_load_datanode_example_config() {
|
||||
],
|
||||
logging: LoggingOptions {
|
||||
level: Some("info".to_string()),
|
||||
dir: Path::new(DEFAULT_DATA_HOME)
|
||||
.join(DEFAULT_LOGGING_DIR)
|
||||
.to_string_lossy()
|
||||
.to_string(),
|
||||
otlp_endpoint: Some(DEFAULT_OTLP_ENDPOINT.to_string()),
|
||||
tracing_sample_ratio: Some(Default::default()),
|
||||
..Default::default()
|
||||
@@ -121,6 +132,10 @@ fn test_load_frontend_example_config() {
|
||||
}),
|
||||
logging: LoggingOptions {
|
||||
level: Some("info".to_string()),
|
||||
dir: Path::new(DEFAULT_DATA_HOME)
|
||||
.join(DEFAULT_LOGGING_DIR)
|
||||
.to_string_lossy()
|
||||
.to_string(),
|
||||
otlp_endpoint: Some(DEFAULT_OTLP_ENDPOINT.to_string()),
|
||||
tracing_sample_ratio: Some(Default::default()),
|
||||
..Default::default()
|
||||
@@ -160,10 +175,13 @@ fn test_load_metasrv_example_config() {
|
||||
let expected = GreptimeOptions::<MetasrvOptions> {
|
||||
component: MetasrvOptions {
|
||||
selector: SelectorType::default(),
|
||||
data_home: "./greptimedb_data/metasrv/".to_string(),
|
||||
data_home: DEFAULT_DATA_HOME.to_string(),
|
||||
server_addr: "127.0.0.1:3002".to_string(),
|
||||
logging: LoggingOptions {
|
||||
dir: "./greptimedb_data/logs".to_string(),
|
||||
dir: Path::new(DEFAULT_DATA_HOME)
|
||||
.join(DEFAULT_LOGGING_DIR)
|
||||
.to_string_lossy()
|
||||
.to_string(),
|
||||
level: Some("info".to_string()),
|
||||
otlp_endpoint: Some(DEFAULT_OTLP_ENDPOINT.to_string()),
|
||||
tracing_sample_ratio: Some(Default::default()),
|
||||
@@ -198,7 +216,12 @@ fn test_load_standalone_example_config() {
|
||||
component: StandaloneOptions {
|
||||
default_timezone: Some("UTC".to_string()),
|
||||
wal: DatanodeWalConfig::RaftEngine(RaftEngineConfig {
|
||||
dir: Some("./greptimedb_data/wal".to_string()),
|
||||
dir: Some(
|
||||
Path::new(DEFAULT_DATA_HOME)
|
||||
.join(WAL_DIR)
|
||||
.to_string_lossy()
|
||||
.to_string(),
|
||||
),
|
||||
sync_period: Some(Duration::from_secs(10)),
|
||||
recovery_parallelism: 2,
|
||||
..Default::default()
|
||||
@@ -216,11 +239,15 @@ fn test_load_standalone_example_config() {
|
||||
}),
|
||||
],
|
||||
storage: StorageConfig {
|
||||
data_home: "./greptimedb_data/".to_string(),
|
||||
data_home: DEFAULT_DATA_HOME.to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
logging: LoggingOptions {
|
||||
level: Some("info".to_string()),
|
||||
dir: Path::new(DEFAULT_DATA_HOME)
|
||||
.join(DEFAULT_LOGGING_DIR)
|
||||
.to_string_lossy()
|
||||
.to_string(),
|
||||
otlp_endpoint: Some(DEFAULT_OTLP_ENDPOINT.to_string()),
|
||||
tracing_sample_ratio: Some(Default::default()),
|
||||
..Default::default()
|
||||
|
||||
@@ -26,6 +26,9 @@ pub fn metadata_store_dir(store_dir: &str) -> String {
|
||||
format!("{store_dir}/metadata")
|
||||
}
|
||||
|
||||
/// The default data home directory.
|
||||
pub const DEFAULT_DATA_HOME: &str = "./greptimedb_data";
|
||||
|
||||
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
|
||||
#[serde(default)]
|
||||
pub struct KvBackendConfig {
|
||||
|
||||
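The same defaulting rule is repeated in each start command above: when logging.dir is left empty it becomes <data_home>/<DEFAULT_LOGGING_DIR>. A small sketch of the effective behaviour; the literal value "logs" for DEFAULT_LOGGING_DIR is an assumption, inferred from the old "./greptimedb_data/logs" expectation in the config tests above, and a Unix path separator is assumed:

```rust
use std::path::Path;

// Both constants exist in the workspace; the literal values here are assumptions
// consistent with the config tests above.
const DEFAULT_DATA_HOME: &str = "./greptimedb_data";
const DEFAULT_LOGGING_DIR: &str = "logs";

fn default_logging_dir(configured: &str, data_home: &str) -> String {
    if configured.is_empty() {
        // Same expression the start commands use.
        Path::new(data_home)
            .join(DEFAULT_LOGGING_DIR)
            .to_string_lossy()
            .to_string()
    } else {
        configured.to_string()
    }
}

fn main() {
    assert_eq!(
        default_logging_dir("", DEFAULT_DATA_HOME),
        "./greptimedb_data/logs"
    );
    assert_eq!(
        default_logging_dir("/var/log/greptime", DEFAULT_DATA_HOME),
        "/var/log/greptime"
    );
}
```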
@@ -13,7 +13,9 @@
|
||||
// limitations under the License.
|
||||
|
||||
pub mod fs;
|
||||
pub mod oss;
|
||||
pub mod s3;
|
||||
|
||||
use std::collections::HashMap;
|
||||
|
||||
use lazy_static::lazy_static;
|
||||
@@ -25,10 +27,12 @@ use url::{ParseError, Url};
|
||||
use self::fs::build_fs_backend;
|
||||
use self::s3::build_s3_backend;
|
||||
use crate::error::{self, Result};
|
||||
use crate::object_store::oss::build_oss_backend;
|
||||
use crate::util::find_dir_and_filename;
|
||||
|
||||
pub const FS_SCHEMA: &str = "FS";
|
||||
pub const S3_SCHEMA: &str = "S3";
|
||||
pub const OSS_SCHEMA: &str = "OSS";
|
||||
|
||||
/// Returns `(schema, Option<host>, path)`
|
||||
pub fn parse_url(url: &str) -> Result<(String, Option<String>, String)> {
|
||||
@@ -64,6 +68,12 @@ pub fn build_backend(url: &str, connection: &HashMap<String, String>) -> Result<
|
||||
})?;
|
||||
Ok(build_s3_backend(&host, &root, connection)?)
|
||||
}
|
||||
OSS_SCHEMA => {
|
||||
let host = host.context(error::EmptyHostPathSnafu {
|
||||
url: url.to_string(),
|
||||
})?;
|
||||
Ok(build_oss_backend(&host, &root, connection)?)
|
||||
}
|
||||
FS_SCHEMA => Ok(build_fs_backend(&root)?),
|
||||
|
||||
_ => error::UnsupportedBackendProtocolSnafu {
|
||||
|
||||
src/common/datasource/src/object_store/oss.rs (new file, 118 lines)
@@ -0,0 +1,118 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::HashMap;
|
||||
|
||||
use object_store::services::Oss;
|
||||
use object_store::ObjectStore;
|
||||
use snafu::ResultExt;
|
||||
|
||||
use crate::error::{self, Result};
|
||||
|
||||
const BUCKET: &str = "bucket";
|
||||
const ENDPOINT: &str = "endpoint";
|
||||
const ACCESS_KEY_ID: &str = "access_key_id";
|
||||
const ACCESS_KEY_SECRET: &str = "access_key_secret";
|
||||
const ROOT: &str = "root";
|
||||
const ALLOW_ANONYMOUS: &str = "allow_anonymous";
|
||||
|
||||
/// Check if the key is supported in OSS configuration.
|
||||
pub fn is_supported_in_oss(key: &str) -> bool {
|
||||
[
|
||||
ROOT,
|
||||
ALLOW_ANONYMOUS,
|
||||
BUCKET,
|
||||
ENDPOINT,
|
||||
ACCESS_KEY_ID,
|
||||
ACCESS_KEY_SECRET,
|
||||
]
|
||||
.contains(&key)
|
||||
}
|
||||
|
||||
/// Build an OSS backend using the provided bucket, root, and connection parameters.
|
||||
pub fn build_oss_backend(
|
||||
bucket: &str,
|
||||
root: &str,
|
||||
connection: &HashMap<String, String>,
|
||||
) -> Result<ObjectStore> {
|
||||
let mut builder = Oss::default().bucket(bucket).root(root);
|
||||
|
||||
if let Some(endpoint) = connection.get(ENDPOINT) {
|
||||
builder = builder.endpoint(endpoint);
|
||||
}
|
||||
|
||||
if let Some(access_key_id) = connection.get(ACCESS_KEY_ID) {
|
||||
builder = builder.access_key_id(access_key_id);
|
||||
}
|
||||
|
||||
if let Some(access_key_secret) = connection.get(ACCESS_KEY_SECRET) {
|
||||
builder = builder.access_key_secret(access_key_secret);
|
||||
}
|
||||
|
||||
if let Some(allow_anonymous) = connection.get(ALLOW_ANONYMOUS) {
|
||||
let allow = allow_anonymous.as_str().parse::<bool>().map_err(|e| {
|
||||
error::InvalidConnectionSnafu {
|
||||
msg: format!(
|
||||
"failed to parse the option {}={}, {}",
|
||||
ALLOW_ANONYMOUS, allow_anonymous, e
|
||||
),
|
||||
}
|
||||
.build()
|
||||
})?;
|
||||
if allow {
|
||||
builder = builder.allow_anonymous();
|
||||
}
|
||||
}
|
||||
|
||||
let op = ObjectStore::new(builder)
|
||||
.context(error::BuildBackendSnafu)?
|
||||
.layer(object_store::layers::LoggingLayer::default())
|
||||
.layer(object_store::layers::TracingLayer)
|
||||
.layer(object_store::layers::build_prometheus_metrics_layer(true))
|
||||
.finish();
|
||||
|
||||
Ok(op)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_is_supported_in_oss() {
|
||||
assert!(is_supported_in_oss(ROOT));
|
||||
assert!(is_supported_in_oss(ALLOW_ANONYMOUS));
|
||||
assert!(is_supported_in_oss(BUCKET));
|
||||
assert!(is_supported_in_oss(ENDPOINT));
|
||||
assert!(is_supported_in_oss(ACCESS_KEY_ID));
|
||||
assert!(is_supported_in_oss(ACCESS_KEY_SECRET));
|
||||
assert!(!is_supported_in_oss("foo"));
|
||||
assert!(!is_supported_in_oss("BAR"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_build_oss_backend_all_fields_valid() {
|
||||
let mut connection = HashMap::new();
|
||||
connection.insert(
|
||||
ENDPOINT.to_string(),
|
||||
"http://oss-ap-southeast-1.aliyuncs.com".to_string(),
|
||||
);
|
||||
connection.insert(ACCESS_KEY_ID.to_string(), "key_id".to_string());
|
||||
connection.insert(ACCESS_KEY_SECRET.to_string(), "key_secret".to_string());
|
||||
connection.insert(ALLOW_ANONYMOUS.to_string(), "true".to_string());
|
||||
|
||||
let result = build_oss_backend("my-bucket", "my-root", &connection);
|
||||
assert!(result.is_ok());
|
||||
}
|
||||
}
|
||||
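With the new oss module above registered in build_backend, an oss:// URL plus a connection map reaches build_oss_backend the same way s3:// reaches the S3 backend. A hedged sketch of that call path; the common_datasource crate path, the scheme normalization, and the example bucket/keys are assumptions based on the surrounding code:

```rust
use std::collections::HashMap;

use common_datasource::object_store::build_backend;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut connection = HashMap::new();
    connection.insert(
        "endpoint".to_string(),
        "http://oss-ap-southeast-1.aliyuncs.com".to_string(),
    );
    connection.insert("access_key_id".to_string(), "key_id".to_string());
    connection.insert("access_key_secret".to_string(), "key_secret".to_string());

    // The URL host is treated as the bucket, the path as the object root/file.
    let store = build_backend("oss://my-bucket/exports/demo.parquet", &connection)?;
    let _ = store;
    Ok(())
}
```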
src/common/function/src/adjust_flow.rs (new file, 90 lines)
@@ -0,0 +1,90 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use common_macro::admin_fn;
|
||||
use common_query::error::{
|
||||
InvalidFuncArgsSnafu, MissingFlowServiceHandlerSnafu, Result, UnsupportedInputDataTypeSnafu,
|
||||
};
|
||||
use common_query::prelude::Signature;
|
||||
use datafusion::logical_expr::Volatility;
|
||||
use datatypes::value::{Value, ValueRef};
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::ensure;
|
||||
use store_api::storage::ConcreteDataType;
|
||||
|
||||
use crate::handlers::FlowServiceHandlerRef;
|
||||
use crate::helper::parse_catalog_flow;
|
||||
|
||||
fn adjust_signature() -> Signature {
|
||||
Signature::exact(
|
||||
vec![
|
||||
ConcreteDataType::string_datatype(), // flow name
|
||||
ConcreteDataType::uint64_datatype(), // min_run_interval in seconds
|
||||
ConcreteDataType::uint64_datatype(), // max filter number per query
|
||||
],
|
||||
Volatility::Immutable,
|
||||
)
|
||||
}
|
||||
|
||||
#[admin_fn(
|
||||
name = AdjustFlowFunction,
|
||||
display_name = adjust_flow,
|
||||
sig_fn = adjust_signature,
|
||||
ret = uint64
|
||||
)]
|
||||
pub(crate) async fn adjust_flow(
|
||||
flow_service_handler: &FlowServiceHandlerRef,
|
||||
query_ctx: &QueryContextRef,
|
||||
params: &[ValueRef<'_>],
|
||||
) -> Result<Value> {
|
||||
ensure!(
|
||||
params.len() == 3,
|
||||
InvalidFuncArgsSnafu {
|
||||
err_msg: format!(
|
||||
"The length of the args is not correct, expect 3, have: {}",
|
||||
params.len()
|
||||
),
|
||||
}
|
||||
);
|
||||
|
||||
let (flow_name, min_run_interval, max_filter_num) = match (params[0], params[1], params[2]) {
|
||||
(
|
||||
ValueRef::String(flow_name),
|
||||
ValueRef::UInt64(min_run_interval),
|
||||
ValueRef::UInt64(max_filter_num),
|
||||
) => (flow_name, min_run_interval, max_filter_num),
|
||||
_ => {
|
||||
return UnsupportedInputDataTypeSnafu {
|
||||
function: "adjust_flow",
|
||||
datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
};
|
||||
|
||||
let (catalog_name, flow_name) = parse_catalog_flow(flow_name, query_ctx)?;
|
||||
|
||||
let res = flow_service_handler
|
||||
.adjust(
|
||||
&catalog_name,
|
||||
&flow_name,
|
||||
min_run_interval,
|
||||
max_filter_num as usize,
|
||||
query_ctx.clone(),
|
||||
)
|
||||
.await?;
|
||||
let affected_rows = res.affected_rows;
|
||||
|
||||
Ok(Value::from(affected_rows))
|
||||
}
|
||||
@@ -26,6 +26,7 @@ use flush_compact_table::{CompactTableFunction, FlushTableFunction};
|
||||
use migrate_region::MigrateRegionFunction;
|
||||
use remove_region_follower::RemoveRegionFollowerFunction;
|
||||
|
||||
use crate::adjust_flow::AdjustFlowFunction;
|
||||
use crate::flush_flow::FlushFlowFunction;
|
||||
use crate::function_registry::FunctionRegistry;
|
||||
|
||||
@@ -43,5 +44,6 @@ impl AdminFunction {
|
||||
registry.register_async(Arc::new(FlushTableFunction));
|
||||
registry.register_async(Arc::new(CompactTableFunction));
|
||||
registry.register_async(Arc::new(FlushFlowFunction));
|
||||
registry.register_async(Arc::new(AdjustFlowFunction));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,21 +12,19 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use common_error::ext::BoxedError;
|
||||
use common_macro::admin_fn;
|
||||
use common_query::error::{
|
||||
ExecuteSnafu, InvalidFuncArgsSnafu, MissingFlowServiceHandlerSnafu, Result,
|
||||
UnsupportedInputDataTypeSnafu,
|
||||
InvalidFuncArgsSnafu, MissingFlowServiceHandlerSnafu, Result, UnsupportedInputDataTypeSnafu,
|
||||
};
|
||||
use common_query::prelude::Signature;
|
||||
use datafusion::logical_expr::Volatility;
|
||||
use datatypes::value::{Value, ValueRef};
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::{ensure, ResultExt};
|
||||
use sql::parser::ParserContext;
|
||||
use snafu::ensure;
|
||||
use store_api::storage::ConcreteDataType;
|
||||
|
||||
use crate::handlers::FlowServiceHandlerRef;
|
||||
use crate::helper::parse_catalog_flow;
|
||||
|
||||
fn flush_signature() -> Signature {
|
||||
Signature::uniform(
|
||||
@@ -47,20 +45,6 @@ pub(crate) async fn flush_flow(
|
||||
query_ctx: &QueryContextRef,
|
||||
params: &[ValueRef<'_>],
|
||||
) -> Result<Value> {
|
||||
let (catalog_name, flow_name) = parse_flush_flow(params, query_ctx)?;
|
||||
|
||||
let res = flow_service_handler
|
||||
.flush(&catalog_name, &flow_name, query_ctx.clone())
|
||||
.await?;
|
||||
let affected_rows = res.affected_rows;
|
||||
|
||||
Ok(Value::from(affected_rows))
|
||||
}
|
||||
|
||||
fn parse_flush_flow(
|
||||
params: &[ValueRef<'_>],
|
||||
query_ctx: &QueryContextRef,
|
||||
) -> Result<(String, String)> {
|
||||
ensure!(
|
||||
params.len() == 1,
|
||||
InvalidFuncArgsSnafu {
|
||||
@@ -70,7 +54,6 @@ fn parse_flush_flow(
|
||||
),
|
||||
}
|
||||
);
|
||||
|
||||
let ValueRef::String(flow_name) = params[0] else {
|
||||
return UnsupportedInputDataTypeSnafu {
|
||||
function: "flush_flow",
|
||||
@@ -78,27 +61,14 @@ fn parse_flush_flow(
|
||||
}
|
||||
.fail();
|
||||
};
|
||||
let obj_name = ParserContext::parse_table_name(flow_name, query_ctx.sql_dialect())
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExecuteSnafu)?;
|
||||
let (catalog_name, flow_name) = parse_catalog_flow(flow_name, query_ctx)?;
|
||||
|
||||
let (catalog_name, flow_name) = match &obj_name.0[..] {
|
||||
[flow_name] => (
|
||||
query_ctx.current_catalog().to_string(),
|
||||
flow_name.value.clone(),
|
||||
),
|
||||
[catalog, flow_name] => (catalog.value.clone(), flow_name.value.clone()),
|
||||
_ => {
|
||||
return InvalidFuncArgsSnafu {
|
||||
err_msg: format!(
|
||||
"expect flow name to be <catalog>.<flow-name> or <flow-name>, actual: {}",
|
||||
obj_name
|
||||
),
|
||||
}
|
||||
.fail()
|
||||
}
|
||||
};
|
||||
Ok((catalog_name, flow_name))
|
||||
let res = flow_service_handler
|
||||
.flush(&catalog_name, &flow_name, query_ctx.clone())
|
||||
.await?;
|
||||
let affected_rows = res.affected_rows;
|
||||
|
||||
Ok(Value::from(affected_rows))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -154,10 +124,7 @@ mod test {
|
||||
("catalog.flow_name", ("catalog", "flow_name")),
|
||||
];
|
||||
for (input, expected) in testcases.iter() {
|
||||
let args = vec![*input];
|
||||
let args = args.into_iter().map(ValueRef::String).collect::<Vec<_>>();
|
||||
|
||||
let result = parse_flush_flow(&args, &QueryContext::arc()).unwrap();
|
||||
let result = parse_catalog_flow(input, &QueryContext::arc()).unwrap();
|
||||
assert_eq!(*expected, (result.0.as_str(), result.1.as_str()));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -87,6 +87,15 @@ pub trait FlowServiceHandler: Send + Sync {
|
||||
flow: &str,
|
||||
ctx: QueryContextRef,
|
||||
) -> Result<api::v1::flow::FlowResponse>;
|
||||
|
||||
async fn adjust(
|
||||
&self,
|
||||
catalog: &str,
|
||||
flow: &str,
|
||||
min_run_interval_secs: u64,
|
||||
max_filter_num_per_query: usize,
|
||||
ctx: QueryContextRef,
|
||||
) -> Result<api::v1::flow::FlowResponse>;
|
||||
}
|
||||
|
||||
pub type TableMutationHandlerRef = Arc<dyn TableMutationHandler>;
|
||||
|
||||
@@ -12,12 +12,15 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use common_query::error::{InvalidInputTypeSnafu, Result};
|
||||
use common_error::ext::BoxedError;
|
||||
use common_query::error::{ExecuteSnafu, InvalidFuncArgsSnafu, InvalidInputTypeSnafu, Result};
|
||||
use common_query::prelude::{Signature, TypeSignature, Volatility};
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::types::cast::cast;
|
||||
use datatypes::value::ValueRef;
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::ResultExt;
|
||||
use sql::parser::ParserContext;
|
||||
|
||||
/// Create a function signature with oneof signatures of interleaving two arguments.
|
||||
pub fn one_of_sigs2(args1: Vec<ConcreteDataType>, args2: Vec<ConcreteDataType>) -> Signature {
|
||||
@@ -43,3 +46,30 @@ pub fn cast_u64(value: &ValueRef) -> Result<Option<u64>> {
|
||||
})
|
||||
.map(|v| v.as_u64())
|
||||
}
|
||||
|
||||
pub fn parse_catalog_flow(
|
||||
flow_name: &str,
|
||||
query_ctx: &QueryContextRef,
|
||||
) -> Result<(String, String)> {
|
||||
let obj_name = ParserContext::parse_table_name(flow_name, query_ctx.sql_dialect())
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExecuteSnafu)?;
|
||||
|
||||
let (catalog_name, flow_name) = match &obj_name.0[..] {
|
||||
[flow_name] => (
|
||||
query_ctx.current_catalog().to_string(),
|
||||
flow_name.value.clone(),
|
||||
),
|
||||
[catalog, flow_name] => (catalog.value.clone(), flow_name.value.clone()),
|
||||
_ => {
|
||||
return InvalidFuncArgsSnafu {
|
||||
err_msg: format!(
|
||||
"expect flow name to be <catalog>.<flow-name> or <flow-name>, actual: {}",
|
||||
obj_name
|
||||
),
|
||||
}
|
||||
.fail()
|
||||
}
|
||||
};
|
||||
Ok((catalog_name, flow_name))
|
||||
}
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
#![feature(let_chains)]
|
||||
#![feature(try_blocks)]
|
||||
|
||||
mod adjust_flow;
|
||||
mod admin;
|
||||
mod flush_flow;
|
||||
mod macros;
|
||||
|
||||
@@ -148,6 +148,17 @@ impl FunctionState {
|
||||
) -> Result<api::v1::flow::FlowResponse> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
async fn adjust(
|
||||
&self,
|
||||
_catalog: &str,
|
||||
_flow: &str,
|
||||
_min_run_interval_secs: u64,
|
||||
_max_filter_num_per_query: usize,
|
||||
_ctx: QueryContextRef,
|
||||
) -> Result<api::v1::flow::FlowResponse> {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
Self {
|
||||
|
||||
@@ -64,6 +64,18 @@ impl Default for FlightEncoder {
|
||||
}
|
||||
|
||||
impl FlightEncoder {
|
||||
pub fn with_compression_disabled() -> Self {
|
||||
let write_options = writer::IpcWriteOptions::default()
|
||||
.try_with_compression(None)
|
||||
.unwrap();
|
||||
|
||||
Self {
|
||||
write_options,
|
||||
data_gen: writer::IpcDataGenerator::default(),
|
||||
dictionary_tracker: writer::DictionaryTracker::new(false),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn encode(&mut self, flight_message: FlightMessage) -> FlightData {
|
||||
match flight_message {
|
||||
FlightMessage::Schema(schema) => SchemaAsIpc::new(&schema, &self.write_options).into(),
|
||||
|
||||
@@ -188,7 +188,71 @@ pub const CACHE_KEY_PREFIXES: [&str; 5] = [
|
||||
NODE_ADDRESS_PREFIX,
|
||||
];
|
||||
|
||||
pub type RegionDistribution = BTreeMap<DatanodeId, Vec<RegionNumber>>;
|
||||
/// A set of regions with the same role.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize)]
|
||||
pub struct RegionRoleSet {
|
||||
/// Leader regions.
|
||||
pub leader_regions: Vec<RegionNumber>,
|
||||
/// Follower regions.
|
||||
pub follower_regions: Vec<RegionNumber>,
|
||||
}
|
||||
|
||||
impl<'de> Deserialize<'de> for RegionRoleSet {
|
||||
fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
|
||||
where
|
||||
D: serde::Deserializer<'de>,
|
||||
{
|
||||
#[derive(Deserialize)]
|
||||
#[serde(untagged)]
|
||||
enum RegionRoleSetOrLeaderOnly {
|
||||
Full {
|
||||
leader_regions: Vec<RegionNumber>,
|
||||
follower_regions: Vec<RegionNumber>,
|
||||
},
|
||||
LeaderOnly(Vec<RegionNumber>),
|
||||
}
|
||||
match RegionRoleSetOrLeaderOnly::deserialize(deserializer)? {
|
||||
RegionRoleSetOrLeaderOnly::Full {
|
||||
leader_regions,
|
||||
follower_regions,
|
||||
} => Ok(RegionRoleSet::new(leader_regions, follower_regions)),
|
||||
RegionRoleSetOrLeaderOnly::LeaderOnly(leader_regions) => {
|
||||
Ok(RegionRoleSet::new(leader_regions, vec![]))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl RegionRoleSet {
|
||||
/// Create a new region role set.
|
||||
pub fn new(leader_regions: Vec<RegionNumber>, follower_regions: Vec<RegionNumber>) -> Self {
|
||||
Self {
|
||||
leader_regions,
|
||||
follower_regions,
|
||||
}
|
||||
}
|
||||
|
||||
/// Add a leader region to the set.
|
||||
pub fn add_leader_region(&mut self, region_number: RegionNumber) {
|
||||
self.leader_regions.push(region_number);
|
||||
}
|
||||
|
||||
/// Add a follower region to the set.
|
||||
pub fn add_follower_region(&mut self, region_number: RegionNumber) {
|
||||
self.follower_regions.push(region_number);
|
||||
}
|
||||
|
||||
/// Sort the regions.
|
||||
pub fn sort(&mut self) {
|
||||
self.follower_regions.sort();
|
||||
self.leader_regions.sort();
|
||||
}
|
||||
}
|
||||
|
||||
/// The distribution of regions.
|
||||
///
|
||||
/// The key is the datanode id, the value is the region role set.
|
||||
pub type RegionDistribution = BTreeMap<DatanodeId, RegionRoleSet>;
|
||||
|
||||
/// The id of flow.
|
||||
pub type FlowId = u32;
|
||||
@@ -1368,7 +1432,8 @@ mod tests {
|
||||
use crate::key::table_name::TableNameKey;
|
||||
use crate::key::table_route::TableRouteValue;
|
||||
use crate::key::{
|
||||
DeserializedValueWithBytes, TableMetadataManager, ViewInfoValue, TOPIC_REGION_PREFIX,
|
||||
DeserializedValueWithBytes, RegionDistribution, RegionRoleSet, TableMetadataManager,
|
||||
ViewInfoValue, TOPIC_REGION_PREFIX,
|
||||
};
|
||||
use crate::kv_backend::memory::MemoryKvBackend;
|
||||
use crate::kv_backend::KvBackend;
|
||||
@@ -1995,7 +2060,8 @@ mod tests {
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(got.regions, regions)
|
||||
assert_eq!(got.regions, regions.leader_regions);
|
||||
assert_eq!(got.follower_regions, regions.follower_regions);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2412,4 +2478,28 @@ mod tests {
|
||||
assert_eq!(current_view_info.columns, new_columns);
|
||||
assert_eq!(current_view_info.plan_columns, new_plan_columns);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_region_role_set_deserialize() {
|
||||
let s = r#"{"leader_regions": [1, 2, 3], "follower_regions": [4, 5, 6]}"#;
|
||||
let region_role_set: RegionRoleSet = serde_json::from_str(s).unwrap();
|
||||
assert_eq!(region_role_set.leader_regions, vec![1, 2, 3]);
|
||||
assert_eq!(region_role_set.follower_regions, vec![4, 5, 6]);
|
||||
|
||||
let s = r#"[1, 2, 3]"#;
|
||||
let region_role_set: RegionRoleSet = serde_json::from_str(s).unwrap();
|
||||
assert_eq!(region_role_set.leader_regions, vec![1, 2, 3]);
|
||||
assert!(region_role_set.follower_regions.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_region_distribution_deserialize() {
|
||||
let s = r#"{"1": [1,2,3], "2": {"leader_regions": [7, 8, 9], "follower_regions": [10, 11, 12]}}"#;
|
||||
let region_distribution: RegionDistribution = serde_json::from_str(s).unwrap();
|
||||
assert_eq!(region_distribution.len(), 2);
|
||||
assert_eq!(region_distribution[&1].leader_regions, vec![1, 2, 3]);
|
||||
assert!(region_distribution[&1].follower_regions.is_empty());
|
||||
assert_eq!(region_distribution[&2].leader_regions, vec![7, 8, 9]);
|
||||
assert_eq!(region_distribution[&2].follower_regions, vec![10, 11, 12]);
|
||||
}
|
||||
}
|
||||
|
||||
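The custom Deserialize above exists for rolling upgrades: values written by older metasrv versions store a bare region-number list per datanode, while new values carry explicit leader and follower sets, and the untagged-enum trick accepts both. A standalone serde sketch of the same pattern; the type and field names here are illustrative, not the workspace types:

```rust
use serde::Deserialize;

#[derive(Debug, PartialEq)]
struct RoleSet {
    leaders: Vec<u32>,
    followers: Vec<u32>,
}

impl<'de> Deserialize<'de> for RoleSet {
    fn deserialize<D: serde::Deserializer<'de>>(d: D) -> Result<Self, D::Error> {
        #[derive(Deserialize)]
        #[serde(untagged)]
        enum Compat {
            // New format: explicit leader/follower split.
            Full { leaders: Vec<u32>, followers: Vec<u32> },
            // Old format: just a list of leader regions.
            LeadersOnly(Vec<u32>),
        }
        Ok(match Compat::deserialize(d)? {
            Compat::Full { leaders, followers } => RoleSet { leaders, followers },
            Compat::LeadersOnly(leaders) => RoleSet { leaders, followers: vec![] },
        })
    }
}

fn main() {
    let old: RoleSet = serde_json::from_str("[1, 2, 3]").unwrap();
    assert_eq!(old, RoleSet { leaders: vec![1, 2, 3], followers: vec![] });

    let new: RoleSet = serde_json::from_str(r#"{"leaders":[1],"followers":[2]}"#).unwrap();
    assert_eq!(new, RoleSet { leaders: vec![1], followers: vec![2] });
}
```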
@@ -24,7 +24,7 @@ use table::metadata::TableId;
|
||||
use crate::error::{DatanodeTableInfoNotFoundSnafu, InvalidMetadataSnafu, Result};
|
||||
use crate::key::table_route::PhysicalTableRouteValue;
|
||||
use crate::key::{
|
||||
MetadataKey, MetadataValue, RegionDistribution, DATANODE_TABLE_KEY_PATTERN,
|
||||
MetadataKey, MetadataValue, RegionDistribution, RegionRoleSet, DATANODE_TABLE_KEY_PATTERN,
|
||||
DATANODE_TABLE_KEY_PREFIX,
|
||||
};
|
||||
use crate::kv_backend::txn::{Txn, TxnOp};
|
||||
@@ -118,23 +118,31 @@ impl Display for DatanodeTableKey {
|
||||
pub struct DatanodeTableValue {
|
||||
pub table_id: TableId,
|
||||
pub regions: Vec<RegionNumber>,
|
||||
#[serde(default)]
|
||||
pub follower_regions: Vec<RegionNumber>,
|
||||
#[serde(flatten)]
|
||||
pub region_info: RegionInfo,
|
||||
version: u64,
|
||||
}
|
||||
|
||||
impl DatanodeTableValue {
|
||||
pub fn new(table_id: TableId, regions: Vec<RegionNumber>, region_info: RegionInfo) -> Self {
|
||||
pub fn new(table_id: TableId, region_role_set: RegionRoleSet, region_info: RegionInfo) -> Self {
|
||||
let RegionRoleSet {
|
||||
leader_regions,
|
||||
follower_regions,
|
||||
} = region_role_set;
|
||||
|
||||
Self {
|
||||
table_id,
|
||||
regions,
|
||||
regions: leader_regions,
|
||||
follower_regions,
|
||||
region_info,
|
||||
version: 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Decodes `KeyValue` to ((),`DatanodeTableValue`)
|
||||
/// Decodes [`KeyValue`] to [`DatanodeTableValue`].
|
||||
pub fn datanode_table_value_decoder(kv: KeyValue) -> Result<DatanodeTableValue> {
|
||||
DatanodeTableValue::try_from_raw_value(&kv.value)
|
||||
}
|
||||
@@ -373,10 +381,11 @@ mod tests {
|
||||
let value = DatanodeTableValue {
|
||||
table_id: 42,
|
||||
regions: vec![1, 2, 3],
|
||||
follower_regions: vec![],
|
||||
region_info: RegionInfo::default(),
|
||||
version: 1,
|
||||
};
|
||||
let literal = br#"{"table_id":42,"regions":[1,2,3],"engine":"","region_storage_path":"","region_options":{},"region_wal_options":{},"version":1}"#;
|
||||
let literal = br#"{"table_id":42,"regions":[1,2,3],"follower_regions":[],"engine":"","region_storage_path":"","region_options":{},"region_wal_options":{},"version":1}"#;
|
||||
|
||||
let raw_value = value.try_as_raw_value().unwrap();
|
||||
assert_eq!(raw_value, literal);
|
||||
@@ -467,6 +476,7 @@ mod tests {
|
||||
let table_value = DatanodeTableValue {
|
||||
table_id: 1,
|
||||
regions: vec![],
|
||||
follower_regions: vec![],
|
||||
region_info,
|
||||
version: 1,
|
||||
};
|
||||
|
||||
@@ -40,17 +40,23 @@ pub fn region_distribution(region_routes: &[RegionRoute]) -> RegionDistribution
|
||||
let mut regions_id_map = RegionDistribution::new();
|
||||
for route in region_routes.iter() {
|
||||
if let Some(peer) = route.leader_peer.as_ref() {
|
||||
let region_id = route.region.id.region_number();
|
||||
regions_id_map.entry(peer.id).or_default().push(region_id);
|
||||
let region_number = route.region.id.region_number();
|
||||
regions_id_map
|
||||
.entry(peer.id)
|
||||
.or_default()
|
||||
.add_leader_region(region_number);
|
||||
}
|
||||
for peer in route.follower_peers.iter() {
|
||||
let region_id = route.region.id.region_number();
|
||||
regions_id_map.entry(peer.id).or_default().push(region_id);
|
||||
let region_number = route.region.id.region_number();
|
||||
regions_id_map
|
||||
.entry(peer.id)
|
||||
.or_default()
|
||||
.add_follower_region(region_number);
|
||||
}
|
||||
}
|
||||
for (_, regions) in regions_id_map.iter_mut() {
|
||||
// id asc
|
||||
regions.sort()
|
||||
for (_, region_role_set) in regions_id_map.iter_mut() {
|
||||
// Sort the regions in ascending order.
|
||||
region_role_set.sort()
|
||||
}
|
||||
regions_id_map
|
||||
}
|
||||
@@ -455,6 +461,7 @@ impl From<PbPartition> for Partition {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::key::RegionRoleSet;
|
||||
|
||||
#[test]
|
||||
fn test_leader_is_downgraded() {
|
||||
@@ -611,8 +618,8 @@ mod tests {
|
||||
|
||||
let distribution = region_distribution(®ion_routes);
|
||||
assert_eq!(distribution.len(), 3);
|
||||
assert_eq!(distribution[&1], vec![1, 2]);
|
||||
assert_eq!(distribution[&2], vec![1, 2]);
|
||||
assert_eq!(distribution[&3], vec![1, 2]);
|
||||
assert_eq!(distribution[&1], RegionRoleSet::new(vec![1], vec![2]));
|
||||
assert_eq!(distribution[&2], RegionRoleSet::new(vec![2], vec![1]));
|
||||
assert_eq!(distribution[&3], RegionRoleSet::new(vec![], vec![1, 2]));
|
||||
}
|
||||
}
|
||||
|
||||
src/common/stat/Cargo.toml (new file, 11 lines)
@@ -0,0 +1,11 @@
|
||||
[package]
|
||||
name = "stat"
|
||||
version.workspace = true
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[dependencies]
|
||||
nix.workspace = true
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
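The new stat crate is what the create_resource_limit_metrics helper above consumes. Its public surface, as used in this changeset, is just the two probe functions; a hedged usage sketch:

```rust
use stat::{get_cpu_limit, get_memory_limit};

fn main() {
    // Both return None when the limit cannot be read (e.g. not on Linux),
    // and Some(-1) when the cgroup reports the resource as unlimited.
    match get_cpu_limit() {
        Some(-1) => println!("cpu: unlimited"),
        Some(millicores) => println!("cpu limit: {millicores} millicores"),
        None => println!("cpu limit unknown"),
    }

    match get_memory_limit() {
        Some(-1) => println!("memory: unlimited"),
        Some(bytes) => println!("memory limit: {bytes} bytes"),
        None => println!("memory limit unknown"),
    }
}
```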
src/common/stat/src/cgroups.rs (new file, 183 lines)
@@ -0,0 +1,183 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#![allow(dead_code)]
|
||||
|
||||
use std::fs::read_to_string;
|
||||
use std::path::Path;
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
use nix::sys::{statfs, statfs::statfs};
|
||||
|
||||
/// `MAX_VALUE` is used to indicate that the resource is unlimited.
|
||||
pub const MAX_VALUE: i64 = -1;
|
||||
|
||||
const CGROUP_UNIFIED_MOUNTPOINT: &str = "/sys/fs/cgroup";
|
||||
|
||||
const MEMORY_MAX_FILE_CGROUP_V2: &str = "memory.max";
|
||||
const MEMORY_MAX_FILE_CGROUP_V1: &str = "memory.limit_in_bytes";
|
||||
const CPU_MAX_FILE_CGROUP_V2: &str = "cpu.max";
|
||||
const CPU_QUOTA_FILE_CGROUP_V1: &str = "cpu.cfs_quota_us";
|
||||
const CPU_PERIOD_FILE_CGROUP_V1: &str = "cpu.cfs_period_us";
|
||||
|
||||
// `MAX_VALUE_CGROUP_V2` string in `/sys/fs/cgroup/cpu.max` and `/sys/fs/cgroup/memory.max` to indicate that the resource is unlimited.
|
||||
const MAX_VALUE_CGROUP_V2: &str = "max";
|
||||
|
||||
// For cgroup v1, if the memory is unlimited, it will return a very large value(different from platform) that close to 2^63.
|
||||
// For easier comparison, if the memory limit is larger than 1PB we consider it as unlimited.
|
||||
const MAX_MEMORY_IN_BYTES: i64 = 1125899906842624; // 1PB
|
||||
|
||||
/// Get the limit of memory in bytes.
|
||||
///
|
||||
/// - If the memory is unlimited, return `-1`.
|
||||
/// - Return `None` if it fails to read the memory limit or not on linux.
|
||||
pub fn get_memory_limit() -> Option<i64> {
|
||||
#[cfg(target_os = "linux")]
|
||||
{
|
||||
let memory_max_file = if is_cgroup_v2()? {
|
||||
// Read `/sys/fs/cgroup/memory.max` to get the memory limit.
|
||||
MEMORY_MAX_FILE_CGROUP_V2
|
||||
} else {
|
||||
// Read `/sys/fs/cgroup/memory.limit_in_bytes` to get the memory limit.
|
||||
MEMORY_MAX_FILE_CGROUP_V1
|
||||
};
|
||||
|
||||
// For cgroup v1, reading the limit returns a very large value (platform-dependent) if the memory is unlimited.
|
||||
let memory_limit =
|
||||
read_value_from_file(Path::new(CGROUP_UNIFIED_MOUNTPOINT).join(memory_max_file))?;
|
||||
|
||||
// If the memory limit exceeds 1PB (cgroup v1), consider it unlimited.
|
||||
if memory_limit > MAX_MEMORY_IN_BYTES {
|
||||
return Some(MAX_VALUE);
|
||||
}
|
||||
Some(memory_limit)
|
||||
}
|
||||
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
None
|
||||
}
|
||||
|
||||
/// Get the limit of cpu in millicores.
|
||||
///
|
||||
/// - If the cpu is unlimited, return `-1`.
|
||||
/// - Return `None` if it fails to read the cpu limit or when not running on Linux.
|
||||
pub fn get_cpu_limit() -> Option<i64> {
|
||||
#[cfg(target_os = "linux")]
|
||||
if is_cgroup_v2()? {
|
||||
// Read `/sys/fs/cgroup/cpu.max` to get the cpu limit.
|
||||
get_cgroup_v2_cpu_limit(Path::new(CGROUP_UNIFIED_MOUNTPOINT).join(CPU_MAX_FILE_CGROUP_V2))
|
||||
} else {
|
||||
// Read `/sys/fs/cgroup/cpu.cfs_quota_us` and `/sys/fs/cgroup/cpu.cfs_period_us` to get the cpu limit.
|
||||
let quota = read_value_from_file(
|
||||
Path::new(CGROUP_UNIFIED_MOUNTPOINT).join(CPU_QUOTA_FILE_CGROUP_V1),
|
||||
)?;
|
||||
|
||||
if quota == MAX_VALUE {
|
||||
return Some(MAX_VALUE);
|
||||
}
|
||||
|
||||
let period = read_value_from_file(
|
||||
Path::new(CGROUP_UNIFIED_MOUNTPOINT).join(CPU_PERIOD_FILE_CGROUP_V1),
|
||||
)?;
|
||||
|
||||
// Return the cpu limit in millicores.
|
||||
Some(quota * 1000 / period)
|
||||
}
|
||||
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
None
|
||||
}
|
||||
|
||||
// Check whether the cgroup is v2.
|
||||
// - Return `true` if the cgroup is v2, otherwise return `false`.
|
||||
// - Return `None` if the detection fails or when not running on Linux.
|
||||
fn is_cgroup_v2() -> Option<bool> {
|
||||
#[cfg(target_os = "linux")]
|
||||
{
|
||||
let path = Path::new(CGROUP_UNIFIED_MOUNTPOINT);
|
||||
let fs_stat = statfs(path).ok()?;
|
||||
Some(fs_stat.filesystem_type() == statfs::CGROUP2_SUPER_MAGIC)
|
||||
}
|
||||
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
None
|
||||
}
|
||||
|
||||
fn read_value_from_file<P: AsRef<Path>>(path: P) -> Option<i64> {
|
||||
let content = read_to_string(&path).ok()?;
|
||||
|
||||
// If the content starts with "max", return `MAX_VALUE`.
|
||||
if content.starts_with(MAX_VALUE_CGROUP_V2) {
|
||||
return Some(MAX_VALUE);
|
||||
}
|
||||
|
||||
content.trim().parse::<i64>().ok()
|
||||
}
|
||||
|
||||
fn get_cgroup_v2_cpu_limit<P: AsRef<Path>>(path: P) -> Option<i64> {
|
||||
let content = read_to_string(&path).ok()?;
|
||||
|
||||
let fields = content.trim().split(' ').collect::<Vec<&str>>();
|
||||
if fields.len() != 2 {
|
||||
return None;
|
||||
}
|
||||
|
||||
// If the cpu is unlimited, the quota field is `max`; return `-1` in that case.
|
||||
let quota = fields[0].trim();
|
||||
if quota == MAX_VALUE_CGROUP_V2 {
|
||||
return Some(MAX_VALUE);
|
||||
}
|
||||
|
||||
let quota = quota.parse::<i64>().ok()?;
|
||||
|
||||
let period = fields[1].trim();
|
||||
let period = period.parse::<i64>().ok()?;
|
||||
|
||||
// Return the cpu limit in millicores.
|
||||
Some(quota * 1000 / period)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_read_value_from_file() {
|
||||
assert_eq!(
|
||||
read_value_from_file(Path::new("testdata").join("memory.max")).unwrap(),
|
||||
100000
|
||||
);
|
||||
assert_eq!(
|
||||
read_value_from_file(Path::new("testdata").join("memory.max.unlimited")).unwrap(),
|
||||
MAX_VALUE
|
||||
);
|
||||
assert_eq!(read_value_from_file(Path::new("non_existent_file")), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_get_cgroup_v2_cpu_limit() {
|
||||
assert_eq!(
|
||||
get_cgroup_v2_cpu_limit(Path::new("testdata").join("cpu.max")).unwrap(),
|
||||
1500
|
||||
);
|
||||
assert_eq!(
|
||||
get_cgroup_v2_cpu_limit(Path::new("testdata").join("cpu.max.unlimited")).unwrap(),
|
||||
MAX_VALUE
|
||||
);
|
||||
assert_eq!(
|
||||
get_cgroup_v2_cpu_limit(Path::new("non_existent_file")),
|
||||
None
|
||||
);
|
||||
}
|
||||
}
|
||||
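
A minimal usage sketch of the helpers introduced above (illustrative, not part of this change; the crate is consumed under its package name `stat`, and the fallback values are made up):

use stat::{get_cpu_limit, get_memory_limit, MAX_VALUE};

// Resolve an effective memory budget in bytes: fall back to a caller-provided
// default when the limit is unreadable (e.g. not on Linux) or unlimited (`-1`).
fn effective_memory_budget(default_bytes: i64) -> i64 {
    match get_memory_limit() {
        Some(limit) if limit != MAX_VALUE => limit,
        _ => default_bytes,
    }
}

// Same idea for CPU, expressed in millicores.
fn effective_cpu_millicores(default_millicores: i64) -> i64 {
    match get_cpu_limit() {
        Some(limit) if limit != MAX_VALUE => limit,
        _ => default_millicores,
    }
}
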
17
src/common/stat/src/lib.rs
Normal file
@@ -0,0 +1,17 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
mod cgroups;
|
||||
|
||||
pub use cgroups::*;
|
||||
1
src/common/stat/testdata/cpu.max
vendored
Normal file
@@ -0,0 +1 @@
|
||||
150000 100000
|
||||
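
Worked example for the millicore conversion above: with this testdata `cpu.max` content of `150000 100000`, the computed limit is quota * 1000 / period = 150000 * 1000 / 100000 = 1500 millicores, which is exactly what `test_get_cgroup_v2_cpu_limit` asserts.
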
1
src/common/stat/testdata/cpu.max.unlimited
vendored
Normal file
@@ -0,0 +1 @@
|
||||
max 100000
|
||||
1
src/common/stat/testdata/memory.max
vendored
Normal file
@@ -0,0 +1 @@
|
||||
100000
|
||||
1
src/common/stat/testdata/memory.max.unlimited
vendored
Normal file
@@ -0,0 +1 @@
|
||||
max
|
||||
@@ -37,6 +37,9 @@ use crate::tracing_sampler::{create_sampler, TracingSampleOptions};
|
||||
|
||||
pub const DEFAULT_OTLP_ENDPOINT: &str = "http://localhost:4317";
|
||||
|
||||
/// The default logs directory.
|
||||
pub const DEFAULT_LOGGING_DIR: &str = "logs";
|
||||
|
||||
// Handle for reloading log level
|
||||
pub static RELOAD_HANDLE: OnceCell<tracing_subscriber::reload::Handle<Targets, Registry>> =
|
||||
OnceCell::new();
|
||||
@@ -133,7 +136,8 @@ impl Eq for LoggingOptions {}
|
||||
impl Default for LoggingOptions {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
dir: "./greptimedb_data/logs".to_string(),
|
||||
// The directory path will be configured at application startup, typically using the data home directory as a base.
|
||||
dir: "".to_string(),
|
||||
level: None,
|
||||
log_format: LogFormat::Text,
|
||||
enable_otlp_tracing: false,
|
||||
|
||||
@@ -6,6 +6,7 @@ license.workspace = true
|
||||
|
||||
[features]
|
||||
testing = []
|
||||
enterprise = []
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
@@ -18,7 +18,7 @@ use core::time::Duration;
|
||||
|
||||
use common_base::readable_size::ReadableSize;
|
||||
use common_base::secrets::{ExposeSecret, SecretString};
|
||||
use common_config::Configurable;
|
||||
use common_config::{Configurable, DEFAULT_DATA_HOME};
|
||||
pub use common_procedure::options::ProcedureConfig;
|
||||
use common_telemetry::logging::{LoggingOptions, TracingOptions};
|
||||
use common_wal::config::DatanodeWalConfig;
|
||||
@@ -36,9 +36,6 @@ use servers::http::HttpOptions;
|
||||
|
||||
pub const DEFAULT_OBJECT_STORE_CACHE_SIZE: ReadableSize = ReadableSize::gb(5);
|
||||
|
||||
/// Default data home in file storage
|
||||
const DEFAULT_DATA_HOME: &str = "./greptimedb_data";
|
||||
|
||||
/// Object storage config
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
||||
#[serde(tag = "type")]
|
||||
|
||||
@@ -559,6 +559,8 @@ async fn open_all_regions(
|
||||
init_regions_parallelism: usize,
|
||||
) -> Result<()> {
|
||||
let mut regions = vec![];
|
||||
#[cfg(feature = "enterprise")]
|
||||
let mut follower_regions = vec![];
|
||||
for table_value in table_values {
|
||||
for region_number in table_value.regions {
|
||||
// Augments region options with wal options if a wal options is provided.
|
||||
@@ -576,6 +578,24 @@ async fn open_all_regions(
|
||||
region_options,
|
||||
));
|
||||
}
|
||||
|
||||
#[cfg(feature = "enterprise")]
|
||||
for region_number in table_value.follower_regions {
|
||||
// Augments region options with wal options if a wal options is provided.
|
||||
let mut region_options = table_value.region_info.region_options.clone();
|
||||
prepare_wal_options(
|
||||
&mut region_options,
|
||||
RegionId::new(table_value.table_id, region_number),
|
||||
&table_value.region_info.region_wal_options,
|
||||
);
|
||||
|
||||
follower_regions.push((
|
||||
RegionId::new(table_value.table_id, region_number),
|
||||
table_value.region_info.engine.clone(),
|
||||
table_value.region_info.region_storage_path.clone(),
|
||||
region_options,
|
||||
));
|
||||
}
|
||||
}
|
||||
let num_regions = regions.len();
|
||||
info!("going to open {} region(s)", num_regions);
|
||||
@@ -617,6 +637,43 @@ async fn open_all_regions(
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "enterprise")]
|
||||
if !follower_regions.is_empty() {
|
||||
info!(
|
||||
"going to open {} follower region(s)",
|
||||
follower_regions.len()
|
||||
);
|
||||
let mut region_requests = Vec::with_capacity(follower_regions.len());
|
||||
for (region_id, engine, store_path, options) in follower_regions {
|
||||
let region_dir = region_dir(&store_path, region_id);
|
||||
region_requests.push((
|
||||
region_id,
|
||||
RegionOpenRequest {
|
||||
engine,
|
||||
region_dir,
|
||||
options,
|
||||
skip_wal_replay: true,
|
||||
},
|
||||
));
|
||||
}
|
||||
|
||||
let num_follower_regions = region_requests.len();
let open_regions = region_server
.handle_batch_open_requests(init_regions_parallelism, region_requests)
.await?;

ensure!(
open_regions.len() == num_follower_regions,
error::UnexpectedSnafu {
violated: format!(
"Expected to open {} follower regions, but only {} regions were opened",
num_follower_regions,
open_regions.len()
|
||||
)
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
info!("all regions are opened");
|
||||
|
||||
Ok(())
|
||||
@@ -632,6 +689,7 @@ mod tests {
|
||||
use common_base::Plugins;
|
||||
use common_meta::cache::LayeredCacheRegistryBuilder;
|
||||
use common_meta::key::datanode_table::DatanodeTableManager;
|
||||
use common_meta::key::RegionRoleSet;
|
||||
use common_meta::kv_backend::memory::MemoryKvBackend;
|
||||
use common_meta::kv_backend::KvBackendRef;
|
||||
use mito2::engine::MITO_ENGINE_NAME;
|
||||
@@ -651,7 +709,7 @@ mod tests {
|
||||
"foo/bar/weny",
|
||||
HashMap::from([("foo".to_string(), "bar".to_string())]),
|
||||
HashMap::default(),
|
||||
BTreeMap::from([(0, vec![0, 1, 2])]),
|
||||
BTreeMap::from([(0, RegionRoleSet::new(vec![0, 1, 2], vec![]))]),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
|
||||
@@ -61,6 +61,7 @@ prost.workspace = true
|
||||
query.workspace = true
|
||||
rand.workspace = true
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
servers.workspace = true
|
||||
session.workspace = true
|
||||
smallvec.workspace = true
|
||||
|
||||
@@ -18,7 +18,7 @@ use std::sync::atomic::AtomicBool;
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::flow::{
|
||||
flow_request, CreateRequest, DropRequest, FlowRequest, FlowResponse, FlushFlow,
|
||||
flow_request, AdjustFlow, CreateRequest, DropRequest, FlowRequest, FlowResponse, FlushFlow,
|
||||
};
|
||||
use api::v1::region::InsertRequests;
|
||||
use catalog::CatalogManager;
|
||||
@@ -32,6 +32,7 @@ use common_telemetry::{error, info, trace, warn};
|
||||
use datatypes::value::Value;
|
||||
use futures::TryStreamExt;
|
||||
use itertools::Itertools;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use session::context::QueryContextBuilder;
|
||||
use snafu::{ensure, IntoError, OptionExt, ResultExt};
|
||||
use store_api::storage::{RegionId, TableId};
|
||||
@@ -809,6 +810,25 @@ impl common_meta::node_manager::Flownode for FlowDualEngine {
|
||||
..Default::default()
|
||||
})
|
||||
}
|
||||
Some(flow_request::Body::Adjust(AdjustFlow { flow_id, options })) => {
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
struct Options {
|
||||
min_run_interval_secs: u64,
|
||||
max_filter_num_per_query: usize,
|
||||
}
|
||||
let options: Options = serde_json::from_str(&options).with_context(|_| {
|
||||
common_meta::error::DeserializeFromJsonSnafu { input: options }
|
||||
})?;
|
||||
self.batching_engine
|
||||
.adjust_flow(
|
||||
flow_id.unwrap().id as u64,
|
||||
options.min_run_interval_secs,
|
||||
options.max_filter_num_per_query,
|
||||
)
|
||||
.await
|
||||
.map_err(to_meta_err(snafu::location!()))?;
|
||||
Ok(Default::default())
|
||||
}
|
||||
other => common_meta::error::InvalidFlowRequestBodySnafu { body: other }.fail(),
|
||||
}
|
||||
}
|
||||
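
For reference, a sketch of the JSON payload the `Adjust` body above expects (field names are taken from the local `Options` struct; the concrete values are illustrative):

// Build the `options` string carried by an AdjustFlow request.
let options = serde_json::json!({
    "min_run_interval_secs": 30,
    "max_filter_num_per_query": 20
})
.to_string();
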
@@ -841,93 +861,6 @@ fn to_meta_err(
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl common_meta::node_manager::Flownode for StreamingEngine {
|
||||
async fn handle(&self, request: FlowRequest) -> MetaResult<FlowResponse> {
|
||||
let query_ctx = request
|
||||
.header
|
||||
.and_then(|h| h.query_context)
|
||||
.map(|ctx| ctx.into());
|
||||
match request.body {
|
||||
Some(flow_request::Body::Create(CreateRequest {
|
||||
flow_id: Some(task_id),
|
||||
source_table_ids,
|
||||
sink_table_name: Some(sink_table_name),
|
||||
create_if_not_exists,
|
||||
expire_after,
|
||||
comment,
|
||||
sql,
|
||||
flow_options,
|
||||
or_replace,
|
||||
})) => {
|
||||
let source_table_ids = source_table_ids.into_iter().map(|id| id.id).collect_vec();
|
||||
let sink_table_name = [
|
||||
sink_table_name.catalog_name,
|
||||
sink_table_name.schema_name,
|
||||
sink_table_name.table_name,
|
||||
];
|
||||
let expire_after = expire_after.map(|e| e.value);
|
||||
let args = CreateFlowArgs {
|
||||
flow_id: task_id.id as u64,
|
||||
sink_table_name,
|
||||
source_table_ids,
|
||||
create_if_not_exists,
|
||||
or_replace,
|
||||
expire_after,
|
||||
comment: Some(comment),
|
||||
sql: sql.clone(),
|
||||
flow_options,
|
||||
query_ctx,
|
||||
};
|
||||
let ret = self
|
||||
.create_flow(args)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.with_context(|_| CreateFlowSnafu { sql: sql.clone() })
|
||||
.map_err(to_meta_err(snafu::location!()))?;
|
||||
METRIC_FLOW_TASK_COUNT.inc();
|
||||
Ok(FlowResponse {
|
||||
affected_flows: ret
|
||||
.map(|id| greptime_proto::v1::FlowId { id: id as u32 })
|
||||
.into_iter()
|
||||
.collect_vec(),
|
||||
..Default::default()
|
||||
})
|
||||
}
|
||||
Some(flow_request::Body::Drop(DropRequest {
|
||||
flow_id: Some(flow_id),
|
||||
})) => {
|
||||
self.remove_flow(flow_id.id as u64)
|
||||
.await
|
||||
.map_err(to_meta_err(snafu::location!()))?;
|
||||
METRIC_FLOW_TASK_COUNT.dec();
|
||||
Ok(Default::default())
|
||||
}
|
||||
Some(flow_request::Body::Flush(FlushFlow {
|
||||
flow_id: Some(flow_id),
|
||||
})) => {
|
||||
let row = self
|
||||
.flush_flow_inner(flow_id.id as u64)
|
||||
.await
|
||||
.map_err(to_meta_err(snafu::location!()))?;
|
||||
Ok(FlowResponse {
|
||||
affected_flows: vec![flow_id],
|
||||
affected_rows: row as u64,
|
||||
..Default::default()
|
||||
})
|
||||
}
|
||||
other => common_meta::error::InvalidFlowRequestBodySnafu { body: other }.fail(),
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_inserts(&self, request: InsertRequests) -> MetaResult<FlowResponse> {
|
||||
self.handle_inserts_inner(request)
|
||||
.await
|
||||
.map(|_| Default::default())
|
||||
.map_err(to_meta_err(snafu::location!()))
|
||||
}
|
||||
}
|
||||
|
||||
impl FlowEngine for StreamingEngine {
|
||||
async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
|
||||
self.create_flow_inner(args).await
|
||||
|
||||
@@ -388,6 +388,20 @@ impl BatchingEngine {
|
||||
pub async fn flow_exist_inner(&self, flow_id: FlowId) -> bool {
|
||||
self.tasks.read().await.contains_key(&flow_id)
|
||||
}
|
||||
|
||||
pub async fn adjust_flow(
|
||||
&self,
|
||||
flow_id: FlowId,
|
||||
min_run_interval_secs: u64,
|
||||
max_filter_num_per_query: usize,
|
||||
) -> Result<(), Error> {
|
||||
let task = self.tasks.read().await.get(&flow_id).cloned();
|
||||
let task = task.with_context(|| FlowNotFoundSnafu { id: flow_id })?;
|
||||
debug!("Adjusting flow {flow_id} with min_run_interval_secs={} and max_filter_num_per_query={}", min_run_interval_secs, max_filter_num_per_query);
|
||||
task.adjust(min_run_interval_secs, max_filter_num_per_query);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl FlowEngine for BatchingEngine {
|
||||
|
||||
@@ -14,8 +14,9 @@
|
||||
|
||||
//! Frontend client used to run a flow as a batching task, i.e. a time-window-aware query triggered on every tick configured by the user
|
||||
|
||||
use std::sync::{Arc, Weak};
|
||||
use std::time::SystemTime;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::{Arc, Mutex, Weak};
|
||||
use std::time::{Duration, Instant, SystemTime};
|
||||
|
||||
use api::v1::greptime_request::Request;
|
||||
use api::v1::CreateTableExpr;
|
||||
@@ -26,20 +27,21 @@ use common_meta::cluster::{NodeInfo, NodeInfoKey, Role};
|
||||
use common_meta::peer::Peer;
|
||||
use common_meta::rpc::store::RangeRequest;
|
||||
use common_query::Output;
|
||||
use common_telemetry::warn;
|
||||
use common_telemetry::{debug, warn};
|
||||
use itertools::Itertools;
|
||||
use meta_client::client::MetaClient;
|
||||
use rand::rng;
|
||||
use rand::seq::SliceRandom;
|
||||
use servers::query_handler::grpc::GrpcQueryHandler;
|
||||
use session::context::{QueryContextBuilder, QueryContextRef};
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
|
||||
use crate::batching_mode::task::BatchingTask;
|
||||
use crate::batching_mode::{
|
||||
DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT, FRONTEND_ACTIVITY_TIMEOUT, GRPC_CONN_TIMEOUT,
|
||||
GRPC_MAX_RETRIES,
|
||||
};
|
||||
use crate::error::{ExternalSnafu, InvalidRequestSnafu, NoAvailableFrontendSnafu, UnexpectedSnafu};
|
||||
use crate::{Error, FlowAuthHeader};
|
||||
use crate::metrics::METRIC_FLOW_BATCHING_ENGINE_GUESS_FE_LOAD;
|
||||
use crate::{Error, FlowAuthHeader, FlowId};
|
||||
|
||||
/// Just like [`GrpcQueryHandler`] but use BoxedError
|
||||
///
|
||||
@@ -74,6 +76,105 @@ impl<
|
||||
|
||||
type HandlerMutable = Arc<std::sync::Mutex<Option<Weak<dyn GrpcQueryHandlerWithBoxedError>>>>;
|
||||
|
||||
/// Statistics about running query on this frontend from flownode
|
||||
#[derive(Debug, Default, Clone)]
|
||||
struct FrontendStat {
|
||||
/// For each flow id, the timestamp since which its query has been running on this frontend
|
||||
since: HashMap<FlowId, Instant>,
|
||||
/// Accumulated (query count, total duration) per flow id, used to compute the average query time
|
||||
past_query_avg: HashMap<FlowId, (usize, Duration)>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, Clone)]
|
||||
pub struct FrontendStats {
|
||||
/// The statistics for each frontend, keyed by frontend address
|
||||
stats: Arc<Mutex<HashMap<String, FrontendStat>>>,
|
||||
}
|
||||
|
||||
impl FrontendStats {
|
||||
pub fn observe(&self, frontend_addr: &str, flow_id: FlowId) -> FrontendStatsGuard {
|
||||
let mut stats = self.stats.lock().expect("Failed to lock frontend stats");
|
||||
let stat = stats.entry(frontend_addr.to_string()).or_default();
|
||||
stat.since.insert(flow_id, Instant::now());
|
||||
|
||||
FrontendStatsGuard {
|
||||
stats: self.stats.clone(),
|
||||
frontend_addr: frontend_addr.to_string(),
|
||||
cur: flow_id,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns frontend addresses sorted by load, from lightest to heaviest.
/// The load is the sum of the average query time of every flow currently running on the frontend, plus the elapsed time of those running queries.
|
||||
pub fn sort_by_load(&self) -> Vec<String> {
|
||||
let stats = self.stats.lock().expect("Failed to lock frontend stats");
|
||||
let fe_load_factor = stats
|
||||
.iter()
|
||||
.map(|(node_addr, stat)| {
|
||||
// total expected avg running time for all currently running queries
|
||||
let total_expect_avg_run_time = stat
|
||||
.since
|
||||
.keys()
|
||||
.map(|f| {
|
||||
let (count, total_duration) =
|
||||
stat.past_query_avg.get(f).unwrap_or(&(0, Duration::ZERO));
|
||||
if *count == 0 {
|
||||
0.0
|
||||
} else {
|
||||
total_duration.as_secs_f64() / *count as f64
|
||||
}
|
||||
})
|
||||
.sum::<f64>();
|
||||
let total_cur_running_time = stat
|
||||
.since
|
||||
.values()
|
||||
.map(|since| since.elapsed().as_secs_f64())
|
||||
.sum::<f64>();
|
||||
(
|
||||
node_addr.to_string(),
|
||||
total_expect_avg_run_time + total_cur_running_time,
|
||||
)
|
||||
})
|
||||
.sorted_by(|(_, load_a), (_, load_b)| {
|
||||
load_a
|
||||
.partial_cmp(load_b)
|
||||
.unwrap_or(std::cmp::Ordering::Equal)
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
debug!("Frontend load factor: {:?}", fe_load_factor);
|
||||
for (node_addr, load) in &fe_load_factor {
|
||||
METRIC_FLOW_BATCHING_ENGINE_GUESS_FE_LOAD
|
||||
.with_label_values(&[&node_addr.to_string()])
|
||||
.observe(*load);
|
||||
}
|
||||
fe_load_factor
|
||||
.into_iter()
|
||||
.map(|(addr, _)| addr)
|
||||
.collect::<Vec<_>>()
|
||||
}
|
||||
}
|
||||
|
||||
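
A small usage sketch of the statistics above (illustrative only; the address and flow id are made up, and the elapsed-time accounting happens in the `Drop` impl of the guard shown below):

let stats = FrontendStats::default();
{
    // Record that flow 42 starts a query against this frontend.
    let _guard = stats.observe("10.0.0.1:4001", 42);
    // ... run the query; when `_guard` drops, the elapsed time is folded into
    // the (count, total duration) entry behind the per-flow average.
}
// Lightest frontend first: past per-flow averages plus elapsed time of running queries.
let ordered: Vec<String> = stats.sort_by_load();
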
pub struct FrontendStatsGuard {
|
||||
stats: Arc<Mutex<HashMap<String, FrontendStat>>>,
|
||||
frontend_addr: String,
|
||||
cur: FlowId,
|
||||
}
|
||||
|
||||
impl Drop for FrontendStatsGuard {
|
||||
fn drop(&mut self) {
|
||||
let mut stats = self.stats.lock().expect("Failed to lock frontend stats");
|
||||
if let Some(stat) = stats.get_mut(&self.frontend_addr) {
|
||||
if let Some(since) = stat.since.remove(&self.cur) {
|
||||
let elapsed = since.elapsed();
|
||||
let (count, total_duration) = stat.past_query_avg.entry(self.cur).or_default();
|
||||
*count += 1;
|
||||
*total_duration += elapsed;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A simple frontend client able to execute sql using grpc protocol
|
||||
///
|
||||
/// This is for computation-heavy queries that need to offload computation to the frontend, lifting the load from the flownode
|
||||
@@ -83,6 +184,7 @@ pub enum FrontendClient {
|
||||
meta_client: Arc<MetaClient>,
|
||||
chnl_mgr: ChannelManager,
|
||||
auth: Option<FlowAuthHeader>,
|
||||
fe_stats: FrontendStats,
|
||||
},
|
||||
Standalone {
|
||||
/// for the sake of simplicity still use grpc even in standalone mode
|
||||
@@ -114,6 +216,7 @@ impl FrontendClient {
|
||||
ChannelManager::with_config(cfg)
|
||||
},
|
||||
auth,
|
||||
fe_stats: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -183,7 +286,7 @@ impl FrontendClient {
|
||||
|
||||
/// Get a frontend whose `last_activity_ts` is recent enough (less than 1 minute old)
/// and which is able to process queries
|
||||
async fn get_random_active_frontend(
|
||||
pub(crate) async fn get_random_active_frontend(
|
||||
&self,
|
||||
catalog: &str,
|
||||
schema: &str,
|
||||
@@ -192,6 +295,7 @@ impl FrontendClient {
|
||||
meta_client: _,
|
||||
chnl_mgr,
|
||||
auth,
|
||||
fe_stats,
|
||||
} = self
|
||||
else {
|
||||
return UnexpectedSnafu {
|
||||
@@ -208,8 +312,21 @@ impl FrontendClient {
|
||||
.duration_since(SystemTime::UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_millis() as i64;
|
||||
// shuffle the frontends to avoid always pick the same one
|
||||
frontends.shuffle(&mut rng());
|
||||
let node_addrs_by_load = fe_stats.sort_by_load();
|
||||
// Map each address to its 1-based rank in ascending load order, so the lightest node gets rank 1 and a node absent from the stats gets 0 below.
|
||||
let addr2load = node_addrs_by_load
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(|(i, id)| (id.clone(), i + 1))
|
||||
.collect::<HashMap<_, _>>();
|
||||
// sort frontends by load, from lightest to heaviest
|
||||
frontends.sort_by(|(_, a), (_, b)| {
|
||||
// If the node is not in the stats, treat it as zero load since it has never been queried.
|
||||
let load_a = addr2load.get(&a.peer.addr).unwrap_or(&0);
|
||||
let load_b = addr2load.get(&b.peer.addr).unwrap_or(&0);
|
||||
load_a.cmp(load_b)
|
||||
});
|
||||
debug!("Frontend nodes sorted by load: {:?}", frontends);
|
||||
|
||||
// found node with maximum last_activity_ts
|
||||
for (_, node_info) in frontends
|
||||
@@ -257,6 +374,7 @@ impl FrontendClient {
|
||||
create: CreateTableExpr,
|
||||
catalog: &str,
|
||||
schema: &str,
|
||||
task: Option<&BatchingTask>,
|
||||
) -> Result<u32, Error> {
|
||||
self.handle(
|
||||
Request::Ddl(api::v1::DdlRequest {
|
||||
@@ -264,7 +382,8 @@ impl FrontendClient {
|
||||
}),
|
||||
catalog,
|
||||
schema,
|
||||
&mut None,
|
||||
None,
|
||||
task,
|
||||
)
|
||||
.await
|
||||
}
|
||||
@@ -275,15 +394,31 @@ impl FrontendClient {
|
||||
req: api::v1::greptime_request::Request,
|
||||
catalog: &str,
|
||||
schema: &str,
|
||||
peer_desc: &mut Option<PeerDesc>,
|
||||
use_peer: Option<Peer>,
|
||||
task: Option<&BatchingTask>,
|
||||
) -> Result<u32, Error> {
|
||||
match self {
|
||||
FrontendClient::Distributed { .. } => {
|
||||
let db = self.get_random_active_frontend(catalog, schema).await?;
|
||||
FrontendClient::Distributed {
|
||||
fe_stats, chnl_mgr, ..
|
||||
} => {
|
||||
let db = if let Some(peer) = use_peer {
|
||||
DatabaseWithPeer::new(
|
||||
Database::new(
|
||||
catalog,
|
||||
schema,
|
||||
Client::with_manager_and_urls(
|
||||
chnl_mgr.clone(),
|
||||
vec![peer.addr.clone()],
|
||||
),
|
||||
),
|
||||
peer,
|
||||
)
|
||||
} else {
|
||||
self.get_random_active_frontend(catalog, schema).await?
|
||||
};
|
||||
|
||||
*peer_desc = Some(PeerDesc::Dist {
|
||||
peer: db.peer.clone(),
|
||||
});
|
||||
let flow_id = task.map(|t| t.config.flow_id).unwrap_or_default();
|
||||
let _guard = fe_stats.observe(&db.peer.addr, flow_id);
|
||||
|
||||
db.database
|
||||
.handle_with_retry(req.clone(), GRPC_MAX_RETRIES)
|
||||
|
||||
@@ -32,6 +32,7 @@ use crate::batching_mode::MIN_REFRESH_DURATION;
|
||||
use crate::error::{DatatypesSnafu, InternalSnafu, TimeSnafu, UnexpectedSnafu};
|
||||
use crate::metrics::{
|
||||
METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME_RANGE, METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT,
|
||||
METRIC_FLOW_BATCHING_ENGINE_STALLED_QUERY_WINDOW_CNT,
|
||||
};
|
||||
use crate::{Error, FlowId};
|
||||
|
||||
@@ -52,6 +53,13 @@ pub struct TaskState {
|
||||
pub(crate) shutdown_rx: oneshot::Receiver<()>,
|
||||
/// Task handle
|
||||
pub(crate) task_handle: Option<tokio::task::JoinHandle<()>>,
|
||||
/// Slow Query metrics update task handle
|
||||
pub(crate) slow_query_metric_task: Option<tokio::task::JoinHandle<()>>,
|
||||
|
||||
/// min run interval in seconds
|
||||
pub(crate) min_run_interval: Option<u64>,
|
||||
/// max filter number per query
|
||||
pub(crate) max_filter_num: Option<usize>,
|
||||
}
|
||||
impl TaskState {
|
||||
pub fn new(query_ctx: QueryContextRef, shutdown_rx: oneshot::Receiver<()>) -> Self {
|
||||
@@ -63,6 +71,9 @@ impl TaskState {
|
||||
exec_state: ExecState::Idle,
|
||||
shutdown_rx,
|
||||
task_handle: None,
|
||||
slow_query_metric_task: None,
|
||||
min_run_interval: None,
|
||||
max_filter_num: None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -87,20 +98,17 @@ impl TaskState {
|
||||
pub fn get_next_start_query_time(
|
||||
&self,
|
||||
flow_id: FlowId,
|
||||
time_window_size: &Option<Duration>,
|
||||
_time_window_size: &Option<Duration>,
|
||||
max_timeout: Option<Duration>,
|
||||
) -> Instant {
|
||||
let last_duration = max_timeout
|
||||
let next_duration = max_timeout
|
||||
.unwrap_or(self.last_query_duration)
|
||||
.min(self.last_query_duration)
|
||||
.max(MIN_REFRESH_DURATION);
|
||||
|
||||
let next_duration = time_window_size
|
||||
.map(|t| {
|
||||
let half = t / 2;
|
||||
half.max(last_duration)
|
||||
})
|
||||
.unwrap_or(last_duration);
|
||||
.max(
|
||||
self.min_run_interval
|
||||
.map(Duration::from_secs)
|
||||
.unwrap_or(MIN_REFRESH_DURATION),
|
||||
);
|
||||
|
||||
// If there are dirty time windows, execute immediately to clean them.
|
||||
if self.dirty_time_windows.windows.is_empty() {
|
||||
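
To illustrate the new `next_duration` computation above with made-up numbers (the value of `MIN_REFRESH_DURATION` is assumed to be 5s purely for illustration): with `last_query_duration` = 2s, `max_timeout` = Some(10s) and `min_run_interval` = Some(30), the chain evaluates to min(10s, 2s) = 2s, then max(2s, 5s) = 5s, then max(5s, 30s) = 30s, so the next run is scheduled 30 seconds out unless dirty time windows trigger an immediate run, as noted just above.
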
@@ -243,9 +251,19 @@ impl DirtyTimeWindows {
|
||||
};
|
||||
|
||||
METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT
|
||||
.with_label_values(&[flow_id.to_string().as_str()])
|
||||
.with_label_values(&[
|
||||
flow_id.to_string().as_str(),
|
||||
format!("{}", window_size).as_str(),
|
||||
])
|
||||
.observe(first_nth.len() as f64);
|
||||
|
||||
METRIC_FLOW_BATCHING_ENGINE_STALLED_QUERY_WINDOW_CNT
|
||||
.with_label_values(&[
|
||||
flow_id.to_string().as_str(),
|
||||
format!("{}", window_size).as_str(),
|
||||
])
|
||||
.observe(self.windows.len() as f64);
|
||||
|
||||
let full_time_range = first_nth
|
||||
.iter()
|
||||
.fold(chrono::Duration::zero(), |acc, (start, end)| {
|
||||
@@ -257,7 +275,10 @@ impl DirtyTimeWindows {
|
||||
})
|
||||
.num_seconds() as f64;
|
||||
METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME_RANGE
|
||||
.with_label_values(&[flow_id.to_string().as_str()])
|
||||
.with_label_values(&[
|
||||
flow_id.to_string().as_str(),
|
||||
format!("{}", window_size).as_str(),
|
||||
])
|
||||
.observe(full_time_range);
|
||||
|
||||
let mut expr_lst = vec![];
|
||||
|
||||
@@ -61,7 +61,8 @@ use crate::error::{
|
||||
SubstraitEncodeLogicalPlanSnafu, UnexpectedSnafu,
|
||||
};
|
||||
use crate::metrics::{
|
||||
METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME, METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY,
|
||||
METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME, METRIC_FLOW_BATCHING_ENGINE_REAL_TIME_SLOW_QUERY_CNT,
|
||||
METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY,
|
||||
};
|
||||
use crate::{Error, FlowId};
|
||||
|
||||
@@ -81,6 +82,14 @@ pub struct TaskConfig {
|
||||
query_type: QueryType,
|
||||
}
|
||||
|
||||
impl TaskConfig {
|
||||
pub fn time_window_size(&self) -> Option<Duration> {
|
||||
self.time_window_expr
|
||||
.as_ref()
|
||||
.and_then(|expr| *expr.time_window_size())
|
||||
}
|
||||
}
|
||||
|
||||
fn determine_query_type(query: &str, query_ctx: &QueryContextRef) -> Result<QueryType, Error> {
|
||||
let stmts =
|
||||
ParserContext::create_with_dialect(query, query_ctx.sql_dialect(), ParseOptions::default())
|
||||
@@ -144,6 +153,12 @@ impl BatchingTask {
|
||||
})
|
||||
}
|
||||
|
||||
pub fn adjust(&self, min_run_interval_secs: u64, max_filter_num_per_query: usize) {
|
||||
let mut state = self.state.write().unwrap();
|
||||
state.min_run_interval = Some(min_run_interval_secs);
|
||||
state.max_filter_num = Some(max_filter_num_per_query);
|
||||
}
|
||||
|
||||
/// Mark the time window range (now - expire_after, now) as dirty (or (0, now) if expire_after is not set)
|
||||
///
|
||||
/// useful for flush_flow to flush dirty time windows range
|
||||
@@ -280,7 +295,7 @@ impl BatchingTask {
|
||||
let catalog = &self.config.sink_table_name[0];
|
||||
let schema = &self.config.sink_table_name[1];
|
||||
frontend_client
|
||||
.create(expr.clone(), catalog, schema)
|
||||
.create(expr.clone(), catalog, schema, Some(self))
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
@@ -328,11 +343,53 @@ impl BatchingTask {
|
||||
})?;
|
||||
|
||||
let plan = expanded_plan;
|
||||
let mut peer_desc = None;
|
||||
|
||||
let db = frontend_client
|
||||
.get_random_active_frontend(catalog, schema)
|
||||
.await?;
|
||||
let peer_desc = db.peer.clone();
|
||||
|
||||
let (tx, mut rx) = oneshot::channel();
|
||||
let peer_inner = peer_desc.clone();
|
||||
let window_size_pretty = format!(
|
||||
"{}s",
|
||||
self.config.time_window_size().unwrap_or_default().as_secs()
|
||||
);
|
||||
let inner_window_size_pretty = window_size_pretty.clone();
|
||||
let flow_id = self.config.flow_id;
|
||||
let slow_query_metric_task = tokio::task::spawn(async move {
|
||||
tokio::time::sleep(SLOW_QUERY_THRESHOLD).await;
|
||||
METRIC_FLOW_BATCHING_ENGINE_REAL_TIME_SLOW_QUERY_CNT
|
||||
.with_label_values(&[
|
||||
flow_id.to_string().as_str(),
|
||||
&peer_inner.to_string(),
|
||||
inner_window_size_pretty.as_str(),
|
||||
])
|
||||
.add(1.0);
|
||||
while rx.try_recv() == Err(TryRecvError::Empty) {
|
||||
// sleep for a while before next update
|
||||
tokio::time::sleep(MIN_REFRESH_DURATION).await;
|
||||
}
|
||||
METRIC_FLOW_BATCHING_ENGINE_REAL_TIME_SLOW_QUERY_CNT
|
||||
.with_label_values(&[
|
||||
flow_id.to_string().as_str(),
|
||||
&peer_inner.to_string(),
|
||||
inner_window_size_pretty.as_str(),
|
||||
])
|
||||
.sub(1.0);
|
||||
});
|
||||
self.state.write().unwrap().slow_query_metric_task = Some(slow_query_metric_task);
|
||||
|
||||
let res = {
|
||||
let _timer = METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME
|
||||
.with_label_values(&[flow_id.to_string().as_str()])
|
||||
.with_label_values(&[
|
||||
flow_id.to_string().as_str(),
|
||||
format!(
|
||||
"{}s",
|
||||
self.config.time_window_size().unwrap_or_default().as_secs()
|
||||
)
|
||||
.as_str(),
|
||||
])
|
||||
.start_timer();
|
||||
|
||||
// Hack: special-case handling for the insert logical plan
|
||||
@@ -361,10 +418,12 @@ impl BatchingTask {
|
||||
};
|
||||
|
||||
frontend_client
|
||||
.handle(req, catalog, schema, &mut peer_desc)
|
||||
.handle(req, catalog, schema, Some(db.peer), Some(self))
|
||||
.await
|
||||
};
|
||||
|
||||
// Signal the slow query metric task to stop
|
||||
let _ = tx.send(());
|
||||
let elapsed = instant.elapsed();
|
||||
if let Ok(affected_rows) = &res {
|
||||
debug!(
|
||||
@@ -387,7 +446,12 @@ impl BatchingTask {
|
||||
METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY
|
||||
.with_label_values(&[
|
||||
flow_id.to_string().as_str(),
|
||||
&peer_desc.unwrap_or_default().to_string(),
|
||||
&peer_desc.to_string(),
|
||||
format!(
|
||||
"{}s",
|
||||
self.config.time_window_size().unwrap_or_default().as_secs()
|
||||
)
|
||||
.as_str(),
|
||||
])
|
||||
.observe(elapsed.as_secs_f64());
|
||||
}
|
||||
@@ -580,19 +644,20 @@ impl BatchingTask {
|
||||
),
|
||||
})?;
|
||||
|
||||
let expr = self
|
||||
.state
|
||||
.write()
|
||||
.unwrap()
|
||||
.dirty_time_windows
|
||||
.gen_filter_exprs(
|
||||
let expr = {
|
||||
let mut state = self.state.write().unwrap();
|
||||
let max_window_cnt = state
|
||||
.max_filter_num
|
||||
.unwrap_or(DirtyTimeWindows::MAX_FILTER_NUM);
|
||||
state.dirty_time_windows.gen_filter_exprs(
|
||||
&col_name,
|
||||
Some(l),
|
||||
window_size,
|
||||
DirtyTimeWindows::MAX_FILTER_NUM,
|
||||
max_window_cnt,
|
||||
self.config.flow_id,
|
||||
Some(self),
|
||||
)?;
|
||||
)?
|
||||
};
|
||||
|
||||
debug!(
|
||||
"Flow id={:?}, Generated filter expr: {:?}",
|
||||
|
||||
@@ -31,22 +31,37 @@ lazy_static! {
|
||||
pub static ref METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME: HistogramVec = register_histogram_vec!(
|
||||
"greptime_flow_batching_engine_query_time_secs",
|
||||
"flow batching engine query time(seconds)",
|
||||
&["flow_id"],
|
||||
&["flow_id", "time_window_granularity"],
|
||||
vec![0.0, 5., 10., 20., 40., 80., 160., 320., 640.,]
|
||||
)
|
||||
.unwrap();
|
||||
pub static ref METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY: HistogramVec = register_histogram_vec!(
|
||||
"greptime_flow_batching_engine_slow_query_secs",
|
||||
"flow batching engine slow query(seconds)",
|
||||
&["flow_id", "peer"],
|
||||
"flow batching engine slow query(seconds), updated after query finished",
|
||||
&["flow_id", "peer", "time_window_granularity"],
|
||||
vec![60., 2. * 60., 3. * 60., 5. * 60., 10. * 60.]
|
||||
)
|
||||
.unwrap();
|
||||
pub static ref METRIC_FLOW_BATCHING_ENGINE_REAL_TIME_SLOW_QUERY_CNT: GaugeVec =
|
||||
register_gauge_vec!(
|
||||
"greptime_flow_batching_engine_real_time_slow_query_number",
|
||||
"flow batching engine real time slow query number, updated in real time",
|
||||
&["flow_id", "peer", "time_window_granularity"],
|
||||
)
|
||||
.unwrap();
|
||||
pub static ref METRIC_FLOW_BATCHING_ENGINE_STALLED_QUERY_WINDOW_CNT: HistogramVec =
|
||||
register_histogram_vec!(
|
||||
"greptime_flow_batching_engine_stalled_query_window_cnt",
|
||||
"flow batching engine stalled query time window count",
|
||||
&["flow_id", "time_window_granularity"],
|
||||
vec![0.0, 5., 10., 20., 40.]
|
||||
)
|
||||
.unwrap();
|
||||
pub static ref METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT: HistogramVec =
|
||||
register_histogram_vec!(
|
||||
"greptime_flow_batching_engine_query_window_cnt",
|
||||
"flow batching engine query time window count",
|
||||
&["flow_id"],
|
||||
&["flow_id", "time_window_granularity"],
|
||||
vec![0.0, 5., 10., 20., 40.]
|
||||
)
|
||||
.unwrap();
|
||||
@@ -54,7 +69,15 @@ lazy_static! {
|
||||
register_histogram_vec!(
|
||||
"greptime_flow_batching_engine_query_time_range_secs",
|
||||
"flow batching engine query time range(seconds)",
|
||||
&["flow_id"],
|
||||
&["flow_id", "time_window_granularity"],
|
||||
vec![60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.]
|
||||
)
|
||||
.unwrap();
|
||||
pub static ref METRIC_FLOW_BATCHING_ENGINE_GUESS_FE_LOAD: HistogramVec =
|
||||
register_histogram_vec!(
|
||||
"greptime_flow_batching_engine_guess_fe_load",
|
||||
"flow batching engine guessed frontend load",
|
||||
&["fe_addr"],
|
||||
vec![60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.]
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@@ -22,7 +22,7 @@ use std::time::Duration;
|
||||
use clap::ValueEnum;
|
||||
use common_base::readable_size::ReadableSize;
|
||||
use common_base::Plugins;
|
||||
use common_config::Configurable;
|
||||
use common_config::{Configurable, DEFAULT_DATA_HOME};
|
||||
use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
|
||||
use common_meta::cache_invalidator::CacheInvalidatorRef;
|
||||
use common_meta::ddl::ProcedureExecutorRef;
|
||||
@@ -73,7 +73,7 @@ use crate::state::{become_follower, become_leader, StateRef};
|
||||
|
||||
pub const TABLE_ID_SEQ: &str = "table_id";
|
||||
pub const FLOW_ID_SEQ: &str = "flow_id";
|
||||
pub const METASRV_HOME: &str = "./greptimedb_data/metasrv";
|
||||
pub const METASRV_DATA_DIR: &str = "metasrv";
|
||||
|
||||
// The datastores that implements metadata kvbackend.
|
||||
#[derive(Clone, Debug, PartialEq, Serialize, Default, Deserialize, ValueEnum)]
|
||||
@@ -217,10 +217,7 @@ impl Default for MetasrvOptions {
|
||||
enable_region_failover: false,
|
||||
allow_region_failover_on_local_wal: false,
|
||||
http: HttpOptions::default(),
|
||||
logging: LoggingOptions {
|
||||
dir: format!("{METASRV_HOME}/logs"),
|
||||
..Default::default()
|
||||
},
|
||||
logging: LoggingOptions::default(),
|
||||
procedure: ProcedureConfig {
|
||||
max_retry_times: 12,
|
||||
retry_delay: Duration::from_millis(500),
|
||||
@@ -232,7 +229,7 @@ impl Default for MetasrvOptions {
|
||||
failure_detector: PhiAccrualFailureDetectorOptions::default(),
|
||||
datanode: DatanodeClientOptions::default(),
|
||||
enable_telemetry: true,
|
||||
data_home: METASRV_HOME.to_string(),
|
||||
data_home: DEFAULT_DATA_HOME.to_string(),
|
||||
wal: MetasrvWalConfig::default(),
|
||||
export_metrics: ExportMetricsOption::default(),
|
||||
store_key_prefix: String::new(),
|
||||
|
||||
@@ -12,6 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::path::Path;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::sync::{Arc, Mutex, RwLock};
|
||||
|
||||
@@ -55,7 +56,7 @@ use crate::handler::{HeartbeatHandlerGroupBuilder, HeartbeatMailbox, Pushers};
|
||||
use crate::lease::MetaPeerLookupService;
|
||||
use crate::metasrv::{
|
||||
ElectionRef, Metasrv, MetasrvInfo, MetasrvOptions, RegionStatAwareSelectorRef, SelectTarget,
|
||||
SelectorContext, SelectorRef, FLOW_ID_SEQ, TABLE_ID_SEQ,
|
||||
SelectorContext, SelectorRef, FLOW_ID_SEQ, METASRV_DATA_DIR, TABLE_ID_SEQ,
|
||||
};
|
||||
use crate::procedure::region_migration::manager::RegionMigrationManager;
|
||||
use crate::procedure::region_migration::DefaultContextFactory;
|
||||
@@ -436,7 +437,10 @@ impl MetasrvBuilder {
|
||||
};
|
||||
|
||||
let enable_telemetry = options.enable_telemetry;
|
||||
let metasrv_home = options.data_home.to_string();
|
||||
let metasrv_home = Path::new(&options.data_home)
|
||||
.join(METASRV_DATA_DIR)
|
||||
.to_string_lossy()
|
||||
.to_string();
|
||||
|
||||
Ok(Metasrv {
|
||||
state,
|
||||
|
||||
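
Assuming `common_config::DEFAULT_DATA_HOME` keeps the previous `./greptimedb_data` default, the metasrv working directory above now resolves to `./greptimedb_data/metasrv` (data_home joined with METASRV_DATA_DIR), which matches the old hard-coded METASRV_HOME value.
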
@@ -40,7 +40,7 @@ pub(crate) fn infer_time_bucket<'a>(files: impl Iterator<Item = &'a FileHandle>)
|
||||
.unwrap_or_else(|| TIME_BUCKETS.max()) // safety: TIME_BUCKETS cannot be empty.
|
||||
}
|
||||
|
||||
pub(crate) struct TimeBuckets([i64; 7]);
|
||||
pub(crate) struct TimeBuckets([i64; 5]);
|
||||
|
||||
impl TimeBuckets {
|
||||
/// Fits a given time span into a time bucket by finding the minimum bucket that can cover the span.
|
||||
@@ -71,13 +71,11 @@ impl TimeBuckets {
|
||||
|
||||
/// A set of predefined time buckets.
|
||||
pub(crate) const TIME_BUCKETS: TimeBuckets = TimeBuckets([
|
||||
60 * 60, // one hour
|
||||
2 * 60 * 60, // two hours
|
||||
12 * 60 * 60, // twelve hours
|
||||
24 * 60 * 60, // one day
|
||||
7 * 24 * 60 * 60, // one week
|
||||
365 * 24 * 60 * 60, // one year
|
||||
10 * 365 * 24 * 60 * 60, // ten years
|
||||
60 * 60, // one hour
|
||||
2 * 60 * 60, // two hours
|
||||
12 * 60 * 60, // twelve hours
|
||||
24 * 60 * 60, // one day
|
||||
7 * 24 * 60 * 60, // one week
|
||||
]);
|
||||
|
||||
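
Under the reduced bucket set above, any span longer than one week now falls back to the largest (one-week) bucket; for example, a 3-day span still fits the one-week bucket, and the `fit_time_bucket(i64::MAX)` assertion below changes from index 6 (ten years) to index 4 (one week) accordingly.
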
#[cfg(test)]
|
||||
@@ -107,7 +105,7 @@ mod tests {
|
||||
TIME_BUCKETS.get(3),
|
||||
TIME_BUCKETS.fit_time_bucket(TIME_BUCKETS.get(3) - 1)
|
||||
);
|
||||
assert_eq!(TIME_BUCKETS.get(6), TIME_BUCKETS.fit_time_bucket(i64::MAX));
|
||||
assert_eq!(TIME_BUCKETS.get(4), TIME_BUCKETS.fit_time_bucket(i64::MAX));
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -61,6 +61,30 @@ async fn put_and_flush(
|
||||
assert_eq!(0, result.affected_rows);
|
||||
}
|
||||
|
||||
async fn flush(engine: &MitoEngine, region_id: RegionId) {
|
||||
let result = engine
|
||||
.handle_request(
|
||||
region_id,
|
||||
RegionRequest::Flush(RegionFlushRequest {
|
||||
row_group_size: None,
|
||||
}),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(0, result.affected_rows);
|
||||
}
|
||||
|
||||
async fn compact(engine: &MitoEngine, region_id: RegionId) {
|
||||
let result = engine
|
||||
.handle_request(
|
||||
region_id,
|
||||
RegionRequest::Compact(RegionCompactRequest::default()),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(result.affected_rows, 0);
|
||||
}
|
||||
|
||||
async fn delete_and_flush(
|
||||
engine: &MitoEngine,
|
||||
region_id: RegionId,
|
||||
@@ -147,14 +171,7 @@ async fn test_compaction_region() {
|
||||
delete_and_flush(&engine, region_id, &column_schemas, 15..30).await;
|
||||
put_and_flush(&engine, region_id, &column_schemas, 15..25).await;
|
||||
|
||||
let result = engine
|
||||
.handle_request(
|
||||
region_id,
|
||||
RegionRequest::Compact(RegionCompactRequest::default()),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(result.affected_rows, 0);
|
||||
compact(&engine, region_id).await;
|
||||
|
||||
let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
|
||||
// Input:
|
||||
@@ -179,6 +196,136 @@ async fn test_compaction_region() {
|
||||
assert_eq!((0..25).map(|v| v * 1000).collect::<Vec<_>>(), vec);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_infer_compaction_time_window() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let mut env = TestEnv::new();
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
env.get_schema_metadata_manager()
|
||||
.register_region_table_info(
|
||||
region_id.table_id(),
|
||||
"test_table",
|
||||
"test_catalog",
|
||||
"test_schema",
|
||||
None,
|
||||
env.get_kv_backend(),
|
||||
)
|
||||
.await;
|
||||
|
||||
let request = CreateRequestBuilder::new()
|
||||
.insert_option("compaction.type", "twcs")
|
||||
.build();
|
||||
|
||||
let column_schemas = request
|
||||
.column_metadatas
|
||||
.iter()
|
||||
.map(column_metadata_to_column_schema)
|
||||
.collect::<Vec<_>>();
|
||||
engine
|
||||
.handle_request(region_id, RegionRequest::Create(request))
|
||||
.await
|
||||
.unwrap();
|
||||
// time window should be absent
|
||||
assert!(engine
|
||||
.get_region(region_id)
|
||||
.unwrap()
|
||||
.version_control
|
||||
.current()
|
||||
.version
|
||||
.compaction_time_window
|
||||
.is_none());
|
||||
|
||||
put_and_flush(&engine, region_id, &column_schemas, 1..2).await;
|
||||
put_and_flush(&engine, region_id, &column_schemas, 2..3).await;
|
||||
put_and_flush(&engine, region_id, &column_schemas, 3..4).await;
|
||||
put_and_flush(&engine, region_id, &column_schemas, 4..5).await;
|
||||
|
||||
compact(&engine, region_id).await;
|
||||
|
||||
let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
|
||||
assert_eq!(
|
||||
1,
|
||||
scanner.num_files(),
|
||||
"unexpected files: {:?}",
|
||||
scanner.file_ids()
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
Duration::from_secs(3600),
|
||||
engine
|
||||
.get_region(region_id)
|
||||
.unwrap()
|
||||
.version_control
|
||||
.current()
|
||||
.version
|
||||
.compaction_time_window
|
||||
.unwrap()
|
||||
);
|
||||
|
||||
// write two rows to trigger another flush.
|
||||
// note: these two rows still use the original part_duration (1 day by default), so they are written
|
||||
// to the same time partition and flushed to one file.
|
||||
put_rows(
|
||||
&engine,
|
||||
region_id,
|
||||
Rows {
|
||||
schema: column_schemas.clone(),
|
||||
rows: build_rows_for_key("a", 3601, 3602, 0),
|
||||
},
|
||||
)
|
||||
.await;
|
||||
put_rows(
|
||||
&engine,
|
||||
region_id,
|
||||
Rows {
|
||||
schema: column_schemas.clone(),
|
||||
rows: build_rows_for_key("a", 7201, 7202, 0),
|
||||
},
|
||||
)
|
||||
.await;
|
||||
// this flush should update part_duration in TimePartitions.
|
||||
flush(&engine, region_id).await;
|
||||
compact(&engine, region_id).await;
|
||||
let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
|
||||
assert_eq!(
|
||||
2,
|
||||
scanner.num_files(),
|
||||
"unexpected files: {:?}",
|
||||
scanner.file_ids()
|
||||
);
|
||||
|
||||
// These rows should use the new part_duration in TimePartitions and get written to two different
|
||||
// time partitions, so we end up with 4 SSTs.
|
||||
put_rows(
|
||||
&engine,
|
||||
region_id,
|
||||
Rows {
|
||||
schema: column_schemas.clone(),
|
||||
rows: build_rows_for_key("a", 3601, 3602, 0),
|
||||
},
|
||||
)
|
||||
.await;
|
||||
put_rows(
|
||||
&engine,
|
||||
region_id,
|
||||
Rows {
|
||||
schema: column_schemas.clone(),
|
||||
rows: build_rows_for_key("a", 7201, 7202, 0),
|
||||
},
|
||||
)
|
||||
.await;
|
||||
flush(&engine, region_id).await;
|
||||
let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
|
||||
assert_eq!(
|
||||
4,
|
||||
scanner.num_files(),
|
||||
"unexpected files: {:?}",
|
||||
scanner.file_ids()
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_compaction_overlapping_files() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
@@ -216,14 +363,7 @@ async fn test_compaction_overlapping_files() {
|
||||
put_and_flush(&engine, region_id, &column_schemas, 20..30).await;
|
||||
delete_and_flush(&engine, region_id, &column_schemas, 30..40).await;
|
||||
|
||||
let result = engine
|
||||
.handle_request(
|
||||
region_id,
|
||||
RegionRequest::Compact(RegionCompactRequest::default()),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(result.affected_rows, 0);
|
||||
compact(&engine, region_id).await;
|
||||
|
||||
let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
|
||||
assert_eq!(
|
||||
@@ -282,15 +422,7 @@ async fn test_compaction_region_with_overlapping() {
|
||||
put_and_flush(&engine, region_id, &column_schemas, 3600..10800).await; // window 10800
|
||||
delete_and_flush(&engine, region_id, &column_schemas, 0..3600).await; // window 3600
|
||||
|
||||
let result = engine
|
||||
.handle_request(
|
||||
region_id,
|
||||
RegionRequest::Compact(RegionCompactRequest::default()),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(result.affected_rows, 0);
|
||||
|
||||
compact(&engine, region_id).await;
|
||||
let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
|
||||
let stream = scanner.scan().await.unwrap();
|
||||
let vec = collect_stream_ts(stream).await;
|
||||
@@ -336,15 +468,7 @@ async fn test_compaction_region_with_overlapping_delete_all() {
|
||||
put_and_flush(&engine, region_id, &column_schemas, 0..3600).await; // window 3600
|
||||
delete_and_flush(&engine, region_id, &column_schemas, 0..10800).await; // window 10800
|
||||
|
||||
let result = engine
|
||||
.handle_request(
|
||||
region_id,
|
||||
RegionRequest::Compact(RegionCompactRequest::default()),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(result.affected_rows, 0);
|
||||
|
||||
compact(&engine, region_id).await;
|
||||
let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
|
||||
assert_eq!(
|
||||
2,
|
||||
@@ -477,15 +601,7 @@ async fn test_compaction_update_time_window() {
|
||||
put_and_flush(&engine, region_id, &column_schemas, 1800..2700).await; // window 3600
|
||||
put_and_flush(&engine, region_id, &column_schemas, 2700..3600).await; // window 3600
|
||||
|
||||
let result = engine
|
||||
.handle_request(
|
||||
region_id,
|
||||
RegionRequest::Compact(RegionCompactRequest::default()),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(result.affected_rows, 0);
|
||||
|
||||
compact(&engine, region_id).await;
|
||||
assert_eq!(
|
||||
engine
|
||||
.get_region(region_id)
|
||||
@@ -572,13 +688,7 @@ async fn test_change_region_compaction_window() {
|
||||
put_and_flush(&engine, region_id, &column_schemas, 1200..1800).await; // window 3600
|
||||
put_and_flush(&engine, region_id, &column_schemas, 1800..2400).await; // window 3600
|
||||
|
||||
engine
|
||||
.handle_request(
|
||||
region_id,
|
||||
RegionRequest::Compact(RegionCompactRequest::default()),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
compact(&engine, region_id).await;
|
||||
|
||||
// Put window 7200
|
||||
put_and_flush(&engine, region_id, &column_schemas, 4000..5000).await;
|
||||
@@ -623,13 +733,7 @@ async fn test_change_region_compaction_window() {
|
||||
|
||||
// Compact again. It should compact windows 3600 and 7200
|
||||
// into 7200.
|
||||
engine
|
||||
.handle_request(
|
||||
region_id,
|
||||
RegionRequest::Compact(RegionCompactRequest::default()),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
compact(&engine, region_id).await;
|
||||
// Check compaction window.
|
||||
{
|
||||
let region = engine.get_region(region_id).unwrap();
|
||||
@@ -709,13 +813,7 @@ async fn test_open_overwrite_compaction_window() {
|
||||
put_and_flush(&engine, region_id, &column_schemas, 1200..1800).await; // window 3600
|
||||
put_and_flush(&engine, region_id, &column_schemas, 1800..2400).await; // window 3600
|
||||
|
||||
engine
|
||||
.handle_request(
|
||||
region_id,
|
||||
RegionRequest::Compact(RegionCompactRequest::default()),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
compact(&engine, region_id).await;
|
||||
|
||||
// Check compaction window.
|
||||
{
|
||||
|
||||
@@ -21,6 +21,7 @@
|
||||
#![feature(result_flattening)]
|
||||
#![feature(int_roundings)]
|
||||
#![feature(debug_closure_helpers)]
|
||||
#![feature(duration_constructors)]
|
||||
|
||||
#[cfg(any(test, feature = "test"))]
|
||||
#[cfg_attr(feature = "test", allow(unused))]
|
||||
|
||||
@@ -40,25 +40,22 @@ use crate::memtable::key_values::KeyValue;
|
||||
use crate::memtable::version::SmallMemtableVec;
|
||||
use crate::memtable::{KeyValues, MemtableBuilderRef, MemtableId, MemtableRef};
|
||||
|
||||
/// Initial time window if not specified.
|
||||
const INITIAL_TIME_WINDOW: Duration = Duration::from_days(1);
|
||||
|
||||
/// A partition holds rows with timestamps between `[min, max)`.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct TimePartition {
|
||||
/// Memtable of the partition.
|
||||
memtable: MemtableRef,
|
||||
/// Time range of the partition. `min` is inclusive and `max` is exclusive.
|
||||
/// `None` means there is no time range. The time
|
||||
/// range is `None` if and only if the [TimePartitions::part_duration] is `None`.
|
||||
time_range: Option<PartTimeRange>,
|
||||
time_range: PartTimeRange,
|
||||
}
|
||||
|
||||
impl TimePartition {
|
||||
/// Returns whether the `ts` belongs to the partition.
|
||||
fn contains_timestamp(&self, ts: Timestamp) -> bool {
|
||||
let Some(range) = self.time_range else {
|
||||
return true;
|
||||
};
|
||||
|
||||
range.contains_timestamp(ts)
|
||||
self.time_range.contains_timestamp(ts)
|
||||
}
|
||||
|
||||
/// Write rows to the part.
|
||||
@@ -72,14 +69,11 @@ impl TimePartition {
|
||||
}
|
||||
|
||||
/// Write a partial [BulkPart] according to [TimePartition::time_range].
|
||||
fn write_record_batch_partial(&self, part: &BulkPart) -> error::Result<()> {
|
||||
let Some(range) = self.time_range else {
|
||||
unreachable!("TimePartition must have explicit time range when a bulk request involves multiple time partition")
|
||||
};
|
||||
fn write_record_batch_partial(&self, part: &BulkPart) -> Result<()> {
|
||||
let Some(filtered) = filter_record_batch(
|
||||
part,
|
||||
range.min_timestamp.value(),
|
||||
range.max_timestamp.value(),
|
||||
self.time_range.min_timestamp.value(),
|
||||
self.time_range.max_timestamp.value(),
|
||||
)?
|
||||
else {
|
||||
return Ok(());
|
||||
@@ -209,10 +203,7 @@ pub struct TimePartitions {
|
||||
/// Mutable data of partitions.
|
||||
inner: Mutex<PartitionsInner>,
|
||||
/// Duration of a partition.
|
||||
///
|
||||
/// `None` means there is only one partition and the [TimePartition::time_range] is
|
||||
/// also `None`.
|
||||
part_duration: Option<Duration>,
|
||||
part_duration: Duration,
|
||||
/// Metadata of the region.
|
||||
metadata: RegionMetadataRef,
|
||||
/// Builder of memtables.
|
||||
@@ -229,26 +220,10 @@ impl TimePartitions {
|
||||
next_memtable_id: MemtableId,
|
||||
part_duration: Option<Duration>,
|
||||
) -> Self {
|
||||
let mut inner = PartitionsInner::new(next_memtable_id);
|
||||
if part_duration.is_none() {
|
||||
// If `part_duration` is None, then we create a partition with `None` time
|
||||
// range so we will write all rows to that partition.
|
||||
let memtable = builder.build(inner.alloc_memtable_id(), &metadata);
|
||||
debug!(
|
||||
"Creates a time partition for all timestamps, region: {}, memtable_id: {}",
|
||||
metadata.region_id,
|
||||
memtable.id(),
|
||||
);
|
||||
let part = TimePartition {
|
||||
memtable,
|
||||
time_range: None,
|
||||
};
|
||||
inner.parts.push(part);
|
||||
}
|
||||
|
||||
let inner = PartitionsInner::new(next_memtable_id);
|
||||
Self {
|
||||
inner: Mutex::new(inner),
|
||||
part_duration,
|
||||
part_duration: part_duration.unwrap_or(INITIAL_TIME_WINDOW),
|
||||
metadata,
|
||||
builder,
|
||||
}
|
||||
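
A sketch of the behavioral change above (the `metadata` and `builder` values are assumed to come from existing test fixtures; `Duration::from_days` is available because the crate enables the `duration_constructors` feature):

// Passing `None` used to create a single unbounded partition; it now falls
// back to the 1-day INITIAL_TIME_WINDOW.
let parts = TimePartitions::new(metadata.clone(), builder.clone(), 0, None);
assert_eq!(parts.part_duration(), Duration::from_days(1));
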
@@ -329,19 +304,18 @@ impl TimePartitions {
|
||||
part_start: Timestamp,
|
||||
inner: &mut MutexGuard<PartitionsInner>,
|
||||
) -> Result<TimePartition> {
|
||||
let part_duration = self.part_duration.unwrap();
|
||||
let part_pos = match inner
|
||||
.parts
|
||||
.iter()
|
||||
.position(|part| part.time_range.unwrap().min_timestamp == part_start)
|
||||
.position(|part| part.time_range.min_timestamp == part_start)
|
||||
{
|
||||
Some(pos) => pos,
|
||||
None => {
|
||||
let range = PartTimeRange::from_start_duration(part_start, part_duration)
|
||||
let range = PartTimeRange::from_start_duration(part_start, self.part_duration)
|
||||
.with_context(|| InvalidRequestSnafu {
|
||||
region_id: self.metadata.region_id,
|
||||
reason: format!(
|
||||
"Partition time range for {part_start:?} is out of bound, bucket size: {part_duration:?}",
|
||||
"Partition time range for {part_start:?} is out of bound, bucket size: {:?}", self.part_duration
|
||||
),
|
||||
})?;
|
||||
let memtable = self
|
||||
@@ -351,14 +325,14 @@ impl TimePartitions {
|
||||
"Create time partition {:?} for region {}, duration: {:?}, memtable_id: {}, parts_total: {}",
|
||||
range,
|
||||
self.metadata.region_id,
|
||||
part_duration,
|
||||
self.part_duration,
|
||||
memtable.id(),
|
||||
inner.parts.len() + 1
|
||||
);
|
||||
let pos = inner.parts.len();
|
||||
inner.parts.push(TimePartition {
|
||||
memtable,
|
||||
time_range: Some(range),
|
||||
time_range: range,
|
||||
});
|
||||
pos
|
||||
}
|
||||
@@ -396,13 +370,13 @@ impl TimePartitions {
|
||||
/// Forks latest partition and updates the partition duration if `part_duration` is Some.
|
||||
pub fn fork(&self, metadata: &RegionMetadataRef, part_duration: Option<Duration>) -> Self {
|
||||
// Fall back to the existing partition duration.
|
||||
let part_duration = part_duration.or(self.part_duration);
|
||||
let part_duration = part_duration.unwrap_or(self.part_duration);
|
||||
|
||||
let mut inner = self.inner.lock().unwrap();
|
||||
let latest_part = inner
|
||||
.parts
|
||||
.iter()
|
||||
.max_by_key(|part| part.time_range.map(|range| range.min_timestamp))
|
||||
.max_by_key(|part| part.time_range.min_timestamp)
|
||||
.cloned();
|
||||
|
||||
let Some(old_part) = latest_part else {
|
||||
@@ -411,33 +385,31 @@ impl TimePartitions {
|
||||
metadata.clone(),
|
||||
self.builder.clone(),
|
||||
inner.next_memtable_id,
|
||||
part_duration,
|
||||
Some(part_duration),
|
||||
);
|
||||
};
|
||||
|
||||
let old_stats = old_part.memtable.stats();
|
||||
// Use the max timestamp to compute the new time range for the memtable.
|
||||
// If `part_duration` is None, the new range will be None.
|
||||
let new_time_range =
|
||||
old_stats
|
||||
.time_range()
|
||||
.zip(part_duration)
|
||||
.and_then(|(range, bucket)| {
|
||||
partition_start_timestamp(range.1, bucket)
|
||||
.and_then(|start| PartTimeRange::from_start_duration(start, bucket))
|
||||
});
|
||||
// Forks the latest partition, but compute the time range based on the new duration.
|
||||
let memtable = old_part.memtable.fork(inner.alloc_memtable_id(), metadata);
|
||||
let new_part = TimePartition {
|
||||
memtable,
|
||||
time_range: new_time_range,
|
||||
};
|
||||
let partitions_inner = old_stats
|
||||
.time_range()
|
||||
.and_then(|(_, old_stats_end_timestamp)| {
|
||||
partition_start_timestamp(old_stats_end_timestamp, part_duration)
|
||||
.and_then(|start| PartTimeRange::from_start_duration(start, part_duration))
|
||||
})
|
||||
.map(|part_time_range| {
|
||||
// Forks the latest partition, but compute the time range based on the new duration.
|
||||
let memtable = old_part.memtable.fork(inner.alloc_memtable_id(), metadata);
|
||||
let part = TimePartition {
|
||||
memtable,
|
||||
time_range: part_time_range,
|
||||
};
|
||||
PartitionsInner::with_partition(part, inner.next_memtable_id)
|
||||
})
|
||||
.unwrap_or_else(|| PartitionsInner::new(inner.next_memtable_id));
|
||||
|
||||
Self {
|
||||
inner: Mutex::new(PartitionsInner::with_partition(
|
||||
new_part,
|
||||
inner.next_memtable_id,
|
||||
)),
|
||||
inner: Mutex::new(partitions_inner),
|
||||
part_duration,
|
||||
metadata: metadata.clone(),
|
||||
builder: self.builder.clone(),
|
||||
@@ -445,7 +417,7 @@ impl TimePartitions {
|
||||
}
|
||||
|
||||
/// Returns partition duration.
|
||||
pub(crate) fn part_duration(&self) -> Option<Duration> {
|
||||
pub(crate) fn part_duration(&self) -> Duration {
|
||||
self.part_duration
|
||||
}
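For illustration, a minimal sketch (not part of this diff) of what the `Option<Duration>` → `Duration` change means for callers: code that still carries an optional time window now wraps the plain value, as the `MemtableVersion` hunks further down do.

use std::time::Duration;

fn main() {
    // Stand-ins for the real calls: `part_duration` is what `part_duration()` now
    // returns, `time_window` is an optional window coming from elsewhere.
    let part_duration = Duration::from_secs(5);
    let time_window: Option<Duration> = Some(Duration::from_secs(5));

    // Comparing a plain Duration against an Option<Duration> requires wrapping it.
    if Some(part_duration) == time_window {
        println!("same window, nothing to update");
    }
}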
|
||||
|
||||
@@ -490,7 +462,7 @@ impl TimePartitions {
|
||||
self.metadata.clone(),
|
||||
self.builder.clone(),
|
||||
self.next_memtable_id(),
|
||||
part_duration.or(self.part_duration),
|
||||
Some(part_duration.unwrap_or(self.part_duration)),
|
||||
)
|
||||
}
|
||||
|
||||
@@ -514,11 +486,7 @@ impl TimePartitions {
|
||||
let mut present = HashSet::new();
|
||||
// First find any existing partitions that overlap
|
||||
for part in existing_parts {
|
||||
let Some(part_time_range) = part.time_range.as_ref() else {
|
||||
matching.push(part);
|
||||
return Ok((matching, Vec::new()));
|
||||
};
|
||||
|
||||
let part_time_range = &part.time_range;
|
||||
if !(max < part_time_range.min_timestamp || min >= part_time_range.max_timestamp) {
|
||||
matching.push(part);
|
||||
present.insert(part_time_range.min_timestamp.value());
|
||||
@@ -526,7 +494,7 @@ impl TimePartitions {
|
||||
}
|
||||
|
||||
// safety: self.part_duration can only be present when we reach this point.
|
||||
let part_duration = self.part_duration.unwrap();
|
||||
let part_duration = self.part_duration_or_default();
|
||||
let timestamp_unit = self.metadata.time_index_type().unit();
|
||||
|
||||
let part_duration_sec = part_duration.as_secs() as i64;
|
||||
@@ -621,12 +589,13 @@ impl TimePartitions {
|
||||
Ok((matching, missing))
|
||||
}
|
||||
|
||||
/// Returns the partition duration, or the default 1-day duration if not present.
|
||||
fn part_duration_or_default(&self) -> Duration {
|
||||
self.part_duration
|
||||
}
|
||||
|
||||
/// Write to multiple partitions.
|
||||
fn write_multi_parts(&self, kvs: &KeyValues, parts: &PartitionVec) -> Result<()> {
|
||||
// If part duration is `None` then there is always one partition and all rows
|
||||
// will be put in that partition before invoking this method.
|
||||
debug_assert!(self.part_duration.is_some());
|
||||
|
||||
let mut parts_to_write = HashMap::new();
|
||||
let mut missing_parts = HashMap::new();
|
||||
for kv in kvs.iter() {
|
||||
@@ -635,9 +604,8 @@ impl TimePartitions {
|
||||
let ts = kv.timestamp().as_timestamp().unwrap().unwrap();
|
||||
for part in parts {
|
||||
if part.contains_timestamp(ts) {
|
||||
// Safety: Since part duration is `Some` so all time range should be `Some`.
|
||||
parts_to_write
|
||||
.entry(part.time_range.unwrap().min_timestamp)
|
||||
.entry(part.time_range.min_timestamp)
|
||||
.or_insert_with(|| PartitionToWrite {
|
||||
partition: part.clone(),
|
||||
key_values: Vec::new(),
|
||||
@@ -652,7 +620,7 @@ impl TimePartitions {
|
||||
if !part_found {
|
||||
// We need to write it to a new part.
|
||||
// Safety: `new()` ensures the duration is always Some if we get to this method.
|
||||
let part_duration = self.part_duration.unwrap();
|
||||
let part_duration = self.part_duration_or_default();
|
||||
let part_start =
|
||||
partition_start_timestamp(ts, part_duration).with_context(|| {
|
||||
InvalidRequestSnafu {
|
||||
@@ -787,7 +755,7 @@ mod tests {
|
||||
let metadata = memtable_util::metadata_for_test();
|
||||
let builder = Arc::new(PartitionTreeMemtableBuilder::default());
|
||||
let partitions = TimePartitions::new(metadata.clone(), builder, 0, None);
|
||||
assert_eq!(1, partitions.num_partitions());
|
||||
assert_eq!(0, partitions.num_partitions());
|
||||
assert!(partitions.is_empty());
|
||||
|
||||
let kvs = memtable_util::build_key_values(
|
||||
@@ -849,14 +817,15 @@ mod tests {
|
||||
let parts = partitions.list_partitions();
|
||||
assert_eq!(
|
||||
Timestamp::new_millisecond(0),
|
||||
parts[0].time_range.unwrap().min_timestamp
|
||||
parts[0].time_range.min_timestamp
|
||||
);
|
||||
assert_eq!(
|
||||
Timestamp::new_millisecond(10000),
|
||||
parts[0].time_range.unwrap().max_timestamp
|
||||
parts[0].time_range.max_timestamp
|
||||
);
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
fn new_multi_partitions(metadata: &RegionMetadataRef) -> TimePartitions {
|
||||
let builder = Arc::new(PartitionTreeMemtableBuilder::default());
|
||||
let partitions =
|
||||
@@ -900,11 +869,11 @@ mod tests {
|
||||
assert_eq!(0, parts[0].memtable.id());
|
||||
assert_eq!(
|
||||
Timestamp::new_millisecond(0),
|
||||
parts[0].time_range.unwrap().min_timestamp
|
||||
parts[0].time_range.min_timestamp
|
||||
);
|
||||
assert_eq!(
|
||||
Timestamp::new_millisecond(5000),
|
||||
parts[0].time_range.unwrap().max_timestamp
|
||||
parts[0].time_range.max_timestamp
|
||||
);
|
||||
assert_eq!(&[0, 2000, 3000, 4000], ×tamps[..]);
|
||||
let iter = parts[1].memtable.iter(None, None, None).unwrap();
|
||||
@@ -913,11 +882,11 @@ mod tests {
|
||||
assert_eq!(&[5000, 7000], ×tamps[..]);
|
||||
assert_eq!(
|
||||
Timestamp::new_millisecond(5000),
|
||||
parts[1].time_range.unwrap().min_timestamp
|
||||
parts[1].time_range.min_timestamp
|
||||
);
|
||||
assert_eq!(
|
||||
Timestamp::new_millisecond(10000),
|
||||
parts[1].time_range.unwrap().max_timestamp
|
||||
parts[1].time_range.max_timestamp
|
||||
);
|
||||
}
|
||||
|
||||
@@ -928,26 +897,26 @@ mod tests {
|
||||
let partitions = TimePartitions::new(metadata.clone(), builder.clone(), 0, None);
|
||||
|
||||
let new_parts = partitions.new_with_part_duration(Some(Duration::from_secs(5)));
|
||||
assert_eq!(Duration::from_secs(5), new_parts.part_duration().unwrap());
|
||||
assert_eq!(1, new_parts.next_memtable_id());
|
||||
assert_eq!(Duration::from_secs(5), new_parts.part_duration());
|
||||
assert_eq!(0, new_parts.next_memtable_id());
|
||||
|
||||
// Won't update the duration if it's None.
|
||||
let new_parts = new_parts.new_with_part_duration(None);
|
||||
assert_eq!(Duration::from_secs(5), new_parts.part_duration().unwrap());
|
||||
assert_eq!(Duration::from_secs(5), new_parts.part_duration());
|
||||
// Don't need to create new memtables.
|
||||
assert_eq!(1, new_parts.next_memtable_id());
|
||||
assert_eq!(0, new_parts.next_memtable_id());
|
||||
|
||||
let new_parts = new_parts.new_with_part_duration(Some(Duration::from_secs(10)));
|
||||
assert_eq!(Duration::from_secs(10), new_parts.part_duration().unwrap());
|
||||
assert_eq!(Duration::from_secs(10), new_parts.part_duration());
|
||||
// Don't need to create new memtables.
|
||||
assert_eq!(1, new_parts.next_memtable_id());
|
||||
assert_eq!(0, new_parts.next_memtable_id());
|
||||
|
||||
let builder = Arc::new(PartitionTreeMemtableBuilder::default());
|
||||
let partitions = TimePartitions::new(metadata.clone(), builder.clone(), 0, None);
|
||||
// Need to build a new memtable as duration is still None.
|
||||
let new_parts = partitions.new_with_part_duration(None);
|
||||
assert!(new_parts.part_duration().is_none());
|
||||
assert_eq!(2, new_parts.next_memtable_id());
|
||||
assert_eq!(INITIAL_TIME_WINDOW, new_parts.part_duration());
|
||||
assert_eq!(0, new_parts.next_memtable_id());
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -957,28 +926,28 @@ mod tests {
|
||||
let partitions = TimePartitions::new(metadata.clone(), builder, 0, None);
|
||||
partitions.freeze().unwrap();
|
||||
let new_parts = partitions.fork(&metadata, None);
|
||||
assert!(new_parts.part_duration().is_none());
|
||||
assert_eq!(1, new_parts.list_partitions()[0].memtable.id());
|
||||
assert_eq!(2, new_parts.next_memtable_id());
|
||||
assert_eq!(INITIAL_TIME_WINDOW, new_parts.part_duration());
|
||||
assert!(new_parts.list_partitions().is_empty());
|
||||
assert_eq!(0, new_parts.next_memtable_id());
|
||||
|
||||
new_parts.freeze().unwrap();
|
||||
let new_parts = new_parts.fork(&metadata, Some(Duration::from_secs(5)));
|
||||
assert_eq!(Duration::from_secs(5), new_parts.part_duration().unwrap());
|
||||
assert_eq!(2, new_parts.list_partitions()[0].memtable.id());
|
||||
assert_eq!(3, new_parts.next_memtable_id());
|
||||
assert_eq!(Duration::from_secs(5), new_parts.part_duration());
|
||||
assert!(new_parts.list_partitions().is_empty());
|
||||
assert_eq!(0, new_parts.next_memtable_id());
|
||||
|
||||
new_parts.freeze().unwrap();
|
||||
let new_parts = new_parts.fork(&metadata, None);
|
||||
// Won't update the duration.
|
||||
assert_eq!(Duration::from_secs(5), new_parts.part_duration().unwrap());
|
||||
assert_eq!(3, new_parts.list_partitions()[0].memtable.id());
|
||||
assert_eq!(4, new_parts.next_memtable_id());
|
||||
assert_eq!(Duration::from_secs(5), new_parts.part_duration());
|
||||
assert!(new_parts.list_partitions().is_empty());
|
||||
assert_eq!(0, new_parts.next_memtable_id());
|
||||
|
||||
new_parts.freeze().unwrap();
|
||||
let new_parts = new_parts.fork(&metadata, Some(Duration::from_secs(10)));
|
||||
assert_eq!(Duration::from_secs(10), new_parts.part_duration().unwrap());
|
||||
assert_eq!(4, new_parts.list_partitions()[0].memtable.id());
|
||||
assert_eq!(5, new_parts.next_memtable_id());
|
||||
assert_eq!(Duration::from_secs(10), new_parts.part_duration());
|
||||
assert!(new_parts.list_partitions().is_empty());
|
||||
assert_eq!(0, new_parts.next_memtable_id());
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -990,14 +959,14 @@ mod tests {
|
||||
// Won't update the duration.
|
||||
let new_parts = partitions.fork(&metadata, None);
|
||||
assert!(new_parts.is_empty());
|
||||
assert_eq!(Duration::from_secs(5), new_parts.part_duration().unwrap());
|
||||
assert_eq!(Duration::from_secs(5), new_parts.part_duration());
|
||||
assert_eq!(2, new_parts.list_partitions()[0].memtable.id());
|
||||
assert_eq!(3, new_parts.next_memtable_id());
|
||||
|
||||
// Although we don't fork a memtable multiple times, we still add a test for it.
|
||||
let new_parts = partitions.fork(&metadata, Some(Duration::from_secs(10)));
|
||||
assert!(new_parts.is_empty());
|
||||
assert_eq!(Duration::from_secs(10), new_parts.part_duration().unwrap());
|
||||
assert_eq!(Duration::from_secs(10), new_parts.part_duration());
|
||||
assert_eq!(3, new_parts.list_partitions()[0].memtable.id());
|
||||
assert_eq!(4, new_parts.next_memtable_id());
|
||||
}
|
||||
@@ -1018,9 +987,9 @@ mod tests {
|
||||
Timestamp::new_millisecond(2000),
|
||||
)
|
||||
.unwrap();
|
||||
assert_eq!(matching.len(), 1);
|
||||
assert!(missing.is_empty());
|
||||
assert!(matching[0].time_range.is_none());
|
||||
assert_eq!(matching.len(), 0);
|
||||
assert_eq!(missing.len(), 1);
|
||||
assert_eq!(missing[0], Timestamp::new_millisecond(0));
|
||||
|
||||
// Case 2: With time range partitioning
|
||||
let partitions = TimePartitions::new(
|
||||
@@ -1052,7 +1021,7 @@ mod tests {
|
||||
.unwrap();
|
||||
assert_eq!(matching.len(), 1);
|
||||
assert!(missing.is_empty());
|
||||
assert_eq!(matching[0].time_range.unwrap().min_timestamp.value(), 0);
|
||||
assert_eq!(matching[0].time_range.min_timestamp.value(), 0);
|
||||
|
||||
// Test case 2b: Query spanning multiple existing partitions
|
||||
let (matching, missing) = partitions
|
||||
@@ -1065,8 +1034,8 @@ mod tests {
|
||||
.unwrap();
|
||||
assert_eq!(matching.len(), 2);
|
||||
assert!(missing.is_empty());
|
||||
assert_eq!(matching[0].time_range.unwrap().min_timestamp.value(), 0);
|
||||
assert_eq!(matching[1].time_range.unwrap().min_timestamp.value(), 5000);
|
||||
assert_eq!(matching[0].time_range.min_timestamp.value(), 0);
|
||||
assert_eq!(matching[1].time_range.min_timestamp.value(), 5000);
|
||||
|
||||
// Test case 2c: Query requiring new partition
|
||||
let (matching, missing) = partitions
|
||||
@@ -1092,8 +1061,8 @@ mod tests {
|
||||
.unwrap();
|
||||
assert_eq!(matching.len(), 2);
|
||||
assert!(missing.is_empty());
|
||||
assert_eq!(matching[0].time_range.unwrap().min_timestamp.value(), 0);
|
||||
assert_eq!(matching[1].time_range.unwrap().min_timestamp.value(), 5000);
|
||||
assert_eq!(matching[0].time_range.min_timestamp.value(), 0);
|
||||
assert_eq!(matching[1].time_range.min_timestamp.value(), 5000);
|
||||
|
||||
// Test case 2e: Corner case
|
||||
let (matching, missing) = partitions
|
||||
@@ -1106,8 +1075,8 @@ mod tests {
|
||||
.unwrap();
|
||||
assert_eq!(matching.len(), 2);
|
||||
assert!(missing.is_empty());
|
||||
assert_eq!(matching[0].time_range.unwrap().min_timestamp.value(), 0);
|
||||
assert_eq!(matching[1].time_range.unwrap().min_timestamp.value(), 5000);
|
||||
assert_eq!(matching[0].time_range.min_timestamp.value(), 0);
|
||||
assert_eq!(matching[1].time_range.min_timestamp.value(), 5000);
|
||||
|
||||
// Test case 2f: Corner case with a missing partition
|
||||
let (matching, missing) = partitions
|
||||
@@ -1120,7 +1089,7 @@ mod tests {
|
||||
.unwrap();
|
||||
assert_eq!(matching.len(), 1);
|
||||
assert_eq!(1, missing.len());
|
||||
assert_eq!(matching[0].time_range.unwrap().min_timestamp.value(), 5000);
|
||||
assert_eq!(matching[0].time_range.min_timestamp.value(), 5000);
|
||||
assert_eq!(missing[0].value(), 10000);
|
||||
|
||||
// Test case 2g: Cross 0
|
||||
@@ -1133,7 +1102,7 @@ mod tests {
|
||||
)
|
||||
.unwrap();
|
||||
assert_eq!(matching.len(), 1);
|
||||
assert_eq!(matching[0].time_range.unwrap().min_timestamp.value(), 0);
|
||||
assert_eq!(matching[0].time_range.min_timestamp.value(), 0);
|
||||
assert_eq!(1, missing.len());
|
||||
assert_eq!(missing[0].value(), -5000);
|
||||
|
||||
@@ -1151,8 +1120,8 @@ mod tests {
|
||||
)
|
||||
.unwrap();
|
||||
assert_eq!(2, matching.len());
|
||||
assert_eq!(matching[0].time_range.unwrap().min_timestamp.value(), 0);
|
||||
assert_eq!(matching[1].time_range.unwrap().min_timestamp.value(), 5000);
|
||||
assert_eq!(matching[0].time_range.min_timestamp.value(), 0);
|
||||
assert_eq!(matching[1].time_range.min_timestamp.value(), 5000);
|
||||
assert_eq!(2, missing.len());
|
||||
assert_eq!(missing[0].value(), -100000000000);
|
||||
assert_eq!(missing[1].value(), 100000000000);
|
||||
@@ -1162,10 +1131,7 @@ mod tests {
|
||||
let schema = Arc::new(Schema::new(vec![
|
||||
Field::new(
|
||||
"ts",
|
||||
arrow::datatypes::DataType::Timestamp(
|
||||
arrow::datatypes::TimeUnit::Millisecond,
|
||||
None,
|
||||
),
|
||||
DataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
|
||||
false,
|
||||
),
|
||||
Field::new("val", DataType::Utf8, true),
|
||||
|
||||
@@ -76,7 +76,7 @@ impl MemtableVersion {
|
||||
) -> Result<Option<MemtableVersion>> {
|
||||
if self.mutable.is_empty() {
|
||||
// No need to freeze the mutable memtable, but we need to check the time window.
|
||||
if self.mutable.part_duration() == time_window {
|
||||
if Some(self.mutable.part_duration()) == time_window {
|
||||
// If the time window is the same, we don't need to update it.
|
||||
return Ok(None);
|
||||
}
|
||||
@@ -98,7 +98,7 @@ impl MemtableVersion {
|
||||
// soft limit.
|
||||
self.mutable.freeze()?;
|
||||
// Fork the memtable.
|
||||
if self.mutable.part_duration() != time_window {
|
||||
if Some(self.mutable.part_duration()) != time_window {
|
||||
common_telemetry::debug!(
|
||||
"Fork memtable, update partition duration from {:?}, to {:?}",
|
||||
self.mutable.part_duration(),
|
||||
|
||||
@@ -142,7 +142,7 @@ impl VersionControl {
|
||||
/// Mark all opened files as deleted and set the delete marker in [VersionControlData]
|
||||
pub(crate) fn mark_dropped(&self, memtable_builder: &MemtableBuilderRef) {
|
||||
let version = self.current().version;
|
||||
let part_duration = version.memtables.mutable.part_duration();
|
||||
let part_duration = Some(version.memtables.mutable.part_duration());
|
||||
let next_memtable_id = version.memtables.mutable.next_memtable_id();
|
||||
let new_mutable = Arc::new(TimePartitions::new(
|
||||
version.metadata.clone(),
|
||||
@@ -166,7 +166,7 @@ impl VersionControl {
|
||||
/// new schema. Memtables of the version must be empty.
|
||||
pub(crate) fn alter_schema(&self, metadata: RegionMetadataRef, builder: &MemtableBuilderRef) {
|
||||
let version = self.current().version;
|
||||
let part_duration = version.memtables.mutable.part_duration();
|
||||
let part_duration = Some(version.memtables.mutable.part_duration());
|
||||
let next_memtable_id = version.memtables.mutable.next_memtable_id();
|
||||
let new_mutable = Arc::new(TimePartitions::new(
|
||||
metadata.clone(),
|
||||
@@ -202,7 +202,7 @@ impl VersionControl {
|
||||
version.metadata.clone(),
|
||||
memtable_builder.clone(),
|
||||
next_memtable_id,
|
||||
part_duration,
|
||||
Some(part_duration),
|
||||
));
|
||||
let new_version = Arc::new(
|
||||
VersionBuilder::new(version.metadata.clone(), new_mutable)
|
||||
|
||||
@@ -12,7 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use api::v1::flow::FlowRequestHeader;
|
||||
use api::v1::flow::{AdjustFlow, FlowRequestHeader};
|
||||
use async_trait::async_trait;
|
||||
use common_error::ext::BoxedError;
|
||||
use common_function::handlers::FlowServiceHandler;
|
||||
@@ -22,6 +22,7 @@ use common_query::error::Result;
|
||||
use common_telemetry::tracing_context::TracingContext;
|
||||
use futures::stream::FuturesUnordered;
|
||||
use futures::StreamExt;
|
||||
use serde_json::json;
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
|
||||
@@ -57,9 +58,96 @@ impl FlowServiceHandler for FlowServiceOperator {
|
||||
) -> Result<api::v1::flow::FlowResponse> {
|
||||
self.flush_inner(catalog, flow, ctx).await
|
||||
}
|
||||
|
||||
async fn adjust(
|
||||
&self,
|
||||
catalog: &str,
|
||||
flow: &str,
|
||||
min_run_interval_secs: u64,
|
||||
max_filter_num_per_query: usize,
|
||||
ctx: QueryContextRef,
|
||||
) -> Result<api::v1::flow::FlowResponse> {
|
||||
self.adjust_inner(
|
||||
catalog,
|
||||
flow,
|
||||
min_run_interval_secs,
|
||||
max_filter_num_per_query,
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
impl FlowServiceOperator {
|
||||
async fn adjust_inner(
|
||||
&self,
|
||||
catalog: &str,
|
||||
flow: &str,
|
||||
min_run_interval_secs: u64,
|
||||
max_filter_num_per_query: usize,
|
||||
ctx: QueryContextRef,
|
||||
) -> Result<api::v1::flow::FlowResponse> {
|
||||
let id = self
|
||||
.flow_metadata_manager
|
||||
.flow_name_manager()
|
||||
.get(catalog, flow)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(common_query::error::ExecuteSnafu)?
|
||||
.context(common_meta::error::FlowNotFoundSnafu {
|
||||
flow_name: format!("{}.{}", catalog, flow),
|
||||
})
|
||||
.map_err(BoxedError::new)
|
||||
.context(common_query::error::ExecuteSnafu)?
|
||||
.flow_id();
|
||||
|
||||
let all_flownode_peers = self
|
||||
.flow_metadata_manager
|
||||
.flow_route_manager()
|
||||
.routes(id)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(common_query::error::ExecuteSnafu)?;
|
||||
|
||||
// order of flownodes doesn't matter here
|
||||
let all_flow_nodes = FuturesUnordered::from_iter(
|
||||
all_flownode_peers
|
||||
.iter()
|
||||
.map(|(_key, peer)| self.node_manager.flownode(peer.peer())),
|
||||
)
|
||||
.collect::<Vec<_>>()
|
||||
.await;
|
||||
|
||||
// TODO(discord9): use proper type for flow options
|
||||
let options = json!({
|
||||
"min_run_interval_secs": min_run_interval_secs,
|
||||
"max_filter_num_per_query": max_filter_num_per_query,
|
||||
});
|
||||
|
||||
for node in all_flow_nodes {
|
||||
let _res = {
|
||||
use api::v1::flow::{flow_request, FlowRequest};
|
||||
let flush_req = FlowRequest {
|
||||
header: Some(FlowRequestHeader {
|
||||
tracing_context: TracingContext::from_current_span().to_w3c(),
|
||||
query_context: Some(
|
||||
common_meta::rpc::ddl::QueryContext::from(ctx.clone()).into(),
|
||||
),
|
||||
}),
|
||||
body: Some(flow_request::Body::Adjust(AdjustFlow {
|
||||
flow_id: Some(api::v1::FlowId { id }),
|
||||
options: options.to_string(),
|
||||
})),
|
||||
};
|
||||
node.handle(flush_req)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(common_query::error::ExecuteSnafu)?
|
||||
};
|
||||
}
|
||||
Ok(Default::default())
|
||||
}
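For reference, a hedged sketch (not part of this diff) of the options payload each flownode receives in the `AdjustFlow` body; the numeric values are made up.

use serde_json::json;

fn main() {
    let min_run_interval_secs: u64 = 5;
    let max_filter_num_per_query: usize = 16;

    // Same construction as in `adjust_inner` above; the serialized string ends up
    // in `AdjustFlow::options`.
    let options = json!({
        "min_run_interval_secs": min_run_interval_secs,
        "max_filter_num_per_query": max_filter_num_per_query,
    });
    // Prints something like: {"max_filter_num_per_query":16,"min_run_interval_secs":5}
    println!("{}", options);
}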
|
||||
|
||||
/// Flush the flownodes according to the flow id.
|
||||
async fn flush_inner(
|
||||
&self,
|
||||
|
||||
@@ -59,6 +59,7 @@ sql.workspace = true
|
||||
table.workspace = true
|
||||
tokio.workspace = true
|
||||
urlencoding = "2.1"
|
||||
vrl = "0.24"
|
||||
yaml-rust = "0.4"
|
||||
|
||||
[dev-dependencies]
|
||||
|
||||
@@ -24,9 +24,9 @@ fn processor_mut(
|
||||
let mut result = Vec::with_capacity(input_values.len());
|
||||
|
||||
for v in input_values {
|
||||
let mut payload = json_to_map(v).unwrap();
|
||||
let payload = json_to_map(v).unwrap();
|
||||
let r = pipeline
|
||||
.exec_mut(&mut payload)?
|
||||
.exec_mut(payload)?
|
||||
.into_transformed()
|
||||
.expect("expect transformed result ");
|
||||
result.push(r.0);
|
||||
|
||||
@@ -20,10 +20,10 @@ use crate::error::{
|
||||
Error, FieldRequiredForDispatcherSnafu, Result, TableSuffixRequiredForDispatcherRuleSnafu,
|
||||
ValueRequiredForDispatcherRuleSnafu,
|
||||
};
|
||||
use crate::etl::ctx_req::TABLE_SUFFIX_KEY;
|
||||
use crate::{PipelineMap, Value};
|
||||
|
||||
const FIELD: &str = "field";
|
||||
const TABLE_SUFFIX: &str = "table_suffix";
|
||||
const PIPELINE: &str = "pipeline";
|
||||
const VALUE: &str = "value";
|
||||
const RULES: &str = "rules";
|
||||
@@ -80,7 +80,7 @@ impl TryFrom<&Yaml> for Dispatcher {
|
||||
rules
|
||||
.iter()
|
||||
.map(|rule| {
|
||||
let table_part = rule[TABLE_SUFFIX]
|
||||
let table_part = rule[TABLE_SUFFIX_KEY]
|
||||
.as_str()
|
||||
.map(|s| s.to_string())
|
||||
.context(TableSuffixRequiredForDispatcherRuleSnafu)?;
|
||||
|
||||
@@ -411,13 +411,6 @@ pub enum Error {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
#[snafu(display(
|
||||
"At least one timestamp-related processor is required to use auto transform"
|
||||
))]
|
||||
TransformNoTimestampProcessor {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
#[snafu(display(
|
||||
"Illegal to set multiple timestamp Index columns, please set only one: {columns}"
|
||||
))]
|
||||
@@ -433,7 +426,7 @@ pub enum Error {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
#[snafu(display("Exactly one timestamp value is required to use auto transform"))]
|
||||
#[snafu(display("Exactly one time-related processor and one timestamp value is required to use auto transform"))]
|
||||
AutoTransformOneTimestamp {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
@@ -686,6 +679,54 @@ pub enum Error {
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to compile VRL, {}", msg))]
|
||||
CompileVrl {
|
||||
msg: String,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to execute VRL, {}", msg))]
|
||||
ExecuteVrl {
|
||||
msg: String,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Float is not a number: {}", input_float))]
|
||||
FloatNaN {
|
||||
input_float: f64,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Invalid timestamp value: {}", input))]
|
||||
InvalidTimestamp {
|
||||
input: String,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to convert bytes to utf8"))]
|
||||
BytesToUtf8 {
|
||||
#[snafu(source)]
|
||||
error: std::string::FromUtf8Error,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Please don't use regex in Vrl script"))]
|
||||
VrlRegexValue {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Vrl script should return `.` in the end"))]
|
||||
VrlReturnValue {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to cast type, msg: {}", msg))]
|
||||
CastType {
|
||||
msg: String,
|
||||
@@ -832,7 +873,6 @@ impl ErrorExt for Error {
|
||||
| TransformTypeMustBeSet { .. }
|
||||
| TransformColumnNameMustBeUnique { .. }
|
||||
| TransformMultipleTimestampIndex { .. }
|
||||
| TransformNoTimestampProcessor { .. }
|
||||
| TransformTimestampIndexCount { .. }
|
||||
| AutoTransformOneTimestamp { .. }
|
||||
| CoerceUnsupportedNullType { .. }
|
||||
@@ -866,6 +906,13 @@ impl ErrorExt for Error {
|
||||
| ReachedMaxNestedLevels { .. }
|
||||
| RequiredTableSuffixTemplate
|
||||
| InvalidTableSuffixTemplate { .. }
|
||||
| CompileVrl { .. }
|
||||
| ExecuteVrl { .. }
|
||||
| FloatNaN { .. }
|
||||
| BytesToUtf8 { .. }
|
||||
| InvalidTimestamp { .. }
|
||||
| VrlRegexValue { .. }
|
||||
| VrlReturnValue { .. }
|
||||
| PipelineMissing { .. } => StatusCode::InvalidArguments,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -30,12 +30,13 @@ use yaml_rust::YamlLoader;
|
||||
|
||||
use crate::dispatcher::{Dispatcher, Rule};
|
||||
use crate::error::{
|
||||
InputValueMustBeObjectSnafu, IntermediateKeyIndexSnafu, Result,
|
||||
TransformNoTimestampProcessorSnafu, YamlLoadSnafu, YamlParseSnafu,
|
||||
AutoTransformOneTimestampSnafu, InputValueMustBeObjectSnafu, IntermediateKeyIndexSnafu, Result,
|
||||
YamlLoadSnafu, YamlParseSnafu,
|
||||
};
|
||||
use crate::etl::ctx_req::TABLE_SUFFIX_KEY;
|
||||
use crate::etl::processor::ProcessorKind;
|
||||
use crate::tablesuffix::TableSuffixTemplate;
|
||||
use crate::GreptimeTransformer;
|
||||
use crate::{ContextOpt, GreptimeTransformer};
|
||||
|
||||
const DESCRIPTION: &str = "description";
|
||||
const PROCESSORS: &str = "processors";
|
||||
@@ -80,16 +81,14 @@ pub fn parse(input: &Content) -> Result<Pipeline> {
|
||||
// check processors have at least one timestamp-related processor
|
||||
let cnt = processors
|
||||
.iter()
|
||||
.filter(|p| {
|
||||
matches!(
|
||||
p,
|
||||
ProcessorKind::Date(_)
|
||||
| ProcessorKind::Timestamp(_)
|
||||
| ProcessorKind::Epoch(_)
|
||||
)
|
||||
.filter_map(|p| match p {
|
||||
ProcessorKind::Date(d) => Some(d.target_count()),
|
||||
ProcessorKind::Timestamp(t) => Some(t.target_count()),
|
||||
ProcessorKind::Epoch(e) => Some(e.target_count()),
|
||||
_ => None,
|
||||
})
|
||||
.count();
|
||||
ensure!(cnt > 0, TransformNoTimestampProcessorSnafu);
|
||||
.sum::<usize>();
|
||||
ensure!(cnt == 1, AutoTransformOneTimestampSnafu);
|
||||
None
|
||||
} else {
|
||||
Some(GreptimeTransformer::new(transformers)?)
|
||||
@@ -156,14 +155,15 @@ impl DispatchedTo {
|
||||
pub enum PipelineExecOutput {
|
||||
Transformed(TransformedOutput),
|
||||
AutoTransform(AutoTransformOutput),
|
||||
DispatchedTo(DispatchedTo),
|
||||
DispatchedTo(DispatchedTo, PipelineMap),
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct TransformedOutput {
|
||||
pub opt: String,
|
||||
pub opt: ContextOpt,
|
||||
pub row: Row,
|
||||
pub table_suffix: Option<String>,
|
||||
pub pipeline_map: PipelineMap,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -171,6 +171,7 @@ pub struct AutoTransformOutput {
|
||||
pub table_suffix: Option<String>,
|
||||
// ts_column_name -> unit
|
||||
pub ts_unit_map: HashMap<String, TimeUnit>,
|
||||
pub pipeline_map: PipelineMap,
|
||||
}
|
||||
|
||||
impl PipelineExecOutput {
|
||||
@@ -188,7 +189,7 @@ impl PipelineExecOutput {
|
||||
|
||||
// Note: This is a test only function, do not use it in production.
|
||||
pub fn into_dispatched(self) -> Option<DispatchedTo> {
|
||||
if let Self::DispatchedTo(d) = self {
|
||||
if let Self::DispatchedTo(d, _) = self {
|
||||
Some(d)
|
||||
} else {
|
||||
None
|
||||
@@ -231,30 +232,38 @@ pub fn simd_json_array_to_map(val: Vec<simd_json::OwnedValue>) -> Result<Vec<Pip
|
||||
}
|
||||
|
||||
impl Pipeline {
|
||||
pub fn exec_mut(&self, val: &mut PipelineMap) -> Result<PipelineExecOutput> {
|
||||
pub fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineExecOutput> {
|
||||
// process
|
||||
for processor in self.processors.iter() {
|
||||
processor.exec_mut(val)?;
|
||||
val = processor.exec_mut(val)?;
|
||||
}
|
||||
|
||||
// dispatch, fast return if matched
|
||||
if let Some(rule) = self.dispatcher.as_ref().and_then(|d| d.exec(val)) {
|
||||
return Ok(PipelineExecOutput::DispatchedTo(rule.into()));
|
||||
if let Some(rule) = self.dispatcher.as_ref().and_then(|d| d.exec(&val)) {
|
||||
return Ok(PipelineExecOutput::DispatchedTo(rule.into(), val));
|
||||
}
|
||||
|
||||
// do transform
|
||||
if let Some(transformer) = self.transformer() {
|
||||
let (opt, row) = transformer.transform_mut(val)?;
|
||||
let table_suffix = self.tablesuffix.as_ref().and_then(|t| t.apply(val));
|
||||
let (mut opt, row) = transformer.transform_mut(&mut val)?;
|
||||
let table_suffix = opt.resolve_table_suffix(self.tablesuffix.as_ref(), &val);
|
||||
|
||||
Ok(PipelineExecOutput::Transformed(TransformedOutput {
|
||||
opt,
|
||||
row,
|
||||
table_suffix,
|
||||
pipeline_map: val,
|
||||
}))
|
||||
} else {
|
||||
let table_suffix = self.tablesuffix.as_ref().and_then(|t| t.apply(val));
|
||||
// check table suffix var
|
||||
let table_suffix = val
|
||||
.remove(TABLE_SUFFIX_KEY)
|
||||
.map(|f| f.to_str_value())
|
||||
.or_else(|| self.tablesuffix.as_ref().and_then(|t| t.apply(&val)));
|
||||
|
||||
let mut ts_unit_map = HashMap::with_capacity(4);
|
||||
// get all ts values
|
||||
for (k, v) in val {
|
||||
for (k, v) in val.iter() {
|
||||
if let Value::Timestamp(ts) = v {
|
||||
if !ts_unit_map.contains_key(k) {
|
||||
ts_unit_map.insert(k.clone(), ts.get_unit());
|
||||
@@ -264,6 +273,7 @@ impl Pipeline {
|
||||
Ok(PipelineExecOutput::AutoTransform(AutoTransformOutput {
|
||||
table_suffix,
|
||||
ts_unit_map,
|
||||
pipeline_map: val,
|
||||
}))
|
||||
}
|
||||
}
|
||||
@@ -318,9 +328,9 @@ transform:
|
||||
type: uint32
|
||||
"#;
|
||||
let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
|
||||
let mut payload = json_to_map(input_value).unwrap();
|
||||
let payload = json_to_map(input_value).unwrap();
|
||||
let result = pipeline
|
||||
.exec_mut(&mut payload)
|
||||
.exec_mut(payload)
|
||||
.unwrap()
|
||||
.into_transformed()
|
||||
.unwrap();
|
||||
@@ -371,7 +381,7 @@ transform:
|
||||
let mut payload = PipelineMap::new();
|
||||
payload.insert("message".to_string(), Value::String(message));
|
||||
let result = pipeline
|
||||
.exec_mut(&mut payload)
|
||||
.exec_mut(payload)
|
||||
.unwrap()
|
||||
.into_transformed()
|
||||
.unwrap();
|
||||
@@ -446,9 +456,9 @@ transform:
|
||||
"#;
|
||||
|
||||
let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
|
||||
let mut payload = json_to_map(input_value).unwrap();
|
||||
let payload = json_to_map(input_value).unwrap();
|
||||
let result = pipeline
|
||||
.exec_mut(&mut payload)
|
||||
.exec_mut(payload)
|
||||
.unwrap()
|
||||
.into_transformed()
|
||||
.unwrap();
|
||||
@@ -488,10 +498,10 @@ transform:
|
||||
|
||||
let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
|
||||
let schema = pipeline.schemas().unwrap().clone();
|
||||
let mut result = json_to_map(input_value).unwrap();
|
||||
let result = json_to_map(input_value).unwrap();
|
||||
|
||||
let row = pipeline
|
||||
.exec_mut(&mut result)
|
||||
.exec_mut(result)
|
||||
.unwrap()
|
||||
.into_transformed()
|
||||
.unwrap();
|
||||
|
||||
@@ -13,69 +13,145 @@
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::hash_map::IntoIter;
|
||||
use std::collections::BTreeMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use ahash::{HashMap, HashMapExt};
|
||||
use api::v1::{RowInsertRequest, RowInsertRequests, Rows};
|
||||
use itertools::Itertools;
|
||||
use session::context::{QueryContext, QueryContextRef};
|
||||
|
||||
use crate::tablesuffix::TableSuffixTemplate;
|
||||
use crate::PipelineMap;
|
||||
|
||||
const DEFAULT_OPT: &str = "";
|
||||
const GREPTIME_AUTO_CREATE_TABLE: &str = "greptime_auto_create_table";
|
||||
const GREPTIME_TTL: &str = "greptime_ttl";
|
||||
const GREPTIME_APPEND_MODE: &str = "greptime_append_mode";
|
||||
const GREPTIME_MERGE_MODE: &str = "greptime_merge_mode";
|
||||
const GREPTIME_PHYSICAL_TABLE: &str = "greptime_physical_table";
|
||||
const GREPTIME_SKIP_WAL: &str = "greptime_skip_wal";
|
||||
const GREPTIME_TABLE_SUFFIX: &str = "greptime_table_suffix";
|
||||
|
||||
pub const PIPELINE_HINT_KEYS: [&str; 6] = [
|
||||
"greptime_auto_create_table",
|
||||
"greptime_ttl",
|
||||
"greptime_append_mode",
|
||||
"greptime_merge_mode",
|
||||
"greptime_physical_table",
|
||||
"greptime_skip_wal",
|
||||
pub(crate) const AUTO_CREATE_TABLE_KEY: &str = "auto_create_table";
|
||||
pub(crate) const TTL_KEY: &str = "ttl";
|
||||
pub(crate) const APPEND_MODE_KEY: &str = "append_mode";
|
||||
pub(crate) const MERGE_MODE_KEY: &str = "merge_mode";
|
||||
pub(crate) const PHYSICAL_TABLE_KEY: &str = "physical_table";
|
||||
pub(crate) const SKIP_WAL_KEY: &str = "skip_wal";
|
||||
pub(crate) const TABLE_SUFFIX_KEY: &str = "table_suffix";
|
||||
|
||||
pub const PIPELINE_HINT_KEYS: [&str; 7] = [
|
||||
GREPTIME_AUTO_CREATE_TABLE,
|
||||
GREPTIME_TTL,
|
||||
GREPTIME_APPEND_MODE,
|
||||
GREPTIME_MERGE_MODE,
|
||||
GREPTIME_PHYSICAL_TABLE,
|
||||
GREPTIME_SKIP_WAL,
|
||||
GREPTIME_TABLE_SUFFIX,
|
||||
];
|
||||
|
||||
const PIPELINE_HINT_PREFIX: &str = "greptime_";
|
||||
|
||||
// Remove hints from the pipeline context and form an option string
|
||||
// e.g: skip_wal=true,ttl=1d
|
||||
pub fn from_pipeline_map_to_opt(pipeline_map: &mut PipelineMap) -> String {
|
||||
let mut btreemap = BTreeMap::new();
|
||||
for k in PIPELINE_HINT_KEYS {
|
||||
if let Some(v) = pipeline_map.remove(k) {
|
||||
btreemap.insert(k, v.to_str_value());
|
||||
/// ContextOpt is a collection of options (including table options and pipeline options)
/// that should be extracted during the pipeline execution.
///
/// The options are set in the format of hint keys. See [`PIPELINE_HINT_KEYS`].
/// It is used as the key in [`ContextReq`] for grouping the row insert requests.
|
||||
#[derive(Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
|
||||
pub struct ContextOpt {
|
||||
// table options, that need to be set in the query context before making row insert requests
|
||||
auto_create_table: Option<String>,
|
||||
ttl: Option<String>,
|
||||
append_mode: Option<String>,
|
||||
merge_mode: Option<String>,
|
||||
physical_table: Option<String>,
|
||||
skip_wal: Option<String>,
|
||||
|
||||
// pipeline options, not set in query context
|
||||
// can be removed before the end of the pipeline execution
|
||||
table_suffix: Option<String>,
|
||||
}
|
||||
|
||||
impl ContextOpt {
|
||||
pub fn from_pipeline_map_to_opt(pipeline_map: &mut PipelineMap) -> Self {
|
||||
let mut opt = Self::default();
|
||||
for k in PIPELINE_HINT_KEYS {
|
||||
if let Some(v) = pipeline_map.remove(k) {
|
||||
match k {
|
||||
GREPTIME_AUTO_CREATE_TABLE => {
|
||||
opt.auto_create_table = Some(v.to_str_value());
|
||||
}
|
||||
GREPTIME_TTL => {
|
||||
opt.ttl = Some(v.to_str_value());
|
||||
}
|
||||
GREPTIME_APPEND_MODE => {
|
||||
opt.append_mode = Some(v.to_str_value());
|
||||
}
|
||||
GREPTIME_MERGE_MODE => {
|
||||
opt.merge_mode = Some(v.to_str_value());
|
||||
}
|
||||
GREPTIME_PHYSICAL_TABLE => {
|
||||
opt.physical_table = Some(v.to_str_value());
|
||||
}
|
||||
GREPTIME_SKIP_WAL => {
|
||||
opt.skip_wal = Some(v.to_str_value());
|
||||
}
|
||||
GREPTIME_TABLE_SUFFIX => {
|
||||
opt.table_suffix = Some(v.to_str_value());
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
opt
|
||||
}
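A test-style sketch (not part of this diff) of the extraction above; it assumes the crate's `PipelineMap`, `Value`, and a `QueryContext` value named `ctx` are in scope.

// Hint keys use the `greptime_` prefix and are drained out of the pipeline map.
let mut pipeline_map = PipelineMap::new();
pipeline_map.insert("greptime_ttl".to_string(), Value::String("1d".to_string()));
pipeline_map.insert("greptime_skip_wal".to_string(), Value::String("true".to_string()));
pipeline_map.insert("message".to_string(), Value::String("hello".to_string()));

let opt = ContextOpt::from_pipeline_map_to_opt(&mut pipeline_map);
assert!(pipeline_map.get("greptime_ttl").is_none()); // hints are removed
assert!(pipeline_map.get("message").is_some());      // payload keys stay put

// Later, the captured options are applied to a cloned query context before insert.
opt.set_query_context(&mut ctx);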
|
||||
|
||||
pub(crate) fn resolve_table_suffix(
|
||||
&mut self,
|
||||
table_suffix: Option<&TableSuffixTemplate>,
|
||||
pipeline_map: &PipelineMap,
|
||||
) -> Option<String> {
|
||||
self.table_suffix
|
||||
.take()
|
||||
.or_else(|| table_suffix.and_then(|s| s.apply(pipeline_map)))
|
||||
}
|
||||
|
||||
pub fn set_query_context(self, ctx: &mut QueryContext) {
|
||||
if let Some(auto_create_table) = &self.auto_create_table {
|
||||
ctx.set_extension(AUTO_CREATE_TABLE_KEY, auto_create_table);
|
||||
}
|
||||
if let Some(ttl) = &self.ttl {
|
||||
ctx.set_extension(TTL_KEY, ttl);
|
||||
}
|
||||
if let Some(append_mode) = &self.append_mode {
|
||||
ctx.set_extension(APPEND_MODE_KEY, append_mode);
|
||||
}
|
||||
if let Some(merge_mode) = &self.merge_mode {
|
||||
ctx.set_extension(MERGE_MODE_KEY, merge_mode);
|
||||
}
|
||||
if let Some(physical_table) = &self.physical_table {
|
||||
ctx.set_extension(PHYSICAL_TABLE_KEY, physical_table);
|
||||
}
|
||||
if let Some(skip_wal) = &self.skip_wal {
|
||||
ctx.set_extension(SKIP_WAL_KEY, skip_wal);
|
||||
}
|
||||
}
|
||||
btreemap
|
||||
.into_iter()
|
||||
.map(|(k, v)| format!("{}={}", k.replace(PIPELINE_HINT_PREFIX, ""), v))
|
||||
.join(",")
|
||||
}
|
||||
|
||||
// split the option string back to a map
|
||||
fn from_opt_to_map(opt: &str) -> HashMap<&str, &str> {
|
||||
opt.split(',')
|
||||
.filter_map(|s| {
|
||||
s.split_once("=")
|
||||
.filter(|(k, v)| !k.is_empty() && !v.is_empty())
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
// ContextReq is a collection of row insert requests with different options.
|
||||
// The default option is empty string.
|
||||
// Because options are set in query context, we have to split them into sequential calls
|
||||
// e.g:
|
||||
// {
|
||||
// "skip_wal=true,ttl=1d": [RowInsertRequest],
|
||||
// "ttl=1d": [RowInsertRequest],
|
||||
// }
|
||||
/// ContextReq is a collection of row insert requests with different options.
/// The default option is the all-empty one.
/// Because options are set in the query context, we have to split them into sequential calls.
/// The key is a [`ContextOpt`] struct for strong typing.
|
||||
/// e.g:
|
||||
/// {
|
||||
/// "skip_wal=true,ttl=1d": [RowInsertRequest],
|
||||
/// "ttl=1d": [RowInsertRequest],
|
||||
/// }
|
||||
#[derive(Debug, Default)]
|
||||
pub struct ContextReq {
|
||||
req: HashMap<String, Vec<RowInsertRequest>>,
|
||||
req: HashMap<ContextOpt, Vec<RowInsertRequest>>,
|
||||
}
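An illustrative sketch (not part of this diff) of the grouping: requests that share the same `ContextOpt` key are flushed together under one cloned query context. The `RowInsertRequest` fields shown are assumptions for brevity.

use api::v1::RowInsertRequest;

let mut ctx_req = ContextReq::default();

// Two requests with identical (default) options land under the same key.
ctx_req.add_rows(
    ContextOpt::default(),
    RowInsertRequest { table_name: "logs".to_string(), ..Default::default() },
);
ctx_req.add_rows(
    ContextOpt::default(),
    RowInsertRequest { table_name: "logs_2024".to_string(), ..Default::default() },
);
// Both inserts will be sent in one call sharing one query context clone.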
|
||||
|
||||
impl ContextReq {
|
||||
pub fn from_opt_map(opt_map: HashMap<String, Rows>, table_name: String) -> Self {
|
||||
pub fn from_opt_map(opt_map: HashMap<ContextOpt, Rows>, table_name: String) -> Self {
|
||||
Self {
|
||||
req: opt_map
|
||||
.into_iter()
|
||||
@@ -88,17 +164,17 @@ impl ContextReq {
|
||||
}],
|
||||
)
|
||||
})
|
||||
.collect::<HashMap<String, Vec<RowInsertRequest>>>(),
|
||||
.collect::<HashMap<ContextOpt, Vec<RowInsertRequest>>>(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn default_opt_with_reqs(reqs: Vec<RowInsertRequest>) -> Self {
|
||||
let mut req_map = HashMap::new();
|
||||
req_map.insert(DEFAULT_OPT.to_string(), reqs);
|
||||
req_map.insert(ContextOpt::default(), reqs);
|
||||
Self { req: req_map }
|
||||
}
|
||||
|
||||
pub fn add_rows(&mut self, opt: String, req: RowInsertRequest) {
|
||||
pub fn add_rows(&mut self, opt: ContextOpt, req: RowInsertRequest) {
|
||||
self.req.entry(opt).or_default().push(req);
|
||||
}
|
||||
|
||||
@@ -131,7 +207,7 @@ impl ContextReq {
|
||||
// It will clone the query context for each option and set the options to the context.
|
||||
// Then it will return the context and the row insert requests for actual insert.
|
||||
pub struct ContextReqIter {
|
||||
opt_req: IntoIter<String, Vec<RowInsertRequest>>,
|
||||
opt_req: IntoIter<ContextOpt, Vec<RowInsertRequest>>,
|
||||
ctx_template: QueryContext,
|
||||
}
|
||||
|
||||
@@ -140,13 +216,8 @@ impl Iterator for ContextReqIter {
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
let (opt, req_vec) = self.opt_req.next()?;
|
||||
|
||||
let opt_map = from_opt_to_map(&opt);
|
||||
|
||||
let mut ctx = self.ctx_template.clone();
|
||||
for (k, v) in opt_map {
|
||||
ctx.set_extension(k, v);
|
||||
}
|
||||
opt.set_query_context(&mut ctx);
|
||||
|
||||
Some((Arc::new(ctx), RowInsertRequests { inserts: req_vec }))
|
||||
}
|
||||
|
||||
@@ -29,6 +29,7 @@ pub mod select;
|
||||
pub mod simple_extract;
|
||||
pub mod timestamp;
|
||||
pub mod urlencoding;
|
||||
pub mod vrl;
|
||||
|
||||
use std::str::FromStr;
|
||||
|
||||
@@ -58,6 +59,7 @@ use crate::etl::field::{Field, Fields};
|
||||
use crate::etl::processor::json_parse::JsonParseProcessor;
|
||||
use crate::etl::processor::select::SelectProcessor;
|
||||
use crate::etl::processor::simple_extract::SimpleExtractProcessor;
|
||||
use crate::etl::processor::vrl::VrlProcessor;
|
||||
use crate::etl::PipelineMap;
|
||||
|
||||
const FIELD_NAME: &str = "field";
|
||||
@@ -123,7 +125,7 @@ pub trait Processor: std::fmt::Debug + Send + Sync + 'static {
|
||||
fn ignore_missing(&self) -> bool;
|
||||
|
||||
/// Execute the processor on a map that has been preprocessed by the pipeline
|
||||
fn exec_mut(&self, val: &mut PipelineMap) -> Result<()>;
|
||||
fn exec_mut(&self, val: PipelineMap) -> Result<PipelineMap>;
|
||||
}
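A hypothetical processor (not part of this diff) showing the new owned-map contract: the `PipelineMap` is moved in, transformed, and moved back out instead of being mutated through a `&mut` borrow.

#[derive(Debug)]
struct NoopProcessor;

impl Processor for NoopProcessor {
    fn kind(&self) -> &str {
        "noop"
    }

    fn ignore_missing(&self) -> bool {
        true
    }

    fn exec_mut(&self, val: PipelineMap) -> Result<PipelineMap> {
        // No transformation; simply hand ownership of the map back to the pipeline.
        Ok(val)
    }
}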
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -146,6 +148,7 @@ pub enum ProcessorKind {
|
||||
Decolorize(DecolorizeProcessor),
|
||||
Digest(DigestProcessor),
|
||||
Select(SelectProcessor),
|
||||
Vrl(VrlProcessor),
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
@@ -227,6 +230,7 @@ fn parse_processor(doc: &yaml_rust::Yaml) -> Result<ProcessorKind> {
|
||||
json_parse::PROCESSOR_JSON_PARSE => {
|
||||
ProcessorKind::JsonParse(JsonParseProcessor::try_from(value)?)
|
||||
}
|
||||
vrl::PROCESSOR_VRL => ProcessorKind::Vrl(VrlProcessor::try_from(value)?),
|
||||
select::PROCESSOR_SELECT => ProcessorKind::Select(SelectProcessor::try_from(value)?),
|
||||
_ => return UnsupportedProcessorSnafu { processor: str_key }.fail(),
|
||||
};
|
||||
|
||||
@@ -249,7 +249,7 @@ impl Processor for CmcdProcessor {
|
||||
self.ignore_missing
|
||||
}
|
||||
|
||||
fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
|
||||
fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
|
||||
for field in self.fields.iter() {
|
||||
let name = field.input_field();
|
||||
|
||||
@@ -277,7 +277,7 @@ impl Processor for CmcdProcessor {
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
Ok(val)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -189,7 +189,7 @@ impl Processor for CsvProcessor {
|
||||
self.ignore_missing
|
||||
}
|
||||
|
||||
fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
|
||||
fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
|
||||
for field in self.fields.iter() {
|
||||
let name = field.input_field();
|
||||
|
||||
@@ -216,7 +216,7 @@ impl Processor for CsvProcessor {
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
Ok(val)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -163,6 +163,10 @@ pub struct DateProcessor {
|
||||
}
|
||||
|
||||
impl DateProcessor {
|
||||
pub(crate) fn target_count(&self) -> usize {
|
||||
self.fields.len()
|
||||
}
|
||||
|
||||
fn parse(&self, val: &str) -> Result<Timestamp> {
|
||||
let mut tz = Tz::UTC;
|
||||
if let Some(timezone) = &self.timezone {
|
||||
@@ -194,7 +198,7 @@ impl Processor for DateProcessor {
|
||||
self.ignore_missing
|
||||
}
|
||||
|
||||
fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
|
||||
fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
|
||||
for field in self.fields.iter() {
|
||||
let index = field.input_field();
|
||||
match val.get(index) {
|
||||
@@ -221,7 +225,7 @@ impl Processor for DateProcessor {
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
Ok(val)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -102,7 +102,7 @@ impl crate::etl::processor::Processor for DecolorizeProcessor {
|
||||
self.ignore_missing
|
||||
}
|
||||
|
||||
fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
|
||||
fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
|
||||
for field in self.fields.iter() {
|
||||
let index = field.input_field();
|
||||
match val.get(index) {
|
||||
@@ -122,7 +122,7 @@ impl crate::etl::processor::Processor for DecolorizeProcessor {
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
Ok(val)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -201,7 +201,7 @@ impl crate::etl::processor::Processor for DigestProcessor {
|
||||
self.ignore_missing
|
||||
}
|
||||
|
||||
fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
|
||||
fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
|
||||
for field in self.fields.iter() {
|
||||
let index = field.input_field();
|
||||
match val.get(index) {
|
||||
@@ -221,7 +221,7 @@ impl crate::etl::processor::Processor for DigestProcessor {
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
Ok(val)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -601,7 +601,7 @@ impl Processor for DissectProcessor {
|
||||
self.ignore_missing
|
||||
}
|
||||
|
||||
fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
|
||||
fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
|
||||
for field in self.fields.iter() {
|
||||
let index = field.input_field();
|
||||
match val.get(index) {
|
||||
@@ -629,7 +629,7 @@ impl Processor for DissectProcessor {
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
Ok(val)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -111,6 +111,10 @@ impl EpochProcessor {
|
||||
Resolution::Nano => Ok(Timestamp::Nanosecond(t)),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn target_count(&self) -> usize {
|
||||
self.fields.len()
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<&yaml_rust::yaml::Hash> for EpochProcessor {
|
||||
@@ -163,7 +167,7 @@ impl Processor for EpochProcessor {
|
||||
self.ignore_missing
|
||||
}
|
||||
|
||||
fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
|
||||
fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
|
||||
for field in self.fields.iter() {
|
||||
let index = field.input_field();
|
||||
match val.get(index) {
|
||||
@@ -183,7 +187,7 @@ impl Processor for EpochProcessor {
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
Ok(val)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -118,7 +118,7 @@ impl crate::etl::processor::Processor for GsubProcessor {
|
||||
self.ignore_missing
|
||||
}
|
||||
|
||||
fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
|
||||
fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
|
||||
for field in self.fields.iter() {
|
||||
let index = field.input_field();
|
||||
match val.get(index) {
|
||||
@@ -138,7 +138,7 @@ impl crate::etl::processor::Processor for GsubProcessor {
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
Ok(val)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -95,7 +95,7 @@ impl Processor for JoinProcessor {
|
||||
self.ignore_missing
|
||||
}
|
||||
|
||||
fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
|
||||
fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
|
||||
for field in self.fields.iter() {
|
||||
let index = field.input_field();
|
||||
match val.get(index) {
|
||||
@@ -123,7 +123,7 @@ impl Processor for JoinProcessor {
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
Ok(val)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -97,7 +97,7 @@ impl Processor for JsonParseProcessor {
|
||||
self.ignore_missing
|
||||
}
|
||||
|
||||
fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
|
||||
fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
|
||||
for field in self.fields.iter() {
|
||||
let index = field.input_field();
|
||||
match val.get(index) {
|
||||
@@ -117,7 +117,7 @@ impl Processor for JsonParseProcessor {
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
Ok(val)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -125,7 +125,7 @@ impl Processor for JsonPathProcessor {
|
||||
self.ignore_missing
|
||||
}
|
||||
|
||||
fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
|
||||
fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
|
||||
for field in self.fields.iter() {
|
||||
let index = field.input_field();
|
||||
match val.get(index) {
|
||||
@@ -145,7 +145,7 @@ impl Processor for JsonPathProcessor {
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
Ok(val)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -126,7 +126,7 @@ impl Processor for LetterProcessor {
|
||||
self.ignore_missing
|
||||
}
|
||||
|
||||
fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
|
||||
fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
|
||||
for field in self.fields.iter() {
|
||||
let index = field.input_field();
|
||||
match val.get(index) {
|
||||
@@ -154,7 +154,7 @@ impl Processor for LetterProcessor {
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
Ok(val)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -192,7 +192,7 @@ impl Processor for RegexProcessor {
|
||||
self.ignore_missing
|
||||
}
|
||||
|
||||
fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
|
||||
fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
|
||||
for field in self.fields.iter() {
|
||||
let index = field.input_field();
|
||||
let prefix = field.target_or_input_field();
|
||||
@@ -220,7 +220,7 @@ impl Processor for RegexProcessor {
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
Ok(val)
|
||||
}
|
||||
}
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -96,7 +96,7 @@ impl Processor for SelectProcessor {
|
||||
true
|
||||
}
|
||||
|
||||
fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
|
||||
fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
|
||||
match self.select_type {
|
||||
SelectType::Include => {
|
||||
let mut include_key_set = HashSet::with_capacity(val.len());
|
||||
@@ -121,7 +121,7 @@ impl Processor for SelectProcessor {
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
Ok(val)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -142,8 +142,9 @@ mod test {
|
||||
p.insert("hello".to_string(), Value::String("world".to_string()));
|
||||
p.insert("hello2".to_string(), Value::String("world2".to_string()));
|
||||
|
||||
let result = processor.exec_mut(&mut p);
|
||||
let result = processor.exec_mut(p);
|
||||
assert!(result.is_ok());
|
||||
let p = result.unwrap();
|
||||
assert_eq!(p.len(), 1);
|
||||
assert_eq!(p.get("hello"), Some(&Value::String("world".to_string())));
|
||||
}
|
||||
@@ -159,8 +160,9 @@ mod test {
|
||||
p.insert("hello".to_string(), Value::String("world".to_string()));
|
||||
p.insert("hello2".to_string(), Value::String("world2".to_string()));
|
||||
|
||||
let result = processor.exec_mut(&mut p);
|
||||
let result = processor.exec_mut(p);
|
||||
assert!(result.is_ok());
|
||||
let p = result.unwrap();
|
||||
assert_eq!(p.len(), 1);
|
||||
assert_eq!(p.get("hello3"), Some(&Value::String("world".to_string())));
|
||||
}
|
||||
@@ -176,8 +178,9 @@ mod test {
|
||||
p.insert("hello".to_string(), Value::String("world".to_string()));
|
||||
p.insert("hello2".to_string(), Value::String("world2".to_string()));
|
||||
|
||||
let result = processor.exec_mut(&mut p);
|
||||
let result = processor.exec_mut(p);
|
||||
assert!(result.is_ok());
|
||||
let p = result.unwrap();
|
||||
assert_eq!(p.len(), 1);
|
||||
assert_eq!(p.get("hello"), None);
|
||||
assert_eq!(p.get("hello2"), Some(&Value::String("world2".to_string())));
|
||||
|
||||
@@ -98,7 +98,7 @@ impl Processor for SimpleExtractProcessor {
|
||||
self.ignore_missing
|
||||
}
|
||||
|
||||
fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
|
||||
fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
|
||||
for field in self.fields.iter() {
|
||||
let index = field.input_field();
|
||||
match val.get(index) {
|
||||
@@ -118,7 +118,7 @@ impl Processor for SimpleExtractProcessor {
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
Ok(val)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -205,6 +205,10 @@ impl TimestampProcessor {
|
||||
Resolution::Nano => Ok(Timestamp::Nanosecond(t)),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn target_count(&self) -> usize {
|
||||
self.fields.len()
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_formats(yaml: &yaml_rust::yaml::Yaml) -> Result<Vec<(Arc<String>, Tz)>> {
|
||||
@@ -298,7 +302,7 @@ impl Processor for TimestampProcessor {
|
||||
self.ignore_missing
|
||||
}
|
||||
|
||||
fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
|
||||
fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
|
||||
for field in self.fields.iter() {
|
||||
let index = field.input_field();
|
||||
match val.get(index) {
|
||||
@@ -318,7 +322,7 @@ impl Processor for TimestampProcessor {
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
Ok(val)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -126,7 +126,7 @@ impl crate::etl::processor::Processor for UrlEncodingProcessor {
|
||||
self.ignore_missing
|
||||
}
|
||||
|
||||
fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
|
||||
fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
|
||||
for field in self.fields.iter() {
|
||||
let index = field.input_field();
|
||||
match val.get(index) {
|
||||
@@ -153,7 +153,7 @@ impl crate::etl::processor::Processor for UrlEncodingProcessor {
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
Ok(val)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
319
src/pipeline/src/etl/processor/vrl.rs
Normal file
@@ -0,0 +1,319 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
use chrono_tz::Tz;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use vrl::compiler::runtime::Runtime;
|
||||
use vrl::compiler::{compile, Program, TargetValue};
|
||||
use vrl::diagnostic::Formatter;
|
||||
use vrl::prelude::{Bytes, NotNan, TimeZone};
|
||||
use vrl::value::{KeyString, Kind, Secrets, Value as VrlValue};
|
||||
|
||||
use crate::error::{
|
||||
BytesToUtf8Snafu, CompileVrlSnafu, Error, ExecuteVrlSnafu, FloatNaNSnafu,
|
||||
InvalidTimestampSnafu, KeyMustBeStringSnafu, Result, VrlRegexValueSnafu, VrlReturnValueSnafu,
|
||||
};
|
||||
use crate::etl::processor::yaml_string;
|
||||
use crate::{PipelineMap, Value as PipelineValue};
|
||||
|
||||
pub(crate) const PROCESSOR_VRL: &str = "vrl";
|
||||
const SOURCE: &str = "source";
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct VrlProcessor {
|
||||
source: String,
|
||||
program: Program,
|
||||
}
|
||||
|
||||
impl VrlProcessor {
|
||||
pub fn new(source: String) -> Result<Self> {
|
||||
let fns = vrl::stdlib::all();
|
||||
|
||||
let compile_result = compile(&source, &fns).map_err(|e| {
|
||||
CompileVrlSnafu {
|
||||
msg: Formatter::new(&source, e).to_string(),
|
||||
}
|
||||
.build()
|
||||
})?;
|
||||
|
||||
let program = compile_result.program;
|
||||
|
||||
// check that the return value is an object and does not contain a regex
|
||||
        let result_def = program.final_type_info().result;
        let kind = result_def.kind();
        if !kind.is_object() {
            return VrlReturnValueSnafu.fail();
        }
        check_regex_output(kind)?;

        Ok(Self { source, program })
    }

    pub fn resolve(&self, m: PipelineMap) -> Result<PipelineValue> {
        let pipeline_vrl = m
            .into_iter()
            .map(|(k, v)| pipeline_value_to_vrl_value(v).map(|v| (KeyString::from(k), v)))
            .collect::<Result<BTreeMap<_, _>>>()?;

        let mut target = TargetValue {
            value: VrlValue::Object(pipeline_vrl),
            metadata: VrlValue::Object(BTreeMap::new()),
            secrets: Secrets::default(),
        };

        let timezone = TimeZone::Named(Tz::UTC);
        let mut runtime = Runtime::default();
        let re = runtime
            .resolve(&mut target, &self.program, &timezone)
            .map_err(|e| {
                ExecuteVrlSnafu {
                    msg: e.get_expression_error().to_string(),
                }
                .build()
            })?;

        vrl_value_to_pipeline_value(re)
    }
}

impl TryFrom<&yaml_rust::yaml::Hash> for VrlProcessor {
    type Error = Error;

    fn try_from(value: &yaml_rust::yaml::Hash) -> Result<Self> {
        let mut source = String::new();
        for (k, v) in value.iter() {
            let key = k
                .as_str()
                .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
            if key == SOURCE {
                source = yaml_string(v, SOURCE)?;
            }
        }
        let processor = VrlProcessor::new(source)?;
        Ok(processor)
    }
}

impl crate::etl::processor::Processor for VrlProcessor {
    fn kind(&self) -> &str {
        PROCESSOR_VRL
    }

    fn ignore_missing(&self) -> bool {
        true
    }

    fn exec_mut(&self, val: PipelineMap) -> Result<PipelineMap> {
        let val = self.resolve(val)?;

        if let PipelineValue::Map(m) = val {
            Ok(m.values)
        } else {
            VrlRegexValueSnafu.fail()
        }
    }
}

fn pipeline_value_to_vrl_value(v: PipelineValue) -> Result<VrlValue> {
    match v {
        PipelineValue::Null => Ok(VrlValue::Null),
        PipelineValue::Int8(x) => Ok(VrlValue::Integer(x as i64)),
        PipelineValue::Int16(x) => Ok(VrlValue::Integer(x as i64)),
        PipelineValue::Int32(x) => Ok(VrlValue::Integer(x as i64)),
        PipelineValue::Int64(x) => Ok(VrlValue::Integer(x)),
        PipelineValue::Uint8(x) => Ok(VrlValue::Integer(x as i64)),
        PipelineValue::Uint16(x) => Ok(VrlValue::Integer(x as i64)),
        PipelineValue::Uint32(x) => Ok(VrlValue::Integer(x as i64)),
        PipelineValue::Uint64(x) => Ok(VrlValue::Integer(x as i64)),
        PipelineValue::Float32(x) => NotNan::new(x as f64)
            .map_err(|_| FloatNaNSnafu { input_float: x }.build())
            .map(VrlValue::Float),
        PipelineValue::Float64(x) => NotNan::new(x)
            .map_err(|_| FloatNaNSnafu { input_float: x }.build())
            .map(VrlValue::Float),
        PipelineValue::Boolean(x) => Ok(VrlValue::Boolean(x)),
        PipelineValue::String(x) => Ok(VrlValue::Bytes(Bytes::copy_from_slice(x.as_bytes()))),
        PipelineValue::Timestamp(x) => x
            .to_datetime()
            .context(InvalidTimestampSnafu {
                input: x.to_string(),
            })
            .map(VrlValue::Timestamp),
        PipelineValue::Array(array) => Ok(VrlValue::Array(
            array
                .into_iter()
                .map(pipeline_value_to_vrl_value)
                .collect::<Result<Vec<_>>>()?,
        )),
        PipelineValue::Map(m) => {
            let values = m
                .values
                .into_iter()
                .map(|(k, v)| pipeline_value_to_vrl_value(v).map(|v| (KeyString::from(k), v)))
                .collect::<Result<BTreeMap<_, _>>>()?;
            Ok(VrlValue::Object(values))
        }
    }
}

fn vrl_value_to_pipeline_value(v: VrlValue) -> Result<PipelineValue> {
    match v {
        VrlValue::Bytes(bytes) => String::from_utf8(bytes.to_vec())
            .context(BytesToUtf8Snafu)
            .map(PipelineValue::String),
        VrlValue::Regex(_) => VrlRegexValueSnafu.fail(),
        VrlValue::Integer(x) => Ok(PipelineValue::Int64(x)),
        VrlValue::Float(not_nan) => Ok(PipelineValue::Float64(not_nan.into_inner())),
        VrlValue::Boolean(b) => Ok(PipelineValue::Boolean(b)),
        VrlValue::Timestamp(date_time) => crate::etl::value::Timestamp::from_datetime(date_time)
            .context(InvalidTimestampSnafu {
                input: date_time.to_string(),
            })
            .map(PipelineValue::Timestamp),
        VrlValue::Object(bm) => {
            let b = bm
                .into_iter()
                .map(|(k, v)| vrl_value_to_pipeline_value(v).map(|v| (k.to_string(), v)))
                .collect::<Result<BTreeMap<String, PipelineValue>>>()?;
            Ok(PipelineValue::Map(b.into()))
        }
        VrlValue::Array(values) => {
            let a = values
                .into_iter()
                .map(vrl_value_to_pipeline_value)
                .collect::<Result<Vec<_>>>()?;
            Ok(PipelineValue::Array(a.into()))
        }
        VrlValue::Null => Ok(PipelineValue::Null),
    }
}

fn check_regex_output(output_kind: &Kind) -> Result<()> {
    if output_kind.is_regex() {
        return VrlRegexValueSnafu.fail();
    }

    if let Some(arr) = output_kind.as_array() {
        let k = arr.known();
        for v in k.values() {
            check_regex_output(v)?
        }
    }

    if let Some(obj) = output_kind.as_object() {
        let k = obj.known();
        for v in k.values() {
            check_regex_output(v)?
        }
    }

    Ok(())
}

#[cfg(test)]
mod tests {

    use super::*;
    use crate::etl::value::Timestamp;
    use crate::Map;

    #[test]
    fn test_vrl() {
        let source = r#"
.name.a = .user_info.name
.name.b = .user_info.name
del(.user_info)
.timestamp = now()
.
"#;

        let v = VrlProcessor::new(source.to_string());
        assert!(v.is_ok());
        let v = v.unwrap();

        let mut n = PipelineMap::new();
        n.insert(
            "name".to_string(),
            PipelineValue::String("certain_name".to_string()),
        );

        let mut m = PipelineMap::new();
        m.insert(
            "user_info".to_string(),
            PipelineValue::Map(Map { values: n }),
        );

        let re = v.resolve(m);
        assert!(re.is_ok());
        let re = re.unwrap();

        assert!(matches!(re, PipelineValue::Map(_)));
        assert!(re.get("name").is_some());
        let name = re.get("name").unwrap();
        assert!(matches!(name.get("a").unwrap(), PipelineValue::String(x) if x == "certain_name"));
        assert!(matches!(name.get("b").unwrap(), PipelineValue::String(x) if x == "certain_name"));
        assert!(re.get("timestamp").is_some());
        let timestamp = re.get("timestamp").unwrap();
        assert!(matches!(
            timestamp,
            PipelineValue::Timestamp(Timestamp::Nanosecond(_))
        ));
    }

    #[test]
    fn test_yaml_to_vrl() {
        let yaml = r#"
processors:
  - vrl:
      source: |
        .name.a = .user_info.name
        .name.b = .user_info.name
        del(.user_info)
        .timestamp = now()
        .
"#;
        let y = yaml_rust::YamlLoader::load_from_str(yaml).unwrap();
        let vrl_processor_yaml = y
            .first()
            .and_then(|x| x.as_hash())
            .and_then(|x| x.get(&yaml_rust::Yaml::String("processors".to_string())))
            .and_then(|x| x.as_vec())
            .and_then(|x| x.first())
            .and_then(|x| x.as_hash())
            .and_then(|x| x.get(&yaml_rust::Yaml::String("vrl".to_string())))
            .and_then(|x| x.as_hash())
            .unwrap();

        let vrl = VrlProcessor::try_from(vrl_processor_yaml);
        assert!(vrl.is_ok());
        let vrl = vrl.unwrap();

        assert_eq!(vrl.source, ".name.a = .user_info.name\n.name.b = .user_info.name\ndel(.user_info)\n.timestamp = now()\n.\n");
    }

    #[test]
    fn test_regex() {
        let source = r#"
.re = r'(?i)^Hello, World!$'
del(.re)
.re = r'(?i)^Hello, World!$'
.
"#;

        let v = VrlProcessor::new(source.to_string());
        assert!(v.is_err());
    }
}
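The processor hunks above change Processor::exec_mut from mutating a &mut PipelineMap in place to consuming the map and returning a new one. Below is a minimal, self-contained sketch of that by-value contract using toy stand-ins (PipelineMap, the error type, and the Uppercase processor are invented for illustration and are not the crate's real definitions; whether the real trait is used through trait objects like this is also an assumption):

use std::collections::BTreeMap;

// Toy stand-ins for illustration only.
type PipelineMap = BTreeMap<String, String>;
type Result<T> = std::result::Result<T, String>;

trait Processor {
    // Takes ownership of the map and hands it (or a replacement) back.
    fn exec_mut(&self, val: PipelineMap) -> Result<PipelineMap>;
}

struct Uppercase;

impl Processor for Uppercase {
    fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
        for v in val.values_mut() {
            *v = v.to_uppercase();
        }
        Ok(val)
    }
}

fn main() -> Result<()> {
    let processors: Vec<Box<dyn Processor>> = vec![Box::new(Uppercase)];
    let mut map = PipelineMap::new();
    map.insert("host".to_string(), "db-1".to_string());
    // Each processor consumes the map and returns the next version of it,
    // which is what lets a processor such as vrl swap in a whole new object.
    for p in &processors {
        map = p.exec_mut(map)?;
    }
    println!("{map:?}");
    Ok(())
}

Passing the map by value is what allows the vrl processor above to return a freshly built object from VrlProcessor::resolve rather than patching fields through a mutable reference.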
@@ -88,9 +88,10 @@ impl TryFrom<&Vec<yaml_rust::Yaml>> for Transforms {
    type Error = Error;

    fn try_from(docs: &Vec<yaml_rust::Yaml>) -> Result<Self> {
        let mut transforms = Vec::with_capacity(100);
        let mut all_output_keys: Vec<String> = Vec::with_capacity(100);
        let mut all_required_keys = Vec::with_capacity(100);
        let mut transforms = Vec::with_capacity(32);
        let mut all_output_keys: Vec<String> = Vec::with_capacity(32);
        let mut all_required_keys = Vec::with_capacity(32);

        for doc in docs {
            let transform_builder: Transform = doc
                .as_hash()
@@ -123,15 +124,10 @@ impl TryFrom<&Vec<yaml_rust::Yaml>> for Transforms {
#[derive(Debug, Clone)]
pub struct Transform {
    pub fields: Fields,

    pub type_: Value,

    pub default: Option<Value>,

    pub index: Option<Index>,

    pub tag: bool,

    pub on_failure: Option<OnFailure>,
}

@@ -35,12 +35,13 @@ use crate::error::{
    TransformColumnNameMustBeUniqueSnafu, TransformMultipleTimestampIndexSnafu,
    TransformTimestampIndexCountSnafu, UnsupportedNumberTypeSnafu,
};
use crate::etl::ctx_req::ContextOpt;
use crate::etl::field::{Field, Fields};
use crate::etl::transform::index::Index;
use crate::etl::transform::{Transform, Transforms};
use crate::etl::value::{Timestamp, Value};
use crate::etl::PipelineMap;
use crate::{from_pipeline_map_to_opt, PipelineContext};
use crate::PipelineContext;

const DEFAULT_GREPTIME_TIMESTAMP_COLUMN: &str = "greptime_timestamp";
const DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING: usize = 10;
@@ -185,8 +186,8 @@ impl GreptimeTransformer {
        }
    }

    pub fn transform_mut(&self, pipeline_map: &mut PipelineMap) -> Result<(String, Row)> {
        let opt = from_pipeline_map_to_opt(pipeline_map);
    pub fn transform_mut(&self, pipeline_map: &mut PipelineMap) -> Result<(ContextOpt, Row)> {
        let opt = ContextOpt::from_pipeline_map_to_opt(pipeline_map);

        let mut values = vec![GreptimeValue { value_data: None }; self.schema.len()];
        let mut output_index = 0;
@@ -519,7 +520,7 @@ fn resolve_value(
fn identity_pipeline_inner(
    pipeline_maps: Vec<PipelineMap>,
    pipeline_ctx: &PipelineContext<'_>,
) -> Result<(SchemaInfo, HashMap<String, Vec<Row>>)> {
) -> Result<(SchemaInfo, HashMap<ContextOpt, Vec<Row>>)> {
    let mut schema_info = SchemaInfo::default();
    let custom_ts = pipeline_ctx.pipeline_definition.get_custom_ts();

@@ -544,7 +545,7 @@ fn identity_pipeline_inner(
    let len = pipeline_maps.len();

    for mut pipeline_map in pipeline_maps {
        let opt = from_pipeline_map_to_opt(&mut pipeline_map);
        let opt = ContextOpt::from_pipeline_map_to_opt(&mut pipeline_map);
        let row = values_to_row(&mut schema_info, pipeline_map, pipeline_ctx)?;

        opt_map
@@ -578,7 +579,7 @@ pub fn identity_pipeline(
    array: Vec<PipelineMap>,
    table: Option<Arc<table::Table>>,
    pipeline_ctx: &PipelineContext<'_>,
) -> Result<HashMap<String, Rows>> {
) -> Result<HashMap<ContextOpt, Rows>> {
    let input = if pipeline_ctx.pipeline_param.flatten_json_object() {
        array
            .into_iter()
@@ -609,7 +610,7 @@ pub fn identity_pipeline(
                },
            )
        })
        .collect::<HashMap<String, Rows>>()
        .collect::<HashMap<ContextOpt, Rows>>()
    })
}

@@ -761,7 +762,7 @@ mod tests {
        assert!(rows.is_ok());
        let mut rows = rows.unwrap();
        assert!(rows.len() == 1);
        let rows = rows.remove("").unwrap();
        let rows = rows.remove(&ContextOpt::default()).unwrap();
        assert_eq!(rows.schema.len(), 8);
        assert_eq!(rows.rows.len(), 2);
        assert_eq!(8, rows.rows[0].values.len());
@@ -799,7 +800,7 @@ mod tests {
        }

        assert!(rows.len() == 1);
        let rows = rows.remove("").unwrap();
        let rows = rows.remove(&ContextOpt::default()).unwrap();

        Rows {
            schema: schema.schema,

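These transformer hunks also switch the grouping key from a plain String to ContextOpt (HashMap<ContextOpt, Rows>, rows.remove(&ContextOpt::default())), which implies the options struct is hashable, comparable, and has a "no options" default. A self-contained sketch of that keying pattern follows; the ttl field and its values are made up for illustration, since the diff does not show ContextOpt's real fields:

use std::collections::HashMap;

// Hypothetical stand-in for ContextOpt; the real field set is not shown in this diff.
#[derive(Debug, Default, PartialEq, Eq, Hash)]
struct ContextOpt {
    ttl: Option<String>,
}

fn main() {
    let mut opt_map: HashMap<ContextOpt, Vec<&str>> = HashMap::new();
    // Rows without per-row options land under the default key, mirroring
    // rows.remove(&ContextOpt::default()) in the updated tests.
    opt_map.entry(ContextOpt::default()).or_default().push("row-1");
    opt_map
        .entry(ContextOpt { ttl: Some("7d".to_string()) })
        .or_default()
        .push("row-2");
    assert_eq!(opt_map[&ContextOpt::default()], vec!["row-1"]);
}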
@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use chrono::{DateTime, Utc};
use common_time::timestamp::TimeUnit;

#[derive(Debug, Clone, PartialEq)]
@@ -104,6 +105,19 @@ impl Timestamp {
            Timestamp::Second(_) => TimeUnit::Second,
        }
    }

    pub fn to_datetime(&self) -> Option<DateTime<Utc>> {
        match self {
            Timestamp::Nanosecond(v) => Some(DateTime::from_timestamp_nanos(*v)),
            Timestamp::Microsecond(v) => DateTime::from_timestamp_micros(*v),
            Timestamp::Millisecond(v) => DateTime::from_timestamp_millis(*v),
            Timestamp::Second(v) => DateTime::from_timestamp(*v, 0),
        }
    }

    pub fn from_datetime(dt: DateTime<Utc>) -> Option<Self> {
        dt.timestamp_nanos_opt().map(Timestamp::Nanosecond)
    }
}

impl Default for Timestamp {

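The new to_datetime/from_datetime helpers lean on chrono's epoch constructors. A standalone round-trip sketch of the same conversions, assuming a chrono release that provides the APIs the diff itself uses (DateTime::from_timestamp_millis and timestamp_nanos_opt):

use chrono::{DateTime, Utc};

fn main() {
    // Millisecond epoch -> DateTime<Utc>, as in Timestamp::to_datetime.
    let ms: i64 = 1_700_000_000_000;
    let dt: DateTime<Utc> = DateTime::from_timestamp_millis(ms).expect("in range");

    // DateTime<Utc> -> nanosecond epoch, as in Timestamp::from_datetime.
    let ns = dt.timestamp_nanos_opt().expect("fits in i64");
    assert_eq!(ns, ms * 1_000_000);
    println!("{dt} -> {ns}");
}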
@@ -19,7 +19,7 @@ mod manager;
mod metrics;
mod tablesuffix;

pub use etl::ctx_req::{from_pipeline_map_to_opt, ContextReq};
pub use etl::ctx_req::{ContextOpt, ContextReq};
pub use etl::processor::Processor;
pub use etl::transform::transformer::greptime::{GreptimePipelineParams, SchemaInfo};
pub use etl::transform::transformer::identity_pipeline;

@@ -29,9 +29,9 @@ pub fn parse_and_exec(input_str: &str, pipeline_yaml: &str) -> Rows {
    match input_value {
        serde_json::Value::Array(array) => {
            for value in array {
                let mut intermediate_status = json_to_map(value).unwrap();
                let intermediate_status = json_to_map(value).unwrap();
                let row = pipeline
                    .exec_mut(&mut intermediate_status)
                    .exec_mut(intermediate_status)
                    .expect("failed to exec pipeline")
                    .into_transformed()
                    .expect("expect transformed result ");
@@ -39,9 +39,9 @@ pub fn parse_and_exec(input_str: &str, pipeline_yaml: &str) -> Rows {
            }
        }
        serde_json::Value::Object(_) => {
            let mut intermediate_status = json_to_map(input_value).unwrap();
            let intermediate_status = json_to_map(input_value).unwrap();
            let row = pipeline
                .exec_mut(&mut intermediate_status)
                .exec_mut(intermediate_status)
                .expect("failed to exec pipeline")
                .into_transformed()
                .expect("expect transformed result ");

@@ -274,9 +274,9 @@ transform:
    let yaml_content = pipeline::Content::Yaml(pipeline_yaml);
    let pipeline: pipeline::Pipeline =
        pipeline::parse(&yaml_content).expect("failed to parse pipeline");
    let mut result = json_to_map(input_value).unwrap();
    let result = json_to_map(input_value).unwrap();

    let row = pipeline.exec_mut(&mut result);
    let row = pipeline.exec_mut(result);

    assert!(row.is_err());
    assert_eq!(row.err().unwrap().to_string(), "No matching pattern found");

Some files were not shown because too many files have changed in this diff.