Compare commits

..

2 Commits

Author SHA1 Message Date
Weny Xu
60852af5f8 chore: bump version to 0.13.2 (#5837)
* fix: mysql prepare bool value (#5732)

* fix: mysql prepare limit&offset param (#5734)

* fix: prepare limit&offset param

* test: sqlness

* chore: per review

* chore: per review

* fix: wrap table name with `` (#5748)

* fix: wrap table name with quotes

* fix: minor fix

* fix: handle nullable default value (#5747)

* fix: handle nullable default value

* chore: update sqlness

* fix: properly give placeholder types (#5760)

* fix: properly give placeholder types

* chore: update sqlness

* fix: support __name__ matcher in label values (#5773)

* fix: typo variadic (#5800)

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix: close issue #3902 since upstream fixed (#5801)

Signed-off-by: yihong0618 <zouzou0208@gmail.com>

* fix: correct error status code (#5802)

* fix: interval cast expression can't work in range query, #5805 (#5813)

* fix: interval cast expression can't work in range query, #5805

* fix: nested cast

* test: make vector test stable

* feat: introduce poison mechanism for procedure  (#5822)

* feat: introduce poison for procedure

* tests: add unit tests

* refactor: minor refactor

* fix: unit tests

* chore: fix unit tests

* chore: apply suggestions from CR

* chore: apply suggestions from CR

* chore: update comments

* chore: introduce `ProcedureStatus::Poisoned`

* chore: upgrade greptime-proto to `2be0f`

* chore: apply suggestions from CR

* fix: throw errors instead of ignoring (#5792)

* fix: throw errors instead of ignoring

* fix: fix unit tests

* refactor: remove schema version check

* fix: fix clippy

* chore: remove unused error

* refactor: remove schema version check

* feat: handle mutliple results

* feat: introduce consistency guard

* fix: release consistency guard on datanode operation completion

* test: add tests

* chore: remove schema version

* refactor: rename

* test: add more tests

* chore: print all error

* tests: query table after alteration

* log ignored request

* refine fuzz test

* chore: fix clippy and log mailbox message

* chore: close prepared statement after execution

* chore: add comment

* chore: remove log

* chore: rename to `ConsistencyPoison`

* chore: remove unused error

* fix: fix unit tests

* chore: apply suggestions from CR

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Signed-off-by: yihong0618 <zouzou0208@gmail.com>
Co-authored-by: discord9 <55937128+discord9@users.noreply.github.com>
Co-authored-by: Yohan Wal <profsyb@gmail.com>
Co-authored-by: Yingwen <realevenyag@gmail.com>
Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: yihong <zouzou0208@gmail.com>
Co-authored-by: dennis zhuang <killme2008@gmail.com>
2025-04-08 14:16:40 +08:00
shuiyisong
557e850d87 chore: use Bytes instead of string in bulk ingestion (#5717)
chore: use bytes instead of string in bulk log ingestion
2025-03-18 04:59:02 +08:00
421 changed files with 7373 additions and 16700 deletions

View File

@@ -8,7 +8,7 @@ inputs:
default: 2
description: "Number of Datanode replicas"
meta-replicas:
default: 2
default: 1
description: "Number of Metasrv replicas"
image-registry:
default: "docker.io"

View File

@@ -576,12 +576,9 @@ jobs:
- name: "Remote WAL"
opts: "-w kafka -k 127.0.0.1:9092"
kafka: true
- name: "PostgreSQL KvBackend"
- name: "Pg Kvbackend"
opts: "--setup-pg"
kafka: false
- name: "MySQL Kvbackend"
opts: "--setup-mysql"
kafka: false
timeout-minutes: 60
steps:
- uses: actions/checkout@v4

View File

@@ -91,7 +91,7 @@ env:
# The scheduled version is '${{ env.NEXT_RELEASE_VERSION }}-nightly-YYYYMMDD', like v0.2.0-nigthly-20230313;
NIGHTLY_RELEASE_PREFIX: nightly
# Note: The NEXT_RELEASE_VERSION should be modified manually by every formal release.
NEXT_RELEASE_VERSION: v0.14.0
NEXT_RELEASE_VERSION: v0.13.0
jobs:
allocate-runners:

3
.gitignore vendored
View File

@@ -54,6 +54,3 @@ tests-fuzz/corpus/
# Nix
.direnv
.envrc
## default data home
greptimedb_data

1266
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -29,7 +29,6 @@ members = [
"src/common/query",
"src/common/recordbatch",
"src/common/runtime",
"src/common/session",
"src/common/substrait",
"src/common/telemetry",
"src/common/test-util",
@@ -68,7 +67,7 @@ members = [
resolver = "2"
[workspace.package]
version = "0.14.0"
version = "0.13.0"
edition = "2021"
license = "Apache-2.0"
@@ -89,7 +88,7 @@ rust.unexpected_cfgs = { level = "warn", check-cfg = ['cfg(tokio_unstable)'] }
#
# See for more detaiils: https://github.com/rust-lang/cargo/issues/11329
ahash = { version = "0.8", features = ["compile-time-rng"] }
aquamarine = "0.6"
aquamarine = "0.3"
arrow = { version = "53.0.0", features = ["prettyprint"] }
arrow-array = { version = "53.0.0", default-features = false, features = ["chrono-tz"] }
arrow-flight = "53.0"
@@ -100,9 +99,9 @@ async-trait = "0.1"
# Remember to update axum-extra, axum-macros when updating axum
axum = "0.8"
axum-extra = "0.10"
axum-macros = "0.5"
axum-macros = "0.4"
backon = "1"
base64 = "0.22"
base64 = "0.21"
bigdecimal = "0.4.2"
bitflags = "2.4.1"
bytemuck = "1.12"
@@ -112,7 +111,7 @@ chrono-tz = "0.10.1"
clap = { version = "4.4", features = ["derive"] }
config = "0.13.0"
crossbeam-utils = "0.8"
dashmap = "6.1"
dashmap = "5.4"
datafusion = { git = "https://github.com/apache/datafusion.git", rev = "2464703c84c400a09cc59277018813f0e797bb4e" }
datafusion-common = { git = "https://github.com/apache/datafusion.git", rev = "2464703c84c400a09cc59277018813f0e797bb4e" }
datafusion-expr = { git = "https://github.com/apache/datafusion.git", rev = "2464703c84c400a09cc59277018813f0e797bb4e" }
@@ -122,31 +121,31 @@ datafusion-physical-expr = { git = "https://github.com/apache/datafusion.git", r
datafusion-physical-plan = { git = "https://github.com/apache/datafusion.git", rev = "2464703c84c400a09cc59277018813f0e797bb4e" }
datafusion-sql = { git = "https://github.com/apache/datafusion.git", rev = "2464703c84c400a09cc59277018813f0e797bb4e" }
datafusion-substrait = { git = "https://github.com/apache/datafusion.git", rev = "2464703c84c400a09cc59277018813f0e797bb4e" }
deadpool = "0.12"
deadpool-postgres = "0.14"
derive_builder = "0.20"
deadpool = "0.10"
deadpool-postgres = "0.12"
derive_builder = "0.12"
dotenv = "0.15"
etcd-client = "0.14"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "dd4a1996982534636734674db66e44464b0c0d83" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "2be0f36b3264e28ab0e1c22a980d0bb634eb3a77" }
hex = "0.4"
http = "1"
humantime = "2.1"
humantime-serde = "1.1"
hyper = "1.1"
hyper-util = "0.1"
itertools = "0.14"
itertools = "0.10"
jsonb = { git = "https://github.com/databendlabs/jsonb.git", rev = "8c8d2fc294a39f3ff08909d60f718639cfba3875", default-features = false }
lazy_static = "1.4"
local-ip-address = "0.6"
loki-proto = { git = "https://github.com/GreptimeTeam/loki-proto.git", rev = "1434ecf23a2654025d86188fb5205e7a74b225d3" }
meter-core = { git = "https://github.com/GreptimeTeam/greptime-meter.git", rev = "5618e779cf2bb4755b499c630fba4c35e91898cb" }
mockall = "0.13"
mockall = "0.11.4"
moka = "0.12"
nalgebra = "0.33"
notify = "8.0"
notify = "6.1"
num_cpus = "1.16"
once_cell = "1.18"
opentelemetry-proto = { version = "0.27", features = [
@@ -164,8 +163,8 @@ prometheus = { version = "0.13.3", features = ["process"] }
promql-parser = { version = "0.5", features = ["ser"] }
prost = "0.13"
raft-engine = { version = "0.4.1", default-features = false }
rand = "0.9"
ratelimit = "0.10"
rand = "0.8"
ratelimit = "0.9"
regex = "1.8"
regex-automata = "0.4"
reqwest = { version = "0.12", default-features = false, features = [
@@ -177,37 +176,33 @@ reqwest = { version = "0.12", default-features = false, features = [
rskafka = { git = "https://github.com/influxdata/rskafka.git", rev = "75535b5ad9bae4a5dbb582c82e44dfd81ec10105", features = [
"transport-tls",
] }
rstest = "0.25"
rstest = "0.21"
rstest_reuse = "0.7"
rust_decimal = "1.33"
rustc-hash = "2.0"
# It is worth noting that we should try to avoid using aws-lc-rs until it can be compiled on various platforms.
rustls = { version = "0.23.25", default-features = false }
rustls = { version = "0.23.20", default-features = false } # override by patch, see [patch.crates-io]
serde = { version = "1.0", features = ["derive"] }
serde_json = { version = "1.0", features = ["float_roundtrip"] }
serde_with = "3"
shadow-rs = "1.1"
simd-json = "0.15"
shadow-rs = "0.38"
similar-asserts = "1.6.0"
smallvec = { version = "1", features = ["serde"] }
snafu = "0.8"
sqlx = { version = "0.8", features = [
"runtime-tokio-rustls",
"mysql",
"postgres",
"chrono",
] }
sysinfo = "0.33"
sysinfo = "0.30"
# on branch v0.52.x
sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "71dd86058d2af97b9925093d40c4e03360403170", features = [
"visitor",
"serde",
] } # on branch v0.44.x
strum = { version = "0.27", features = ["derive"] }
strum = { version = "0.25", features = ["derive"] }
tempfile = "3"
tokio = { version = "1.40", features = ["full"] }
tokio-postgres = "0.7"
tokio-rustls = { version = "0.26.2", default-features = false }
tokio-rustls = { version = "0.26.0", default-features = false } # override by patch, see [patch.crates-io]
tokio-stream = "0.1"
tokio-util = { version = "0.7", features = ["io-util", "compat"] }
toml = "0.8.8"
@@ -250,7 +245,6 @@ common-procedure-test = { path = "src/common/procedure-test" }
common-query = { path = "src/common/query" }
common-recordbatch = { path = "src/common/recordbatch" }
common-runtime = { path = "src/common/runtime" }
common-session = { path = "src/common/session" }
common-telemetry = { path = "src/common/telemetry" }
common-test-util = { path = "src/common/test-util" }
common-time = { path = "src/common/time" }
@@ -283,6 +277,15 @@ store-api = { path = "src/store-api" }
substrait = { path = "src/common/substrait" }
table = { path = "src/table" }
[patch.crates-io]
# change all rustls dependencies to use our fork to default to `ring` to make it "just work"
hyper-rustls = { git = "https://github.com/GreptimeTeam/hyper-rustls", rev = "a951e03" } # version = "0.27.5" with ring patch
rustls = { git = "https://github.com/GreptimeTeam/rustls", rev = "34fd0c6" } # version = "0.23.20" with ring patch
tokio-rustls = { git = "https://github.com/GreptimeTeam/tokio-rustls", rev = "4604ca6" } # version = "0.26.0" with ring patch
# This is commented, since we are not using aws-lc-sys, if we need to use it, we need to uncomment this line or use a release after this commit, or it wouldn't compile with gcc < 8.1
# see https://github.com/aws/aws-lc-rs/pull/526
# aws-lc-sys = { git ="https://github.com/aws/aws-lc-rs", rev = "556558441e3494af4b156ae95ebc07ebc2fd38aa" }
[workspace.dependencies.meter-macros]
git = "https://github.com/GreptimeTeam/greptime-meter.git"
rev = "5618e779cf2bb4755b499c630fba4c35e91898cb"

View File

@@ -6,7 +6,7 @@
</picture>
</p>
<h2 align="center">Unified & Cost-Effective Observerability Database for Metrics, Logs, and Events</h2>
<h2 align="center">Unified & Cost-Effective Time Series Database for Metrics, Logs, and Events</h2>
<div align="center">
<h3 align="center">
@@ -62,19 +62,15 @@
## Introduction
**GreptimeDB** is an open-source unified & cost-effective observerability database for **Metrics**, **Logs**, and **Events** (also **Traces** in plan). You can gain real-time insights from Edge to Cloud at Any Scale.
## News
**[GreptimeDB tops JSONBench's billion-record cold run test!](https://greptime.com/blogs/2025-03-18-jsonbench-greptimedb-performance)**
**GreptimeDB** is an open-source unified & cost-effective time-series database for **Metrics**, **Logs**, and **Events** (also **Traces** in plan). You can gain real-time insights from Edge to Cloud at Any Scale.
## Why GreptimeDB
Our core developers have been building observerability data platforms for years. Based on our best practices, GreptimeDB was born to give you:
Our core developers have been building time-series data platforms for years. Based on our best practices, GreptimeDB was born to give you:
* **Unified Processing of Metrics, Logs, and Events**
GreptimeDB unifies observerability data processing by treating all data - whether metrics, logs, or events - as timestamped events with context. Users can analyze this data using either [SQL](https://docs.greptime.com/user-guide/query-data/sql) or [PromQL](https://docs.greptime.com/user-guide/query-data/promql) and leverage stream processing ([Flow](https://docs.greptime.com/user-guide/flow-computation/overview)) to enable continuous aggregation. [Read more](https://docs.greptime.com/user-guide/concepts/data-model).
GreptimeDB unifies time series data processing by treating all data - whether metrics, logs, or events - as timestamped events with context. Users can analyze this data using either [SQL](https://docs.greptime.com/user-guide/query-data/sql) or [PromQL](https://docs.greptime.com/user-guide/query-data/promql) and leverage stream processing ([Flow](https://docs.greptime.com/user-guide/flow-computation/overview)) to enable continuous aggregation. [Read more](https://docs.greptime.com/user-guide/concepts/data-model).
* **Cloud-native Distributed Database**
@@ -116,7 +112,7 @@ Start a GreptimeDB container with:
```shell
docker run -p 127.0.0.1:4000-4003:4000-4003 \
-v "$(pwd)/greptimedb:./greptimedb_data" \
-v "$(pwd)/greptimedb:/tmp/greptimedb" \
--name greptime --rm \
greptime/greptimedb:latest standalone start \
--http-addr 0.0.0.0:4000 \

View File

@@ -12,6 +12,7 @@
| Key | Type | Default | Descriptions |
| --- | -----| ------- | ----------- |
| `mode` | String | `standalone` | The running mode of the datanode. It can be `standalone` or `distributed`. |
| `default_timezone` | String | Unset | The default timezone of the server. |
| `init_regions_in_background` | Bool | `false` | Initialize all regions in the background during the startup.<br/>By default, it provides services after all regions have been initialized. |
| `init_regions_parallelism` | Integer | `16` | Parallelism of initializing regions. |
@@ -23,7 +24,7 @@
| `runtime.compact_rt_size` | Integer | `4` | The number of threads to execute the runtime for global write operations. |
| `http` | -- | -- | The HTTP server options. |
| `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. |
| `http.timeout` | String | `0s` | HTTP request timeout. Set to 0 to disable timeout. |
| `http.timeout` | String | `30s` | HTTP request timeout. Set to 0 to disable timeout. |
| `http.body_limit` | String | `64MB` | HTTP request body limit.<br/>The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.<br/>Set to 0 to disable limit. |
| `http.enable_cors` | Bool | `true` | HTTP CORS support, it's turned on by default<br/>This allows browser to access http APIs without CORS restrictions |
| `http.cors_allowed_origins` | Array | Unset | Customize allowed origins for HTTP CORS. |
@@ -85,6 +86,10 @@
| `wal.create_topic_timeout` | String | `30s` | Above which a topic creation operation will be cancelled.<br/>**It's only used when the provider is `kafka`**. |
| `wal.max_batch_bytes` | String | `1MB` | The max size of a single producer batch.<br/>Warning: Kafka has a default limit of 1MB per message in a topic.<br/>**It's only used when the provider is `kafka`**. |
| `wal.consumer_wait_timeout` | String | `100ms` | The consumer wait timeout.<br/>**It's only used when the provider is `kafka`**. |
| `wal.backoff_init` | String | `500ms` | The initial backoff delay.<br/>**It's only used when the provider is `kafka`**. |
| `wal.backoff_max` | String | `10s` | The maximum backoff delay.<br/>**It's only used when the provider is `kafka`**. |
| `wal.backoff_base` | Integer | `2` | The exponential backoff rate, i.e. next backoff = base * current backoff.<br/>**It's only used when the provider is `kafka`**. |
| `wal.backoff_deadline` | String | `5mins` | The deadline of retries.<br/>**It's only used when the provider is `kafka`**. |
| `wal.overwrite_entry_start_id` | Bool | `false` | Ignore missing entries during read WAL.<br/>**It's only used when the provider is `kafka`**.<br/><br/>This option ensures that when Kafka messages are deleted, the system<br/>can still successfully replay memtable data without throwing an<br/>out-of-range error.<br/>However, enabling this option might lead to unexpected data loss,<br/>as the system will skip over missing entries instead of treating<br/>them as critical errors. |
| `metadata_store` | -- | -- | Metadata storage options. |
| `metadata_store.file_size` | String | `64MB` | The size of the metadata store log file. |
@@ -93,11 +98,10 @@
| `procedure` | -- | -- | Procedure storage options. |
| `procedure.max_retry_times` | Integer | `3` | Procedure max retry time. |
| `procedure.retry_delay` | String | `500ms` | Initial retry delay of procedures, increases exponentially |
| `procedure.max_running_procedures` | Integer | `128` | Max running procedures.<br/>The maximum number of procedures that can be running at the same time.<br/>If the number of running procedures exceeds this limit, the procedure will be rejected. |
| `flow` | -- | -- | flow engine options. |
| `flow.num_workers` | Integer | `0` | The number of flow worker in flownode.<br/>Not setting(or set to 0) this value will use the number of CPU cores divided by 2. |
| `storage` | -- | -- | The data storage options. |
| `storage.data_home` | String | `./greptimedb_data/` | The working home directory. |
| `storage.data_home` | String | `/tmp/greptimedb/` | The working home directory. |
| `storage.type` | String | `File` | The storage type used to store the data.<br/>- `File`: the data is stored in the local file system.<br/>- `S3`: the data is stored in the S3 object storage.<br/>- `Gcs`: the data is stored in the Google Cloud Storage.<br/>- `Azblob`: the data is stored in the Azure Blob Storage.<br/>- `Oss`: the data is stored in the Aliyun OSS. |
| `storage.cache_path` | String | Unset | Read cache configuration for object storage such as 'S3' etc, it's configured by default when using object storage. It is recommended to configure it when using object storage for better performance.<br/>A local file directory, defaults to `{data_home}`. An empty string means disabling. |
| `storage.cache_capacity` | String | Unset | The local file cache capacity in bytes. If your disk space is sufficient, it is recommended to set it larger. |
@@ -177,7 +181,7 @@
| `region_engine.metric` | -- | -- | Metric engine options. |
| `region_engine.metric.experimental_sparse_primary_key_encoding` | Bool | `false` | Whether to enable the experimental sparse primary key encoding. |
| `logging` | -- | -- | The logging options. |
| `logging.dir` | String | `./greptimedb_data/logs` | The directory to store the log files. If set to empty, logs will not be written to files. |
| `logging.dir` | String | `/tmp/greptimedb/logs` | The directory to store the log files. If set to empty, logs will not be written to files. |
| `logging.level` | String | Unset | The log level. Can be `info`/`debug`/`warn`/`error`. |
| `logging.enable_otlp_tracing` | Bool | `false` | Enable OTLP tracing. |
| `logging.otlp_endpoint` | String | `http://localhost:4317` | The OTLP tracing endpoint. |
@@ -218,7 +222,7 @@
| `heartbeat.retry_interval` | String | `3s` | Interval for retrying to send heartbeat messages to the metasrv. |
| `http` | -- | -- | The HTTP server options. |
| `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. |
| `http.timeout` | String | `0s` | HTTP request timeout. Set to 0 to disable timeout. |
| `http.timeout` | String | `30s` | HTTP request timeout. Set to 0 to disable timeout. |
| `http.body_limit` | String | `64MB` | HTTP request body limit.<br/>The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.<br/>Set to 0 to disable limit. |
| `http.enable_cors` | Bool | `true` | HTTP CORS support, it's turned on by default<br/>This allows browser to access http APIs without CORS restrictions |
| `http.cors_allowed_origins` | Array | Unset | Customize allowed origins for HTTP CORS. |
@@ -275,7 +279,7 @@
| `datanode.client.connect_timeout` | String | `10s` | -- |
| `datanode.client.tcp_nodelay` | Bool | `true` | -- |
| `logging` | -- | -- | The logging options. |
| `logging.dir` | String | `./greptimedb_data/logs` | The directory to store the log files. If set to empty, logs will not be written to files. |
| `logging.dir` | String | `/tmp/greptimedb/logs` | The directory to store the log files. If set to empty, logs will not be written to files. |
| `logging.level` | String | Unset | The log level. Can be `info`/`debug`/`warn`/`error`. |
| `logging.enable_otlp_tracing` | Bool | `false` | Enable OTLP tracing. |
| `logging.otlp_endpoint` | String | `http://localhost:4317` | The OTLP tracing endpoint. |
@@ -304,7 +308,7 @@
| Key | Type | Default | Descriptions |
| --- | -----| ------- | ----------- |
| `data_home` | String | `./greptimedb_data/metasrv/` | The working home directory. |
| `data_home` | String | `/tmp/metasrv/` | The working home directory. |
| `bind_addr` | String | `127.0.0.1:3002` | The bind address of metasrv. |
| `server_addr` | String | `127.0.0.1:3002` | The communication server address for the frontend and datanode to connect to metasrv.<br/>If left empty or unset, the server will automatically use the IP address of the first network interface<br/>on the host, with the same port number as the one specified in `bind_addr`. |
| `store_addrs` | Array | -- | Store server address default to etcd store.<br/>For postgres store, the format is:<br/>"password=password dbname=postgres user=postgres host=localhost port=5432"<br/>For etcd store, the format is:<br/>"127.0.0.1:2379" |
@@ -324,7 +328,6 @@
| `procedure.max_retry_times` | Integer | `12` | Procedure max retry time. |
| `procedure.retry_delay` | String | `500ms` | Initial retry delay of procedures, increases exponentially |
| `procedure.max_metadata_value_size` | String | `1500KiB` | Auto split large value<br/>GreptimeDB procedure uses etcd as the default metadata storage backend.<br/>The etcd the maximum size of any request is 1.5 MiB<br/>1500KiB = 1536KiB (1.5MiB) - 36KiB (reserved size of key)<br/>Comments out the `max_metadata_value_size`, for don't split large value (no limit). |
| `procedure.max_running_procedures` | Integer | `128` | Max running procedures.<br/>The maximum number of procedures that can be running at the same time.<br/>If the number of running procedures exceeds this limit, the procedure will be rejected. |
| `failure_detector` | -- | -- | -- |
| `failure_detector.threshold` | Float | `8.0` | The threshold value used by the failure detector to determine failure conditions. |
| `failure_detector.min_std_deviation` | String | `100ms` | The minimum standard deviation of the heartbeat intervals, used to calculate acceptable variations. |
@@ -344,8 +347,12 @@
| `wal.topic_name_prefix` | String | `greptimedb_wal_topic` | A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`.<br/>Only accepts strings that match the following regular expression pattern:<br/>[a-zA-Z_:-][a-zA-Z0-9_:\-\.@#]*<br/>i.g., greptimedb_wal_topic_0, greptimedb_wal_topic_1. |
| `wal.replication_factor` | Integer | `1` | Expected number of replicas of each partition. |
| `wal.create_topic_timeout` | String | `30s` | Above which a topic creation operation will be cancelled. |
| `wal.backoff_init` | String | `500ms` | The initial backoff for kafka clients. |
| `wal.backoff_max` | String | `10s` | The maximum backoff for kafka clients. |
| `wal.backoff_base` | Integer | `2` | Exponential backoff rate, i.e. next backoff = base * current backoff. |
| `wal.backoff_deadline` | String | `5mins` | Stop reconnecting if the total wait time reaches the deadline. If this config is missing, the reconnecting won't terminate. |
| `logging` | -- | -- | The logging options. |
| `logging.dir` | String | `./greptimedb_data/logs` | The directory to store the log files. If set to empty, logs will not be written to files. |
| `logging.dir` | String | `/tmp/greptimedb/logs` | The directory to store the log files. If set to empty, logs will not be written to files. |
| `logging.level` | String | Unset | The log level. Can be `info`/`debug`/`warn`/`error`. |
| `logging.enable_otlp_tracing` | Bool | `false` | Enable OTLP tracing. |
| `logging.otlp_endpoint` | String | `http://localhost:4317` | The OTLP tracing endpoint. |
@@ -374,6 +381,7 @@
| Key | Type | Default | Descriptions |
| --- | -----| ------- | ----------- |
| `mode` | String | `standalone` | The running mode of the datanode. It can be `standalone` or `distributed`. |
| `node_id` | Integer | Unset | The datanode identifier and should be unique in the cluster. |
| `require_lease_before_startup` | Bool | `false` | Start services after regions have obtained leases.<br/>It will block the datanode start if it can't receive leases in the heartbeat from metasrv. |
| `init_regions_in_background` | Bool | `false` | Initialize all regions in the background during the startup.<br/>By default, it provides services after all regions have been initialized. |
@@ -382,7 +390,7 @@
| `enable_telemetry` | Bool | `true` | Enable telemetry to collect anonymous usage data. Enabled by default. |
| `http` | -- | -- | The HTTP server options. |
| `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. |
| `http.timeout` | String | `0s` | HTTP request timeout. Set to 0 to disable timeout. |
| `http.timeout` | String | `30s` | HTTP request timeout. Set to 0 to disable timeout. |
| `http.body_limit` | String | `64MB` | HTTP request body limit.<br/>The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.<br/>Set to 0 to disable limit. |
| `grpc` | -- | -- | The gRPC server options. |
| `grpc.bind_addr` | String | `127.0.0.1:3001` | The address to bind the gRPC server. |
@@ -426,11 +434,15 @@
| `wal.broker_endpoints` | Array | -- | The Kafka broker endpoints.<br/>**It's only used when the provider is `kafka`**. |
| `wal.max_batch_bytes` | String | `1MB` | The max size of a single producer batch.<br/>Warning: Kafka has a default limit of 1MB per message in a topic.<br/>**It's only used when the provider is `kafka`**. |
| `wal.consumer_wait_timeout` | String | `100ms` | The consumer wait timeout.<br/>**It's only used when the provider is `kafka`**. |
| `wal.backoff_init` | String | `500ms` | The initial backoff delay.<br/>**It's only used when the provider is `kafka`**. |
| `wal.backoff_max` | String | `10s` | The maximum backoff delay.<br/>**It's only used when the provider is `kafka`**. |
| `wal.backoff_base` | Integer | `2` | The exponential backoff rate, i.e. next backoff = base * current backoff.<br/>**It's only used when the provider is `kafka`**. |
| `wal.backoff_deadline` | String | `5mins` | The deadline of retries.<br/>**It's only used when the provider is `kafka`**. |
| `wal.create_index` | Bool | `true` | Whether to enable WAL index creation.<br/>**It's only used when the provider is `kafka`**. |
| `wal.dump_index_interval` | String | `60s` | The interval for dumping WAL indexes.<br/>**It's only used when the provider is `kafka`**. |
| `wal.overwrite_entry_start_id` | Bool | `false` | Ignore missing entries during read WAL.<br/>**It's only used when the provider is `kafka`**.<br/><br/>This option ensures that when Kafka messages are deleted, the system<br/>can still successfully replay memtable data without throwing an<br/>out-of-range error.<br/>However, enabling this option might lead to unexpected data loss,<br/>as the system will skip over missing entries instead of treating<br/>them as critical errors. |
| `storage` | -- | -- | The data storage options. |
| `storage.data_home` | String | `./greptimedb_data/` | The working home directory. |
| `storage.data_home` | String | `/tmp/greptimedb/` | The working home directory. |
| `storage.type` | String | `File` | The storage type used to store the data.<br/>- `File`: the data is stored in the local file system.<br/>- `S3`: the data is stored in the S3 object storage.<br/>- `Gcs`: the data is stored in the Google Cloud Storage.<br/>- `Azblob`: the data is stored in the Azure Blob Storage.<br/>- `Oss`: the data is stored in the Aliyun OSS. |
| `storage.cache_path` | String | Unset | Read cache configuration for object storage such as 'S3' etc, it's configured by default when using object storage. It is recommended to configure it when using object storage for better performance.<br/>A local file directory, defaults to `{data_home}`. An empty string means disabling. |
| `storage.cache_capacity` | String | Unset | The local file cache capacity in bytes. If your disk space is sufficient, it is recommended to set it larger. |
@@ -510,7 +522,7 @@
| `region_engine.metric` | -- | -- | Metric engine options. |
| `region_engine.metric.experimental_sparse_primary_key_encoding` | Bool | `false` | Whether to enable the experimental sparse primary key encoding. |
| `logging` | -- | -- | The logging options. |
| `logging.dir` | String | `./greptimedb_data/logs` | The directory to store the log files. If set to empty, logs will not be written to files. |
| `logging.dir` | String | `/tmp/greptimedb/logs` | The directory to store the log files. If set to empty, logs will not be written to files. |
| `logging.level` | String | Unset | The log level. Can be `info`/`debug`/`warn`/`error`. |
| `logging.enable_otlp_tracing` | Bool | `false` | Enable OTLP tracing. |
| `logging.otlp_endpoint` | String | `http://localhost:4317` | The OTLP tracing endpoint. |
@@ -539,6 +551,7 @@
| Key | Type | Default | Descriptions |
| --- | -----| ------- | ----------- |
| `mode` | String | `distributed` | The running mode of the flownode. It can be `standalone` or `distributed`. |
| `node_id` | Integer | Unset | The flownode identifier and should be unique in the cluster. |
| `flow` | -- | -- | flow engine options. |
| `flow.num_workers` | Integer | `0` | The number of flow worker in flownode.<br/>Not setting(or set to 0) this value will use the number of CPU cores divided by 2. |
@@ -550,7 +563,7 @@
| `grpc.max_send_message_size` | String | `512MB` | The maximum send message size for gRPC server. |
| `http` | -- | -- | The HTTP server options. |
| `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. |
| `http.timeout` | String | `0s` | HTTP request timeout. Set to 0 to disable timeout. |
| `http.timeout` | String | `30s` | HTTP request timeout. Set to 0 to disable timeout. |
| `http.body_limit` | String | `64MB` | HTTP request body limit.<br/>The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.<br/>Set to 0 to disable limit. |
| `meta_client` | -- | -- | The metasrv client options. |
| `meta_client.metasrv_addrs` | Array | -- | The addresses of the metasrv. |
@@ -566,7 +579,7 @@
| `heartbeat.interval` | String | `3s` | Interval for sending heartbeat messages to the metasrv. |
| `heartbeat.retry_interval` | String | `3s` | Interval for retrying to send heartbeat messages to the metasrv. |
| `logging` | -- | -- | The logging options. |
| `logging.dir` | String | `./greptimedb_data/logs` | The directory to store the log files. If set to empty, logs will not be written to files. |
| `logging.dir` | String | `/tmp/greptimedb/logs` | The directory to store the log files. If set to empty, logs will not be written to files. |
| `logging.level` | String | Unset | The log level. Can be `info`/`debug`/`warn`/`error`. |
| `logging.enable_otlp_tracing` | Bool | `false` | Enable OTLP tracing. |
| `logging.otlp_endpoint` | String | `http://localhost:4317` | The OTLP tracing endpoint. |

View File

@@ -1,3 +1,6 @@
## The running mode of the datanode. It can be `standalone` or `distributed`.
mode = "standalone"
## The datanode identifier and should be unique in the cluster.
## @toml2docs:none-default
node_id = 42
@@ -24,7 +27,7 @@ max_concurrent_queries = 0
## The address to bind the HTTP server.
addr = "127.0.0.1:4000"
## HTTP request timeout. Set to 0 to disable timeout.
timeout = "0s"
timeout = "30s"
## HTTP request body limit.
## The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.
## Set to 0 to disable limit.
@@ -116,7 +119,7 @@ provider = "raft_engine"
## The directory to store the WAL files.
## **It's only used when the provider is `raft_engine`**.
## @toml2docs:none-default
dir = "./greptimedb_data/wal"
dir = "/tmp/greptimedb/wal"
## The size of the WAL segment file.
## **It's only used when the provider is `raft_engine`**.
@@ -166,6 +169,22 @@ max_batch_bytes = "1MB"
## **It's only used when the provider is `kafka`**.
consumer_wait_timeout = "100ms"
## The initial backoff delay.
## **It's only used when the provider is `kafka`**.
backoff_init = "500ms"
## The maximum backoff delay.
## **It's only used when the provider is `kafka`**.
backoff_max = "10s"
## The exponential backoff rate, i.e. next backoff = base * current backoff.
## **It's only used when the provider is `kafka`**.
backoff_base = 2
## The deadline of retries.
## **It's only used when the provider is `kafka`**.
backoff_deadline = "5mins"
## Whether to enable WAL index creation.
## **It's only used when the provider is `kafka`**.
create_index = true
@@ -246,7 +265,7 @@ overwrite_entry_start_id = false
## The data storage options.
[storage]
## The working home directory.
data_home = "./greptimedb_data/"
data_home = "/tmp/greptimedb/"
## The storage type used to store the data.
## - `File`: the data is stored in the local file system.
@@ -599,7 +618,7 @@ experimental_sparse_primary_key_encoding = false
## The logging options.
[logging]
## The directory to store the log files. If set to empty, logs will not be written to files.
dir = "./greptimedb_data/logs"
dir = "/tmp/greptimedb/logs"
## The log level. Can be `info`/`debug`/`warn`/`error`.
## @toml2docs:none-default

View File

@@ -1,3 +1,6 @@
## The running mode of the flownode. It can be `standalone` or `distributed`.
mode = "distributed"
## The flownode identifier and should be unique in the cluster.
## @toml2docs:none-default
node_id = 14
@@ -27,7 +30,7 @@ max_send_message_size = "512MB"
## The address to bind the HTTP server.
addr = "127.0.0.1:4000"
## HTTP request timeout. Set to 0 to disable timeout.
timeout = "0s"
timeout = "30s"
## HTTP request body limit.
## The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.
## Set to 0 to disable limit.
@@ -73,7 +76,7 @@ retry_interval = "3s"
## The logging options.
[logging]
## The directory to store the log files. If set to empty, logs will not be written to files.
dir = "./greptimedb_data/logs"
dir = "/tmp/greptimedb/logs"
## The log level. Can be `info`/`debug`/`warn`/`error`.
## @toml2docs:none-default
@@ -118,3 +121,4 @@ sample_ratio = 1.0
## The tokio console address.
## @toml2docs:none-default
#+ tokio_console_addr = "127.0.0.1"

View File

@@ -26,7 +26,7 @@ retry_interval = "3s"
## The address to bind the HTTP server.
addr = "127.0.0.1:4000"
## HTTP request timeout. Set to 0 to disable timeout.
timeout = "0s"
timeout = "30s"
## HTTP request body limit.
## The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.
## Set to 0 to disable limit.
@@ -189,7 +189,7 @@ tcp_nodelay = true
## The logging options.
[logging]
## The directory to store the log files. If set to empty, logs will not be written to files.
dir = "./greptimedb_data/logs"
dir = "/tmp/greptimedb/logs"
## The log level. Can be `info`/`debug`/`warn`/`error`.
## @toml2docs:none-default

View File

@@ -1,5 +1,5 @@
## The working home directory.
data_home = "./greptimedb_data/metasrv/"
data_home = "/tmp/metasrv/"
## The bind address of metasrv.
bind_addr = "127.0.0.1:3002"
@@ -79,11 +79,6 @@ retry_delay = "500ms"
## Comments out the `max_metadata_value_size`, for don't split large value (no limit).
max_metadata_value_size = "1500KiB"
## Max running procedures.
## The maximum number of procedures that can be running at the same time.
## If the number of running procedures exceeds this limit, the procedure will be rejected.
max_running_procedures = 128
# Failure detectors options.
[failure_detector]
@@ -149,6 +144,17 @@ replication_factor = 1
## Above which a topic creation operation will be cancelled.
create_topic_timeout = "30s"
## The initial backoff for kafka clients.
backoff_init = "500ms"
## The maximum backoff for kafka clients.
backoff_max = "10s"
## Exponential backoff rate, i.e. next backoff = base * current backoff.
backoff_base = 2
## Stop reconnecting if the total wait time reaches the deadline. If this config is missing, the reconnecting won't terminate.
backoff_deadline = "5mins"
# The Kafka SASL configuration.
# **It's only used when the provider is `kafka`**.
@@ -171,7 +177,7 @@ create_topic_timeout = "30s"
## The logging options.
[logging]
## The directory to store the log files. If set to empty, logs will not be written to files.
dir = "./greptimedb_data/logs"
dir = "/tmp/greptimedb/logs"
## The log level. Can be `info`/`debug`/`warn`/`error`.
## @toml2docs:none-default

View File

@@ -1,3 +1,6 @@
## The running mode of the datanode. It can be `standalone` or `distributed`.
mode = "standalone"
## The default timezone of the server.
## @toml2docs:none-default
default_timezone = "UTC"
@@ -31,7 +34,7 @@ max_concurrent_queries = 0
## The address to bind the HTTP server.
addr = "127.0.0.1:4000"
## HTTP request timeout. Set to 0 to disable timeout.
timeout = "0s"
timeout = "30s"
## HTTP request body limit.
## The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.
## Set to 0 to disable limit.
@@ -161,7 +164,7 @@ provider = "raft_engine"
## The directory to store the WAL files.
## **It's only used when the provider is `raft_engine`**.
## @toml2docs:none-default
dir = "./greptimedb_data/wal"
dir = "/tmp/greptimedb/wal"
## The size of the WAL segment file.
## **It's only used when the provider is `raft_engine`**.
@@ -239,6 +242,22 @@ max_batch_bytes = "1MB"
## **It's only used when the provider is `kafka`**.
consumer_wait_timeout = "100ms"
## The initial backoff delay.
## **It's only used when the provider is `kafka`**.
backoff_init = "500ms"
## The maximum backoff delay.
## **It's only used when the provider is `kafka`**.
backoff_max = "10s"
## The exponential backoff rate, i.e. next backoff = base * current backoff.
## **It's only used when the provider is `kafka`**.
backoff_base = 2
## The deadline of retries.
## **It's only used when the provider is `kafka`**.
backoff_deadline = "5mins"
## Ignore missing entries during read WAL.
## **It's only used when the provider is `kafka`**.
##
@@ -283,10 +302,6 @@ purge_interval = "1m"
max_retry_times = 3
## Initial retry delay of procedures, increases exponentially
retry_delay = "500ms"
## Max running procedures.
## The maximum number of procedures that can be running at the same time.
## If the number of running procedures exceeds this limit, the procedure will be rejected.
max_running_procedures = 128
## flow engine options.
[flow]
@@ -337,7 +352,7 @@ max_running_procedures = 128
## The data storage options.
[storage]
## The working home directory.
data_home = "./greptimedb_data/"
data_home = "/tmp/greptimedb/"
## The storage type used to store the data.
## - `File`: the data is stored in the local file system.
@@ -690,7 +705,7 @@ experimental_sparse_primary_key_encoding = false
## The logging options.
[logging]
## The directory to store the log files. If set to empty, logs will not be written to files.
dir = "./greptimedb_data/logs"
dir = "/tmp/greptimedb/logs"
## The log level. Can be `info`/`debug`/`warn`/`error`.
## @toml2docs:none-default

View File

@@ -25,7 +25,7 @@ services:
- --initial-cluster-state=new
- *etcd_initial_cluster_token
volumes:
- ./greptimedb-cluster-docker-compose/etcd0:/var/lib/etcd
- /tmp/greptimedb-cluster-docker-compose/etcd0:/var/lib/etcd
healthcheck:
test: [ "CMD", "etcdctl", "--endpoints=http://etcd0:2379", "endpoint", "health" ]
interval: 5s
@@ -68,13 +68,12 @@ services:
- datanode
- start
- --node-id=0
- --data-home=/greptimedb_data
- --rpc-bind-addr=0.0.0.0:3001
- --rpc-server-addr=datanode0:3001
- --metasrv-addrs=metasrv:3002
- --http-addr=0.0.0.0:5000
volumes:
- ./greptimedb-cluster-docker-compose/datanode0:/greptimedb_data
- /tmp/greptimedb-cluster-docker-compose/datanode0:/tmp/greptimedb
healthcheck:
test: [ "CMD", "curl", "-fv", "http://datanode0:5000/health" ]
interval: 5s

View File

@@ -3,7 +3,7 @@
This document introduces how to write fuzz tests in GreptimeDB.
## What is a fuzz test
Fuzz test is tool that leverage deterministic random generation to assist in finding bugs. The goal of fuzz tests is to identify inputs generated by the fuzzer that cause system panics, crashes, or unexpected behaviors to occur. And we are using the [cargo-fuzz](https://github.com/rust-fuzz/cargo-fuzz) to run our fuzz test targets.
Fuzz test is tool that leverage deterministic random generation to assist in finding bugs. The goal of fuzz tests is to identify inputs generated by the fuzzer that cause system panics, crashes, or unexpected behaviors to occur. And we are using the [cargo-fuzz](https://github.com/rust-fuzz/cargo-fuzz) to run our fuzz test targets.
## Why we need them
- Find bugs by leveraging random generation
@@ -13,7 +13,7 @@ Fuzz test is tool that leverage deterministic random generation to assist in fin
All fuzz test-related resources are located in the `/tests-fuzz` directory.
There are two types of resources: (1) fundamental components and (2) test targets.
### Fundamental components
### Fundamental components
They are located in the `/tests-fuzz/src` directory. The fundamental components define how to generate SQLs (including dialects for different protocols) and validate execution results (e.g., column attribute validation), etc.
### Test targets
@@ -21,25 +21,25 @@ They are located in the `/tests-fuzz/targets` directory, with each file represen
Figure 1 illustrates the fundamental components of the fuzz test provide the ability to generate random SQLs. It utilizes a Random Number Generator (Rng) to generate the Intermediate Representation (IR), then employs a DialectTranslator to produce specified dialects for different protocols. Finally, the fuzz tests send the generated SQL via the specified protocol and verify that the execution results meet expectations.
```
Rng
|
|
v
ExprGenerator
|
|
v
Intermediate representation (IR)
|
|
+----------------------+----------------------+
| | |
v v v
Rng
|
|
v
ExprGenerator
|
|
v
Intermediate representation (IR)
|
|
+----------------------+----------------------+
| | |
v v v
MySQLTranslator PostgreSQLTranslator OtherDialectTranslator
| | |
| | |
v v v
SQL(MySQL Dialect) ..... .....
| | |
| | |
v v v
SQL(MySQL Dialect) ..... .....
|
|
v
@@ -133,4 +133,4 @@ fuzz_target!(|input: FuzzInput| {
cargo fuzz run <fuzz-target> --fuzz-dir tests-fuzz
```
For more details, please refer to this [document](/tests-fuzz/README.md).
For more details, please refer to this [document](/tests-fuzz/README.md).

View File

@@ -1,77 +0,0 @@
---
Feature Name: Remote WAL Purge
Tracking Issue: https://github.com/GreptimeTeam/greptimedb/issues/5474
Date: 2025-02-06
Author: "Yuhan Wang <profsyb@gmail.com>"
---
# Summary
This RFC proposes a method for purging remote WAL in the database.
# Motivation
Currently only local wal entries are purged when flushing, while remote wal does nothing.
# Details
```mermaid
sequenceDiagram
Region0->>Kafka: Last entry id of the topic in use
Region0->>WALPruner: Heartbeat with last entry id
WALPruner->>+WALPruner: Time Loop
WALPruner->>+ProcedureManager: Submit purge procedure
ProcedureManager->>Region0: Flush request
ProcedureManager->>Kafka: Prune WAL entries
Region0->>Region0: Flush
```
## Steps
### Before purge
Before purging remote WAL, metasrv needs to know:
1. `last_entry_id` of each region.
2. `kafka_topic_last_entry_id` which is the last entry id of the topic in use. Can be lazily updated and needed when region has empty memtable.
3. Kafka topics that each region uses.
The states are maintained through:
1. Heartbeat: Datanode sends `last_entry_id` to metasrv in heartbeat. As for regions with empty memtable, `last_entry_id` should equals to `kafka_topic_last_entry_id`.
2. Metasrv maintains a topic-region map to know which region uses which topic.
`kafka_topic_last_entry_id` will be maintained by the region itself. Region will update the value after `k` heartbeats if the memtable is empty.
### Purge procedure
We can better handle locks utilizing current procedure. It's quite similar to the region migration procedure.
After a period of time, metasrv will submit a purge procedure to ProcedureManager. The purge will apply to all topics.
The procedure is divided into following stages:
1. Preparation:
- Retrieve `last_entry_id` of each region kvbackend.
- Choose regions that have a relatively small `last_entry_id` as candidate regions, which means we need to send a flush request to these regions.
2. Communication:
- Send flush requests to candidate regions.
3. Purge:
- Choose proper entry id to delete for each topic. The entry should be the smallest `last_entry_id - 1` among all regions.
- Delete legacy entries in Kafka.
- Store the `last_purged_entry_id` in kvbackend. It should be locked to prevent other regions from replaying the purged entries.
### After purge
After purge, there may be some regions that have `last_entry_id` smaller than the entry we just deleted. It's legal since we only delete the entries that are not needed anymore.
When restarting a region, it should query the `last_purged_entry_id` from metasrv and replay from `min(last_entry_id, last_purged_entry_id)`.
### Error handling
No persisted states are needed since all states are maintained in kvbackend.
Retry when failed to retrieving metadata from kvbackend.
# Alternatives
Purge time can depend on the size of the WAL entries instead of a fixed period of time, which may be more efficient.

View File

@@ -4782,7 +4782,7 @@
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"description": "Ingestion size by row counts.",
"description": "Current counts for stalled write requests by instance\n\nWrite stalls when memtable is full and pending for flush\n\n",
"fieldConfig": {
"defaults": {
"color": {
@@ -4844,7 +4844,7 @@
"x": 12,
"y": 138
},
"id": 277,
"id": 221,
"options": {
"legend": {
"calcs": [],
@@ -4864,14 +4864,14 @@
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"expr": "rate(greptime_mito_write_rows_total{pod=~\"$datanode\"}[$__rate_interval])",
"expr": "sum by(pod) (greptime_mito_write_stall_total{pod=~\"$datanode\"})",
"instant": false,
"legendFormat": "{{pod}}",
"range": true,
"refId": "A"
}
],
"title": "Write Rows per Instance",
"title": "Write Stall per Instance",
"type": "timeseries"
},
{
@@ -4976,7 +4976,7 @@
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"description": "Current counts for stalled write requests by instance\n\nWrite stalls when memtable is full and pending for flush\n\n",
"description": "Cache size by instance.\n",
"fieldConfig": {
"defaults": {
"color": {
@@ -5028,7 +5028,7 @@
}
]
},
"unit": "none"
"unit": "decbytes"
},
"overrides": []
},
@@ -5038,7 +5038,7 @@
"x": 12,
"y": 146
},
"id": 221,
"id": 229,
"options": {
"legend": {
"calcs": [],
@@ -5058,14 +5058,14 @@
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"expr": "sum by(pod) (greptime_mito_write_stall_total{pod=~\"$datanode\"})",
"expr": "greptime_mito_cache_bytes{pod=~\"$datanode\"}",
"instant": false,
"legendFormat": "{{pod}}",
"legendFormat": "{{pod}}-{{type}}",
"range": true,
"refId": "A"
}
],
"title": "Write Stall per Instance",
"title": "Cached Bytes per Instance",
"type": "timeseries"
},
{
@@ -5172,7 +5172,7 @@
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"description": "Cache size by instance.\n",
"description": "P99 latency of each type of reads by instance",
"fieldConfig": {
"defaults": {
"color": {
@@ -5224,7 +5224,7 @@
}
]
},
"unit": "decbytes"
"unit": "s"
},
"overrides": []
},
@@ -5234,13 +5234,17 @@
"x": 12,
"y": 154
},
"id": 229,
"id": 228,
"options": {
"legend": {
"calcs": [],
"calcs": [
"lastNotNull"
],
"displayMode": "table",
"placement": "bottom",
"showLegend": true
"showLegend": true,
"sortBy": "Last *",
"sortDesc": true
},
"tooltip": {
"mode": "single",
@@ -5254,14 +5258,14 @@
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"expr": "greptime_mito_cache_bytes{pod=~\"$datanode\"}",
"expr": "histogram_quantile(0.99, sum by(pod, le, stage) (rate(greptime_mito_read_stage_elapsed_bucket{pod=~\"$datanode\"}[$__rate_interval])))",
"instant": false,
"legendFormat": "{{pod}}-{{type}}",
"legendFormat": "{{pod}}-{{stage}}-p99",
"range": true,
"refId": "A"
}
],
"title": "Cached Bytes per Instance",
"title": "Read Stage P99 per Instance",
"type": "timeseries"
},
{
@@ -5313,8 +5317,7 @@
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
"color": "green"
},
{
"color": "red",
@@ -5367,7 +5370,7 @@
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"description": "P99 latency of each type of reads by instance",
"description": "Latency of compaction task, at p99",
"fieldConfig": {
"defaults": {
"color": {
@@ -5411,8 +5414,7 @@
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
"color": "green"
},
{
"color": "red",
@@ -5430,7 +5432,7 @@
"x": 12,
"y": 162
},
"id": 228,
"id": 230,
"options": {
"legend": {
"calcs": [
@@ -5438,9 +5440,7 @@
],
"displayMode": "table",
"placement": "bottom",
"showLegend": true,
"sortBy": "Last *",
"sortDesc": true
"showLegend": true
},
"tooltip": {
"mode": "single",
@@ -5454,14 +5454,14 @@
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"expr": "histogram_quantile(0.99, sum by(pod, le, stage) (rate(greptime_mito_read_stage_elapsed_bucket{pod=~\"$datanode\"}[$__rate_interval])))",
"expr": "histogram_quantile(0.99, sum by(pod, le) (rate(greptime_mito_compaction_total_elapsed_bucket{pod=~\"$datanode\"}[$__rate_interval])))",
"instant": false,
"legendFormat": "{{pod}}-{{stage}}-p99",
"legendFormat": "[{{pod}}]-compaction-p99",
"range": true,
"refId": "A"
}
],
"title": "Read Stage P99 per Instance",
"title": "Compaction P99 per Instance",
"type": "timeseries"
},
{
@@ -5570,7 +5570,7 @@
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"description": "Latency of compaction task, at p99",
"description": "Compaction latency by stage",
"fieldConfig": {
"defaults": {
"color": {
@@ -5632,7 +5632,7 @@
"x": 12,
"y": 170
},
"id": 230,
"id": 232,
"options": {
"legend": {
"calcs": [
@@ -5654,9 +5654,9 @@
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"expr": "histogram_quantile(0.99, sum by(pod, le) (rate(greptime_mito_compaction_total_elapsed_bucket{pod=~\"$datanode\"}[$__rate_interval])))",
"expr": "histogram_quantile(0.99, sum by(pod, le, stage) (rate(greptime_mito_compaction_stage_elapsed_bucket{pod=~\"$datanode\"}[$__rate_interval])))",
"instant": false,
"legendFormat": "[{{pod}}]-compaction-p99",
"legendFormat": "{{pod}}-{{stage}}-p99",
"range": true,
"refId": "A"
}
@@ -5794,7 +5794,7 @@
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"description": "Compaction latency by stage",
"description": "Write-ahead log operations latency at p99",
"fieldConfig": {
"defaults": {
"color": {
@@ -5856,13 +5856,13 @@
"x": 12,
"y": 178
},
"id": 232,
"id": 269,
"options": {
"legend": {
"calcs": [
"lastNotNull"
],
"displayMode": "table",
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
@@ -5878,14 +5878,14 @@
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"expr": "histogram_quantile(0.99, sum by(pod, le, stage) (rate(greptime_mito_compaction_stage_elapsed_bucket{pod=~\"$datanode\"}[$__rate_interval])))",
"expr": "histogram_quantile(0.99, sum by(le,logstore,optype,pod) (rate(greptime_logstore_op_elapsed_bucket[$__rate_interval])))",
"instant": false,
"legendFormat": "{{pod}}-{{stage}}-p99",
"legendFormat": "{{pod}}-{{logstore}}-{{optype}}-p99",
"range": true,
"refId": "A"
}
],
"title": "Compaction P99 per Instance",
"title": "Log Store op duration seconds",
"type": "timeseries"
},
{
@@ -5993,7 +5993,7 @@
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"description": "Write-ahead log operations latency at p99",
"description": "Ongoing compaction task count",
"fieldConfig": {
"defaults": {
"color": {
@@ -6045,7 +6045,7 @@
}
]
},
"unit": "s"
"unit": "none"
},
"overrides": []
},
@@ -6055,13 +6055,13 @@
"x": 12,
"y": 186
},
"id": 269,
"id": 271,
"options": {
"legend": {
"calcs": [
"lastNotNull"
],
"displayMode": "list",
"displayMode": "table",
"placement": "bottom",
"showLegend": true
},
@@ -6078,14 +6078,14 @@
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"expr": "histogram_quantile(0.99, sum by(le,logstore,optype,pod) (rate(greptime_logstore_op_elapsed_bucket[$__rate_interval])))",
"expr": "greptime_mito_inflight_compaction_count",
"instant": false,
"legendFormat": "{{pod}}-{{logstore}}-{{optype}}-p99",
"legendFormat": "{{pod}}",
"range": true,
"refId": "A"
}
],
"title": "Log Store op duration seconds",
"title": "Inflight Compaction",
"type": "timeseries"
},
{
@@ -6188,105 +6188,6 @@
"title": "Inflight Flush",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"description": "Ongoing compaction task count",
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"drawStyle": "points",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green"
},
{
"color": "red",
"value": 80
}
]
},
"unit": "none"
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 194
},
"id": 271,
"options": {
"legend": {
"calcs": [
"lastNotNull"
],
"displayMode": "table",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "single",
"sort": "none"
}
},
"pluginVersion": "11.1.3",
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"expr": "greptime_mito_inflight_compaction_count",
"instant": false,
"legendFormat": "{{pod}}",
"range": true,
"refId": "A"
}
],
"title": "Inflight Compaction",
"type": "timeseries"
},
{
"collapsed": false,
"gridPos": {

View File

@@ -15,13 +15,10 @@
use std::collections::HashMap;
use datatypes::schema::{
ColumnDefaultConstraint, ColumnSchema, FulltextAnalyzer, FulltextBackend, FulltextOptions,
SkippingIndexOptions, SkippingIndexType, COMMENT_KEY, FULLTEXT_KEY, INVERTED_INDEX_KEY,
SKIPPING_INDEX_KEY,
};
use greptime_proto::v1::{
Analyzer, FulltextBackend as PbFulltextBackend, SkippingIndexType as PbSkippingIndexType,
ColumnDefaultConstraint, ColumnSchema, FulltextAnalyzer, FulltextOptions, SkippingIndexOptions,
SkippingIndexType, COMMENT_KEY, FULLTEXT_KEY, INVERTED_INDEX_KEY, SKIPPING_INDEX_KEY,
};
use greptime_proto::v1::{Analyzer, SkippingIndexType as PbSkippingIndexType};
use snafu::ResultExt;
use crate::error::{self, Result};
@@ -145,21 +142,13 @@ pub fn options_from_inverted() -> ColumnOptions {
}
/// Tries to construct a `FulltextAnalyzer` from the given analyzer.
pub fn as_fulltext_option_analyzer(analyzer: Analyzer) -> FulltextAnalyzer {
pub fn as_fulltext_option(analyzer: Analyzer) -> FulltextAnalyzer {
match analyzer {
Analyzer::English => FulltextAnalyzer::English,
Analyzer::Chinese => FulltextAnalyzer::Chinese,
}
}
/// Tries to construct a `FulltextBackend` from the given backend.
pub fn as_fulltext_option_backend(backend: PbFulltextBackend) -> FulltextBackend {
match backend {
PbFulltextBackend::Bloom => FulltextBackend::Bloom,
PbFulltextBackend::Tantivy => FulltextBackend::Tantivy,
}
}
/// Tries to construct a `SkippingIndexType` from the given skipping index type.
pub fn as_skipping_index_type(skipping_index_type: PbSkippingIndexType) -> SkippingIndexType {
match skipping_index_type {
@@ -171,7 +160,7 @@ pub fn as_skipping_index_type(skipping_index_type: PbSkippingIndexType) -> Skipp
mod tests {
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{FulltextAnalyzer, FulltextBackend};
use datatypes::schema::FulltextAnalyzer;
use super::*;
use crate::v1::ColumnDataType;
@@ -230,14 +219,13 @@ mod tests {
enable: true,
analyzer: FulltextAnalyzer::English,
case_sensitive: false,
backend: FulltextBackend::Bloom,
})
.unwrap();
schema.set_inverted_index(true);
let options = options_from_column_schema(&schema).unwrap();
assert_eq!(
options.options.get(FULLTEXT_GRPC_KEY).unwrap(),
"{\"enable\":true,\"analyzer\":\"English\",\"case-sensitive\":false,\"backend\":\"bloom\"}"
"{\"enable\":true,\"analyzer\":\"English\",\"case-sensitive\":false}"
);
assert_eq!(
options.options.get(INVERTED_INDEX_GRPC_KEY).unwrap(),
@@ -251,12 +239,11 @@ mod tests {
enable: true,
analyzer: FulltextAnalyzer::English,
case_sensitive: false,
backend: FulltextBackend::Bloom,
};
let options = options_from_fulltext(&fulltext).unwrap().unwrap();
assert_eq!(
options.options.get(FULLTEXT_GRPC_KEY).unwrap(),
"{\"enable\":true,\"analyzer\":\"English\",\"case-sensitive\":false,\"backend\":\"bloom\"}"
"{\"enable\":true,\"analyzer\":\"English\",\"case-sensitive\":false}"
);
}

View File

@@ -19,7 +19,7 @@ mod information_memory_table;
pub mod key_column_usage;
mod partitions;
mod procedure_info;
pub mod region_peers;
mod region_peers;
mod region_statistics;
mod runtime_metrics;
pub mod schemata;

View File

@@ -56,8 +56,6 @@ pub const TABLE_CATALOG: &str = "table_catalog";
pub const TABLE_SCHEMA: &str = "table_schema";
pub const TABLE_NAME: &str = "table_name";
pub const COLUMN_NAME: &str = "column_name";
pub const REGION_ID: &str = "region_id";
pub const PEER_ID: &str = "peer_id";
const ORDINAL_POSITION: &str = "ordinal_position";
const CHARACTER_MAXIMUM_LENGTH: &str = "character_maximum_length";
const CHARACTER_OCTET_LENGTH: &str = "character_octet_length";

View File

@@ -21,7 +21,6 @@ use common_error::ext::BoxedError;
use common_meta::rpc::router::RegionRoute;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use datafusion::common::HashMap;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
@@ -44,22 +43,16 @@ use crate::kvbackend::KvBackendCatalogManager;
use crate::system_schema::information_schema::{InformationTable, Predicates};
use crate::CatalogManager;
pub const TABLE_CATALOG: &str = "table_catalog";
pub const TABLE_SCHEMA: &str = "table_schema";
pub const TABLE_NAME: &str = "table_name";
pub const REGION_ID: &str = "region_id";
pub const PEER_ID: &str = "peer_id";
const REGION_ID: &str = "region_id";
const PEER_ID: &str = "peer_id";
const PEER_ADDR: &str = "peer_addr";
pub const IS_LEADER: &str = "is_leader";
const IS_LEADER: &str = "is_leader";
const STATUS: &str = "status";
const DOWN_SECONDS: &str = "down_seconds";
const INIT_CAPACITY: usize = 42;
/// The `REGION_PEERS` table provides information about the region distribution and routes. Including fields:
///
/// - `table_catalog`: the table catalog name
/// - `table_schema`: the table schema name
/// - `table_name`: the table name
/// - `region_id`: the region id
/// - `peer_id`: the region storage datanode peer id
/// - `peer_addr`: the region storage datanode gRPC peer address
@@ -84,9 +77,6 @@ impl InformationSchemaRegionPeers {
pub(crate) fn schema() -> SchemaRef {
Arc::new(Schema::new(vec![
ColumnSchema::new(TABLE_CATALOG, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(TABLE_SCHEMA, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(TABLE_NAME, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(REGION_ID, ConcreteDataType::uint64_datatype(), false),
ColumnSchema::new(PEER_ID, ConcreteDataType::uint64_datatype(), true),
ColumnSchema::new(PEER_ADDR, ConcreteDataType::string_datatype(), true),
@@ -144,9 +134,6 @@ struct InformationSchemaRegionPeersBuilder {
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
table_catalogs: StringVectorBuilder,
table_schemas: StringVectorBuilder,
table_names: StringVectorBuilder,
region_ids: UInt64VectorBuilder,
peer_ids: UInt64VectorBuilder,
peer_addrs: StringVectorBuilder,
@@ -165,9 +152,6 @@ impl InformationSchemaRegionPeersBuilder {
schema,
catalog_name,
catalog_manager,
table_catalogs: StringVectorBuilder::with_capacity(INIT_CAPACITY),
table_schemas: StringVectorBuilder::with_capacity(INIT_CAPACITY),
table_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
region_ids: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
peer_ids: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
peer_addrs: StringVectorBuilder::with_capacity(INIT_CAPACITY),
@@ -193,28 +177,24 @@ impl InformationSchemaRegionPeersBuilder {
let predicates = Predicates::from_scan_request(&request);
for schema_name in catalog_manager.schema_names(&catalog_name, None).await? {
let table_stream = catalog_manager
let table_id_stream = catalog_manager
.tables(&catalog_name, &schema_name, None)
.try_filter_map(|t| async move {
let table_info = t.table_info();
if table_info.table_type == TableType::Temporary {
Ok(None)
} else {
Ok(Some((
table_info.ident.table_id,
table_info.name.to_string(),
)))
Ok(Some(table_info.ident.table_id))
}
});
const BATCH_SIZE: usize = 128;
// Split tables into chunks
let mut table_chunks = pin!(table_stream.ready_chunks(BATCH_SIZE));
// Split table ids into chunks
let mut table_id_chunks = pin!(table_id_stream.ready_chunks(BATCH_SIZE));
while let Some(tables) = table_chunks.next().await {
let tables = tables.into_iter().collect::<Result<HashMap<_, _>>>()?;
let table_ids = tables.keys().cloned().collect::<Vec<_>>();
while let Some(table_ids) = table_id_chunks.next().await {
let table_ids = table_ids.into_iter().collect::<Result<Vec<_>>>()?;
let table_routes = if let Some(partition_manager) = &partition_manager {
partition_manager
@@ -226,16 +206,7 @@ impl InformationSchemaRegionPeersBuilder {
};
for (table_id, routes) in table_routes {
// Safety: table_id is guaranteed to be in the map
let table_name = tables.get(&table_id).unwrap();
self.add_region_peers(
&catalog_name,
&schema_name,
table_name,
&predicates,
table_id,
&routes,
);
self.add_region_peers(&predicates, table_id, &routes);
}
}
}
@@ -245,9 +216,6 @@ impl InformationSchemaRegionPeersBuilder {
fn add_region_peers(
&mut self,
table_catalog: &str,
table_schema: &str,
table_name: &str,
predicates: &Predicates,
table_id: TableId,
routes: &[RegionRoute],
@@ -263,20 +231,13 @@ impl InformationSchemaRegionPeersBuilder {
Some("ALIVE".to_string())
};
let row = [
(TABLE_CATALOG, &Value::from(table_catalog)),
(TABLE_SCHEMA, &Value::from(table_schema)),
(TABLE_NAME, &Value::from(table_name)),
(REGION_ID, &Value::from(region_id)),
];
let row = [(REGION_ID, &Value::from(region_id))];
if !predicates.eval(&row) {
return;
}
self.table_catalogs.push(Some(table_catalog));
self.table_schemas.push(Some(table_schema));
self.table_names.push(Some(table_name));
// TODO(dennis): adds followers.
self.region_ids.push(Some(region_id));
self.peer_ids.push(peer_id);
self.peer_addrs.push(peer_addr.as_deref());
@@ -284,26 +245,11 @@ impl InformationSchemaRegionPeersBuilder {
self.statuses.push(state.as_deref());
self.down_seconds
.push(route.leader_down_millis().map(|m| m / 1000));
for follower in &route.follower_peers {
self.table_catalogs.push(Some(table_catalog));
self.table_schemas.push(Some(table_schema));
self.table_names.push(Some(table_name));
self.region_ids.push(Some(region_id));
self.peer_ids.push(Some(follower.id));
self.peer_addrs.push(Some(follower.addr.as_str()));
self.is_leaders.push(Some("No"));
self.statuses.push(None);
self.down_seconds.push(None);
}
}
}
fn finish(&mut self) -> Result<RecordBatch> {
let columns: Vec<VectorRef> = vec![
Arc::new(self.table_catalogs.finish()),
Arc::new(self.table_schemas.finish()),
Arc::new(self.table_names.finish()),
Arc::new(self.region_ids.finish()),
Arc::new(self.peer_ids.finish()),
Arc::new(self.peer_addrs.finish()),

View File

@@ -177,7 +177,7 @@ fn create_table_info(table_id: TableId, table_name: TableName) -> RawTableInfo {
fn create_region_routes(regions: Vec<RegionNumber>) -> Vec<RegionRoute> {
let mut region_routes = Vec::with_capacity(100);
let mut rng = rand::rng();
let mut rng = rand::thread_rng();
for region_id in regions.into_iter().map(u64::from) {
region_routes.push(RegionRoute {
@@ -188,7 +188,7 @@ fn create_region_routes(regions: Vec<RegionNumber>) -> Vec<RegionRoute> {
attrs: BTreeMap::new(),
},
leader_peer: Some(Peer {
id: rng.random_range(0..10),
id: rng.gen_range(0..10),
addr: String::new(),
}),
follower_peers: vec![],

View File

@@ -16,6 +16,7 @@
mod client;
pub mod client_manager;
#[cfg(feature = "testing")]
mod database;
pub mod error;
pub mod flow;
@@ -33,6 +34,7 @@ pub use common_recordbatch::{RecordBatches, SendableRecordBatchStream};
use snafu::OptionExt;
pub use self::client::Client;
#[cfg(feature = "testing")]
pub use self::database::Database;
pub use self::error::{Error, Result};
use crate::error::{IllegalDatabaseResponseSnafu, ServerSnafu};

View File

@@ -13,7 +13,7 @@
// limitations under the License.
use enum_dispatch::enum_dispatch;
use rand::seq::IndexedRandom;
use rand::seq::SliceRandom;
#[enum_dispatch]
pub trait LoadBalance {
@@ -37,7 +37,7 @@ pub struct Random;
impl LoadBalance for Random {
fn get_peer<'a>(&self, peers: &'a [String]) -> Option<&'a String> {
peers.choose(&mut rand::rng())
peers.choose(&mut rand::thread_rng())
}
}

View File

@@ -30,7 +30,7 @@ use datanode::datanode::{Datanode, DatanodeBuilder};
use datanode::service::DatanodeServiceBuilder;
use meta_client::{MetaClientOptions, MetaClientType};
use servers::Mode;
use snafu::{ensure, OptionExt, ResultExt};
use snafu::{OptionExt, ResultExt};
use tracing_appender::non_blocking::WorkerGuard;
use crate::error::{
@@ -223,14 +223,15 @@ impl StartCommand {
.get_or_insert_with(MetaClientOptions::default)
.metasrv_addrs
.clone_from(metasrv_addrs);
opts.mode = Mode::Distributed;
}
ensure!(
opts.node_id.is_some(),
MissingConfigSnafu {
msg: "Missing node id option"
if let (Mode::Distributed, None) = (&opts.mode, &opts.node_id) {
return MissingConfigSnafu {
msg: "Missing node id option",
}
);
.fail();
}
if let Some(data_home) = &self.data_home {
opts.storage.data_home.clone_from(data_home);
@@ -294,13 +295,10 @@ impl StartCommand {
msg: "'meta_client_options'",
})?;
let meta_client = meta_client::create_meta_client(
MetaClientType::Datanode { member_id },
meta_config,
None,
)
.await
.context(MetaClientInitSnafu)?;
let meta_client =
meta_client::create_meta_client(MetaClientType::Datanode { member_id }, meta_config)
.await
.context(MetaClientInitSnafu)?;
let meta_backend = Arc::new(MetaKvBackend {
client: meta_client.clone(),
@@ -313,7 +311,7 @@ impl StartCommand {
.build(),
);
let mut datanode = DatanodeBuilder::new(opts.clone(), plugins, Mode::Distributed)
let mut datanode = DatanodeBuilder::new(opts.clone(), plugins)
.with_meta_client(meta_client)
.with_kv_backend(meta_backend)
.with_cache_registry(layered_cache_registry)
@@ -335,7 +333,6 @@ impl StartCommand {
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::io::Write;
use std::time::Duration;
@@ -343,6 +340,7 @@ mod tests {
use common_test_util::temp_dir::create_named_temp_file;
use datanode::config::{FileConfig, GcsConfig, ObjectStoreConfig, S3Config};
use servers::heartbeat_options::HeartbeatOptions;
use servers::Mode;
use super::*;
use crate::options::GlobalOptions;
@@ -408,7 +406,7 @@ mod tests {
sync_write = false
[storage]
data_home = "./greptimedb_data/"
data_home = "/tmp/greptimedb/"
type = "File"
[[storage.providers]]
@@ -422,7 +420,7 @@ mod tests {
[logging]
level = "debug"
dir = "./greptimedb_data/test/logs"
dir = "/tmp/greptimedb/test/logs"
"#;
write!(file, "{}", toml_str).unwrap();
@@ -469,7 +467,7 @@ mod tests {
assert_eq!(10000, ddl_timeout.as_millis());
assert_eq!(3000, timeout.as_millis());
assert!(tcp_nodelay);
assert_eq!("./greptimedb_data/", options.storage.data_home);
assert_eq!("/tmp/greptimedb/", options.storage.data_home);
assert!(matches!(
&options.storage.store,
ObjectStoreConfig::File(FileConfig { .. })
@@ -485,14 +483,27 @@ mod tests {
));
assert_eq!("debug", options.logging.level.unwrap());
assert_eq!(
"./greptimedb_data/test/logs".to_string(),
options.logging.dir
);
assert_eq!("/tmp/greptimedb/test/logs".to_string(), options.logging.dir);
}
#[test]
fn test_try_from_cmd() {
let opt = StartCommand::default()
.load_options(&GlobalOptions::default())
.unwrap()
.component;
assert_eq!(Mode::Standalone, opt.mode);
let opt = (StartCommand {
node_id: Some(42),
metasrv_addrs: Some(vec!["127.0.0.1:3002".to_string()]),
..Default::default()
})
.load_options(&GlobalOptions::default())
.unwrap()
.component;
assert_eq!(Mode::Distributed, opt.mode);
assert!((StartCommand {
metasrv_addrs: Some(vec!["127.0.0.1:3002".to_string()]),
..Default::default()
@@ -511,23 +522,11 @@ mod tests {
#[test]
fn test_load_log_options_from_cli() {
let mut cmd = StartCommand::default();
let result = cmd.load_options(&GlobalOptions {
log_dir: Some("./greptimedb_data/test/logs".to_string()),
log_level: Some("debug".to_string()),
#[cfg(feature = "tokio-console")]
tokio_console_addr: None,
});
// Missing node_id.
assert_matches!(result, Err(crate::error::Error::MissingConfig { .. }));
cmd.node_id = Some(42);
let cmd = StartCommand::default();
let options = cmd
.load_options(&GlobalOptions {
log_dir: Some("./greptimedb_data/test/logs".to_string()),
log_dir: Some("/tmp/greptimedb/test/logs".to_string()),
log_level: Some("debug".to_string()),
#[cfg(feature = "tokio-console")]
@@ -537,7 +536,7 @@ mod tests {
.component;
let logging_opt = options.logging;
assert_eq!("./greptimedb_data/test/logs", logging_opt.dir);
assert_eq!("/tmp/greptimedb/test/logs", logging_opt.dir);
assert_eq!("debug", logging_opt.level.as_ref().unwrap());
}
@@ -566,11 +565,11 @@ mod tests {
[storage]
type = "File"
data_home = "./greptimedb_data/"
data_home = "/tmp/greptimedb/"
[logging]
level = "debug"
dir = "./greptimedb_data/test/logs"
dir = "/tmp/greptimedb/test/logs"
"#;
write!(file, "{}", toml_str).unwrap();

View File

@@ -100,13 +100,6 @@ pub enum Error {
source: flow::Error,
},
#[snafu(display("Servers error"))]
Servers {
#[snafu(implicit)]
location: Location,
source: servers::error::Error,
},
#[snafu(display("Failed to start frontend"))]
StartFrontend {
#[snafu(implicit)]
@@ -372,7 +365,6 @@ impl ErrorExt for Error {
Error::ShutdownFrontend { source, .. } => source.status_code(),
Error::StartMetaServer { source, .. } => source.status_code(),
Error::ShutdownMetaServer { source, .. } => source.status_code(),
Error::Servers { source, .. } => source.status_code(),
Error::BuildMetaServer { source, .. } => source.status_code(),
Error::UnsupportedSelectorType { source, .. } => source.status_code(),
Error::BuildCli { source, .. } => source.status_code(),

View File

@@ -34,7 +34,8 @@ use common_telemetry::logging::TracingOptions;
use common_version::{short_version, version};
use flow::{FlownodeBuilder, FlownodeInstance, FrontendInvoker};
use meta_client::{MetaClientOptions, MetaClientType};
use snafu::{ensure, OptionExt, ResultExt};
use servers::Mode;
use snafu::{OptionExt, ResultExt};
use tracing_appender::non_blocking::WorkerGuard;
use crate::error::{
@@ -202,6 +203,7 @@ impl StartCommand {
.get_or_insert_with(MetaClientOptions::default)
.metasrv_addrs
.clone_from(metasrv_addrs);
opts.mode = Mode::Distributed;
}
if let Some(http_addr) = &self.http_addr {
@@ -212,12 +214,12 @@ impl StartCommand {
opts.http.timeout = Duration::from_secs(http_timeout);
}
ensure!(
opts.node_id.is_some(),
MissingConfigSnafu {
msg: "Missing node id option"
if let (Mode::Distributed, None) = (&opts.mode, &opts.node_id) {
return MissingConfigSnafu {
msg: "Missing node id option",
}
);
.fail();
}
Ok(())
}
@@ -247,13 +249,10 @@ impl StartCommand {
msg: "'meta_client_options'",
})?;
let meta_client = meta_client::create_meta_client(
MetaClientType::Flownode { member_id },
meta_config,
None,
)
.await
.context(MetaClientInitSnafu)?;
let meta_client =
meta_client::create_meta_client(MetaClientType::Flownode { member_id }, meta_config)
.await
.context(MetaClientInitSnafu)?;
let cache_max_capacity = meta_config.metadata_cache_max_capacity;
let cache_ttl = meta_config.metadata_cache_ttl;

View File

@@ -32,25 +32,28 @@ use common_telemetry::info;
use common_telemetry::logging::TracingOptions;
use common_time::timezone::set_default_timezone;
use common_version::{short_version, version};
use frontend::frontend::Frontend;
use frontend::heartbeat::HeartbeatTask;
use frontend::instance::builder::FrontendBuilder;
use frontend::instance::{FrontendInstance, Instance as FeInstance};
use frontend::server::Services;
use meta_client::{MetaClientOptions, MetaClientType};
use query::stats::StatementStatistics;
use servers::export_metrics::ExportMetricsTask;
use servers::tls::{TlsMode, TlsOption};
use snafu::{OptionExt, ResultExt};
use tracing_appender::non_blocking::WorkerGuard;
use crate::error::{self, Result};
use crate::error::{
self, InitTimezoneSnafu, LoadLayeredConfigSnafu, MetaClientInitSnafu, MissingConfigSnafu,
Result, StartFrontendSnafu,
};
use crate::options::{GlobalOptions, GreptimeOptions};
use crate::{log_versions, App};
type FrontendOptions = GreptimeOptions<frontend::frontend::FrontendOptions>;
pub struct Instance {
frontend: Frontend,
frontend: FeInstance,
// Keep the logging guard to prevent the worker from being dropped.
_guard: Vec<WorkerGuard>,
}
@@ -58,17 +61,20 @@ pub struct Instance {
pub const APP_NAME: &str = "greptime-frontend";
impl Instance {
pub fn new(frontend: Frontend, _guard: Vec<WorkerGuard>) -> Self {
Self { frontend, _guard }
pub fn new(frontend: FeInstance, guard: Vec<WorkerGuard>) -> Self {
Self {
frontend,
_guard: guard,
}
}
pub fn inner(&self) -> &Frontend {
&self.frontend
}
pub fn mut_inner(&mut self) -> &mut Frontend {
pub fn mut_inner(&mut self) -> &mut FeInstance {
&mut self.frontend
}
pub fn inner(&self) -> &FeInstance {
&self.frontend
}
}
#[async_trait]
@@ -78,15 +84,11 @@ impl App for Instance {
}
async fn start(&mut self) -> Result<()> {
let plugins = self.frontend.instance.plugins().clone();
plugins::start_frontend_plugins(plugins)
plugins::start_frontend_plugins(self.frontend.plugins().clone())
.await
.context(error::StartFrontendSnafu)?;
.context(StartFrontendSnafu)?;
self.frontend
.start()
.await
.context(error::StartFrontendSnafu)
self.frontend.start().await.context(StartFrontendSnafu)
}
async fn stop(&self) -> Result<()> {
@@ -176,7 +178,7 @@ impl StartCommand {
self.config_file.as_deref(),
self.env_prefix.as_ref(),
)
.context(error::LoadLayeredConfigSnafu)?;
.context(LoadLayeredConfigSnafu)?;
self.merge_with_cli_options(global_options, &mut opts)?;
@@ -281,28 +283,22 @@ impl StartCommand {
let mut plugins = Plugins::new();
plugins::setup_frontend_plugins(&mut plugins, &plugin_opts, &opts)
.await
.context(error::StartFrontendSnafu)?;
.context(StartFrontendSnafu)?;
set_default_timezone(opts.default_timezone.as_deref()).context(error::InitTimezoneSnafu)?;
set_default_timezone(opts.default_timezone.as_deref()).context(InitTimezoneSnafu)?;
let meta_client_options = opts
.meta_client
.as_ref()
.context(error::MissingConfigSnafu {
msg: "'meta_client'",
})?;
let meta_client_options = opts.meta_client.as_ref().context(MissingConfigSnafu {
msg: "'meta_client'",
})?;
let cache_max_capacity = meta_client_options.metadata_cache_max_capacity;
let cache_ttl = meta_client_options.metadata_cache_ttl;
let cache_tti = meta_client_options.metadata_cache_tti;
let meta_client = meta_client::create_meta_client(
MetaClientType::Frontend,
meta_client_options,
Some(&plugins),
)
.await
.context(error::MetaClientInitSnafu)?;
let meta_client =
meta_client::create_meta_client(MetaClientType::Frontend, meta_client_options)
.await
.context(MetaClientInitSnafu)?;
// TODO(discord9): add helper function to ease the creation of cache registry&such
let cached_meta_backend =
@@ -349,7 +345,6 @@ impl StartCommand {
opts.heartbeat.clone(),
Arc::new(executor),
);
let heartbeat_task = Some(heartbeat_task);
// frontend to datanode need not timeout.
// Some queries are expected to take long time.
@@ -361,7 +356,7 @@ impl StartCommand {
};
let client = NodeClients::new(channel_config);
let instance = FrontendBuilder::new(
let mut instance = FrontendBuilder::new(
opts.clone(),
cached_meta_backend.clone(),
layered_cache_registry.clone(),
@@ -372,27 +367,20 @@ impl StartCommand {
)
.with_plugin(plugins.clone())
.with_local_cache_invalidator(layered_cache_registry)
.with_heartbeat_task(heartbeat_task)
.try_build()
.await
.context(error::StartFrontendSnafu)?;
let instance = Arc::new(instance);
.context(StartFrontendSnafu)?;
let export_metrics_task = ExportMetricsTask::try_new(&opts.export_metrics, Some(&plugins))
.context(error::ServersSnafu)?;
let servers = Services::new(opts, instance.clone(), plugins)
let servers = Services::new(opts, Arc::new(instance.clone()), plugins)
.build()
.await
.context(error::StartFrontendSnafu)?;
.context(StartFrontendSnafu)?;
instance
.build_servers(servers)
.context(StartFrontendSnafu)?;
let frontend = Frontend {
instance,
servers,
heartbeat_task,
export_metrics_task,
};
Ok(Instance::new(frontend, guard))
Ok(Instance::new(instance, guard))
}
}
@@ -452,7 +440,7 @@ mod tests {
[http]
addr = "127.0.0.1:4000"
timeout = "0s"
timeout = "30s"
body_limit = "2GB"
[opentsdb]
@@ -460,7 +448,7 @@ mod tests {
[logging]
level = "debug"
dir = "./greptimedb_data/test/logs"
dir = "/tmp/greptimedb/test/logs"
"#;
write!(file, "{}", toml_str).unwrap();
@@ -473,15 +461,12 @@ mod tests {
let fe_opts = command.load_options(&Default::default()).unwrap().component;
assert_eq!("127.0.0.1:4000".to_string(), fe_opts.http.addr);
assert_eq!(Duration::from_secs(0), fe_opts.http.timeout);
assert_eq!(Duration::from_secs(30), fe_opts.http.timeout);
assert_eq!(ReadableSize::gb(2), fe_opts.http.body_limit);
assert_eq!("debug", fe_opts.logging.level.as_ref().unwrap());
assert_eq!(
"./greptimedb_data/test/logs".to_string(),
fe_opts.logging.dir
);
assert_eq!("/tmp/greptimedb/test/logs".to_string(), fe_opts.logging.dir);
assert!(!fe_opts.opentsdb.enable);
}
@@ -520,7 +505,7 @@ mod tests {
let options = cmd
.load_options(&GlobalOptions {
log_dir: Some("./greptimedb_data/test/logs".to_string()),
log_dir: Some("/tmp/greptimedb/test/logs".to_string()),
log_level: Some("debug".to_string()),
#[cfg(feature = "tokio-console")]
@@ -530,7 +515,7 @@ mod tests {
.component;
let logging_opt = options.logging;
assert_eq!("./greptimedb_data/test/logs", logging_opt.dir);
assert_eq!("/tmp/greptimedb/test/logs", logging_opt.dir);
assert_eq!("debug", logging_opt.level.as_ref().unwrap());
}

View File

@@ -337,7 +337,7 @@ mod tests {
[logging]
level = "debug"
dir = "./greptimedb_data/test/logs"
dir = "/tmp/greptimedb/test/logs"
[failure_detector]
threshold = 8.0
@@ -358,10 +358,7 @@ mod tests {
assert_eq!(vec!["127.0.0.1:2379".to_string()], options.store_addrs);
assert_eq!(SelectorType::LeaseBased, options.selector);
assert_eq!("debug", options.logging.level.as_ref().unwrap());
assert_eq!(
"./greptimedb_data/test/logs".to_string(),
options.logging.dir
);
assert_eq!("/tmp/greptimedb/test/logs".to_string(), options.logging.dir);
assert_eq!(8.0, options.failure_detector.threshold);
assert_eq!(
100.0,
@@ -399,7 +396,7 @@ mod tests {
let options = cmd
.load_options(&GlobalOptions {
log_dir: Some("./greptimedb_data/test/logs".to_string()),
log_dir: Some("/tmp/greptimedb/test/logs".to_string()),
log_level: Some("debug".to_string()),
#[cfg(feature = "tokio-console")]
@@ -409,7 +406,7 @@ mod tests {
.component;
let logging_opt = options.logging;
assert_eq!("./greptimedb_data/test/logs", logging_opt.dir);
assert_eq!("/tmp/greptimedb/test/logs", logging_opt.dir);
assert_eq!("debug", logging_opt.level.as_ref().unwrap());
}
@@ -427,7 +424,7 @@ mod tests {
[logging]
level = "debug"
dir = "./greptimedb_data/test/logs"
dir = "/tmp/greptimedb/test/logs"
"#;
write!(file, "{}", toml_str).unwrap();

View File

@@ -42,7 +42,6 @@ use common_meta::kv_backend::KvBackendRef;
use common_meta::node_manager::NodeManagerRef;
use common_meta::peer::Peer;
use common_meta::region_keeper::MemoryRegionKeeper;
use common_meta::region_registry::LeaderRegionRegistry;
use common_meta::sequence::SequenceBuilder;
use common_meta::wal_options_allocator::{build_wal_options_allocator, WalOptionsAllocatorRef};
use common_procedure::{ProcedureInfo, ProcedureManagerRef};
@@ -56,9 +55,9 @@ use datanode::datanode::{Datanode, DatanodeBuilder};
use datanode::region_server::RegionServer;
use file_engine::config::EngineConfig as FileEngineConfig;
use flow::{FlowConfig, FlowWorkerManager, FlownodeBuilder, FlownodeOptions, FrontendInvoker};
use frontend::frontend::{Frontend, FrontendOptions};
use frontend::frontend::FrontendOptions;
use frontend::instance::builder::FrontendBuilder;
use frontend::instance::{Instance as FeInstance, StandaloneDatanodeManager};
use frontend::instance::{FrontendInstance, Instance as FeInstance, StandaloneDatanodeManager};
use frontend::server::Services;
use frontend::service_config::{
InfluxdbOptions, JaegerOptions, MysqlOptions, OpentsdbOptions, PostgresOptions,
@@ -68,7 +67,7 @@ use meta_srv::metasrv::{FLOW_ID_SEQ, TABLE_ID_SEQ};
use mito2::config::MitoConfig;
use query::stats::StatementStatistics;
use serde::{Deserialize, Serialize};
use servers::export_metrics::{ExportMetricsOption, ExportMetricsTask};
use servers::export_metrics::ExportMetricsOption;
use servers::grpc::GrpcOptions;
use servers::http::HttpOptions;
use servers::tls::{TlsMode, TlsOption};
@@ -77,9 +76,15 @@ use snafu::ResultExt;
use tokio::sync::{broadcast, RwLock};
use tracing_appender::non_blocking::WorkerGuard;
use crate::error::Result;
use crate::error::{
BuildCacheRegistrySnafu, BuildWalOptionsAllocatorSnafu, CreateDirSnafu, IllegalConfigSnafu,
InitDdlManagerSnafu, InitMetadataSnafu, InitTimezoneSnafu, LoadLayeredConfigSnafu, OtherSnafu,
Result, ShutdownDatanodeSnafu, ShutdownFlownodeSnafu, ShutdownFrontendSnafu,
StartDatanodeSnafu, StartFlownodeSnafu, StartFrontendSnafu, StartProcedureManagerSnafu,
StartWalOptionsAllocatorSnafu, StopProcedureManagerSnafu,
};
use crate::options::{GlobalOptions, GreptimeOptions};
use crate::{error, log_versions, App};
use crate::{log_versions, App};
pub const APP_NAME: &str = "greptime-standalone";
@@ -127,6 +132,7 @@ impl SubCommand {
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct StandaloneOptions {
pub mode: Mode,
pub enable_telemetry: bool,
pub default_timezone: Option<String>,
pub http: HttpOptions,
@@ -156,6 +162,7 @@ pub struct StandaloneOptions {
impl Default for StandaloneOptions {
fn default() -> Self {
Self {
mode: Mode::Standalone,
enable_telemetry: true,
default_timezone: None,
http: HttpOptions::default(),
@@ -236,6 +243,7 @@ impl StandaloneOptions {
grpc: cloned_opts.grpc,
init_regions_in_background: cloned_opts.init_regions_in_background,
init_regions_parallelism: cloned_opts.init_regions_parallelism,
mode: Mode::Standalone,
..Default::default()
}
}
@@ -243,12 +251,13 @@ impl StandaloneOptions {
pub struct Instance {
datanode: Datanode,
frontend: Frontend,
frontend: FeInstance,
// TODO(discord9): wrapped it in flownode instance instead
flow_worker_manager: Arc<FlowWorkerManager>,
flow_shutdown: broadcast::Sender<()>,
procedure_manager: ProcedureManagerRef,
wal_options_allocator: WalOptionsAllocatorRef,
// Keep the logging guard to prevent the worker from being dropped.
_guard: Vec<WorkerGuard>,
}
@@ -272,26 +281,21 @@ impl App for Instance {
self.procedure_manager
.start()
.await
.context(error::StartProcedureManagerSnafu)?;
.context(StartProcedureManagerSnafu)?;
self.wal_options_allocator
.start()
.await
.context(error::StartWalOptionsAllocatorSnafu)?;
.context(StartWalOptionsAllocatorSnafu)?;
plugins::start_frontend_plugins(self.frontend.instance.plugins().clone())
plugins::start_frontend_plugins(self.frontend.plugins().clone())
.await
.context(error::StartFrontendSnafu)?;
self.frontend
.start()
.await
.context(error::StartFrontendSnafu)?;
.context(StartFrontendSnafu)?;
self.frontend.start().await.context(StartFrontendSnafu)?;
self.flow_worker_manager
.clone()
.run_background(Some(self.flow_shutdown.subscribe()));
Ok(())
}
@@ -299,18 +303,17 @@ impl App for Instance {
self.frontend
.shutdown()
.await
.context(error::ShutdownFrontendSnafu)?;
.context(ShutdownFrontendSnafu)?;
self.procedure_manager
.stop()
.await
.context(error::StopProcedureManagerSnafu)?;
.context(StopProcedureManagerSnafu)?;
self.datanode
.shutdown()
.await
.context(error::ShutdownDatanodeSnafu)?;
.context(ShutdownDatanodeSnafu)?;
self.flow_shutdown
.send(())
.map_err(|_e| {
@@ -319,8 +322,7 @@ impl App for Instance {
}
.build()
})
.context(error::ShutdownFlownodeSnafu)?;
.context(ShutdownFlownodeSnafu)?;
info!("Datanode instance stopped.");
Ok(())
@@ -366,7 +368,7 @@ impl StartCommand {
self.config_file.as_deref(),
self.env_prefix.as_ref(),
)
.context(error::LoadLayeredConfigSnafu)?;
.context(LoadLayeredConfigSnafu)?;
self.merge_with_cli_options(global_options, &mut opts.component)?;
@@ -379,6 +381,9 @@ impl StartCommand {
global_options: &GlobalOptions,
opts: &mut StandaloneOptions,
) -> Result<()> {
// Should always be standalone mode.
opts.mode = Mode::Standalone;
if let Some(dir) = &global_options.log_dir {
opts.logging.dir.clone_from(dir);
}
@@ -410,7 +415,7 @@ impl StartCommand {
// frontend grpc addr conflict with datanode default grpc addr
let datanode_grpc_addr = DatanodeOptions::default().grpc.bind_addr;
if addr.eq(&datanode_grpc_addr) {
return error::IllegalConfigSnafu {
return IllegalConfigSnafu {
msg: format!(
"gRPC listen address conflicts with datanode reserved gRPC addr: {datanode_grpc_addr}",
),
@@ -469,19 +474,18 @@ impl StartCommand {
plugins::setup_frontend_plugins(&mut plugins, &plugin_opts, &fe_opts)
.await
.context(error::StartFrontendSnafu)?;
.context(StartFrontendSnafu)?;
plugins::setup_datanode_plugins(&mut plugins, &plugin_opts, &dn_opts)
.await
.context(error::StartDatanodeSnafu)?;
.context(StartDatanodeSnafu)?;
set_default_timezone(fe_opts.default_timezone.as_deref())
.context(error::InitTimezoneSnafu)?;
set_default_timezone(fe_opts.default_timezone.as_deref()).context(InitTimezoneSnafu)?;
let data_home = &dn_opts.storage.data_home;
// Ensure the data_home directory exists.
fs::create_dir_all(path::Path::new(data_home))
.context(error::CreateDirSnafu { dir: data_home })?;
.context(CreateDirSnafu { dir: data_home })?;
let metadata_dir = metadata_store_dir(data_home);
let (kv_backend, procedure_manager) = FeInstance::try_build_standalone_components(
@@ -490,7 +494,7 @@ impl StartCommand {
opts.procedure,
)
.await
.context(error::StartFrontendSnafu)?;
.context(StartFrontendSnafu)?;
// Builds cache registry
let layered_cache_builder = LayeredCacheRegistryBuilder::default();
@@ -499,16 +503,16 @@ impl StartCommand {
with_default_composite_cache_registry(
layered_cache_builder.add_cache_registry(fundamental_cache_registry),
)
.context(error::BuildCacheRegistrySnafu)?
.context(BuildCacheRegistrySnafu)?
.build(),
);
let datanode = DatanodeBuilder::new(dn_opts, plugins.clone(), Mode::Standalone)
let datanode = DatanodeBuilder::new(dn_opts, plugins.clone())
.with_kv_backend(kv_backend.clone())
.with_cache_registry(layered_cache_registry.clone())
.build()
.await
.context(error::StartDatanodeSnafu)?;
.context(StartDatanodeSnafu)?;
let information_extension = Arc::new(StandaloneInformationExtension::new(
datanode.region_server(),
@@ -541,7 +545,7 @@ impl StartCommand {
.build()
.await
.map_err(BoxedError::new)
.context(error::OtherSnafu)?,
.context(OtherSnafu)?,
);
// set the ref to query for the local flow state
@@ -572,7 +576,7 @@ impl StartCommand {
let kafka_options = opts.wal.clone().into();
let wal_options_allocator = build_wal_options_allocator(&kafka_options, kv_backend.clone())
.await
.context(error::BuildWalOptionsAllocatorSnafu)?;
.context(BuildWalOptionsAllocatorSnafu)?;
let wal_options_allocator = Arc::new(wal_options_allocator);
let table_meta_allocator = Arc::new(TableMetadataAllocator::new(
table_id_sequence,
@@ -593,8 +597,8 @@ impl StartCommand {
)
.await?;
let fe_instance = FrontendBuilder::new(
fe_opts.clone(),
let mut frontend = FrontendBuilder::new(
fe_opts,
kv_backend.clone(),
layered_cache_registry.clone(),
catalog_manager.clone(),
@@ -605,8 +609,7 @@ impl StartCommand {
.with_plugin(plugins.clone())
.try_build()
.await
.context(error::StartFrontendSnafu)?;
let fe_instance = Arc::new(fe_instance);
.context(StartFrontendSnafu)?;
let flow_worker_manager = flownode.flow_worker_manager();
// flow server need to be able to use frontend to write insert requests back
@@ -619,25 +622,18 @@ impl StartCommand {
node_manager,
)
.await
.context(error::StartFlownodeSnafu)?;
.context(StartFlownodeSnafu)?;
flow_worker_manager.set_frontend_invoker(invoker).await;
let (tx, _rx) = broadcast::channel(1);
let export_metrics_task = ExportMetricsTask::try_new(&opts.export_metrics, Some(&plugins))
.context(error::ServersSnafu)?;
let servers = Services::new(opts, fe_instance.clone(), plugins)
let servers = Services::new(opts, Arc::new(frontend.clone()), plugins)
.build()
.await
.context(error::StartFrontendSnafu)?;
let frontend = Frontend {
instance: fe_instance,
servers,
heartbeat_task: None,
export_metrics_task,
};
.context(StartFrontendSnafu)?;
frontend
.build_servers(servers)
.context(StartFrontendSnafu)?;
Ok(Instance {
datanode,
@@ -665,7 +661,6 @@ impl StartCommand {
node_manager,
cache_invalidator,
memory_region_keeper: Arc::new(MemoryRegionKeeper::default()),
leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
table_metadata_manager,
table_metadata_allocator,
flow_metadata_manager,
@@ -675,7 +670,7 @@ impl StartCommand {
procedure_manager,
true,
)
.context(error::InitDdlManagerSnafu)?,
.context(InitDdlManagerSnafu)?,
);
Ok(procedure_executor)
@@ -689,7 +684,7 @@ impl StartCommand {
table_metadata_manager
.init()
.await
.context(error::InitMetadataSnafu)?;
.context(InitMetadataSnafu)?;
Ok(table_metadata_manager)
}
@@ -783,7 +778,6 @@ impl InformationExtension for StandaloneInformationExtension {
manifest_size: region_stat.manifest_size,
sst_size: region_stat.sst_size,
index_size: region_stat.index_size,
region_manifest: region_stat.manifest.into(),
}
})
.collect::<Vec<_>>();
@@ -858,7 +852,7 @@ mod tests {
[wal]
provider = "raft_engine"
dir = "./greptimedb_data/test/wal"
dir = "/tmp/greptimedb/test/wal"
file_size = "1GB"
purge_threshold = "50GB"
purge_interval = "10m"
@@ -866,7 +860,7 @@ mod tests {
sync_write = false
[storage]
data_home = "./greptimedb_data/"
data_home = "/tmp/greptimedb/"
type = "File"
[[storage.providers]]
@@ -898,7 +892,7 @@ mod tests {
[logging]
level = "debug"
dir = "./greptimedb_data/test/logs"
dir = "/tmp/greptimedb/test/logs"
"#;
write!(file, "{}", toml_str).unwrap();
let cmd = StartCommand {
@@ -928,10 +922,7 @@ mod tests {
let DatanodeWalConfig::RaftEngine(raft_engine_config) = dn_opts.wal else {
unreachable!()
};
assert_eq!(
"./greptimedb_data/test/wal",
raft_engine_config.dir.unwrap()
);
assert_eq!("/tmp/greptimedb/test/wal", raft_engine_config.dir.unwrap());
assert!(matches!(
&dn_opts.storage.store,
@@ -955,7 +946,7 @@ mod tests {
}
assert_eq!("debug", logging_opts.level.as_ref().unwrap());
assert_eq!("./greptimedb_data/test/logs".to_string(), logging_opts.dir);
assert_eq!("/tmp/greptimedb/test/logs".to_string(), logging_opts.dir);
}
#[test]
@@ -967,7 +958,7 @@ mod tests {
let opts = cmd
.load_options(&GlobalOptions {
log_dir: Some("./greptimedb_data/test/logs".to_string()),
log_dir: Some("/tmp/greptimedb/test/logs".to_string()),
log_level: Some("debug".to_string()),
#[cfg(feature = "tokio-console")]
@@ -976,7 +967,7 @@ mod tests {
.unwrap()
.component;
assert_eq!("./greptimedb_data/test/logs", opts.logging.dir);
assert_eq!("/tmp/greptimedb/test/logs", opts.logging.dir);
assert_eq!("debug", opts.logging.level.unwrap());
}
@@ -1060,6 +1051,7 @@ mod tests {
let options =
StandaloneOptions::load_layered_options(None, "GREPTIMEDB_STANDALONE").unwrap();
let default_options = StandaloneOptions::default();
assert_eq!(options.mode, default_options.mode);
assert_eq!(options.enable_telemetry, default_options.enable_telemetry);
assert_eq!(options.http, default_options.http);
assert_eq!(options.grpc, default_options.grpc);

View File

@@ -56,13 +56,13 @@ fn test_load_datanode_example_config() {
metadata_cache_tti: Duration::from_secs(300),
}),
wal: DatanodeWalConfig::RaftEngine(RaftEngineConfig {
dir: Some("./greptimedb_data/wal".to_string()),
dir: Some("/tmp/greptimedb/wal".to_string()),
sync_period: Some(Duration::from_secs(10)),
recovery_parallelism: 2,
..Default::default()
}),
storage: StorageConfig {
data_home: "./greptimedb_data/".to_string(),
data_home: "/tmp/greptimedb/".to_string(),
..Default::default()
},
region_engine: vec![
@@ -159,17 +159,17 @@ fn test_load_metasrv_example_config() {
let expected = GreptimeOptions::<MetasrvOptions> {
component: MetasrvOptions {
selector: SelectorType::default(),
data_home: "./greptimedb_data/metasrv/".to_string(),
data_home: "/tmp/metasrv/".to_string(),
server_addr: "127.0.0.1:3002".to_string(),
logging: LoggingOptions {
dir: "./greptimedb_data/logs".to_string(),
dir: "/tmp/greptimedb/logs".to_string(),
level: Some("info".to_string()),
otlp_endpoint: Some(DEFAULT_OTLP_ENDPOINT.to_string()),
tracing_sample_ratio: Some(Default::default()),
slow_query: SlowQueryOptions {
enable: false,
threshold: None,
sample_ratio: None,
threshold: Some(Duration::from_secs(10)),
sample_ratio: Some(1.0),
},
..Default::default()
},
@@ -202,7 +202,7 @@ fn test_load_standalone_example_config() {
component: StandaloneOptions {
default_timezone: Some("UTC".to_string()),
wal: DatanodeWalConfig::RaftEngine(RaftEngineConfig {
dir: Some("./greptimedb_data/wal".to_string()),
dir: Some("/tmp/greptimedb/wal".to_string()),
sync_period: Some(Duration::from_secs(10)),
recovery_parallelism: 2,
..Default::default()
@@ -219,7 +219,7 @@ fn test_load_standalone_example_config() {
}),
],
storage: StorageConfig {
data_home: "./greptimedb_data/".to_string(),
data_home: "/tmp/greptimedb/".to_string(),
..Default::default()
},
logging: LoggingOptions {

View File

@@ -135,14 +135,5 @@ pub fn is_readonly_schema(schema: &str) -> bool {
pub const TRACE_ID_COLUMN: &str = "trace_id";
pub const SPAN_ID_COLUMN: &str = "span_id";
pub const SPAN_NAME_COLUMN: &str = "span_name";
pub const SERVICE_NAME_COLUMN: &str = "service_name";
pub const PARENT_SPAN_ID_COLUMN: &str = "parent_span_id";
pub const TRACE_TABLE_NAME: &str = "opentelemetry_traces";
pub const TRACE_TABLE_NAME_SESSION_KEY: &str = "trace_table_name";
// ---- End of special table and fields ----
/// Generate the trace services table name from the trace table name by adding `_services` suffix.
pub fn trace_services_table_name(trace_table_name: &str) -> String {
format!("{}_services", trace_table_name)
}
// ---- End of special table and fields ----

View File

@@ -161,7 +161,7 @@ mod tests {
[wal]
provider = "raft_engine"
dir = "./greptimedb_data/wal"
dir = "/tmp/greptimedb/wal"
file_size = "1GB"
purge_threshold = "50GB"
purge_interval = "10m"
@@ -170,7 +170,7 @@ mod tests {
[logging]
level = "debug"
dir = "./greptimedb_data/test/logs"
dir = "/tmp/greptimedb/test/logs"
"#;
write!(file, "{}", toml_str).unwrap();
@@ -246,7 +246,7 @@ mod tests {
let DatanodeWalConfig::RaftEngine(raft_engine_config) = opts.wal else {
unreachable!()
};
assert_eq!(raft_engine_config.dir.unwrap(), "./greptimedb_data/wal");
assert_eq!(raft_engine_config.dir.unwrap(), "/tmp/greptimedb/wal");
// Should be default values.
assert_eq!(opts.node_id, None);

View File

@@ -39,7 +39,6 @@ geohash = { version = "0.13", optional = true }
h3o = { version = "0.6", optional = true }
hyperloglogplus = "0.4"
jsonb.workspace = true
memchr = "2.7"
nalgebra.workspace = true
num = "0.4"
num-traits = "0.2"

View File

@@ -12,19 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod add_region_follower;
mod flush_compact_region;
mod flush_compact_table;
mod migrate_region;
mod remove_region_follower;
use std::sync::Arc;
use add_region_follower::AddRegionFollowerFunction;
use flush_compact_region::{CompactRegionFunction, FlushRegionFunction};
use flush_compact_table::{CompactTableFunction, FlushTableFunction};
use migrate_region::MigrateRegionFunction;
use remove_region_follower::RemoveRegionFollowerFunction;
use crate::flush_flow::FlushFlowFunction;
use crate::function_registry::FunctionRegistry;
@@ -36,8 +32,6 @@ impl AdminFunction {
/// Register all table functions to [`FunctionRegistry`].
pub fn register(registry: &FunctionRegistry) {
registry.register_async(Arc::new(MigrateRegionFunction));
registry.register_async(Arc::new(AddRegionFollowerFunction));
registry.register_async(Arc::new(RemoveRegionFollowerFunction));
registry.register_async(Arc::new(FlushRegionFunction));
registry.register_async(Arc::new(CompactRegionFunction));
registry.register_async(Arc::new(FlushTableFunction));

View File

@@ -1,129 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_macro::admin_fn;
use common_meta::rpc::procedure::AddRegionFollowerRequest;
use common_query::error::{
InvalidFuncArgsSnafu, MissingProcedureServiceHandlerSnafu, Result,
UnsupportedInputDataTypeSnafu,
};
use common_query::prelude::{Signature, TypeSignature, Volatility};
use datatypes::prelude::ConcreteDataType;
use datatypes::value::{Value, ValueRef};
use session::context::QueryContextRef;
use snafu::ensure;
use crate::handlers::ProcedureServiceHandlerRef;
use crate::helper::cast_u64;
/// A function to add a follower to a region.
/// Only available in cluster mode.
///
/// - `add_region_follower(region_id, peer_id)`.
///
/// The parameters:
/// - `region_id`: the region id
/// - `peer_id`: the peer id
#[admin_fn(
name = AddRegionFollowerFunction,
display_name = add_region_follower,
sig_fn = signature,
ret = uint64
)]
pub(crate) async fn add_region_follower(
procedure_service_handler: &ProcedureServiceHandlerRef,
_ctx: &QueryContextRef,
params: &[ValueRef<'_>],
) -> Result<Value> {
ensure!(
params.len() == 2,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect exactly 2, have: {}",
params.len()
),
}
);
let Some(region_id) = cast_u64(&params[0])? else {
return UnsupportedInputDataTypeSnafu {
function: "add_region_follower",
datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
}
.fail();
};
let Some(peer_id) = cast_u64(&params[1])? else {
return UnsupportedInputDataTypeSnafu {
function: "add_region_follower",
datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
}
.fail();
};
procedure_service_handler
.add_region_follower(AddRegionFollowerRequest { region_id, peer_id })
.await?;
Ok(Value::from(0u64))
}
fn signature() -> Signature {
Signature::one_of(
vec![
// add_region_follower(region_id, peer)
TypeSignature::Uniform(2, ConcreteDataType::numerics()),
],
Volatility::Immutable,
)
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use common_query::prelude::TypeSignature;
use datatypes::vectors::{UInt64Vector, VectorRef};
use super::*;
use crate::function::{AsyncFunction, FunctionContext};
#[test]
fn test_add_region_follower_misc() {
let f = AddRegionFollowerFunction;
assert_eq!("add_region_follower", f.name());
assert_eq!(
ConcreteDataType::uint64_datatype(),
f.return_type(&[]).unwrap()
);
assert!(matches!(f.signature(),
Signature {
type_signature: TypeSignature::OneOf(sigs),
volatility: Volatility::Immutable
} if sigs.len() == 1));
}
#[tokio::test]
async fn test_add_region_follower() {
let f = AddRegionFollowerFunction;
let args = vec![1, 1];
let args = args
.into_iter()
.map(|arg| Arc::new(UInt64Vector::from_slice([arg])) as _)
.collect::<Vec<_>>();
let result = f.eval(FunctionContext::mock(), &args).await.unwrap();
let expect: VectorRef = Arc::new(UInt64Vector::from_slice([0u64]));
assert_eq!(result, expect);
}
}

View File

@@ -1,129 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_macro::admin_fn;
use common_meta::rpc::procedure::RemoveRegionFollowerRequest;
use common_query::error::{
InvalidFuncArgsSnafu, MissingProcedureServiceHandlerSnafu, Result,
UnsupportedInputDataTypeSnafu,
};
use common_query::prelude::{Signature, TypeSignature, Volatility};
use datatypes::prelude::ConcreteDataType;
use datatypes::value::{Value, ValueRef};
use session::context::QueryContextRef;
use snafu::ensure;
use crate::handlers::ProcedureServiceHandlerRef;
use crate::helper::cast_u64;
/// A function to remove a follower from a region.
//// Only available in cluster mode.
///
/// - `remove_region_follower(region_id, peer_id)`.
///
/// The parameters:
/// - `region_id`: the region id
/// - `peer_id`: the peer id
#[admin_fn(
name = RemoveRegionFollowerFunction,
display_name = remove_region_follower,
sig_fn = signature,
ret = uint64
)]
pub(crate) async fn remove_region_follower(
procedure_service_handler: &ProcedureServiceHandlerRef,
_ctx: &QueryContextRef,
params: &[ValueRef<'_>],
) -> Result<Value> {
ensure!(
params.len() == 2,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect exactly 2, have: {}",
params.len()
),
}
);
let Some(region_id) = cast_u64(&params[0])? else {
return UnsupportedInputDataTypeSnafu {
function: "add_region_follower",
datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
}
.fail();
};
let Some(peer_id) = cast_u64(&params[1])? else {
return UnsupportedInputDataTypeSnafu {
function: "add_region_follower",
datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
}
.fail();
};
procedure_service_handler
.remove_region_follower(RemoveRegionFollowerRequest { region_id, peer_id })
.await?;
Ok(Value::from(0u64))
}
fn signature() -> Signature {
Signature::one_of(
vec![
// remove_region_follower(region_id, peer_id)
TypeSignature::Uniform(2, ConcreteDataType::numerics()),
],
Volatility::Immutable,
)
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use common_query::prelude::TypeSignature;
use datatypes::vectors::{UInt64Vector, VectorRef};
use super::*;
use crate::function::{AsyncFunction, FunctionContext};
#[test]
fn test_remove_region_follower_misc() {
let f = RemoveRegionFollowerFunction;
assert_eq!("remove_region_follower", f.name());
assert_eq!(
ConcreteDataType::uint64_datatype(),
f.return_type(&[]).unwrap()
);
assert!(matches!(f.signature(),
Signature {
type_signature: TypeSignature::OneOf(sigs),
volatility: Volatility::Immutable
} if sigs.len() == 1));
}
#[tokio::test]
async fn test_remove_region_follower() {
let f = RemoveRegionFollowerFunction;
let args = vec![1, 1];
let args = args
.into_iter()
.map(|arg| Arc::new(UInt64Vector::from_slice([arg])) as _)
.collect::<Vec<_>>();
let result = f.eval(FunctionContext::mock(), &args).await.unwrap();
let expect: VectorRef = Arc::new(UInt64Vector::from_slice([0u64]));
assert_eq!(result, expect);
}
}

View File

@@ -27,7 +27,6 @@ use crate::scalars::hll_count::HllCalcFunction;
use crate::scalars::ip::IpFunctions;
use crate::scalars::json::JsonFunction;
use crate::scalars::matches::MatchesFunction;
use crate::scalars::matches_term::MatchesTermFunction;
use crate::scalars::math::MathFunction;
use crate::scalars::timestamp::TimestampFunction;
use crate::scalars::uddsketch_calc::UddSketchCalcFunction;
@@ -117,7 +116,6 @@ pub static FUNCTION_REGISTRY: Lazy<Arc<FunctionRegistry>> = Lazy::new(|| {
// Full text search function
MatchesFunction::register(&function_registry);
MatchesTermFunction::register(&function_registry);
// System and administration functions
SystemFunction::register(&function_registry);

View File

@@ -16,10 +16,7 @@ use std::sync::Arc;
use async_trait::async_trait;
use common_base::AffectedRows;
use common_meta::rpc::procedure::{
AddRegionFollowerRequest, MigrateRegionRequest, ProcedureStateResponse,
RemoveRegionFollowerRequest,
};
use common_meta::rpc::procedure::{MigrateRegionRequest, ProcedureStateResponse};
use common_query::error::Result;
use common_query::Output;
use session::context::QueryContextRef;
@@ -66,12 +63,6 @@ pub trait ProcedureServiceHandler: Send + Sync {
/// Query the procedure' state by its id
async fn query_procedure_state(&self, pid: &str) -> Result<ProcedureStateResponse>;
/// Add a region follower to a region.
async fn add_region_follower(&self, request: AddRegionFollowerRequest) -> Result<()>;
/// Remove a region follower from a region.
async fn remove_region_follower(&self, request: RemoveRegionFollowerRequest) -> Result<()>;
}
/// This flow service handler is only use for flush flow for now.

View File

@@ -19,7 +19,6 @@ pub mod expression;
pub mod geo;
pub mod json;
pub mod matches;
pub mod matches_term;
pub mod math;
pub mod vector;

View File

@@ -1,375 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use std::{fmt, iter};
use common_query::error::{InvalidFuncArgsSnafu, Result};
use common_query::prelude::Volatility;
use datatypes::prelude::ConcreteDataType;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::vectors::{BooleanVector, BooleanVectorBuilder, MutableVector, VectorRef};
use memchr::memmem;
use snafu::ensure;
use crate::function::{Function, FunctionContext};
use crate::function_registry::FunctionRegistry;
/// Exact term/phrase matching function for text columns.
///
/// This function checks if a text column contains exact term/phrase matches
/// with non-alphanumeric boundaries. Designed for:
/// - Whole-word matching (e.g. "cat" in "cat!" but not in "category")
/// - Phrase matching (e.g. "hello world" in "note:hello world!")
///
/// # Signature
/// `matches_term(text: String, term: String) -> Boolean`
///
/// # Arguments
/// * `text` - String column to search
/// * `term` - Search term/phrase
///
/// # Returns
/// BooleanVector where each element indicates if the corresponding text
/// contains an exact match of the term, following these rules:
/// 1. Exact substring match found (case-sensitive)
/// 2. Match boundaries are either:
/// - Start/end of text
/// - Any non-alphanumeric character (including spaces, hyphens, punctuation, etc.)
///
/// # Examples
/// ```
/// -- SQL examples --
/// -- Match phrase with space --
/// SELECT matches_term(column, 'hello world') FROM table;
/// -- Text: "warning:hello world!" => true
/// -- Text: "hello-world" => false (hyphen instead of space)
/// -- Text: "hello world2023" => false (ending with numbers)
///
/// -- Match multiple words with boundaries --
/// SELECT matches_term(column, 'critical error') FROM logs;
/// -- Match in: "ERROR:critical error!"
/// -- No match: "critical_errors"
///
/// -- Empty string handling --
/// SELECT matches_term(column, '') FROM table;
/// -- Text: "" => true
/// -- Text: "any" => false
///
/// -- Case sensitivity --
/// SELECT matches_term(column, 'Cat') FROM table;
/// -- Text: "Cat" => true
/// -- Text: "cat" => false
/// ```
pub struct MatchesTermFunction;
impl MatchesTermFunction {
pub fn register(registry: &FunctionRegistry) {
registry.register(Arc::new(MatchesTermFunction));
}
}
impl fmt::Display for MatchesTermFunction {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "MATCHES_TERM")
}
}
impl Function for MatchesTermFunction {
fn name(&self) -> &str {
"matches_term"
}
fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
Ok(ConcreteDataType::boolean_datatype())
}
fn signature(&self) -> common_query::prelude::Signature {
common_query::prelude::Signature::exact(
vec![
ConcreteDataType::string_datatype(),
ConcreteDataType::string_datatype(),
],
Volatility::Immutable,
)
}
fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
ensure!(
columns.len() == 2,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect exactly 2, have: {}",
columns.len()
),
}
);
let text_column = &columns[0];
if text_column.is_empty() {
return Ok(Arc::new(BooleanVector::from(Vec::<bool>::with_capacity(0))));
}
let term_column = &columns[1];
let compiled_finder = if term_column.is_const() {
let term = term_column.get_ref(0).as_string().unwrap();
match term {
None => {
return Ok(Arc::new(BooleanVector::from_iter(
iter::repeat(None).take(text_column.len()),
)));
}
Some(term) => Some(MatchesTermFinder::new(term)),
}
} else {
None
};
let len = text_column.len();
let mut result = BooleanVectorBuilder::with_capacity(len);
for i in 0..len {
let text = text_column.get_ref(i).as_string().unwrap();
let Some(text) = text else {
result.push_null();
continue;
};
let contains = match &compiled_finder {
Some(finder) => finder.find(text),
None => {
let term = match term_column.get_ref(i).as_string().unwrap() {
None => {
result.push_null();
continue;
}
Some(term) => term,
};
MatchesTermFinder::new(term).find(text)
}
};
result.push(Some(contains));
}
Ok(result.to_vector())
}
}
/// A compiled finder for `matches_term` function that holds the compiled term
/// and its metadata for efficient matching.
///
/// A term is considered matched when:
/// 1. The exact sequence appears in the text
/// 2. It is either:
/// - At the start/end of text with adjacent non-alphanumeric character
/// - Surrounded by non-alphanumeric characters
///
/// # Examples
/// ```
/// let finder = MatchesTermFinder::new("cat");
/// assert!(finder.find("cat!")); // Term at end with punctuation
/// assert!(finder.find("dog,cat")); // Term preceded by comma
/// assert!(!finder.find("category")); // Partial match rejected
///
/// let finder = MatchesTermFinder::new("world");
/// assert!(finder.find("hello-world")); // Hyphen boundary
/// ```
#[derive(Clone, Debug)]
pub struct MatchesTermFinder {
finder: memmem::Finder<'static>,
term: String,
starts_with_non_alnum: bool,
ends_with_non_alnum: bool,
}
impl MatchesTermFinder {
/// Create a new `MatchesTermFinder` for the given term.
pub fn new(term: &str) -> Self {
let starts_with_non_alnum = term.chars().next().is_some_and(|c| !c.is_alphanumeric());
let ends_with_non_alnum = term.chars().last().is_some_and(|c| !c.is_alphanumeric());
Self {
finder: memmem::Finder::new(term).into_owned(),
term: term.to_string(),
starts_with_non_alnum,
ends_with_non_alnum,
}
}
/// Find the term in the text.
pub fn find(&self, text: &str) -> bool {
if self.term.is_empty() {
return text.is_empty();
}
if text.len() < self.term.len() {
return false;
}
let mut pos = 0;
while let Some(found_pos) = self.finder.find(text[pos..].as_bytes()) {
let actual_pos = pos + found_pos;
let prev_ok = self.starts_with_non_alnum
|| text[..actual_pos]
.chars()
.last()
.map(|c| !c.is_alphanumeric())
.unwrap_or(true);
if prev_ok {
let next_pos = actual_pos + self.finder.needle().len();
let next_ok = self.ends_with_non_alnum
|| text[next_pos..]
.chars()
.next()
.map(|c| !c.is_alphanumeric())
.unwrap_or(true);
if next_ok {
return true;
}
}
if let Some(next_char) = text[actual_pos..].chars().next() {
pos = actual_pos + next_char.len_utf8();
} else {
break;
}
}
false
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn matches_term_example() {
let finder = MatchesTermFinder::new("hello world");
assert!(finder.find("warning:hello world!"));
assert!(!finder.find("hello-world"));
assert!(!finder.find("hello world2023"));
let finder = MatchesTermFinder::new("critical error");
assert!(finder.find("ERROR:critical error!"));
assert!(!finder.find("critical_errors"));
let finder = MatchesTermFinder::new("");
assert!(finder.find(""));
assert!(!finder.find("any"));
let finder = MatchesTermFinder::new("Cat");
assert!(finder.find("Cat"));
assert!(!finder.find("cat"));
}
#[test]
fn matches_term_with_punctuation() {
assert!(MatchesTermFinder::new("cat").find("cat!"));
assert!(MatchesTermFinder::new("dog").find("!dog"));
}
#[test]
fn matches_phrase_with_boundaries() {
assert!(MatchesTermFinder::new("hello-world").find("hello-world"));
assert!(MatchesTermFinder::new("'foo bar'").find("test: 'foo bar'"));
}
#[test]
fn matches_at_text_boundaries() {
assert!(MatchesTermFinder::new("start").find("start..."));
assert!(MatchesTermFinder::new("end").find("...end"));
}
// Negative cases
#[test]
fn rejects_partial_matches() {
assert!(!MatchesTermFinder::new("cat").find("category"));
assert!(!MatchesTermFinder::new("boot").find("rebooted"));
}
#[test]
fn rejects_missing_term() {
assert!(!MatchesTermFinder::new("foo").find("hello world"));
}
// Edge cases
#[test]
fn handles_empty_inputs() {
assert!(!MatchesTermFinder::new("test").find(""));
assert!(!MatchesTermFinder::new("").find("text"));
}
#[test]
fn different_unicode_boundaries() {
assert!(MatchesTermFinder::new("café").find("café>"));
assert!(!MatchesTermFinder::new("café").find("口café>"));
assert!(!MatchesTermFinder::new("café").find("café口"));
assert!(!MatchesTermFinder::new("café").find("cafémore"));
assert!(MatchesTermFinder::new("русский").find("русский!"));
assert!(MatchesTermFinder::new("русский").find("русский!"));
}
#[test]
fn case_sensitive_matching() {
assert!(!MatchesTermFinder::new("cat").find("Cat"));
assert!(MatchesTermFinder::new("CaT").find("CaT"));
}
#[test]
fn numbers_in_term() {
assert!(MatchesTermFinder::new("v1.0").find("v1.0!"));
assert!(!MatchesTermFinder::new("v1.0").find("v1.0a"));
}
#[test]
fn adjacent_alphanumeric_fails() {
assert!(!MatchesTermFinder::new("cat").find("cat5"));
assert!(!MatchesTermFinder::new("dog").find("dogcat"));
}
#[test]
fn empty_term_text() {
assert!(!MatchesTermFinder::new("").find("text"));
assert!(MatchesTermFinder::new("").find(""));
assert!(!MatchesTermFinder::new("text").find(""));
}
#[test]
fn leading_non_alphanumeric() {
assert!(MatchesTermFinder::new("/cat").find("dog/cat"));
assert!(MatchesTermFinder::new("dog/").find("dog/cat"));
assert!(MatchesTermFinder::new("dog/cat").find("dog/cat"));
}
#[test]
fn continues_searching_after_boundary_mismatch() {
assert!(!MatchesTermFinder::new("log").find("bloglog!"));
assert!(MatchesTermFinder::new("log").find("bloglog log"));
assert!(MatchesTermFinder::new("log").find("alogblog_log!"));
assert!(MatchesTermFinder::new("error").find("errorlog_error_case"));
assert!(MatchesTermFinder::new("test").find("atestbtestc_test_end"));
assert!(MatchesTermFinder::new("data").find("database_data_store"));
assert!(!MatchesTermFinder::new("data").find("database_datastore"));
assert!(MatchesTermFinder::new("log.txt").find("catalog.txt_log.txt!"));
assert!(!MatchesTermFinder::new("log.txt").find("catalog.txtlog.txt!"));
assert!(MatchesTermFinder::new("data-set").find("bigdata-set_data-set!"));
assert!(MatchesTermFinder::new("中文").find("这是中文测试,中文!"));
assert!(MatchesTermFinder::new("error").find("错误errorerror日志_error!"));
}
}

View File

@@ -24,11 +24,9 @@ pub(crate) mod sum;
mod vector_add;
mod vector_dim;
mod vector_div;
mod vector_kth_elem;
mod vector_mul;
mod vector_norm;
mod vector_sub;
mod vector_subvector;
use std::sync::Arc;
@@ -58,8 +56,6 @@ impl VectorFunction {
registry.register(Arc::new(vector_div::VectorDivFunction));
registry.register(Arc::new(vector_norm::VectorNormFunction));
registry.register(Arc::new(vector_dim::VectorDimFunction));
registry.register(Arc::new(vector_kth_elem::VectorKthElemFunction));
registry.register(Arc::new(vector_subvector::VectorSubvectorFunction));
registry.register(Arc::new(elem_sum::ElemSumFunction));
registry.register(Arc::new(elem_product::ElemProductFunction));
}

View File

@@ -1,211 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::borrow::Cow;
use std::fmt::Display;
use common_query::error::{InvalidFuncArgsSnafu, Result};
use common_query::prelude::Signature;
use datatypes::prelude::ConcreteDataType;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::vectors::{Float32VectorBuilder, MutableVector, VectorRef};
use snafu::ensure;
use crate::function::{Function, FunctionContext};
use crate::helper;
use crate::scalars::vector::impl_conv::{as_veclit, as_veclit_if_const};
const NAME: &str = "vec_kth_elem";
/// Returns the k-th(0-based index) element of the vector.
///
/// # Example
///
/// ```sql
/// SELECT vec_kth_elem("[2, 4, 6]",1) as result;
///
/// +---------+
/// | result |
/// +---------+
/// | 4 |
/// +---------+
///
/// ```
///
#[derive(Debug, Clone, Default)]
pub struct VectorKthElemFunction;
impl Function for VectorKthElemFunction {
fn name(&self) -> &str {
NAME
}
fn return_type(
&self,
_input_types: &[ConcreteDataType],
) -> common_query::error::Result<ConcreteDataType> {
Ok(ConcreteDataType::float32_datatype())
}
fn signature(&self) -> Signature {
helper::one_of_sigs2(
vec![
ConcreteDataType::string_datatype(),
ConcreteDataType::binary_datatype(),
],
vec![ConcreteDataType::int64_datatype()],
)
}
fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
ensure!(
columns.len() == 2,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect exactly two, have: {}",
columns.len()
),
}
);
let arg0 = &columns[0];
let arg1 = &columns[1];
let len = arg0.len();
let mut result = Float32VectorBuilder::with_capacity(len);
if len == 0 {
return Ok(result.to_vector());
};
let arg0_const = as_veclit_if_const(arg0)?;
for i in 0..len {
let arg0 = match arg0_const.as_ref() {
Some(arg0) => Some(Cow::Borrowed(arg0.as_ref())),
None => as_veclit(arg0.get_ref(i))?,
};
let Some(arg0) = arg0 else {
result.push_null();
continue;
};
let arg1 = arg1.get(i).as_f64_lossy();
let Some(arg1) = arg1 else {
result.push_null();
continue;
};
ensure!(
arg1 >= 0.0 && arg1.fract() == 0.0,
InvalidFuncArgsSnafu {
err_msg: format!(
"Invalid argument: k must be a non-negative integer, but got k = {}.",
arg1
),
}
);
let k = arg1 as usize;
ensure!(
k < arg0.len(),
InvalidFuncArgsSnafu {
err_msg: format!(
"Out of range: k must be in the range [0, {}], but got k = {}.",
arg0.len() - 1,
k
),
}
);
let value = arg0[k];
result.push(Some(value));
}
Ok(result.to_vector())
}
}
impl Display for VectorKthElemFunction {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", NAME.to_ascii_uppercase())
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use common_query::error;
use datatypes::vectors::{Int64Vector, StringVector};
use super::*;
#[test]
fn test_vec_kth_elem() {
let func = VectorKthElemFunction;
let input0 = Arc::new(StringVector::from(vec![
Some("[1.0,2.0,3.0]".to_string()),
Some("[4.0,5.0,6.0]".to_string()),
Some("[7.0,8.0,9.0]".to_string()),
None,
]));
let input1 = Arc::new(Int64Vector::from(vec![Some(0), Some(2), None, Some(1)]));
let result = func
.eval(&FunctionContext::default(), &[input0, input1])
.unwrap();
let result = result.as_ref();
assert_eq!(result.len(), 4);
assert_eq!(result.get_ref(0).as_f32().unwrap(), Some(1.0));
assert_eq!(result.get_ref(1).as_f32().unwrap(), Some(6.0));
assert!(result.get_ref(2).is_null());
assert!(result.get_ref(3).is_null());
let input0 = Arc::new(StringVector::from(vec![Some("[1.0,2.0,3.0]".to_string())]));
let input1 = Arc::new(Int64Vector::from(vec![Some(3)]));
let err = func
.eval(&FunctionContext::default(), &[input0, input1])
.unwrap_err();
match err {
error::Error::InvalidFuncArgs { err_msg, .. } => {
assert_eq!(
err_msg,
format!("Out of range: k must be in the range [0, 2], but got k = 3.")
)
}
_ => unreachable!(),
}
let input0 = Arc::new(StringVector::from(vec![Some("[1.0,2.0,3.0]".to_string())]));
let input1 = Arc::new(Int64Vector::from(vec![Some(-1)]));
let err = func
.eval(&FunctionContext::default(), &[input0, input1])
.unwrap_err();
match err {
error::Error::InvalidFuncArgs { err_msg, .. } => {
assert_eq!(
err_msg,
format!("Invalid argument: k must be a non-negative integer, but got k = -1.")
)
}
_ => unreachable!(),
}
}
}

View File

@@ -1,240 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::borrow::Cow;
use std::fmt::Display;
use common_query::error::{InvalidFuncArgsSnafu, Result};
use common_query::prelude::{Signature, TypeSignature};
use datafusion_expr::Volatility;
use datatypes::prelude::ConcreteDataType;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::vectors::{BinaryVectorBuilder, MutableVector, VectorRef};
use snafu::ensure;
use crate::function::{Function, FunctionContext};
use crate::scalars::vector::impl_conv::{as_veclit, as_veclit_if_const, veclit_to_binlit};
const NAME: &str = "vec_subvector";
/// Returns a subvector from start(included) to end(excluded) index.
///
/// # Example
///
/// ```sql
/// SELECT vec_to_string(vec_subvector("[1, 2, 3, 4, 5]", 1, 3)) as result;
///
/// +---------+
/// | result |
/// +---------+
/// | [2, 3] |
/// +---------+
///
/// ```
///
#[derive(Debug, Clone, Default)]
pub struct VectorSubvectorFunction;
impl Function for VectorSubvectorFunction {
fn name(&self) -> &str {
NAME
}
fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
Ok(ConcreteDataType::binary_datatype())
}
fn signature(&self) -> Signature {
Signature::one_of(
vec![
TypeSignature::Exact(vec![
ConcreteDataType::string_datatype(),
ConcreteDataType::int64_datatype(),
ConcreteDataType::int64_datatype(),
]),
TypeSignature::Exact(vec![
ConcreteDataType::binary_datatype(),
ConcreteDataType::int64_datatype(),
ConcreteDataType::int64_datatype(),
]),
],
Volatility::Immutable,
)
}
fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
ensure!(
columns.len() == 3,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect exactly three, have: {}",
columns.len()
)
}
);
let arg0 = &columns[0];
let arg1 = &columns[1];
let arg2 = &columns[2];
ensure!(
arg0.len() == arg1.len() && arg1.len() == arg2.len(),
InvalidFuncArgsSnafu {
err_msg: format!(
"The lengths of the vector are not aligned, args 0: {}, args 1: {}, args 2: {}",
arg0.len(),
arg1.len(),
arg2.len()
)
}
);
let len = arg0.len();
let mut result = BinaryVectorBuilder::with_capacity(len);
if len == 0 {
return Ok(result.to_vector());
}
let arg0_const = as_veclit_if_const(arg0)?;
for i in 0..len {
let arg0 = match arg0_const.as_ref() {
Some(arg0) => Some(Cow::Borrowed(arg0.as_ref())),
None => as_veclit(arg0.get_ref(i))?,
};
let arg1 = arg1.get(i).as_i64();
let arg2 = arg2.get(i).as_i64();
let (Some(arg0), Some(arg1), Some(arg2)) = (arg0, arg1, arg2) else {
result.push_null();
continue;
};
ensure!(
0 <= arg1 && arg1 <= arg2 && arg2 as usize <= arg0.len(),
InvalidFuncArgsSnafu {
err_msg: format!(
"Invalid start and end indices: start={}, end={}, vec_len={}",
arg1,
arg2,
arg0.len()
)
}
);
let subvector = &arg0[arg1 as usize..arg2 as usize];
let binlit = veclit_to_binlit(subvector);
result.push(Some(&binlit));
}
Ok(result.to_vector())
}
}
impl Display for VectorSubvectorFunction {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", NAME.to_ascii_uppercase())
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use common_query::error::Error;
use datatypes::vectors::{Int64Vector, StringVector};
use super::*;
use crate::function::FunctionContext;
#[test]
fn test_subvector() {
let func = VectorSubvectorFunction;
let input0 = Arc::new(StringVector::from(vec![
Some("[1.0, 2.0, 3.0, 4.0, 5.0]".to_string()),
Some("[6.0, 7.0, 8.0, 9.0, 10.0]".to_string()),
None,
Some("[11.0, 12.0, 13.0]".to_string()),
]));
let input1 = Arc::new(Int64Vector::from(vec![Some(1), Some(0), Some(0), Some(1)]));
let input2 = Arc::new(Int64Vector::from(vec![Some(3), Some(5), Some(2), Some(3)]));
let result = func
.eval(&FunctionContext::default(), &[input0, input1, input2])
.unwrap();
let result = result.as_ref();
assert_eq!(result.len(), 4);
assert_eq!(
result.get_ref(0).as_binary().unwrap(),
Some(veclit_to_binlit(&[2.0, 3.0]).as_slice())
);
assert_eq!(
result.get_ref(1).as_binary().unwrap(),
Some(veclit_to_binlit(&[6.0, 7.0, 8.0, 9.0, 10.0]).as_slice())
);
assert!(result.get_ref(2).is_null());
assert_eq!(
result.get_ref(3).as_binary().unwrap(),
Some(veclit_to_binlit(&[12.0, 13.0]).as_slice())
);
}
#[test]
fn test_subvector_error() {
let func = VectorSubvectorFunction;
let input0 = Arc::new(StringVector::from(vec![
Some("[1.0, 2.0, 3.0]".to_string()),
Some("[4.0, 5.0, 6.0]".to_string()),
]));
let input1 = Arc::new(Int64Vector::from(vec![Some(1), Some(2)]));
let input2 = Arc::new(Int64Vector::from(vec![Some(3)]));
let result = func.eval(&FunctionContext::default(), &[input0, input1, input2]);
match result {
Err(Error::InvalidFuncArgs { err_msg, .. }) => {
assert_eq!(
err_msg,
"The lengths of the vector are not aligned, args 0: 2, args 1: 2, args 2: 1"
)
}
_ => unreachable!(),
}
}
#[test]
fn test_subvector_invalid_indices() {
let func = VectorSubvectorFunction;
let input0 = Arc::new(StringVector::from(vec![
Some("[1.0, 2.0, 3.0]".to_string()),
Some("[4.0, 5.0, 6.0]".to_string()),
]));
let input1 = Arc::new(Int64Vector::from(vec![Some(1), Some(3)]));
let input2 = Arc::new(Int64Vector::from(vec![Some(3), Some(4)]));
let result = func.eval(&FunctionContext::default(), &[input0, input1, input2]);
match result {
Err(Error::InvalidFuncArgs { err_msg, .. }) => {
assert_eq!(
err_msg,
"Invalid start and end indices: start=3, end=4, vec_len=3"
)
}
_ => unreachable!(),
}
}
}

View File

@@ -35,10 +35,7 @@ impl FunctionState {
use api::v1::meta::ProcedureStatus;
use async_trait::async_trait;
use common_base::AffectedRows;
use common_meta::rpc::procedure::{
AddRegionFollowerRequest, MigrateRegionRequest, ProcedureStateResponse,
RemoveRegionFollowerRequest,
};
use common_meta::rpc::procedure::{MigrateRegionRequest, ProcedureStateResponse};
use common_query::error::Result;
use common_query::Output;
use session::context::QueryContextRef;
@@ -69,17 +66,6 @@ impl FunctionState {
..Default::default()
})
}
async fn add_region_follower(&self, _request: AddRegionFollowerRequest) -> Result<()> {
Ok(())
}
async fn remove_region_follower(
&self,
_request: RemoveRegionFollowerRequest,
) -> Result<()> {
Ok(())
}
}
#[async_trait]

View File

@@ -22,9 +22,7 @@ mod version;
use std::sync::Arc;
use build::BuildFunction;
use database::{
CurrentSchemaFunction, DatabaseFunction, ReadPreferenceFunction, SessionUserFunction,
};
use database::{CurrentSchemaFunction, DatabaseFunction, SessionUserFunction};
use pg_catalog::PGCatalogFunction;
use procedure_state::ProcedureStateFunction;
use timezone::TimezoneFunction;
@@ -41,7 +39,6 @@ impl SystemFunction {
registry.register(Arc::new(CurrentSchemaFunction));
registry.register(Arc::new(DatabaseFunction));
registry.register(Arc::new(SessionUserFunction));
registry.register(Arc::new(ReadPreferenceFunction));
registry.register(Arc::new(TimezoneFunction));
registry.register_async(Arc::new(ProcedureStateFunction));
PGCatalogFunction::register(registry);

View File

@@ -30,12 +30,9 @@ pub struct DatabaseFunction;
pub struct CurrentSchemaFunction;
pub struct SessionUserFunction;
pub struct ReadPreferenceFunction;
const DATABASE_FUNCTION_NAME: &str = "database";
const CURRENT_SCHEMA_FUNCTION_NAME: &str = "current_schema";
const SESSION_USER_FUNCTION_NAME: &str = "session_user";
const READ_PREFERENCE_FUNCTION_NAME: &str = "read_preference";
impl Function for DatabaseFunction {
fn name(&self) -> &str {
@@ -97,26 +94,6 @@ impl Function for SessionUserFunction {
}
}
impl Function for ReadPreferenceFunction {
fn name(&self) -> &str {
READ_PREFERENCE_FUNCTION_NAME
}
fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
Ok(ConcreteDataType::string_datatype())
}
fn signature(&self) -> Signature {
Signature::nullary(Volatility::Immutable)
}
fn eval(&self, func_ctx: &FunctionContext, _columns: &[VectorRef]) -> Result<VectorRef> {
let read_preference = func_ctx.query_ctx.read_preference();
Ok(Arc::new(StringVector::from_slice(&[read_preference.as_ref()])) as _)
}
}
impl fmt::Display for DatabaseFunction {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "DATABASE")
@@ -135,12 +112,6 @@ impl fmt::Display for SessionUserFunction {
}
}
impl fmt::Display for ReadPreferenceFunction {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "READ_PREFERENCE")
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;

View File

@@ -15,13 +15,11 @@
use api::helper::ColumnDataTypeWrapper;
use api::v1::add_column_location::LocationType;
use api::v1::alter_table_expr::Kind;
use api::v1::column_def::{
as_fulltext_option_analyzer, as_fulltext_option_backend, as_skipping_index_type,
};
use api::v1::column_def::{as_fulltext_option, as_skipping_index_type};
use api::v1::{
column_def, AddColumnLocation as Location, AlterTableExpr, Analyzer, CreateTableExpr,
DropColumns, FulltextBackend as PbFulltextBackend, ModifyColumnTypes, RenameTable,
SemanticType, SkippingIndexType as PbSkippingIndexType,
DropColumns, ModifyColumnTypes, RenameTable, SemanticType,
SkippingIndexType as PbSkippingIndexType,
};
use common_query::AddColumnLocation;
use datatypes::schema::{ColumnSchema, FulltextOptions, RawSchema, SkippingIndexOptions};
@@ -128,15 +126,11 @@ pub fn alter_expr_to_request(table_id: TableId, expr: AlterTableExpr) -> Result<
column_name: f.column_name.clone(),
options: FulltextOptions {
enable: f.enable,
analyzer: as_fulltext_option_analyzer(
analyzer: as_fulltext_option(
Analyzer::try_from(f.analyzer)
.context(InvalidSetFulltextOptionRequestSnafu)?,
),
case_sensitive: f.case_sensitive,
backend: as_fulltext_option_backend(
PbFulltextBackend::try_from(f.backend)
.context(InvalidSetFulltextOptionRequestSnafu)?,
),
},
},
},

View File

@@ -25,7 +25,7 @@ async fn do_bench_channel_manager() {
let m_clone = m.clone();
let join = tokio::spawn(async move {
for _ in 0..10000 {
let idx = rand::random::<u32>() % 100;
let idx = rand::random::<usize>() % 100;
let ret = m_clone.get(format!("{idx}"));
let _ = ret.unwrap();
}

View File

@@ -27,7 +27,6 @@ use crate::error::{
DecodeJsonSnafu, EncodeJsonSnafu, Error, FromUtf8Snafu, InvalidNodeInfoKeySnafu,
InvalidRoleSnafu, ParseNumSnafu, Result,
};
use crate::key::flow::flow_state::FlowStat;
use crate::peer::Peer;
const CLUSTER_NODE_INFO_PREFIX: &str = "__meta_cluster_node_info";
@@ -53,9 +52,6 @@ pub trait ClusterInfo {
/// List all region stats in the cluster.
async fn list_region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error>;
/// List all flow stats in the cluster.
async fn list_flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error>;
// TODO(jeremy): Other info, like region status, etc.
}

View File

@@ -92,22 +92,6 @@ pub struct RegionStat {
pub sst_size: u64,
/// The size of the SST index files in bytes.
pub index_size: u64,
/// The manifest infoof the region.
pub region_manifest: RegionManifestInfo,
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum RegionManifestInfo {
Mito {
manifest_version: u64,
flushed_entry_id: u64,
},
Metric {
data_manifest_version: u64,
data_flushed_entry_id: u64,
metadata_manifest_version: u64,
metadata_flushed_entry_id: u64,
},
}
impl Stat {
@@ -181,31 +165,6 @@ impl TryFrom<&HeartbeatRequest> for Stat {
}
}
impl From<store_api::region_engine::RegionManifestInfo> for RegionManifestInfo {
fn from(value: store_api::region_engine::RegionManifestInfo) -> Self {
match value {
store_api::region_engine::RegionManifestInfo::Mito {
manifest_version,
flushed_entry_id,
} => RegionManifestInfo::Mito {
manifest_version,
flushed_entry_id,
},
store_api::region_engine::RegionManifestInfo::Metric {
data_manifest_version,
data_flushed_entry_id,
metadata_manifest_version,
metadata_flushed_entry_id,
} => RegionManifestInfo::Metric {
data_manifest_version,
data_flushed_entry_id,
metadata_manifest_version,
metadata_flushed_entry_id,
},
}
}
}
impl From<&api::v1::meta::RegionStat> for RegionStat {
fn from(value: &api::v1::meta::RegionStat) -> Self {
let region_stat = value
@@ -226,7 +185,6 @@ impl From<&api::v1::meta::RegionStat> for RegionStat {
manifest_size: region_stat.manifest_size,
sst_size: region_stat.sst_size,
index_size: region_stat.index_size,
region_manifest: region_stat.manifest.into(),
}
}
}

View File

@@ -22,18 +22,14 @@ use store_api::storage::{RegionId, RegionNumber, TableId};
use crate::cache_invalidator::CacheInvalidatorRef;
use crate::ddl::flow_meta::FlowMetadataAllocatorRef;
use crate::ddl::table_meta::TableMetadataAllocatorRef;
use crate::error::{Result, UnsupportedSnafu};
use crate::error::Result;
use crate::key::flow::FlowMetadataManagerRef;
use crate::key::table_route::PhysicalTableRouteValue;
use crate::key::TableMetadataManagerRef;
use crate::node_manager::NodeManagerRef;
use crate::region_keeper::MemoryRegionKeeperRef;
use crate::region_registry::LeaderRegionRegistryRef;
use crate::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use crate::rpc::procedure::{
AddRegionFollowerRequest, MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse,
RemoveRegionFollowerRequest,
};
use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse};
use crate::DatanodeId;
pub mod alter_database;
@@ -74,30 +70,6 @@ pub trait ProcedureExecutor: Send + Sync {
request: SubmitDdlTaskRequest,
) -> Result<SubmitDdlTaskResponse>;
/// Add a region follower
async fn add_region_follower(
&self,
_ctx: &ExecutorContext,
_request: AddRegionFollowerRequest,
) -> Result<()> {
UnsupportedSnafu {
operation: "add_region_follower",
}
.fail()
}
/// Remove a region follower
async fn remove_region_follower(
&self,
_ctx: &ExecutorContext,
_request: RemoveRegionFollowerRequest,
) -> Result<()> {
UnsupportedSnafu {
operation: "remove_region_follower",
}
.fail()
}
/// Submit a region migration task
async fn migrate_region(
&self,
@@ -165,8 +137,6 @@ pub struct DdlContext {
pub cache_invalidator: CacheInvalidatorRef,
/// Keep tracking operating regions.
pub memory_region_keeper: MemoryRegionKeeperRef,
/// The leader region registry.
pub leader_region_registry: LeaderRegionRegistryRef,
/// Table metadata manager.
pub table_metadata_manager: TableMetadataManagerRef,
/// Allocator for table metadata.

View File

@@ -35,9 +35,7 @@ use crate::error::{self, Result};
use crate::instruction::CacheIdent;
use crate::key::table_name::TableNameKey;
use crate::key::table_route::TableRouteValue;
use crate::rpc::router::{
find_leader_regions, find_leaders, operating_leader_regions, RegionRoute,
};
use crate::rpc::router::{find_leader_regions, find_leaders, RegionRoute};
/// [Control] indicated to the caller whether to go to the next step.
#[derive(Debug)]
@@ -252,11 +250,6 @@ impl DropTableExecutor {
.into_iter()
.collect::<Result<Vec<_>>>()?;
// Deletes the leader region from registry.
let region_ids = operating_leader_regions(region_routes);
ctx.leader_region_registry
.batch_delete(region_ids.into_iter().map(|(region_id, _)| region_id));
Ok(())
}
}

View File

@@ -98,14 +98,13 @@ impl TableMetadataAllocator {
fn create_wal_options(
&self,
table_route: &PhysicalTableRouteValue,
skip_wal: bool,
) -> Result<HashMap<RegionNumber, String>> {
let region_numbers = table_route
.region_routes
.iter()
.map(|route| route.region.id.region_number())
.collect();
allocate_region_wal_options(region_numbers, &self.wal_options_allocator, skip_wal)
allocate_region_wal_options(region_numbers, &self.wal_options_allocator)
}
async fn create_table_route(
@@ -159,9 +158,7 @@ impl TableMetadataAllocator {
pub async fn create(&self, task: &CreateTableTask) -> Result<TableMetadata> {
let table_id = self.allocate_table_id(&task.create_table.table_id).await?;
let table_route = self.create_table_route(table_id, task).await?;
let region_wal_options =
self.create_wal_options(&table_route, task.table_info.meta.options.skip_wal)?;
let region_wal_options = self.create_wal_options(&table_route)?;
debug!(
"Allocated region wal options {:?} for table {}",

View File

@@ -851,7 +851,6 @@ mod tests {
use crate::node_manager::{DatanodeRef, FlownodeRef, NodeManager};
use crate::peer::Peer;
use crate::region_keeper::MemoryRegionKeeper;
use crate::region_registry::LeaderRegionRegistry;
use crate::sequence::SequenceBuilder;
use crate::state_store::KvStateStore;
use crate::wal_options_allocator::WalOptionsAllocator;
@@ -900,7 +899,6 @@ mod tests {
flow_metadata_manager,
flow_metadata_allocator,
memory_region_keeper: Arc::new(MemoryRegionKeeper::default()),
leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
},
procedure_manager.clone(),

View File

@@ -192,12 +192,6 @@ pub struct DropFlow {
pub flownode_ids: Vec<FlownodeId>,
}
/// Flushes a batch of regions.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct FlushRegions {
pub region_ids: Vec<RegionId>,
}
#[derive(Debug, Clone, Serialize, Deserialize, Display, PartialEq)]
pub enum Instruction {
/// Opens a region.
@@ -214,8 +208,6 @@ pub enum Instruction {
DowngradeRegion(DowngradeRegion),
/// Invalidates batch cache.
InvalidateCaches(Vec<CacheIdent>),
/// Flushes regions.
FlushRegion(FlushRegions),
}
/// The reply of [UpgradeRegion].

View File

@@ -57,10 +57,7 @@
//! - This key is mainly used in constructing the view in Datanode and Frontend.
//!
//! 12. Kafka topic key: `__topic_name/kafka/{topic_name}`
//! - The key is used to track existing topics in Kafka.
//! - The value is a [TopicNameValue](crate::key::topic_name::TopicNameValue) struct; it contains the `pruned_entry_id` which represents
//! the highest entry id that has been pruned from the remote WAL.
//! - When a region uses this topic, it should start replaying entries from `pruned_entry_id + 1` (minimum available entry id).
//! - The key is used to mark existing topics in kafka for WAL.
//!
//! 13. Topic name to region map key `__topic_region/{topic_name}/{region_id}`
//! - Mapping {topic_name} to {region_id}
@@ -140,7 +137,6 @@ use table::metadata::{RawTableInfo, TableId};
use table::table_name::TableName;
use table_info::{TableInfoKey, TableInfoManager, TableInfoValue};
use table_name::{TableNameKey, TableNameManager, TableNameValue};
use topic_name::TopicNameManager;
use topic_region::{TopicRegionKey, TopicRegionManager};
use view_info::{ViewInfoKey, ViewInfoManager, ViewInfoValue};
@@ -313,7 +309,6 @@ pub struct TableMetadataManager {
schema_manager: SchemaManager,
table_route_manager: TableRouteManager,
tombstone_manager: TombstoneManager,
topic_name_manager: TopicNameManager,
topic_region_manager: TopicRegionManager,
kv_backend: KvBackendRef,
}
@@ -465,7 +460,6 @@ impl TableMetadataManager {
schema_manager: SchemaManager::new(kv_backend.clone()),
table_route_manager: TableRouteManager::new(kv_backend.clone()),
tombstone_manager: TombstoneManager::new(kv_backend.clone()),
topic_name_manager: TopicNameManager::new(kv_backend.clone()),
topic_region_manager: TopicRegionManager::new(kv_backend.clone()),
kv_backend,
}
@@ -519,14 +513,6 @@ impl TableMetadataManager {
&self.table_route_manager
}
pub fn topic_name_manager(&self) -> &TopicNameManager {
&self.topic_name_manager
}
pub fn topic_region_manager(&self) -> &TopicRegionManager {
&self.topic_region_manager
}
#[cfg(feature = "testing")]
pub fn kv_backend(&self) -> &KvBackendRef {
&self.kv_backend
@@ -1487,8 +1473,7 @@ mod tests {
new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
let wal_allocator = WalOptionsAllocator::RaftEngine;
let regions = (0..16).collect();
let region_wal_options =
allocate_region_wal_options(regions, &wal_allocator, false).unwrap();
let region_wal_options = allocate_region_wal_options(regions, &wal_allocator).unwrap();
create_physical_table_metadata(
&table_metadata_manager,
table_info.clone(),

View File

@@ -15,14 +15,11 @@
use std::fmt::{self, Display};
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
use snafu::{OptionExt, ResultExt};
use crate::ensure_values;
use crate::error::{self, DecodeJsonSnafu, Error, InvalidMetadataSnafu, Result, UnexpectedSnafu};
use crate::key::txn_helper::TxnOpGetResponseSet;
use crate::error::{DecodeJsonSnafu, Error, InvalidMetadataSnafu, Result};
use crate::key::{
DeserializedValueWithBytes, MetadataKey, MetadataValue, KAFKA_TOPIC_KEY_PATTERN,
KAFKA_TOPIC_KEY_PREFIX, LEGACY_TOPIC_KEY_PREFIX,
MetadataKey, KAFKA_TOPIC_KEY_PATTERN, KAFKA_TOPIC_KEY_PREFIX, LEGACY_TOPIC_KEY_PREFIX,
};
use crate::kv_backend::txn::{Txn, TxnOp};
use crate::kv_backend::KvBackendRef;
@@ -34,32 +31,8 @@ pub struct TopicNameKey<'a> {
pub topic: &'a str,
}
/// The value associated with a topic name key.
///
/// The `pruned_entry_id` is the highest entry id that has been pruned from the remote WAL.
/// When a region uses this topic, it should start replaying entries from `pruned_entry_id + 1` (minimal available entry id).
#[derive(Debug, Serialize, Deserialize, Default, Clone)]
pub struct TopicNameValue {
pub pruned_entry_id: u64,
}
impl TopicNameValue {
pub fn new(pruned_entry_id: u64) -> Self {
Self { pruned_entry_id }
}
}
impl MetadataValue for TopicNameValue {
fn try_from_raw_value(raw_value: &[u8]) -> Result<Self> {
let value = serde_json::from_slice::<TopicNameValue>(raw_value).context(DecodeJsonSnafu)?;
Ok(value)
}
fn try_as_raw_value(&self) -> Result<Vec<u8>> {
let raw_value = serde_json::to_vec(self).context(DecodeJsonSnafu)?;
Ok(raw_value)
}
}
#[derive(Debug, Serialize, Deserialize)]
pub struct TopicNameValue;
impl<'a> TopicNameKey<'a> {
pub fn new(topic: &'a str) -> Self {
@@ -141,16 +114,13 @@ impl TopicNameManager {
{
let topics =
serde_json::from_slice::<Vec<String>>(&kv.value).context(DecodeJsonSnafu)?;
let mut reqs = Vec::with_capacity(topics.len() + 1);
for topic in topics {
let topic_name_key = TopicNameKey::new(&topic);
let topic_name_value = TopicNameValue::new(0);
let put_req = TxnOp::Put(
topic_name_key.to_bytes(),
topic_name_value.try_as_raw_value()?,
);
reqs.push(put_req);
}
let mut reqs = topics
.iter()
.map(|topic| {
let key = TopicNameKey::new(topic);
TxnOp::Put(key.to_bytes(), vec![])
})
.collect::<Vec<_>>();
let delete_req = TxnOp::Delete(LEGACY_TOPIC_KEY_PREFIX.as_bytes().to_vec());
reqs.push(delete_req);
let txn = Txn::new().and_then(reqs);
@@ -159,7 +129,7 @@ impl TopicNameManager {
Ok(())
}
/// Range query for topics. Only the keys are returned.
/// Range query for topics.
/// Caution: this method returns keys as String instead of values of range query since the topics are stored in keys.
pub async fn range(&self) -> Result<Vec<String>> {
let prefix = TopicNameKey::range_start_key();
@@ -172,72 +142,25 @@ impl TopicNameManager {
.collect::<Result<Vec<String>>>()
}
/// Put topics into kvbackend. The value is set to 0 by default.
/// Put topics into kvbackend.
pub async fn batch_put(&self, topic_name_keys: Vec<TopicNameKey<'_>>) -> Result<()> {
let mut kvs = Vec::with_capacity(topic_name_keys.len());
let topic_name_value = TopicNameValue::new(0);
for topic_name_key in &topic_name_keys {
let kv = KeyValue {
key: topic_name_key.to_bytes(),
value: topic_name_value.clone().try_as_raw_value()?,
};
kvs.push(kv);
}
let req = BatchPutRequest {
kvs,
kvs: topic_name_keys
.iter()
.map(|key| KeyValue {
key: key.to_bytes(),
value: vec![],
})
.collect(),
prev_kv: false,
};
self.kv_backend.batch_put(req).await?;
Ok(())
}
/// Get value for a specific topic.
pub async fn get(
&self,
topic: &str,
) -> Result<Option<DeserializedValueWithBytes<TopicNameValue>>> {
let key = TopicNameKey::new(topic);
let raw_key = key.to_bytes();
self.kv_backend
.get(&raw_key)
.await?
.map(|x| DeserializedValueWithBytes::from_inner_slice(&x.value))
.transpose()
}
/// Update the topic name key and value in the kv backend.
pub async fn update(
&self,
topic: &str,
pruned_entry_id: u64,
prev: Option<DeserializedValueWithBytes<TopicNameValue>>,
) -> Result<()> {
let key = TopicNameKey::new(topic);
let raw_key = key.to_bytes();
let value = TopicNameValue::new(pruned_entry_id);
let new_raw_value = value.try_as_raw_value()?;
let raw_value = prev.map(|v| v.get_raw_bytes()).unwrap_or_default();
let txn = Txn::compare_and_put(raw_key.clone(), raw_value, new_raw_value.clone());
let mut r = self.kv_backend.txn(txn).await?;
if !r.succeeded {
let mut set = TxnOpGetResponseSet::from(&mut r.responses);
let raw_value = TxnOpGetResponseSet::filter(raw_key)(&mut set)
.context(UnexpectedSnafu {
err_msg: "Reads the empty topic name value in comparing operation while updating TopicNameValue",
})?;
let op_name = "updating TopicNameValue";
ensure_values!(raw_value, new_raw_value, op_name);
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::sync::Arc;
use super::*;
@@ -281,19 +204,7 @@ mod tests {
let topics = manager.range().await.unwrap();
assert_eq!(topics, all_topics);
for topic in &topics {
let value = manager.get(topic).await.unwrap().unwrap();
assert_eq!(value.pruned_entry_id, 0);
manager.update(topic, 1, Some(value.clone())).await.unwrap();
let new_value = manager.get(topic).await.unwrap().unwrap();
assert_eq!(new_value.pruned_entry_id, 1);
// Update twice, nothing changed
manager.update(topic, 1, Some(value.clone())).await.unwrap();
let new_value = manager.get(topic).await.unwrap().unwrap();
assert_eq!(new_value.pruned_entry_id, 1);
// Bad cas, emit error
let err = manager.update(topic, 3, Some(value)).await.unwrap_err();
assert_matches!(err, error::Error::Unexpected { .. });
}
let topics = manager.range().await.unwrap();
assert_eq!(topics, all_topics);
}
}

View File

@@ -224,7 +224,6 @@ impl TopicRegionManager {
Some((region_id, kafka.topic.as_str()))
}
Some(WalOptions::RaftEngine) => None,
Some(WalOptions::Noop) => None,
None => None,
},
)

View File

@@ -155,7 +155,7 @@ impl<'a> MySqlTemplateFactory<'a> {
table_name: table_name.to_string(),
create_table_statement: format!(
// Cannot be more than 3072 bytes in PRIMARY KEY
"CREATE TABLE IF NOT EXISTS `{table_name}`(k VARBINARY(3072) PRIMARY KEY, v BLOB);",
"CREATE TABLE IF NOT EXISTS {table_name}(k VARBINARY(3072) PRIMARY KEY, v BLOB);",
),
range_template: RangeTemplate {
point: format!("SELECT k, v FROM `{table_name}` WHERE k = ?"),

View File

@@ -40,7 +40,6 @@ pub mod peer;
pub mod poison_key;
pub mod range_stream;
pub mod region_keeper;
pub mod region_registry;
pub mod rpc;
pub mod sequence;
pub mod state_store;

View File

@@ -27,7 +27,6 @@ const TABLE_NAME_LOCK_PREFIX: &str = "__table_name_lock";
const FLOW_NAME_LOCK_PREFIX: &str = "__flow_name_lock";
const REGION_LOCK_PREFIX: &str = "__region_lock";
const FLOW_LOCK_PREFIX: &str = "__flow_lock";
const REMOTE_WAL_LOCK_PREFIX: &str = "__remote_wal_lock";
/// [CatalogLock] acquires the lock on the tenant level.
pub enum CatalogLock<'a> {
@@ -232,31 +231,6 @@ impl From<FlowLock> for StringKey {
}
}
/// [RemoteWalLock] acquires the lock on the remote wal topic level.
pub enum RemoteWalLock {
Read(String),
Write(String),
}
impl Display for RemoteWalLock {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let key = match self {
RemoteWalLock::Read(s) => s,
RemoteWalLock::Write(s) => s,
};
write!(f, "{}/{}", REMOTE_WAL_LOCK_PREFIX, key)
}
}
impl From<RemoteWalLock> for StringKey {
fn from(value: RemoteWalLock) -> Self {
match value {
RemoteWalLock::Write(_) => StringKey::Exclusive(value.to_string()),
RemoteWalLock::Read(_) => StringKey::Share(value.to_string()),
}
}
}
#[cfg(test)]
mod tests {
use common_procedure::StringKey;
@@ -334,16 +308,5 @@ mod tests {
string_key,
StringKey::Exclusive(format!("{}/{}", FLOW_LOCK_PREFIX, flow_id))
);
// The remote wal lock
let string_key: StringKey = RemoteWalLock::Read("foo".to_string()).into();
assert_eq!(
string_key,
StringKey::Share(format!("{}/{}", REMOTE_WAL_LOCK_PREFIX, "foo"))
);
let string_key: StringKey = RemoteWalLock::Write("foo".to_string()).into();
assert_eq!(
string_key,
StringKey::Exclusive(format!("{}/{}", REMOTE_WAL_LOCK_PREFIX, "foo"))
);
}
}

View File

@@ -12,13 +12,63 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::{Display, Formatter};
use std::sync::Arc;
pub use api::v1::meta::Peer;
use api::v1::meta::Peer as PbPeer;
use serde::{Deserialize, Serialize};
use crate::error::Error;
use crate::{DatanodeId, FlownodeId};
#[derive(Debug, Default, Clone, Hash, Eq, PartialEq, Deserialize, Serialize)]
pub struct Peer {
/// Node identifier. Unique in a cluster.
pub id: u64,
pub addr: String,
}
impl From<PbPeer> for Peer {
fn from(p: PbPeer) -> Self {
Self {
id: p.id,
addr: p.addr,
}
}
}
impl From<Peer> for PbPeer {
fn from(p: Peer) -> Self {
Self {
id: p.id,
addr: p.addr,
}
}
}
impl Peer {
pub fn new(id: u64, addr: impl Into<String>) -> Self {
Self {
id,
addr: addr.into(),
}
}
#[cfg(any(test, feature = "testing"))]
pub fn empty(id: u64) -> Self {
Self {
id,
addr: String::new(),
}
}
}
impl Display for Peer {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "peer-{}({})", self.id, self.addr)
}
}
/// can query peer given a node id
#[async_trait::async_trait]
pub trait PeerLookupService {

View File

@@ -1,186 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use common_telemetry::warn;
use store_api::storage::RegionId;
use crate::datanode::RegionManifestInfo;
/// Represents information about a leader region in the cluster.
/// Contains the datanode id where the leader is located,
/// and the current manifest version.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct LeaderRegion {
pub datanode_id: u64,
pub manifest: LeaderRegionManifestInfo,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum LeaderRegionManifestInfo {
Mito {
manifest_version: u64,
flushed_entry_id: u64,
},
Metric {
data_manifest_version: u64,
data_flushed_entry_id: u64,
metadata_manifest_version: u64,
metadata_flushed_entry_id: u64,
},
}
impl From<RegionManifestInfo> for LeaderRegionManifestInfo {
fn from(value: RegionManifestInfo) -> Self {
match value {
RegionManifestInfo::Mito {
manifest_version,
flushed_entry_id,
} => LeaderRegionManifestInfo::Mito {
manifest_version,
flushed_entry_id,
},
RegionManifestInfo::Metric {
data_manifest_version,
data_flushed_entry_id,
metadata_manifest_version,
metadata_flushed_entry_id,
} => LeaderRegionManifestInfo::Metric {
data_manifest_version,
data_flushed_entry_id,
metadata_manifest_version,
metadata_flushed_entry_id,
},
}
}
}
impl LeaderRegionManifestInfo {
/// Returns the manifest version of the leader region.
pub fn manifest_version(&self) -> u64 {
match self {
LeaderRegionManifestInfo::Mito {
manifest_version, ..
} => *manifest_version,
LeaderRegionManifestInfo::Metric {
data_manifest_version,
..
} => *data_manifest_version,
}
}
/// Returns the flushed entry id of the leader region.
pub fn flushed_entry_id(&self) -> u64 {
match self {
LeaderRegionManifestInfo::Mito {
flushed_entry_id, ..
} => *flushed_entry_id,
LeaderRegionManifestInfo::Metric {
data_flushed_entry_id,
..
} => *data_flushed_entry_id,
}
}
/// Returns the minimum flushed entry id of the leader region.
/// It is used to determine the minimum flushed entry id that can be pruned in remote wal.
pub fn min_flushed_entry_id(&self) -> u64 {
match self {
LeaderRegionManifestInfo::Mito {
flushed_entry_id, ..
} => *flushed_entry_id,
LeaderRegionManifestInfo::Metric {
data_flushed_entry_id,
metadata_flushed_entry_id,
..
} => (*data_flushed_entry_id).min(*metadata_flushed_entry_id),
}
}
}
pub type LeaderRegionRegistryRef = Arc<LeaderRegionRegistry>;
/// Registry that maintains a mapping of all leader regions in the cluster.
/// Tracks which datanode is hosting the leader for each region and the corresponding
/// manifest version.
#[derive(Default)]
pub struct LeaderRegionRegistry {
inner: RwLock<HashMap<RegionId, LeaderRegion>>,
}
impl LeaderRegionRegistry {
/// Creates a new empty leader region registry.
pub fn new() -> Self {
Self {
inner: RwLock::new(HashMap::new()),
}
}
/// Gets the leader region for the given region ids.
pub fn batch_get<I: Iterator<Item = RegionId>>(
&self,
region_ids: I,
) -> HashMap<RegionId, LeaderRegion> {
let inner = self.inner.read().unwrap();
region_ids
.into_iter()
.flat_map(|region_id| {
inner
.get(&region_id)
.map(|leader_region| (region_id, *leader_region))
})
.collect::<HashMap<_, _>>()
}
/// Puts the leader regions into the registry.
pub fn batch_put(&self, key_values: Vec<(RegionId, LeaderRegion)>) {
let mut inner = self.inner.write().unwrap();
for (region_id, leader_region) in key_values {
match inner.entry(region_id) {
Entry::Vacant(entry) => {
entry.insert(leader_region);
}
Entry::Occupied(mut entry) => {
let manifest_version = entry.get().manifest.manifest_version();
if manifest_version > leader_region.manifest.manifest_version() {
warn!(
"Received a leader region with a smaller manifest version than the existing one, ignore it. region: {}, existing_manifest_version: {}, new_manifest_version: {}",
region_id,
manifest_version,
leader_region.manifest.manifest_version()
);
} else {
entry.insert(leader_region);
}
}
}
}
}
pub fn batch_delete<I: Iterator<Item = RegionId>>(&self, region_ids: I) {
let mut inner = self.inner.write().unwrap();
for region_id in region_ids {
inner.remove(&region_id);
}
}
/// Resets the registry to an empty state.
pub fn reset(&self) {
let mut inner = self.inner.write().unwrap();
inner.clear();
}
}

View File

@@ -20,7 +20,6 @@ use api::v1::meta::{
ProcedureMeta as PbProcedureMeta, ProcedureStateResponse as PbProcedureStateResponse,
ProcedureStatus as PbProcedureStatus,
};
use common_error::ext::ErrorExt;
use common_procedure::{ProcedureId, ProcedureInfo, ProcedureState};
use snafu::ResultExt;
@@ -74,15 +73,15 @@ pub fn procedure_state_to_pb_state(state: &ProcedureState) -> (PbProcedureStatus
match state {
ProcedureState::Running => (PbProcedureStatus::Running, String::default()),
ProcedureState::Done { .. } => (PbProcedureStatus::Done, String::default()),
ProcedureState::Retrying { error } => (PbProcedureStatus::Retrying, error.output_msg()),
ProcedureState::Failed { error } => (PbProcedureStatus::Failed, error.output_msg()),
ProcedureState::Retrying { error } => (PbProcedureStatus::Retrying, error.to_string()),
ProcedureState::Failed { error } => (PbProcedureStatus::Failed, error.to_string()),
ProcedureState::PrepareRollback { error } => {
(PbProcedureStatus::PrepareRollback, error.output_msg())
(PbProcedureStatus::PrepareRollback, error.to_string())
}
ProcedureState::RollingBack { error } => {
(PbProcedureStatus::RollingBack, error.output_msg())
(PbProcedureStatus::RollingBack, error.to_string())
}
ProcedureState::Poisoned { error, .. } => (PbProcedureStatus::Poisoned, error.output_msg()),
ProcedureState::Poisoned { error, .. } => (PbProcedureStatus::Poisoned, error.to_string()),
}
}

View File

@@ -463,13 +463,13 @@ mod tests {
num_per_range: u32,
max_bytes: u32,
) {
let num_cases = rand::rng().random_range(1..=8);
let num_cases = rand::thread_rng().gen_range(1..=8);
common_telemetry::info!("num_cases: {}", num_cases);
let mut cases = Vec::with_capacity(num_cases);
for i in 0..num_cases {
let size = rand::rng().random_range(size_limit..=max_bytes);
let size = rand::thread_rng().gen_range(size_limit..=max_bytes);
let mut large_value = vec![0u8; size as usize];
rand::rng().fill_bytes(&mut large_value);
rand::thread_rng().fill_bytes(&mut large_value);
// Starts from `a`.
let prefix = format!("{}/", std::char::from_u32(97 + i as u32).unwrap());
@@ -527,8 +527,8 @@ mod tests {
#[tokio::test]
async fn test_meta_state_store_split_value() {
let size_limit = rand::rng().random_range(128..=512);
let page_size = rand::rng().random_range(1..10);
let size_limit = rand::thread_rng().gen_range(128..=512);
let page_size = rand::thread_rng().gen_range(1..10);
let kv_backend = Arc::new(MemoryKvBackend::new());
test_meta_state_store_split_value_with_size_limit(kv_backend, size_limit, page_size, 8192)
.await;
@@ -561,7 +561,7 @@ mod tests {
// However, some KvBackends, the `ChrootKvBackend`, will add the prefix to `key`;
// we don't know the exact size of the key.
let size_limit = 1536 * 1024 - key_size;
let page_size = rand::rng().random_range(1..10);
let page_size = rand::thread_rng().gen_range(1..10);
test_meta_state_store_split_value_with_size_limit(
kv_backend,
size_limit,

View File

@@ -35,7 +35,6 @@ use crate::node_manager::{
};
use crate::peer::{Peer, PeerLookupService};
use crate::region_keeper::MemoryRegionKeeper;
use crate::region_registry::LeaderRegionRegistry;
use crate::sequence::SequenceBuilder;
use crate::wal_options_allocator::WalOptionsAllocator;
use crate::{DatanodeId, FlownodeId};
@@ -178,7 +177,6 @@ pub fn new_ddl_context_with_kv_backend(
node_manager,
cache_invalidator: Arc::new(DummyCacheInvalidator),
memory_region_keeper: Arc::new(MemoryRegionKeeper::new()),
leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
table_metadata_allocator,
table_metadata_manager,
flow_metadata_allocator,

View File

@@ -30,7 +30,7 @@ use crate::error::{EncodeWalOptionsSnafu, InvalidTopicNamePrefixSnafu, Result};
use crate::key::NAME_PATTERN_REGEX;
use crate::kv_backend::KvBackendRef;
use crate::leadership_notifier::LeadershipChangeListener;
pub use crate::wal_options_allocator::topic_creator::build_kafka_topic_creator;
use crate::wal_options_allocator::topic_creator::build_kafka_topic_creator;
use crate::wal_options_allocator::topic_pool::KafkaTopicPool;
/// Allocates wal options in region granularity.
@@ -53,12 +53,21 @@ impl WalOptionsAllocator {
}
}
/// Allocates a batch of wal options where each wal options goes to a region.
/// If skip_wal is true, the wal options will be set to Noop regardless of the allocator type.
pub fn alloc_batch(&self, num_regions: usize, skip_wal: bool) -> Result<Vec<WalOptions>> {
if skip_wal {
return Ok(vec![WalOptions::Noop; num_regions]);
/// Allocates a wal options for a region.
pub fn alloc(&self) -> Result<WalOptions> {
match self {
Self::RaftEngine => Ok(WalOptions::RaftEngine),
Self::Kafka(topic_manager) => {
let topic = topic_manager.select()?;
Ok(WalOptions::Kafka(KafkaWalOptions {
topic: topic.clone(),
}))
}
}
}
/// Allocates a batch of wal options where each wal options goes to a region.
pub fn alloc_batch(&self, num_regions: usize) -> Result<Vec<WalOptions>> {
match self {
WalOptionsAllocator::RaftEngine => Ok(vec![WalOptions::RaftEngine; num_regions]),
WalOptionsAllocator::Kafka(topic_manager) => {
@@ -121,10 +130,9 @@ pub async fn build_wal_options_allocator(
pub fn allocate_region_wal_options(
regions: Vec<RegionNumber>,
wal_options_allocator: &WalOptionsAllocator,
skip_wal: bool,
) -> Result<HashMap<RegionNumber, String>> {
let wal_options = wal_options_allocator
.alloc_batch(regions.len(), skip_wal)?
.alloc_batch(regions.len())?
.into_iter()
.map(|wal_options| {
serde_json::to_string(&wal_options).context(EncodeWalOptionsSnafu { wal_options })
@@ -169,7 +177,7 @@ mod tests {
let num_regions = 32;
let regions = (0..num_regions).collect::<Vec<_>>();
let got = allocate_region_wal_options(regions.clone(), &allocator, false).unwrap();
let got = allocate_region_wal_options(regions.clone(), &allocator).unwrap();
let encoded_wal_options = serde_json::to_string(&WalOptions::RaftEngine).unwrap();
let expected = regions
@@ -229,7 +237,7 @@ mod tests {
let num_regions = 32;
let regions = (0..num_regions).collect::<Vec<_>>();
let got = allocate_region_wal_options(regions.clone(), &allocator, false).unwrap();
let got = allocate_region_wal_options(regions.clone(), &allocator).unwrap();
// Check the allocated wal options contain the expected topics.
let expected = (0..num_regions)
@@ -245,18 +253,4 @@ mod tests {
})
.await;
}
#[tokio::test]
async fn test_allocator_with_skip_wal() {
let allocator = WalOptionsAllocator::RaftEngine;
allocator.start().await.unwrap();
let num_regions = 32;
let regions = (0..num_regions).collect::<Vec<_>>();
let got = allocate_region_wal_options(regions.clone(), &allocator, true).unwrap();
assert_eq!(got.len(), num_regions as usize);
for wal_options in got.values() {
assert_eq!(wal_options, &"{\"wal.provider\":\"noop\"}");
}
}
}

View File

@@ -39,7 +39,7 @@ impl RoundRobinTopicSelector {
// The cursor in the round-robin selector is not persisted which may break the round-robin strategy cross crashes.
// Introducing a shuffling strategy may help mitigate this issue.
pub fn with_shuffle() -> Self {
let offset = rand::rng().random_range(0..64);
let offset = rand::thread_rng().gen_range(0..64);
Self {
cursor: AtomicUsize::new(offset),
}

View File

@@ -12,16 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use common_telemetry::{error, info};
use common_wal::config::kafka::common::DEFAULT_BACKOFF_CONFIG;
use common_wal::config::kafka::MetasrvKafkaConfig;
use rskafka::client::error::Error as RsKafkaError;
use rskafka::client::error::ProtocolError::TopicAlreadyExists;
use rskafka::client::partition::{Compression, UnknownTopicHandling};
use rskafka::client::{Client, ClientBuilder};
use rskafka::record::Record;
use rskafka::BackoffConfig;
use snafu::ResultExt;
use crate::error::{
@@ -34,11 +32,9 @@ use crate::error::{
// The `DEFAULT_PARTITION` refers to the index of the partition.
const DEFAULT_PARTITION: i32 = 0;
type KafkaClientRef = Arc<Client>;
/// Creates topics in kafka.
pub struct KafkaTopicCreator {
client: KafkaClientRef,
client: Client,
/// The number of partitions per topic.
num_partitions: i32,
/// The replication factor of each topic.
@@ -48,10 +44,6 @@ pub struct KafkaTopicCreator {
}
impl KafkaTopicCreator {
pub fn client(&self) -> &KafkaClientRef {
&self.client
}
async fn create_topic(&self, topic: &String, client: &Client) -> Result<()> {
let controller = client
.controller_client()
@@ -135,10 +127,16 @@ impl KafkaTopicCreator {
pub async fn build_kafka_topic_creator(config: &MetasrvKafkaConfig) -> Result<KafkaTopicCreator> {
// Builds an kafka controller client for creating topics.
let backoff_config = BackoffConfig {
init_backoff: config.backoff.init,
max_backoff: config.backoff.max,
base: config.backoff.base as f64,
deadline: config.backoff.deadline,
};
let broker_endpoints = common_wal::resolve_to_ipv4(&config.connection.broker_endpoints)
.await
.context(ResolveKafkaEndpointSnafu)?;
let mut builder = ClientBuilder::new(broker_endpoints).backoff_config(DEFAULT_BACKOFF_CONFIG);
let mut builder = ClientBuilder::new(broker_endpoints).backoff_config(backoff_config);
if let Some(sasl) = &config.connection.sasl {
builder = builder.sasl_config(sasl.config.clone().into_sasl_config());
};
@@ -153,7 +151,7 @@ pub async fn build_kafka_topic_creator(config: &MetasrvKafkaConfig) -> Result<Ka
})?;
Ok(KafkaTopicCreator {
client: Arc::new(client),
client,
num_partitions: config.kafka_topic.num_partitions,
replication_factor: config.kafka_topic.replication_factor,
create_topic_timeout: config.kafka_topic.create_topic_timeout.as_millis() as i32,

View File

@@ -65,13 +65,6 @@ pub enum Error {
location: Location,
},
#[snafu(display("Too many running procedures, max: {}", max_running_procedures))]
TooManyRunningProcedures {
max_running_procedures: usize,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to put state, key: '{key}'"))]
PutState {
key: String,
@@ -251,7 +244,6 @@ impl ErrorExt for Error {
| Error::WaitWatcher { .. }
| Error::RetryLater { .. }
| Error::RollbackProcedureRecovered { .. }
| Error::TooManyRunningProcedures { .. }
| Error::PoisonKeyNotDefined { .. } => StatusCode::Internal,
Error::RetryTimesExceeded { .. }

View File

@@ -15,8 +15,7 @@
mod runner;
mod rwlock;
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet, VecDeque};
use std::collections::{HashMap, VecDeque};
use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use std::time::{Duration, Instant};
@@ -34,7 +33,7 @@ use self::rwlock::KeyRwLock;
use crate::error::{
self, DuplicateProcedureSnafu, Error, LoaderConflictSnafu, ManagerNotStartSnafu,
PoisonKeyNotDefinedSnafu, ProcedureNotFoundSnafu, Result, StartRemoveOutdatedMetaTaskSnafu,
StopRemoveOutdatedMetaTaskSnafu, TooManyRunningProceduresSnafu,
StopRemoveOutdatedMetaTaskSnafu,
};
use crate::local::runner::Runner;
use crate::procedure::{BoxedProcedureLoader, InitProcedureState, PoisonKeys, ProcedureInfo};
@@ -154,6 +153,7 @@ type ProcedureMetaRef = Arc<ProcedureMeta>;
/// Procedure loaded from store.
struct LoadedProcedure {
procedure: BoxedProcedure,
parent_id: Option<ProcedureId>,
step: u32,
}
@@ -163,7 +163,8 @@ pub(crate) struct ManagerContext {
loaders: Mutex<HashMap<String, BoxedProcedureLoader>>,
key_lock: KeyRwLock<String>,
procedures: RwLock<HashMap<ProcedureId, ProcedureMetaRef>>,
running_procedures: Mutex<HashSet<ProcedureId>>,
/// Messages loaded from the procedure store.
messages: Mutex<HashMap<ProcedureId, ProcedureMessage>>,
/// Ids and finished time of finished procedures.
finished_procedures: Mutex<VecDeque<(ProcedureId, Instant)>>,
/// Running flag.
@@ -208,7 +209,7 @@ impl ManagerContext {
key_lock: KeyRwLock::new(),
loaders: Mutex::new(HashMap::new()),
procedures: RwLock::new(HashMap::new()),
running_procedures: Mutex::new(HashSet::new()),
messages: Mutex::new(HashMap::new()),
finished_procedures: Mutex::new(VecDeque::new()),
running: Arc::new(AtomicBool::new(false)),
poison_manager,
@@ -240,27 +241,18 @@ impl ManagerContext {
procedures.contains_key(&procedure_id)
}
/// Returns the number of running procedures.
fn num_running_procedures(&self) -> usize {
self.running_procedures.lock().unwrap().len()
}
/// Try to insert the `procedure` to the context if there is no procedure
/// with same [ProcedureId].
///
/// Returns `false` if there is already a procedure using the same [ProcedureId].
fn try_insert_procedure(&self, meta: ProcedureMetaRef) -> bool {
let procedure_id = meta.id;
let mut procedures = self.procedures.write().unwrap();
match procedures.entry(procedure_id) {
Entry::Occupied(_) => return false,
Entry::Vacant(vacant_entry) => {
vacant_entry.insert(meta);
}
if procedures.contains_key(&meta.id) {
return false;
}
let mut running_procedures = self.running_procedures.lock().unwrap();
running_procedures.insert(procedure_id);
let old = procedures.insert(meta.id, meta);
debug_assert!(old.is_none());
true
}
@@ -303,6 +295,16 @@ impl ManagerContext {
}
}
/// Load procedure with specific `procedure_id` from cached [ProcedureMessage]s.
fn load_one_procedure(&self, procedure_id: ProcedureId) -> Option<LoadedProcedure> {
let message = {
let messages = self.messages.lock().unwrap();
messages.get(&procedure_id).cloned()?
};
self.load_one_procedure_from_message(procedure_id, &message)
}
/// Load procedure from specific [ProcedureMessage].
fn load_one_procedure_from_message(
&self,
@@ -330,6 +332,7 @@ impl ManagerContext {
Some(LoadedProcedure {
procedure,
parent_id: message.parent_id,
step: message.step,
})
}
@@ -378,19 +381,23 @@ impl ManagerContext {
}
}
/// Remove cached [ProcedureMessage] by ids.
fn remove_messages(&self, procedure_ids: &[ProcedureId]) {
let mut messages = self.messages.lock().unwrap();
for procedure_id in procedure_ids {
let _ = messages.remove(procedure_id);
}
}
/// Clean resources of finished procedures.
fn on_procedures_finish(&self, procedure_ids: &[ProcedureId]) {
self.remove_messages(procedure_ids);
// Since users need to query the procedure state, so we can't remove the
// meta of the procedure directly.
let now = Instant::now();
let mut finished_procedures = self.finished_procedures.lock().unwrap();
finished_procedures.extend(procedure_ids.iter().map(|id| (*id, now)));
// Remove the procedures from the running set.
let mut running_procedures = self.running_procedures.lock().unwrap();
for procedure_id in procedure_ids {
running_procedures.remove(procedure_id);
}
}
/// Remove metadata of outdated procedures.
@@ -434,7 +441,6 @@ pub struct ManagerConfig {
pub retry_delay: Duration,
pub remove_outdated_meta_task_interval: Duration,
pub remove_outdated_meta_ttl: Duration,
pub max_running_procedures: usize,
}
impl Default for ManagerConfig {
@@ -445,7 +451,6 @@ impl Default for ManagerConfig {
retry_delay: Duration::from_millis(500),
remove_outdated_meta_task_interval: Duration::from_secs(60 * 10),
remove_outdated_meta_ttl: META_TTL,
max_running_procedures: 128,
}
}
}
@@ -523,13 +528,6 @@ impl LocalManager {
let watcher = meta.state_receiver.clone();
ensure!(
self.manager_ctx.num_running_procedures() < self.config.max_running_procedures,
TooManyRunningProceduresSnafu {
max_running_procedures: self.config.max_running_procedures,
}
);
// Inserts meta into the manager before actually spawnd the runner.
ensure!(
self.manager_ctx.try_insert_procedure(meta),
@@ -1180,7 +1178,6 @@ mod tests {
retry_delay: Duration::from_millis(500),
remove_outdated_meta_task_interval: Duration::from_millis(1),
remove_outdated_meta_ttl: Duration::from_millis(1),
max_running_procedures: 128,
};
let state_store = Arc::new(ObjectStateStore::new(object_store.clone()));
let poison_manager = Arc::new(InMemoryPoisonStore::new());
@@ -1254,70 +1251,6 @@ mod tests {
.is_none());
}
#[tokio::test]
async fn test_too_many_running_procedures() {
let dir = create_temp_dir("too_many_running_procedures");
let config = ManagerConfig {
parent_path: "data/".to_string(),
max_retry_times: 3,
retry_delay: Duration::from_millis(500),
max_running_procedures: 1,
..Default::default()
};
let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
let poison_manager = Arc::new(InMemoryPoisonStore::new());
let manager = LocalManager::new(config, state_store, poison_manager);
manager.manager_ctx.set_running();
manager
.manager_ctx
.running_procedures
.lock()
.unwrap()
.insert(ProcedureId::random());
manager.start().await.unwrap();
// Submit a new procedure should fail.
let mut procedure = ProcedureToLoad::new("submit");
procedure.lock_key = LockKey::single_exclusive("test.submit");
let procedure_id = ProcedureId::random();
let err = manager
.submit(ProcedureWithId {
id: procedure_id,
procedure: Box::new(procedure),
})
.await
.unwrap_err();
assert!(matches!(err, Error::TooManyRunningProcedures { .. }));
manager
.manager_ctx
.running_procedures
.lock()
.unwrap()
.clear();
// Submit a new procedure should succeed.
let mut procedure = ProcedureToLoad::new("submit");
procedure.lock_key = LockKey::single_exclusive("test.submit");
assert!(manager
.submit(ProcedureWithId {
id: procedure_id,
procedure: Box::new(procedure),
})
.await
.is_ok());
assert!(manager
.procedure_state(procedure_id)
.await
.unwrap()
.is_some());
// Wait for the procedure done.
let mut watcher = manager.procedure_watcher(procedure_id).unwrap();
watcher.changed().await.unwrap();
assert!(watcher.borrow().is_done());
}
#[derive(Debug)]
struct ProcedureToRecover {
content: String,

View File

@@ -207,7 +207,7 @@ impl Runner {
if let Some(d) = retry.next() {
let millis = d.as_millis() as u64;
// Add random noise to the retry delay to avoid retry storms.
let noise = rand::rng().random_range(0..(millis / 4) + 1);
let noise = rand::thread_rng().gen_range(0..(millis / 4) + 1);
let d = d.add(Duration::from_millis(noise));
self.wait_on_err(d, retry_times).await;
@@ -403,14 +403,24 @@ impl Runner {
&self,
procedure_id: ProcedureId,
procedure_state: ProcedureState,
procedure: BoxedProcedure,
mut procedure: BoxedProcedure,
) {
if self.manager_ctx.contains_procedure(procedure_id) {
// If the parent has already submitted this procedure, don't submit it again.
return;
}
let step = 0;
let mut step = 0;
if let Some(loaded_procedure) = self.manager_ctx.load_one_procedure(procedure_id) {
// Try to load procedure state from the message to avoid re-run the subprocedure
// from initial state.
assert_eq!(self.meta.id, loaded_procedure.parent_id.unwrap());
// Use the dumped procedure from the procedure store.
procedure = loaded_procedure.procedure;
// Update step number.
step = loaded_procedure.step;
}
let meta = Arc::new(ProcedureMeta::new(
procedure_id,

View File

@@ -29,8 +29,6 @@ pub struct ProcedureConfig {
pub retry_delay: Duration,
/// `None` stands for no limit.
pub max_metadata_value_size: Option<ReadableSize>,
/// Max running procedures.
pub max_running_procedures: usize,
}
impl Default for ProcedureConfig {
@@ -39,7 +37,6 @@ impl Default for ProcedureConfig {
max_retry_times: 3,
retry_delay: Duration::from_millis(500),
max_metadata_value_size: None,
max_running_procedures: 128,
}
}
}

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::{self, Display};
use std::fmt::Display;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
@@ -28,7 +28,7 @@ use datafusion::logical_expr::Expr;
use datafusion::physical_expr::create_physical_expr;
use datafusion::physical_plan::metrics::{BaselineMetrics, MetricValue};
use datafusion::physical_plan::{
accept, DisplayFormatType, ExecutionPlan, ExecutionPlanVisitor, PhysicalExpr,
accept, displayable, ExecutionPlan, ExecutionPlanVisitor, PhysicalExpr,
RecordBatchStream as DfRecordBatchStream,
};
use datafusion_common::arrow::error::ArrowError;
@@ -206,16 +206,13 @@ impl Stream for DfRecordBatchStreamAdapter {
}
/// DataFusion [SendableRecordBatchStream](DfSendableRecordBatchStream) -> Greptime [RecordBatchStream].
/// The reverse one is [DfRecordBatchStreamAdapter].
/// It can collect metrics from DataFusion execution plan.
/// The reverse one is [DfRecordBatchStreamAdapter]
pub struct RecordBatchStreamAdapter {
schema: SchemaRef,
stream: DfSendableRecordBatchStream,
metrics: Option<BaselineMetrics>,
/// Aggregated plan-level metrics. Resolved after an [ExecutionPlan] is finished.
metrics_2: Metrics,
/// Display plan and metrics in verbose mode.
explain_verbose: bool,
}
/// Json encoded metrics. Contains metric from a whole plan tree.
@@ -234,7 +231,6 @@ impl RecordBatchStreamAdapter {
stream,
metrics: None,
metrics_2: Metrics::Unavailable,
explain_verbose: false,
})
}
@@ -250,18 +246,12 @@ impl RecordBatchStreamAdapter {
stream,
metrics: Some(metrics),
metrics_2: Metrics::Unresolved(df_plan),
explain_verbose: false,
})
}
pub fn set_metrics2(&mut self, plan: Arc<dyn ExecutionPlan>) {
self.metrics_2 = Metrics::Unresolved(plan)
}
/// Set the verbose mode for displaying plan and metrics.
pub fn set_explain_verbose(&mut self, verbose: bool) {
self.explain_verbose = verbose;
}
}
impl RecordBatchStream for RecordBatchStreamAdapter {
@@ -306,7 +296,7 @@ impl Stream for RecordBatchStreamAdapter {
}
Poll::Ready(None) => {
if let Metrics::Unresolved(df_plan) = &self.metrics_2 {
let mut metric_collector = MetricCollector::new(self.explain_verbose);
let mut metric_collector = MetricCollector::default();
accept(df_plan.as_ref(), &mut metric_collector).unwrap();
self.metrics_2 = Metrics::Resolved(metric_collector.record_batch_metrics);
}
@@ -322,20 +312,10 @@ impl Stream for RecordBatchStreamAdapter {
}
/// An [ExecutionPlanVisitor] to collect metrics from a [ExecutionPlan].
#[derive(Default)]
pub struct MetricCollector {
current_level: usize,
pub record_batch_metrics: RecordBatchMetrics,
verbose: bool,
}
impl MetricCollector {
pub fn new(verbose: bool) -> Self {
Self {
current_level: 0,
record_batch_metrics: RecordBatchMetrics::default(),
verbose,
}
}
}
impl ExecutionPlanVisitor for MetricCollector {
@@ -359,7 +339,7 @@ impl ExecutionPlanVisitor for MetricCollector {
.sorted_for_display()
.timestamps_removed();
let mut plan_metric = PlanMetrics {
plan: one_line(plan, self.verbose).to_string(),
plan: displayable(plan).one_line().to_string(),
level: self.current_level,
metrics: Vec::with_capacity(metric.iter().size_hint().0),
};
@@ -391,29 +371,6 @@ impl ExecutionPlanVisitor for MetricCollector {
}
}
/// Returns a single-line summary of the root of the plan.
/// If the `verbose` flag is set, it will display detailed information about the plan.
fn one_line(plan: &dyn ExecutionPlan, verbose: bool) -> impl fmt::Display + '_ {
struct Wrapper<'a> {
plan: &'a dyn ExecutionPlan,
format_type: DisplayFormatType,
}
impl fmt::Display for Wrapper<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.plan.fmt_as(self.format_type, f)?;
writeln!(f)
}
}
let format_type = if verbose {
DisplayFormatType::Verbose
} else {
DisplayFormatType::Default
};
Wrapper { plan, format_type }
}
/// [`RecordBatchMetrics`] carrys metrics value
/// from datanode to frontend through gRPC
#[derive(serde::Serialize, serde::Deserialize, Default, Debug, Clone)]

View File

@@ -22,12 +22,10 @@ use datafusion::physical_plan::PhysicalExpr;
use datafusion_common::arrow::array::{ArrayRef, Datum, Scalar};
use datafusion_common::arrow::buffer::BooleanBuffer;
use datafusion_common::arrow::compute::kernels::cmp;
use datafusion_common::cast::{as_boolean_array, as_null_array, as_string_array};
use datafusion_common::cast::{as_boolean_array, as_null_array};
use datafusion_common::{internal_err, DataFusionError, ScalarValue};
use datatypes::arrow::array::{Array, BooleanArray, RecordBatch};
use datatypes::arrow::compute::filter_record_batch;
use datatypes::arrow::error::ArrowError;
use datatypes::compute::kernels::regexp;
use datatypes::compute::or_kleene;
use datatypes::vectors::VectorRef;
use snafu::ResultExt;
@@ -38,8 +36,7 @@ use crate::error::{ArrowComputeSnafu, Result, ToArrowScalarSnafu, UnsupportedOpe
/// - `col` `op` `literal`
/// - `literal` `op` `col`
///
/// And the `op` is one of `=`, `!=`, `>`, `>=`, `<`, `<=`,
/// or regex operators: `~`, `~*`, `!~`, `!~*`.
/// And the `op` is one of `=`, `!=`, `>`, `>=`, `<`, `<=`.
///
/// This struct contains normalized predicate expr. In the form of
/// `col` `op` `literal` where the `col` is provided from input.
@@ -89,11 +86,7 @@ impl SimpleFilterEvaluator {
| Operator::Lt
| Operator::LtEq
| Operator::Gt
| Operator::GtEq
| Operator::RegexMatch
| Operator::RegexIMatch
| Operator::RegexNotMatch
| Operator::RegexNotIMatch => {}
| Operator::GtEq => {}
Operator::Or => {
let lhs = Self::try_new(&binary.left)?;
let rhs = Self::try_new(&binary.right)?;
@@ -179,10 +172,6 @@ impl SimpleFilterEvaluator {
Operator::LtEq => cmp::lt_eq(input, &self.literal),
Operator::Gt => cmp::gt(input, &self.literal),
Operator::GtEq => cmp::gt_eq(input, &self.literal),
Operator::RegexMatch => self.regex_match(input, false, false),
Operator::RegexIMatch => self.regex_match(input, true, false),
Operator::RegexNotMatch => self.regex_match(input, false, true),
Operator::RegexNotIMatch => self.regex_match(input, true, true),
Operator::Or => {
// OR operator stands for OR-chained EQs (or INLIST in other words)
let mut result: BooleanArray = vec![false; input_len].into();
@@ -203,28 +192,6 @@ impl SimpleFilterEvaluator {
.context(ArrowComputeSnafu)
.map(|array| array.values().clone())
}
fn regex_match(
&self,
input: &impl Datum,
ignore_case: bool,
negative: bool,
) -> std::result::Result<BooleanArray, ArrowError> {
let flag = if ignore_case { Some("i") } else { None };
let array = input.get().0;
let string_array = as_string_array(array).map_err(|_| {
ArrowError::CastError(format!("Cannot cast {:?} to StringArray", array))
})?;
let literal_array = self.literal.clone().into_inner();
let regex_array = as_string_array(&literal_array).map_err(|_| {
ArrowError::CastError(format!("Cannot cast {:?} to StringArray", literal_array))
})?;
let mut result = regexp::regexp_is_match_scalar(string_array, regex_array.value(0), flag)?;
if negative {
result = datatypes::compute::not(&result)?;
}
Ok(result)
}
}
/// Evaluate the predicate on the input [RecordBatch], and return a new [RecordBatch].

View File

@@ -1,11 +0,0 @@
[package]
name = "common-session"
version.workspace = true
edition.workspace = true
license.workspace = true
[lints]
workspace = true
[dependencies]
strum.workspace = true

View File

@@ -1,45 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use strum::{AsRefStr, Display, EnumString};
/// Defines the read preference for frontend route operations,
/// determining whether to read from the region leader or follower.
#[derive(Debug, Clone, Copy, Default, EnumString, Display, AsRefStr, PartialEq, Eq)]
pub enum ReadPreference {
#[default]
// Reads all operations from the region leader. This is the default mode.
#[strum(serialize = "leader", to_string = "LEADER")]
Leader,
}
#[cfg(test)]
mod tests {
use std::str::FromStr;
use crate::ReadPreference;
#[test]
fn test_read_preference() {
assert_eq!(ReadPreference::Leader.to_string(), "LEADER");
let read_preference = ReadPreference::from_str("LEADER").unwrap();
assert_eq!(read_preference, ReadPreference::Leader);
let read_preference = ReadPreference::from_str("leader").unwrap();
assert_eq!(read_preference, ReadPreference::Leader);
ReadPreference::from_str("follower").unwrap_err();
}
}

View File

@@ -111,7 +111,7 @@ impl Eq for LoggingOptions {}
impl Default for LoggingOptions {
fn default() -> Self {
Self {
dir: "./greptimedb_data/logs".to_string(),
dir: "/tmp/greptimedb/logs".to_string(),
level: None,
log_format: LogFormat::Text,
enable_otlp_tracing: false,

View File

@@ -22,6 +22,6 @@ static PORTS: OnceCell<AtomicUsize> = OnceCell::new();
/// Return a unique port(in runtime) for test
pub fn get_port() -> usize {
PORTS
.get_or_init(|| AtomicUsize::new(rand::rng().random_range(13000..13800)))
.get_or_init(|| AtomicUsize::new(rand::thread_rng().gen_range(13000..13800)))
.fetch_add(1, Ordering::Relaxed)
}

View File

@@ -715,10 +715,10 @@ mod tests {
TimeUnit::Microsecond,
TimeUnit::Nanosecond,
];
let mut rng = rand::rng();
let unit_idx: usize = rng.random_range(0..4);
let mut rng = rand::thread_rng();
let unit_idx: usize = rng.gen_range(0..4);
let unit = units[unit_idx];
let value: i64 = rng.random();
let value: i64 = rng.gen();
Timestamp::new(value, unit)
}
@@ -745,8 +745,8 @@ mod tests {
/// Generate timestamp less than or equal to `threshold`
fn gen_ts_le(threshold: &Timestamp) -> Timestamp {
let mut rng = rand::rng();
let timestamp = rng.random_range(i64::MIN..=threshold.value);
let mut rng = rand::thread_rng();
let timestamp = rng.gen_range(i64::MIN..=threshold.value);
Timestamp::new(timestamp, threshold.unit)
}

View File

@@ -51,6 +51,7 @@ impl From<DatanodeWalConfig> for MetasrvWalConfig {
DatanodeWalConfig::RaftEngine(_) => Self::RaftEngine,
DatanodeWalConfig::Kafka(config) => Self::Kafka(MetasrvKafkaConfig {
connection: config.connection,
backoff: config.backoff,
kafka_topic: config.kafka_topic,
auto_create_topics: config.auto_create_topics,
}),
@@ -64,6 +65,7 @@ impl From<MetasrvWalConfig> for DatanodeWalConfig {
MetasrvWalConfig::RaftEngine => Self::RaftEngine(RaftEngineConfig::default()),
MetasrvWalConfig::Kafka(config) => Self::Kafka(DatanodeKafkaConfig {
connection: config.connection,
backoff: config.backoff,
kafka_topic: config.kafka_topic,
..Default::default()
}),
@@ -82,6 +84,7 @@ mod tests {
use tests::kafka::common::KafkaTopicConfig;
use super::*;
use crate::config::kafka::common::BackoffConfig;
use crate::config::{DatanodeKafkaConfig, MetasrvKafkaConfig};
use crate::TopicSelectorType;
@@ -172,6 +175,12 @@ mod tests {
client_key_path: None,
}),
},
backoff: BackoffConfig {
init: Duration::from_millis(500),
max: Duration::from_secs(10),
base: 2,
deadline: Some(Duration::from_secs(60 * 5)),
},
kafka_topic: KafkaTopicConfig {
num_topics: 32,
selector_type: TopicSelectorType::RoundRobin,
@@ -203,6 +212,12 @@ mod tests {
},
max_batch_bytes: ReadableSize::mb(1),
consumer_wait_timeout: Duration::from_millis(100),
backoff: BackoffConfig {
init: Duration::from_millis(500),
max: Duration::from_secs(10),
base: 2,
deadline: Some(Duration::from_secs(60 * 5)),
},
kafka_topic: KafkaTopicConfig {
num_topics: 32,
selector_type: TopicSelectorType::RoundRobin,

View File

@@ -17,22 +17,44 @@ use std::sync::Arc;
use std::time::Duration;
use rskafka::client::{Credentials, SaslConfig};
use rskafka::BackoffConfig;
use rustls::{ClientConfig, RootCertStore};
use serde::{Deserialize, Serialize};
use serde_with::with_prefix;
use snafu::{OptionExt, ResultExt};
/// The default backoff config for kafka client.
pub const DEFAULT_BACKOFF_CONFIG: BackoffConfig = BackoffConfig {
init_backoff: Duration::from_millis(100),
max_backoff: Duration::from_secs(10),
base: 2.0,
deadline: Some(Duration::from_secs(120)),
};
use crate::error::{self, Result};
use crate::{TopicSelectorType, BROKER_ENDPOINT, TOPIC_NAME_PREFIX};
with_prefix!(pub backoff_prefix "backoff_");
/// Backoff configurations for kafka client.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BackoffConfig {
/// The initial backoff delay.
#[serde(with = "humantime_serde")]
pub init: Duration,
/// The maximum backoff delay.
#[serde(with = "humantime_serde")]
pub max: Duration,
/// The exponential backoff rate, i.e. next backoff = base * current backoff.
pub base: u32,
/// The deadline of retries. `None` stands for no deadline.
#[serde(with = "humantime_serde")]
pub deadline: Option<Duration>,
}
impl Default for BackoffConfig {
fn default() -> Self {
Self {
init: Duration::from_millis(500),
max: Duration::from_secs(10),
base: 2,
deadline: Some(Duration::from_secs(60 * 5)), // 5 mins
}
}
}
/// The SASL configurations for kafka client.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct KafkaClientSasl {

View File

@@ -18,7 +18,7 @@ use common_base::readable_size::ReadableSize;
use serde::{Deserialize, Serialize};
use super::common::KafkaConnectionConfig;
use crate::config::kafka::common::KafkaTopicConfig;
use crate::config::kafka::common::{backoff_prefix, BackoffConfig, KafkaTopicConfig};
/// Kafka wal configurations for datanode.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
@@ -34,6 +34,9 @@ pub struct DatanodeKafkaConfig {
/// The consumer wait timeout.
#[serde(with = "humantime_serde")]
pub consumer_wait_timeout: Duration,
/// The backoff config.
#[serde(flatten, with = "backoff_prefix")]
pub backoff: BackoffConfig,
/// The kafka topic config.
#[serde(flatten)]
pub kafka_topic: KafkaTopicConfig,
@@ -54,6 +57,7 @@ impl Default for DatanodeKafkaConfig {
// Warning: Kafka has a default limit of 1MB per message in a topic.
max_batch_bytes: ReadableSize::mb(1),
consumer_wait_timeout: Duration::from_millis(100),
backoff: BackoffConfig::default(),
kafka_topic: KafkaTopicConfig::default(),
auto_create_topics: true,
create_index: true,

View File

@@ -15,7 +15,7 @@
use serde::{Deserialize, Serialize};
use super::common::KafkaConnectionConfig;
use crate::config::kafka::common::KafkaTopicConfig;
use crate::config::kafka::common::{backoff_prefix, BackoffConfig, KafkaTopicConfig};
/// Kafka wal configurations for metasrv.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
@@ -24,6 +24,9 @@ pub struct MetasrvKafkaConfig {
/// The kafka connection config.
#[serde(flatten)]
pub connection: KafkaConnectionConfig,
/// The backoff config.
#[serde(flatten, with = "backoff_prefix")]
pub backoff: BackoffConfig,
/// The kafka config.
#[serde(flatten)]
pub kafka_topic: KafkaTopicConfig,
@@ -35,6 +38,7 @@ impl Default for MetasrvKafkaConfig {
fn default() -> Self {
Self {
connection: Default::default(),
backoff: Default::default(),
kafka_topic: Default::default(),
auto_create_topics: true,
}

View File

@@ -33,7 +33,6 @@ pub enum WalOptions {
RaftEngine,
#[serde(with = "kafka_prefix")]
Kafka(KafkaWalOptions),
Noop,
}
with_prefix!(kafka_prefix "wal.kafka.");
@@ -63,14 +62,5 @@ mod tests {
let decoded: WalOptions = serde_json::from_str(&encoded).unwrap();
assert_eq!(decoded, wal_options);
// Test serde noop wal options.
let wal_options = WalOptions::Noop;
let encoded = serde_json::to_string(&wal_options).unwrap();
let expected = r#"{"wal.provider":"noop"}"#;
assert_eq!(&encoded, expected);
let decoded: WalOptions = serde_json::from_str(&encoded).unwrap();
assert_eq!(decoded, wal_options);
}
}

View File

@@ -58,24 +58,17 @@ pub struct RegionAliveKeeper {
/// non-decreasing). The heartbeat requests will carry the duration since this epoch, and the
/// duration acts like an "invariant point" for region's keep alive lease.
epoch: Instant,
countdown_task_handler_ext: Option<CountdownTaskHandlerExtRef>,
}
impl RegionAliveKeeper {
/// Returns an empty [RegionAliveKeeper].
pub fn new(
region_server: RegionServer,
countdown_task_handler_ext: Option<CountdownTaskHandlerExtRef>,
heartbeat_interval_millis: u64,
) -> Self {
pub fn new(region_server: RegionServer, heartbeat_interval_millis: u64) -> Self {
Self {
region_server,
tasks: Arc::new(Mutex::new(HashMap::new())),
heartbeat_interval_millis,
started: Arc::new(AtomicBool::new(false)),
epoch: Instant::now(),
countdown_task_handler_ext,
}
}
@@ -92,7 +85,6 @@ impl RegionAliveKeeper {
let handle = Arc::new(CountdownTaskHandle::new(
self.region_server.clone(),
self.countdown_task_handler_ext.clone(),
region_id,
));
@@ -122,9 +114,7 @@ impl RegionAliveKeeper {
for region in regions {
let (role, region_id) = (region.role().into(), RegionId::from(region.region_id));
if let Some(handle) = self.find_handle(region_id).await {
handle
.reset_deadline(role, deadline, region.extensions.clone())
.await;
handle.reset_deadline(role, deadline).await;
} else {
warn!(
"Trying to renew the lease for region {region_id}, the keeper handler is not found!"
@@ -275,28 +265,13 @@ enum CountdownCommand {
/// 4 * `heartbeat_interval_millis`
Start(u64),
/// Reset countdown deadline to the given instance.
/// (NextRole, Deadline, ExtensionInfo)
Reset((RegionRole, Instant, HashMap<String, Vec<u8>>)),
/// (NextRole, Deadline)
Reset((RegionRole, Instant)),
/// Returns the current deadline of the countdown task.
#[cfg(test)]
Deadline(oneshot::Sender<Instant>),
}
pub type CountdownTaskHandlerExtRef = Arc<dyn CountdownTaskHandlerExt>;
/// Extension trait for [CountdownTaskHandlerExt] to reset deadline of a region.
#[async_trait]
pub trait CountdownTaskHandlerExt: Send + Sync {
async fn reset_deadline(
&self,
region_server: &RegionServer,
region_id: RegionId,
role: RegionRole,
deadline: Instant,
extension_info: HashMap<String, Vec<u8>>,
);
}
struct CountdownTaskHandle {
tx: mpsc::Sender<CountdownCommand>,
handler: JoinHandle<()>,
@@ -305,16 +280,11 @@ struct CountdownTaskHandle {
impl CountdownTaskHandle {
/// Creates a new [CountdownTaskHandle] and starts the countdown task.
fn new(
region_server: RegionServer,
handler_ext: Option<CountdownTaskHandlerExtRef>,
region_id: RegionId,
) -> Self {
fn new(region_server: RegionServer, region_id: RegionId) -> Self {
let (tx, rx) = mpsc::channel(1024);
let mut countdown_task = CountdownTask {
region_server,
handler_ext,
region_id,
rx,
};
@@ -353,15 +323,10 @@ impl CountdownTaskHandle {
None
}
async fn reset_deadline(
&self,
role: RegionRole,
deadline: Instant,
extension_info: HashMap<String, Vec<u8>>,
) {
async fn reset_deadline(&self, role: RegionRole, deadline: Instant) {
if let Err(e) = self
.tx
.send(CountdownCommand::Reset((role, deadline, extension_info)))
.send(CountdownCommand::Reset((role, deadline)))
.await
{
warn!(
@@ -385,7 +350,6 @@ impl Drop for CountdownTaskHandle {
struct CountdownTask {
region_server: RegionServer,
region_id: RegionId,
handler_ext: Option<CountdownTaskHandlerExtRef>,
rx: mpsc::Receiver<CountdownCommand>,
}
@@ -415,19 +379,8 @@ impl CountdownTask {
started = true;
}
},
Some(CountdownCommand::Reset((role, deadline, extension_info))) => {
if let Err(err) = self.region_server.set_region_role(self.region_id, role) {
error!(err; "Failed to set region role to {role} for region {region_id}");
}
if let Some(ext_handler) = self.handler_ext.as_ref() {
ext_handler.reset_deadline(
&self.region_server,
self.region_id,
role,
deadline,
extension_info,
).await;
}
Some(CountdownCommand::Reset((role, deadline))) => {
let _ = self.region_server.set_region_role(self.region_id, role);
trace!(
"Reset deadline of region {region_id} to approximately {} seconds later.",
(deadline - Instant::now()).as_secs_f32(),
@@ -449,9 +402,7 @@ impl CountdownTask {
}
() = &mut countdown => {
warn!("The region {region_id} lease is expired, convert region to follower.");
if let Err(err) = self.region_server.set_region_role(self.region_id, RegionRole::Follower) {
error!(err; "Failed to set region role to follower for region {region_id}");
}
let _ = self.region_server.set_region_role(self.region_id, RegionRole::Follower);
// resets the countdown.
let far_future = Instant::now() + Duration::from_secs(86400 * 365 * 30);
countdown.as_mut().reset(far_future);
@@ -480,7 +431,7 @@ mod test {
let engine = Arc::new(engine);
region_server.register_engine(engine.clone());
let alive_keeper = Arc::new(RegionAliveKeeper::new(region_server.clone(), None, 100));
let alive_keeper = Arc::new(RegionAliveKeeper::new(region_server.clone(), 100));
let region_id = RegionId::new(1024, 1);
let builder = CreateRequestBuilder::new();
@@ -542,8 +493,7 @@ mod test {
async fn countdown_task() {
let region_server = mock_region_server();
let countdown_handle =
CountdownTaskHandle::new(region_server, None, RegionId::new(9999, 2));
let countdown_handle = CountdownTaskHandle::new(region_server, RegionId::new(9999, 2));
// If countdown task is not started, its deadline is set to far future.
assert!(
@@ -573,7 +523,6 @@ mod test {
.reset_deadline(
RegionRole::Leader,
Instant::now() + Duration::from_millis(heartbeat_interval_millis * 5),
HashMap::new(),
)
.await;
assert!(

View File

@@ -31,11 +31,12 @@ use servers::export_metrics::ExportMetricsOption;
use servers::grpc::GrpcOptions;
use servers::heartbeat_options::HeartbeatOptions;
use servers::http::HttpOptions;
use servers::Mode;
pub const DEFAULT_OBJECT_STORE_CACHE_SIZE: ReadableSize = ReadableSize::gb(5);
/// Default data home in file storage
const DEFAULT_DATA_HOME: &str = "./greptimedb_data";
const DEFAULT_DATA_HOME: &str = "/tmp/greptimedb";
/// Object storage config
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
@@ -358,6 +359,7 @@ impl Default for ObjectStoreConfig {
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct DatanodeOptions {
pub mode: Mode,
pub node_id: Option<u64>,
pub require_lease_before_startup: bool,
pub init_regions_in_background: bool,
@@ -393,6 +395,7 @@ impl Default for DatanodeOptions {
#[allow(deprecated)]
fn default() -> Self {
Self {
mode: Mode::Standalone,
node_id: None,
require_lease_before_startup: false,
init_regions_in_background: false,

View File

@@ -157,7 +157,6 @@ impl Datanode {
pub struct DatanodeBuilder {
opts: DatanodeOptions,
mode: Mode,
plugins: Plugins,
meta_client: Option<MetaClientRef>,
kv_backend: Option<KvBackendRef>,
@@ -167,10 +166,9 @@ pub struct DatanodeBuilder {
impl DatanodeBuilder {
/// `kv_backend` is optional. If absent, the builder will try to build one
/// by using the given `opts`
pub fn new(opts: DatanodeOptions, plugins: Plugins, mode: Mode) -> Self {
pub fn new(opts: DatanodeOptions, plugins: Plugins) -> Self {
Self {
opts,
mode,
plugins,
meta_client: None,
kv_backend: None,
@@ -200,7 +198,7 @@ impl DatanodeBuilder {
}
pub async fn build(mut self) -> Result<Datanode> {
let mode = &self.mode;
let mode = &self.opts.mode;
let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?;
let meta_client = self.meta_client.take();
@@ -265,7 +263,6 @@ impl DatanodeBuilder {
region_server.clone(),
meta_client,
cache_registry,
self.plugins.clone(),
)
.await?,
)
@@ -632,7 +629,6 @@ mod tests {
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::KvBackendRef;
use mito2::engine::MITO_ENGINE_NAME;
use servers::Mode;
use store_api::region_request::RegionRequest;
use store_api::storage::RegionId;
@@ -678,7 +674,6 @@ mod tests {
..Default::default()
},
Plugins::default(),
Mode::Standalone,
)
.with_cache_registry(layered_cache_registry);

View File

@@ -18,7 +18,6 @@ use std::sync::Arc;
use std::time::Duration;
use api::v1::meta::{HeartbeatRequest, NodeInfo, Peer, RegionRole, RegionStat};
use common_base::Plugins;
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::datanode::REGION_STATISTIC_KEY;
use common_meta::distributed_time_constants::META_KEEP_ALIVE_INTERVAL_SECS;
@@ -38,7 +37,7 @@ use tokio::sync::{mpsc, Notify};
use tokio::time::Instant;
use self::handler::RegionHeartbeatResponseHandler;
use crate::alive_keeper::{CountdownTaskHandlerExtRef, RegionAliveKeeper};
use crate::alive_keeper::RegionAliveKeeper;
use crate::config::DatanodeOptions;
use crate::error::{self, MetaClientInitSnafu, Result};
use crate::event_listener::RegionServerEventReceiver;
@@ -74,12 +73,9 @@ impl HeartbeatTask {
region_server: RegionServer,
meta_client: MetaClientRef,
cache_invalidator: CacheInvalidatorRef,
plugins: Plugins,
) -> Result<Self> {
let countdown_task_handler_ext = plugins.get::<CountdownTaskHandlerExtRef>();
let region_alive_keeper = Arc::new(RegionAliveKeeper::new(
region_server.clone(),
countdown_task_handler_ext,
opts.heartbeat.interval.as_millis() as u64,
));
let resp_handler_executor = Arc::new(HandlerGroupExecutor::new(vec![

View File

@@ -26,7 +26,6 @@ use store_api::storage::RegionId;
mod close_region;
mod downgrade_region;
mod flush_region;
mod open_region;
mod upgrade_region;
@@ -43,7 +42,7 @@ pub struct RegionHeartbeatResponseHandler {
/// Handler of the instruction.
pub type InstructionHandler =
Box<dyn FnOnce(HandlerContext) -> BoxFuture<'static, Option<InstructionReply>> + Send>;
Box<dyn FnOnce(HandlerContext) -> BoxFuture<'static, InstructionReply> + Send>;
#[derive(Clone)]
pub struct HandlerContext {
@@ -95,9 +94,6 @@ impl RegionHeartbeatResponseHandler {
handler_context.handle_upgrade_region_instruction(upgrade_region)
})),
Instruction::InvalidateCaches(_) => InvalidHeartbeatResponseSnafu.fail(),
Instruction::FlushRegion(flush_regions) => Ok(Box::new(move |handler_context| {
handler_context.handle_flush_region_instruction(flush_regions)
})),
}
}
}
@@ -133,10 +129,8 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler {
})
.await;
if let Some(reply) = reply {
if let Err(e) = mailbox.send((meta, reply)).await {
error!(e; "Failed to send reply to mailbox");
}
if let Err(e) = mailbox.send((meta, reply)).await {
error!(e; "Failed to send reply to mailbox");
}
});

View File

@@ -26,28 +26,28 @@ impl HandlerContext {
pub(crate) fn handle_close_region_instruction(
self,
region_ident: RegionIdent,
) -> BoxFuture<'static, Option<InstructionReply>> {
) -> BoxFuture<'static, InstructionReply> {
Box::pin(async move {
let region_id = Self::region_ident_to_region_id(&region_ident);
let request = RegionRequest::Close(RegionCloseRequest {});
let result = self.region_server.handle_request(region_id, request).await;
match result {
Ok(_) => Some(InstructionReply::CloseRegion(SimpleReply {
Ok(_) => InstructionReply::CloseRegion(SimpleReply {
result: true,
error: None,
})),
}),
Err(error::Error::RegionNotFound { .. }) => {
warn!("Received a close region instruction from meta, but target region:{region_id} is not found.");
Some(InstructionReply::CloseRegion(SimpleReply {
InstructionReply::CloseRegion(SimpleReply {
result: true,
error: None,
}))
})
}
Err(err) => Some(InstructionReply::CloseRegion(SimpleReply {
Err(err) => InstructionReply::CloseRegion(SimpleReply {
result: false,
error: Some(format!("{err:?}")),
})),
}),
}
})
}

View File

@@ -24,34 +24,31 @@ use crate::heartbeat::handler::HandlerContext;
use crate::heartbeat::task_tracker::WaitResult;
impl HandlerContext {
async fn downgrade_to_follower_gracefully(
&self,
region_id: RegionId,
) -> Option<InstructionReply> {
async fn downgrade_to_follower_gracefully(&self, region_id: RegionId) -> InstructionReply {
match self
.region_server
.set_region_role_state_gracefully(region_id, SettableRegionRoleState::Follower)
.await
{
Ok(SetRegionRoleStateResponse::Success { last_entry_id }) => {
Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
InstructionReply::DowngradeRegion(DowngradeRegionReply {
last_entry_id,
exists: true,
error: None,
}))
})
}
Ok(SetRegionRoleStateResponse::NotFound) => {
Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
InstructionReply::DowngradeRegion(DowngradeRegionReply {
last_entry_id: None,
exists: false,
error: None,
}))
})
}
Err(err) => Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
Err(err) => InstructionReply::DowngradeRegion(DowngradeRegionReply {
last_entry_id: None,
exists: true,
error: Some(format!("{err:?}")),
})),
}),
}
}
@@ -62,15 +59,15 @@ impl HandlerContext {
flush_timeout,
reject_write,
}: DowngradeRegion,
) -> BoxFuture<'static, Option<InstructionReply>> {
) -> BoxFuture<'static, InstructionReply> {
Box::pin(async move {
let Some(writable) = self.region_server.is_region_leader(region_id) else {
warn!("Region: {region_id} is not found");
return Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
return InstructionReply::DowngradeRegion(DowngradeRegionReply {
last_entry_id: None,
exists: false,
error: None,
}));
});
};
let region_server_moved = self.region_server.clone();
@@ -102,19 +99,19 @@ impl HandlerContext {
Ok(SetRegionRoleStateResponse::Success { .. }) => {}
Ok(SetRegionRoleStateResponse::NotFound) => {
warn!("Region: {region_id} is not found");
return Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
return InstructionReply::DowngradeRegion(DowngradeRegionReply {
last_entry_id: None,
exists: false,
error: None,
}));
});
}
Err(err) => {
warn!(err; "Failed to convert region to downgrading leader");
return Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
return InstructionReply::DowngradeRegion(DowngradeRegionReply {
last_entry_id: None,
exists: true,
error: Some(format!("{err:?}")),
}));
});
}
}
}
@@ -147,20 +144,18 @@ impl HandlerContext {
let result = self.catchup_tasks.wait(&mut watcher, flush_timeout).await;
match result {
WaitResult::Timeout => {
Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
last_entry_id: None,
exists: true,
error: Some(format!("Flush region: {region_id} is timeout")),
}))
}
WaitResult::Timeout => InstructionReply::DowngradeRegion(DowngradeRegionReply {
last_entry_id: None,
exists: true,
error: Some(format!("Flush region: {region_id} is timeout")),
}),
WaitResult::Finish(Ok(_)) => self.downgrade_to_follower_gracefully(region_id).await,
WaitResult::Finish(Err(err)) => {
Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
InstructionReply::DowngradeRegion(DowngradeRegionReply {
last_entry_id: None,
exists: true,
error: Some(format!("{err:?}")),
}))
})
}
}
})
@@ -201,9 +196,9 @@ mod tests {
reject_write: false,
})
.await;
assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_)));
assert_matches!(reply, InstructionReply::DowngradeRegion(_));
if let InstructionReply::DowngradeRegion(reply) = reply.unwrap() {
if let InstructionReply::DowngradeRegion(reply) = reply {
assert!(!reply.exists);
assert!(reply.error.is_none());
assert!(reply.last_entry_id.is_none());
@@ -243,9 +238,9 @@ mod tests {
reject_write: false,
})
.await;
assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_)));
assert_matches!(reply, InstructionReply::DowngradeRegion(_));
if let InstructionReply::DowngradeRegion(reply) = reply.unwrap() {
if let InstructionReply::DowngradeRegion(reply) = reply {
assert!(reply.exists);
assert!(reply.error.is_none());
assert_eq!(reply.last_entry_id.unwrap(), 1024);
@@ -277,9 +272,9 @@ mod tests {
reject_write: false,
})
.await;
assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_)));
assert_matches!(reply, InstructionReply::DowngradeRegion(_));
if let InstructionReply::DowngradeRegion(reply) = reply.unwrap() {
if let InstructionReply::DowngradeRegion(reply) = reply {
assert!(reply.exists);
assert!(reply.error.unwrap().contains("timeout"));
assert!(reply.last_entry_id.is_none());
@@ -315,8 +310,8 @@ mod tests {
reject_write: false,
})
.await;
assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_)));
if let InstructionReply::DowngradeRegion(reply) = reply.unwrap() {
assert_matches!(reply, InstructionReply::DowngradeRegion(_));
if let InstructionReply::DowngradeRegion(reply) = reply {
assert!(reply.exists);
assert!(reply.error.unwrap().contains("timeout"));
assert!(reply.last_entry_id.is_none());
@@ -330,11 +325,11 @@ mod tests {
reject_write: false,
})
.await;
assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_)));
assert_matches!(reply, InstructionReply::DowngradeRegion(_));
// Must less than 300 ms.
assert!(timer.elapsed().as_millis() < 300);
if let InstructionReply::DowngradeRegion(reply) = reply.unwrap() {
if let InstructionReply::DowngradeRegion(reply) = reply {
assert!(reply.exists);
assert!(reply.error.is_none());
assert_eq!(reply.last_entry_id.unwrap(), 1024);
@@ -376,8 +371,8 @@ mod tests {
reject_write: false,
})
.await;
assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_)));
if let InstructionReply::DowngradeRegion(reply) = reply.unwrap() {
assert_matches!(reply, InstructionReply::DowngradeRegion(_));
if let InstructionReply::DowngradeRegion(reply) = reply {
assert!(reply.exists);
assert!(reply.error.unwrap().contains("timeout"));
assert!(reply.last_entry_id.is_none());
@@ -391,11 +386,11 @@ mod tests {
reject_write: false,
})
.await;
assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_)));
assert_matches!(reply, InstructionReply::DowngradeRegion(_));
// Must less than 300 ms.
assert!(timer.elapsed().as_millis() < 300);
if let InstructionReply::DowngradeRegion(reply) = reply.unwrap() {
if let InstructionReply::DowngradeRegion(reply) = reply {
assert!(reply.exists);
assert!(reply.error.unwrap().contains("flush failed"));
assert!(reply.last_entry_id.is_none());
@@ -422,8 +417,8 @@ mod tests {
reject_write: false,
})
.await;
assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_)));
if let InstructionReply::DowngradeRegion(reply) = reply.unwrap() {
assert_matches!(reply, InstructionReply::DowngradeRegion(_));
if let InstructionReply::DowngradeRegion(reply) = reply {
assert!(!reply.exists);
assert!(reply.error.is_none());
assert!(reply.last_entry_id.is_none());
@@ -454,8 +449,8 @@ mod tests {
reject_write: false,
})
.await;
assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_)));
if let InstructionReply::DowngradeRegion(reply) = reply.unwrap() {
assert_matches!(reply, InstructionReply::DowngradeRegion(_));
if let InstructionReply::DowngradeRegion(reply) = reply {
assert!(reply.exists);
assert!(reply
.error

View File

@@ -1,104 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_meta::instruction::{FlushRegions, InstructionReply};
use common_telemetry::warn;
use futures_util::future::BoxFuture;
use store_api::region_request::{RegionFlushRequest, RegionRequest};
use crate::error;
use crate::heartbeat::handler::HandlerContext;
impl HandlerContext {
pub(crate) fn handle_flush_region_instruction(
self,
flush_regions: FlushRegions,
) -> BoxFuture<'static, Option<InstructionReply>> {
Box::pin(async move {
for region_id in flush_regions.region_ids {
let request = RegionRequest::Flush(RegionFlushRequest {
row_group_size: None,
});
let result = self.region_server.handle_request(region_id, request).await;
match result {
Ok(_) => {}
Err(error::Error::RegionNotFound { .. }) => {
warn!("Received a flush region instruction from meta, but target region: {region_id} is not found.");
}
Err(err) => {
warn!(
"Failed to flush region: {region_id}, error: {err}",
region_id = region_id,
err = err,
);
}
}
}
None
})
}
}
#[cfg(test)]
mod tests {
use std::sync::{Arc, RwLock};
use common_meta::instruction::FlushRegions;
use mito2::engine::MITO_ENGINE_NAME;
use store_api::storage::RegionId;
use super::*;
use crate::tests::{mock_region_server, MockRegionEngine};
#[tokio::test]
async fn test_handle_flush_region_instruction() {
let flushed_region_ids: Arc<RwLock<Vec<RegionId>>> = Arc::new(RwLock::new(Vec::new()));
let mock_region_server = mock_region_server();
let region_ids = (0..16).map(|i| RegionId::new(1024, i)).collect::<Vec<_>>();
for region_id in &region_ids {
let flushed_region_ids_ref = flushed_region_ids.clone();
let (mock_engine, _) =
MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, move |region_engine| {
region_engine.handle_request_mock_fn =
Some(Box::new(move |region_id, _request| {
flushed_region_ids_ref.write().unwrap().push(region_id);
Ok(0)
}))
});
mock_region_server.register_test_region(*region_id, mock_engine);
}
let handler_context = HandlerContext::new_for_test(mock_region_server);
let reply = handler_context
.clone()
.handle_flush_region_instruction(FlushRegions {
region_ids: region_ids.clone(),
})
.await;
assert!(reply.is_none());
assert_eq!(*flushed_region_ids.read().unwrap(), region_ids);
flushed_region_ids.write().unwrap().clear();
let not_found_region_ids = (0..2).map(|i| RegionId::new(2048, i)).collect::<Vec<_>>();
let reply = handler_context
.handle_flush_region_instruction(FlushRegions {
region_ids: not_found_region_ids.clone(),
})
.await;
assert!(reply.is_none());
assert!(flushed_region_ids.read().unwrap().is_empty());
}
}

View File

@@ -30,7 +30,7 @@ impl HandlerContext {
region_wal_options,
skip_wal_replay,
}: OpenRegion,
) -> BoxFuture<'static, Option<InstructionReply>> {
) -> BoxFuture<'static, InstructionReply> {
Box::pin(async move {
let region_id = Self::region_ident_to_region_id(&region_ident);
prepare_wal_options(&mut region_options, region_id, &region_wal_options);
@@ -43,10 +43,10 @@ impl HandlerContext {
let result = self.region_server.handle_request(region_id, request).await;
let success = result.is_ok();
let error = result.as_ref().map_err(|e| format!("{e:?}")).err();
Some(InstructionReply::OpenRegion(SimpleReply {
InstructionReply::OpenRegion(SimpleReply {
result: success,
error,
}))
})
})
}
}

View File

@@ -29,22 +29,22 @@ impl HandlerContext {
replay_timeout,
location_id,
}: UpgradeRegion,
) -> BoxFuture<'static, Option<InstructionReply>> {
) -> BoxFuture<'static, InstructionReply> {
Box::pin(async move {
let Some(writable) = self.region_server.is_region_leader(region_id) else {
return Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
return InstructionReply::UpgradeRegion(UpgradeRegionReply {
ready: false,
exists: false,
error: None,
}));
});
};
if writable {
return Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
return InstructionReply::UpgradeRegion(UpgradeRegionReply {
ready: true,
exists: true,
error: None,
}));
});
}
let region_server_moved = self.region_server.clone();
@@ -79,11 +79,11 @@ impl HandlerContext {
// Returns immediately
let Some(replay_timeout) = replay_timeout else {
return Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
return InstructionReply::UpgradeRegion(UpgradeRegionReply {
ready: false,
exists: true,
error: None,
}));
});
};
// We don't care that it returns a newly registered or running task.
@@ -91,24 +91,22 @@ impl HandlerContext {
let result = self.catchup_tasks.wait(&mut watcher, replay_timeout).await;
match result {
WaitResult::Timeout => Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
WaitResult::Timeout => InstructionReply::UpgradeRegion(UpgradeRegionReply {
ready: false,
exists: true,
error: None,
})),
WaitResult::Finish(Ok(_)) => {
Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
ready: true,
exists: true,
error: None,
}))
}
}),
WaitResult::Finish(Ok(_)) => InstructionReply::UpgradeRegion(UpgradeRegionReply {
ready: true,
exists: true,
error: None,
}),
WaitResult::Finish(Err(err)) => {
Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
InstructionReply::UpgradeRegion(UpgradeRegionReply {
ready: false,
exists: true,
error: Some(format!("{err:?}")),
}))
})
}
}
})
@@ -151,9 +149,9 @@ mod tests {
location_id: None,
})
.await;
assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_)));
assert_matches!(reply, InstructionReply::UpgradeRegion(_));
if let InstructionReply::UpgradeRegion(reply) = reply.unwrap() {
if let InstructionReply::UpgradeRegion(reply) = reply {
assert!(!reply.exists);
assert!(reply.error.is_none());
}
@@ -189,9 +187,9 @@ mod tests {
location_id: None,
})
.await;
assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_)));
assert_matches!(reply, InstructionReply::UpgradeRegion(_));
if let InstructionReply::UpgradeRegion(reply) = reply.unwrap() {
if let InstructionReply::UpgradeRegion(reply) = reply {
assert!(reply.ready);
assert!(reply.exists);
assert!(reply.error.is_none());
@@ -228,9 +226,9 @@ mod tests {
location_id: None,
})
.await;
assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_)));
assert_matches!(reply, InstructionReply::UpgradeRegion(_));
if let InstructionReply::UpgradeRegion(reply) = reply.unwrap() {
if let InstructionReply::UpgradeRegion(reply) = reply {
assert!(!reply.ready);
assert!(reply.exists);
assert!(reply.error.is_none());
@@ -270,9 +268,9 @@ mod tests {
location_id: None,
})
.await;
assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_)));
assert_matches!(reply, InstructionReply::UpgradeRegion(_));
if let InstructionReply::UpgradeRegion(reply) = reply.unwrap() {
if let InstructionReply::UpgradeRegion(reply) = reply {
assert!(!reply.ready);
assert!(reply.exists);
assert!(reply.error.is_none());
@@ -288,11 +286,11 @@ mod tests {
location_id: None,
})
.await;
assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_)));
assert_matches!(reply, InstructionReply::UpgradeRegion(_));
// Must less than 300 ms.
assert!(timer.elapsed().as_millis() < 300);
if let InstructionReply::UpgradeRegion(reply) = reply.unwrap() {
if let InstructionReply::UpgradeRegion(reply) = reply {
assert!(reply.ready);
assert!(reply.exists);
assert!(reply.error.is_none());
@@ -330,10 +328,10 @@ mod tests {
location_id: None,
})
.await;
assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_)));
assert_matches!(reply, InstructionReply::UpgradeRegion(_));
// It didn't wait for handle returns; it had no idea about the error.
if let InstructionReply::UpgradeRegion(reply) = reply.unwrap() {
if let InstructionReply::UpgradeRegion(reply) = reply {
assert!(!reply.ready);
assert!(reply.exists);
assert!(reply.error.is_none());
@@ -348,9 +346,9 @@ mod tests {
location_id: None,
})
.await;
assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_)));
assert_matches!(reply, InstructionReply::UpgradeRegion(_));
if let InstructionReply::UpgradeRegion(reply) = reply.unwrap() {
if let InstructionReply::UpgradeRegion(reply) = reply {
assert!(!reply.ready);
assert!(reply.exists);
assert!(reply.error.is_some());

View File

@@ -25,6 +25,6 @@ pub mod heartbeat;
pub mod metrics;
pub mod region_server;
pub mod service;
pub mod store;
mod store;
#[cfg(any(test, feature = "testing"))]
pub mod tests;

View File

@@ -55,7 +55,7 @@ use store_api::metric_engine_consts::{
FILE_ENGINE_NAME, LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME,
};
use store_api::region_engine::{
RegionEngineRef, RegionManifestInfo, RegionRole, RegionStatistic, SetRegionRoleStateResponse,
RegionEngineRef, RegionRole, RegionStatistic, SetRegionRoleStateResponse,
SettableRegionRoleState,
};
use store_api::region_request::{
@@ -308,22 +308,6 @@ impl RegionServer {
.with_context(|_| HandleRegionRequestSnafu { region_id })
}
pub async fn sync_region_manifest(
&self,
region_id: RegionId,
manifest_info: RegionManifestInfo,
) -> Result<()> {
let engine = self
.inner
.region_map
.get(&region_id)
.with_context(|| RegionNotFoundSnafu { region_id })?;
engine
.sync_region(region_id, manifest_info)
.await
.with_context(|_| HandleRegionRequestSnafu { region_id })
}
/// Set region role state gracefully.
///
/// For [SettableRegionRoleState::Follower]:

View File

@@ -15,7 +15,7 @@
//! object storage utilities
mod azblob;
pub mod fs;
mod fs;
mod gcs;
mod oss;
mod s3;

View File

@@ -24,8 +24,7 @@ use crate::config::FileConfig;
use crate::error::{self, Result};
use crate::store;
/// A helper function to create a file system object store.
pub async fn new_fs_object_store(
pub(crate) async fn new_fs_object_store(
data_home: &str,
_file_config: &FileConfig,
) -> Result<ObjectStore> {

Some files were not shown because too many files have changed in this diff Show More