Compare commits

...

17 Commits

Author SHA1 Message Date
Yingwen
1008af5324 feat!: Divide flush and compaction job pool (#4871)
* feat: divide flush/compact job pool

* feat!: divide bg jobs config

* docs: update config examples

* test: fix tests
2024-10-25 23:36:16 +00:00
discord9
2485f66077 chore: graceful exit on bind fail (#4882) 2024-10-25 09:29:39 +00:00
Weny Xu
4f3afb13b6 fix: fix broken import (#4880) 2024-10-25 07:09:51 +00:00
shuiyisong
32a0023010 chore: add schema urls to otlp logs (#4876)
* chore: add schema urls to otlp logs table

* chore: update meter-macros version to remove anymap warning

* chore: change span id and trace id to field
2024-10-25 03:45:24 +00:00
Kaifeng Zheng
4e9c251041 feat: add json_path_match udf (#4864)
* add json_path_match udf

* sql tests for json_path_match

* fix clippy & comment

* fix null value behavior

* added null tests

* adjust function's behavior on nulls

* update test cases

* fix null check of json
2024-10-25 03:13:34 +00:00
Lei, HUANG
e328c7067c chore: update Rust toolchain to 2024-10-19 (#4857)
* update rust toolchain

* change toolchain to 2024-10-17

* fix: clippy

* fix: ut

* bump shadow-rs

* fix: use nightly-2024-10-19

* fix: clippy

* chore/udapte-toolchain-2024-10-17: Update DEV_BUILDER_IMAGE_TAG to 2024-10-19-a5c00e85-20241024184445 in Makefile
2024-10-25 00:23:32 +00:00
Weny Xu
8b307e4548 feat: introduce the PluginOptions (#4835)
* feat: introduce the `PluginOptions`

* chore: apply suggestions from CR
2024-10-24 12:02:10 +00:00
discord9
ff38abde2e chore: better column schema check for flow (#4855)
* chore: better column schema check for flow

* chore: better msg

* tests: clean up after tests

* chore: better msg

* chore: per review

* tests: sqlness
2024-10-24 09:43:32 +00:00
jeremyhi
aa9a265984 chore: make pusher log easy to understand (#4841)
* chore: make pusher log easy to understand

* Update src/meta-srv/src/service/heartbeat.rs

Co-authored-by: Yingwen <realevenyag@gmail.com>

* Update src/meta-srv/src/service/heartbeat.rs

Co-authored-by: Yingwen <realevenyag@gmail.com>

* chore: by comment

---------

Co-authored-by: Yingwen <realevenyag@gmail.com>
2024-10-24 07:44:16 +00:00
pa
9d3ee6384a feat: Limit CPU in runtime (#3685) (#4782)
feat: add throttle runtime (#3685)
2024-10-24 07:30:24 +00:00
localhost
fcde0a4874 feat: Add functionality to the Opentelemetry write interface to extract fields from attr to top-level data. (#4859)
* chore: add otlp select

* chore: change otlp select

* chore: remove json path

* chore: format toml

* chore: change opentelemetry extract keys header name

* chore: add some doc and remove useless code and lib

* chore: make clippy happy

* chore: fix by pr comment

* chore: fix by pr comment

* chore: opentelemetry logs select key change some type default semantic type
2024-10-24 05:55:57 +00:00
Weny Xu
5d42e63ab0 fix!: replace timeout_millis and connect_timeout_millis with Duration in DatanodeClientOptions (#4867)
* fix: correct options struct

* fix: fix unit test
2024-10-23 08:20:34 +00:00
discord9
0c01532a37 feat: Sort within each PartitionRange (#4847)
* feat: PartSort

* chore: rm unused

* chore: typo

* chore: mem pool df

* chore: add location to arrow error

* refactor: test_util

* refactor: per review

* chore: rm unused

* chore: more cases

* chore: test&buffer clear

* fix: remove fetch

* chore: fmt

* chore: per review

* chore: rm unused
2024-10-23 07:01:55 +00:00
ZonaHe
6d503b047a feat: update dashboard to v0.6.0 (#4861)
Co-authored-by: ZonaHex <ZonaHex@users.noreply.github.com>
2024-10-22 02:34:09 +00:00
Yingwen
5d28f7a912 feat: yields empty batch after reading a range (#4845)
* feat: add empty batch to end of range stream

* feat: add batch validation

* fix: validate batch order

* fix: not yield empty batch in compaction

* fix: empty record batch

* feat: add a flag to enable empty batch
2024-10-21 13:52:47 +00:00
Lei, HUANG
a50eea76a6 chore: bump greptime-meter (#4858)
chore/bump-greptime-meter: Add meter-core package and update meter-core dependency across various packages to
new git revision.
2024-10-21 08:18:30 +00:00
Yingwen
2ee1ce2ba1 docs: change cpu/mem panel to time-series (#4844)
* docs: change cpu/mem panel to time-series

* docs: update version
2024-10-18 08:42:01 +00:00
174 changed files with 3234 additions and 665 deletions

Cargo.lock generated
View File

@@ -1,6 +1,6 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
version = 3
version = 4
[[package]]
name = "Inflector"
@@ -200,12 +200,6 @@ version = "1.0.89"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "86fdf8605db99b54d3cd748a44c6d04df638eb5dafb219b135d0149bd0db01f6"
[[package]]
name = "anymap"
version = "1.0.0-beta.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f1f8f5a6f3d50d89e3797d7593a50f96bb2aaa20ca0cc7be1fb673232c91d72"
[[package]]
name = "anymap2"
version = "0.13.0"
@@ -1808,6 +1802,17 @@ dependencies = [
"winapi",
]
[[package]]
name = "clocksource"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "129026dd5a8a9592d96916258f3a5379589e513ea5e86aeb0bd2530286e44e9e"
dependencies = [
"libc",
"time",
"winapi",
]
[[package]]
name = "cmake"
version = "0.1.51"
@@ -2332,16 +2337,24 @@ name = "common-runtime"
version = "0.9.5"
dependencies = [
"async-trait",
"clap 4.5.19",
"common-error",
"common-macro",
"common-telemetry",
"futures",
"lazy_static",
"num_cpus",
"once_cell",
"parking_lot 0.12.3",
"paste",
"pin-project",
"prometheus",
"rand",
"ratelimit",
"serde",
"serde_json",
"snafu 0.8.5",
"tempfile",
"tokio",
"tokio-metrics",
"tokio-metrics-collector",
@@ -5250,7 +5263,7 @@ dependencies = [
[[package]]
name = "influxdb_line_protocol"
version = "0.1.0"
source = "git+https://github.com/evenyag/influxdb_iox?branch=feat/line-protocol#10ef0d0b02705ac7518717390939fa3a9bcfcacc"
source = "git+https://github.com/evenyag/influxdb_iox?branch=feat%2Fline-protocol#10ef0d0b02705ac7518717390939fa3a9bcfcacc"
dependencies = [
"bytes",
"nom",
@@ -6408,9 +6421,9 @@ dependencies = [
[[package]]
name = "meter-core"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-meter.git?rev=80eb97c24c88af4dd9a86f8bbaf50e741d4eb8cd#80eb97c24c88af4dd9a86f8bbaf50e741d4eb8cd"
source = "git+https://github.com/GreptimeTeam/greptime-meter.git?rev=a10facb353b41460eeb98578868ebf19c2084fac#a10facb353b41460eeb98578868ebf19c2084fac"
dependencies = [
"anymap",
"anymap2",
"once_cell",
"parking_lot 0.12.3",
]
@@ -6418,7 +6431,7 @@ dependencies = [
[[package]]
name = "meter-macros"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-meter.git?rev=80eb97c24c88af4dd9a86f8bbaf50e741d4eb8cd#80eb97c24c88af4dd9a86f8bbaf50e741d4eb8cd"
source = "git+https://github.com/GreptimeTeam/greptime-meter.git?rev=a10facb353b41460eeb98578868ebf19c2084fac#a10facb353b41460eeb98578868ebf19c2084fac"
dependencies = [
"meter-core",
]
@@ -8320,6 +8333,7 @@ dependencies = [
"datanode",
"frontend",
"meta-srv",
"serde",
"snafu 0.8.5",
]
@@ -9195,6 +9209,17 @@ dependencies = [
"rand",
]
[[package]]
name = "ratelimit"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c1bb13e2dcfa2232ac6887157aad8d9b3fe4ca57f7c8d4938ff5ea9be742300"
dependencies = [
"clocksource",
"parking_lot 0.12.3",
"thiserror",
]
[[package]]
name = "raw-cpuid"
version = "11.2.0"
@@ -10912,9 +10937,9 @@ dependencies = [
[[package]]
name = "shadow-rs"
version = "0.31.1"
version = "0.35.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "02c282402d25101f9c893e9cd7e4cae535fe7db18b81291de973026c219ddf1e"
checksum = "2311e39772c00391875f40e34d43efef247b23930143a70ca5fbec9505937420"
dependencies = [
"const_format",
"git2",

View File

@@ -127,7 +127,7 @@ humantime-serde = "1.1"
itertools = "0.10"
jsonb = { git = "https://github.com/datafuselabs/jsonb.git", rev = "46ad50fc71cf75afbf98eec455f7892a6387c1fc", default-features = false }
lazy_static = "1.4"
meter-core = { git = "https://github.com/GreptimeTeam/greptime-meter.git", rev = "80eb97c24c88af4dd9a86f8bbaf50e741d4eb8cd" }
meter-core = { git = "https://github.com/GreptimeTeam/greptime-meter.git", rev = "a10facb353b41460eeb98578868ebf19c2084fac" }
mockall = "0.11.4"
moka = "0.12"
notify = "6.1"
@@ -140,6 +140,7 @@ opentelemetry-proto = { version = "0.5", features = [
"with-serde",
"logs",
] }
parking_lot = "0.12"
parquet = { version = "51.0.0", default-features = false, features = ["arrow", "async", "object_store"] }
paste = "1.0"
pin-project = "1.0"
@@ -148,6 +149,7 @@ promql-parser = { version = "0.4.1" }
prost = "0.12"
raft-engine = { version = "0.4.1", default-features = false }
rand = "0.8"
ratelimit = "0.9"
regex = "1.8"
regex-automata = { version = "0.4" }
reqwest = { version = "0.12", default-features = false, features = [
@@ -167,7 +169,7 @@ schemars = "0.8"
serde = { version = "1.0", features = ["derive"] }
serde_json = { version = "1.0", features = ["float_roundtrip"] }
serde_with = "3"
shadow-rs = "0.31"
shadow-rs = "0.35"
similar-asserts = "1.6.0"
smallvec = { version = "1", features = ["serde"] }
snafu = "0.8"
@@ -259,7 +261,7 @@ tokio-rustls = { git = "https://github.com/GreptimeTeam/tokio-rustls" }
[workspace.dependencies.meter-macros]
git = "https://github.com/GreptimeTeam/greptime-meter.git"
rev = "80eb97c24c88af4dd9a86f8bbaf50e741d4eb8cd"
rev = "a10facb353b41460eeb98578868ebf19c2084fac"
[profile.release]
debug = 1

View File

@@ -8,7 +8,7 @@ CARGO_BUILD_OPTS := --locked
IMAGE_REGISTRY ?= docker.io
IMAGE_NAMESPACE ?= greptime
IMAGE_TAG ?= latest
DEV_BUILDER_IMAGE_TAG ?= 2024-06-06-5674c14f-20240920110415
DEV_BUILDER_IMAGE_TAG ?= 2024-10-19-a5c00e85-20241024184445
BUILDX_MULTI_PLATFORM_BUILD ?= false
BUILDX_BUILDER_NAME ?= gtbuilder
BASE_IMAGE ?= ubuntu

View File

@@ -83,7 +83,7 @@
| `wal.backoff_max` | String | `10s` | The maximum backoff delay.<br/>**It's only used when the provider is `kafka`**. |
| `wal.backoff_base` | Integer | `2` | The exponential backoff rate, i.e. next backoff = base * current backoff.<br/>**It's only used when the provider is `kafka`**. |
| `wal.backoff_deadline` | String | `5mins` | The deadline of retries.<br/>**It's only used when the provider is `kafka`**. |
| `wal.overwrite_entry_start_id` | Bool | `false` | Ignore missing entries during read WAL.<br/>**It's only used when the provider is `kafka`**.<br/><br/>This option ensures that when Kafka messages are deleted, the system <br/>can still successfully replay memtable data without throwing an <br/>out-of-range error. <br/>However, enabling this option might lead to unexpected data loss, <br/>as the system will skip over missing entries instead of treating <br/>them as critical errors. |
| `wal.overwrite_entry_start_id` | Bool | `false` | Ignore missing entries during read WAL.<br/>**It's only used when the provider is `kafka`**.<br/><br/>This option ensures that when Kafka messages are deleted, the system<br/>can still successfully replay memtable data without throwing an<br/>out-of-range error.<br/>However, enabling this option might lead to unexpected data loss,<br/>as the system will skip over missing entries instead of treating<br/>them as critical errors. |
| `metadata_store` | -- | -- | Metadata storage options. |
| `metadata_store.file_size` | String | `256MB` | Kv file size in bytes. |
| `metadata_store.purge_threshold` | String | `4GB` | Kv purge threshold. |
@@ -116,7 +116,9 @@
| `region_engine.mito.worker_request_batch_size` | Integer | `64` | Max batch size for a worker to handle requests. |
| `region_engine.mito.manifest_checkpoint_distance` | Integer | `10` | Number of meta action updated to trigger a new checkpoint for the manifest. |
| `region_engine.mito.compress_manifest` | Bool | `false` | Whether to compress manifest and checkpoint file by gzip (default false). |
| `region_engine.mito.max_background_jobs` | Integer | `4` | Max number of running background jobs |
| `region_engine.mito.max_background_flushes` | Integer | Auto | Max number of running background flush jobs (default: 1/2 of cpu cores). |
| `region_engine.mito.max_background_compactions` | Integer | Auto | Max number of running background compaction jobs (default: 1/4 of cpu cores). |
| `region_engine.mito.max_background_purges` | Integer | Auto | Max number of running background purge jobs (default: number of cpu cores). |
| `region_engine.mito.auto_flush_interval` | String | `1h` | Interval to auto flush a region if it has not flushed yet. |
| `region_engine.mito.global_write_buffer_size` | String | Auto | Global write buffer size for all regions. If not set, it's default to 1/8 of OS memory with a max limitation of 1GB. |
| `region_engine.mito.global_write_buffer_reject_size` | String | Auto | Global write buffer size threshold to reject write requests. If not set, it's default to 2 times of `global_write_buffer_size`. |
@@ -410,7 +412,7 @@
| `wal.backoff_deadline` | String | `5mins` | The deadline of retries.<br/>**It's only used when the provider is `kafka`**. |
| `wal.create_index` | Bool | `true` | Whether to enable WAL index creation.<br/>**It's only used when the provider is `kafka`**. |
| `wal.dump_index_interval` | String | `60s` | The interval for dumping WAL indexes.<br/>**It's only used when the provider is `kafka`**. |
| `wal.overwrite_entry_start_id` | Bool | `false` | Ignore missing entries during read WAL.<br/>**It's only used when the provider is `kafka`**.<br/><br/>This option ensures that when Kafka messages are deleted, the system <br/>can still successfully replay memtable data without throwing an <br/>out-of-range error. <br/>However, enabling this option might lead to unexpected data loss, <br/>as the system will skip over missing entries instead of treating <br/>them as critical errors. |
| `wal.overwrite_entry_start_id` | Bool | `false` | Ignore missing entries during read WAL.<br/>**It's only used when the provider is `kafka`**.<br/><br/>This option ensures that when Kafka messages are deleted, the system<br/>can still successfully replay memtable data without throwing an<br/>out-of-range error.<br/>However, enabling this option might lead to unexpected data loss,<br/>as the system will skip over missing entries instead of treating<br/>them as critical errors. |
| `storage` | -- | -- | The data storage options. |
| `storage.data_home` | String | `/tmp/greptimedb/` | The working home directory. |
| `storage.type` | String | `File` | The storage type used to store the data.<br/>- `File`: the data is stored in the local file system.<br/>- `S3`: the data is stored in the S3 object storage.<br/>- `Gcs`: the data is stored in the Google Cloud Storage.<br/>- `Azblob`: the data is stored in the Azure Blob Storage.<br/>- `Oss`: the data is stored in the Aliyun OSS. |
@@ -437,7 +439,9 @@
| `region_engine.mito.worker_request_batch_size` | Integer | `64` | Max batch size for a worker to handle requests. |
| `region_engine.mito.manifest_checkpoint_distance` | Integer | `10` | Number of meta action updated to trigger a new checkpoint for the manifest. |
| `region_engine.mito.compress_manifest` | Bool | `false` | Whether to compress manifest and checkpoint file by gzip (default false). |
| `region_engine.mito.max_background_jobs` | Integer | `4` | Max number of running background jobs |
| `region_engine.mito.max_background_flushes` | Integer | Auto | Max number of running background flush jobs (default: 1/2 of cpu cores). |
| `region_engine.mito.max_background_compactions` | Integer | Auto | Max number of running background compaction jobs (default: 1/4 of cpu cores). |
| `region_engine.mito.max_background_purges` | Integer | Auto | Max number of running background purge jobs (default: number of cpu cores). |
| `region_engine.mito.auto_flush_interval` | String | `1h` | Interval to auto flush a region if it has not flushed yet. |
| `region_engine.mito.global_write_buffer_size` | String | Auto | Global write buffer size for all regions. If not set, it's default to 1/8 of OS memory with a max limitation of 1GB. |
| `region_engine.mito.global_write_buffer_reject_size` | String | Auto | Global write buffer size threshold to reject write requests. If not set, it's default to 2 times of `global_write_buffer_size` |
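
The three `max_background_*` options above replace the single `max_background_jobs` knob, and their `Auto` defaults are stated in terms of the CPU count. Below is a minimal sketch of that sizing rule, assuming a `num_cpus`-based helper; it only illustrates the documented defaults and is not the actual mito engine code.

```rust
/// Illustration of the documented "Auto" defaults, derived from the core count.
fn default_background_job_limits() -> (usize, usize, usize) {
    let cores = num_cpus::get();
    let max_background_flushes = (cores / 2).max(1); // default: 1/2 of cpu cores
    let max_background_compactions = (cores / 4).max(1); // default: 1/4 of cpu cores
    let max_background_purges = cores.max(1); // default: number of cpu cores
    (
        max_background_flushes,
        max_background_compactions,
        max_background_purges,
    )
}

fn main() {
    let (flushes, compactions, purges) = default_background_job_limits();
    println!("flushes={flushes} compactions={compactions} purges={purges}");
}
```

Setting any of these options explicitly in the configuration file takes precedence over the derived value.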

View File

@@ -215,12 +215,12 @@ dump_index_interval = "60s"
## Ignore missing entries during read WAL.
## **It's only used when the provider is `kafka`**.
##
## This option ensures that when Kafka messages are deleted, the system
## can still successfully replay memtable data without throwing an
## out-of-range error.
## However, enabling this option might lead to unexpected data loss,
## as the system will skip over missing entries instead of treating
##
## This option ensures that when Kafka messages are deleted, the system
## can still successfully replay memtable data without throwing an
## out-of-range error.
## However, enabling this option might lead to unexpected data loss,
## as the system will skip over missing entries instead of treating
## them as critical errors.
overwrite_entry_start_id = false
@@ -416,8 +416,17 @@ manifest_checkpoint_distance = 10
## Whether to compress manifest and checkpoint file by gzip (default false).
compress_manifest = false
## Max number of running background jobs
max_background_jobs = 4
## Max number of running background flush jobs (default: 1/2 of cpu cores).
## @toml2docs:none-default="Auto"
#+ max_background_flushes = 4
## Max number of running background compaction jobs (default: 1/4 of cpu cores).
## @toml2docs:none-default="Auto"
#+ max_background_compactions = 2
## Max number of running background purge jobs (default: number of cpu cores).
## @toml2docs:none-default="Auto"
#+ max_background_purges = 8
## Interval to auto flush a region if it has not flushed yet.
auto_flush_interval = "1h"

View File

@@ -239,12 +239,12 @@ backoff_deadline = "5mins"
## Ignore missing entries during read WAL.
## **It's only used when the provider is `kafka`**.
##
## This option ensures that when Kafka messages are deleted, the system
## can still successfully replay memtable data without throwing an
## out-of-range error.
## However, enabling this option might lead to unexpected data loss,
## as the system will skip over missing entries instead of treating
##
## This option ensures that when Kafka messages are deleted, the system
## can still successfully replay memtable data without throwing an
## out-of-range error.
## However, enabling this option might lead to unexpected data loss,
## as the system will skip over missing entries instead of treating
## them as critical errors.
overwrite_entry_start_id = false
@@ -454,8 +454,17 @@ manifest_checkpoint_distance = 10
## Whether to compress manifest and checkpoint file by gzip (default false).
compress_manifest = false
## Max number of running background jobs
max_background_jobs = 4
## Max number of running background flush jobs (default: 1/2 of cpu cores).
## @toml2docs:none-default="Auto"
#+ max_background_flushes = 4
## Max number of running background compaction jobs (default: 1/4 of cpu cores).
## @toml2docs:none-default="Auto"
#+ max_background_compactions = 2
## Max number of running background purge jobs (default: number of cpu cores).
## @toml2docs:none-default="Auto"
#+ max_background_purges = 8
## Interval to auto flush a region if it has not flushed yet.
auto_flush_interval = "1h"

View File

@@ -409,7 +409,39 @@
"fieldConfig": {
"defaults": {
"color": {
"mode": "thresholds"
"mode": "palette-classic"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"fieldMinMax": false,
"mappings": [],
@@ -438,18 +470,16 @@
},
"id": 27,
"options": {
"colorMode": "value",
"graphMode": "area",
"justifyMode": "auto",
"orientation": "auto",
"reduceOptions": {
"calcs": ["lastNotNull"],
"fields": "",
"values": false
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"text": {},
"textMode": "auto",
"wideLayout": true
"tooltip": {
"mode": "single",
"sort": "none"
}
},
"pluginVersion": "10.2.3",
"targets": [
@@ -467,7 +497,7 @@
}
],
"title": "CPU",
"type": "stat"
"type": "timeseries"
},
{
"datasource": {
@@ -477,7 +507,39 @@
"fieldConfig": {
"defaults": {
"color": {
"mode": "thresholds"
"mode": "palette-classic"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"decimals": 0,
"fieldMinMax": false,
@@ -503,18 +565,16 @@
},
"id": 28,
"options": {
"colorMode": "value",
"graphMode": "area",
"justifyMode": "auto",
"orientation": "auto",
"reduceOptions": {
"calcs": ["lastNotNull"],
"fields": "",
"values": false
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"text": {},
"textMode": "auto",
"wideLayout": true
"tooltip": {
"mode": "single",
"sort": "none"
}
},
"pluginVersion": "10.2.3",
"targets": [
@@ -532,7 +592,7 @@
}
],
"title": "Memory",
"type": "stat"
"type": "timeseries"
},
{
"collapsed": false,
@@ -3335,6 +3395,6 @@
"timezone": "",
"title": "GreptimeDB",
"uid": "e7097237-669b-4f8d-b751-13067afbfb68",
"version": 15,
"version": 16,
"weekStart": ""
}

View File

@@ -1,3 +1,2 @@
[toolchain]
channel = "nightly-2024-06-06"
channel = "nightly-2024-10-19"

View File

@@ -33,7 +33,7 @@ impl StaticUserProvider {
value: value.to_string(),
msg: "StaticUserProviderOption must be in format `<option>:<value>`",
})?;
return match mode {
match mode {
"file" => {
let users = load_credential_from_file(content)?
.context(InvalidConfigSnafu {
@@ -58,7 +58,7 @@ impl StaticUserProvider {
msg: "StaticUserProviderOption must be in format `file:<path>` or `cmd:<values>`",
}
.fail(),
};
}
}
}

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
/// All table names in `information_schema`.
//! All table names in `information_schema`.
pub const TABLES: &str = "tables";
pub const COLUMNS: &str = "columns";

View File

@@ -74,7 +74,7 @@ impl MemoryTableBuilder {
/// Construct the `information_schema.{table_name}` virtual table
pub async fn memory_records(&mut self) -> Result<RecordBatch> {
if self.columns.is_empty() {
RecordBatch::new_empty(self.schema.clone()).context(CreateRecordBatchSnafu)
Ok(RecordBatch::new_empty(self.schema.clone()))
} else {
RecordBatch::new(self.schema.clone(), std::mem::take(&mut self.columns))
.context(CreateRecordBatchSnafu)

View File

@@ -12,6 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//! The `pg_catalog.pg_namespace` table implementation.
//! namespace is a schema in greptime
pub(super) mod oid_map;
use std::sync::{Arc, Weak};
@@ -40,9 +43,6 @@ use crate::system_schema::utils::tables::{string_column, u32_column};
use crate::system_schema::SystemTable;
use crate::CatalogManager;
/// The `pg_catalog.pg_namespace` table implementation.
/// namespace is a schema in greptime
const NSPNAME: &str = "nspname";
const INIT_CAPACITY: usize = 42;

View File

@@ -28,7 +28,7 @@ enum_dispatch = "0.3"
futures-util.workspace = true
lazy_static.workspace = true
moka = { workspace = true, features = ["future"] }
parking_lot = "0.12"
parking_lot.workspace = true
prometheus.workspace = true
prost.workspace = true
query.workspace = true

View File

@@ -272,9 +272,10 @@ impl StartCommand {
info!("Datanode start command: {:#?}", self);
info!("Datanode options: {:#?}", opts);
let plugin_opts = opts.plugins;
let opts = opts.component;
let mut plugins = Plugins::new();
plugins::setup_datanode_plugins(&mut plugins, &opts)
plugins::setup_datanode_plugins(&mut plugins, &plugin_opts, &opts)
.await
.context(StartDatanodeSnafu)?;

View File

@@ -266,9 +266,10 @@ impl StartCommand {
info!("Frontend start command: {:#?}", self);
info!("Frontend options: {:#?}", opts);
let plugin_opts = opts.plugins;
let opts = opts.component;
let mut plugins = Plugins::new();
plugins::setup_frontend_plugins(&mut plugins, &opts)
plugins::setup_frontend_plugins(&mut plugins, &plugin_opts, &opts)
.await
.context(StartFrontendSnafu)?;
@@ -342,6 +343,8 @@ impl StartCommand {
// Some queries are expected to take long time.
let channel_config = ChannelConfig {
timeout: None,
tcp_nodelay: opts.datanode.client.tcp_nodelay,
connect_timeout: Some(opts.datanode.client.connect_timeout),
..Default::default()
};
let client = NodeClients::new(channel_config);
@@ -472,7 +475,7 @@ mod tests {
};
let mut plugins = Plugins::new();
plugins::setup_frontend_plugins(&mut plugins, &fe_opts)
plugins::setup_frontend_plugins(&mut plugins, &[], &fe_opts)
.await
.unwrap();

View File

@@ -84,6 +84,7 @@ pub trait App: Send {
}
/// Log the versions of the application, and the arguments passed to the cli.
///
/// `version` should be the same as the output of cli "--version";
/// and the `short_version` is the short version of the codes, often consist of git branch and commit.
pub fn log_versions(version: &str, short_version: &str, app: &str) {

View File

@@ -273,9 +273,10 @@ impl StartCommand {
info!("Metasrv start command: {:#?}", self);
info!("Metasrv options: {:#?}", opts);
let plugin_opts = opts.plugins;
let opts = opts.component;
let mut plugins = Plugins::new();
plugins::setup_metasrv_plugins(&mut plugins, &opts)
plugins::setup_metasrv_plugins(&mut plugins, &plugin_opts, &opts)
.await
.context(StartMetaServerSnafu)?;

View File

@@ -15,6 +15,7 @@
use clap::Parser;
use common_config::Configurable;
use common_runtime::global::RuntimeOptions;
use plugins::PluginOptions;
use serde::{Deserialize, Serialize};
#[derive(Parser, Default, Debug, Clone)]
@@ -40,6 +41,8 @@ pub struct GlobalOptions {
pub struct GreptimeOptions<T> {
/// The runtime options.
pub runtime: RuntimeOptions,
/// The plugin options.
pub plugins: Vec<PluginOptions>,
/// The options of each component (like Datanode or Standalone) of GreptimeDB.
#[serde(flatten)]

View File

@@ -445,15 +445,16 @@ impl StartCommand {
info!("Standalone options: {opts:#?}");
let mut plugins = Plugins::new();
let plugin_opts = opts.plugins;
let opts = opts.component;
let fe_opts = opts.frontend_options();
let dn_opts = opts.datanode_options();
plugins::setup_frontend_plugins(&mut plugins, &fe_opts)
plugins::setup_frontend_plugins(&mut plugins, &plugin_opts, &fe_opts)
.await
.context(StartFrontendSnafu)?;
plugins::setup_datanode_plugins(&mut plugins, &dn_opts)
plugins::setup_datanode_plugins(&mut plugins, &plugin_opts, &dn_opts)
.await
.context(StartDatanodeSnafu)?;
@@ -762,7 +763,7 @@ mod tests {
};
let mut plugins = Plugins::new();
plugins::setup_frontend_plugins(&mut plugins, &fe_opts)
plugins::setup_frontend_plugins(&mut plugins, &[], &fe_opts)
.await
.unwrap();

View File

@@ -20,7 +20,7 @@ use common_config::Configurable;
use common_grpc::channel_manager::{
DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE, DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
};
use common_telemetry::logging::{LoggingOptions, DEFAULT_OTLP_ENDPOINT};
use common_telemetry::logging::{LoggingOptions, SlowQueryOptions, DEFAULT_OTLP_ENDPOINT};
use common_wal::config::raft_engine::RaftEngineConfig;
use common_wal::config::DatanodeWalConfig;
use datanode::config::{DatanodeOptions, RegionEngineConfig, StorageConfig};
@@ -159,8 +159,20 @@ fn test_load_metasrv_example_config() {
level: Some("info".to_string()),
otlp_endpoint: Some(DEFAULT_OTLP_ENDPOINT.to_string()),
tracing_sample_ratio: Some(Default::default()),
slow_query: SlowQueryOptions {
enable: false,
threshold: Some(Duration::from_secs(10)),
sample_ratio: Some(1.0),
},
..Default::default()
},
datanode: meta_srv::metasrv::DatanodeOptions {
client: meta_srv::metasrv::DatanodeClientOptions {
timeout: Duration::from_secs(10),
connect_timeout: Duration::from_secs(10),
tcp_nodelay: true,
},
},
export_metrics: ExportMetricsOption {
self_import: Some(Default::default()),
remote_write: Some(Default::default()),

View File

@@ -46,8 +46,9 @@ impl From<String> for SecretString {
}
}
/// Wrapper type for values that contains secrets, which attempts to limit
/// accidental exposure and ensure secrets are wiped from memory when dropped.
/// Wrapper type for values that contains secrets.
///
/// It attempts to limit accidental exposure and ensure secrets are wiped from memory when dropped.
/// (e.g. passwords, cryptographic keys, access tokens or other credentials)
///
/// Access to the secret inner value occurs through the [`ExposeSecret`]
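
A minimal usage sketch of the wrapper documented above, assuming the module path `common_base::secrets` and an `expose_secret()` accessor on the `ExposeSecret` trait (neither is shown in this diff):

```rust
use common_base::secrets::{ExposeSecret, SecretString};

fn main() {
    // `From<String>` is implemented above, so a plain String can be wrapped.
    let password = SecretString::from("hunter2".to_string());
    // Reading the inner value has to go through the ExposeSecret trait
    // explicitly, which is what limits accidental exposure.
    assert_eq!(password.expose_secret(), "hunter2");
}
```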

View File

@@ -103,14 +103,15 @@ pub const INFORMATION_SCHEMA_PROCEDURE_INFO_TABLE_ID: u32 = 34;
/// id for information_schema.region_statistics
pub const INFORMATION_SCHEMA_REGION_STATISTICS_TABLE_ID: u32 = 35;
/// ----- End of information_schema tables -----
// ----- End of information_schema tables -----
/// ----- Begin of pg_catalog tables -----
pub const PG_CATALOG_PG_CLASS_TABLE_ID: u32 = 256;
pub const PG_CATALOG_PG_TYPE_TABLE_ID: u32 = 257;
pub const PG_CATALOG_PG_NAMESPACE_TABLE_ID: u32 = 258;
/// ----- End of pg_catalog tables -----
// ----- End of pg_catalog tables -----
pub const MITO_ENGINE: &str = "mito";
pub const MITO2_ENGINE: &str = "mito2";
pub const METRIC_ENGINE: &str = "metric";

View File

@@ -16,6 +16,7 @@ use std::sync::Arc;
mod json_get;
mod json_is;
mod json_path_exists;
mod json_path_match;
mod json_to_string;
mod parse_json;
@@ -49,5 +50,6 @@ impl JsonFunction {
registry.register(Arc::new(JsonIsObject));
registry.register(Arc::new(json_path_exists::JsonPathExistsFunction));
registry.register(Arc::new(json_path_match::JsonPathMatchFunction));
}
}

View File

@@ -0,0 +1,202 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::{self, Display};
use common_query::error::{InvalidFuncArgsSnafu, Result, UnsupportedInputDataTypeSnafu};
use common_query::prelude::Signature;
use datafusion::logical_expr::Volatility;
use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::VectorRef;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::vectors::{BooleanVectorBuilder, MutableVector};
use snafu::ensure;
use crate::function::{Function, FunctionContext};
/// Check if the given JSON data match the given JSON path's predicate.
#[derive(Clone, Debug, Default)]
pub struct JsonPathMatchFunction;
const NAME: &str = "json_path_match";
impl Function for JsonPathMatchFunction {
fn name(&self) -> &str {
NAME
}
fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
Ok(ConcreteDataType::boolean_datatype())
}
fn signature(&self) -> Signature {
Signature::exact(
vec![
ConcreteDataType::json_datatype(),
ConcreteDataType::string_datatype(),
],
Volatility::Immutable,
)
}
fn eval(&self, _func_ctx: FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
ensure!(
columns.len() == 2,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect exactly two, have: {}",
columns.len()
),
}
);
let jsons = &columns[0];
let paths = &columns[1];
let size = jsons.len();
let mut results = BooleanVectorBuilder::with_capacity(size);
for i in 0..size {
let json = jsons.get_ref(i);
let path = paths.get_ref(i);
match json.data_type() {
// JSON data type uses binary vector
ConcreteDataType::Binary(_) => {
let json = json.as_binary();
let path = path.as_string();
let result = match (json, path) {
(Ok(Some(json)), Ok(Some(path))) => {
if !jsonb::is_null(json) {
let json_path = jsonb::jsonpath::parse_json_path(path.as_bytes());
match json_path {
Ok(json_path) => jsonb::path_match(json, json_path).ok(),
Err(_) => None,
}
} else {
None
}
}
_ => None,
};
results.push(result);
}
_ => {
return UnsupportedInputDataTypeSnafu {
function: NAME,
datatypes: columns.iter().map(|c| c.data_type()).collect::<Vec<_>>(),
}
.fail();
}
}
}
Ok(results.to_vector())
}
}
impl Display for JsonPathMatchFunction {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "JSON_PATH_MATCH")
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use common_query::prelude::TypeSignature;
use datatypes::vectors::{BinaryVector, StringVector};
use super::*;
#[test]
fn test_json_path_match_function() {
let json_path_match = JsonPathMatchFunction;
assert_eq!("json_path_match", json_path_match.name());
assert_eq!(
ConcreteDataType::boolean_datatype(),
json_path_match
.return_type(&[ConcreteDataType::json_datatype()])
.unwrap()
);
assert!(matches!(json_path_match.signature(),
Signature {
type_signature: TypeSignature::Exact(valid_types),
volatility: Volatility::Immutable
} if valid_types == vec![ConcreteDataType::json_datatype(), ConcreteDataType::string_datatype()],
));
let json_strings = [
Some(r#"{"a": {"b": 2}, "b": 2, "c": 3}"#.to_string()),
Some(r#"{"a": 1, "b": [1,2,3]}"#.to_string()),
Some(r#"{"a": 1 ,"b": [1,2,3]}"#.to_string()),
Some(r#"[1,2,3]"#.to_string()),
Some(r#"{"a":1,"b":[1,2,3]}"#.to_string()),
Some(r#"null"#.to_string()),
Some(r#"null"#.to_string()),
];
let paths = vec![
Some("$.a.b == 2".to_string()),
Some("$.b[1 to last] >= 2".to_string()),
Some("$.c > 0".to_string()),
Some("$[0 to last] > 0".to_string()),
Some(r#"null"#.to_string()),
Some("$.c > 0".to_string()),
Some(r#"null"#.to_string()),
];
let results = [
Some(true),
Some(true),
Some(false),
Some(true),
None,
None,
None,
];
let jsonbs = json_strings
.into_iter()
.map(|s| s.map(|json| jsonb::parse_value(json.as_bytes()).unwrap().to_vec()))
.collect::<Vec<_>>();
let json_vector = BinaryVector::from(jsonbs);
let path_vector = StringVector::from(paths);
let args: Vec<VectorRef> = vec![Arc::new(json_vector), Arc::new(path_vector)];
let vector = json_path_match
.eval(FunctionContext::default(), &args)
.unwrap();
assert_eq!(7, vector.len());
for (i, expected) in results.iter().enumerate() {
let result = vector.get_ref(i);
match expected {
Some(expected_value) => {
assert!(!result.is_null());
let result_value = result.as_boolean().unwrap().unwrap();
assert_eq!(*expected_value, result_value);
}
None => {
assert!(result.is_null());
}
}
}
}
}
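
Several commit messages for #4864 revolve around null handling, which the test above pins down. Read in isolation, the per-row logic of `eval` amounts to the sketch below; it uses the same `jsonb` calls as the implementation, but the helper itself is made up for illustration.

```rust
/// Illustration of what json_path_match computes for one (json, path) pair:
/// SQL NULL inputs, a JSON `null` value, and an unparsable path all yield NULL.
fn json_path_match_one(json: Option<&[u8]>, path: Option<&str>) -> Option<bool> {
    let (json, path) = (json?, path?);
    if jsonb::is_null(json) {
        return None;
    }
    let json_path = jsonb::jsonpath::parse_json_path(path.as_bytes()).ok()?;
    jsonb::path_match(json, json_path).ok()
}
```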

View File

@@ -199,6 +199,7 @@ pub fn default_get_uuid(working_home: &Option<String>) -> Option<String> {
}
/// Report version info to GreptimeDB.
///
/// We do not collect any identity-sensitive information.
/// This task is scheduled to run every 30 minutes.
/// The task will be disabled default. It can be enabled by setting the build feature `greptimedb-telemetry`
@@ -324,7 +325,7 @@ mod tests {
});
let addr = ([127, 0, 0, 1], port).into();
let server = Server::bind(&addr).serve(make_svc);
let server = Server::try_bind(&addr).unwrap().serve(make_svc);
let graceful = server.with_graceful_shutdown(async {
rx.await.ok();
});

View File

@@ -35,7 +35,9 @@ pub fn aggr_func_type_store_derive(input: TokenStream) -> TokenStream {
}
/// A struct can be used as a creator for aggregate function if it has been annotated with this
/// attribute first. This attribute add a necessary field which is intended to store the input
/// attribute first.
///
/// This attribute add a necessary field which is intended to store the input
/// data's types to the struct.
/// This attribute is expected to be used along with derive macro [AggrFuncTypeStore].
#[proc_macro_attribute]
@@ -44,9 +46,10 @@ pub fn as_aggr_func_creator(args: TokenStream, input: TokenStream) -> TokenStrea
}
/// Attribute macro to convert an arithimetic function to a range function. The annotated function
/// should accept servaral arrays as input and return a single value as output. This procedure
/// macro can works on any number of input parameters. Return type can be either primitive type
/// or wrapped in `Option`.
/// should accept servaral arrays as input and return a single value as output.
///
/// This procedure macro can works on any number of input parameters. Return type can be either
/// primitive type or wrapped in `Option`.
///
/// # Example
/// Take `count_over_time()` in PromQL as an example:

View File

@@ -55,6 +55,7 @@ pub trait ClusterInfo {
}
/// The key of [NodeInfo] in the storage. The format is `__meta_cluster_node_info-{cluster_id}-{role}-{node_id}`.
///
/// This key cannot be used to describe the `Metasrv` because the `Metasrv` does not have
/// a `cluster_id`, it serves multiple clusters.
#[derive(Debug, Clone, Eq, Hash, PartialEq, Serialize, Deserialize)]
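
The key layout described in that doc comment can be written out directly; the helper below is hypothetical and only illustrates the documented format, not the actual key codec.

```rust
/// Hypothetical illustration of the documented layout
/// `__meta_cluster_node_info-{cluster_id}-{role}-{node_id}`.
fn node_info_key(cluster_id: u64, role: i32, node_id: u64) -> String {
    format!("__meta_cluster_node_info-{cluster_id}-{role}-{node_id}")
}

fn main() {
    assert_eq!(node_info_key(0, 0, 42), "__meta_cluster_node_info-0-0-42");
}
```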

View File

@@ -35,7 +35,7 @@ pub struct CatalogNameKey<'a> {
pub catalog: &'a str,
}
impl<'a> Default for CatalogNameKey<'a> {
impl Default for CatalogNameKey<'_> {
fn default() -> Self {
Self {
catalog: DEFAULT_CATALOG_NAME,

View File

@@ -77,7 +77,7 @@ impl DatanodeTableKey {
}
}
impl<'a> MetadataKey<'a, DatanodeTableKey> for DatanodeTableKey {
impl MetadataKey<'_, DatanodeTableKey> for DatanodeTableKey {
fn to_bytes(&self) -> Vec<u8> {
self.to_string().into_bytes()
}

View File

@@ -42,6 +42,8 @@ lazy_static! {
/// The layout: `__flow/info/{flow_id}`.
pub struct FlowInfoKey(FlowScoped<FlowInfoKeyInner>);
pub type FlowInfoDecodeResult = Result<Option<DeserializedValueWithBytes<FlowInfoValue>>>;
impl<'a> MetadataKey<'a, FlowInfoKey> for FlowInfoKey {
fn to_bytes(&self) -> Vec<u8> {
self.0.to_bytes()
@@ -203,9 +205,7 @@ impl FlowInfoManager {
flow_value: &FlowInfoValue,
) -> Result<(
Txn,
impl FnOnce(
&mut TxnOpGetResponseSet,
) -> Result<Option<DeserializedValueWithBytes<FlowInfoValue>>>,
impl FnOnce(&mut TxnOpGetResponseSet) -> FlowInfoDecodeResult,
)> {
let key = FlowInfoKey::new(flow_id).to_bytes();
let txn = Txn::put_if_not_exists(key.clone(), flow_value.try_as_raw_value()?);

View File

@@ -46,6 +46,8 @@ lazy_static! {
/// The layout: `__flow/name/{catalog_name}/{flow_name}`.
pub struct FlowNameKey<'a>(FlowScoped<FlowNameKeyInner<'a>>);
pub type FlowNameDecodeResult = Result<Option<DeserializedValueWithBytes<FlowNameValue>>>;
#[allow(dead_code)]
impl<'a> FlowNameKey<'a> {
/// Returns the [FlowNameKey]
@@ -104,7 +106,7 @@ impl<'a> MetadataKey<'a, FlowNameKeyInner<'a>> for FlowNameKeyInner<'_> {
.into_bytes()
}
fn from_bytes(bytes: &'a [u8]) -> Result<FlowNameKeyInner> {
fn from_bytes(bytes: &'a [u8]) -> Result<FlowNameKeyInner<'a>> {
let key = std::str::from_utf8(bytes).map_err(|e| {
error::InvalidMetadataSnafu {
err_msg: format!(
@@ -223,9 +225,7 @@ impl FlowNameManager {
flow_id: FlowId,
) -> Result<(
Txn,
impl FnOnce(
&mut TxnOpGetResponseSet,
) -> Result<Option<DeserializedValueWithBytes<FlowNameValue>>>,
impl FnOnce(&mut TxnOpGetResponseSet) -> FlowNameDecodeResult,
)> {
let key = FlowNameKey::new(catalog_name, flow_name);
let raw_key = key.to_bytes();

View File

@@ -52,7 +52,7 @@ impl NodeAddressValue {
}
}
impl<'a> MetadataKey<'a, NodeAddressKey> for NodeAddressKey {
impl MetadataKey<'_, NodeAddressKey> for NodeAddressKey {
fn to_bytes(&self) -> Vec<u8> {
self.to_string().into_bytes()
}

View File

@@ -41,7 +41,7 @@ pub struct SchemaNameKey<'a> {
pub schema: &'a str,
}
impl<'a> Default for SchemaNameKey<'a> {
impl Default for SchemaNameKey<'_> {
fn default() -> Self {
Self {
catalog: DEFAULT_CATALOG_NAME,

View File

@@ -51,7 +51,7 @@ impl Display for TableInfoKey {
}
}
impl<'a> MetadataKey<'a, TableInfoKey> for TableInfoKey {
impl MetadataKey<'_, TableInfoKey> for TableInfoKey {
fn to_bytes(&self) -> Vec<u8> {
self.to_string().into_bytes()
}
@@ -132,6 +132,7 @@ pub type TableInfoManagerRef = Arc<TableInfoManager>;
pub struct TableInfoManager {
kv_backend: KvBackendRef,
}
pub type TableInfoDecodeResult = Result<Option<DeserializedValueWithBytes<TableInfoValue>>>;
impl TableInfoManager {
pub fn new(kv_backend: KvBackendRef) -> Self {
@@ -145,9 +146,7 @@ impl TableInfoManager {
table_info_value: &TableInfoValue,
) -> Result<(
Txn,
impl FnOnce(
&mut TxnOpGetResponseSet,
) -> Result<Option<DeserializedValueWithBytes<TableInfoValue>>>,
impl FnOnce(&mut TxnOpGetResponseSet) -> TableInfoDecodeResult,
)> {
let key = TableInfoKey::new(table_id);
let raw_key = key.to_bytes();
@@ -169,9 +168,7 @@ impl TableInfoManager {
new_table_info_value: &TableInfoValue,
) -> Result<(
Txn,
impl FnOnce(
&mut TxnOpGetResponseSet,
) -> Result<Option<DeserializedValueWithBytes<TableInfoValue>>>,
impl FnOnce(&mut TxnOpGetResponseSet) -> TableInfoDecodeResult,
)> {
let key = TableInfoKey::new(table_id);
let raw_key = key.to_bytes();

View File

@@ -245,7 +245,7 @@ impl LogicalTableRouteValue {
}
}
impl<'a> MetadataKey<'a, TableRouteKey> for TableRouteKey {
impl MetadataKey<'_, TableRouteKey> for TableRouteKey {
fn to_bytes(&self) -> Vec<u8> {
self.to_string().into_bytes()
}
@@ -472,6 +472,8 @@ pub struct TableRouteStorage {
kv_backend: KvBackendRef,
}
pub type TableRouteValueDecodeResult = Result<Option<DeserializedValueWithBytes<TableRouteValue>>>;
impl TableRouteStorage {
pub fn new(kv_backend: KvBackendRef) -> Self {
Self { kv_backend }
@@ -485,9 +487,7 @@ impl TableRouteStorage {
table_route_value: &TableRouteValue,
) -> Result<(
Txn,
impl FnOnce(
&mut TxnOpGetResponseSet,
) -> Result<Option<DeserializedValueWithBytes<TableRouteValue>>>,
impl FnOnce(&mut TxnOpGetResponseSet) -> TableRouteValueDecodeResult,
)> {
let key = TableRouteKey::new(table_id);
let raw_key = key.to_bytes();
@@ -510,9 +510,7 @@ impl TableRouteStorage {
new_table_route_value: &TableRouteValue,
) -> Result<(
Txn,
impl FnOnce(
&mut TxnOpGetResponseSet,
) -> Result<Option<DeserializedValueWithBytes<TableRouteValue>>>,
impl FnOnce(&mut TxnOpGetResponseSet) -> TableRouteValueDecodeResult,
)> {
let key = TableRouteKey::new(table_id);
let raw_key = key.to_bytes();

View File

@@ -53,7 +53,7 @@ impl Display for ViewInfoKey {
}
}
impl<'a> MetadataKey<'a, ViewInfoKey> for ViewInfoKey {
impl MetadataKey<'_, ViewInfoKey> for ViewInfoKey {
fn to_bytes(&self) -> Vec<u8> {
self.to_string().into_bytes()
}
@@ -139,6 +139,8 @@ pub struct ViewInfoManager {
pub type ViewInfoManagerRef = Arc<ViewInfoManager>;
pub type ViewInfoValueDecodeResult = Result<Option<DeserializedValueWithBytes<ViewInfoValue>>>;
impl ViewInfoManager {
pub fn new(kv_backend: KvBackendRef) -> Self {
Self { kv_backend }
@@ -151,9 +153,7 @@ impl ViewInfoManager {
view_info_value: &ViewInfoValue,
) -> Result<(
Txn,
impl FnOnce(
&mut TxnOpGetResponseSet,
) -> Result<Option<DeserializedValueWithBytes<ViewInfoValue>>>,
impl FnOnce(&mut TxnOpGetResponseSet) -> ViewInfoValueDecodeResult,
)> {
let key = ViewInfoKey::new(view_id);
let raw_key = key.to_bytes();
@@ -175,9 +175,7 @@ impl ViewInfoManager {
new_view_info_value: &ViewInfoValue,
) -> Result<(
Txn,
impl FnOnce(
&mut TxnOpGetResponseSet,
) -> Result<Option<DeserializedValueWithBytes<ViewInfoValue>>>,
impl FnOnce(&mut TxnOpGetResponseSet) -> ViewInfoValueDecodeResult,
)> {
let key = ViewInfoKey::new(view_id);
let raw_key = key.to_bytes();

View File

@@ -34,7 +34,7 @@ pub enum CatalogLock<'a> {
Write(&'a str),
}
impl<'a> Display for CatalogLock<'a> {
impl Display for CatalogLock<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let key = match self {
CatalogLock::Read(s) => s,
@@ -44,7 +44,7 @@ impl<'a> Display for CatalogLock<'a> {
}
}
impl<'a> From<CatalogLock<'a>> for StringKey {
impl From<CatalogLock<'_>> for StringKey {
fn from(value: CatalogLock) -> Self {
match value {
CatalogLock::Write(_) => StringKey::Exclusive(value.to_string()),

View File

@@ -297,7 +297,7 @@ struct ParsedKey<'a> {
key_type: KeyType,
}
impl<'a> fmt::Display for ParsedKey<'a> {
impl fmt::Display for ParsedKey<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,

View File

@@ -17,6 +17,7 @@ use std::slice;
use std::sync::Arc;
use datafusion::arrow::util::pretty::pretty_format_batches;
use datatypes::prelude::DataType;
use datatypes::schema::SchemaRef;
use datatypes::value::Value;
use datatypes::vectors::{Helper, VectorRef};
@@ -58,13 +59,18 @@ impl RecordBatch {
}
/// Create an empty [`RecordBatch`] from `schema`.
pub fn new_empty(schema: SchemaRef) -> Result<RecordBatch> {
pub fn new_empty(schema: SchemaRef) -> RecordBatch {
let df_record_batch = DfRecordBatch::new_empty(schema.arrow_schema().clone());
Ok(RecordBatch {
let columns = schema
.column_schemas()
.iter()
.map(|col| col.data_type.create_mutable_vector(0).to_vector())
.collect();
RecordBatch {
schema,
columns: vec![],
columns,
df_record_batch,
})
}
}
pub fn try_project(&self, indices: &[usize]) -> Result<Self> {
@@ -220,7 +226,7 @@ pub struct RecordBatchRowIterator<'a> {
}
impl<'a> RecordBatchRowIterator<'a> {
fn new(record_batch: &'a RecordBatch) -> RecordBatchRowIterator {
fn new(record_batch: &'a RecordBatch) -> RecordBatchRowIterator<'a> {
RecordBatchRowIterator {
record_batch,
rows: record_batch.df_record_batch.num_rows(),
@@ -230,7 +236,7 @@ impl<'a> RecordBatchRowIterator<'a> {
}
}
impl<'a> Iterator for RecordBatchRowIterator<'a> {
impl Iterator for RecordBatchRowIterator<'_> {
type Item = Vec<Value>;
fn next(&mut self) -> Option<Self::Item> {

View File

@@ -4,21 +4,36 @@ version.workspace = true
edition.workspace = true
license.workspace = true
[lib]
path = "src/lib.rs"
[[bin]]
name = "common-runtime-bin"
path = "src/bin.rs"
[lints]
workspace = true
[dependencies]
async-trait.workspace = true
clap.workspace = true
common-error.workspace = true
common-macro.workspace = true
common-telemetry.workspace = true
futures.workspace = true
lazy_static.workspace = true
num_cpus.workspace = true
once_cell.workspace = true
parking_lot.workspace = true
paste.workspace = true
pin-project.workspace = true
prometheus.workspace = true
rand.workspace = true
ratelimit.workspace = true
serde.workspace = true
serde_json.workspace = true
snafu.workspace = true
tempfile.workspace = true
tokio.workspace = true
tokio-metrics = "0.3"
tokio-metrics-collector = { git = "https://github.com/MichaelScofield/tokio-metrics-collector.git", rev = "89d692d5753d28564a7aac73c6ac5aba22243ba0" }

View File

@@ -0,0 +1,60 @@
# Greptime Runtime
## Run performance test for different priority & workload type
```
# workspace is at this subcrate
cargo run --release -- --loop-cnt 500
```
## Related PRs & issues
- Preliminary support cpu limitation
ISSUE: https://github.com/GreptimeTeam/greptimedb/issues/3685
PR: https://github.com/GreptimeTeam/greptimedb/pull/4782
## CPU resource constraints (ThrottleableRuntime)
To achieve CPU resource constraints, we adopt the concept of rate limiting. When creating a future, we first wrap it with another layer of future to intercept the poll operation during runtime. By using the ratelimit library, we can simply implement a mechanism that allows only a limited number of polls for a batch of tasks under a certain priority within a specific time frame (the current token generation interval is set to 10ms).
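
As a rough illustration of that idea, the sketch below wraps a future so that every poll first asks a `Ratelimiter` for a token; the wrapper type and `main` are made up for the example (they are not the actual `ThrottleableRuntime` internals), and `tokio` plus `ratelimit` are assumed as dependencies.

``` rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;

use ratelimit::Ratelimiter;

/// Example wrapper: each poll first takes a token from the limiter; when the
/// per-priority budget is exhausted, the task yields instead of running.
struct Throttled<F> {
    inner: Pin<Box<F>>,
    limiter: Arc<Ratelimiter>,
}

impl<F: Future> Future for Throttled<F> {
    type Output = F::Output;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // `try_wait` fails while no token is available in the current window.
        if self.limiter.try_wait().is_err() {
            // A real implementation would reschedule after the returned wait
            // time; waking immediately keeps the sketch short.
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }
        self.inner.as_mut().poll(cx)
    }
}

fn main() {
    // Same builder shape as the `ratelimiter_count` snippet below: N tokens per 10ms.
    let limiter = Arc::new(
        Ratelimiter::builder(4000, Duration::from_millis(10))
            .max_tokens(8000)
            .build()
            .unwrap(),
    );
    let task = Throttled {
        inner: Box::pin(async { 40 + 2 }),
        limiter,
    };
    let rt = tokio::runtime::Runtime::new().unwrap();
    assert_eq!(rt.block_on(task), 42);
}
```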
The default used runtime can be switched by
``` rust
pub type Runtime = DefaultRuntime;
```
in `runtime.rs`.
We tested four types of workload with 5 priorities, whose setups are as follows:
``` rust
impl Priority {
fn ratelimiter_count(&self) -> Result<Option<Ratelimiter>> {
let max = 8000;
let gen_per_10ms = match self {
Priority::VeryLow => Some(2000),
Priority::Low => Some(4000),
Priority::Middle => Some(6000),
Priority::High => Some(8000),
Priority::VeryHigh => None,
};
if let Some(gen_per_10ms) = gen_per_10ms {
Ratelimiter::builder(gen_per_10ms, Duration::from_millis(10)) // generate poll count per 10ms
.max_tokens(max) // reserved token for batch request
.build()
.context(BuildRuntimeRateLimiterSnafu)
.map(Some)
} else {
Ok(None)
}
}
}
```
This is the preliminary experimental effect so far:
![](resources/rdme-exp.png)
## TODO
- Introduce PID to achieve more accurate limitation.

Binary file not shown (image, 226 KiB).

View File

@@ -0,0 +1,205 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use clap::Parser;
#[derive(Debug, Default, Parser)]
pub struct Command {
#[clap(long)]
loop_cnt: usize,
}
fn main() {
common_telemetry::init_default_ut_logging();
let cmd = Command::parse();
test_diff_priority_cpu::test_diff_workload_priority(cmd.loop_cnt);
}
mod test_diff_priority_cpu {
use std::path::PathBuf;
use common_runtime::runtime::{BuilderBuild, Priority, RuntimeTrait};
use common_runtime::{Builder, Runtime};
use common_telemetry::debug;
use tempfile::TempDir;
fn compute_pi_str(precision: usize) -> String {
let mut pi = 0.0;
let mut sign = 1.0;
for i in 0..precision {
pi += sign / (2 * i + 1) as f64;
sign *= -1.0;
}
pi *= 4.0;
format!("{:.prec$}", pi, prec = precision)
}
macro_rules! def_workload_enum {
($($variant:ident),+) => {
#[derive(Debug)]
enum WorkloadType {
$($variant),+
}
/// array of workloads for iteration
const WORKLOADS: &'static [WorkloadType] = &[
$( WorkloadType::$variant ),+
];
};
}
def_workload_enum!(
ComputeHeavily,
ComputeHeavily2,
WriteFile,
SpawnBlockingWriteFile
);
async fn workload_compute_heavily() {
let prefix = 10;
for _ in 0..3000 {
let _ = compute_pi_str(prefix);
tokio::task::yield_now().await;
}
}
async fn workload_compute_heavily2() {
let prefix = 30;
for _ in 0..2000 {
let _ = compute_pi_str(prefix);
tokio::task::yield_now().await;
}
}
async fn workload_write_file(_idx: u64, tempdir: PathBuf) {
use tokio::io::AsyncWriteExt;
let prefix = 50;
let mut file = tokio::fs::OpenOptions::new()
.write(true)
.append(true)
.create(true)
.open(tempdir.join(format!("pi_{}", prefix)))
.await
.unwrap();
for i in 0..200 {
let pi = compute_pi_str(prefix);
if i % 2 == 0 {
file.write_all(pi.as_bytes()).await.unwrap();
}
}
}
async fn workload_spawn_blocking_write_file(tempdir: PathBuf) {
use std::io::Write;
let prefix = 100;
let mut file = Some(
std::fs::OpenOptions::new()
.append(true)
.create(true)
.open(tempdir.join(format!("pi_{}", prefix)))
.unwrap(),
);
for i in 0..100 {
let pi = compute_pi_str(prefix);
if i % 2 == 0 {
let mut file1 = file.take().unwrap();
file = Some(
tokio::task::spawn_blocking(move || {
file1.write_all(pi.as_bytes()).unwrap();
file1
})
.await
.unwrap(),
);
}
}
}
pub fn test_diff_workload_priority(loop_cnt: usize) {
let tempdir = tempfile::tempdir().unwrap();
let priorities = [
Priority::VeryLow,
Priority::Low,
Priority::Middle,
Priority::High,
Priority::VeryHigh,
];
for wl in WORKLOADS {
for p in priorities.iter() {
let runtime: Runtime = Builder::default()
.runtime_name("test")
.thread_name("test")
.worker_threads(8)
.priority(*p)
.build()
.expect("Fail to create runtime");
let runtime2 = runtime.clone();
runtime.block_on(test_spec_priority_and_workload(
*p, runtime2, wl, &tempdir, loop_cnt,
));
}
}
}
async fn test_spec_priority_and_workload(
priority: Priority,
runtime: Runtime,
workload_id: &WorkloadType,
tempdir: &TempDir,
loop_cnt: usize,
) {
tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
debug!(
"testing cpu usage for priority {:?} workload_id {:?}",
priority, workload_id,
);
// start monitor thread
let mut tasks = vec![];
let start = std::time::Instant::now();
for i in 0..loop_cnt {
// persist cpu usage in json: {priority}.{workload_id}
match *workload_id {
WorkloadType::ComputeHeavily => {
tasks.push(runtime.spawn(workload_compute_heavily()));
}
WorkloadType::ComputeHeavily2 => {
tasks.push(runtime.spawn(workload_compute_heavily2()));
}
WorkloadType::SpawnBlockingWriteFile => {
tasks.push(runtime.spawn(workload_spawn_blocking_write_file(
tempdir.path().to_path_buf(),
)));
}
WorkloadType::WriteFile => {
tasks.push(
runtime.spawn(workload_write_file(i as u64, tempdir.path().to_path_buf())),
);
}
}
}
for task in tasks {
task.await.unwrap();
}
let elapsed = start.elapsed();
debug!(
"test cpu usage for priority {:?} workload_id {:?} elapsed {}ms",
priority,
workload_id,
elapsed.as_millis()
);
}
}

View File

@@ -33,6 +33,14 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to build runtime rate limiter"))]
BuildRuntimeRateLimiter {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: ratelimit::Error,
},
#[snafu(display("Repeated task {} is already started", name))]
IllegalState {
name: String,

View File

@@ -21,6 +21,7 @@ use once_cell::sync::Lazy;
use paste::paste;
use serde::{Deserialize, Serialize};
use crate::runtime::{BuilderBuild, RuntimeTrait};
use crate::{Builder, JoinHandle, Runtime};
const GLOBAL_WORKERS: usize = 8;

View File

@@ -17,6 +17,8 @@ pub mod global;
mod metrics;
mod repeated_task;
pub mod runtime;
pub mod runtime_default;
pub mod runtime_throttleable;
pub use global::{
block_on_compact, block_on_global, compact_runtime, create_runtime, global_runtime,

View File

@@ -23,6 +23,7 @@ use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use crate::error::{IllegalStateSnafu, Result, WaitGcTaskStopSnafu};
use crate::runtime::RuntimeTrait;
use crate::Runtime;
/// Task to execute repeatedly.

View File

@@ -19,24 +19,20 @@ use std::thread;
use std::time::Duration;
use snafu::ResultExt;
use tokio::runtime::{Builder as RuntimeBuilder, Handle};
use tokio::runtime::Builder as RuntimeBuilder;
use tokio::sync::oneshot;
pub use tokio::task::{JoinError, JoinHandle};
use crate::error::*;
use crate::metrics::*;
use crate::runtime_default::DefaultRuntime;
use crate::runtime_throttleable::ThrottleableRuntime;
// configurations
pub type Runtime = DefaultRuntime;
static RUNTIME_ID: AtomicUsize = AtomicUsize::new(0);
/// A runtime to run future tasks
#[derive(Clone, Debug)]
pub struct Runtime {
name: String,
handle: Handle,
// Used to receive a drop signal when dropper is dropped, inspired by databend
_dropper: Arc<Dropper>,
}
/// Dropping the dropper will cause runtime to shutdown.
#[derive(Debug)]
pub struct Dropper {
@@ -50,45 +46,42 @@ impl Drop for Dropper {
}
}
impl Runtime {
pub fn builder() -> Builder {
pub trait RuntimeTrait {
/// Get a runtime builder
fn builder() -> Builder {
Builder::default()
}
/// Spawn a future and execute it in this thread pool
///
/// Similar to tokio::runtime::Runtime::spawn()
pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
self.handle.spawn(future)
}
F::Output: Send + 'static;
/// Run the provided function on an executor dedicated to blocking
/// operations.
pub fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
self.handle.spawn_blocking(func)
}
R: Send + 'static;
/// Run a future to complete, this is the runtime's entry point
pub fn block_on<F: Future>(&self, future: F) -> F::Output {
self.handle.block_on(future)
}
fn block_on<F: Future>(&self, future: F) -> F::Output;
pub fn name(&self) -> &str {
&self.name
}
/// Get the name of the runtime
fn name(&self) -> &str;
}
pub trait BuilderBuild<R: RuntimeTrait> {
fn build(&mut self) -> Result<R>;
}
pub struct Builder {
runtime_name: String,
thread_name: String,
priority: Priority,
builder: RuntimeBuilder,
}
@@ -98,11 +91,17 @@ impl Default for Builder {
runtime_name: format!("runtime-{}", RUNTIME_ID.fetch_add(1, Ordering::Relaxed)),
thread_name: "default-worker".to_string(),
builder: RuntimeBuilder::new_multi_thread(),
priority: Priority::VeryHigh,
}
}
}
impl Builder {
pub fn priority(&mut self, priority: Priority) -> &mut Self {
self.priority = priority;
self
}
/// Sets the number of worker threads the Runtime will use.
///
/// This can be any number above 0. The default value is the number of cores available to the system.
@@ -139,8 +138,10 @@ impl Builder {
self.thread_name = val.into();
self
}
}
pub fn build(&mut self) -> Result<Runtime> {
impl BuilderBuild<DefaultRuntime> for Builder {
fn build(&mut self) -> Result<DefaultRuntime> {
let runtime = self
.builder
.enable_all()
@@ -163,18 +164,53 @@ impl Builder {
#[cfg(tokio_unstable)]
register_collector(name.clone(), &handle);
Ok(Runtime {
name,
Ok(DefaultRuntime::new(
&name,
handle,
_dropper: Arc::new(Dropper {
Arc::new(Dropper {
close: Some(send_stop),
}),
})
))
}
}
impl BuilderBuild<ThrottleableRuntime> for Builder {
fn build(&mut self) -> Result<ThrottleableRuntime> {
let runtime = self
.builder
.enable_all()
.thread_name(self.thread_name.clone())
.on_thread_start(on_thread_start(self.thread_name.clone()))
.on_thread_stop(on_thread_stop(self.thread_name.clone()))
.on_thread_park(on_thread_park(self.thread_name.clone()))
.on_thread_unpark(on_thread_unpark(self.thread_name.clone()))
.build()
.context(BuildRuntimeSnafu)?;
let name = self.runtime_name.clone();
let handle = runtime.handle().clone();
let (send_stop, recv_stop) = oneshot::channel();
// Block the runtime to shutdown.
let _ = thread::Builder::new()
.name(format!("{}-blocker", self.thread_name))
.spawn(move || runtime.block_on(recv_stop));
#[cfg(tokio_unstable)]
register_collector(name.clone(), &handle);
ThrottleableRuntime::new(
&name,
self.priority,
handle,
Arc::new(Dropper {
close: Some(send_stop),
}),
)
}
}
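With `build` moved into `BuilderBuild<R>`, the caller now picks the concrete runtime by annotating the binding type, as the tests in this diff do. A minimal sketch, assuming the module and re-export paths shown elsewhere in this change (`common_runtime::runtime::*`, `common_runtime::Builder`):
use common_runtime::runtime::{BuilderBuild, Priority, RuntimeTrait};
use common_runtime::runtime_default::DefaultRuntime;
use common_runtime::runtime_throttleable::ThrottleableRuntime;
use common_runtime::Builder;
fn build_both() {
    // The target type selects which BuilderBuild impl runs.
    let plain: DefaultRuntime = Builder::default()
        .worker_threads(4)
        .thread_name("plain-worker")
        .build()
        .expect("build default runtime");
    let throttled: ThrottleableRuntime = Builder::default()
        .worker_threads(4)
        .thread_name("throttled-worker")
        .priority(Priority::Low) // only meaningful for the throttleable flavor
        .build()
        .expect("build throttleable runtime");
    // Both flavors expose the same RuntimeTrait surface.
    let _ = plain.spawn(async { 1 + 1 });
    let _ = throttled.spawn(async { 1 + 1 });
}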
#[cfg(tokio_unstable)]
pub fn register_collector(name: String, handle: &Handle) {
pub fn register_collector(name: String, handle: &tokio::runtime::Handle) {
let name = name.replace("-", "_");
let monitor = tokio_metrics::RuntimeMonitor::new(handle);
let collector = tokio_metrics_collector::RuntimeCollector::new(monitor, name);
@@ -213,8 +249,18 @@ fn on_thread_unpark(thread_name: String) -> impl Fn() + 'static {
}
}
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)]
pub enum Priority {
VeryLow = 0,
Low = 1,
Middle = 2,
High = 3,
VeryHigh = 4,
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::thread;
use std::time::Duration;
@@ -235,12 +281,12 @@ mod tests {
#[test]
fn test_metric() {
let runtime = Builder::default()
let runtime: Runtime = Builder::default()
.worker_threads(5)
.thread_name("test_runtime_metric")
.build()
.unwrap();
// wait threads created
// wait for threads to be created
thread::sleep(Duration::from_millis(50));
let _handle = runtime.spawn(async {

View File

@@ -0,0 +1,77 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::future::Future;
use std::sync::Arc;
use tokio::runtime::Handle;
pub use tokio::task::JoinHandle;
use crate::runtime::{Dropper, RuntimeTrait};
use crate::Builder;
/// A runtime to run future tasks
#[derive(Clone, Debug)]
pub struct DefaultRuntime {
name: String,
handle: Handle,
// Used to receive a drop signal when dropper is dropped, inspired by databend
_dropper: Arc<Dropper>,
}
impl DefaultRuntime {
pub(crate) fn new(name: &str, handle: Handle, dropper: Arc<Dropper>) -> Self {
Self {
name: name.to_string(),
handle,
_dropper: dropper,
}
}
}
impl RuntimeTrait for DefaultRuntime {
fn builder() -> Builder {
Builder::default()
}
/// Spawn a future and execute it in this thread pool
///
/// Similar to tokio::runtime::Runtime::spawn()
fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
self.handle.spawn(future)
}
/// Run the provided function on an executor dedicated to blocking
/// operations.
fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
self.handle.spawn_blocking(func)
}
/// Run a future to complete, this is the runtime's entry point
fn block_on<F: Future>(&self, future: F) -> F::Output {
self.handle.block_on(future)
}
fn name(&self) -> &str {
&self.name
}
}
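Because both `DefaultRuntime` and `ThrottleableRuntime` implement `RuntimeTrait`, code that only needs to spawn work can stay generic over the flavor. A small hedged sketch under that assumption:
use common_runtime::runtime::RuntimeTrait;
// Works with DefaultRuntime and ThrottleableRuntime alike.
fn run_and_wait<R: RuntimeTrait>(rt: &R) -> i32 {
    let handle = rt.spawn(async { 40 + 2 });
    // The JoinHandle resolves to Result<i32, JoinError>; block_on drives it to completion.
    rt.block_on(handle).expect("task panicked")
}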

View File

@@ -0,0 +1,285 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::Debug;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;
use futures::FutureExt;
use ratelimit::Ratelimiter;
use snafu::ResultExt;
use tokio::runtime::Handle;
pub use tokio::task::JoinHandle;
use tokio::time::Sleep;
use crate::error::{BuildRuntimeRateLimiterSnafu, Result};
use crate::runtime::{Dropper, Priority, RuntimeTrait};
use crate::Builder;
struct RuntimeRateLimiter {
pub ratelimiter: Option<Ratelimiter>,
}
impl Debug for RuntimeRateLimiter {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RuntimeThrottleShareWithFuture")
.field(
"ratelimiter_max_tokens",
&self.ratelimiter.as_ref().map(|v| v.max_tokens()),
)
.field(
"ratelimiter_refill_amount",
&self.ratelimiter.as_ref().map(|v| v.refill_amount()),
)
.finish()
}
}
/// A runtime to run future tasks
#[derive(Clone, Debug)]
pub struct ThrottleableRuntime {
name: String,
handle: Handle,
shared_with_future: Arc<RuntimeRateLimiter>,
// Used to receive a drop signal when dropper is dropped, inspired by databend
_dropper: Arc<Dropper>,
}
impl ThrottleableRuntime {
pub(crate) fn new(
name: &str,
priority: Priority,
handle: Handle,
dropper: Arc<Dropper>,
) -> Result<Self> {
Ok(Self {
name: name.to_string(),
handle,
shared_with_future: Arc::new(RuntimeRateLimiter {
ratelimiter: priority.ratelimiter_count()?,
}),
_dropper: dropper,
})
}
}
impl RuntimeTrait for ThrottleableRuntime {
fn builder() -> Builder {
Builder::default()
}
/// Spawn a future and execute it in this thread pool
///
/// Similar to tokio::runtime::Runtime::spawn()
fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
self.handle
.spawn(ThrottleFuture::new(self.shared_with_future.clone(), future))
}
/// Run the provided function on an executor dedicated to blocking
/// operations.
fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
self.handle.spawn_blocking(func)
}
/// Run a future to complete, this is the runtime's entry point
fn block_on<F: Future>(&self, future: F) -> F::Output {
self.handle.block_on(future)
}
fn name(&self) -> &str {
&self.name
}
}
enum State {
Pollable,
Throttled(Pin<Box<Sleep>>),
}
impl State {
fn unwrap_backoff(&mut self) -> &mut Pin<Box<Sleep>> {
match self {
State::Throttled(sleep) => sleep,
_ => panic!("unwrap_backoff failed"),
}
}
}
#[pin_project::pin_project]
pub struct ThrottleFuture<F: Future + Send + 'static> {
#[pin]
future: F,
/// RateLimiter of this future
handle: Arc<RuntimeRateLimiter>,
state: State,
}
impl<F> ThrottleFuture<F>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
fn new(handle: Arc<RuntimeRateLimiter>, future: F) -> Self {
Self {
future,
handle,
state: State::Pollable,
}
}
}
impl<F> Future for ThrottleFuture<F>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
type Output = F::Output;
fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
match this.state {
State::Pollable => {}
State::Throttled(ref mut sleep) => match sleep.poll_unpin(cx) {
Poll::Ready(_) => {
*this.state = State::Pollable;
}
Poll::Pending => return Poll::Pending,
},
};
if let Some(ratelimiter) = &this.handle.ratelimiter {
if let Err(wait) = ratelimiter.try_wait() {
*this.state = State::Throttled(Box::pin(tokio::time::sleep(wait)));
match this.state.unwrap_backoff().poll_unpin(cx) {
Poll::Ready(_) => {
*this.state = State::Pollable;
}
Poll::Pending => {
return Poll::Pending;
}
}
}
}
let poll_res = this.future.poll(cx);
match poll_res {
Poll::Ready(r) => Poll::Ready(r),
Poll::Pending => Poll::Pending,
}
}
}
impl Priority {
fn ratelimiter_count(&self) -> Result<Option<Ratelimiter>> {
let max = 8000;
let gen_per_10ms = match self {
Priority::VeryLow => Some(2000),
Priority::Low => Some(4000),
Priority::Middle => Some(6000),
Priority::High => Some(8000),
Priority::VeryHigh => None,
};
if let Some(gen_per_10ms) = gen_per_10ms {
Ratelimiter::builder(gen_per_10ms, Duration::from_millis(10)) // generate poll count per 10ms
.max_tokens(max) // reserved token for batch request
.build()
.context(BuildRuntimeRateLimiterSnafu)
.map(Some)
} else {
Ok(None)
}
}
}
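With a 10 ms refill interval these amounts work out to an approximate poll budget of 200,000 polls per second for `VeryLow`, 400,000 for `Low`, 600,000 for `Middle`, 800,000 for `High`, and no limit for `VeryHigh`; `max_tokens(8000)` caps the burst an idle runtime can accumulate to one interval of the highest throttled tier. A minimal standalone sketch of the gating that `ThrottleFuture::poll` applies, using only the ratelimiter calls seen above:
use std::time::Duration;
use ratelimit::Ratelimiter;
fn main() {
    // Priority::VeryLow budget: 2000 tokens per 10 ms, with an 8000-token burst reserve.
    let limiter = Ratelimiter::builder(2000, Duration::from_millis(10))
        .max_tokens(8000)
        .build()
        .expect("build ratelimiter");
    match limiter.try_wait() {
        // A token was available: poll the wrapped future immediately.
        Ok(()) => println!("poll now"),
        // Out of tokens: the future would park in State::Throttled for `wait` before polling.
        Err(wait) => println!("back off for {wait:?}"),
    }
}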
#[cfg(test)]
mod tests {
use tokio::fs::File;
use tokio::io::AsyncWriteExt;
use tokio::time::Duration;
use super::*;
use crate::runtime::BuilderBuild;
#[tokio::test]
async fn test_throttleable_runtime_spawn_simple() {
for p in [
Priority::VeryLow,
Priority::Low,
Priority::Middle,
Priority::High,
Priority::VeryHigh,
] {
let runtime: ThrottleableRuntime = Builder::default()
.runtime_name("test")
.thread_name("test")
.worker_threads(8)
.priority(p)
.build()
.expect("Fail to create runtime");
// Spawn a simple future that returns 42
let handle = runtime.spawn(async {
tokio::time::sleep(Duration::from_millis(10)).await;
42
});
let result = handle.await.expect("Task panicked");
assert_eq!(result, 42);
}
}
#[tokio::test]
async fn test_throttleable_runtime_spawn_complex() {
let tempdir = tempfile::tempdir().unwrap();
for p in [
Priority::VeryLow,
Priority::Low,
Priority::Middle,
Priority::High,
Priority::VeryHigh,
] {
let runtime: ThrottleableRuntime = Builder::default()
.runtime_name("test")
.thread_name("test")
.worker_threads(8)
.priority(p)
.build()
.expect("Fail to create runtime");
let tempdirpath = tempdir.path().to_path_buf();
let handle = runtime.spawn(async move {
let mut file = File::create(tempdirpath.join("test.txt")).await.unwrap();
file.write_all(b"Hello, world!").await.unwrap();
42
});
let result = handle.await.expect("Task panicked");
assert_eq!(result, 42);
}
}
}

View File

@@ -26,7 +26,7 @@ opentelemetry = { version = "0.21.0", default-features = false, features = [
opentelemetry-otlp = { version = "0.14.0", features = ["tokio"] }
opentelemetry-semantic-conventions = "0.13.0"
opentelemetry_sdk = { version = "0.21.0", features = ["rt-tokio"] }
parking_lot = { version = "0.12" }
parking_lot.workspace = true
prometheus.workspace = true
serde.workspace = true
serde_json.workspace = true

View File

@@ -23,6 +23,7 @@ use common_function::function::FunctionRef;
use common_function::scalars::aggregate::AggregateFunctionMetaRef;
use common_query::prelude::ScalarUdf;
use common_query::Output;
use common_runtime::runtime::{BuilderBuild, RuntimeTrait};
use common_runtime::Runtime;
use datafusion_expr::LogicalPlan;
use query::dataframe::DataFrame;

View File

@@ -203,7 +203,7 @@ impl Scalar for bool {
}
}
impl<'a> ScalarRef<'a> for bool {
impl ScalarRef<'_> for bool {
type ScalarType = bool;
#[inline]
@@ -273,7 +273,7 @@ impl Scalar for Date {
}
}
impl<'a> ScalarRef<'a> for Date {
impl ScalarRef<'_> for Date {
type ScalarType = Date;
fn to_owned_scalar(&self) -> Self::ScalarType {
@@ -294,7 +294,7 @@ impl Scalar for Decimal128 {
}
}
impl<'a> ScalarRef<'a> for Decimal128 {
impl ScalarRef<'_> for Decimal128 {
type ScalarType = Decimal128;
fn to_owned_scalar(&self) -> Self::ScalarType {
@@ -315,7 +315,7 @@ impl Scalar for DateTime {
}
}
impl<'a> ScalarRef<'a> for DateTime {
impl ScalarRef<'_> for DateTime {
type ScalarType = DateTime;
fn to_owned_scalar(&self) -> Self::ScalarType {

View File

@@ -82,8 +82,8 @@ pub fn cast_with_opt(
}
}
/// Return true if the src_value can be casted to dest_type,
/// Otherwise, return false.
/// Return true if the src_value can be cast to dest_type; otherwise, return false.
///
/// Notice: this function does not promise that the `cast_with_opt` will succeed,
/// it only checks whether the src_value can be casted to dest_type.
pub fn can_cast_type(src_value: &Value, dest_type: &ConcreteDataType) -> bool {

View File

@@ -83,9 +83,10 @@ pub trait LogicalPrimitiveType: 'static + Sized {
fn cast_value_ref(value: ValueRef) -> Result<Option<Self::Wrapper>>;
}
/// A new type for [WrapperType], complement the `Ord` feature for it. Wrapping non ordered
/// primitive types like `f32` and `f64` in `OrdPrimitive` can make them be used in places that
/// require `Ord`. For example, in `Median` UDAFs.
/// A new type for [WrapperType], complement the `Ord` feature for it.
///
/// Wrapping non ordered primitive types like `f32` and `f64` in `OrdPrimitive`
/// can make them be used in places that require `Ord`. For example, in `Median` UDAFs.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct OrdPrimitive<T: WrapperType>(pub T);

View File

@@ -1087,7 +1087,7 @@ macro_rules! impl_as_for_value_ref {
};
}
impl<'a> ValueRef<'a> {
impl ValueRef<'_> {
define_data_type_func!(ValueRef);
/// Returns true if this is null.
@@ -1214,13 +1214,13 @@ impl<'a> ValueRef<'a> {
}
}
impl<'a> PartialOrd for ValueRef<'a> {
impl PartialOrd for ValueRef<'_> {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl<'a> Ord for ValueRef<'a> {
impl Ord for ValueRef<'_> {
fn cmp(&self, other: &Self) -> Ordering {
impl_ord_for_value_like!(ValueRef, self, other)
}
@@ -1347,7 +1347,7 @@ pub enum ListValueRef<'a> {
Ref { val: &'a ListValue },
}
impl<'a> ListValueRef<'a> {
impl ListValueRef<'_> {
/// Convert self to [Value]. This method would clone the underlying data.
fn to_value(self) -> Value {
match self {
@@ -1365,7 +1365,7 @@ impl<'a> ListValueRef<'a> {
}
}
impl<'a> Serialize for ListValueRef<'a> {
impl Serialize for ListValueRef<'_> {
fn serialize<S: Serializer>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error> {
match self {
ListValueRef::Indexed { vector, idx } => match vector.get(*idx) {
@@ -1377,28 +1377,28 @@ impl<'a> Serialize for ListValueRef<'a> {
}
}
impl<'a> PartialEq for ListValueRef<'a> {
impl PartialEq for ListValueRef<'_> {
fn eq(&self, other: &Self) -> bool {
self.to_value().eq(&other.to_value())
}
}
impl<'a> Eq for ListValueRef<'a> {}
impl Eq for ListValueRef<'_> {}
impl<'a> Ord for ListValueRef<'a> {
impl Ord for ListValueRef<'_> {
fn cmp(&self, other: &Self) -> Ordering {
// Respect the order of `Value` by converting into value before comparison.
self.to_value().cmp(&other.to_value())
}
}
impl<'a> PartialOrd for ListValueRef<'a> {
impl PartialOrd for ListValueRef<'_> {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl<'a> ValueRef<'a> {
impl ValueRef<'_> {
/// Returns the size of the underlying data in bytes,
/// The size is estimated and only considers the data size.
pub fn data_size(&self) -> usize {

View File

@@ -247,7 +247,7 @@ pub struct Decimal128Iter<'a> {
iter: ArrayIter<&'a Decimal128Array>,
}
impl<'a> Iterator for Decimal128Iter<'a> {
impl Iterator for Decimal128Iter<'_> {
type Item = Option<Decimal128>;
fn next(&mut self) -> Option<Self::Item> {

View File

@@ -157,7 +157,7 @@ pub struct ListIter<'a> {
}
impl<'a> ListIter<'a> {
fn new(vector: &'a ListVector) -> ListIter {
fn new(vector: &'a ListVector) -> ListIter<'a> {
ListIter { vector, idx: 0 }
}
}

View File

@@ -207,7 +207,7 @@ pub struct PrimitiveIter<'a, T: LogicalPrimitiveType> {
iter: ArrayIter<&'a PrimitiveArray<T::ArrowPrimitive>>,
}
impl<'a, T: LogicalPrimitiveType> Iterator for PrimitiveIter<'a, T> {
impl<T: LogicalPrimitiveType> Iterator for PrimitiveIter<'_, T> {
type Item = Option<T::Wrapper>;
fn next(&mut self) -> Option<Option<T::Wrapper>> {

View File

@@ -271,10 +271,17 @@ impl FlowWorkerManager {
let rows_proto: Vec<v1::Row> = insert
.into_iter()
.map(|(mut row, _ts)| {
// `update_at` col
row.extend([Value::from(common_time::Timestamp::new_millisecond(
now,
))]);
// extend `update_at` col if needed
// if the proto schema has a millisecond timestamp column at this position and the result row doesn't have it, add it
if row.len() < proto_schema.len()
&& proto_schema[row.len()].datatype
== greptime_proto::v1::ColumnDataType::TimestampMillisecond
as i32
{
row.extend([Value::from(
common_time::Timestamp::new_millisecond(now),
)]);
}
// ts col, if auto create
if is_ts_placeholder {
ensure!(
@@ -291,6 +298,17 @@ impl FlowWorkerManager {
common_time::Timestamp::new_millisecond(0),
)]);
}
if row.len() != proto_schema.len() {
InternalSnafu {
reason: format!(
"Flow output row length mismatch, expect {} got {}, the columns in schema are: {:?}",
proto_schema.len(),
row.len(),
proto_schema.iter().map(|c|&c.column_name).collect_vec()
),
}
.fail()?;
}
Ok(row.into())
})
.collect::<Result<Vec<_>, Error>>()?;

View File

@@ -61,7 +61,7 @@ pub struct Context<'referred, 'df> {
pub err_collector: ErrCollector,
}
impl<'referred, 'df> Drop for Context<'referred, 'df> {
impl Drop for Context<'_, '_> {
fn drop(&mut self) {
for bundle in std::mem::take(&mut self.input_collection)
.into_values()
@@ -92,7 +92,7 @@ impl<'referred, 'df> Drop for Context<'referred, 'df> {
}
}
impl<'referred, 'df> Context<'referred, 'df> {
impl Context<'_, '_> {
pub fn insert_global(&mut self, id: GlobalId, collection: CollectionBundle) {
self.input_collection.insert(id, collection);
}
@@ -120,7 +120,7 @@ impl<'referred, 'df> Context<'referred, 'df> {
}
}
impl<'referred, 'df> Context<'referred, 'df> {
impl Context<'_, '_> {
/// Like `render_plan` but in Batch Mode
pub fn render_plan_batch(&mut self, plan: TypedPlan) -> Result<CollectionBundle<Batch>, Error> {
match plan.plan {

View File

@@ -28,7 +28,7 @@ use crate::plan::TypedPlan;
use crate::repr::{self, DiffRow, KeyValDiffRow, Row};
use crate::utils::ArrangeHandler;
impl<'referred, 'df> Context<'referred, 'df> {
impl Context<'_, '_> {
/// Like `render_mfp` but in batch mode
pub fn render_mfp_batch(
&mut self,

View File

@@ -34,7 +34,7 @@ use crate::plan::{AccumulablePlan, AggrWithIndex, KeyValPlan, ReducePlan, TypedP
use crate::repr::{self, DiffRow, KeyValDiffRow, RelationType, Row};
use crate::utils::{ArrangeHandler, ArrangeReader, ArrangeWriter, KeyExpiryManager};
impl<'referred, 'df> Context<'referred, 'df> {
impl Context<'_, '_> {
const REDUCE_BATCH: &'static str = "reduce_batch";
/// Like `render_reduce`, but for batch mode, and only barebone implementation
/// no support for distinct aggregation for now

View File

@@ -31,7 +31,7 @@ use crate::expr::{Batch, EvalError};
use crate::repr::{DiffRow, Row, BROADCAST_CAP};
#[allow(clippy::mutable_key_type)]
impl<'referred, 'df> Context<'referred, 'df> {
impl Context<'_, '_> {
/// simply send the batch to downstream, without fancy features like buffering
pub fn render_source_batch(
&mut self,

View File

@@ -273,7 +273,7 @@ impl<'a> ExpandAvgRewriter<'a> {
}
}
impl<'a> TreeNodeRewriter for ExpandAvgRewriter<'a> {
impl TreeNodeRewriter for ExpandAvgRewriter<'_> {
type Node = Expr;
fn f_up(&mut self, expr: Expr) -> Result<Transformed<Expr>, DataFusionError> {

View File

@@ -113,8 +113,8 @@ impl OpenTelemetryProtocolHandler for Instance {
.plugins
.get::<OpenTelemetryProtocolInterceptorRef<servers::error::Error>>();
interceptor_ref.pre_execute(ctx.clone())?;
let (requests, rows) = otlp::logs::to_grpc_insert_requests(request, pipeline, table_name)?;
self.handle_log_inserts(requests, ctx)
.await
.inspect(|_| OTLP_LOGS_ROWS.inc_by(rows as u64))

View File

@@ -19,7 +19,9 @@ use crate::inverted_index::error::Result;
use crate::inverted_index::format::reader::InvertedIndexReader;
/// `FstValuesMapper` maps FST-encoded u64 values to their corresponding bitmaps
/// within an inverted index. The higher 32 bits of each u64 value represent the
/// within an inverted index.
///
/// The higher 32 bits of each u64 value represent the
/// bitmap offset and the lower 32 bits represent its size. This mapper uses these
/// combined offset-size pairs to fetch and union multiple bitmaps into a single `BitVec`.
pub struct FstValuesMapper<'a> {

View File

@@ -134,7 +134,7 @@ impl PredicatesIndexApplier {
fn bitmap_full_range(metadata: &InvertedIndexMetas) -> BitVec {
let total_count = metadata.total_row_count;
let segment_count = metadata.segment_row_count;
let len = (total_count + segment_count - 1) / segment_count;
let len = total_count.div_ceil(segment_count);
BitVec::repeat(true, len as _)
}
}

View File

@@ -42,7 +42,7 @@ humantime-serde.workspace = true
itertools.workspace = true
lazy_static.workspace = true
once_cell.workspace = true
parking_lot = "0.12"
parking_lot.workspace = true
prometheus.workspace = true
prost.workspace = true
rand.workspace = true

View File

@@ -13,6 +13,7 @@
// limitations under the License.
use std::collections::{BTreeMap, HashSet};
use std::fmt::{Debug, Display};
use std::ops::Range;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
@@ -113,6 +114,34 @@ impl HeartbeatAccumulator {
}
}
#[derive(Copy, Clone)]
pub struct PusherId {
pub role: Role,
pub id: u64,
}
impl Debug for PusherId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}-{}", self.role, self.id)
}
}
impl Display for PusherId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}-{}", self.role, self.id)
}
}
impl PusherId {
pub fn new(role: Role, id: u64) -> Self {
Self { role, id }
}
pub fn string_key(&self) -> String {
format!("{}-{}", self.role as i32, self.id)
}
}
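The two textual forms differ only in how the role is rendered: `Debug`/`Display` print the role's debug name, while `string_key` keeps the numeric protobuf value that was previously used as the pushers map key. An illustrative sketch (inside meta-srv, where `Role` and `PusherId` are in scope):
#[test]
fn pusher_id_forms() {
    let pusher_id = PusherId::new(Role::Datanode, 42);
    // Log-friendly form, e.g. "Datanode-42".
    assert_eq!(pusher_id.to_string(), format!("{:?}-{}", Role::Datanode, 42));
    // Map-key form, e.g. "0-42": the role's i32 value followed by the node id.
    assert_eq!(pusher_id.string_key(), format!("{}-{}", Role::Datanode as i32, 42));
}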
/// The pusher of the heartbeat response.
pub struct Pusher {
sender: Sender<std::result::Result<HeartbeatResponse, tonic::Status>>,
@@ -154,10 +183,11 @@ impl Pusher {
pub struct Pushers(Arc<RwLock<BTreeMap<String, Pusher>>>);
impl Pushers {
async fn push(&self, pusher_id: &str, mailbox_message: MailboxMessage) -> Result<()> {
async fn push(&self, pusher_id: PusherId, mailbox_message: MailboxMessage) -> Result<()> {
let pusher_id = pusher_id.string_key();
let pushers = self.0.read().await;
let pusher = pushers
.get(pusher_id)
.get(&pusher_id)
.context(error::PusherNotFoundSnafu { pusher_id })?;
pusher
.push(HeartbeatResponse {
@@ -234,21 +264,19 @@ pub struct HeartbeatHandlerGroup {
impl HeartbeatHandlerGroup {
/// Registers the heartbeat response [`Pusher`] with the given key to the group.
pub async fn register_pusher(&self, key: impl AsRef<str>, pusher: Pusher) {
let key = key.as_ref();
pub async fn register_pusher(&self, pusher_id: PusherId, pusher: Pusher) {
METRIC_META_HEARTBEAT_CONNECTION_NUM.inc();
info!("Pusher register: {}", key);
let _ = self.pushers.insert(key.to_string(), pusher).await;
info!("Pusher register: {}", pusher_id);
let _ = self.pushers.insert(pusher_id.string_key(), pusher).await;
}
/// Deregisters the heartbeat response [`Pusher`] with the given key from the group.
///
/// Returns the [`Pusher`] if it exists.
pub async fn deregister_push(&self, key: impl AsRef<str>) -> Option<Pusher> {
let key = key.as_ref();
pub async fn deregister_push(&self, pusher_id: PusherId) -> Option<Pusher> {
METRIC_META_HEARTBEAT_CONNECTION_NUM.dec();
info!("Pusher unregister: {}", key);
self.pushers.remove(key).await
info!("Pusher unregister: {}", pusher_id);
self.pushers.remove(&pusher_id.string_key()).await
}
/// Returns the [`Pushers`] of the group.
@@ -417,7 +445,7 @@ impl Mailbox for HeartbeatMailbox {
let _ = self.timeouts.insert(message_id, deadline);
self.timeout_notify.notify_one();
self.pushers.push(&pusher_id, msg).await?;
self.pushers.push(pusher_id, msg).await?;
Ok(MailboxReceiver::new(message_id, rx, *ch))
}
@@ -720,7 +748,7 @@ mod tests {
use common_meta::sequence::SequenceBuilder;
use tokio::sync::mpsc;
use super::{HeartbeatHandlerGroupBuilder, Pushers};
use super::{HeartbeatHandlerGroupBuilder, PusherId, Pushers};
use crate::error;
use crate::handler::collect_stats_handler::CollectStatsHandler;
use crate::handler::response_header_handler::ResponseHeaderHandler;
@@ -761,11 +789,10 @@ mod tests {
protocol_version: PROTOCOL_VERSION,
..Default::default()
};
let pusher_id = PusherId::new(Role::Datanode, datanode_id);
let pusher: Pusher = Pusher::new(pusher_tx, &res_header);
let handler_group = HeartbeatHandlerGroup::default();
handler_group
.register_pusher(format!("{}-{}", Role::Datanode as i32, datanode_id), pusher)
.await;
handler_group.register_pusher(pusher_id, pusher).await;
let kv_backend = Arc::new(MemoryKvBackend::new());
let seq = SequenceBuilder::new("test_seq", kv_backend).build();

View File

@@ -16,7 +16,6 @@
#![feature(result_flattening)]
#![feature(assert_matches)]
#![feature(extract_if)]
#![feature(option_take_if)]
pub mod bootstrap;
mod cache_invalidator;

View File

@@ -184,22 +184,26 @@ pub struct MetasrvInfo {
// Options for datanode.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct DatanodeOptions {
pub client_options: DatanodeClientOptions,
pub client: DatanodeClientOptions,
}
// Options for datanode client.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct DatanodeClientOptions {
pub timeout_millis: u64,
pub connect_timeout_millis: u64,
#[serde(with = "humantime_serde")]
pub timeout: Duration,
#[serde(with = "humantime_serde")]
pub connect_timeout: Duration,
pub tcp_nodelay: bool,
}
impl Default for DatanodeClientOptions {
fn default() -> Self {
Self {
timeout_millis: channel_manager::DEFAULT_GRPC_REQUEST_TIMEOUT_SECS * 1000,
connect_timeout_millis: channel_manager::DEFAULT_GRPC_CONNECT_TIMEOUT_SECS * 1000,
timeout: Duration::from_secs(channel_manager::DEFAULT_GRPC_REQUEST_TIMEOUT_SECS),
connect_timeout: Duration::from_secs(
channel_manager::DEFAULT_GRPC_CONNECT_TIMEOUT_SECS,
),
tcp_nodelay: true,
}
}
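With `humantime_serde`, the metasrv configuration now takes human-readable durations instead of raw millisecond integers. A minimal deserialization sketch; the struct only mirrors the fields above, and the `toml`, `serde` derive, and `humantime-serde` crates plus the concrete values are illustrative assumptions:
use std::time::Duration;
use serde::Deserialize;
#[derive(Debug, Deserialize)]
struct ClientOptions {
    #[serde(with = "humantime_serde")]
    timeout: Duration,
    #[serde(with = "humantime_serde")]
    connect_timeout: Duration,
    tcp_nodelay: bool,
}
fn main() {
    // Durations are now written as humantime strings ("10s", "500ms") instead of millisecond integers.
    let config = r#"
        timeout = "10s"
        connect_timeout = "1s"
        tcp_nodelay = true
    "#;
    let opts: ClientOptions = toml::from_str(config).expect("parse options");
    assert_eq!(opts.timeout, Duration::from_secs(10));
    assert_eq!(opts.connect_timeout, Duration::from_secs(1));
}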

View File

@@ -14,7 +14,6 @@
use std::sync::atomic::AtomicBool;
use std::sync::{Arc, Mutex, RwLock};
use std::time::Duration;
use client::client_manager::NodeClients;
use common_base::Plugins;
@@ -250,13 +249,9 @@ impl MetasrvBuilder {
let memory_region_keeper = Arc::new(MemoryRegionKeeper::default());
let node_manager = node_manager.unwrap_or_else(|| {
let datanode_client_channel_config = ChannelConfig::new()
.timeout(Duration::from_millis(
options.datanode.client_options.timeout_millis,
))
.connect_timeout(Duration::from_millis(
options.datanode.client_options.connect_timeout_millis,
))
.tcp_nodelay(options.datanode.client_options.tcp_nodelay);
.timeout(options.datanode.client.timeout)
.connect_timeout(options.datanode.client.connect_timeout)
.tcp_nodelay(options.datanode.client.tcp_nodelay);
Arc::new(NodeClients::new(datanode_client_channel_config))
});
let cache_invalidator = Arc::new(MetasrvCacheInvalidator::new(

View File

@@ -84,7 +84,7 @@ impl MailboxContext {
) {
let pusher_id = channel.pusher_id();
let pusher = Pusher::new(tx, &RequestHeader::default());
let _ = self.pushers.insert(pusher_id, pusher).await;
let _ = self.pushers.insert(pusher_id.string_key(), pusher).await;
}
pub fn mailbox(&self) -> &MailboxRef {

View File

@@ -24,6 +24,7 @@ pub mod mock {
use client::Client;
use common_grpc::channel_manager::ChannelManager;
use common_meta::peer::Peer;
use common_runtime::runtime::BuilderBuild;
use common_runtime::{Builder as RuntimeBuilder, Runtime};
use servers::grpc::region_server::{RegionServerHandler, RegionServerRequestHandler};
use tokio::sync::mpsc;

View File

@@ -31,7 +31,7 @@ use tonic::{Request, Response, Streaming};
use crate::error;
use crate::error::Result;
use crate::handler::{HeartbeatHandlerGroup, Pusher};
use crate::handler::{HeartbeatHandlerGroup, Pusher, PusherId};
use crate::metasrv::{Context, Metasrv};
use crate::metrics::METRIC_META_HEARTBEAT_RECV;
use crate::service::{GrpcResult, GrpcStream};
@@ -52,7 +52,7 @@ impl heartbeat_server::Heartbeat for Metasrv {
let ctx = self.new_ctx();
let _handle = common_runtime::spawn_global(async move {
let mut pusher_key = None;
let mut pusher_id = None;
while let Some(msg) = in_stream.next().await {
let mut is_not_leader = false;
match msg {
@@ -67,11 +67,11 @@ impl heartbeat_server::Heartbeat for Metasrv {
break;
};
if pusher_key.is_none() {
pusher_key = register_pusher(&handler_group, header, tx.clone()).await;
if pusher_id.is_none() {
pusher_id = register_pusher(&handler_group, header, tx.clone()).await;
}
if let Some(k) = &pusher_key {
METRIC_META_HEARTBEAT_RECV.with_label_values(&[k]);
if let Some(k) = &pusher_id {
METRIC_META_HEARTBEAT_RECV.with_label_values(&[&k.to_string()]);
} else {
METRIC_META_HEARTBEAT_RECV.with_label_values(&["none"]);
}
@@ -111,13 +111,10 @@ impl heartbeat_server::Heartbeat for Metasrv {
}
}
info!(
"Heartbeat stream closed: {:?}",
pusher_key.as_ref().unwrap_or(&"unknown".to_string())
);
info!("Heartbeat stream closed: {pusher_id:?}");
if let Some(key) = pusher_key {
let _ = handler_group.deregister_push(&key).await;
if let Some(pusher_id) = pusher_id {
let _ = handler_group.deregister_push(pusher_id).await;
}
});
@@ -176,13 +173,13 @@ async fn register_pusher(
handler_group: &HeartbeatHandlerGroup,
header: &RequestHeader,
sender: Sender<std::result::Result<HeartbeatResponse, tonic::Status>>,
) -> Option<String> {
let role = header.role() as i32;
let node_id = get_node_id(header);
let key = format!("{}-{}", role, node_id);
) -> Option<PusherId> {
let role = header.role();
let id = get_node_id(header);
let pusher_id = PusherId::new(role, id);
let pusher = Pusher::new(sender, header);
handler_group.register_pusher(&key, pusher).await;
Some(key)
handler_group.register_pusher(pusher_id, pusher).await;
Some(pusher_id)
}
#[cfg(test)]

View File

@@ -24,6 +24,7 @@ use futures::Future;
use tokio::sync::oneshot;
use crate::error::{self, Result};
use crate::handler::PusherId;
pub type MailboxRef = Arc<dyn Mailbox>;
@@ -53,11 +54,11 @@ impl Display for Channel {
}
impl Channel {
pub(crate) fn pusher_id(&self) -> String {
pub(crate) fn pusher_id(&self) -> PusherId {
match self {
Channel::Datanode(id) => format!("{}-{}", Role::Datanode as i32, id),
Channel::Frontend(id) => format!("{}-{}", Role::Frontend as i32, id),
Channel::Flownode(id) => format!("{}-{}", Role::Flownode as i32, id),
Channel::Datanode(id) => PusherId::new(Role::Datanode, *id),
Channel::Frontend(id) => PusherId::new(Role::Frontend, *id),
Channel::Flownode(id) => PusherId::new(Role::Flownode, *id),
}
}
}

View File

@@ -137,6 +137,6 @@ mod tests {
fn test_parquet_meta_size() {
let metadata = parquet_meta();
assert_eq!(964, parquet_meta_size(&metadata));
assert_eq!(956, parquet_meta_size(&metadata));
}
}

View File

@@ -134,7 +134,7 @@ pub async fn open_compaction_region(
));
let file_purger = {
let purge_scheduler = Arc::new(LocalScheduler::new(mito_config.max_background_jobs));
let purge_scheduler = Arc::new(LocalScheduler::new(mito_config.max_background_purges));
Arc::new(LocalFilePurger::new(
purge_scheduler.clone(),
access_layer.clone(),

View File

@@ -28,9 +28,6 @@ use crate::error::Result;
use crate::memtable::MemtableConfig;
use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
/// Default max running background job.
const DEFAULT_MAX_BG_JOB: usize = 4;
const MULTIPART_UPLOAD_MINIMUM_SIZE: ReadableSize = ReadableSize::mb(5);
/// Default channel size for parallel scan task.
const DEFAULT_SCAN_CHANNEL_SIZE: usize = 32;
@@ -69,8 +66,12 @@ pub struct MitoConfig {
pub compress_manifest: bool,
// Background job configs:
/// Max number of running background jobs (default 4).
pub max_background_jobs: usize,
/// Max number of running background flush jobs (default: 1/2 of cpu cores).
pub max_background_flushes: usize,
/// Max number of running background compaction jobs (default: 1/4 of cpu cores).
pub max_background_compactions: usize,
/// Max number of running background purge jobs (default: number of cpu cores).
pub max_background_purges: usize,
// Flush configs:
/// Interval to auto flush a region if it has not flushed yet (default 30 min).
@@ -137,7 +138,9 @@ impl Default for MitoConfig {
worker_request_batch_size: 64,
manifest_checkpoint_distance: 10,
compress_manifest: false,
max_background_jobs: DEFAULT_MAX_BG_JOB,
max_background_flushes: divide_num_cpus(2),
max_background_compactions: divide_num_cpus(4),
max_background_purges: common_config::utils::get_cpus(),
auto_flush_interval: Duration::from_secs(30 * 60),
global_write_buffer_size: ReadableSize::gb(1),
global_write_buffer_reject_size: ReadableSize::gb(2),
@@ -185,9 +188,26 @@ impl MitoConfig {
self.worker_channel_size = 1;
}
if self.max_background_jobs == 0 {
warn!("Sanitize max background jobs 0 to {}", DEFAULT_MAX_BG_JOB);
self.max_background_jobs = DEFAULT_MAX_BG_JOB;
if self.max_background_flushes == 0 {
warn!(
"Sanitize max background flushes 0 to {}",
divide_num_cpus(2)
);
self.max_background_flushes = divide_num_cpus(2);
}
if self.max_background_compactions == 0 {
warn!(
"Sanitize max background compactions 0 to {}",
divide_num_cpus(4)
);
self.max_background_compactions = divide_num_cpus(4);
}
if self.max_background_purges == 0 {
warn!(
"Sanitize max background purges 0 to {}",
common_config::utils::get_cpus()
);
self.max_background_purges = common_config::utils::get_cpus();
}
if self.global_write_buffer_reject_size <= self.global_write_buffer_size {
@@ -499,7 +519,7 @@ fn divide_num_cpus(divisor: usize) -> usize {
let cores = common_config::utils::get_cpus();
debug_assert!(cores > 0);
(cores + divisor - 1) / divisor
cores.div_ceil(divisor)
}
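For example, on an 8-core host the new defaults come out to 4 flush jobs (`8.div_ceil(2)`), 2 compaction jobs (`8.div_ceil(4)`), and 8 purge jobs, replacing the previous single shared limit of 4. `div_ceil` keeps the old round-up behavior:
#[test]
fn div_ceil_matches_old_formula() {
    assert_eq!(8_usize.div_ceil(2), 4); // default flushes on 8 cores
    assert_eq!(8_usize.div_ceil(4), 2); // default compactions on 8 cores
    // Odd core counts still round up, like the old (cores + divisor - 1) / divisor.
    assert_eq!(5_usize.div_ceil(2), 3);
    assert_eq!((5_usize + 2 - 1) / 2, 3);
}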
#[cfg(test)]

View File

@@ -272,7 +272,7 @@ async fn test_readonly_during_compaction() {
.create_engine_with(
MitoConfig {
// Ensure there is only one background worker for purge task.
max_background_jobs: 1,
max_background_purges: 1,
..Default::default()
},
None,
@@ -310,7 +310,7 @@ async fn test_readonly_during_compaction() {
listener.wake();
let notify = Arc::new(Notify::new());
// We already sets max background jobs to 1, so we can submit a task to the
// We already set max background purges to 1, so we can submit a task to the
// purge scheduler to ensure all purge tasks are finished.
let job_notify = notify.clone();
engine

View File

@@ -126,7 +126,7 @@ pub struct KeyValue<'a> {
op_type: OpType,
}
impl<'a> KeyValue<'a> {
impl KeyValue<'_> {
/// Get primary key columns.
pub fn primary_keys(&self) -> impl Iterator<Item = ValueRef> {
self.helper.indices[..self.helper.num_primary_key_column]

View File

@@ -491,6 +491,116 @@ impl Batch {
// Safety: sequences is not null so it actually returns Some.
self.sequences.get_data(index).unwrap()
}
/// Checks the batch is monotonic by timestamps.
#[cfg(debug_assertions)]
pub(crate) fn check_monotonic(&self) -> bool {
if self.timestamps_native().is_none() {
return true;
}
let timestamps = self.timestamps_native().unwrap();
let sequences = self.sequences.as_arrow().values();
timestamps.windows(2).enumerate().all(|(i, window)| {
let current = window[0];
let next = window[1];
let current_sequence = sequences[i];
let next_sequence = sequences[i + 1];
if current == next {
current_sequence >= next_sequence
} else {
current < next
}
})
}
/// Returns true if the given batch is behind the current batch.
#[cfg(debug_assertions)]
pub(crate) fn check_next_batch(&self, other: &Batch) -> bool {
// Checks the primary key and then the timestamp.
use std::cmp::Ordering;
self.primary_key()
.cmp(other.primary_key())
.then_with(|| self.last_timestamp().cmp(&other.first_timestamp()))
.then_with(|| other.first_sequence().cmp(&self.last_sequence()))
<= Ordering::Equal
}
}
/// A struct to check the batch is monotonic.
#[cfg(debug_assertions)]
#[derive(Default)]
pub(crate) struct BatchChecker {
last_batch: Option<Batch>,
}
#[cfg(debug_assertions)]
impl BatchChecker {
/// Returns true if the given batch is monotonic and behind
/// the last batch.
pub(crate) fn check_monotonic(&mut self, batch: &Batch) -> bool {
if !batch.check_monotonic() {
return false;
}
// Checks the batch is behind the last batch.
// Then updates the last batch.
let is_behind = self
.last_batch
.as_ref()
.map(|last| last.check_next_batch(batch))
.unwrap_or(true);
self.last_batch = Some(batch.clone());
is_behind
}
/// Formats current batch and last batch for debug.
pub(crate) fn format_batch(&self, batch: &Batch) -> String {
use std::fmt::Write;
let mut message = String::new();
if let Some(last) = &self.last_batch {
write!(
message,
"last_pk: {:?}, last_ts: {:?}, last_seq: {:?}, ",
last.primary_key(),
last.last_timestamp(),
last.last_sequence()
)
.unwrap();
}
write!(
message,
"batch_pk: {:?}, batch_ts: {:?}, batch_seq: {:?}",
batch.primary_key(),
batch.timestamps(),
batch.sequences()
)
.unwrap();
message
}
/// Checks batches from the part range are monotonic. Otherwise, panics.
pub(crate) fn ensure_part_range_batch(
&mut self,
scanner: &str,
region_id: store_api::storage::RegionId,
partition: usize,
part_range: store_api::region_engine::PartitionRange,
batch: &Batch,
) {
if !self.check_monotonic(batch) {
panic!(
"{}: batch is not sorted, region_id: {}, partition: {}, part_range: {:?}, {}",
scanner,
region_id,
partition,
part_range,
self.format_batch(batch),
);
}
}
}
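The invariant `check_monotonic` enforces can be restated on plain slices: timestamps must be non-decreasing, and rows that share a timestamp must arrive in non-increasing sequence order. A standalone sketch of the same window-based check:
fn is_sorted_by_ts_then_seq(timestamps: &[i64], sequences: &[u64]) -> bool {
    timestamps.windows(2).enumerate().all(|(i, w)| {
        if w[0] == w[1] {
            // Duplicate timestamps: the row with the larger sequence must come first.
            sequences[i] >= sequences[i + 1]
        } else {
            w[0] < w[1]
        }
    })
}
#[test]
fn batch_ordering_rule() {
    assert!(is_sorted_by_ts_then_seq(&[1, 2, 2, 3], &[9, 7, 5, 1]));
    assert!(!is_sorted_by_ts_then_seq(&[1, 2, 2, 3], &[9, 5, 7, 1]));
}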
/// Len of timestamp in arrow row format.

View File

@@ -160,6 +160,11 @@ impl ProjectionMapper {
self.output_schema.clone()
}
/// Returns an empty [RecordBatch].
pub(crate) fn empty_record_batch(&self) -> RecordBatch {
RecordBatch::new_empty(self.output_schema.clone())
}
/// Converts a [Batch] to a [RecordBatch].
///
/// The batch must match the `projection` using to build the mapper.

View File

@@ -203,6 +203,7 @@ impl SeqScan {
let semaphore = self.semaphore.clone();
let partition_ranges = self.properties.partitions[partition].clone();
let compaction = self.compaction;
let distinguish_range = self.properties.distinguish_partition_range();
let part_metrics = PartitionMetrics::new(
self.stream_ctx.input.mapper.metadata().region_id,
partition,
@@ -230,6 +231,8 @@ impl SeqScan {
let cache = stream_ctx.input.cache_manager.as_deref();
let mut metrics = ScannerMetrics::default();
let mut fetch_start = Instant::now();
#[cfg(debug_assertions)]
let mut checker = crate::read::BatchChecker::default();
while let Some(batch) = reader
.next_batch()
.await
@@ -240,6 +243,20 @@ impl SeqScan {
metrics.num_batches += 1;
metrics.num_rows += batch.num_rows();
debug_assert!(!batch.is_empty());
if batch.is_empty() {
continue;
}
#[cfg(debug_assertions)]
checker.ensure_part_range_batch(
"SeqScan",
stream_ctx.input.mapper.metadata().region_id,
partition,
part_range,
&batch,
);
let convert_start = Instant::now();
let record_batch = stream_ctx.input.mapper.convert(&batch, cache)?;
metrics.convert_cost += convert_start.elapsed();
@@ -249,6 +266,15 @@ impl SeqScan {
fetch_start = Instant::now();
}
// Yields an empty part to indicate this range is terminated.
// The query engine can use this to optimize some queries.
if distinguish_range {
let yield_start = Instant::now();
yield stream_ctx.input.mapper.empty_record_batch();
metrics.yield_cost += yield_start.elapsed();
}
metrics.scan_cost += fetch_start.elapsed();
part_metrics.merge_metrics(&metrics);
}

View File

@@ -129,6 +129,7 @@ impl UnorderedScan {
);
let stream_ctx = self.stream_ctx.clone();
let part_ranges = self.properties.partitions[partition].clone();
let distinguish_range = self.properties.distinguish_partition_range();
let stream = try_stream! {
part_metrics.on_first_poll();
@@ -138,6 +139,8 @@ impl UnorderedScan {
for part_range in part_ranges {
let mut metrics = ScannerMetrics::default();
let mut fetch_start = Instant::now();
#[cfg(debug_assertions)]
let mut checker = crate::read::BatchChecker::default();
let stream = Self::scan_partition_range(
stream_ctx.clone(),
@@ -150,6 +153,20 @@ impl UnorderedScan {
metrics.num_batches += 1;
metrics.num_rows += batch.num_rows();
debug_assert!(!batch.is_empty());
if batch.is_empty() {
continue;
}
#[cfg(debug_assertions)]
checker.ensure_part_range_batch(
"UnorderedScan",
stream_ctx.input.mapper.metadata().region_id,
partition,
part_range,
&batch,
);
let convert_start = Instant::now();
let record_batch = stream_ctx.input.mapper.convert(&batch, cache)?;
metrics.convert_cost += convert_start.elapsed();
@@ -160,6 +177,14 @@ impl UnorderedScan {
fetch_start = Instant::now();
}
// Yields an empty part to indicate this range is terminated.
// The query engine can use this to optimize some queries.
if distinguish_range {
let yield_start = Instant::now();
yield stream_ctx.input.mapper.empty_record_batch();
metrics.yield_cost += yield_start.elapsed();
}
metrics.scan_cost += fetch_start.elapsed();
part_metrics.merge_metrics(&metrics);
}

View File

@@ -18,7 +18,7 @@ use index::inverted_index::search::predicate::{Bound, Predicate, Range, RangePre
use crate::error::Result;
use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
impl<'a> InvertedIndexApplierBuilder<'a> {
impl InvertedIndexApplierBuilder<'_> {
/// Collects a `BETWEEN` expression in the form of `column BETWEEN lit AND lit`.
pub(crate) fn collect_between(&mut self, between: &Between) -> Result<()> {
if between.negated {

View File

@@ -19,7 +19,7 @@ use index::inverted_index::Bytes;
use crate::error::Result;
use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
impl<'a> InvertedIndexApplierBuilder<'a> {
impl InvertedIndexApplierBuilder<'_> {
/// Collects a comparison expression in the form of
/// `column < lit`, `column > lit`, `column <= lit`, `column >= lit`,
/// `lit < column`, `lit > column`, `lit <= column`, `lit >= column`.

View File

@@ -22,7 +22,7 @@ use index::inverted_index::Bytes;
use crate::error::Result;
use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
impl<'a> InvertedIndexApplierBuilder<'a> {
impl InvertedIndexApplierBuilder<'_> {
/// Collects an eq expression in the form of `column = lit`.
pub(crate) fn collect_eq(&mut self, left: &DfExpr, right: &DfExpr) -> Result<()> {
let Some(column_name) = Self::column_name(left).or_else(|| Self::column_name(right)) else {

View File

@@ -20,7 +20,7 @@ use index::inverted_index::search::predicate::{InListPredicate, Predicate};
use crate::error::Result;
use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
impl<'a> InvertedIndexApplierBuilder<'a> {
impl InvertedIndexApplierBuilder<'_> {
/// Collects an in list expression in the form of `column IN (lit, lit, ...)`.
pub(crate) fn collect_inlist(&mut self, inlist: &InList) -> Result<()> {
if inlist.negated {

View File

@@ -19,7 +19,7 @@ use index::inverted_index::search::predicate::{Predicate, RegexMatchPredicate};
use crate::error::Result;
use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
impl<'a> InvertedIndexApplierBuilder<'a> {
impl InvertedIndexApplierBuilder<'_> {
/// Collects a regex match expression in the form of `column ~ pattern`.
pub(crate) fn collect_regex_match(&mut self, column: &DfExpr, pattern: &DfExpr) -> Result<()> {
let Some(column_name) = Self::column_name(column) else {

View File

@@ -153,7 +153,7 @@ impl<'a, R> InstrumentedAsyncRead<'a, R> {
}
}
impl<'a, R: AsyncRead + Unpin + Send> AsyncRead for InstrumentedAsyncRead<'a, R> {
impl<R: AsyncRead + Unpin + Send> AsyncRead for InstrumentedAsyncRead<'_, R> {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
@@ -168,7 +168,7 @@ impl<'a, R: AsyncRead + Unpin + Send> AsyncRead for InstrumentedAsyncRead<'a, R>
}
}
impl<'a, R: AsyncSeek + Unpin + Send> AsyncSeek for InstrumentedAsyncRead<'a, R> {
impl<R: AsyncSeek + Unpin + Send> AsyncSeek for InstrumentedAsyncRead<'_, R> {
fn poll_seek(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
@@ -209,7 +209,7 @@ impl<'a, W> InstrumentedAsyncWrite<'a, W> {
}
}
impl<'a, W: AsyncWrite + Unpin + Send> AsyncWrite for InstrumentedAsyncWrite<'a, W> {
impl<W: AsyncWrite + Unpin + Send> AsyncWrite for InstrumentedAsyncWrite<'_, W> {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
@@ -254,7 +254,7 @@ impl<'a> CounterGuard<'a> {
}
}
impl<'a> Drop for CounterGuard<'a> {
impl Drop for CounterGuard<'_> {
fn drop(&mut self) {
if self.count > 0 {
self.counter.inc_by(self.count as _);

View File

@@ -35,7 +35,11 @@ pub(crate) struct MetadataLoader<'a> {
impl<'a> MetadataLoader<'a> {
/// Create a new parquet metadata loader.
pub fn new(object_store: ObjectStore, file_path: &'a str, file_size: u64) -> MetadataLoader {
pub fn new(
object_store: ObjectStore,
file_path: &'a str,
file_size: u64,
) -> MetadataLoader<'a> {
Self {
object_store,
file_path,

View File

@@ -360,7 +360,7 @@ fn cache_uncompressed_pages(column: &ColumnChunkMetaData) -> bool {
column.uncompressed_size() as usize <= DEFAULT_PAGE_SIZE
}
impl<'a> RowGroups for InMemoryRowGroup<'a> {
impl RowGroups for InMemoryRowGroup<'_> {
fn num_rows(&self) -> usize {
self.row_count
}

View File

@@ -64,7 +64,7 @@ impl<'a, T> RowGroupPruningStats<'a, T> {
}
}
impl<'a, T: Borrow<RowGroupMetaData>> PruningStatistics for RowGroupPruningStats<'a, T> {
impl<T: Borrow<RowGroupMetaData>> PruningStatistics for RowGroupPruningStats<'_, T> {
fn min_values(&self, column: &Column) -> Option<ArrayRef> {
let column_id = self.column_id_to_prune(&column.name)?;
self.read_format.min_values(self.row_groups, column_id)

View File

@@ -114,8 +114,10 @@ pub(crate) const MAX_INITIAL_CHECK_DELAY_SECS: u64 = 60 * 3;
pub(crate) struct WorkerGroup {
/// Workers of the group.
workers: Vec<RegionWorker>,
/// Global background job scheduelr.
scheduler: SchedulerRef,
/// Flush background job pool.
flush_job_pool: SchedulerRef,
/// Compaction background job pool.
compact_job_pool: SchedulerRef,
/// Scheduler for file purgers.
purge_scheduler: SchedulerRef,
/// Cache.
@@ -146,10 +148,10 @@ impl WorkerGroup {
let intermediate_manager = IntermediateManager::init_fs(&config.index.aux_path)
.await?
.with_buffer_size(Some(config.index.write_buffer_size.as_bytes() as _));
let scheduler = Arc::new(LocalScheduler::new(config.max_background_jobs));
let flush_job_pool = Arc::new(LocalScheduler::new(config.max_background_flushes));
let compact_job_pool = Arc::new(LocalScheduler::new(config.max_background_compactions));
// We use another scheduler to avoid purge jobs blocking other jobs.
// A purge job is cheaper than other background jobs so they share the same job limit.
let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_jobs));
let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_purges));
let write_cache = write_cache_from_config(
&config,
object_store_manager.clone(),
@@ -178,7 +180,8 @@ impl WorkerGroup {
log_store: log_store.clone(),
object_store_manager: object_store_manager.clone(),
write_buffer_manager: write_buffer_manager.clone(),
scheduler: scheduler.clone(),
flush_job_pool: flush_job_pool.clone(),
compact_job_pool: compact_job_pool.clone(),
purge_scheduler: purge_scheduler.clone(),
listener: WorkerListener::default(),
cache_manager: cache_manager.clone(),
@@ -195,7 +198,8 @@ impl WorkerGroup {
Ok(WorkerGroup {
workers,
scheduler,
flush_job_pool,
compact_job_pool,
purge_scheduler,
cache_manager,
})
@@ -205,8 +209,11 @@ impl WorkerGroup {
pub(crate) async fn stop(&self) -> Result<()> {
info!("Stop region worker group");
// TODO(yingwen): Do we need to stop gracefully?
// Stops the scheduler gracefully.
self.scheduler.stop(true).await?;
self.compact_job_pool.stop(true).await?;
// Stops the scheduler gracefully.
self.flush_job_pool.stop(true).await?;
// Stops the purge scheduler gracefully.
self.purge_scheduler.stop(true).await?;
@@ -275,8 +282,9 @@ impl WorkerGroup {
.with_notifier(flush_sender.clone()),
)
});
let scheduler = Arc::new(LocalScheduler::new(config.max_background_jobs));
let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_jobs));
let flush_job_pool = Arc::new(LocalScheduler::new(config.max_background_flushes));
let compact_job_pool = Arc::new(LocalScheduler::new(config.max_background_compactions));
let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_flushes));
let puffin_manager_factory = PuffinManagerFactory::new(
&config.index.aux_path,
config.index.staging_size.as_bytes(),
@@ -310,7 +318,8 @@ impl WorkerGroup {
log_store: log_store.clone(),
object_store_manager: object_store_manager.clone(),
write_buffer_manager: write_buffer_manager.clone(),
scheduler: scheduler.clone(),
flush_job_pool: flush_job_pool.clone(),
compact_job_pool: compact_job_pool.clone(),
purge_scheduler: purge_scheduler.clone(),
listener: WorkerListener::new(listener.clone()),
cache_manager: cache_manager.clone(),
@@ -327,7 +336,8 @@ impl WorkerGroup {
Ok(WorkerGroup {
workers,
scheduler,
flush_job_pool,
compact_job_pool,
purge_scheduler,
cache_manager,
})
@@ -382,7 +392,8 @@ struct WorkerStarter<S> {
log_store: Arc<S>,
object_store_manager: ObjectStoreManagerRef,
write_buffer_manager: WriteBufferManagerRef,
scheduler: SchedulerRef,
compact_job_pool: SchedulerRef,
flush_job_pool: SchedulerRef,
purge_scheduler: SchedulerRef,
listener: WorkerListener,
cache_manager: CacheManagerRef,
@@ -423,9 +434,9 @@ impl<S: LogStore> WorkerStarter<S> {
),
purge_scheduler: self.purge_scheduler.clone(),
write_buffer_manager: self.write_buffer_manager,
flush_scheduler: FlushScheduler::new(self.scheduler.clone()),
flush_scheduler: FlushScheduler::new(self.flush_job_pool),
compaction_scheduler: CompactionScheduler::new(
self.scheduler,
self.compact_job_pool,
sender.clone(),
self.cache_manager.clone(),
self.config,

View File

@@ -153,7 +153,7 @@ static LOGGING_TARGET: &str = "opendal::services";
struct LoggingContext<'a>(&'a [(&'a str, &'a str)]);
impl<'a> Display for LoggingContext<'a> {
impl Display for LoggingContext<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
for (i, (k, v)) in self.0.iter().enumerate() {
if i > 0 {

View File

@@ -317,8 +317,33 @@ pub(crate) fn find_key_index(intermediate_keys: &[String], key: &str, kind: &str
.context(IntermediateKeyIndexSnafu { kind, key })
}
/// SelectInfo is used to store the selected keys from OpenTelemetry record attrs
#[derive(Default)]
pub struct SelectInfo {
pub keys: Vec<String>,
}
/// Try to convert a string to SelectInfo
/// The string should be a comma-separated list of keys
/// example: "key1,key2,key3"
/// The keys will be sorted and deduplicated
impl From<String> for SelectInfo {
fn from(value: String) -> Self {
let mut keys: Vec<String> = value.split(',').map(|s| s.to_string()).sorted().collect();
keys.dedup();
SelectInfo { keys }
}
}
impl SelectInfo {
pub fn is_empty(&self) -> bool {
self.keys.is_empty()
}
}
pub enum PipelineWay {
Identity,
OtlpLog(Box<SelectInfo>),
Custom(std::sync::Arc<Pipeline<crate::GreptimeTransformer>>),
}
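Because the conversion sorts and deduplicates, the order of keys in the header value does not matter and repeated keys collapse to one. A usage sketch based on the impl above (with `SelectInfo` in scope):
#[test]
fn select_info_from_header_value() {
    let info = SelectInfo::from("service.name,host,host,service.name".to_string());
    assert_eq!(
        info.keys,
        vec!["host".to_string(), "service.name".to_string()]
    );
    assert!(!info.is_empty());
}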

View File

@@ -62,7 +62,8 @@ const TARGET_FIELDS_NAME: &str = "target_fields";
// const ON_FAILURE_NAME: &str = "on_failure";
// const TAG_NAME: &str = "tag";
/// Processor trait defines the interface for all processors
/// Processor trait defines the interface for all processors.
///
/// A processor is a transformation that can be applied to a field in a document
/// It can be used to extract, transform, or enrich data
/// Currently a Processor has only one input field. In the future, we may support multiple input fields.

View File

@@ -254,7 +254,7 @@ impl TimestampProcessor {
}
fn parse_formats(yaml: &yaml_rust::yaml::Yaml) -> Result<Vec<(Arc<String>, Tz)>> {
return match yaml.as_vec() {
match yaml.as_vec() {
Some(formats_yaml) => {
let mut formats = Vec::with_capacity(formats_yaml.len());
for v in formats_yaml {
@@ -286,7 +286,7 @@ fn parse_formats(yaml: &yaml_rust::yaml::Yaml) -> Result<Vec<(Arc<String>, Tz)>>
s: format!("{yaml:?}"),
}
.fail(),
};
}
}
impl TryFrom<&yaml_rust::yaml::Hash> for TimestampProcessorBuilder {

Some files were not shown because too many files have changed in this diff.