mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-04 20:32:56 +00:00
Compare commits
68 Commits
feat/prefi
...
v0.14.0-ni
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
eab702cc02 | ||
|
|
dd63068df6 | ||
|
|
f73b61e767 | ||
|
|
2acecd3620 | ||
|
|
f797de3497 | ||
|
|
d53afa849d | ||
|
|
3aebfc1716 | ||
|
|
dbb79c9671 | ||
|
|
054056fcbb | ||
|
|
aa486db8b7 | ||
|
|
4ef9afd8d8 | ||
|
|
f9221e9e66 | ||
|
|
6c26fe9c80 | ||
|
|
33c9fb737c | ||
|
|
68ce796771 | ||
|
|
d701c18150 | ||
|
|
d3a60d8821 | ||
|
|
5d688c6565 | ||
|
|
41aee1f1b7 | ||
|
|
c5b55fd8cf | ||
|
|
8051dbbc31 | ||
|
|
2d3192984d | ||
|
|
bef45ed0e8 | ||
|
|
a9e990768d | ||
|
|
7e1ba49d3d | ||
|
|
737558ef53 | ||
|
|
dbc25dd8da | ||
|
|
76a58a07e1 | ||
|
|
c2ba7fb16c | ||
|
|
09ef24fd75 | ||
|
|
9b7b012620 | ||
|
|
898e0bd828 | ||
|
|
2b4ed43692 | ||
|
|
8f2ae4e136 | ||
|
|
0cd219a5d2 | ||
|
|
2b2ea5bf72 | ||
|
|
e107bd5529 | ||
|
|
a31f0e255b | ||
|
|
40b52f3b13 | ||
|
|
f13a43647a | ||
|
|
7bcb01d269 | ||
|
|
e81213728b | ||
|
|
d88482b996 | ||
|
|
3b547d9d13 | ||
|
|
278553fc3f | ||
|
|
a36901a653 | ||
|
|
c4ac242c69 | ||
|
|
9f9307de73 | ||
|
|
c77ce958a3 | ||
|
|
5ad2d8b3b8 | ||
|
|
2724c3c142 | ||
|
|
4eb0771afe | ||
|
|
a0739a96e4 | ||
|
|
77ccf1eac8 | ||
|
|
1dc4a196bf | ||
|
|
2431cd3bdf | ||
|
|
cd730e0486 | ||
|
|
a19441bed8 | ||
|
|
162e3b8620 | ||
|
|
83642dab87 | ||
|
|
46070958c9 | ||
|
|
eea8b1c730 | ||
|
|
1ab4ddab8d | ||
|
|
9e63018198 | ||
|
|
594bec8c36 | ||
|
|
1586732d20 | ||
|
|
16fddd97a7 | ||
|
|
2260782c12 |
@@ -47,7 +47,6 @@ runs:
|
||||
shell: pwsh
|
||||
run: make test sqlness-test
|
||||
env:
|
||||
RUSTUP_WINDOWS_PATH_ADD_BIN: 1 # Workaround for https://github.com/nextest-rs/nextest/issues/1493
|
||||
RUST_BACKTRACE: 1
|
||||
SQLNESS_OPTS: "--preserve-state"
|
||||
|
||||
|
||||
@@ -8,7 +8,7 @@ inputs:
|
||||
default: 2
|
||||
description: "Number of Datanode replicas"
|
||||
meta-replicas:
|
||||
default: 1
|
||||
default: 2
|
||||
description: "Number of Metasrv replicas"
|
||||
image-registry:
|
||||
default: "docker.io"
|
||||
|
||||
5
.github/workflows/develop.yml
vendored
5
.github/workflows/develop.yml
vendored
@@ -576,9 +576,12 @@ jobs:
|
||||
- name: "Remote WAL"
|
||||
opts: "-w kafka -k 127.0.0.1:9092"
|
||||
kafka: true
|
||||
- name: "Pg Kvbackend"
|
||||
- name: "PostgreSQL KvBackend"
|
||||
opts: "--setup-pg"
|
||||
kafka: false
|
||||
- name: "MySQL Kvbackend"
|
||||
opts: "--setup-mysql"
|
||||
kafka: false
|
||||
timeout-minutes: 60
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
|
||||
1
.github/workflows/nightly-ci.yml
vendored
1
.github/workflows/nightly-ci.yml
vendored
@@ -107,7 +107,6 @@ jobs:
|
||||
CARGO_BUILD_RUSTFLAGS: "-C linker=lld-link"
|
||||
RUST_BACKTRACE: 1
|
||||
CARGO_INCREMENTAL: 0
|
||||
RUSTUP_WINDOWS_PATH_ADD_BIN: 1 # Workaround for https://github.com/nextest-rs/nextest/issues/1493
|
||||
GT_S3_BUCKET: ${{ vars.AWS_CI_TEST_BUCKET }}
|
||||
GT_S3_ACCESS_KEY_ID: ${{ secrets.AWS_CI_TEST_ACCESS_KEY_ID }}
|
||||
GT_S3_ACCESS_KEY: ${{ secrets.AWS_CI_TEST_SECRET_ACCESS_KEY }}
|
||||
|
||||
823
Cargo.lock
generated
823
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
40
Cargo.toml
40
Cargo.toml
@@ -29,6 +29,7 @@ members = [
|
||||
"src/common/query",
|
||||
"src/common/recordbatch",
|
||||
"src/common/runtime",
|
||||
"src/common/session",
|
||||
"src/common/substrait",
|
||||
"src/common/telemetry",
|
||||
"src/common/test-util",
|
||||
@@ -88,7 +89,7 @@ rust.unexpected_cfgs = { level = "warn", check-cfg = ['cfg(tokio_unstable)'] }
|
||||
#
|
||||
# See for more detaiils: https://github.com/rust-lang/cargo/issues/11329
|
||||
ahash = { version = "0.8", features = ["compile-time-rng"] }
|
||||
aquamarine = "0.3"
|
||||
aquamarine = "0.6"
|
||||
arrow = { version = "53.0.0", features = ["prettyprint"] }
|
||||
arrow-array = { version = "53.0.0", default-features = false, features = ["chrono-tz"] }
|
||||
arrow-flight = "53.0"
|
||||
@@ -99,9 +100,9 @@ async-trait = "0.1"
|
||||
# Remember to update axum-extra, axum-macros when updating axum
|
||||
axum = "0.8"
|
||||
axum-extra = "0.10"
|
||||
axum-macros = "0.4"
|
||||
axum-macros = "0.5"
|
||||
backon = "1"
|
||||
base64 = "0.21"
|
||||
base64 = "0.22"
|
||||
bigdecimal = "0.4.2"
|
||||
bitflags = "2.4.1"
|
||||
bytemuck = "1.12"
|
||||
@@ -111,7 +112,7 @@ chrono-tz = "0.10.1"
|
||||
clap = { version = "4.4", features = ["derive"] }
|
||||
config = "0.13.0"
|
||||
crossbeam-utils = "0.8"
|
||||
dashmap = "5.4"
|
||||
dashmap = "6.1"
|
||||
datafusion = { git = "https://github.com/apache/datafusion.git", rev = "2464703c84c400a09cc59277018813f0e797bb4e" }
|
||||
datafusion-common = { git = "https://github.com/apache/datafusion.git", rev = "2464703c84c400a09cc59277018813f0e797bb4e" }
|
||||
datafusion-expr = { git = "https://github.com/apache/datafusion.git", rev = "2464703c84c400a09cc59277018813f0e797bb4e" }
|
||||
@@ -121,32 +122,31 @@ datafusion-physical-expr = { git = "https://github.com/apache/datafusion.git", r
|
||||
datafusion-physical-plan = { git = "https://github.com/apache/datafusion.git", rev = "2464703c84c400a09cc59277018813f0e797bb4e" }
|
||||
datafusion-sql = { git = "https://github.com/apache/datafusion.git", rev = "2464703c84c400a09cc59277018813f0e797bb4e" }
|
||||
datafusion-substrait = { git = "https://github.com/apache/datafusion.git", rev = "2464703c84c400a09cc59277018813f0e797bb4e" }
|
||||
deadpool = "0.10"
|
||||
deadpool-postgres = "0.12"
|
||||
derive_builder = "0.12"
|
||||
deadpool = "0.12"
|
||||
deadpool-postgres = "0.14"
|
||||
derive_builder = "0.20"
|
||||
dotenv = "0.15"
|
||||
etcd-client = "0.14"
|
||||
flate2 = { version = "1.1.0", default-features = false, features = ["zlib-rs"] }
|
||||
fst = "0.4.7"
|
||||
futures = "0.3"
|
||||
futures-util = "0.3"
|
||||
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "c5419bbd20cb42e568ec325a4d71a3c94cc327e1" }
|
||||
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "fb8e20ce29afd81835e3ea3c1164c8ce10de2c65" }
|
||||
hex = "0.4"
|
||||
http = "1"
|
||||
humantime = "2.1"
|
||||
humantime-serde = "1.1"
|
||||
hyper = "1.1"
|
||||
hyper-util = "0.1"
|
||||
itertools = "0.10"
|
||||
itertools = "0.14"
|
||||
jsonb = { git = "https://github.com/databendlabs/jsonb.git", rev = "8c8d2fc294a39f3ff08909d60f718639cfba3875", default-features = false }
|
||||
lazy_static = "1.4"
|
||||
local-ip-address = "0.6"
|
||||
loki-proto = { git = "https://github.com/GreptimeTeam/loki-proto.git", rev = "1434ecf23a2654025d86188fb5205e7a74b225d3" }
|
||||
meter-core = { git = "https://github.com/GreptimeTeam/greptime-meter.git", rev = "5618e779cf2bb4755b499c630fba4c35e91898cb" }
|
||||
mockall = "0.11.4"
|
||||
mockall = "0.13"
|
||||
moka = "0.12"
|
||||
nalgebra = "0.33"
|
||||
notify = "6.1"
|
||||
notify = "8.0"
|
||||
num_cpus = "1.16"
|
||||
once_cell = "1.18"
|
||||
opentelemetry-proto = { version = "0.27", features = [
|
||||
@@ -164,8 +164,8 @@ prometheus = { version = "0.13.3", features = ["process"] }
|
||||
promql-parser = { version = "0.5", features = ["ser"] }
|
||||
prost = "0.13"
|
||||
raft-engine = { version = "0.4.1", default-features = false }
|
||||
rand = "0.8"
|
||||
ratelimit = "0.9"
|
||||
rand = "0.9"
|
||||
ratelimit = "0.10"
|
||||
regex = "1.8"
|
||||
regex-automata = "0.4"
|
||||
reqwest = { version = "0.12", default-features = false, features = [
|
||||
@@ -177,7 +177,7 @@ reqwest = { version = "0.12", default-features = false, features = [
|
||||
rskafka = { git = "https://github.com/influxdata/rskafka.git", rev = "75535b5ad9bae4a5dbb582c82e44dfd81ec10105", features = [
|
||||
"transport-tls",
|
||||
] }
|
||||
rstest = "0.21"
|
||||
rstest = "0.25"
|
||||
rstest_reuse = "0.7"
|
||||
rust_decimal = "1.33"
|
||||
rustc-hash = "2.0"
|
||||
@@ -185,21 +185,24 @@ rustls = { version = "0.23.20", default-features = false } # override by patch,
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
serde_json = { version = "1.0", features = ["float_roundtrip"] }
|
||||
serde_with = "3"
|
||||
shadow-rs = "0.38"
|
||||
shadow-rs = "1.1"
|
||||
simd-json = "0.15"
|
||||
similar-asserts = "1.6.0"
|
||||
smallvec = { version = "1", features = ["serde"] }
|
||||
snafu = "0.8"
|
||||
sqlx = { version = "0.8", features = [
|
||||
"runtime-tokio-rustls",
|
||||
"mysql",
|
||||
"postgres",
|
||||
"chrono",
|
||||
] }
|
||||
sysinfo = "0.30"
|
||||
sysinfo = "0.33"
|
||||
# on branch v0.52.x
|
||||
sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "71dd86058d2af97b9925093d40c4e03360403170", features = [
|
||||
"visitor",
|
||||
"serde",
|
||||
] } # on branch v0.44.x
|
||||
strum = { version = "0.25", features = ["derive"] }
|
||||
strum = { version = "0.27", features = ["derive"] }
|
||||
tempfile = "3"
|
||||
tokio = { version = "1.40", features = ["full"] }
|
||||
tokio-postgres = "0.7"
|
||||
@@ -246,6 +249,7 @@ common-procedure-test = { path = "src/common/procedure-test" }
|
||||
common-query = { path = "src/common/query" }
|
||||
common-recordbatch = { path = "src/common/recordbatch" }
|
||||
common-runtime = { path = "src/common/runtime" }
|
||||
common-session = { path = "src/common/session" }
|
||||
common-telemetry = { path = "src/common/telemetry" }
|
||||
common-test-util = { path = "src/common/test-util" }
|
||||
common-time = { path = "src/common/time" }
|
||||
|
||||
12
README.md
12
README.md
@@ -6,7 +6,7 @@
|
||||
</picture>
|
||||
</p>
|
||||
|
||||
<h2 align="center">Unified & Cost-Effective Time Series Database for Metrics, Logs, and Events</h2>
|
||||
<h2 align="center">Unified & Cost-Effective Observerability Database for Metrics, Logs, and Events</h2>
|
||||
|
||||
<div align="center">
|
||||
<h3 align="center">
|
||||
@@ -62,15 +62,19 @@
|
||||
|
||||
## Introduction
|
||||
|
||||
**GreptimeDB** is an open-source unified & cost-effective time-series database for **Metrics**, **Logs**, and **Events** (also **Traces** in plan). You can gain real-time insights from Edge to Cloud at Any Scale.
|
||||
**GreptimeDB** is an open-source unified & cost-effective observerability database for **Metrics**, **Logs**, and **Events** (also **Traces** in plan). You can gain real-time insights from Edge to Cloud at Any Scale.
|
||||
|
||||
## News
|
||||
|
||||
**[GreptimeDB archives 1 billion cold run #1 in JSONBench!](https://greptime.com/blogs/2025-03-18-jsonbench-greptimedb-performance)**
|
||||
|
||||
## Why GreptimeDB
|
||||
|
||||
Our core developers have been building time-series data platforms for years. Based on our best practices, GreptimeDB was born to give you:
|
||||
Our core developers have been building observerability data platforms for years. Based on our best practices, GreptimeDB was born to give you:
|
||||
|
||||
* **Unified Processing of Metrics, Logs, and Events**
|
||||
|
||||
GreptimeDB unifies time series data processing by treating all data - whether metrics, logs, or events - as timestamped events with context. Users can analyze this data using either [SQL](https://docs.greptime.com/user-guide/query-data/sql) or [PromQL](https://docs.greptime.com/user-guide/query-data/promql) and leverage stream processing ([Flow](https://docs.greptime.com/user-guide/flow-computation/overview)) to enable continuous aggregation. [Read more](https://docs.greptime.com/user-guide/concepts/data-model).
|
||||
GreptimeDB unifies observerability data processing by treating all data - whether metrics, logs, or events - as timestamped events with context. Users can analyze this data using either [SQL](https://docs.greptime.com/user-guide/query-data/sql) or [PromQL](https://docs.greptime.com/user-guide/query-data/promql) and leverage stream processing ([Flow](https://docs.greptime.com/user-guide/flow-computation/overview)) to enable continuous aggregation. [Read more](https://docs.greptime.com/user-guide/concepts/data-model).
|
||||
|
||||
* **Cloud-native Distributed Database**
|
||||
|
||||
|
||||
@@ -12,7 +12,6 @@
|
||||
|
||||
| Key | Type | Default | Descriptions |
|
||||
| --- | -----| ------- | ----------- |
|
||||
| `mode` | String | `standalone` | The running mode of the datanode. It can be `standalone` or `distributed`. |
|
||||
| `default_timezone` | String | Unset | The default timezone of the server. |
|
||||
| `init_regions_in_background` | Bool | `false` | Initialize all regions in the background during the startup.<br/>By default, it provides services after all regions have been initialized. |
|
||||
| `init_regions_parallelism` | Integer | `16` | Parallelism of initializing regions. |
|
||||
@@ -24,7 +23,7 @@
|
||||
| `runtime.compact_rt_size` | Integer | `4` | The number of threads to execute the runtime for global write operations. |
|
||||
| `http` | -- | -- | The HTTP server options. |
|
||||
| `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. |
|
||||
| `http.timeout` | String | `30s` | HTTP request timeout. Set to 0 to disable timeout. |
|
||||
| `http.timeout` | String | `0s` | HTTP request timeout. Set to 0 to disable timeout. |
|
||||
| `http.body_limit` | String | `64MB` | HTTP request body limit.<br/>The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.<br/>Set to 0 to disable limit. |
|
||||
| `http.enable_cors` | Bool | `true` | HTTP CORS support, it's turned on by default<br/>This allows browser to access http APIs without CORS restrictions |
|
||||
| `http.cors_allowed_origins` | Array | Unset | Customize allowed origins for HTTP CORS. |
|
||||
@@ -98,6 +97,7 @@
|
||||
| `procedure` | -- | -- | Procedure storage options. |
|
||||
| `procedure.max_retry_times` | Integer | `3` | Procedure max retry time. |
|
||||
| `procedure.retry_delay` | String | `500ms` | Initial retry delay of procedures, increases exponentially |
|
||||
| `procedure.max_running_procedures` | Integer | `128` | Max running procedures.<br/>The maximum number of procedures that can be running at the same time.<br/>If the number of running procedures exceeds this limit, the procedure will be rejected. |
|
||||
| `flow` | -- | -- | flow engine options. |
|
||||
| `flow.num_workers` | Integer | `0` | The number of flow worker in flownode.<br/>Not setting(or set to 0) this value will use the number of CPU cores divided by 2. |
|
||||
| `storage` | -- | -- | The data storage options. |
|
||||
@@ -222,7 +222,7 @@
|
||||
| `heartbeat.retry_interval` | String | `3s` | Interval for retrying to send heartbeat messages to the metasrv. |
|
||||
| `http` | -- | -- | The HTTP server options. |
|
||||
| `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. |
|
||||
| `http.timeout` | String | `30s` | HTTP request timeout. Set to 0 to disable timeout. |
|
||||
| `http.timeout` | String | `0s` | HTTP request timeout. Set to 0 to disable timeout. |
|
||||
| `http.body_limit` | String | `64MB` | HTTP request body limit.<br/>The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.<br/>Set to 0 to disable limit. |
|
||||
| `http.enable_cors` | Bool | `true` | HTTP CORS support, it's turned on by default<br/>This allows browser to access http APIs without CORS restrictions |
|
||||
| `http.cors_allowed_origins` | Array | Unset | Customize allowed origins for HTTP CORS. |
|
||||
@@ -328,6 +328,7 @@
|
||||
| `procedure.max_retry_times` | Integer | `12` | Procedure max retry time. |
|
||||
| `procedure.retry_delay` | String | `500ms` | Initial retry delay of procedures, increases exponentially |
|
||||
| `procedure.max_metadata_value_size` | String | `1500KiB` | Auto split large value<br/>GreptimeDB procedure uses etcd as the default metadata storage backend.<br/>The etcd the maximum size of any request is 1.5 MiB<br/>1500KiB = 1536KiB (1.5MiB) - 36KiB (reserved size of key)<br/>Comments out the `max_metadata_value_size`, for don't split large value (no limit). |
|
||||
| `procedure.max_running_procedures` | Integer | `128` | Max running procedures.<br/>The maximum number of procedures that can be running at the same time.<br/>If the number of running procedures exceeds this limit, the procedure will be rejected. |
|
||||
| `failure_detector` | -- | -- | -- |
|
||||
| `failure_detector.threshold` | Float | `8.0` | The threshold value used by the failure detector to determine failure conditions. |
|
||||
| `failure_detector.min_std_deviation` | String | `100ms` | The minimum standard deviation of the heartbeat intervals, used to calculate acceptable variations. |
|
||||
@@ -381,7 +382,6 @@
|
||||
|
||||
| Key | Type | Default | Descriptions |
|
||||
| --- | -----| ------- | ----------- |
|
||||
| `mode` | String | `standalone` | The running mode of the datanode. It can be `standalone` or `distributed`. |
|
||||
| `node_id` | Integer | Unset | The datanode identifier and should be unique in the cluster. |
|
||||
| `require_lease_before_startup` | Bool | `false` | Start services after regions have obtained leases.<br/>It will block the datanode start if it can't receive leases in the heartbeat from metasrv. |
|
||||
| `init_regions_in_background` | Bool | `false` | Initialize all regions in the background during the startup.<br/>By default, it provides services after all regions have been initialized. |
|
||||
@@ -390,7 +390,7 @@
|
||||
| `enable_telemetry` | Bool | `true` | Enable telemetry to collect anonymous usage data. Enabled by default. |
|
||||
| `http` | -- | -- | The HTTP server options. |
|
||||
| `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. |
|
||||
| `http.timeout` | String | `30s` | HTTP request timeout. Set to 0 to disable timeout. |
|
||||
| `http.timeout` | String | `0s` | HTTP request timeout. Set to 0 to disable timeout. |
|
||||
| `http.body_limit` | String | `64MB` | HTTP request body limit.<br/>The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.<br/>Set to 0 to disable limit. |
|
||||
| `grpc` | -- | -- | The gRPC server options. |
|
||||
| `grpc.bind_addr` | String | `127.0.0.1:3001` | The address to bind the gRPC server. |
|
||||
@@ -551,7 +551,6 @@
|
||||
|
||||
| Key | Type | Default | Descriptions |
|
||||
| --- | -----| ------- | ----------- |
|
||||
| `mode` | String | `distributed` | The running mode of the flownode. It can be `standalone` or `distributed`. |
|
||||
| `node_id` | Integer | Unset | The flownode identifier and should be unique in the cluster. |
|
||||
| `flow` | -- | -- | flow engine options. |
|
||||
| `flow.num_workers` | Integer | `0` | The number of flow worker in flownode.<br/>Not setting(or set to 0) this value will use the number of CPU cores divided by 2. |
|
||||
@@ -563,7 +562,7 @@
|
||||
| `grpc.max_send_message_size` | String | `512MB` | The maximum send message size for gRPC server. |
|
||||
| `http` | -- | -- | The HTTP server options. |
|
||||
| `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. |
|
||||
| `http.timeout` | String | `30s` | HTTP request timeout. Set to 0 to disable timeout. |
|
||||
| `http.timeout` | String | `0s` | HTTP request timeout. Set to 0 to disable timeout. |
|
||||
| `http.body_limit` | String | `64MB` | HTTP request body limit.<br/>The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.<br/>Set to 0 to disable limit. |
|
||||
| `meta_client` | -- | -- | The metasrv client options. |
|
||||
| `meta_client.metasrv_addrs` | Array | -- | The addresses of the metasrv. |
|
||||
|
||||
@@ -1,6 +1,3 @@
|
||||
## The running mode of the datanode. It can be `standalone` or `distributed`.
|
||||
mode = "standalone"
|
||||
|
||||
## The datanode identifier and should be unique in the cluster.
|
||||
## @toml2docs:none-default
|
||||
node_id = 42
|
||||
@@ -27,7 +24,7 @@ max_concurrent_queries = 0
|
||||
## The address to bind the HTTP server.
|
||||
addr = "127.0.0.1:4000"
|
||||
## HTTP request timeout. Set to 0 to disable timeout.
|
||||
timeout = "30s"
|
||||
timeout = "0s"
|
||||
## HTTP request body limit.
|
||||
## The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.
|
||||
## Set to 0 to disable limit.
|
||||
|
||||
@@ -1,6 +1,3 @@
|
||||
## The running mode of the flownode. It can be `standalone` or `distributed`.
|
||||
mode = "distributed"
|
||||
|
||||
## The flownode identifier and should be unique in the cluster.
|
||||
## @toml2docs:none-default
|
||||
node_id = 14
|
||||
@@ -30,7 +27,7 @@ max_send_message_size = "512MB"
|
||||
## The address to bind the HTTP server.
|
||||
addr = "127.0.0.1:4000"
|
||||
## HTTP request timeout. Set to 0 to disable timeout.
|
||||
timeout = "30s"
|
||||
timeout = "0s"
|
||||
## HTTP request body limit.
|
||||
## The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.
|
||||
## Set to 0 to disable limit.
|
||||
|
||||
@@ -26,7 +26,7 @@ retry_interval = "3s"
|
||||
## The address to bind the HTTP server.
|
||||
addr = "127.0.0.1:4000"
|
||||
## HTTP request timeout. Set to 0 to disable timeout.
|
||||
timeout = "30s"
|
||||
timeout = "0s"
|
||||
## HTTP request body limit.
|
||||
## The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.
|
||||
## Set to 0 to disable limit.
|
||||
|
||||
@@ -79,6 +79,11 @@ retry_delay = "500ms"
|
||||
## Comments out the `max_metadata_value_size`, for don't split large value (no limit).
|
||||
max_metadata_value_size = "1500KiB"
|
||||
|
||||
## Max running procedures.
|
||||
## The maximum number of procedures that can be running at the same time.
|
||||
## If the number of running procedures exceeds this limit, the procedure will be rejected.
|
||||
max_running_procedures = 128
|
||||
|
||||
# Failure detectors options.
|
||||
[failure_detector]
|
||||
|
||||
|
||||
@@ -1,6 +1,3 @@
|
||||
## The running mode of the datanode. It can be `standalone` or `distributed`.
|
||||
mode = "standalone"
|
||||
|
||||
## The default timezone of the server.
|
||||
## @toml2docs:none-default
|
||||
default_timezone = "UTC"
|
||||
@@ -34,7 +31,7 @@ max_concurrent_queries = 0
|
||||
## The address to bind the HTTP server.
|
||||
addr = "127.0.0.1:4000"
|
||||
## HTTP request timeout. Set to 0 to disable timeout.
|
||||
timeout = "30s"
|
||||
timeout = "0s"
|
||||
## HTTP request body limit.
|
||||
## The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.
|
||||
## Set to 0 to disable limit.
|
||||
@@ -302,6 +299,10 @@ purge_interval = "1m"
|
||||
max_retry_times = 3
|
||||
## Initial retry delay of procedures, increases exponentially
|
||||
retry_delay = "500ms"
|
||||
## Max running procedures.
|
||||
## The maximum number of procedures that can be running at the same time.
|
||||
## If the number of running procedures exceeds this limit, the procedure will be rejected.
|
||||
max_running_procedures = 128
|
||||
|
||||
## flow engine options.
|
||||
[flow]
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
This document introduces how to write fuzz tests in GreptimeDB.
|
||||
|
||||
## What is a fuzz test
|
||||
Fuzz test is tool that leverage deterministic random generation to assist in finding bugs. The goal of fuzz tests is to identify inputs generated by the fuzzer that cause system panics, crashes, or unexpected behaviors to occur. And we are using the [cargo-fuzz](https://github.com/rust-fuzz/cargo-fuzz) to run our fuzz test targets.
|
||||
Fuzz test is tool that leverage deterministic random generation to assist in finding bugs. The goal of fuzz tests is to identify inputs generated by the fuzzer that cause system panics, crashes, or unexpected behaviors to occur. And we are using the [cargo-fuzz](https://github.com/rust-fuzz/cargo-fuzz) to run our fuzz test targets.
|
||||
|
||||
## Why we need them
|
||||
- Find bugs by leveraging random generation
|
||||
@@ -13,7 +13,7 @@ Fuzz test is tool that leverage deterministic random generation to assist in fin
|
||||
All fuzz test-related resources are located in the `/tests-fuzz` directory.
|
||||
There are two types of resources: (1) fundamental components and (2) test targets.
|
||||
|
||||
### Fundamental components
|
||||
### Fundamental components
|
||||
They are located in the `/tests-fuzz/src` directory. The fundamental components define how to generate SQLs (including dialects for different protocols) and validate execution results (e.g., column attribute validation), etc.
|
||||
|
||||
### Test targets
|
||||
@@ -21,25 +21,25 @@ They are located in the `/tests-fuzz/targets` directory, with each file represen
|
||||
|
||||
Figure 1 illustrates the fundamental components of the fuzz test provide the ability to generate random SQLs. It utilizes a Random Number Generator (Rng) to generate the Intermediate Representation (IR), then employs a DialectTranslator to produce specified dialects for different protocols. Finally, the fuzz tests send the generated SQL via the specified protocol and verify that the execution results meet expectations.
|
||||
```
|
||||
Rng
|
||||
|
|
||||
|
|
||||
v
|
||||
ExprGenerator
|
||||
|
|
||||
|
|
||||
v
|
||||
Intermediate representation (IR)
|
||||
|
|
||||
|
|
||||
+----------------------+----------------------+
|
||||
| | |
|
||||
v v v
|
||||
Rng
|
||||
|
|
||||
|
|
||||
v
|
||||
ExprGenerator
|
||||
|
|
||||
|
|
||||
v
|
||||
Intermediate representation (IR)
|
||||
|
|
||||
|
|
||||
+----------------------+----------------------+
|
||||
| | |
|
||||
v v v
|
||||
MySQLTranslator PostgreSQLTranslator OtherDialectTranslator
|
||||
| | |
|
||||
| | |
|
||||
v v v
|
||||
SQL(MySQL Dialect) ..... .....
|
||||
| | |
|
||||
| | |
|
||||
v v v
|
||||
SQL(MySQL Dialect) ..... .....
|
||||
|
|
||||
|
|
||||
v
|
||||
@@ -133,4 +133,4 @@ fuzz_target!(|input: FuzzInput| {
|
||||
cargo fuzz run <fuzz-target> --fuzz-dir tests-fuzz
|
||||
```
|
||||
|
||||
For more details, please refer to this [document](/tests-fuzz/README.md).
|
||||
For more details, please refer to this [document](/tests-fuzz/README.md).
|
||||
|
||||
77
docs/rfcs/2025-02-06-remote-wal-purge.md
Normal file
77
docs/rfcs/2025-02-06-remote-wal-purge.md
Normal file
@@ -0,0 +1,77 @@
|
||||
---
|
||||
Feature Name: Remote WAL Purge
|
||||
Tracking Issue: https://github.com/GreptimeTeam/greptimedb/issues/5474
|
||||
Date: 2025-02-06
|
||||
Author: "Yuhan Wang <profsyb@gmail.com>"
|
||||
---
|
||||
|
||||
# Summary
|
||||
|
||||
This RFC proposes a method for purging remote WAL in the database.
|
||||
|
||||
# Motivation
|
||||
|
||||
Currently only local wal entries are purged when flushing, while remote wal does nothing.
|
||||
|
||||
# Details
|
||||
|
||||
```mermaid
|
||||
sequenceDiagram
|
||||
Region0->>Kafka: Last entry id of the topic in use
|
||||
Region0->>WALPruner: Heartbeat with last entry id
|
||||
WALPruner->>+WALPruner: Time Loop
|
||||
WALPruner->>+ProcedureManager: Submit purge procedure
|
||||
ProcedureManager->>Region0: Flush request
|
||||
ProcedureManager->>Kafka: Prune WAL entries
|
||||
Region0->>Region0: Flush
|
||||
```
|
||||
|
||||
## Steps
|
||||
|
||||
### Before purge
|
||||
|
||||
Before purging remote WAL, metasrv needs to know:
|
||||
|
||||
1. `last_entry_id` of each region.
|
||||
2. `kafka_topic_last_entry_id` which is the last entry id of the topic in use. Can be lazily updated and needed when region has empty memtable.
|
||||
3. Kafka topics that each region uses.
|
||||
|
||||
The states are maintained through:
|
||||
1. Heartbeat: Datanode sends `last_entry_id` to metasrv in heartbeat. As for regions with empty memtable, `last_entry_id` should equals to `kafka_topic_last_entry_id`.
|
||||
2. Metasrv maintains a topic-region map to know which region uses which topic.
|
||||
|
||||
`kafka_topic_last_entry_id` will be maintained by the region itself. Region will update the value after `k` heartbeats if the memtable is empty.
|
||||
|
||||
### Purge procedure
|
||||
|
||||
We can better handle locks utilizing current procedure. It's quite similar to the region migration procedure.
|
||||
|
||||
After a period of time, metasrv will submit a purge procedure to ProcedureManager. The purge will apply to all topics.
|
||||
|
||||
The procedure is divided into following stages:
|
||||
|
||||
1. Preparation:
|
||||
- Retrieve `last_entry_id` of each region kvbackend.
|
||||
- Choose regions that have a relatively small `last_entry_id` as candidate regions, which means we need to send a flush request to these regions.
|
||||
2. Communication:
|
||||
- Send flush requests to candidate regions.
|
||||
3. Purge:
|
||||
- Choose proper entry id to delete for each topic. The entry should be the smallest `last_entry_id - 1` among all regions.
|
||||
- Delete legacy entries in Kafka.
|
||||
- Store the `last_purged_entry_id` in kvbackend. It should be locked to prevent other regions from replaying the purged entries.
|
||||
|
||||
### After purge
|
||||
|
||||
After purge, there may be some regions that have `last_entry_id` smaller than the entry we just deleted. It's legal since we only delete the entries that are not needed anymore.
|
||||
|
||||
When restarting a region, it should query the `last_purged_entry_id` from metasrv and replay from `min(last_entry_id, last_purged_entry_id)`.
|
||||
|
||||
### Error handling
|
||||
|
||||
No persisted states are needed since all states are maintained in kvbackend.
|
||||
|
||||
Retry when failed to retrieving metadata from kvbackend.
|
||||
|
||||
# Alternatives
|
||||
|
||||
Purge time can depend on the size of the WAL entries instead of a fixed period of time, which may be more efficient.
|
||||
@@ -4782,7 +4782,7 @@
|
||||
"type": "prometheus",
|
||||
"uid": "${DS_PROMETHEUS}"
|
||||
},
|
||||
"description": "Current counts for stalled write requests by instance\n\nWrite stalls when memtable is full and pending for flush\n\n",
|
||||
"description": "Ingestion size by row counts.",
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"color": {
|
||||
@@ -4844,7 +4844,7 @@
|
||||
"x": 12,
|
||||
"y": 138
|
||||
},
|
||||
"id": 221,
|
||||
"id": 277,
|
||||
"options": {
|
||||
"legend": {
|
||||
"calcs": [],
|
||||
@@ -4864,14 +4864,14 @@
|
||||
"uid": "${DS_PROMETHEUS}"
|
||||
},
|
||||
"editorMode": "code",
|
||||
"expr": "sum by(pod) (greptime_mito_write_stall_total{pod=~\"$datanode\"})",
|
||||
"expr": "rate(greptime_mito_write_rows_total{pod=~\"$datanode\"}[$__rate_interval])",
|
||||
"instant": false,
|
||||
"legendFormat": "{{pod}}",
|
||||
"range": true,
|
||||
"refId": "A"
|
||||
}
|
||||
],
|
||||
"title": "Write Stall per Instance",
|
||||
"title": "Write Rows per Instance",
|
||||
"type": "timeseries"
|
||||
},
|
||||
{
|
||||
@@ -4976,7 +4976,7 @@
|
||||
"type": "prometheus",
|
||||
"uid": "${DS_PROMETHEUS}"
|
||||
},
|
||||
"description": "Cache size by instance.\n",
|
||||
"description": "Current counts for stalled write requests by instance\n\nWrite stalls when memtable is full and pending for flush\n\n",
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"color": {
|
||||
@@ -5028,7 +5028,7 @@
|
||||
}
|
||||
]
|
||||
},
|
||||
"unit": "decbytes"
|
||||
"unit": "none"
|
||||
},
|
||||
"overrides": []
|
||||
},
|
||||
@@ -5038,7 +5038,7 @@
|
||||
"x": 12,
|
||||
"y": 146
|
||||
},
|
||||
"id": 229,
|
||||
"id": 221,
|
||||
"options": {
|
||||
"legend": {
|
||||
"calcs": [],
|
||||
@@ -5058,14 +5058,14 @@
|
||||
"uid": "${DS_PROMETHEUS}"
|
||||
},
|
||||
"editorMode": "code",
|
||||
"expr": "greptime_mito_cache_bytes{pod=~\"$datanode\"}",
|
||||
"expr": "sum by(pod) (greptime_mito_write_stall_total{pod=~\"$datanode\"})",
|
||||
"instant": false,
|
||||
"legendFormat": "{{pod}}-{{type}}",
|
||||
"legendFormat": "{{pod}}",
|
||||
"range": true,
|
||||
"refId": "A"
|
||||
}
|
||||
],
|
||||
"title": "Cached Bytes per Instance",
|
||||
"title": "Write Stall per Instance",
|
||||
"type": "timeseries"
|
||||
},
|
||||
{
|
||||
@@ -5172,7 +5172,7 @@
|
||||
"type": "prometheus",
|
||||
"uid": "${DS_PROMETHEUS}"
|
||||
},
|
||||
"description": "P99 latency of each type of reads by instance",
|
||||
"description": "Cache size by instance.\n",
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"color": {
|
||||
@@ -5224,7 +5224,7 @@
|
||||
}
|
||||
]
|
||||
},
|
||||
"unit": "s"
|
||||
"unit": "decbytes"
|
||||
},
|
||||
"overrides": []
|
||||
},
|
||||
@@ -5234,17 +5234,13 @@
|
||||
"x": 12,
|
||||
"y": 154
|
||||
},
|
||||
"id": 228,
|
||||
"id": 229,
|
||||
"options": {
|
||||
"legend": {
|
||||
"calcs": [
|
||||
"lastNotNull"
|
||||
],
|
||||
"calcs": [],
|
||||
"displayMode": "table",
|
||||
"placement": "bottom",
|
||||
"showLegend": true,
|
||||
"sortBy": "Last *",
|
||||
"sortDesc": true
|
||||
"showLegend": true
|
||||
},
|
||||
"tooltip": {
|
||||
"mode": "single",
|
||||
@@ -5258,14 +5254,14 @@
|
||||
"uid": "${DS_PROMETHEUS}"
|
||||
},
|
||||
"editorMode": "code",
|
||||
"expr": "histogram_quantile(0.99, sum by(pod, le, stage) (rate(greptime_mito_read_stage_elapsed_bucket{pod=~\"$datanode\"}[$__rate_interval])))",
|
||||
"expr": "greptime_mito_cache_bytes{pod=~\"$datanode\"}",
|
||||
"instant": false,
|
||||
"legendFormat": "{{pod}}-{{stage}}-p99",
|
||||
"legendFormat": "{{pod}}-{{type}}",
|
||||
"range": true,
|
||||
"refId": "A"
|
||||
}
|
||||
],
|
||||
"title": "Read Stage P99 per Instance",
|
||||
"title": "Cached Bytes per Instance",
|
||||
"type": "timeseries"
|
||||
},
|
||||
{
|
||||
@@ -5317,7 +5313,8 @@
|
||||
"mode": "absolute",
|
||||
"steps": [
|
||||
{
|
||||
"color": "green"
|
||||
"color": "green",
|
||||
"value": null
|
||||
},
|
||||
{
|
||||
"color": "red",
|
||||
@@ -5370,7 +5367,7 @@
|
||||
"type": "prometheus",
|
||||
"uid": "${DS_PROMETHEUS}"
|
||||
},
|
||||
"description": "Latency of compaction task, at p99",
|
||||
"description": "P99 latency of each type of reads by instance",
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"color": {
|
||||
@@ -5414,7 +5411,8 @@
|
||||
"mode": "absolute",
|
||||
"steps": [
|
||||
{
|
||||
"color": "green"
|
||||
"color": "green",
|
||||
"value": null
|
||||
},
|
||||
{
|
||||
"color": "red",
|
||||
@@ -5432,7 +5430,7 @@
|
||||
"x": 12,
|
||||
"y": 162
|
||||
},
|
||||
"id": 230,
|
||||
"id": 228,
|
||||
"options": {
|
||||
"legend": {
|
||||
"calcs": [
|
||||
@@ -5440,7 +5438,9 @@
|
||||
],
|
||||
"displayMode": "table",
|
||||
"placement": "bottom",
|
||||
"showLegend": true
|
||||
"showLegend": true,
|
||||
"sortBy": "Last *",
|
||||
"sortDesc": true
|
||||
},
|
||||
"tooltip": {
|
||||
"mode": "single",
|
||||
@@ -5454,14 +5454,14 @@
|
||||
"uid": "${DS_PROMETHEUS}"
|
||||
},
|
||||
"editorMode": "code",
|
||||
"expr": "histogram_quantile(0.99, sum by(pod, le) (rate(greptime_mito_compaction_total_elapsed_bucket{pod=~\"$datanode\"}[$__rate_interval])))",
|
||||
"expr": "histogram_quantile(0.99, sum by(pod, le, stage) (rate(greptime_mito_read_stage_elapsed_bucket{pod=~\"$datanode\"}[$__rate_interval])))",
|
||||
"instant": false,
|
||||
"legendFormat": "[{{pod}}]-compaction-p99",
|
||||
"legendFormat": "{{pod}}-{{stage}}-p99",
|
||||
"range": true,
|
||||
"refId": "A"
|
||||
}
|
||||
],
|
||||
"title": "Compaction P99 per Instance",
|
||||
"title": "Read Stage P99 per Instance",
|
||||
"type": "timeseries"
|
||||
},
|
||||
{
|
||||
@@ -5570,7 +5570,7 @@
|
||||
"type": "prometheus",
|
||||
"uid": "${DS_PROMETHEUS}"
|
||||
},
|
||||
"description": "Compaction latency by stage",
|
||||
"description": "Latency of compaction task, at p99",
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"color": {
|
||||
@@ -5632,7 +5632,7 @@
|
||||
"x": 12,
|
||||
"y": 170
|
||||
},
|
||||
"id": 232,
|
||||
"id": 230,
|
||||
"options": {
|
||||
"legend": {
|
||||
"calcs": [
|
||||
@@ -5654,9 +5654,9 @@
|
||||
"uid": "${DS_PROMETHEUS}"
|
||||
},
|
||||
"editorMode": "code",
|
||||
"expr": "histogram_quantile(0.99, sum by(pod, le, stage) (rate(greptime_mito_compaction_stage_elapsed_bucket{pod=~\"$datanode\"}[$__rate_interval])))",
|
||||
"expr": "histogram_quantile(0.99, sum by(pod, le) (rate(greptime_mito_compaction_total_elapsed_bucket{pod=~\"$datanode\"}[$__rate_interval])))",
|
||||
"instant": false,
|
||||
"legendFormat": "{{pod}}-{{stage}}-p99",
|
||||
"legendFormat": "[{{pod}}]-compaction-p99",
|
||||
"range": true,
|
||||
"refId": "A"
|
||||
}
|
||||
@@ -5794,7 +5794,7 @@
|
||||
"type": "prometheus",
|
||||
"uid": "${DS_PROMETHEUS}"
|
||||
},
|
||||
"description": "Write-ahead log operations latency at p99",
|
||||
"description": "Compaction latency by stage",
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"color": {
|
||||
@@ -5856,13 +5856,13 @@
|
||||
"x": 12,
|
||||
"y": 178
|
||||
},
|
||||
"id": 269,
|
||||
"id": 232,
|
||||
"options": {
|
||||
"legend": {
|
||||
"calcs": [
|
||||
"lastNotNull"
|
||||
],
|
||||
"displayMode": "list",
|
||||
"displayMode": "table",
|
||||
"placement": "bottom",
|
||||
"showLegend": true
|
||||
},
|
||||
@@ -5878,14 +5878,14 @@
|
||||
"uid": "${DS_PROMETHEUS}"
|
||||
},
|
||||
"editorMode": "code",
|
||||
"expr": "histogram_quantile(0.99, sum by(le,logstore,optype,pod) (rate(greptime_logstore_op_elapsed_bucket[$__rate_interval])))",
|
||||
"expr": "histogram_quantile(0.99, sum by(pod, le, stage) (rate(greptime_mito_compaction_stage_elapsed_bucket{pod=~\"$datanode\"}[$__rate_interval])))",
|
||||
"instant": false,
|
||||
"legendFormat": "{{pod}}-{{logstore}}-{{optype}}-p99",
|
||||
"legendFormat": "{{pod}}-{{stage}}-p99",
|
||||
"range": true,
|
||||
"refId": "A"
|
||||
}
|
||||
],
|
||||
"title": "Log Store op duration seconds",
|
||||
"title": "Compaction P99 per Instance",
|
||||
"type": "timeseries"
|
||||
},
|
||||
{
|
||||
@@ -5993,7 +5993,7 @@
|
||||
"type": "prometheus",
|
||||
"uid": "${DS_PROMETHEUS}"
|
||||
},
|
||||
"description": "Ongoing compaction task count",
|
||||
"description": "Write-ahead log operations latency at p99",
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"color": {
|
||||
@@ -6045,7 +6045,7 @@
|
||||
}
|
||||
]
|
||||
},
|
||||
"unit": "none"
|
||||
"unit": "s"
|
||||
},
|
||||
"overrides": []
|
||||
},
|
||||
@@ -6055,13 +6055,13 @@
|
||||
"x": 12,
|
||||
"y": 186
|
||||
},
|
||||
"id": 271,
|
||||
"id": 269,
|
||||
"options": {
|
||||
"legend": {
|
||||
"calcs": [
|
||||
"lastNotNull"
|
||||
],
|
||||
"displayMode": "table",
|
||||
"displayMode": "list",
|
||||
"placement": "bottom",
|
||||
"showLegend": true
|
||||
},
|
||||
@@ -6078,14 +6078,14 @@
|
||||
"uid": "${DS_PROMETHEUS}"
|
||||
},
|
||||
"editorMode": "code",
|
||||
"expr": "greptime_mito_inflight_compaction_count",
|
||||
"expr": "histogram_quantile(0.99, sum by(le,logstore,optype,pod) (rate(greptime_logstore_op_elapsed_bucket[$__rate_interval])))",
|
||||
"instant": false,
|
||||
"legendFormat": "{{pod}}",
|
||||
"legendFormat": "{{pod}}-{{logstore}}-{{optype}}-p99",
|
||||
"range": true,
|
||||
"refId": "A"
|
||||
}
|
||||
],
|
||||
"title": "Inflight Compaction",
|
||||
"title": "Log Store op duration seconds",
|
||||
"type": "timeseries"
|
||||
},
|
||||
{
|
||||
@@ -6188,6 +6188,105 @@
|
||||
"title": "Inflight Flush",
|
||||
"type": "timeseries"
|
||||
},
|
||||
{
|
||||
"datasource": {
|
||||
"type": "prometheus",
|
||||
"uid": "${DS_PROMETHEUS}"
|
||||
},
|
||||
"description": "Ongoing compaction task count",
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"color": {
|
||||
"mode": "palette-classic"
|
||||
},
|
||||
"custom": {
|
||||
"axisBorderShow": false,
|
||||
"axisCenteredZero": false,
|
||||
"axisColorMode": "text",
|
||||
"axisLabel": "",
|
||||
"axisPlacement": "auto",
|
||||
"barAlignment": 0,
|
||||
"drawStyle": "points",
|
||||
"fillOpacity": 0,
|
||||
"gradientMode": "none",
|
||||
"hideFrom": {
|
||||
"legend": false,
|
||||
"tooltip": false,
|
||||
"viz": false
|
||||
},
|
||||
"insertNulls": false,
|
||||
"lineInterpolation": "linear",
|
||||
"lineWidth": 1,
|
||||
"pointSize": 5,
|
||||
"scaleDistribution": {
|
||||
"type": "linear"
|
||||
},
|
||||
"showPoints": "auto",
|
||||
"spanNulls": false,
|
||||
"stacking": {
|
||||
"group": "A",
|
||||
"mode": "none"
|
||||
},
|
||||
"thresholdsStyle": {
|
||||
"mode": "off"
|
||||
}
|
||||
},
|
||||
"mappings": [],
|
||||
"thresholds": {
|
||||
"mode": "absolute",
|
||||
"steps": [
|
||||
{
|
||||
"color": "green"
|
||||
},
|
||||
{
|
||||
"color": "red",
|
||||
"value": 80
|
||||
}
|
||||
]
|
||||
},
|
||||
"unit": "none"
|
||||
},
|
||||
"overrides": []
|
||||
},
|
||||
"gridPos": {
|
||||
"h": 8,
|
||||
"w": 12,
|
||||
"x": 12,
|
||||
"y": 194
|
||||
},
|
||||
"id": 271,
|
||||
"options": {
|
||||
"legend": {
|
||||
"calcs": [
|
||||
"lastNotNull"
|
||||
],
|
||||
"displayMode": "table",
|
||||
"placement": "bottom",
|
||||
"showLegend": true
|
||||
},
|
||||
"tooltip": {
|
||||
"mode": "single",
|
||||
"sort": "none"
|
||||
}
|
||||
},
|
||||
"pluginVersion": "11.1.3",
|
||||
"targets": [
|
||||
{
|
||||
"datasource": {
|
||||
"type": "prometheus",
|
||||
"uid": "${DS_PROMETHEUS}"
|
||||
},
|
||||
"editorMode": "code",
|
||||
"expr": "greptime_mito_inflight_compaction_count",
|
||||
"instant": false,
|
||||
"legendFormat": "{{pod}}",
|
||||
"range": true,
|
||||
"refId": "A"
|
||||
}
|
||||
],
|
||||
"title": "Inflight Compaction",
|
||||
"type": "timeseries"
|
||||
},
|
||||
{
|
||||
"collapsed": false,
|
||||
"gridPos": {
|
||||
|
||||
@@ -15,10 +15,13 @@
|
||||
use std::collections::HashMap;
|
||||
|
||||
use datatypes::schema::{
|
||||
ColumnDefaultConstraint, ColumnSchema, FulltextAnalyzer, FulltextOptions, SkippingIndexOptions,
|
||||
SkippingIndexType, COMMENT_KEY, FULLTEXT_KEY, INVERTED_INDEX_KEY, SKIPPING_INDEX_KEY,
|
||||
ColumnDefaultConstraint, ColumnSchema, FulltextAnalyzer, FulltextBackend, FulltextOptions,
|
||||
SkippingIndexOptions, SkippingIndexType, COMMENT_KEY, FULLTEXT_KEY, INVERTED_INDEX_KEY,
|
||||
SKIPPING_INDEX_KEY,
|
||||
};
|
||||
use greptime_proto::v1::{
|
||||
Analyzer, FulltextBackend as PbFulltextBackend, SkippingIndexType as PbSkippingIndexType,
|
||||
};
|
||||
use greptime_proto::v1::{Analyzer, SkippingIndexType as PbSkippingIndexType};
|
||||
use snafu::ResultExt;
|
||||
|
||||
use crate::error::{self, Result};
|
||||
@@ -142,13 +145,21 @@ pub fn options_from_inverted() -> ColumnOptions {
|
||||
}
|
||||
|
||||
/// Tries to construct a `FulltextAnalyzer` from the given analyzer.
|
||||
pub fn as_fulltext_option(analyzer: Analyzer) -> FulltextAnalyzer {
|
||||
pub fn as_fulltext_option_analyzer(analyzer: Analyzer) -> FulltextAnalyzer {
|
||||
match analyzer {
|
||||
Analyzer::English => FulltextAnalyzer::English,
|
||||
Analyzer::Chinese => FulltextAnalyzer::Chinese,
|
||||
}
|
||||
}
|
||||
|
||||
/// Tries to construct a `FulltextBackend` from the given backend.
|
||||
pub fn as_fulltext_option_backend(backend: PbFulltextBackend) -> FulltextBackend {
|
||||
match backend {
|
||||
PbFulltextBackend::Bloom => FulltextBackend::Bloom,
|
||||
PbFulltextBackend::Tantivy => FulltextBackend::Tantivy,
|
||||
}
|
||||
}
|
||||
|
||||
/// Tries to construct a `SkippingIndexType` from the given skipping index type.
|
||||
pub fn as_skipping_index_type(skipping_index_type: PbSkippingIndexType) -> SkippingIndexType {
|
||||
match skipping_index_type {
|
||||
@@ -160,7 +171,7 @@ pub fn as_skipping_index_type(skipping_index_type: PbSkippingIndexType) -> Skipp
|
||||
mod tests {
|
||||
|
||||
use datatypes::data_type::ConcreteDataType;
|
||||
use datatypes::schema::FulltextAnalyzer;
|
||||
use datatypes::schema::{FulltextAnalyzer, FulltextBackend};
|
||||
|
||||
use super::*;
|
||||
use crate::v1::ColumnDataType;
|
||||
@@ -219,13 +230,14 @@ mod tests {
|
||||
enable: true,
|
||||
analyzer: FulltextAnalyzer::English,
|
||||
case_sensitive: false,
|
||||
backend: FulltextBackend::Bloom,
|
||||
})
|
||||
.unwrap();
|
||||
schema.set_inverted_index(true);
|
||||
let options = options_from_column_schema(&schema).unwrap();
|
||||
assert_eq!(
|
||||
options.options.get(FULLTEXT_GRPC_KEY).unwrap(),
|
||||
"{\"enable\":true,\"analyzer\":\"English\",\"case-sensitive\":false}"
|
||||
"{\"enable\":true,\"analyzer\":\"English\",\"case-sensitive\":false,\"backend\":\"bloom\"}"
|
||||
);
|
||||
assert_eq!(
|
||||
options.options.get(INVERTED_INDEX_GRPC_KEY).unwrap(),
|
||||
@@ -239,11 +251,12 @@ mod tests {
|
||||
enable: true,
|
||||
analyzer: FulltextAnalyzer::English,
|
||||
case_sensitive: false,
|
||||
backend: FulltextBackend::Bloom,
|
||||
};
|
||||
let options = options_from_fulltext(&fulltext).unwrap().unwrap();
|
||||
assert_eq!(
|
||||
options.options.get(FULLTEXT_GRPC_KEY).unwrap(),
|
||||
"{\"enable\":true,\"analyzer\":\"English\",\"case-sensitive\":false}"
|
||||
"{\"enable\":true,\"analyzer\":\"English\",\"case-sensitive\":false,\"backend\":\"bloom\"}"
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
@@ -19,7 +19,7 @@ mod information_memory_table;
|
||||
pub mod key_column_usage;
|
||||
mod partitions;
|
||||
mod procedure_info;
|
||||
mod region_peers;
|
||||
pub mod region_peers;
|
||||
mod region_statistics;
|
||||
mod runtime_metrics;
|
||||
pub mod schemata;
|
||||
|
||||
@@ -56,6 +56,8 @@ pub const TABLE_CATALOG: &str = "table_catalog";
|
||||
pub const TABLE_SCHEMA: &str = "table_schema";
|
||||
pub const TABLE_NAME: &str = "table_name";
|
||||
pub const COLUMN_NAME: &str = "column_name";
|
||||
pub const REGION_ID: &str = "region_id";
|
||||
pub const PEER_ID: &str = "peer_id";
|
||||
const ORDINAL_POSITION: &str = "ordinal_position";
|
||||
const CHARACTER_MAXIMUM_LENGTH: &str = "character_maximum_length";
|
||||
const CHARACTER_OCTET_LENGTH: &str = "character_octet_length";
|
||||
|
||||
@@ -21,6 +21,7 @@ use common_error::ext::BoxedError;
|
||||
use common_meta::rpc::router::RegionRoute;
|
||||
use common_recordbatch::adapter::RecordBatchStreamAdapter;
|
||||
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
|
||||
use datafusion::common::HashMap;
|
||||
use datafusion::execution::TaskContext;
|
||||
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
|
||||
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
|
||||
@@ -43,16 +44,22 @@ use crate::kvbackend::KvBackendCatalogManager;
|
||||
use crate::system_schema::information_schema::{InformationTable, Predicates};
|
||||
use crate::CatalogManager;
|
||||
|
||||
const REGION_ID: &str = "region_id";
|
||||
const PEER_ID: &str = "peer_id";
|
||||
pub const TABLE_CATALOG: &str = "table_catalog";
|
||||
pub const TABLE_SCHEMA: &str = "table_schema";
|
||||
pub const TABLE_NAME: &str = "table_name";
|
||||
pub const REGION_ID: &str = "region_id";
|
||||
pub const PEER_ID: &str = "peer_id";
|
||||
const PEER_ADDR: &str = "peer_addr";
|
||||
const IS_LEADER: &str = "is_leader";
|
||||
pub const IS_LEADER: &str = "is_leader";
|
||||
const STATUS: &str = "status";
|
||||
const DOWN_SECONDS: &str = "down_seconds";
|
||||
const INIT_CAPACITY: usize = 42;
|
||||
|
||||
/// The `REGION_PEERS` table provides information about the region distribution and routes. Including fields:
|
||||
///
|
||||
/// - `table_catalog`: the table catalog name
|
||||
/// - `table_schema`: the table schema name
|
||||
/// - `table_name`: the table name
|
||||
/// - `region_id`: the region id
|
||||
/// - `peer_id`: the region storage datanode peer id
|
||||
/// - `peer_addr`: the region storage datanode gRPC peer address
|
||||
@@ -77,6 +84,9 @@ impl InformationSchemaRegionPeers {
|
||||
|
||||
pub(crate) fn schema() -> SchemaRef {
|
||||
Arc::new(Schema::new(vec![
|
||||
ColumnSchema::new(TABLE_CATALOG, ConcreteDataType::string_datatype(), false),
|
||||
ColumnSchema::new(TABLE_SCHEMA, ConcreteDataType::string_datatype(), false),
|
||||
ColumnSchema::new(TABLE_NAME, ConcreteDataType::string_datatype(), false),
|
||||
ColumnSchema::new(REGION_ID, ConcreteDataType::uint64_datatype(), false),
|
||||
ColumnSchema::new(PEER_ID, ConcreteDataType::uint64_datatype(), true),
|
||||
ColumnSchema::new(PEER_ADDR, ConcreteDataType::string_datatype(), true),
|
||||
@@ -134,6 +144,9 @@ struct InformationSchemaRegionPeersBuilder {
|
||||
catalog_name: String,
|
||||
catalog_manager: Weak<dyn CatalogManager>,
|
||||
|
||||
table_catalogs: StringVectorBuilder,
|
||||
table_schemas: StringVectorBuilder,
|
||||
table_names: StringVectorBuilder,
|
||||
region_ids: UInt64VectorBuilder,
|
||||
peer_ids: UInt64VectorBuilder,
|
||||
peer_addrs: StringVectorBuilder,
|
||||
@@ -152,6 +165,9 @@ impl InformationSchemaRegionPeersBuilder {
|
||||
schema,
|
||||
catalog_name,
|
||||
catalog_manager,
|
||||
table_catalogs: StringVectorBuilder::with_capacity(INIT_CAPACITY),
|
||||
table_schemas: StringVectorBuilder::with_capacity(INIT_CAPACITY),
|
||||
table_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
|
||||
region_ids: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
|
||||
peer_ids: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
|
||||
peer_addrs: StringVectorBuilder::with_capacity(INIT_CAPACITY),
|
||||
@@ -177,24 +193,28 @@ impl InformationSchemaRegionPeersBuilder {
|
||||
let predicates = Predicates::from_scan_request(&request);
|
||||
|
||||
for schema_name in catalog_manager.schema_names(&catalog_name, None).await? {
|
||||
let table_id_stream = catalog_manager
|
||||
let table_stream = catalog_manager
|
||||
.tables(&catalog_name, &schema_name, None)
|
||||
.try_filter_map(|t| async move {
|
||||
let table_info = t.table_info();
|
||||
if table_info.table_type == TableType::Temporary {
|
||||
Ok(None)
|
||||
} else {
|
||||
Ok(Some(table_info.ident.table_id))
|
||||
Ok(Some((
|
||||
table_info.ident.table_id,
|
||||
table_info.name.to_string(),
|
||||
)))
|
||||
}
|
||||
});
|
||||
|
||||
const BATCH_SIZE: usize = 128;
|
||||
|
||||
// Split table ids into chunks
|
||||
let mut table_id_chunks = pin!(table_id_stream.ready_chunks(BATCH_SIZE));
|
||||
// Split tables into chunks
|
||||
let mut table_chunks = pin!(table_stream.ready_chunks(BATCH_SIZE));
|
||||
|
||||
while let Some(table_ids) = table_id_chunks.next().await {
|
||||
let table_ids = table_ids.into_iter().collect::<Result<Vec<_>>>()?;
|
||||
while let Some(tables) = table_chunks.next().await {
|
||||
let tables = tables.into_iter().collect::<Result<HashMap<_, _>>>()?;
|
||||
let table_ids = tables.keys().cloned().collect::<Vec<_>>();
|
||||
|
||||
let table_routes = if let Some(partition_manager) = &partition_manager {
|
||||
partition_manager
|
||||
@@ -206,7 +226,16 @@ impl InformationSchemaRegionPeersBuilder {
|
||||
};
|
||||
|
||||
for (table_id, routes) in table_routes {
|
||||
self.add_region_peers(&predicates, table_id, &routes);
|
||||
// Safety: table_id is guaranteed to be in the map
|
||||
let table_name = tables.get(&table_id).unwrap();
|
||||
self.add_region_peers(
|
||||
&catalog_name,
|
||||
&schema_name,
|
||||
table_name,
|
||||
&predicates,
|
||||
table_id,
|
||||
&routes,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -216,6 +245,9 @@ impl InformationSchemaRegionPeersBuilder {
|
||||
|
||||
fn add_region_peers(
|
||||
&mut self,
|
||||
table_catalog: &str,
|
||||
table_schema: &str,
|
||||
table_name: &str,
|
||||
predicates: &Predicates,
|
||||
table_id: TableId,
|
||||
routes: &[RegionRoute],
|
||||
@@ -231,13 +263,20 @@ impl InformationSchemaRegionPeersBuilder {
|
||||
Some("ALIVE".to_string())
|
||||
};
|
||||
|
||||
let row = [(REGION_ID, &Value::from(region_id))];
|
||||
let row = [
|
||||
(TABLE_CATALOG, &Value::from(table_catalog)),
|
||||
(TABLE_SCHEMA, &Value::from(table_schema)),
|
||||
(TABLE_NAME, &Value::from(table_name)),
|
||||
(REGION_ID, &Value::from(region_id)),
|
||||
];
|
||||
|
||||
if !predicates.eval(&row) {
|
||||
return;
|
||||
}
|
||||
|
||||
// TODO(dennis): adds followers.
|
||||
self.table_catalogs.push(Some(table_catalog));
|
||||
self.table_schemas.push(Some(table_schema));
|
||||
self.table_names.push(Some(table_name));
|
||||
self.region_ids.push(Some(region_id));
|
||||
self.peer_ids.push(peer_id);
|
||||
self.peer_addrs.push(peer_addr.as_deref());
|
||||
@@ -245,11 +284,26 @@ impl InformationSchemaRegionPeersBuilder {
|
||||
self.statuses.push(state.as_deref());
|
||||
self.down_seconds
|
||||
.push(route.leader_down_millis().map(|m| m / 1000));
|
||||
|
||||
for follower in &route.follower_peers {
|
||||
self.table_catalogs.push(Some(table_catalog));
|
||||
self.table_schemas.push(Some(table_schema));
|
||||
self.table_names.push(Some(table_name));
|
||||
self.region_ids.push(Some(region_id));
|
||||
self.peer_ids.push(Some(follower.id));
|
||||
self.peer_addrs.push(Some(follower.addr.as_str()));
|
||||
self.is_leaders.push(Some("No"));
|
||||
self.statuses.push(None);
|
||||
self.down_seconds.push(None);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn finish(&mut self) -> Result<RecordBatch> {
|
||||
let columns: Vec<VectorRef> = vec![
|
||||
Arc::new(self.table_catalogs.finish()),
|
||||
Arc::new(self.table_schemas.finish()),
|
||||
Arc::new(self.table_names.finish()),
|
||||
Arc::new(self.region_ids.finish()),
|
||||
Arc::new(self.peer_ids.finish()),
|
||||
Arc::new(self.peer_addrs.finish()),
|
||||
|
||||
@@ -177,7 +177,7 @@ fn create_table_info(table_id: TableId, table_name: TableName) -> RawTableInfo {
|
||||
|
||||
fn create_region_routes(regions: Vec<RegionNumber>) -> Vec<RegionRoute> {
|
||||
let mut region_routes = Vec::with_capacity(100);
|
||||
let mut rng = rand::thread_rng();
|
||||
let mut rng = rand::rng();
|
||||
|
||||
for region_id in regions.into_iter().map(u64::from) {
|
||||
region_routes.push(RegionRoute {
|
||||
@@ -188,7 +188,7 @@ fn create_region_routes(regions: Vec<RegionNumber>) -> Vec<RegionRoute> {
|
||||
attrs: BTreeMap::new(),
|
||||
},
|
||||
leader_peer: Some(Peer {
|
||||
id: rng.gen_range(0..10),
|
||||
id: rng.random_range(0..10),
|
||||
addr: String::new(),
|
||||
}),
|
||||
follower_peers: vec![],
|
||||
|
||||
@@ -16,7 +16,6 @@
|
||||
|
||||
mod client;
|
||||
pub mod client_manager;
|
||||
#[cfg(feature = "testing")]
|
||||
mod database;
|
||||
pub mod error;
|
||||
pub mod flow;
|
||||
@@ -34,7 +33,6 @@ pub use common_recordbatch::{RecordBatches, SendableRecordBatchStream};
|
||||
use snafu::OptionExt;
|
||||
|
||||
pub use self::client::Client;
|
||||
#[cfg(feature = "testing")]
|
||||
pub use self::database::Database;
|
||||
pub use self::error::{Error, Result};
|
||||
use crate::error::{IllegalDatabaseResponseSnafu, ServerSnafu};
|
||||
|
||||
@@ -13,7 +13,7 @@
|
||||
// limitations under the License.
|
||||
|
||||
use enum_dispatch::enum_dispatch;
|
||||
use rand::seq::SliceRandom;
|
||||
use rand::seq::IndexedRandom;
|
||||
|
||||
#[enum_dispatch]
|
||||
pub trait LoadBalance {
|
||||
@@ -37,7 +37,7 @@ pub struct Random;
|
||||
|
||||
impl LoadBalance for Random {
|
||||
fn get_peer<'a>(&self, peers: &'a [String]) -> Option<&'a String> {
|
||||
peers.choose(&mut rand::thread_rng())
|
||||
peers.choose(&mut rand::rng())
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -30,7 +30,7 @@ use datanode::datanode::{Datanode, DatanodeBuilder};
|
||||
use datanode::service::DatanodeServiceBuilder;
|
||||
use meta_client::{MetaClientOptions, MetaClientType};
|
||||
use servers::Mode;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use tracing_appender::non_blocking::WorkerGuard;
|
||||
|
||||
use crate::error::{
|
||||
@@ -223,15 +223,14 @@ impl StartCommand {
|
||||
.get_or_insert_with(MetaClientOptions::default)
|
||||
.metasrv_addrs
|
||||
.clone_from(metasrv_addrs);
|
||||
opts.mode = Mode::Distributed;
|
||||
}
|
||||
|
||||
if let (Mode::Distributed, None) = (&opts.mode, &opts.node_id) {
|
||||
return MissingConfigSnafu {
|
||||
msg: "Missing node id option",
|
||||
ensure!(
|
||||
opts.node_id.is_some(),
|
||||
MissingConfigSnafu {
|
||||
msg: "Missing node id option"
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
);
|
||||
|
||||
if let Some(data_home) = &self.data_home {
|
||||
opts.storage.data_home.clone_from(data_home);
|
||||
@@ -295,10 +294,13 @@ impl StartCommand {
|
||||
msg: "'meta_client_options'",
|
||||
})?;
|
||||
|
||||
let meta_client =
|
||||
meta_client::create_meta_client(MetaClientType::Datanode { member_id }, meta_config)
|
||||
.await
|
||||
.context(MetaClientInitSnafu)?;
|
||||
let meta_client = meta_client::create_meta_client(
|
||||
MetaClientType::Datanode { member_id },
|
||||
meta_config,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.context(MetaClientInitSnafu)?;
|
||||
|
||||
let meta_backend = Arc::new(MetaKvBackend {
|
||||
client: meta_client.clone(),
|
||||
@@ -311,7 +313,7 @@ impl StartCommand {
|
||||
.build(),
|
||||
);
|
||||
|
||||
let mut datanode = DatanodeBuilder::new(opts.clone(), plugins)
|
||||
let mut datanode = DatanodeBuilder::new(opts.clone(), plugins, Mode::Distributed)
|
||||
.with_meta_client(meta_client)
|
||||
.with_kv_backend(meta_backend)
|
||||
.with_cache_registry(layered_cache_registry)
|
||||
@@ -333,6 +335,7 @@ impl StartCommand {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::assert_matches::assert_matches;
|
||||
use std::io::Write;
|
||||
use std::time::Duration;
|
||||
|
||||
@@ -340,7 +343,6 @@ mod tests {
|
||||
use common_test_util::temp_dir::create_named_temp_file;
|
||||
use datanode::config::{FileConfig, GcsConfig, ObjectStoreConfig, S3Config};
|
||||
use servers::heartbeat_options::HeartbeatOptions;
|
||||
use servers::Mode;
|
||||
|
||||
use super::*;
|
||||
use crate::options::GlobalOptions;
|
||||
@@ -491,22 +493,6 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn test_try_from_cmd() {
|
||||
let opt = StartCommand::default()
|
||||
.load_options(&GlobalOptions::default())
|
||||
.unwrap()
|
||||
.component;
|
||||
assert_eq!(Mode::Standalone, opt.mode);
|
||||
|
||||
let opt = (StartCommand {
|
||||
node_id: Some(42),
|
||||
metasrv_addrs: Some(vec!["127.0.0.1:3002".to_string()]),
|
||||
..Default::default()
|
||||
})
|
||||
.load_options(&GlobalOptions::default())
|
||||
.unwrap()
|
||||
.component;
|
||||
assert_eq!(Mode::Distributed, opt.mode);
|
||||
|
||||
assert!((StartCommand {
|
||||
metasrv_addrs: Some(vec!["127.0.0.1:3002".to_string()]),
|
||||
..Default::default()
|
||||
@@ -525,7 +511,19 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn test_load_log_options_from_cli() {
|
||||
let cmd = StartCommand::default();
|
||||
let mut cmd = StartCommand::default();
|
||||
|
||||
let result = cmd.load_options(&GlobalOptions {
|
||||
log_dir: Some("./greptimedb_data/test/logs".to_string()),
|
||||
log_level: Some("debug".to_string()),
|
||||
|
||||
#[cfg(feature = "tokio-console")]
|
||||
tokio_console_addr: None,
|
||||
});
|
||||
// Missing node_id.
|
||||
assert_matches!(result, Err(crate::error::Error::MissingConfig { .. }));
|
||||
|
||||
cmd.node_id = Some(42);
|
||||
|
||||
let options = cmd
|
||||
.load_options(&GlobalOptions {
|
||||
|
||||
@@ -100,6 +100,13 @@ pub enum Error {
|
||||
source: flow::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Servers error"))]
|
||||
Servers {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
source: servers::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to start frontend"))]
|
||||
StartFrontend {
|
||||
#[snafu(implicit)]
|
||||
@@ -365,6 +372,7 @@ impl ErrorExt for Error {
|
||||
Error::ShutdownFrontend { source, .. } => source.status_code(),
|
||||
Error::StartMetaServer { source, .. } => source.status_code(),
|
||||
Error::ShutdownMetaServer { source, .. } => source.status_code(),
|
||||
Error::Servers { source, .. } => source.status_code(),
|
||||
Error::BuildMetaServer { source, .. } => source.status_code(),
|
||||
Error::UnsupportedSelectorType { source, .. } => source.status_code(),
|
||||
Error::BuildCli { source, .. } => source.status_code(),
|
||||
|
||||
@@ -34,8 +34,7 @@ use common_telemetry::logging::TracingOptions;
|
||||
use common_version::{short_version, version};
|
||||
use flow::{FlownodeBuilder, FlownodeInstance, FrontendInvoker};
|
||||
use meta_client::{MetaClientOptions, MetaClientType};
|
||||
use servers::Mode;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use tracing_appender::non_blocking::WorkerGuard;
|
||||
|
||||
use crate::error::{
|
||||
@@ -203,7 +202,6 @@ impl StartCommand {
|
||||
.get_or_insert_with(MetaClientOptions::default)
|
||||
.metasrv_addrs
|
||||
.clone_from(metasrv_addrs);
|
||||
opts.mode = Mode::Distributed;
|
||||
}
|
||||
|
||||
if let Some(http_addr) = &self.http_addr {
|
||||
@@ -214,12 +212,12 @@ impl StartCommand {
|
||||
opts.http.timeout = Duration::from_secs(http_timeout);
|
||||
}
|
||||
|
||||
if let (Mode::Distributed, None) = (&opts.mode, &opts.node_id) {
|
||||
return MissingConfigSnafu {
|
||||
msg: "Missing node id option",
|
||||
ensure!(
|
||||
opts.node_id.is_some(),
|
||||
MissingConfigSnafu {
|
||||
msg: "Missing node id option"
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -249,10 +247,13 @@ impl StartCommand {
|
||||
msg: "'meta_client_options'",
|
||||
})?;
|
||||
|
||||
let meta_client =
|
||||
meta_client::create_meta_client(MetaClientType::Flownode { member_id }, meta_config)
|
||||
.await
|
||||
.context(MetaClientInitSnafu)?;
|
||||
let meta_client = meta_client::create_meta_client(
|
||||
MetaClientType::Flownode { member_id },
|
||||
meta_config,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.context(MetaClientInitSnafu)?;
|
||||
|
||||
let cache_max_capacity = meta_config.metadata_cache_max_capacity;
|
||||
let cache_ttl = meta_config.metadata_cache_ttl;
|
||||
|
||||
@@ -32,28 +32,25 @@ use common_telemetry::info;
|
||||
use common_telemetry::logging::TracingOptions;
|
||||
use common_time::timezone::set_default_timezone;
|
||||
use common_version::{short_version, version};
|
||||
use frontend::frontend::Frontend;
|
||||
use frontend::heartbeat::HeartbeatTask;
|
||||
use frontend::instance::builder::FrontendBuilder;
|
||||
use frontend::instance::{FrontendInstance, Instance as FeInstance};
|
||||
use frontend::server::Services;
|
||||
use meta_client::{MetaClientOptions, MetaClientType};
|
||||
use query::stats::StatementStatistics;
|
||||
use servers::export_metrics::ExportMetricsTask;
|
||||
use servers::tls::{TlsMode, TlsOption};
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use tracing_appender::non_blocking::WorkerGuard;
|
||||
|
||||
use crate::error::{
|
||||
self, InitTimezoneSnafu, LoadLayeredConfigSnafu, MetaClientInitSnafu, MissingConfigSnafu,
|
||||
Result, StartFrontendSnafu,
|
||||
};
|
||||
use crate::error::{self, Result};
|
||||
use crate::options::{GlobalOptions, GreptimeOptions};
|
||||
use crate::{log_versions, App};
|
||||
|
||||
type FrontendOptions = GreptimeOptions<frontend::frontend::FrontendOptions>;
|
||||
|
||||
pub struct Instance {
|
||||
frontend: FeInstance,
|
||||
|
||||
frontend: Frontend,
|
||||
// Keep the logging guard to prevent the worker from being dropped.
|
||||
_guard: Vec<WorkerGuard>,
|
||||
}
|
||||
@@ -61,20 +58,17 @@ pub struct Instance {
|
||||
pub const APP_NAME: &str = "greptime-frontend";
|
||||
|
||||
impl Instance {
|
||||
pub fn new(frontend: FeInstance, guard: Vec<WorkerGuard>) -> Self {
|
||||
Self {
|
||||
frontend,
|
||||
_guard: guard,
|
||||
}
|
||||
pub fn new(frontend: Frontend, _guard: Vec<WorkerGuard>) -> Self {
|
||||
Self { frontend, _guard }
|
||||
}
|
||||
|
||||
pub fn mut_inner(&mut self) -> &mut FeInstance {
|
||||
&mut self.frontend
|
||||
}
|
||||
|
||||
pub fn inner(&self) -> &FeInstance {
|
||||
pub fn inner(&self) -> &Frontend {
|
||||
&self.frontend
|
||||
}
|
||||
|
||||
pub fn mut_inner(&mut self) -> &mut Frontend {
|
||||
&mut self.frontend
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
@@ -84,11 +78,15 @@ impl App for Instance {
|
||||
}
|
||||
|
||||
async fn start(&mut self) -> Result<()> {
|
||||
plugins::start_frontend_plugins(self.frontend.plugins().clone())
|
||||
let plugins = self.frontend.instance.plugins().clone();
|
||||
plugins::start_frontend_plugins(plugins)
|
||||
.await
|
||||
.context(StartFrontendSnafu)?;
|
||||
.context(error::StartFrontendSnafu)?;
|
||||
|
||||
self.frontend.start().await.context(StartFrontendSnafu)
|
||||
self.frontend
|
||||
.start()
|
||||
.await
|
||||
.context(error::StartFrontendSnafu)
|
||||
}
|
||||
|
||||
async fn stop(&self) -> Result<()> {
|
||||
@@ -178,7 +176,7 @@ impl StartCommand {
|
||||
self.config_file.as_deref(),
|
||||
self.env_prefix.as_ref(),
|
||||
)
|
||||
.context(LoadLayeredConfigSnafu)?;
|
||||
.context(error::LoadLayeredConfigSnafu)?;
|
||||
|
||||
self.merge_with_cli_options(global_options, &mut opts)?;
|
||||
|
||||
@@ -283,22 +281,28 @@ impl StartCommand {
|
||||
let mut plugins = Plugins::new();
|
||||
plugins::setup_frontend_plugins(&mut plugins, &plugin_opts, &opts)
|
||||
.await
|
||||
.context(StartFrontendSnafu)?;
|
||||
.context(error::StartFrontendSnafu)?;
|
||||
|
||||
set_default_timezone(opts.default_timezone.as_deref()).context(InitTimezoneSnafu)?;
|
||||
set_default_timezone(opts.default_timezone.as_deref()).context(error::InitTimezoneSnafu)?;
|
||||
|
||||
let meta_client_options = opts.meta_client.as_ref().context(MissingConfigSnafu {
|
||||
msg: "'meta_client'",
|
||||
})?;
|
||||
let meta_client_options = opts
|
||||
.meta_client
|
||||
.as_ref()
|
||||
.context(error::MissingConfigSnafu {
|
||||
msg: "'meta_client'",
|
||||
})?;
|
||||
|
||||
let cache_max_capacity = meta_client_options.metadata_cache_max_capacity;
|
||||
let cache_ttl = meta_client_options.metadata_cache_ttl;
|
||||
let cache_tti = meta_client_options.metadata_cache_tti;
|
||||
|
||||
let meta_client =
|
||||
meta_client::create_meta_client(MetaClientType::Frontend, meta_client_options)
|
||||
.await
|
||||
.context(MetaClientInitSnafu)?;
|
||||
let meta_client = meta_client::create_meta_client(
|
||||
MetaClientType::Frontend,
|
||||
meta_client_options,
|
||||
Some(&plugins),
|
||||
)
|
||||
.await
|
||||
.context(error::MetaClientInitSnafu)?;
|
||||
|
||||
// TODO(discord9): add helper function to ease the creation of cache registry&such
|
||||
let cached_meta_backend =
|
||||
@@ -345,6 +349,7 @@ impl StartCommand {
|
||||
opts.heartbeat.clone(),
|
||||
Arc::new(executor),
|
||||
);
|
||||
let heartbeat_task = Some(heartbeat_task);
|
||||
|
||||
// frontend to datanode need not timeout.
|
||||
// Some queries are expected to take long time.
|
||||
@@ -356,7 +361,7 @@ impl StartCommand {
|
||||
};
|
||||
let client = NodeClients::new(channel_config);
|
||||
|
||||
let mut instance = FrontendBuilder::new(
|
||||
let instance = FrontendBuilder::new(
|
||||
opts.clone(),
|
||||
cached_meta_backend.clone(),
|
||||
layered_cache_registry.clone(),
|
||||
@@ -367,20 +372,27 @@ impl StartCommand {
|
||||
)
|
||||
.with_plugin(plugins.clone())
|
||||
.with_local_cache_invalidator(layered_cache_registry)
|
||||
.with_heartbeat_task(heartbeat_task)
|
||||
.try_build()
|
||||
.await
|
||||
.context(StartFrontendSnafu)?;
|
||||
.context(error::StartFrontendSnafu)?;
|
||||
let instance = Arc::new(instance);
|
||||
|
||||
let servers = Services::new(opts, Arc::new(instance.clone()), plugins)
|
||||
let export_metrics_task = ExportMetricsTask::try_new(&opts.export_metrics, Some(&plugins))
|
||||
.context(error::ServersSnafu)?;
|
||||
|
||||
let servers = Services::new(opts, instance.clone(), plugins)
|
||||
.build()
|
||||
.await
|
||||
.context(StartFrontendSnafu)?;
|
||||
instance
|
||||
.build_servers(servers)
|
||||
.context(StartFrontendSnafu)?;
|
||||
.context(error::StartFrontendSnafu)?;
|
||||
|
||||
Ok(Instance::new(instance, guard))
|
||||
let frontend = Frontend {
|
||||
instance,
|
||||
servers,
|
||||
heartbeat_task,
|
||||
export_metrics_task,
|
||||
};
|
||||
|
||||
Ok(Instance::new(frontend, guard))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -440,7 +452,7 @@ mod tests {
|
||||
|
||||
[http]
|
||||
addr = "127.0.0.1:4000"
|
||||
timeout = "30s"
|
||||
timeout = "0s"
|
||||
body_limit = "2GB"
|
||||
|
||||
[opentsdb]
|
||||
@@ -461,7 +473,7 @@ mod tests {
|
||||
let fe_opts = command.load_options(&Default::default()).unwrap().component;
|
||||
|
||||
assert_eq!("127.0.0.1:4000".to_string(), fe_opts.http.addr);
|
||||
assert_eq!(Duration::from_secs(30), fe_opts.http.timeout);
|
||||
assert_eq!(Duration::from_secs(0), fe_opts.http.timeout);
|
||||
|
||||
assert_eq!(ReadableSize::gb(2), fe_opts.http.body_limit);
|
||||
|
||||
|
||||
@@ -42,6 +42,7 @@ use common_meta::kv_backend::KvBackendRef;
|
||||
use common_meta::node_manager::NodeManagerRef;
|
||||
use common_meta::peer::Peer;
|
||||
use common_meta::region_keeper::MemoryRegionKeeper;
|
||||
use common_meta::region_registry::LeaderRegionRegistry;
|
||||
use common_meta::sequence::SequenceBuilder;
|
||||
use common_meta::wal_options_allocator::{build_wal_options_allocator, WalOptionsAllocatorRef};
|
||||
use common_procedure::{ProcedureInfo, ProcedureManagerRef};
|
||||
@@ -55,9 +56,9 @@ use datanode::datanode::{Datanode, DatanodeBuilder};
|
||||
use datanode::region_server::RegionServer;
|
||||
use file_engine::config::EngineConfig as FileEngineConfig;
|
||||
use flow::{FlowConfig, FlowWorkerManager, FlownodeBuilder, FlownodeOptions, FrontendInvoker};
|
||||
use frontend::frontend::FrontendOptions;
|
||||
use frontend::frontend::{Frontend, FrontendOptions};
|
||||
use frontend::instance::builder::FrontendBuilder;
|
||||
use frontend::instance::{FrontendInstance, Instance as FeInstance, StandaloneDatanodeManager};
|
||||
use frontend::instance::{Instance as FeInstance, StandaloneDatanodeManager};
|
||||
use frontend::server::Services;
|
||||
use frontend::service_config::{
|
||||
InfluxdbOptions, JaegerOptions, MysqlOptions, OpentsdbOptions, PostgresOptions,
|
||||
@@ -67,7 +68,7 @@ use meta_srv::metasrv::{FLOW_ID_SEQ, TABLE_ID_SEQ};
|
||||
use mito2::config::MitoConfig;
|
||||
use query::stats::StatementStatistics;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use servers::export_metrics::ExportMetricsOption;
|
||||
use servers::export_metrics::{ExportMetricsOption, ExportMetricsTask};
|
||||
use servers::grpc::GrpcOptions;
|
||||
use servers::http::HttpOptions;
|
||||
use servers::tls::{TlsMode, TlsOption};
|
||||
@@ -76,15 +77,9 @@ use snafu::ResultExt;
|
||||
use tokio::sync::{broadcast, RwLock};
|
||||
use tracing_appender::non_blocking::WorkerGuard;
|
||||
|
||||
use crate::error::{
|
||||
BuildCacheRegistrySnafu, BuildWalOptionsAllocatorSnafu, CreateDirSnafu, IllegalConfigSnafu,
|
||||
InitDdlManagerSnafu, InitMetadataSnafu, InitTimezoneSnafu, LoadLayeredConfigSnafu, OtherSnafu,
|
||||
Result, ShutdownDatanodeSnafu, ShutdownFlownodeSnafu, ShutdownFrontendSnafu,
|
||||
StartDatanodeSnafu, StartFlownodeSnafu, StartFrontendSnafu, StartProcedureManagerSnafu,
|
||||
StartWalOptionsAllocatorSnafu, StopProcedureManagerSnafu,
|
||||
};
|
||||
use crate::error::Result;
|
||||
use crate::options::{GlobalOptions, GreptimeOptions};
|
||||
use crate::{log_versions, App};
|
||||
use crate::{error, log_versions, App};
|
||||
|
||||
pub const APP_NAME: &str = "greptime-standalone";
|
||||
|
||||
@@ -132,7 +127,6 @@ impl SubCommand {
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
|
||||
#[serde(default)]
|
||||
pub struct StandaloneOptions {
|
||||
pub mode: Mode,
|
||||
pub enable_telemetry: bool,
|
||||
pub default_timezone: Option<String>,
|
||||
pub http: HttpOptions,
|
||||
@@ -162,7 +156,6 @@ pub struct StandaloneOptions {
|
||||
impl Default for StandaloneOptions {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
mode: Mode::Standalone,
|
||||
enable_telemetry: true,
|
||||
default_timezone: None,
|
||||
http: HttpOptions::default(),
|
||||
@@ -243,7 +236,6 @@ impl StandaloneOptions {
|
||||
grpc: cloned_opts.grpc,
|
||||
init_regions_in_background: cloned_opts.init_regions_in_background,
|
||||
init_regions_parallelism: cloned_opts.init_regions_parallelism,
|
||||
mode: Mode::Standalone,
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
@@ -251,13 +243,12 @@ impl StandaloneOptions {
|
||||
|
||||
pub struct Instance {
|
||||
datanode: Datanode,
|
||||
frontend: FeInstance,
|
||||
frontend: Frontend,
|
||||
// TODO(discord9): wrapped it in flownode instance instead
|
||||
flow_worker_manager: Arc<FlowWorkerManager>,
|
||||
flow_shutdown: broadcast::Sender<()>,
|
||||
procedure_manager: ProcedureManagerRef,
|
||||
wal_options_allocator: WalOptionsAllocatorRef,
|
||||
|
||||
// Keep the logging guard to prevent the worker from being dropped.
|
||||
_guard: Vec<WorkerGuard>,
|
||||
}
|
||||
@@ -281,21 +272,26 @@ impl App for Instance {
|
||||
self.procedure_manager
|
||||
.start()
|
||||
.await
|
||||
.context(StartProcedureManagerSnafu)?;
|
||||
.context(error::StartProcedureManagerSnafu)?;
|
||||
|
||||
self.wal_options_allocator
|
||||
.start()
|
||||
.await
|
||||
.context(StartWalOptionsAllocatorSnafu)?;
|
||||
.context(error::StartWalOptionsAllocatorSnafu)?;
|
||||
|
||||
plugins::start_frontend_plugins(self.frontend.plugins().clone())
|
||||
plugins::start_frontend_plugins(self.frontend.instance.plugins().clone())
|
||||
.await
|
||||
.context(StartFrontendSnafu)?;
|
||||
.context(error::StartFrontendSnafu)?;
|
||||
|
||||
self.frontend
|
||||
.start()
|
||||
.await
|
||||
.context(error::StartFrontendSnafu)?;
|
||||
|
||||
self.frontend.start().await.context(StartFrontendSnafu)?;
|
||||
self.flow_worker_manager
|
||||
.clone()
|
||||
.run_background(Some(self.flow_shutdown.subscribe()));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -303,17 +299,18 @@ impl App for Instance {
|
||||
self.frontend
|
||||
.shutdown()
|
||||
.await
|
||||
.context(ShutdownFrontendSnafu)?;
|
||||
.context(error::ShutdownFrontendSnafu)?;
|
||||
|
||||
self.procedure_manager
|
||||
.stop()
|
||||
.await
|
||||
.context(StopProcedureManagerSnafu)?;
|
||||
.context(error::StopProcedureManagerSnafu)?;
|
||||
|
||||
self.datanode
|
||||
.shutdown()
|
||||
.await
|
||||
.context(ShutdownDatanodeSnafu)?;
|
||||
.context(error::ShutdownDatanodeSnafu)?;
|
||||
|
||||
self.flow_shutdown
|
||||
.send(())
|
||||
.map_err(|_e| {
|
||||
@@ -322,7 +319,8 @@ impl App for Instance {
|
||||
}
|
||||
.build()
|
||||
})
|
||||
.context(ShutdownFlownodeSnafu)?;
|
||||
.context(error::ShutdownFlownodeSnafu)?;
|
||||
|
||||
info!("Datanode instance stopped.");
|
||||
|
||||
Ok(())
|
||||
@@ -368,7 +366,7 @@ impl StartCommand {
|
||||
self.config_file.as_deref(),
|
||||
self.env_prefix.as_ref(),
|
||||
)
|
||||
.context(LoadLayeredConfigSnafu)?;
|
||||
.context(error::LoadLayeredConfigSnafu)?;
|
||||
|
||||
self.merge_with_cli_options(global_options, &mut opts.component)?;
|
||||
|
||||
@@ -381,9 +379,6 @@ impl StartCommand {
|
||||
global_options: &GlobalOptions,
|
||||
opts: &mut StandaloneOptions,
|
||||
) -> Result<()> {
|
||||
// Should always be standalone mode.
|
||||
opts.mode = Mode::Standalone;
|
||||
|
||||
if let Some(dir) = &global_options.log_dir {
|
||||
opts.logging.dir.clone_from(dir);
|
||||
}
|
||||
@@ -415,7 +410,7 @@ impl StartCommand {
|
||||
// frontend grpc addr conflict with datanode default grpc addr
|
||||
let datanode_grpc_addr = DatanodeOptions::default().grpc.bind_addr;
|
||||
if addr.eq(&datanode_grpc_addr) {
|
||||
return IllegalConfigSnafu {
|
||||
return error::IllegalConfigSnafu {
|
||||
msg: format!(
|
||||
"gRPC listen address conflicts with datanode reserved gRPC addr: {datanode_grpc_addr}",
|
||||
),
|
||||
@@ -474,18 +469,19 @@ impl StartCommand {
|
||||
|
||||
plugins::setup_frontend_plugins(&mut plugins, &plugin_opts, &fe_opts)
|
||||
.await
|
||||
.context(StartFrontendSnafu)?;
|
||||
.context(error::StartFrontendSnafu)?;
|
||||
|
||||
plugins::setup_datanode_plugins(&mut plugins, &plugin_opts, &dn_opts)
|
||||
.await
|
||||
.context(StartDatanodeSnafu)?;
|
||||
.context(error::StartDatanodeSnafu)?;
|
||||
|
||||
set_default_timezone(fe_opts.default_timezone.as_deref()).context(InitTimezoneSnafu)?;
|
||||
set_default_timezone(fe_opts.default_timezone.as_deref())
|
||||
.context(error::InitTimezoneSnafu)?;
|
||||
|
||||
let data_home = &dn_opts.storage.data_home;
|
||||
// Ensure the data_home directory exists.
|
||||
fs::create_dir_all(path::Path::new(data_home))
|
||||
.context(CreateDirSnafu { dir: data_home })?;
|
||||
.context(error::CreateDirSnafu { dir: data_home })?;
|
||||
|
||||
let metadata_dir = metadata_store_dir(data_home);
|
||||
let (kv_backend, procedure_manager) = FeInstance::try_build_standalone_components(
|
||||
@@ -494,7 +490,7 @@ impl StartCommand {
|
||||
opts.procedure,
|
||||
)
|
||||
.await
|
||||
.context(StartFrontendSnafu)?;
|
||||
.context(error::StartFrontendSnafu)?;
|
||||
|
||||
// Builds cache registry
|
||||
let layered_cache_builder = LayeredCacheRegistryBuilder::default();
|
||||
@@ -503,16 +499,16 @@ impl StartCommand {
|
||||
with_default_composite_cache_registry(
|
||||
layered_cache_builder.add_cache_registry(fundamental_cache_registry),
|
||||
)
|
||||
.context(BuildCacheRegistrySnafu)?
|
||||
.context(error::BuildCacheRegistrySnafu)?
|
||||
.build(),
|
||||
);
|
||||
|
||||
let datanode = DatanodeBuilder::new(dn_opts, plugins.clone())
|
||||
let datanode = DatanodeBuilder::new(dn_opts, plugins.clone(), Mode::Standalone)
|
||||
.with_kv_backend(kv_backend.clone())
|
||||
.with_cache_registry(layered_cache_registry.clone())
|
||||
.build()
|
||||
.await
|
||||
.context(StartDatanodeSnafu)?;
|
||||
.context(error::StartDatanodeSnafu)?;
|
||||
|
||||
let information_extension = Arc::new(StandaloneInformationExtension::new(
|
||||
datanode.region_server(),
|
||||
@@ -545,7 +541,7 @@ impl StartCommand {
|
||||
.build()
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(OtherSnafu)?,
|
||||
.context(error::OtherSnafu)?,
|
||||
);
|
||||
|
||||
// set the ref to query for the local flow state
|
||||
@@ -576,7 +572,7 @@ impl StartCommand {
|
||||
let kafka_options = opts.wal.clone().into();
|
||||
let wal_options_allocator = build_wal_options_allocator(&kafka_options, kv_backend.clone())
|
||||
.await
|
||||
.context(BuildWalOptionsAllocatorSnafu)?;
|
||||
.context(error::BuildWalOptionsAllocatorSnafu)?;
|
||||
let wal_options_allocator = Arc::new(wal_options_allocator);
|
||||
let table_meta_allocator = Arc::new(TableMetadataAllocator::new(
|
||||
table_id_sequence,
|
||||
@@ -597,8 +593,8 @@ impl StartCommand {
|
||||
)
|
||||
.await?;
|
||||
|
||||
let mut frontend = FrontendBuilder::new(
|
||||
fe_opts,
|
||||
let fe_instance = FrontendBuilder::new(
|
||||
fe_opts.clone(),
|
||||
kv_backend.clone(),
|
||||
layered_cache_registry.clone(),
|
||||
catalog_manager.clone(),
|
||||
@@ -609,7 +605,8 @@ impl StartCommand {
|
||||
.with_plugin(plugins.clone())
|
||||
.try_build()
|
||||
.await
|
||||
.context(StartFrontendSnafu)?;
|
||||
.context(error::StartFrontendSnafu)?;
|
||||
let fe_instance = Arc::new(fe_instance);
|
||||
|
||||
let flow_worker_manager = flownode.flow_worker_manager();
|
||||
// flow server need to be able to use frontend to write insert requests back
|
||||
@@ -622,18 +619,25 @@ impl StartCommand {
|
||||
node_manager,
|
||||
)
|
||||
.await
|
||||
.context(StartFlownodeSnafu)?;
|
||||
.context(error::StartFlownodeSnafu)?;
|
||||
flow_worker_manager.set_frontend_invoker(invoker).await;
|
||||
|
||||
let (tx, _rx) = broadcast::channel(1);
|
||||
|
||||
let servers = Services::new(opts, Arc::new(frontend.clone()), plugins)
|
||||
let export_metrics_task = ExportMetricsTask::try_new(&opts.export_metrics, Some(&plugins))
|
||||
.context(error::ServersSnafu)?;
|
||||
|
||||
let servers = Services::new(opts, fe_instance.clone(), plugins)
|
||||
.build()
|
||||
.await
|
||||
.context(StartFrontendSnafu)?;
|
||||
frontend
|
||||
.build_servers(servers)
|
||||
.context(StartFrontendSnafu)?;
|
||||
.context(error::StartFrontendSnafu)?;
|
||||
|
||||
let frontend = Frontend {
|
||||
instance: fe_instance,
|
||||
servers,
|
||||
heartbeat_task: None,
|
||||
export_metrics_task,
|
||||
};
|
||||
|
||||
Ok(Instance {
|
||||
datanode,
|
||||
@@ -661,6 +665,7 @@ impl StartCommand {
|
||||
node_manager,
|
||||
cache_invalidator,
|
||||
memory_region_keeper: Arc::new(MemoryRegionKeeper::default()),
|
||||
leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
|
||||
table_metadata_manager,
|
||||
table_metadata_allocator,
|
||||
flow_metadata_manager,
|
||||
@@ -670,7 +675,7 @@ impl StartCommand {
|
||||
procedure_manager,
|
||||
true,
|
||||
)
|
||||
.context(InitDdlManagerSnafu)?,
|
||||
.context(error::InitDdlManagerSnafu)?,
|
||||
);
|
||||
|
||||
Ok(procedure_executor)
|
||||
@@ -684,7 +689,7 @@ impl StartCommand {
|
||||
table_metadata_manager
|
||||
.init()
|
||||
.await
|
||||
.context(InitMetadataSnafu)?;
|
||||
.context(error::InitMetadataSnafu)?;
|
||||
|
||||
Ok(table_metadata_manager)
|
||||
}
|
||||
@@ -778,6 +783,7 @@ impl InformationExtension for StandaloneInformationExtension {
|
||||
manifest_size: region_stat.manifest_size,
|
||||
sst_size: region_stat.sst_size,
|
||||
index_size: region_stat.index_size,
|
||||
region_manifest: region_stat.manifest.into(),
|
||||
}
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
@@ -1054,7 +1060,6 @@ mod tests {
|
||||
let options =
|
||||
StandaloneOptions::load_layered_options(None, "GREPTIMEDB_STANDALONE").unwrap();
|
||||
let default_options = StandaloneOptions::default();
|
||||
assert_eq!(options.mode, default_options.mode);
|
||||
assert_eq!(options.enable_telemetry, default_options.enable_telemetry);
|
||||
assert_eq!(options.http, default_options.http);
|
||||
assert_eq!(options.grpc, default_options.grpc);
|
||||
|
||||
@@ -135,5 +135,6 @@ pub fn is_readonly_schema(schema: &str) -> bool {
|
||||
pub const TRACE_ID_COLUMN: &str = "trace_id";
|
||||
pub const SPAN_ID_COLUMN: &str = "span_id";
|
||||
pub const SPAN_NAME_COLUMN: &str = "span_name";
|
||||
pub const SERVICE_NAME_COLUMN: &str = "service_name";
|
||||
pub const PARENT_SPAN_ID_COLUMN: &str = "parent_span_id";
|
||||
// ---- End of special table and fields ----
|
||||
|
||||
@@ -39,6 +39,7 @@ geohash = { version = "0.13", optional = true }
|
||||
h3o = { version = "0.6", optional = true }
|
||||
hyperloglogplus = "0.4"
|
||||
jsonb.workspace = true
|
||||
memchr = "2.7"
|
||||
nalgebra.workspace = true
|
||||
num = "0.4"
|
||||
num-traits = "0.2"
|
||||
|
||||
@@ -12,15 +12,19 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
mod add_region_follower;
|
||||
mod flush_compact_region;
|
||||
mod flush_compact_table;
|
||||
mod migrate_region;
|
||||
mod remove_region_follower;
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use add_region_follower::AddRegionFollowerFunction;
|
||||
use flush_compact_region::{CompactRegionFunction, FlushRegionFunction};
|
||||
use flush_compact_table::{CompactTableFunction, FlushTableFunction};
|
||||
use migrate_region::MigrateRegionFunction;
|
||||
use remove_region_follower::RemoveRegionFollowerFunction;
|
||||
|
||||
use crate::flush_flow::FlushFlowFunction;
|
||||
use crate::function_registry::FunctionRegistry;
|
||||
@@ -32,6 +36,8 @@ impl AdminFunction {
|
||||
/// Register all table functions to [`FunctionRegistry`].
|
||||
pub fn register(registry: &FunctionRegistry) {
|
||||
registry.register_async(Arc::new(MigrateRegionFunction));
|
||||
registry.register_async(Arc::new(AddRegionFollowerFunction));
|
||||
registry.register_async(Arc::new(RemoveRegionFollowerFunction));
|
||||
registry.register_async(Arc::new(FlushRegionFunction));
|
||||
registry.register_async(Arc::new(CompactRegionFunction));
|
||||
registry.register_async(Arc::new(FlushTableFunction));
|
||||
|
||||
129
src/common/function/src/admin/add_region_follower.rs
Normal file
129
src/common/function/src/admin/add_region_follower.rs
Normal file
@@ -0,0 +1,129 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use common_macro::admin_fn;
|
||||
use common_meta::rpc::procedure::AddRegionFollowerRequest;
|
||||
use common_query::error::{
|
||||
InvalidFuncArgsSnafu, MissingProcedureServiceHandlerSnafu, Result,
|
||||
UnsupportedInputDataTypeSnafu,
|
||||
};
|
||||
use common_query::prelude::{Signature, TypeSignature, Volatility};
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::value::{Value, ValueRef};
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::ensure;
|
||||
|
||||
use crate::handlers::ProcedureServiceHandlerRef;
|
||||
use crate::helper::cast_u64;
|
||||
|
||||
/// A function to add a follower to a region.
|
||||
/// Only available in cluster mode.
|
||||
///
|
||||
/// - `add_region_follower(region_id, peer_id)`.
|
||||
///
|
||||
/// The parameters:
|
||||
/// - `region_id`: the region id
|
||||
/// - `peer_id`: the peer id
|
||||
#[admin_fn(
|
||||
name = AddRegionFollowerFunction,
|
||||
display_name = add_region_follower,
|
||||
sig_fn = signature,
|
||||
ret = uint64
|
||||
)]
|
||||
pub(crate) async fn add_region_follower(
|
||||
procedure_service_handler: &ProcedureServiceHandlerRef,
|
||||
_ctx: &QueryContextRef,
|
||||
params: &[ValueRef<'_>],
|
||||
) -> Result<Value> {
|
||||
ensure!(
|
||||
params.len() == 2,
|
||||
InvalidFuncArgsSnafu {
|
||||
err_msg: format!(
|
||||
"The length of the args is not correct, expect exactly 2, have: {}",
|
||||
params.len()
|
||||
),
|
||||
}
|
||||
);
|
||||
|
||||
let Some(region_id) = cast_u64(¶ms[0])? else {
|
||||
return UnsupportedInputDataTypeSnafu {
|
||||
function: "add_region_follower",
|
||||
datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
|
||||
}
|
||||
.fail();
|
||||
};
|
||||
let Some(peer_id) = cast_u64(¶ms[1])? else {
|
||||
return UnsupportedInputDataTypeSnafu {
|
||||
function: "add_region_follower",
|
||||
datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
|
||||
}
|
||||
.fail();
|
||||
};
|
||||
|
||||
procedure_service_handler
|
||||
.add_region_follower(AddRegionFollowerRequest { region_id, peer_id })
|
||||
.await?;
|
||||
|
||||
Ok(Value::from(0u64))
|
||||
}
|
||||
|
||||
fn signature() -> Signature {
|
||||
Signature::one_of(
|
||||
vec![
|
||||
// add_region_follower(region_id, peer)
|
||||
TypeSignature::Uniform(2, ConcreteDataType::numerics()),
|
||||
],
|
||||
Volatility::Immutable,
|
||||
)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_query::prelude::TypeSignature;
|
||||
use datatypes::vectors::{UInt64Vector, VectorRef};
|
||||
|
||||
use super::*;
|
||||
use crate::function::{AsyncFunction, FunctionContext};
|
||||
|
||||
#[test]
|
||||
fn test_add_region_follower_misc() {
|
||||
let f = AddRegionFollowerFunction;
|
||||
assert_eq!("add_region_follower", f.name());
|
||||
assert_eq!(
|
||||
ConcreteDataType::uint64_datatype(),
|
||||
f.return_type(&[]).unwrap()
|
||||
);
|
||||
assert!(matches!(f.signature(),
|
||||
Signature {
|
||||
type_signature: TypeSignature::OneOf(sigs),
|
||||
volatility: Volatility::Immutable
|
||||
} if sigs.len() == 1));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_add_region_follower() {
|
||||
let f = AddRegionFollowerFunction;
|
||||
let args = vec![1, 1];
|
||||
let args = args
|
||||
.into_iter()
|
||||
.map(|arg| Arc::new(UInt64Vector::from_slice([arg])) as _)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let result = f.eval(FunctionContext::mock(), &args).await.unwrap();
|
||||
let expect: VectorRef = Arc::new(UInt64Vector::from_slice([0u64]));
|
||||
assert_eq!(result, expect);
|
||||
}
|
||||
}
|
||||
129
src/common/function/src/admin/remove_region_follower.rs
Normal file
129
src/common/function/src/admin/remove_region_follower.rs
Normal file
@@ -0,0 +1,129 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use common_macro::admin_fn;
|
||||
use common_meta::rpc::procedure::RemoveRegionFollowerRequest;
|
||||
use common_query::error::{
|
||||
InvalidFuncArgsSnafu, MissingProcedureServiceHandlerSnafu, Result,
|
||||
UnsupportedInputDataTypeSnafu,
|
||||
};
|
||||
use common_query::prelude::{Signature, TypeSignature, Volatility};
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::value::{Value, ValueRef};
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::ensure;
|
||||
|
||||
use crate::handlers::ProcedureServiceHandlerRef;
|
||||
use crate::helper::cast_u64;
|
||||
|
||||
/// A function to remove a follower from a region.
|
||||
//// Only available in cluster mode.
|
||||
///
|
||||
/// - `remove_region_follower(region_id, peer_id)`.
|
||||
///
|
||||
/// The parameters:
|
||||
/// - `region_id`: the region id
|
||||
/// - `peer_id`: the peer id
|
||||
#[admin_fn(
|
||||
name = RemoveRegionFollowerFunction,
|
||||
display_name = remove_region_follower,
|
||||
sig_fn = signature,
|
||||
ret = uint64
|
||||
)]
|
||||
pub(crate) async fn remove_region_follower(
|
||||
procedure_service_handler: &ProcedureServiceHandlerRef,
|
||||
_ctx: &QueryContextRef,
|
||||
params: &[ValueRef<'_>],
|
||||
) -> Result<Value> {
|
||||
ensure!(
|
||||
params.len() == 2,
|
||||
InvalidFuncArgsSnafu {
|
||||
err_msg: format!(
|
||||
"The length of the args is not correct, expect exactly 2, have: {}",
|
||||
params.len()
|
||||
),
|
||||
}
|
||||
);
|
||||
|
||||
let Some(region_id) = cast_u64(¶ms[0])? else {
|
||||
return UnsupportedInputDataTypeSnafu {
|
||||
function: "add_region_follower",
|
||||
datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
|
||||
}
|
||||
.fail();
|
||||
};
|
||||
let Some(peer_id) = cast_u64(¶ms[1])? else {
|
||||
return UnsupportedInputDataTypeSnafu {
|
||||
function: "add_region_follower",
|
||||
datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
|
||||
}
|
||||
.fail();
|
||||
};
|
||||
|
||||
procedure_service_handler
|
||||
.remove_region_follower(RemoveRegionFollowerRequest { region_id, peer_id })
|
||||
.await?;
|
||||
|
||||
Ok(Value::from(0u64))
|
||||
}
|
||||
|
||||
fn signature() -> Signature {
|
||||
Signature::one_of(
|
||||
vec![
|
||||
// remove_region_follower(region_id, peer_id)
|
||||
TypeSignature::Uniform(2, ConcreteDataType::numerics()),
|
||||
],
|
||||
Volatility::Immutable,
|
||||
)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_query::prelude::TypeSignature;
|
||||
use datatypes::vectors::{UInt64Vector, VectorRef};
|
||||
|
||||
use super::*;
|
||||
use crate::function::{AsyncFunction, FunctionContext};
|
||||
|
||||
#[test]
|
||||
fn test_remove_region_follower_misc() {
|
||||
let f = RemoveRegionFollowerFunction;
|
||||
assert_eq!("remove_region_follower", f.name());
|
||||
assert_eq!(
|
||||
ConcreteDataType::uint64_datatype(),
|
||||
f.return_type(&[]).unwrap()
|
||||
);
|
||||
assert!(matches!(f.signature(),
|
||||
Signature {
|
||||
type_signature: TypeSignature::OneOf(sigs),
|
||||
volatility: Volatility::Immutable
|
||||
} if sigs.len() == 1));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_remove_region_follower() {
|
||||
let f = RemoveRegionFollowerFunction;
|
||||
let args = vec![1, 1];
|
||||
let args = args
|
||||
.into_iter()
|
||||
.map(|arg| Arc::new(UInt64Vector::from_slice([arg])) as _)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let result = f.eval(FunctionContext::mock(), &args).await.unwrap();
|
||||
let expect: VectorRef = Arc::new(UInt64Vector::from_slice([0u64]));
|
||||
assert_eq!(result, expect);
|
||||
}
|
||||
}
|
||||
@@ -27,6 +27,7 @@ use crate::scalars::hll_count::HllCalcFunction;
|
||||
use crate::scalars::ip::IpFunctions;
|
||||
use crate::scalars::json::JsonFunction;
|
||||
use crate::scalars::matches::MatchesFunction;
|
||||
use crate::scalars::matches_term::MatchesTermFunction;
|
||||
use crate::scalars::math::MathFunction;
|
||||
use crate::scalars::timestamp::TimestampFunction;
|
||||
use crate::scalars::uddsketch_calc::UddSketchCalcFunction;
|
||||
@@ -116,6 +117,7 @@ pub static FUNCTION_REGISTRY: Lazy<Arc<FunctionRegistry>> = Lazy::new(|| {
|
||||
|
||||
// Full text search function
|
||||
MatchesFunction::register(&function_registry);
|
||||
MatchesTermFunction::register(&function_registry);
|
||||
|
||||
// System and administration functions
|
||||
SystemFunction::register(&function_registry);
|
||||
|
||||
@@ -16,7 +16,10 @@ use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use common_base::AffectedRows;
|
||||
use common_meta::rpc::procedure::{MigrateRegionRequest, ProcedureStateResponse};
|
||||
use common_meta::rpc::procedure::{
|
||||
AddRegionFollowerRequest, MigrateRegionRequest, ProcedureStateResponse,
|
||||
RemoveRegionFollowerRequest,
|
||||
};
|
||||
use common_query::error::Result;
|
||||
use common_query::Output;
|
||||
use session::context::QueryContextRef;
|
||||
@@ -63,6 +66,12 @@ pub trait ProcedureServiceHandler: Send + Sync {
|
||||
|
||||
/// Query the procedure' state by its id
|
||||
async fn query_procedure_state(&self, pid: &str) -> Result<ProcedureStateResponse>;
|
||||
|
||||
/// Add a region follower to a region.
|
||||
async fn add_region_follower(&self, request: AddRegionFollowerRequest) -> Result<()>;
|
||||
|
||||
/// Remove a region follower from a region.
|
||||
async fn remove_region_follower(&self, request: RemoveRegionFollowerRequest) -> Result<()>;
|
||||
}
|
||||
|
||||
/// This flow service handler is only use for flush flow for now.
|
||||
|
||||
@@ -19,6 +19,7 @@ pub mod expression;
|
||||
pub mod geo;
|
||||
pub mod json;
|
||||
pub mod matches;
|
||||
pub mod matches_term;
|
||||
pub mod math;
|
||||
pub mod vector;
|
||||
|
||||
|
||||
375
src/common/function/src/scalars/matches_term.rs
Normal file
375
src/common/function/src/scalars/matches_term.rs
Normal file
@@ -0,0 +1,375 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::{fmt, iter};
|
||||
|
||||
use common_query::error::{InvalidFuncArgsSnafu, Result};
|
||||
use common_query::prelude::Volatility;
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::scalars::ScalarVectorBuilder;
|
||||
use datatypes::vectors::{BooleanVector, BooleanVectorBuilder, MutableVector, VectorRef};
|
||||
use memchr::memmem;
|
||||
use snafu::ensure;
|
||||
|
||||
use crate::function::{Function, FunctionContext};
|
||||
use crate::function_registry::FunctionRegistry;
|
||||
|
||||
/// Exact term/phrase matching function for text columns.
|
||||
///
|
||||
/// This function checks if a text column contains exact term/phrase matches
|
||||
/// with non-alphanumeric boundaries. Designed for:
|
||||
/// - Whole-word matching (e.g. "cat" in "cat!" but not in "category")
|
||||
/// - Phrase matching (e.g. "hello world" in "note:hello world!")
|
||||
///
|
||||
/// # Signature
|
||||
/// `matches_term(text: String, term: String) -> Boolean`
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `text` - String column to search
|
||||
/// * `term` - Search term/phrase
|
||||
///
|
||||
/// # Returns
|
||||
/// BooleanVector where each element indicates if the corresponding text
|
||||
/// contains an exact match of the term, following these rules:
|
||||
/// 1. Exact substring match found (case-sensitive)
|
||||
/// 2. Match boundaries are either:
|
||||
/// - Start/end of text
|
||||
/// - Any non-alphanumeric character (including spaces, hyphens, punctuation, etc.)
|
||||
///
|
||||
/// # Examples
|
||||
/// ```
|
||||
/// -- SQL examples --
|
||||
/// -- Match phrase with space --
|
||||
/// SELECT matches_term(column, 'hello world') FROM table;
|
||||
/// -- Text: "warning:hello world!" => true
|
||||
/// -- Text: "hello-world" => false (hyphen instead of space)
|
||||
/// -- Text: "hello world2023" => false (ending with numbers)
|
||||
///
|
||||
/// -- Match multiple words with boundaries --
|
||||
/// SELECT matches_term(column, 'critical error') FROM logs;
|
||||
/// -- Match in: "ERROR:critical error!"
|
||||
/// -- No match: "critical_errors"
|
||||
///
|
||||
/// -- Empty string handling --
|
||||
/// SELECT matches_term(column, '') FROM table;
|
||||
/// -- Text: "" => true
|
||||
/// -- Text: "any" => false
|
||||
///
|
||||
/// -- Case sensitivity --
|
||||
/// SELECT matches_term(column, 'Cat') FROM table;
|
||||
/// -- Text: "Cat" => true
|
||||
/// -- Text: "cat" => false
|
||||
/// ```
|
||||
pub struct MatchesTermFunction;
|
||||
|
||||
impl MatchesTermFunction {
|
||||
pub fn register(registry: &FunctionRegistry) {
|
||||
registry.register(Arc::new(MatchesTermFunction));
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for MatchesTermFunction {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(f, "MATCHES_TERM")
|
||||
}
|
||||
}
|
||||
|
||||
impl Function for MatchesTermFunction {
|
||||
fn name(&self) -> &str {
|
||||
"matches_term"
|
||||
}
|
||||
|
||||
fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
|
||||
Ok(ConcreteDataType::boolean_datatype())
|
||||
}
|
||||
|
||||
fn signature(&self) -> common_query::prelude::Signature {
|
||||
common_query::prelude::Signature::exact(
|
||||
vec![
|
||||
ConcreteDataType::string_datatype(),
|
||||
ConcreteDataType::string_datatype(),
|
||||
],
|
||||
Volatility::Immutable,
|
||||
)
|
||||
}
|
||||
|
||||
fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
|
||||
ensure!(
|
||||
columns.len() == 2,
|
||||
InvalidFuncArgsSnafu {
|
||||
err_msg: format!(
|
||||
"The length of the args is not correct, expect exactly 2, have: {}",
|
||||
columns.len()
|
||||
),
|
||||
}
|
||||
);
|
||||
|
||||
let text_column = &columns[0];
|
||||
if text_column.is_empty() {
|
||||
return Ok(Arc::new(BooleanVector::from(Vec::<bool>::with_capacity(0))));
|
||||
}
|
||||
|
||||
let term_column = &columns[1];
|
||||
let compiled_finder = if term_column.is_const() {
|
||||
let term = term_column.get_ref(0).as_string().unwrap();
|
||||
match term {
|
||||
None => {
|
||||
return Ok(Arc::new(BooleanVector::from_iter(
|
||||
iter::repeat(None).take(text_column.len()),
|
||||
)));
|
||||
}
|
||||
Some(term) => Some(MatchesTermFinder::new(term)),
|
||||
}
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let len = text_column.len();
|
||||
let mut result = BooleanVectorBuilder::with_capacity(len);
|
||||
for i in 0..len {
|
||||
let text = text_column.get_ref(i).as_string().unwrap();
|
||||
let Some(text) = text else {
|
||||
result.push_null();
|
||||
continue;
|
||||
};
|
||||
|
||||
let contains = match &compiled_finder {
|
||||
Some(finder) => finder.find(text),
|
||||
None => {
|
||||
let term = match term_column.get_ref(i).as_string().unwrap() {
|
||||
None => {
|
||||
result.push_null();
|
||||
continue;
|
||||
}
|
||||
Some(term) => term,
|
||||
};
|
||||
MatchesTermFinder::new(term).find(text)
|
||||
}
|
||||
};
|
||||
result.push(Some(contains));
|
||||
}
|
||||
|
||||
Ok(result.to_vector())
|
||||
}
|
||||
}
|
||||
|
||||
/// A compiled finder for `matches_term` function that holds the compiled term
|
||||
/// and its metadata for efficient matching.
|
||||
///
|
||||
/// A term is considered matched when:
|
||||
/// 1. The exact sequence appears in the text
|
||||
/// 2. It is either:
|
||||
/// - At the start/end of text with adjacent non-alphanumeric character
|
||||
/// - Surrounded by non-alphanumeric characters
|
||||
///
|
||||
/// # Examples
|
||||
/// ```
|
||||
/// let finder = MatchesTermFinder::new("cat");
|
||||
/// assert!(finder.find("cat!")); // Term at end with punctuation
|
||||
/// assert!(finder.find("dog,cat")); // Term preceded by comma
|
||||
/// assert!(!finder.find("category")); // Partial match rejected
|
||||
///
|
||||
/// let finder = MatchesTermFinder::new("world");
|
||||
/// assert!(finder.find("hello-world")); // Hyphen boundary
|
||||
/// ```
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct MatchesTermFinder {
|
||||
finder: memmem::Finder<'static>,
|
||||
term: String,
|
||||
starts_with_non_alnum: bool,
|
||||
ends_with_non_alnum: bool,
|
||||
}
|
||||
|
||||
impl MatchesTermFinder {
|
||||
/// Create a new `MatchesTermFinder` for the given term.
|
||||
pub fn new(term: &str) -> Self {
|
||||
let starts_with_non_alnum = term.chars().next().is_some_and(|c| !c.is_alphanumeric());
|
||||
let ends_with_non_alnum = term.chars().last().is_some_and(|c| !c.is_alphanumeric());
|
||||
|
||||
Self {
|
||||
finder: memmem::Finder::new(term).into_owned(),
|
||||
term: term.to_string(),
|
||||
starts_with_non_alnum,
|
||||
ends_with_non_alnum,
|
||||
}
|
||||
}
|
||||
|
||||
/// Find the term in the text.
|
||||
pub fn find(&self, text: &str) -> bool {
|
||||
if self.term.is_empty() {
|
||||
return text.is_empty();
|
||||
}
|
||||
|
||||
if text.len() < self.term.len() {
|
||||
return false;
|
||||
}
|
||||
|
||||
let mut pos = 0;
|
||||
while let Some(found_pos) = self.finder.find(text[pos..].as_bytes()) {
|
||||
let actual_pos = pos + found_pos;
|
||||
|
||||
let prev_ok = self.starts_with_non_alnum
|
||||
|| text[..actual_pos]
|
||||
.chars()
|
||||
.last()
|
||||
.map(|c| !c.is_alphanumeric())
|
||||
.unwrap_or(true);
|
||||
|
||||
if prev_ok {
|
||||
let next_pos = actual_pos + self.finder.needle().len();
|
||||
let next_ok = self.ends_with_non_alnum
|
||||
|| text[next_pos..]
|
||||
.chars()
|
||||
.next()
|
||||
.map(|c| !c.is_alphanumeric())
|
||||
.unwrap_or(true);
|
||||
|
||||
if next_ok {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(next_char) = text[actual_pos..].chars().next() {
|
||||
pos = actual_pos + next_char.len_utf8();
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn matches_term_example() {
|
||||
let finder = MatchesTermFinder::new("hello world");
|
||||
assert!(finder.find("warning:hello world!"));
|
||||
assert!(!finder.find("hello-world"));
|
||||
assert!(!finder.find("hello world2023"));
|
||||
|
||||
let finder = MatchesTermFinder::new("critical error");
|
||||
assert!(finder.find("ERROR:critical error!"));
|
||||
assert!(!finder.find("critical_errors"));
|
||||
|
||||
let finder = MatchesTermFinder::new("");
|
||||
assert!(finder.find(""));
|
||||
assert!(!finder.find("any"));
|
||||
|
||||
let finder = MatchesTermFinder::new("Cat");
|
||||
assert!(finder.find("Cat"));
|
||||
assert!(!finder.find("cat"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn matches_term_with_punctuation() {
|
||||
assert!(MatchesTermFinder::new("cat").find("cat!"));
|
||||
assert!(MatchesTermFinder::new("dog").find("!dog"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn matches_phrase_with_boundaries() {
|
||||
assert!(MatchesTermFinder::new("hello-world").find("hello-world"));
|
||||
assert!(MatchesTermFinder::new("'foo bar'").find("test: 'foo bar'"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn matches_at_text_boundaries() {
|
||||
assert!(MatchesTermFinder::new("start").find("start..."));
|
||||
assert!(MatchesTermFinder::new("end").find("...end"));
|
||||
}
|
||||
|
||||
// Negative cases
|
||||
#[test]
|
||||
fn rejects_partial_matches() {
|
||||
assert!(!MatchesTermFinder::new("cat").find("category"));
|
||||
assert!(!MatchesTermFinder::new("boot").find("rebooted"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn rejects_missing_term() {
|
||||
assert!(!MatchesTermFinder::new("foo").find("hello world"));
|
||||
}
|
||||
|
||||
// Edge cases
|
||||
#[test]
|
||||
fn handles_empty_inputs() {
|
||||
assert!(!MatchesTermFinder::new("test").find(""));
|
||||
assert!(!MatchesTermFinder::new("").find("text"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn different_unicode_boundaries() {
|
||||
assert!(MatchesTermFinder::new("café").find("café>"));
|
||||
assert!(!MatchesTermFinder::new("café").find("口café>"));
|
||||
assert!(!MatchesTermFinder::new("café").find("café口"));
|
||||
assert!(!MatchesTermFinder::new("café").find("cafémore"));
|
||||
assert!(MatchesTermFinder::new("русский").find("русский!"));
|
||||
assert!(MatchesTermFinder::new("русский").find("русский!"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn case_sensitive_matching() {
|
||||
assert!(!MatchesTermFinder::new("cat").find("Cat"));
|
||||
assert!(MatchesTermFinder::new("CaT").find("CaT"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn numbers_in_term() {
|
||||
assert!(MatchesTermFinder::new("v1.0").find("v1.0!"));
|
||||
assert!(!MatchesTermFinder::new("v1.0").find("v1.0a"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn adjacent_alphanumeric_fails() {
|
||||
assert!(!MatchesTermFinder::new("cat").find("cat5"));
|
||||
assert!(!MatchesTermFinder::new("dog").find("dogcat"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn empty_term_text() {
|
||||
assert!(!MatchesTermFinder::new("").find("text"));
|
||||
assert!(MatchesTermFinder::new("").find(""));
|
||||
assert!(!MatchesTermFinder::new("text").find(""));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn leading_non_alphanumeric() {
|
||||
assert!(MatchesTermFinder::new("/cat").find("dog/cat"));
|
||||
assert!(MatchesTermFinder::new("dog/").find("dog/cat"));
|
||||
assert!(MatchesTermFinder::new("dog/cat").find("dog/cat"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn continues_searching_after_boundary_mismatch() {
|
||||
assert!(!MatchesTermFinder::new("log").find("bloglog!"));
|
||||
assert!(MatchesTermFinder::new("log").find("bloglog log"));
|
||||
assert!(MatchesTermFinder::new("log").find("alogblog_log!"));
|
||||
|
||||
assert!(MatchesTermFinder::new("error").find("errorlog_error_case"));
|
||||
assert!(MatchesTermFinder::new("test").find("atestbtestc_test_end"));
|
||||
assert!(MatchesTermFinder::new("data").find("database_data_store"));
|
||||
assert!(!MatchesTermFinder::new("data").find("database_datastore"));
|
||||
assert!(MatchesTermFinder::new("log.txt").find("catalog.txt_log.txt!"));
|
||||
assert!(!MatchesTermFinder::new("log.txt").find("catalog.txtlog.txt!"));
|
||||
assert!(MatchesTermFinder::new("data-set").find("bigdata-set_data-set!"));
|
||||
|
||||
assert!(MatchesTermFinder::new("中文").find("这是中文测试,中文!"));
|
||||
assert!(MatchesTermFinder::new("error").find("错误errorerror日志_error!"));
|
||||
}
|
||||
}
|
||||
@@ -24,6 +24,7 @@ pub(crate) mod sum;
|
||||
mod vector_add;
|
||||
mod vector_dim;
|
||||
mod vector_div;
|
||||
mod vector_kth_elem;
|
||||
mod vector_mul;
|
||||
mod vector_norm;
|
||||
mod vector_sub;
|
||||
@@ -57,6 +58,7 @@ impl VectorFunction {
|
||||
registry.register(Arc::new(vector_div::VectorDivFunction));
|
||||
registry.register(Arc::new(vector_norm::VectorNormFunction));
|
||||
registry.register(Arc::new(vector_dim::VectorDimFunction));
|
||||
registry.register(Arc::new(vector_kth_elem::VectorKthElemFunction));
|
||||
registry.register(Arc::new(vector_subvector::VectorSubvectorFunction));
|
||||
registry.register(Arc::new(elem_sum::ElemSumFunction));
|
||||
registry.register(Arc::new(elem_product::ElemProductFunction));
|
||||
|
||||
211
src/common/function/src/scalars/vector/vector_kth_elem.rs
Normal file
211
src/common/function/src/scalars/vector/vector_kth_elem.rs
Normal file
@@ -0,0 +1,211 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::borrow::Cow;
|
||||
use std::fmt::Display;
|
||||
|
||||
use common_query::error::{InvalidFuncArgsSnafu, Result};
|
||||
use common_query::prelude::Signature;
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::scalars::ScalarVectorBuilder;
|
||||
use datatypes::vectors::{Float32VectorBuilder, MutableVector, VectorRef};
|
||||
use snafu::ensure;
|
||||
|
||||
use crate::function::{Function, FunctionContext};
|
||||
use crate::helper;
|
||||
use crate::scalars::vector::impl_conv::{as_veclit, as_veclit_if_const};
|
||||
|
||||
const NAME: &str = "vec_kth_elem";
|
||||
|
||||
/// Returns the k-th(0-based index) element of the vector.
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```sql
|
||||
/// SELECT vec_kth_elem("[2, 4, 6]",1) as result;
|
||||
///
|
||||
/// +---------+
|
||||
/// | result |
|
||||
/// +---------+
|
||||
/// | 4 |
|
||||
/// +---------+
|
||||
///
|
||||
/// ```
|
||||
///
|
||||
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub struct VectorKthElemFunction;
|
||||
|
||||
impl Function for VectorKthElemFunction {
|
||||
fn name(&self) -> &str {
|
||||
NAME
|
||||
}
|
||||
|
||||
fn return_type(
|
||||
&self,
|
||||
_input_types: &[ConcreteDataType],
|
||||
) -> common_query::error::Result<ConcreteDataType> {
|
||||
Ok(ConcreteDataType::float32_datatype())
|
||||
}
|
||||
|
||||
fn signature(&self) -> Signature {
|
||||
helper::one_of_sigs2(
|
||||
vec![
|
||||
ConcreteDataType::string_datatype(),
|
||||
ConcreteDataType::binary_datatype(),
|
||||
],
|
||||
vec![ConcreteDataType::int64_datatype()],
|
||||
)
|
||||
}
|
||||
|
||||
fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
|
||||
ensure!(
|
||||
columns.len() == 2,
|
||||
InvalidFuncArgsSnafu {
|
||||
err_msg: format!(
|
||||
"The length of the args is not correct, expect exactly two, have: {}",
|
||||
columns.len()
|
||||
),
|
||||
}
|
||||
);
|
||||
|
||||
let arg0 = &columns[0];
|
||||
let arg1 = &columns[1];
|
||||
|
||||
let len = arg0.len();
|
||||
let mut result = Float32VectorBuilder::with_capacity(len);
|
||||
if len == 0 {
|
||||
return Ok(result.to_vector());
|
||||
};
|
||||
|
||||
let arg0_const = as_veclit_if_const(arg0)?;
|
||||
|
||||
for i in 0..len {
|
||||
let arg0 = match arg0_const.as_ref() {
|
||||
Some(arg0) => Some(Cow::Borrowed(arg0.as_ref())),
|
||||
None => as_veclit(arg0.get_ref(i))?,
|
||||
};
|
||||
let Some(arg0) = arg0 else {
|
||||
result.push_null();
|
||||
continue;
|
||||
};
|
||||
|
||||
let arg1 = arg1.get(i).as_f64_lossy();
|
||||
let Some(arg1) = arg1 else {
|
||||
result.push_null();
|
||||
continue;
|
||||
};
|
||||
|
||||
ensure!(
|
||||
arg1 >= 0.0 && arg1.fract() == 0.0,
|
||||
InvalidFuncArgsSnafu {
|
||||
err_msg: format!(
|
||||
"Invalid argument: k must be a non-negative integer, but got k = {}.",
|
||||
arg1
|
||||
),
|
||||
}
|
||||
);
|
||||
|
||||
let k = arg1 as usize;
|
||||
|
||||
ensure!(
|
||||
k < arg0.len(),
|
||||
InvalidFuncArgsSnafu {
|
||||
err_msg: format!(
|
||||
"Out of range: k must be in the range [0, {}], but got k = {}.",
|
||||
arg0.len() - 1,
|
||||
k
|
||||
),
|
||||
}
|
||||
);
|
||||
|
||||
let value = arg0[k];
|
||||
|
||||
result.push(Some(value));
|
||||
}
|
||||
Ok(result.to_vector())
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for VectorKthElemFunction {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{}", NAME.to_ascii_uppercase())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_query::error;
|
||||
use datatypes::vectors::{Int64Vector, StringVector};
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_vec_kth_elem() {
|
||||
let func = VectorKthElemFunction;
|
||||
|
||||
let input0 = Arc::new(StringVector::from(vec![
|
||||
Some("[1.0,2.0,3.0]".to_string()),
|
||||
Some("[4.0,5.0,6.0]".to_string()),
|
||||
Some("[7.0,8.0,9.0]".to_string()),
|
||||
None,
|
||||
]));
|
||||
let input1 = Arc::new(Int64Vector::from(vec![Some(0), Some(2), None, Some(1)]));
|
||||
|
||||
let result = func
|
||||
.eval(&FunctionContext::default(), &[input0, input1])
|
||||
.unwrap();
|
||||
|
||||
let result = result.as_ref();
|
||||
assert_eq!(result.len(), 4);
|
||||
assert_eq!(result.get_ref(0).as_f32().unwrap(), Some(1.0));
|
||||
assert_eq!(result.get_ref(1).as_f32().unwrap(), Some(6.0));
|
||||
assert!(result.get_ref(2).is_null());
|
||||
assert!(result.get_ref(3).is_null());
|
||||
|
||||
let input0 = Arc::new(StringVector::from(vec![Some("[1.0,2.0,3.0]".to_string())]));
|
||||
let input1 = Arc::new(Int64Vector::from(vec![Some(3)]));
|
||||
|
||||
let err = func
|
||||
.eval(&FunctionContext::default(), &[input0, input1])
|
||||
.unwrap_err();
|
||||
match err {
|
||||
error::Error::InvalidFuncArgs { err_msg, .. } => {
|
||||
assert_eq!(
|
||||
err_msg,
|
||||
format!("Out of range: k must be in the range [0, 2], but got k = 3.")
|
||||
)
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
|
||||
let input0 = Arc::new(StringVector::from(vec![Some("[1.0,2.0,3.0]".to_string())]));
|
||||
let input1 = Arc::new(Int64Vector::from(vec![Some(-1)]));
|
||||
|
||||
let err = func
|
||||
.eval(&FunctionContext::default(), &[input0, input1])
|
||||
.unwrap_err();
|
||||
match err {
|
||||
error::Error::InvalidFuncArgs { err_msg, .. } => {
|
||||
assert_eq!(
|
||||
err_msg,
|
||||
format!("Invalid argument: k must be a non-negative integer, but got k = -1.")
|
||||
)
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -35,7 +35,10 @@ impl FunctionState {
|
||||
use api::v1::meta::ProcedureStatus;
|
||||
use async_trait::async_trait;
|
||||
use common_base::AffectedRows;
|
||||
use common_meta::rpc::procedure::{MigrateRegionRequest, ProcedureStateResponse};
|
||||
use common_meta::rpc::procedure::{
|
||||
AddRegionFollowerRequest, MigrateRegionRequest, ProcedureStateResponse,
|
||||
RemoveRegionFollowerRequest,
|
||||
};
|
||||
use common_query::error::Result;
|
||||
use common_query::Output;
|
||||
use session::context::QueryContextRef;
|
||||
@@ -66,6 +69,17 @@ impl FunctionState {
|
||||
..Default::default()
|
||||
})
|
||||
}
|
||||
|
||||
async fn add_region_follower(&self, _request: AddRegionFollowerRequest) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn remove_region_follower(
|
||||
&self,
|
||||
_request: RemoveRegionFollowerRequest,
|
||||
) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
|
||||
@@ -22,7 +22,9 @@ mod version;
|
||||
use std::sync::Arc;
|
||||
|
||||
use build::BuildFunction;
|
||||
use database::{CurrentSchemaFunction, DatabaseFunction, SessionUserFunction};
|
||||
use database::{
|
||||
CurrentSchemaFunction, DatabaseFunction, ReadPreferenceFunction, SessionUserFunction,
|
||||
};
|
||||
use pg_catalog::PGCatalogFunction;
|
||||
use procedure_state::ProcedureStateFunction;
|
||||
use timezone::TimezoneFunction;
|
||||
@@ -39,6 +41,7 @@ impl SystemFunction {
|
||||
registry.register(Arc::new(CurrentSchemaFunction));
|
||||
registry.register(Arc::new(DatabaseFunction));
|
||||
registry.register(Arc::new(SessionUserFunction));
|
||||
registry.register(Arc::new(ReadPreferenceFunction));
|
||||
registry.register(Arc::new(TimezoneFunction));
|
||||
registry.register_async(Arc::new(ProcedureStateFunction));
|
||||
PGCatalogFunction::register(registry);
|
||||
|
||||
@@ -30,9 +30,12 @@ pub struct DatabaseFunction;
|
||||
pub struct CurrentSchemaFunction;
|
||||
pub struct SessionUserFunction;
|
||||
|
||||
pub struct ReadPreferenceFunction;
|
||||
|
||||
const DATABASE_FUNCTION_NAME: &str = "database";
|
||||
const CURRENT_SCHEMA_FUNCTION_NAME: &str = "current_schema";
|
||||
const SESSION_USER_FUNCTION_NAME: &str = "session_user";
|
||||
const READ_PREFERENCE_FUNCTION_NAME: &str = "read_preference";
|
||||
|
||||
impl Function for DatabaseFunction {
|
||||
fn name(&self) -> &str {
|
||||
@@ -94,6 +97,26 @@ impl Function for SessionUserFunction {
|
||||
}
|
||||
}
|
||||
|
||||
impl Function for ReadPreferenceFunction {
|
||||
fn name(&self) -> &str {
|
||||
READ_PREFERENCE_FUNCTION_NAME
|
||||
}
|
||||
|
||||
fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
|
||||
Ok(ConcreteDataType::string_datatype())
|
||||
}
|
||||
|
||||
fn signature(&self) -> Signature {
|
||||
Signature::nullary(Volatility::Immutable)
|
||||
}
|
||||
|
||||
fn eval(&self, func_ctx: &FunctionContext, _columns: &[VectorRef]) -> Result<VectorRef> {
|
||||
let read_preference = func_ctx.query_ctx.read_preference();
|
||||
|
||||
Ok(Arc::new(StringVector::from_slice(&[read_preference.as_ref()])) as _)
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for DatabaseFunction {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(f, "DATABASE")
|
||||
@@ -112,6 +135,12 @@ impl fmt::Display for SessionUserFunction {
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for ReadPreferenceFunction {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(f, "READ_PREFERENCE")
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
@@ -15,11 +15,13 @@
|
||||
use api::helper::ColumnDataTypeWrapper;
|
||||
use api::v1::add_column_location::LocationType;
|
||||
use api::v1::alter_table_expr::Kind;
|
||||
use api::v1::column_def::{as_fulltext_option, as_skipping_index_type};
|
||||
use api::v1::column_def::{
|
||||
as_fulltext_option_analyzer, as_fulltext_option_backend, as_skipping_index_type,
|
||||
};
|
||||
use api::v1::{
|
||||
column_def, AddColumnLocation as Location, AlterTableExpr, Analyzer, CreateTableExpr,
|
||||
DropColumns, ModifyColumnTypes, RenameTable, SemanticType,
|
||||
SkippingIndexType as PbSkippingIndexType,
|
||||
DropColumns, FulltextBackend as PbFulltextBackend, ModifyColumnTypes, RenameTable,
|
||||
SemanticType, SkippingIndexType as PbSkippingIndexType,
|
||||
};
|
||||
use common_query::AddColumnLocation;
|
||||
use datatypes::schema::{ColumnSchema, FulltextOptions, RawSchema, SkippingIndexOptions};
|
||||
@@ -126,11 +128,15 @@ pub fn alter_expr_to_request(table_id: TableId, expr: AlterTableExpr) -> Result<
|
||||
column_name: f.column_name.clone(),
|
||||
options: FulltextOptions {
|
||||
enable: f.enable,
|
||||
analyzer: as_fulltext_option(
|
||||
analyzer: as_fulltext_option_analyzer(
|
||||
Analyzer::try_from(f.analyzer)
|
||||
.context(InvalidSetFulltextOptionRequestSnafu)?,
|
||||
),
|
||||
case_sensitive: f.case_sensitive,
|
||||
backend: as_fulltext_option_backend(
|
||||
PbFulltextBackend::try_from(f.backend)
|
||||
.context(InvalidSetFulltextOptionRequestSnafu)?,
|
||||
),
|
||||
},
|
||||
},
|
||||
},
|
||||
|
||||
@@ -25,7 +25,7 @@ async fn do_bench_channel_manager() {
|
||||
let m_clone = m.clone();
|
||||
let join = tokio::spawn(async move {
|
||||
for _ in 0..10000 {
|
||||
let idx = rand::random::<usize>() % 100;
|
||||
let idx = rand::random::<u32>() % 100;
|
||||
let ret = m_clone.get(format!("{idx}"));
|
||||
let _ = ret.unwrap();
|
||||
}
|
||||
|
||||
@@ -27,6 +27,7 @@ use crate::error::{
|
||||
DecodeJsonSnafu, EncodeJsonSnafu, Error, FromUtf8Snafu, InvalidNodeInfoKeySnafu,
|
||||
InvalidRoleSnafu, ParseNumSnafu, Result,
|
||||
};
|
||||
use crate::key::flow::flow_state::FlowStat;
|
||||
use crate::peer::Peer;
|
||||
|
||||
const CLUSTER_NODE_INFO_PREFIX: &str = "__meta_cluster_node_info";
|
||||
@@ -52,6 +53,9 @@ pub trait ClusterInfo {
|
||||
/// List all region stats in the cluster.
|
||||
async fn list_region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error>;
|
||||
|
||||
/// List all flow stats in the cluster.
|
||||
async fn list_flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error>;
|
||||
|
||||
// TODO(jeremy): Other info, like region status, etc.
|
||||
}
|
||||
|
||||
|
||||
@@ -92,6 +92,22 @@ pub struct RegionStat {
|
||||
pub sst_size: u64,
|
||||
/// The size of the SST index files in bytes.
|
||||
pub index_size: u64,
|
||||
/// The manifest infoof the region.
|
||||
pub region_manifest: RegionManifestInfo,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
|
||||
pub enum RegionManifestInfo {
|
||||
Mito {
|
||||
manifest_version: u64,
|
||||
flushed_entry_id: u64,
|
||||
},
|
||||
Metric {
|
||||
data_manifest_version: u64,
|
||||
data_flushed_entry_id: u64,
|
||||
metadata_manifest_version: u64,
|
||||
metadata_flushed_entry_id: u64,
|
||||
},
|
||||
}
|
||||
|
||||
impl Stat {
|
||||
@@ -165,6 +181,31 @@ impl TryFrom<&HeartbeatRequest> for Stat {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<store_api::region_engine::RegionManifestInfo> for RegionManifestInfo {
|
||||
fn from(value: store_api::region_engine::RegionManifestInfo) -> Self {
|
||||
match value {
|
||||
store_api::region_engine::RegionManifestInfo::Mito {
|
||||
manifest_version,
|
||||
flushed_entry_id,
|
||||
} => RegionManifestInfo::Mito {
|
||||
manifest_version,
|
||||
flushed_entry_id,
|
||||
},
|
||||
store_api::region_engine::RegionManifestInfo::Metric {
|
||||
data_manifest_version,
|
||||
data_flushed_entry_id,
|
||||
metadata_manifest_version,
|
||||
metadata_flushed_entry_id,
|
||||
} => RegionManifestInfo::Metric {
|
||||
data_manifest_version,
|
||||
data_flushed_entry_id,
|
||||
metadata_manifest_version,
|
||||
metadata_flushed_entry_id,
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&api::v1::meta::RegionStat> for RegionStat {
|
||||
fn from(value: &api::v1::meta::RegionStat) -> Self {
|
||||
let region_stat = value
|
||||
@@ -185,6 +226,7 @@ impl From<&api::v1::meta::RegionStat> for RegionStat {
|
||||
manifest_size: region_stat.manifest_size,
|
||||
sst_size: region_stat.sst_size,
|
||||
index_size: region_stat.index_size,
|
||||
region_manifest: region_stat.manifest.into(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -22,14 +22,18 @@ use store_api::storage::{RegionId, RegionNumber, TableId};
|
||||
use crate::cache_invalidator::CacheInvalidatorRef;
|
||||
use crate::ddl::flow_meta::FlowMetadataAllocatorRef;
|
||||
use crate::ddl::table_meta::TableMetadataAllocatorRef;
|
||||
use crate::error::Result;
|
||||
use crate::error::{Result, UnsupportedSnafu};
|
||||
use crate::key::flow::FlowMetadataManagerRef;
|
||||
use crate::key::table_route::PhysicalTableRouteValue;
|
||||
use crate::key::TableMetadataManagerRef;
|
||||
use crate::node_manager::NodeManagerRef;
|
||||
use crate::region_keeper::MemoryRegionKeeperRef;
|
||||
use crate::region_registry::LeaderRegionRegistryRef;
|
||||
use crate::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
|
||||
use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse};
|
||||
use crate::rpc::procedure::{
|
||||
AddRegionFollowerRequest, MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse,
|
||||
RemoveRegionFollowerRequest,
|
||||
};
|
||||
use crate::DatanodeId;
|
||||
|
||||
pub mod alter_database;
|
||||
@@ -70,6 +74,30 @@ pub trait ProcedureExecutor: Send + Sync {
|
||||
request: SubmitDdlTaskRequest,
|
||||
) -> Result<SubmitDdlTaskResponse>;
|
||||
|
||||
/// Add a region follower
|
||||
async fn add_region_follower(
|
||||
&self,
|
||||
_ctx: &ExecutorContext,
|
||||
_request: AddRegionFollowerRequest,
|
||||
) -> Result<()> {
|
||||
UnsupportedSnafu {
|
||||
operation: "add_region_follower",
|
||||
}
|
||||
.fail()
|
||||
}
|
||||
|
||||
/// Remove a region follower
|
||||
async fn remove_region_follower(
|
||||
&self,
|
||||
_ctx: &ExecutorContext,
|
||||
_request: RemoveRegionFollowerRequest,
|
||||
) -> Result<()> {
|
||||
UnsupportedSnafu {
|
||||
operation: "remove_region_follower",
|
||||
}
|
||||
.fail()
|
||||
}
|
||||
|
||||
/// Submit a region migration task
|
||||
async fn migrate_region(
|
||||
&self,
|
||||
@@ -137,6 +165,8 @@ pub struct DdlContext {
|
||||
pub cache_invalidator: CacheInvalidatorRef,
|
||||
/// Keep tracking operating regions.
|
||||
pub memory_region_keeper: MemoryRegionKeeperRef,
|
||||
/// The leader region registry.
|
||||
pub leader_region_registry: LeaderRegionRegistryRef,
|
||||
/// Table metadata manager.
|
||||
pub table_metadata_manager: TableMetadataManagerRef,
|
||||
/// Allocator for table metadata.
|
||||
|
||||
@@ -35,7 +35,9 @@ use crate::error::{self, Result};
|
||||
use crate::instruction::CacheIdent;
|
||||
use crate::key::table_name::TableNameKey;
|
||||
use crate::key::table_route::TableRouteValue;
|
||||
use crate::rpc::router::{find_leader_regions, find_leaders, RegionRoute};
|
||||
use crate::rpc::router::{
|
||||
find_leader_regions, find_leaders, operating_leader_regions, RegionRoute,
|
||||
};
|
||||
|
||||
/// [Control] indicated to the caller whether to go to the next step.
|
||||
#[derive(Debug)]
|
||||
@@ -250,6 +252,11 @@ impl DropTableExecutor {
|
||||
.into_iter()
|
||||
.collect::<Result<Vec<_>>>()?;
|
||||
|
||||
// Deletes the leader region from registry.
|
||||
let region_ids = operating_leader_regions(region_routes);
|
||||
ctx.leader_region_registry
|
||||
.batch_delete(region_ids.into_iter().map(|(region_id, _)| region_id));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -98,13 +98,14 @@ impl TableMetadataAllocator {
|
||||
fn create_wal_options(
|
||||
&self,
|
||||
table_route: &PhysicalTableRouteValue,
|
||||
skip_wal: bool,
|
||||
) -> Result<HashMap<RegionNumber, String>> {
|
||||
let region_numbers = table_route
|
||||
.region_routes
|
||||
.iter()
|
||||
.map(|route| route.region.id.region_number())
|
||||
.collect();
|
||||
allocate_region_wal_options(region_numbers, &self.wal_options_allocator)
|
||||
allocate_region_wal_options(region_numbers, &self.wal_options_allocator, skip_wal)
|
||||
}
|
||||
|
||||
async fn create_table_route(
|
||||
@@ -158,7 +159,9 @@ impl TableMetadataAllocator {
|
||||
pub async fn create(&self, task: &CreateTableTask) -> Result<TableMetadata> {
|
||||
let table_id = self.allocate_table_id(&task.create_table.table_id).await?;
|
||||
let table_route = self.create_table_route(table_id, task).await?;
|
||||
let region_wal_options = self.create_wal_options(&table_route)?;
|
||||
|
||||
let region_wal_options =
|
||||
self.create_wal_options(&table_route, task.table_info.meta.options.skip_wal)?;
|
||||
|
||||
debug!(
|
||||
"Allocated region wal options {:?} for table {}",
|
||||
|
||||
@@ -850,6 +850,7 @@ mod tests {
|
||||
use crate::node_manager::{DatanodeRef, FlownodeRef, NodeManager};
|
||||
use crate::peer::Peer;
|
||||
use crate::region_keeper::MemoryRegionKeeper;
|
||||
use crate::region_registry::LeaderRegionRegistry;
|
||||
use crate::sequence::SequenceBuilder;
|
||||
use crate::state_store::KvStateStore;
|
||||
use crate::wal_options_allocator::WalOptionsAllocator;
|
||||
@@ -893,6 +894,7 @@ mod tests {
|
||||
flow_metadata_manager,
|
||||
flow_metadata_allocator,
|
||||
memory_region_keeper: Arc::new(MemoryRegionKeeper::default()),
|
||||
leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
|
||||
region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
|
||||
},
|
||||
procedure_manager.clone(),
|
||||
|
||||
@@ -512,6 +512,10 @@ impl TableMetadataManager {
|
||||
&self.table_route_manager
|
||||
}
|
||||
|
||||
pub fn topic_region_manager(&self) -> &TopicRegionManager {
|
||||
&self.topic_region_manager
|
||||
}
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
pub fn kv_backend(&self) -> &KvBackendRef {
|
||||
&self.kv_backend
|
||||
@@ -1471,7 +1475,8 @@ mod tests {
|
||||
new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
|
||||
let wal_allocator = WalOptionsAllocator::RaftEngine;
|
||||
let regions = (0..16).collect();
|
||||
let region_wal_options = allocate_region_wal_options(regions, &wal_allocator).unwrap();
|
||||
let region_wal_options =
|
||||
allocate_region_wal_options(regions, &wal_allocator, false).unwrap();
|
||||
create_physical_table_metadata(
|
||||
&table_metadata_manager,
|
||||
table_info.clone(),
|
||||
|
||||
@@ -224,6 +224,7 @@ impl TopicRegionManager {
|
||||
Some((region_id, kafka.topic.as_str()))
|
||||
}
|
||||
Some(WalOptions::RaftEngine) => None,
|
||||
Some(WalOptions::Noop) => None,
|
||||
None => None,
|
||||
},
|
||||
)
|
||||
|
||||
@@ -155,21 +155,21 @@ impl<'a> MySqlTemplateFactory<'a> {
|
||||
table_name: table_name.to_string(),
|
||||
create_table_statement: format!(
|
||||
// Cannot be more than 3072 bytes in PRIMARY KEY
|
||||
"CREATE TABLE IF NOT EXISTS {table_name}(k VARBINARY(3072) PRIMARY KEY, v BLOB);",
|
||||
"CREATE TABLE IF NOT EXISTS `{table_name}`(k VARBINARY(3072) PRIMARY KEY, v BLOB);",
|
||||
),
|
||||
range_template: RangeTemplate {
|
||||
point: format!("SELECT k, v FROM {table_name} WHERE k = ?"),
|
||||
range: format!("SELECT k, v FROM {table_name} WHERE k >= ? AND k < ? ORDER BY k"),
|
||||
full: format!("SELECT k, v FROM {table_name} ? ORDER BY k"),
|
||||
left_bounded: format!("SELECT k, v FROM {table_name} WHERE k >= ? ORDER BY k"),
|
||||
prefix: format!("SELECT k, v FROM {table_name} WHERE k LIKE ? ORDER BY k"),
|
||||
point: format!("SELECT k, v FROM `{table_name}` WHERE k = ?"),
|
||||
range: format!("SELECT k, v FROM `{table_name}` WHERE k >= ? AND k < ? ORDER BY k"),
|
||||
full: format!("SELECT k, v FROM `{table_name}` ? ORDER BY k"),
|
||||
left_bounded: format!("SELECT k, v FROM `{table_name}` WHERE k >= ? ORDER BY k"),
|
||||
prefix: format!("SELECT k, v FROM `{table_name}` WHERE k LIKE ? ORDER BY k"),
|
||||
},
|
||||
delete_template: RangeTemplate {
|
||||
point: format!("DELETE FROM {table_name} WHERE k = ?;"),
|
||||
range: format!("DELETE FROM {table_name} WHERE k >= ? AND k < ?;"),
|
||||
full: format!("DELETE FROM {table_name}"),
|
||||
left_bounded: format!("DELETE FROM {table_name} WHERE k >= ?;"),
|
||||
prefix: format!("DELETE FROM {table_name} WHERE k LIKE ?;"),
|
||||
point: format!("DELETE FROM `{table_name}` WHERE k = ?;"),
|
||||
range: format!("DELETE FROM `{table_name}` WHERE k >= ? AND k < ?;"),
|
||||
full: format!("DELETE FROM `{table_name}`"),
|
||||
left_bounded: format!("DELETE FROM `{table_name}` WHERE k >= ?;"),
|
||||
prefix: format!("DELETE FROM `{table_name}` WHERE k LIKE ?;"),
|
||||
},
|
||||
}
|
||||
}
|
||||
@@ -189,14 +189,17 @@ impl MySqlTemplateSet {
|
||||
fn generate_batch_get_query(&self, key_len: usize) -> String {
|
||||
let table_name = &self.table_name;
|
||||
let in_clause = mysql_generate_in_placeholders(1, key_len).join(", ");
|
||||
format!("SELECT k, v FROM {table_name} WHERE k in ({});", in_clause)
|
||||
format!(
|
||||
"SELECT k, v FROM `{table_name}` WHERE k in ({});",
|
||||
in_clause
|
||||
)
|
||||
}
|
||||
|
||||
/// Generates the sql for batch delete.
|
||||
fn generate_batch_delete_query(&self, key_len: usize) -> String {
|
||||
let table_name = &self.table_name;
|
||||
let in_clause = mysql_generate_in_placeholders(1, key_len).join(", ");
|
||||
format!("DELETE FROM {table_name} WHERE k in ({});", in_clause)
|
||||
format!("DELETE FROM `{table_name}` WHERE k in ({});", in_clause)
|
||||
}
|
||||
|
||||
/// Generates the sql for batch upsert.
|
||||
@@ -212,9 +215,9 @@ impl MySqlTemplateSet {
|
||||
let values_clause = values_placeholders.join(", ");
|
||||
|
||||
(
|
||||
format!(r#"SELECT k, v FROM {table_name} WHERE k IN ({in_clause})"#,),
|
||||
format!(r#"SELECT k, v FROM `{table_name}` WHERE k IN ({in_clause})"#,),
|
||||
format!(
|
||||
r#"INSERT INTO {table_name} (k, v) VALUES {values_clause} ON DUPLICATE KEY UPDATE v = VALUES(v);"#,
|
||||
r#"INSERT INTO `{table_name}` (k, v) VALUES {values_clause} ON DUPLICATE KEY UPDATE v = VALUES(v);"#,
|
||||
),
|
||||
)
|
||||
}
|
||||
|
||||
@@ -157,21 +157,25 @@ impl<'a> PgSqlTemplateFactory<'a> {
|
||||
PgSqlTemplateSet {
|
||||
table_name: table_name.to_string(),
|
||||
create_table_statement: format!(
|
||||
"CREATE TABLE IF NOT EXISTS {table_name}(k bytea PRIMARY KEY, v bytea)",
|
||||
"CREATE TABLE IF NOT EXISTS \"{table_name}\"(k bytea PRIMARY KEY, v bytea)",
|
||||
),
|
||||
range_template: RangeTemplate {
|
||||
point: format!("SELECT k, v FROM {table_name} WHERE k = $1"),
|
||||
range: format!("SELECT k, v FROM {table_name} WHERE k >= $1 AND k < $2 ORDER BY k"),
|
||||
full: format!("SELECT k, v FROM {table_name} $1 ORDER BY k"),
|
||||
left_bounded: format!("SELECT k, v FROM {table_name} WHERE k >= $1 ORDER BY k"),
|
||||
prefix: format!("SELECT k, v FROM {table_name} WHERE k LIKE $1 ORDER BY k"),
|
||||
point: format!("SELECT k, v FROM \"{table_name}\" WHERE k = $1"),
|
||||
range: format!(
|
||||
"SELECT k, v FROM \"{table_name}\" WHERE k >= $1 AND k < $2 ORDER BY k"
|
||||
),
|
||||
full: format!("SELECT k, v FROM \"{table_name}\" $1 ORDER BY k"),
|
||||
left_bounded: format!("SELECT k, v FROM \"{table_name}\" WHERE k >= $1 ORDER BY k"),
|
||||
prefix: format!("SELECT k, v FROM \"{table_name}\" WHERE k LIKE $1 ORDER BY k"),
|
||||
},
|
||||
delete_template: RangeTemplate {
|
||||
point: format!("DELETE FROM {table_name} WHERE k = $1 RETURNING k,v;"),
|
||||
range: format!("DELETE FROM {table_name} WHERE k >= $1 AND k < $2 RETURNING k,v;"),
|
||||
full: format!("DELETE FROM {table_name} RETURNING k,v"),
|
||||
left_bounded: format!("DELETE FROM {table_name} WHERE k >= $1 RETURNING k,v;"),
|
||||
prefix: format!("DELETE FROM {table_name} WHERE k LIKE $1 RETURNING k,v;"),
|
||||
point: format!("DELETE FROM \"{table_name}\" WHERE k = $1 RETURNING k,v;"),
|
||||
range: format!(
|
||||
"DELETE FROM \"{table_name}\" WHERE k >= $1 AND k < $2 RETURNING k,v;"
|
||||
),
|
||||
full: format!("DELETE FROM \"{table_name}\" RETURNING k,v"),
|
||||
left_bounded: format!("DELETE FROM \"{table_name}\" WHERE k >= $1 RETURNING k,v;"),
|
||||
prefix: format!("DELETE FROM \"{table_name}\" WHERE k LIKE $1 RETURNING k,v;"),
|
||||
},
|
||||
}
|
||||
}
|
||||
@@ -191,7 +195,10 @@ impl PgSqlTemplateSet {
|
||||
fn generate_batch_get_query(&self, key_len: usize) -> String {
|
||||
let table_name = &self.table_name;
|
||||
let in_clause = pg_generate_in_placeholders(1, key_len).join(", ");
|
||||
format!("SELECT k, v FROM {table_name} WHERE k in ({});", in_clause)
|
||||
format!(
|
||||
"SELECT k, v FROM \"{table_name}\" WHERE k in ({});",
|
||||
in_clause
|
||||
)
|
||||
}
|
||||
|
||||
/// Generates the sql for batch delete.
|
||||
@@ -199,7 +206,7 @@ impl PgSqlTemplateSet {
|
||||
let table_name = &self.table_name;
|
||||
let in_clause = pg_generate_in_placeholders(1, key_len).join(", ");
|
||||
format!(
|
||||
"DELETE FROM {table_name} WHERE k in ({}) RETURNING k,v;",
|
||||
"DELETE FROM \"{table_name}\" WHERE k in ({}) RETURNING k,v;",
|
||||
in_clause
|
||||
)
|
||||
}
|
||||
@@ -220,9 +227,9 @@ impl PgSqlTemplateSet {
|
||||
format!(
|
||||
r#"
|
||||
WITH prev AS (
|
||||
SELECT k,v FROM {table_name} WHERE k IN ({in_clause})
|
||||
SELECT k,v FROM "{table_name}" WHERE k IN ({in_clause})
|
||||
), update AS (
|
||||
INSERT INTO {table_name} (k, v) VALUES
|
||||
INSERT INTO "{table_name}" (k, v) VALUES
|
||||
{values_clause}
|
||||
ON CONFLICT (
|
||||
k
|
||||
|
||||
@@ -39,6 +39,7 @@ pub mod node_manager;
|
||||
pub mod peer;
|
||||
pub mod range_stream;
|
||||
pub mod region_keeper;
|
||||
pub mod region_registry;
|
||||
pub mod rpc;
|
||||
pub mod sequence;
|
||||
pub mod state_store;
|
||||
|
||||
@@ -27,6 +27,7 @@ const TABLE_NAME_LOCK_PREFIX: &str = "__table_name_lock";
|
||||
const FLOW_NAME_LOCK_PREFIX: &str = "__flow_name_lock";
|
||||
const REGION_LOCK_PREFIX: &str = "__region_lock";
|
||||
const FLOW_LOCK_PREFIX: &str = "__flow_lock";
|
||||
const REMOTE_WAL_LOCK_PREFIX: &str = "__remote_wal_lock";
|
||||
|
||||
/// [CatalogLock] acquires the lock on the tenant level.
|
||||
pub enum CatalogLock<'a> {
|
||||
@@ -231,6 +232,31 @@ impl From<FlowLock> for StringKey {
|
||||
}
|
||||
}
|
||||
|
||||
/// [RemoteWalLock] acquires the lock on the remote wal topic level.
|
||||
pub enum RemoteWalLock {
|
||||
Read(String),
|
||||
Write(String),
|
||||
}
|
||||
|
||||
impl Display for RemoteWalLock {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
let key = match self {
|
||||
RemoteWalLock::Read(s) => s,
|
||||
RemoteWalLock::Write(s) => s,
|
||||
};
|
||||
write!(f, "{}/{}", REMOTE_WAL_LOCK_PREFIX, key)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<RemoteWalLock> for StringKey {
|
||||
fn from(value: RemoteWalLock) -> Self {
|
||||
match value {
|
||||
RemoteWalLock::Write(_) => StringKey::Exclusive(value.to_string()),
|
||||
RemoteWalLock::Read(_) => StringKey::Share(value.to_string()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use common_procedure::StringKey;
|
||||
@@ -308,5 +334,16 @@ mod tests {
|
||||
string_key,
|
||||
StringKey::Exclusive(format!("{}/{}", FLOW_LOCK_PREFIX, flow_id))
|
||||
);
|
||||
// The remote wal lock
|
||||
let string_key: StringKey = RemoteWalLock::Read("foo".to_string()).into();
|
||||
assert_eq!(
|
||||
string_key,
|
||||
StringKey::Share(format!("{}/{}", REMOTE_WAL_LOCK_PREFIX, "foo"))
|
||||
);
|
||||
let string_key: StringKey = RemoteWalLock::Write("foo".to_string()).into();
|
||||
assert_eq!(
|
||||
string_key,
|
||||
StringKey::Exclusive(format!("{}/{}", REMOTE_WAL_LOCK_PREFIX, "foo"))
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,63 +12,13 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::fmt::{Display, Formatter};
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::meta::Peer as PbPeer;
|
||||
use serde::{Deserialize, Serialize};
|
||||
pub use api::v1::meta::Peer;
|
||||
|
||||
use crate::error::Error;
|
||||
use crate::{DatanodeId, FlownodeId};
|
||||
|
||||
#[derive(Debug, Default, Clone, Hash, Eq, PartialEq, Deserialize, Serialize)]
|
||||
pub struct Peer {
|
||||
/// Node identifier. Unique in a cluster.
|
||||
pub id: u64,
|
||||
pub addr: String,
|
||||
}
|
||||
|
||||
impl From<PbPeer> for Peer {
|
||||
fn from(p: PbPeer) -> Self {
|
||||
Self {
|
||||
id: p.id,
|
||||
addr: p.addr,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Peer> for PbPeer {
|
||||
fn from(p: Peer) -> Self {
|
||||
Self {
|
||||
id: p.id,
|
||||
addr: p.addr,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Peer {
|
||||
pub fn new(id: u64, addr: impl Into<String>) -> Self {
|
||||
Self {
|
||||
id,
|
||||
addr: addr.into(),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
pub fn empty(id: u64) -> Self {
|
||||
Self {
|
||||
id,
|
||||
addr: String::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for Peer {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "peer-{}({})", self.id, self.addr)
|
||||
}
|
||||
}
|
||||
|
||||
/// can query peer given a node id
|
||||
#[async_trait::async_trait]
|
||||
pub trait PeerLookupService {
|
||||
|
||||
186
src/common/meta/src/region_registry.rs
Normal file
186
src/common/meta/src/region_registry.rs
Normal file
@@ -0,0 +1,186 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::hash_map::Entry;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::{Arc, RwLock};
|
||||
|
||||
use common_telemetry::warn;
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
use crate::datanode::RegionManifestInfo;
|
||||
|
||||
/// Represents information about a leader region in the cluster.
|
||||
/// Contains the datanode id where the leader is located,
|
||||
/// and the current manifest version.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
|
||||
pub struct LeaderRegion {
|
||||
pub datanode_id: u64,
|
||||
pub manifest: LeaderRegionManifestInfo,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
|
||||
pub enum LeaderRegionManifestInfo {
|
||||
Mito {
|
||||
manifest_version: u64,
|
||||
flushed_entry_id: u64,
|
||||
},
|
||||
Metric {
|
||||
data_manifest_version: u64,
|
||||
data_flushed_entry_id: u64,
|
||||
metadata_manifest_version: u64,
|
||||
metadata_flushed_entry_id: u64,
|
||||
},
|
||||
}
|
||||
|
||||
impl From<RegionManifestInfo> for LeaderRegionManifestInfo {
|
||||
fn from(value: RegionManifestInfo) -> Self {
|
||||
match value {
|
||||
RegionManifestInfo::Mito {
|
||||
manifest_version,
|
||||
flushed_entry_id,
|
||||
} => LeaderRegionManifestInfo::Mito {
|
||||
manifest_version,
|
||||
flushed_entry_id,
|
||||
},
|
||||
RegionManifestInfo::Metric {
|
||||
data_manifest_version,
|
||||
data_flushed_entry_id,
|
||||
metadata_manifest_version,
|
||||
metadata_flushed_entry_id,
|
||||
} => LeaderRegionManifestInfo::Metric {
|
||||
data_manifest_version,
|
||||
data_flushed_entry_id,
|
||||
metadata_manifest_version,
|
||||
metadata_flushed_entry_id,
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl LeaderRegionManifestInfo {
|
||||
/// Returns the manifest version of the leader region.
|
||||
pub fn manifest_version(&self) -> u64 {
|
||||
match self {
|
||||
LeaderRegionManifestInfo::Mito {
|
||||
manifest_version, ..
|
||||
} => *manifest_version,
|
||||
LeaderRegionManifestInfo::Metric {
|
||||
data_manifest_version,
|
||||
..
|
||||
} => *data_manifest_version,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the flushed entry id of the leader region.
|
||||
pub fn flushed_entry_id(&self) -> u64 {
|
||||
match self {
|
||||
LeaderRegionManifestInfo::Mito {
|
||||
flushed_entry_id, ..
|
||||
} => *flushed_entry_id,
|
||||
LeaderRegionManifestInfo::Metric {
|
||||
data_flushed_entry_id,
|
||||
..
|
||||
} => *data_flushed_entry_id,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the minimum flushed entry id of the leader region.
|
||||
/// It is used to determine the minimum flushed entry id that can be pruned in remote wal.
|
||||
pub fn min_flushed_entry_id(&self) -> u64 {
|
||||
match self {
|
||||
LeaderRegionManifestInfo::Mito {
|
||||
flushed_entry_id, ..
|
||||
} => *flushed_entry_id,
|
||||
LeaderRegionManifestInfo::Metric {
|
||||
data_flushed_entry_id,
|
||||
metadata_flushed_entry_id,
|
||||
..
|
||||
} => (*data_flushed_entry_id).min(*metadata_flushed_entry_id),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub type LeaderRegionRegistryRef = Arc<LeaderRegionRegistry>;
|
||||
|
||||
/// Registry that maintains a mapping of all leader regions in the cluster.
|
||||
/// Tracks which datanode is hosting the leader for each region and the corresponding
|
||||
/// manifest version.
|
||||
#[derive(Default)]
|
||||
pub struct LeaderRegionRegistry {
|
||||
inner: RwLock<HashMap<RegionId, LeaderRegion>>,
|
||||
}
|
||||
|
||||
impl LeaderRegionRegistry {
|
||||
/// Creates a new empty leader region registry.
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
inner: RwLock::new(HashMap::new()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Gets the leader region for the given region ids.
|
||||
pub fn batch_get<I: Iterator<Item = RegionId>>(
|
||||
&self,
|
||||
region_ids: I,
|
||||
) -> HashMap<RegionId, LeaderRegion> {
|
||||
let inner = self.inner.read().unwrap();
|
||||
region_ids
|
||||
.into_iter()
|
||||
.flat_map(|region_id| {
|
||||
inner
|
||||
.get(®ion_id)
|
||||
.map(|leader_region| (region_id, *leader_region))
|
||||
})
|
||||
.collect::<HashMap<_, _>>()
|
||||
}
|
||||
|
||||
/// Puts the leader regions into the registry.
|
||||
pub fn batch_put(&self, key_values: Vec<(RegionId, LeaderRegion)>) {
|
||||
let mut inner = self.inner.write().unwrap();
|
||||
for (region_id, leader_region) in key_values {
|
||||
match inner.entry(region_id) {
|
||||
Entry::Vacant(entry) => {
|
||||
entry.insert(leader_region);
|
||||
}
|
||||
Entry::Occupied(mut entry) => {
|
||||
let manifest_version = entry.get().manifest.manifest_version();
|
||||
if manifest_version > leader_region.manifest.manifest_version() {
|
||||
warn!(
|
||||
"Received a leader region with a smaller manifest version than the existing one, ignore it. region: {}, existing_manifest_version: {}, new_manifest_version: {}",
|
||||
region_id,
|
||||
manifest_version,
|
||||
leader_region.manifest.manifest_version()
|
||||
);
|
||||
} else {
|
||||
entry.insert(leader_region);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn batch_delete<I: Iterator<Item = RegionId>>(&self, region_ids: I) {
|
||||
let mut inner = self.inner.write().unwrap();
|
||||
for region_id in region_ids {
|
||||
inner.remove(®ion_id);
|
||||
}
|
||||
}
|
||||
|
||||
/// Resets the registry to an empty state.
|
||||
pub fn reset(&self) {
|
||||
let mut inner = self.inner.write().unwrap();
|
||||
inner.clear();
|
||||
}
|
||||
}
|
||||
@@ -1240,6 +1240,7 @@ impl From<QueryContext> for PbQueryContext {
|
||||
extensions,
|
||||
channel: channel as u32,
|
||||
snapshot_seqs: None,
|
||||
explain: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -290,13 +290,13 @@ mod tests {
|
||||
num_per_range: u32,
|
||||
max_bytes: u32,
|
||||
) {
|
||||
let num_cases = rand::thread_rng().gen_range(1..=8);
|
||||
let num_cases = rand::rng().random_range(1..=8);
|
||||
common_telemetry::info!("num_cases: {}", num_cases);
|
||||
let mut cases = Vec::with_capacity(num_cases);
|
||||
for i in 0..num_cases {
|
||||
let size = rand::thread_rng().gen_range(size_limit..=max_bytes);
|
||||
let size = rand::rng().random_range(size_limit..=max_bytes);
|
||||
let mut large_value = vec![0u8; size as usize];
|
||||
rand::thread_rng().fill_bytes(&mut large_value);
|
||||
rand::rng().fill_bytes(&mut large_value);
|
||||
|
||||
// Starts from `a`.
|
||||
let prefix = format!("{}/", std::char::from_u32(97 + i as u32).unwrap());
|
||||
@@ -354,8 +354,8 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_meta_state_store_split_value() {
|
||||
let size_limit = rand::thread_rng().gen_range(128..=512);
|
||||
let page_size = rand::thread_rng().gen_range(1..10);
|
||||
let size_limit = rand::rng().random_range(128..=512);
|
||||
let page_size = rand::rng().random_range(1..10);
|
||||
let kv_backend = Arc::new(MemoryKvBackend::new());
|
||||
test_meta_state_store_split_value_with_size_limit(kv_backend, size_limit, page_size, 8192)
|
||||
.await;
|
||||
@@ -388,7 +388,7 @@ mod tests {
|
||||
// However, some KvBackends, the `ChrootKvBackend`, will add the prefix to `key`;
|
||||
// we don't know the exact size of the key.
|
||||
let size_limit = 1536 * 1024 - key_size;
|
||||
let page_size = rand::thread_rng().gen_range(1..10);
|
||||
let page_size = rand::rng().random_range(1..10);
|
||||
test_meta_state_store_split_value_with_size_limit(
|
||||
kv_backend,
|
||||
size_limit,
|
||||
|
||||
@@ -35,6 +35,7 @@ use crate::node_manager::{
|
||||
};
|
||||
use crate::peer::{Peer, PeerLookupService};
|
||||
use crate::region_keeper::MemoryRegionKeeper;
|
||||
use crate::region_registry::LeaderRegionRegistry;
|
||||
use crate::sequence::SequenceBuilder;
|
||||
use crate::wal_options_allocator::WalOptionsAllocator;
|
||||
use crate::{DatanodeId, FlownodeId};
|
||||
@@ -177,6 +178,7 @@ pub fn new_ddl_context_with_kv_backend(
|
||||
node_manager,
|
||||
cache_invalidator: Arc::new(DummyCacheInvalidator),
|
||||
memory_region_keeper: Arc::new(MemoryRegionKeeper::new()),
|
||||
leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
|
||||
table_metadata_allocator,
|
||||
table_metadata_manager,
|
||||
flow_metadata_allocator,
|
||||
|
||||
@@ -30,7 +30,7 @@ use crate::error::{EncodeWalOptionsSnafu, InvalidTopicNamePrefixSnafu, Result};
|
||||
use crate::key::NAME_PATTERN_REGEX;
|
||||
use crate::kv_backend::KvBackendRef;
|
||||
use crate::leadership_notifier::LeadershipChangeListener;
|
||||
use crate::wal_options_allocator::topic_creator::build_kafka_topic_creator;
|
||||
pub use crate::wal_options_allocator::topic_creator::build_kafka_topic_creator;
|
||||
use crate::wal_options_allocator::topic_pool::KafkaTopicPool;
|
||||
|
||||
/// Allocates wal options in region granularity.
|
||||
@@ -53,21 +53,12 @@ impl WalOptionsAllocator {
|
||||
}
|
||||
}
|
||||
|
||||
/// Allocates a wal options for a region.
|
||||
pub fn alloc(&self) -> Result<WalOptions> {
|
||||
match self {
|
||||
Self::RaftEngine => Ok(WalOptions::RaftEngine),
|
||||
Self::Kafka(topic_manager) => {
|
||||
let topic = topic_manager.select()?;
|
||||
Ok(WalOptions::Kafka(KafkaWalOptions {
|
||||
topic: topic.clone(),
|
||||
}))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Allocates a batch of wal options where each wal options goes to a region.
|
||||
pub fn alloc_batch(&self, num_regions: usize) -> Result<Vec<WalOptions>> {
|
||||
/// If skip_wal is true, the wal options will be set to Noop regardless of the allocator type.
|
||||
pub fn alloc_batch(&self, num_regions: usize, skip_wal: bool) -> Result<Vec<WalOptions>> {
|
||||
if skip_wal {
|
||||
return Ok(vec![WalOptions::Noop; num_regions]);
|
||||
}
|
||||
match self {
|
||||
WalOptionsAllocator::RaftEngine => Ok(vec![WalOptions::RaftEngine; num_regions]),
|
||||
WalOptionsAllocator::Kafka(topic_manager) => {
|
||||
@@ -130,9 +121,10 @@ pub async fn build_wal_options_allocator(
|
||||
pub fn allocate_region_wal_options(
|
||||
regions: Vec<RegionNumber>,
|
||||
wal_options_allocator: &WalOptionsAllocator,
|
||||
skip_wal: bool,
|
||||
) -> Result<HashMap<RegionNumber, String>> {
|
||||
let wal_options = wal_options_allocator
|
||||
.alloc_batch(regions.len())?
|
||||
.alloc_batch(regions.len(), skip_wal)?
|
||||
.into_iter()
|
||||
.map(|wal_options| {
|
||||
serde_json::to_string(&wal_options).context(EncodeWalOptionsSnafu { wal_options })
|
||||
@@ -177,7 +169,7 @@ mod tests {
|
||||
|
||||
let num_regions = 32;
|
||||
let regions = (0..num_regions).collect::<Vec<_>>();
|
||||
let got = allocate_region_wal_options(regions.clone(), &allocator).unwrap();
|
||||
let got = allocate_region_wal_options(regions.clone(), &allocator, false).unwrap();
|
||||
|
||||
let encoded_wal_options = serde_json::to_string(&WalOptions::RaftEngine).unwrap();
|
||||
let expected = regions
|
||||
@@ -237,7 +229,7 @@ mod tests {
|
||||
|
||||
let num_regions = 32;
|
||||
let regions = (0..num_regions).collect::<Vec<_>>();
|
||||
let got = allocate_region_wal_options(regions.clone(), &allocator).unwrap();
|
||||
let got = allocate_region_wal_options(regions.clone(), &allocator, false).unwrap();
|
||||
|
||||
// Check the allocated wal options contain the expected topics.
|
||||
let expected = (0..num_regions)
|
||||
@@ -253,4 +245,18 @@ mod tests {
|
||||
})
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_allocator_with_skip_wal() {
|
||||
let allocator = WalOptionsAllocator::RaftEngine;
|
||||
allocator.start().await.unwrap();
|
||||
|
||||
let num_regions = 32;
|
||||
let regions = (0..num_regions).collect::<Vec<_>>();
|
||||
let got = allocate_region_wal_options(regions.clone(), &allocator, true).unwrap();
|
||||
assert_eq!(got.len(), num_regions as usize);
|
||||
for wal_options in got.values() {
|
||||
assert_eq!(wal_options, &"{\"wal.provider\":\"noop\"}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -39,7 +39,7 @@ impl RoundRobinTopicSelector {
|
||||
// The cursor in the round-robin selector is not persisted which may break the round-robin strategy cross crashes.
|
||||
// Introducing a shuffling strategy may help mitigate this issue.
|
||||
pub fn with_shuffle() -> Self {
|
||||
let offset = rand::thread_rng().gen_range(0..64);
|
||||
let offset = rand::rng().random_range(0..64);
|
||||
Self {
|
||||
cursor: AtomicUsize::new(offset),
|
||||
}
|
||||
|
||||
@@ -12,6 +12,8 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_telemetry::{error, info};
|
||||
use common_wal::config::kafka::MetasrvKafkaConfig;
|
||||
use rskafka::client::error::Error as RsKafkaError;
|
||||
@@ -32,9 +34,11 @@ use crate::error::{
|
||||
// The `DEFAULT_PARTITION` refers to the index of the partition.
|
||||
const DEFAULT_PARTITION: i32 = 0;
|
||||
|
||||
type KafkaClientRef = Arc<Client>;
|
||||
|
||||
/// Creates topics in kafka.
|
||||
pub struct KafkaTopicCreator {
|
||||
client: Client,
|
||||
client: KafkaClientRef,
|
||||
/// The number of partitions per topic.
|
||||
num_partitions: i32,
|
||||
/// The replication factor of each topic.
|
||||
@@ -44,6 +48,10 @@ pub struct KafkaTopicCreator {
|
||||
}
|
||||
|
||||
impl KafkaTopicCreator {
|
||||
pub fn client(&self) -> &KafkaClientRef {
|
||||
&self.client
|
||||
}
|
||||
|
||||
async fn create_topic(&self, topic: &String, client: &Client) -> Result<()> {
|
||||
let controller = client
|
||||
.controller_client()
|
||||
@@ -151,7 +159,7 @@ pub async fn build_kafka_topic_creator(config: &MetasrvKafkaConfig) -> Result<Ka
|
||||
})?;
|
||||
|
||||
Ok(KafkaTopicCreator {
|
||||
client,
|
||||
client: Arc::new(client),
|
||||
num_partitions: config.kafka_topic.num_partitions,
|
||||
replication_factor: config.kafka_topic.replication_factor,
|
||||
create_topic_timeout: config.kafka_topic.create_topic_timeout.as_millis() as i32,
|
||||
|
||||
@@ -58,6 +58,13 @@ pub enum Error {
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Too many running procedures, max: {}", max_running_procedures))]
|
||||
TooManyRunningProcedures {
|
||||
max_running_procedures: usize,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to put state, key: '{key}'"))]
|
||||
PutState {
|
||||
key: String,
|
||||
@@ -192,7 +199,8 @@ impl ErrorExt for Error {
|
||||
| Error::FromJson { .. }
|
||||
| Error::WaitWatcher { .. }
|
||||
| Error::RetryLater { .. }
|
||||
| Error::RollbackProcedureRecovered { .. } => StatusCode::Internal,
|
||||
| Error::RollbackProcedureRecovered { .. }
|
||||
| Error::TooManyRunningProcedures { .. } => StatusCode::Internal,
|
||||
|
||||
Error::RetryTimesExceeded { .. }
|
||||
| Error::RollbackTimesExceeded { .. }
|
||||
|
||||
@@ -15,7 +15,8 @@
|
||||
mod runner;
|
||||
mod rwlock;
|
||||
|
||||
use std::collections::{HashMap, VecDeque};
|
||||
use std::collections::hash_map::Entry;
|
||||
use std::collections::{HashMap, HashSet, VecDeque};
|
||||
use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
|
||||
use std::sync::{Arc, Mutex, RwLock};
|
||||
use std::time::{Duration, Instant};
|
||||
@@ -33,6 +34,7 @@ use self::rwlock::KeyRwLock;
|
||||
use crate::error::{
|
||||
self, DuplicateProcedureSnafu, Error, LoaderConflictSnafu, ManagerNotStartSnafu, Result,
|
||||
StartRemoveOutdatedMetaTaskSnafu, StopRemoveOutdatedMetaTaskSnafu,
|
||||
TooManyRunningProceduresSnafu,
|
||||
};
|
||||
use crate::local::runner::Runner;
|
||||
use crate::procedure::{BoxedProcedureLoader, InitProcedureState, ProcedureInfo};
|
||||
@@ -147,7 +149,6 @@ type ProcedureMetaRef = Arc<ProcedureMeta>;
|
||||
/// Procedure loaded from store.
|
||||
struct LoadedProcedure {
|
||||
procedure: BoxedProcedure,
|
||||
parent_id: Option<ProcedureId>,
|
||||
step: u32,
|
||||
}
|
||||
|
||||
@@ -157,8 +158,7 @@ pub(crate) struct ManagerContext {
|
||||
loaders: Mutex<HashMap<String, BoxedProcedureLoader>>,
|
||||
key_lock: KeyRwLock<String>,
|
||||
procedures: RwLock<HashMap<ProcedureId, ProcedureMetaRef>>,
|
||||
/// Messages loaded from the procedure store.
|
||||
messages: Mutex<HashMap<ProcedureId, ProcedureMessage>>,
|
||||
running_procedures: Mutex<HashSet<ProcedureId>>,
|
||||
/// Ids and finished time of finished procedures.
|
||||
finished_procedures: Mutex<VecDeque<(ProcedureId, Instant)>>,
|
||||
/// Running flag.
|
||||
@@ -179,7 +179,7 @@ impl ManagerContext {
|
||||
key_lock: KeyRwLock::new(),
|
||||
loaders: Mutex::new(HashMap::new()),
|
||||
procedures: RwLock::new(HashMap::new()),
|
||||
messages: Mutex::new(HashMap::new()),
|
||||
running_procedures: Mutex::new(HashSet::new()),
|
||||
finished_procedures: Mutex::new(VecDeque::new()),
|
||||
running: Arc::new(AtomicBool::new(false)),
|
||||
}
|
||||
@@ -210,18 +210,27 @@ impl ManagerContext {
|
||||
procedures.contains_key(&procedure_id)
|
||||
}
|
||||
|
||||
/// Returns the number of running procedures.
|
||||
fn num_running_procedures(&self) -> usize {
|
||||
self.running_procedures.lock().unwrap().len()
|
||||
}
|
||||
|
||||
/// Try to insert the `procedure` to the context if there is no procedure
|
||||
/// with same [ProcedureId].
|
||||
///
|
||||
/// Returns `false` if there is already a procedure using the same [ProcedureId].
|
||||
fn try_insert_procedure(&self, meta: ProcedureMetaRef) -> bool {
|
||||
let procedure_id = meta.id;
|
||||
let mut procedures = self.procedures.write().unwrap();
|
||||
if procedures.contains_key(&meta.id) {
|
||||
return false;
|
||||
match procedures.entry(procedure_id) {
|
||||
Entry::Occupied(_) => return false,
|
||||
Entry::Vacant(vacant_entry) => {
|
||||
vacant_entry.insert(meta);
|
||||
}
|
||||
}
|
||||
|
||||
let old = procedures.insert(meta.id, meta);
|
||||
debug_assert!(old.is_none());
|
||||
let mut running_procedures = self.running_procedures.lock().unwrap();
|
||||
running_procedures.insert(procedure_id);
|
||||
|
||||
true
|
||||
}
|
||||
@@ -264,16 +273,6 @@ impl ManagerContext {
|
||||
}
|
||||
}
|
||||
|
||||
/// Load procedure with specific `procedure_id` from cached [ProcedureMessage]s.
|
||||
fn load_one_procedure(&self, procedure_id: ProcedureId) -> Option<LoadedProcedure> {
|
||||
let message = {
|
||||
let messages = self.messages.lock().unwrap();
|
||||
messages.get(&procedure_id).cloned()?
|
||||
};
|
||||
|
||||
self.load_one_procedure_from_message(procedure_id, &message)
|
||||
}
|
||||
|
||||
/// Load procedure from specific [ProcedureMessage].
|
||||
fn load_one_procedure_from_message(
|
||||
&self,
|
||||
@@ -301,7 +300,6 @@ impl ManagerContext {
|
||||
|
||||
Some(LoadedProcedure {
|
||||
procedure,
|
||||
parent_id: message.parent_id,
|
||||
step: message.step,
|
||||
})
|
||||
}
|
||||
@@ -350,23 +348,19 @@ impl ManagerContext {
|
||||
}
|
||||
}
|
||||
|
||||
/// Remove cached [ProcedureMessage] by ids.
|
||||
fn remove_messages(&self, procedure_ids: &[ProcedureId]) {
|
||||
let mut messages = self.messages.lock().unwrap();
|
||||
for procedure_id in procedure_ids {
|
||||
let _ = messages.remove(procedure_id);
|
||||
}
|
||||
}
|
||||
|
||||
/// Clean resources of finished procedures.
|
||||
fn on_procedures_finish(&self, procedure_ids: &[ProcedureId]) {
|
||||
self.remove_messages(procedure_ids);
|
||||
|
||||
// Since users need to query the procedure state, so we can't remove the
|
||||
// meta of the procedure directly.
|
||||
let now = Instant::now();
|
||||
let mut finished_procedures = self.finished_procedures.lock().unwrap();
|
||||
finished_procedures.extend(procedure_ids.iter().map(|id| (*id, now)));
|
||||
|
||||
// Remove the procedures from the running set.
|
||||
let mut running_procedures = self.running_procedures.lock().unwrap();
|
||||
for procedure_id in procedure_ids {
|
||||
running_procedures.remove(procedure_id);
|
||||
}
|
||||
}
|
||||
|
||||
/// Remove metadata of outdated procedures.
|
||||
@@ -410,6 +404,7 @@ pub struct ManagerConfig {
|
||||
pub retry_delay: Duration,
|
||||
pub remove_outdated_meta_task_interval: Duration,
|
||||
pub remove_outdated_meta_ttl: Duration,
|
||||
pub max_running_procedures: usize,
|
||||
}
|
||||
|
||||
impl Default for ManagerConfig {
|
||||
@@ -420,6 +415,7 @@ impl Default for ManagerConfig {
|
||||
retry_delay: Duration::from_millis(500),
|
||||
remove_outdated_meta_task_interval: Duration::from_secs(60 * 10),
|
||||
remove_outdated_meta_ttl: META_TTL,
|
||||
max_running_procedures: 128,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -492,6 +488,13 @@ impl LocalManager {
|
||||
|
||||
let watcher = meta.state_receiver.clone();
|
||||
|
||||
ensure!(
|
||||
self.manager_ctx.num_running_procedures() < self.config.max_running_procedures,
|
||||
TooManyRunningProceduresSnafu {
|
||||
max_running_procedures: self.config.max_running_procedures,
|
||||
}
|
||||
);
|
||||
|
||||
// Inserts meta into the manager before actually spawnd the runner.
|
||||
ensure!(
|
||||
self.manager_ctx.try_insert_procedure(meta),
|
||||
@@ -1119,6 +1122,7 @@ mod tests {
|
||||
retry_delay: Duration::from_millis(500),
|
||||
remove_outdated_meta_task_interval: Duration::from_millis(1),
|
||||
remove_outdated_meta_ttl: Duration::from_millis(1),
|
||||
max_running_procedures: 128,
|
||||
};
|
||||
let state_store = Arc::new(ObjectStateStore::new(object_store.clone()));
|
||||
let manager = LocalManager::new(config, state_store);
|
||||
@@ -1191,6 +1195,69 @@ mod tests {
|
||||
.is_none());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_too_many_running_procedures() {
|
||||
let dir = create_temp_dir("too_many_running_procedures");
|
||||
let config = ManagerConfig {
|
||||
parent_path: "data/".to_string(),
|
||||
max_retry_times: 3,
|
||||
retry_delay: Duration::from_millis(500),
|
||||
max_running_procedures: 1,
|
||||
..Default::default()
|
||||
};
|
||||
let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
|
||||
let manager = LocalManager::new(config, state_store);
|
||||
manager.manager_ctx.set_running();
|
||||
|
||||
manager
|
||||
.manager_ctx
|
||||
.running_procedures
|
||||
.lock()
|
||||
.unwrap()
|
||||
.insert(ProcedureId::random());
|
||||
manager.start().await.unwrap();
|
||||
|
||||
// Submit a new procedure should fail.
|
||||
let mut procedure = ProcedureToLoad::new("submit");
|
||||
procedure.lock_key = LockKey::single_exclusive("test.submit");
|
||||
let procedure_id = ProcedureId::random();
|
||||
let err = manager
|
||||
.submit(ProcedureWithId {
|
||||
id: procedure_id,
|
||||
procedure: Box::new(procedure),
|
||||
})
|
||||
.await
|
||||
.unwrap_err();
|
||||
assert!(matches!(err, Error::TooManyRunningProcedures { .. }));
|
||||
|
||||
manager
|
||||
.manager_ctx
|
||||
.running_procedures
|
||||
.lock()
|
||||
.unwrap()
|
||||
.clear();
|
||||
|
||||
// Submit a new procedure should succeed.
|
||||
let mut procedure = ProcedureToLoad::new("submit");
|
||||
procedure.lock_key = LockKey::single_exclusive("test.submit");
|
||||
assert!(manager
|
||||
.submit(ProcedureWithId {
|
||||
id: procedure_id,
|
||||
procedure: Box::new(procedure),
|
||||
})
|
||||
.await
|
||||
.is_ok());
|
||||
assert!(manager
|
||||
.procedure_state(procedure_id)
|
||||
.await
|
||||
.unwrap()
|
||||
.is_some());
|
||||
// Wait for the procedure done.
|
||||
let mut watcher = manager.procedure_watcher(procedure_id).unwrap();
|
||||
watcher.changed().await.unwrap();
|
||||
assert!(watcher.borrow().is_done());
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct ProcedureToRecover {
|
||||
content: String,
|
||||
|
||||
@@ -207,7 +207,7 @@ impl Runner {
|
||||
if let Some(d) = retry.next() {
|
||||
let millis = d.as_millis() as u64;
|
||||
// Add random noise to the retry delay to avoid retry storms.
|
||||
let noise = rand::thread_rng().gen_range(0..(millis / 4) + 1);
|
||||
let noise = rand::rng().random_range(0..(millis / 4) + 1);
|
||||
let d = d.add(Duration::from_millis(noise));
|
||||
|
||||
self.wait_on_err(d, retry_times).await;
|
||||
@@ -348,24 +348,14 @@ impl Runner {
|
||||
&self,
|
||||
procedure_id: ProcedureId,
|
||||
procedure_state: ProcedureState,
|
||||
mut procedure: BoxedProcedure,
|
||||
procedure: BoxedProcedure,
|
||||
) {
|
||||
if self.manager_ctx.contains_procedure(procedure_id) {
|
||||
// If the parent has already submitted this procedure, don't submit it again.
|
||||
return;
|
||||
}
|
||||
|
||||
let mut step = 0;
|
||||
if let Some(loaded_procedure) = self.manager_ctx.load_one_procedure(procedure_id) {
|
||||
// Try to load procedure state from the message to avoid re-run the subprocedure
|
||||
// from initial state.
|
||||
assert_eq!(self.meta.id, loaded_procedure.parent_id.unwrap());
|
||||
|
||||
// Use the dumped procedure from the procedure store.
|
||||
procedure = loaded_procedure.procedure;
|
||||
// Update step number.
|
||||
step = loaded_procedure.step;
|
||||
}
|
||||
let step = 0;
|
||||
|
||||
let meta = Arc::new(ProcedureMeta::new(
|
||||
procedure_id,
|
||||
|
||||
@@ -29,6 +29,8 @@ pub struct ProcedureConfig {
|
||||
pub retry_delay: Duration,
|
||||
/// `None` stands for no limit.
|
||||
pub max_metadata_value_size: Option<ReadableSize>,
|
||||
/// Max running procedures.
|
||||
pub max_running_procedures: usize,
|
||||
}
|
||||
|
||||
impl Default for ProcedureConfig {
|
||||
@@ -37,6 +39,7 @@ impl Default for ProcedureConfig {
|
||||
max_retry_times: 3,
|
||||
retry_delay: Duration::from_millis(500),
|
||||
max_metadata_value_size: None,
|
||||
max_running_procedures: 128,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,7 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::fmt::Display;
|
||||
use std::fmt::{self, Display};
|
||||
use std::future::Future;
|
||||
use std::marker::PhantomData;
|
||||
use std::pin::Pin;
|
||||
@@ -28,7 +28,7 @@ use datafusion::logical_expr::Expr;
|
||||
use datafusion::physical_expr::create_physical_expr;
|
||||
use datafusion::physical_plan::metrics::{BaselineMetrics, MetricValue};
|
||||
use datafusion::physical_plan::{
|
||||
accept, displayable, ExecutionPlan, ExecutionPlanVisitor, PhysicalExpr,
|
||||
accept, DisplayFormatType, ExecutionPlan, ExecutionPlanVisitor, PhysicalExpr,
|
||||
RecordBatchStream as DfRecordBatchStream,
|
||||
};
|
||||
use datafusion_common::arrow::error::ArrowError;
|
||||
@@ -206,13 +206,16 @@ impl Stream for DfRecordBatchStreamAdapter {
|
||||
}
|
||||
|
||||
/// DataFusion [SendableRecordBatchStream](DfSendableRecordBatchStream) -> Greptime [RecordBatchStream].
|
||||
/// The reverse one is [DfRecordBatchStreamAdapter]
|
||||
/// The reverse one is [DfRecordBatchStreamAdapter].
|
||||
/// It can collect metrics from DataFusion execution plan.
|
||||
pub struct RecordBatchStreamAdapter {
|
||||
schema: SchemaRef,
|
||||
stream: DfSendableRecordBatchStream,
|
||||
metrics: Option<BaselineMetrics>,
|
||||
/// Aggregated plan-level metrics. Resolved after an [ExecutionPlan] is finished.
|
||||
metrics_2: Metrics,
|
||||
/// Display plan and metrics in verbose mode.
|
||||
explain_verbose: bool,
|
||||
}
|
||||
|
||||
/// Json encoded metrics. Contains metric from a whole plan tree.
|
||||
@@ -231,6 +234,7 @@ impl RecordBatchStreamAdapter {
|
||||
stream,
|
||||
metrics: None,
|
||||
metrics_2: Metrics::Unavailable,
|
||||
explain_verbose: false,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -246,12 +250,18 @@ impl RecordBatchStreamAdapter {
|
||||
stream,
|
||||
metrics: Some(metrics),
|
||||
metrics_2: Metrics::Unresolved(df_plan),
|
||||
explain_verbose: false,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn set_metrics2(&mut self, plan: Arc<dyn ExecutionPlan>) {
|
||||
self.metrics_2 = Metrics::Unresolved(plan)
|
||||
}
|
||||
|
||||
/// Set the verbose mode for displaying plan and metrics.
|
||||
pub fn set_explain_verbose(&mut self, verbose: bool) {
|
||||
self.explain_verbose = verbose;
|
||||
}
|
||||
}
|
||||
|
||||
impl RecordBatchStream for RecordBatchStreamAdapter {
|
||||
@@ -296,7 +306,7 @@ impl Stream for RecordBatchStreamAdapter {
|
||||
}
|
||||
Poll::Ready(None) => {
|
||||
if let Metrics::Unresolved(df_plan) = &self.metrics_2 {
|
||||
let mut metric_collector = MetricCollector::default();
|
||||
let mut metric_collector = MetricCollector::new(self.explain_verbose);
|
||||
accept(df_plan.as_ref(), &mut metric_collector).unwrap();
|
||||
self.metrics_2 = Metrics::Resolved(metric_collector.record_batch_metrics);
|
||||
}
|
||||
@@ -312,10 +322,20 @@ impl Stream for RecordBatchStreamAdapter {
|
||||
}
|
||||
|
||||
/// An [ExecutionPlanVisitor] to collect metrics from a [ExecutionPlan].
|
||||
#[derive(Default)]
|
||||
pub struct MetricCollector {
|
||||
current_level: usize,
|
||||
pub record_batch_metrics: RecordBatchMetrics,
|
||||
verbose: bool,
|
||||
}
|
||||
|
||||
impl MetricCollector {
|
||||
pub fn new(verbose: bool) -> Self {
|
||||
Self {
|
||||
current_level: 0,
|
||||
record_batch_metrics: RecordBatchMetrics::default(),
|
||||
verbose,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ExecutionPlanVisitor for MetricCollector {
|
||||
@@ -339,7 +359,7 @@ impl ExecutionPlanVisitor for MetricCollector {
|
||||
.sorted_for_display()
|
||||
.timestamps_removed();
|
||||
let mut plan_metric = PlanMetrics {
|
||||
plan: displayable(plan).one_line().to_string(),
|
||||
plan: one_line(plan, self.verbose).to_string(),
|
||||
level: self.current_level,
|
||||
metrics: Vec::with_capacity(metric.iter().size_hint().0),
|
||||
};
|
||||
@@ -371,6 +391,29 @@ impl ExecutionPlanVisitor for MetricCollector {
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns a single-line summary of the root of the plan.
|
||||
/// If the `verbose` flag is set, it will display detailed information about the plan.
|
||||
fn one_line(plan: &dyn ExecutionPlan, verbose: bool) -> impl fmt::Display + '_ {
|
||||
struct Wrapper<'a> {
|
||||
plan: &'a dyn ExecutionPlan,
|
||||
format_type: DisplayFormatType,
|
||||
}
|
||||
|
||||
impl fmt::Display for Wrapper<'_> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
self.plan.fmt_as(self.format_type, f)?;
|
||||
writeln!(f)
|
||||
}
|
||||
}
|
||||
|
||||
let format_type = if verbose {
|
||||
DisplayFormatType::Verbose
|
||||
} else {
|
||||
DisplayFormatType::Default
|
||||
};
|
||||
Wrapper { plan, format_type }
|
||||
}
|
||||
|
||||
/// [`RecordBatchMetrics`] carrys metrics value
|
||||
/// from datanode to frontend through gRPC
|
||||
#[derive(serde::Serialize, serde::Deserialize, Default, Debug, Clone)]
|
||||
|
||||
@@ -22,10 +22,12 @@ use datafusion::physical_plan::PhysicalExpr;
|
||||
use datafusion_common::arrow::array::{ArrayRef, Datum, Scalar};
|
||||
use datafusion_common::arrow::buffer::BooleanBuffer;
|
||||
use datafusion_common::arrow::compute::kernels::cmp;
|
||||
use datafusion_common::cast::{as_boolean_array, as_null_array};
|
||||
use datafusion_common::cast::{as_boolean_array, as_null_array, as_string_array};
|
||||
use datafusion_common::{internal_err, DataFusionError, ScalarValue};
|
||||
use datatypes::arrow::array::{Array, BooleanArray, RecordBatch};
|
||||
use datatypes::arrow::compute::filter_record_batch;
|
||||
use datatypes::arrow::error::ArrowError;
|
||||
use datatypes::compute::kernels::regexp;
|
||||
use datatypes::compute::or_kleene;
|
||||
use datatypes::vectors::VectorRef;
|
||||
use snafu::ResultExt;
|
||||
@@ -36,7 +38,8 @@ use crate::error::{ArrowComputeSnafu, Result, ToArrowScalarSnafu, UnsupportedOpe
|
||||
/// - `col` `op` `literal`
|
||||
/// - `literal` `op` `col`
|
||||
///
|
||||
/// And the `op` is one of `=`, `!=`, `>`, `>=`, `<`, `<=`.
|
||||
/// And the `op` is one of `=`, `!=`, `>`, `>=`, `<`, `<=`,
|
||||
/// or regex operators: `~`, `~*`, `!~`, `!~*`.
|
||||
///
|
||||
/// This struct contains normalized predicate expr. In the form of
|
||||
/// `col` `op` `literal` where the `col` is provided from input.
|
||||
@@ -86,7 +89,11 @@ impl SimpleFilterEvaluator {
|
||||
| Operator::Lt
|
||||
| Operator::LtEq
|
||||
| Operator::Gt
|
||||
| Operator::GtEq => {}
|
||||
| Operator::GtEq
|
||||
| Operator::RegexMatch
|
||||
| Operator::RegexIMatch
|
||||
| Operator::RegexNotMatch
|
||||
| Operator::RegexNotIMatch => {}
|
||||
Operator::Or => {
|
||||
let lhs = Self::try_new(&binary.left)?;
|
||||
let rhs = Self::try_new(&binary.right)?;
|
||||
@@ -172,6 +179,10 @@ impl SimpleFilterEvaluator {
|
||||
Operator::LtEq => cmp::lt_eq(input, &self.literal),
|
||||
Operator::Gt => cmp::gt(input, &self.literal),
|
||||
Operator::GtEq => cmp::gt_eq(input, &self.literal),
|
||||
Operator::RegexMatch => self.regex_match(input, false, false),
|
||||
Operator::RegexIMatch => self.regex_match(input, true, false),
|
||||
Operator::RegexNotMatch => self.regex_match(input, false, true),
|
||||
Operator::RegexNotIMatch => self.regex_match(input, true, true),
|
||||
Operator::Or => {
|
||||
// OR operator stands for OR-chained EQs (or INLIST in other words)
|
||||
let mut result: BooleanArray = vec![false; input_len].into();
|
||||
@@ -192,6 +203,28 @@ impl SimpleFilterEvaluator {
|
||||
.context(ArrowComputeSnafu)
|
||||
.map(|array| array.values().clone())
|
||||
}
|
||||
|
||||
fn regex_match(
|
||||
&self,
|
||||
input: &impl Datum,
|
||||
ignore_case: bool,
|
||||
negative: bool,
|
||||
) -> std::result::Result<BooleanArray, ArrowError> {
|
||||
let flag = if ignore_case { Some("i") } else { None };
|
||||
let array = input.get().0;
|
||||
let string_array = as_string_array(array).map_err(|_| {
|
||||
ArrowError::CastError(format!("Cannot cast {:?} to StringArray", array))
|
||||
})?;
|
||||
let literal_array = self.literal.clone().into_inner();
|
||||
let regex_array = as_string_array(&literal_array).map_err(|_| {
|
||||
ArrowError::CastError(format!("Cannot cast {:?} to StringArray", literal_array))
|
||||
})?;
|
||||
let mut result = regexp::regexp_is_match_scalar(string_array, regex_array.value(0), flag)?;
|
||||
if negative {
|
||||
result = datatypes::compute::not(&result)?;
|
||||
}
|
||||
Ok(result)
|
||||
}
|
||||
}
|
||||
|
||||
/// Evaluate the predicate on the input [RecordBatch], and return a new [RecordBatch].
|
||||
|
||||
11
src/common/session/Cargo.toml
Normal file
11
src/common/session/Cargo.toml
Normal file
@@ -0,0 +1,11 @@
|
||||
[package]
|
||||
name = "common-session"
|
||||
version.workspace = true
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
strum.workspace = true
|
||||
45
src/common/session/src/lib.rs
Normal file
45
src/common/session/src/lib.rs
Normal file
@@ -0,0 +1,45 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use strum::{AsRefStr, Display, EnumString};
|
||||
|
||||
/// Defines the read preference for frontend route operations,
|
||||
/// determining whether to read from the region leader or follower.
|
||||
#[derive(Debug, Clone, Copy, Default, EnumString, Display, AsRefStr, PartialEq, Eq)]
|
||||
pub enum ReadPreference {
|
||||
#[default]
|
||||
// Reads all operations from the region leader. This is the default mode.
|
||||
#[strum(serialize = "leader", to_string = "LEADER")]
|
||||
Leader,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::str::FromStr;
|
||||
|
||||
use crate::ReadPreference;
|
||||
|
||||
#[test]
|
||||
fn test_read_preference() {
|
||||
assert_eq!(ReadPreference::Leader.to_string(), "LEADER");
|
||||
|
||||
let read_preference = ReadPreference::from_str("LEADER").unwrap();
|
||||
assert_eq!(read_preference, ReadPreference::Leader);
|
||||
|
||||
let read_preference = ReadPreference::from_str("leader").unwrap();
|
||||
assert_eq!(read_preference, ReadPreference::Leader);
|
||||
|
||||
ReadPreference::from_str("follower").unwrap_err();
|
||||
}
|
||||
}
|
||||
@@ -22,6 +22,6 @@ static PORTS: OnceCell<AtomicUsize> = OnceCell::new();
|
||||
/// Return a unique port(in runtime) for test
|
||||
pub fn get_port() -> usize {
|
||||
PORTS
|
||||
.get_or_init(|| AtomicUsize::new(rand::thread_rng().gen_range(13000..13800)))
|
||||
.get_or_init(|| AtomicUsize::new(rand::rng().random_range(13000..13800)))
|
||||
.fetch_add(1, Ordering::Relaxed)
|
||||
}
|
||||
|
||||
@@ -715,10 +715,10 @@ mod tests {
|
||||
TimeUnit::Microsecond,
|
||||
TimeUnit::Nanosecond,
|
||||
];
|
||||
let mut rng = rand::thread_rng();
|
||||
let unit_idx: usize = rng.gen_range(0..4);
|
||||
let mut rng = rand::rng();
|
||||
let unit_idx: usize = rng.random_range(0..4);
|
||||
let unit = units[unit_idx];
|
||||
let value: i64 = rng.gen();
|
||||
let value: i64 = rng.random();
|
||||
Timestamp::new(value, unit)
|
||||
}
|
||||
|
||||
@@ -745,8 +745,8 @@ mod tests {
|
||||
|
||||
/// Generate timestamp less than or equal to `threshold`
|
||||
fn gen_ts_le(threshold: &Timestamp) -> Timestamp {
|
||||
let mut rng = rand::thread_rng();
|
||||
let timestamp = rng.gen_range(i64::MIN..=threshold.value);
|
||||
let mut rng = rand::rng();
|
||||
let timestamp = rng.random_range(i64::MIN..=threshold.value);
|
||||
Timestamp::new(timestamp, threshold.unit)
|
||||
}
|
||||
|
||||
|
||||
@@ -33,6 +33,7 @@ pub enum WalOptions {
|
||||
RaftEngine,
|
||||
#[serde(with = "kafka_prefix")]
|
||||
Kafka(KafkaWalOptions),
|
||||
Noop,
|
||||
}
|
||||
|
||||
with_prefix!(kafka_prefix "wal.kafka.");
|
||||
@@ -62,5 +63,14 @@ mod tests {
|
||||
|
||||
let decoded: WalOptions = serde_json::from_str(&encoded).unwrap();
|
||||
assert_eq!(decoded, wal_options);
|
||||
|
||||
// Test serde noop wal options.
|
||||
let wal_options = WalOptions::Noop;
|
||||
let encoded = serde_json::to_string(&wal_options).unwrap();
|
||||
let expected = r#"{"wal.provider":"noop"}"#;
|
||||
assert_eq!(&encoded, expected);
|
||||
|
||||
let decoded: WalOptions = serde_json::from_str(&encoded).unwrap();
|
||||
assert_eq!(decoded, wal_options);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -58,17 +58,24 @@ pub struct RegionAliveKeeper {
|
||||
/// non-decreasing). The heartbeat requests will carry the duration since this epoch, and the
|
||||
/// duration acts like an "invariant point" for region's keep alive lease.
|
||||
epoch: Instant,
|
||||
|
||||
countdown_task_ext_handler: Option<CountdownTaskHandlerExtRef>,
|
||||
}
|
||||
|
||||
impl RegionAliveKeeper {
|
||||
/// Returns an empty [RegionAliveKeeper].
|
||||
pub fn new(region_server: RegionServer, heartbeat_interval_millis: u64) -> Self {
|
||||
pub fn new(
|
||||
region_server: RegionServer,
|
||||
countdown_task_ext_handler: Option<CountdownTaskHandlerExtRef>,
|
||||
heartbeat_interval_millis: u64,
|
||||
) -> Self {
|
||||
Self {
|
||||
region_server,
|
||||
tasks: Arc::new(Mutex::new(HashMap::new())),
|
||||
heartbeat_interval_millis,
|
||||
started: Arc::new(AtomicBool::new(false)),
|
||||
epoch: Instant::now(),
|
||||
countdown_task_ext_handler,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -85,6 +92,7 @@ impl RegionAliveKeeper {
|
||||
|
||||
let handle = Arc::new(CountdownTaskHandle::new(
|
||||
self.region_server.clone(),
|
||||
self.countdown_task_ext_handler.clone(),
|
||||
region_id,
|
||||
));
|
||||
|
||||
@@ -114,7 +122,9 @@ impl RegionAliveKeeper {
|
||||
for region in regions {
|
||||
let (role, region_id) = (region.role().into(), RegionId::from(region.region_id));
|
||||
if let Some(handle) = self.find_handle(region_id).await {
|
||||
handle.reset_deadline(role, deadline).await;
|
||||
handle
|
||||
.reset_deadline(role, deadline, region.extensions.clone())
|
||||
.await;
|
||||
} else {
|
||||
warn!(
|
||||
"Trying to renew the lease for region {region_id}, the keeper handler is not found!"
|
||||
@@ -265,13 +275,27 @@ enum CountdownCommand {
|
||||
/// 4 * `heartbeat_interval_millis`
|
||||
Start(u64),
|
||||
/// Reset countdown deadline to the given instance.
|
||||
/// (NextRole, Deadline)
|
||||
Reset((RegionRole, Instant)),
|
||||
/// (NextRole, Deadline, ExtensionInfo)
|
||||
Reset((RegionRole, Instant, HashMap<String, Vec<u8>>)),
|
||||
/// Returns the current deadline of the countdown task.
|
||||
#[cfg(test)]
|
||||
Deadline(oneshot::Sender<Instant>),
|
||||
}
|
||||
|
||||
pub type CountdownTaskHandlerExtRef = Arc<dyn CountdownTaskExtHandler>;
|
||||
|
||||
/// Extension trait for [CountdownTaskHandle] to reset deadline method.
|
||||
#[async_trait]
|
||||
pub trait CountdownTaskExtHandler: Send + Sync {
|
||||
async fn reset_deadline(
|
||||
&self,
|
||||
region_server: &RegionServer,
|
||||
role: RegionRole,
|
||||
deadline: Instant,
|
||||
extension_info: HashMap<String, Vec<u8>>,
|
||||
);
|
||||
}
|
||||
|
||||
struct CountdownTaskHandle {
|
||||
tx: mpsc::Sender<CountdownCommand>,
|
||||
handler: JoinHandle<()>,
|
||||
@@ -280,11 +304,16 @@ struct CountdownTaskHandle {
|
||||
|
||||
impl CountdownTaskHandle {
|
||||
/// Creates a new [CountdownTaskHandle] and starts the countdown task.
|
||||
fn new(region_server: RegionServer, region_id: RegionId) -> Self {
|
||||
fn new(
|
||||
region_server: RegionServer,
|
||||
handler_ext: Option<CountdownTaskHandlerExtRef>,
|
||||
region_id: RegionId,
|
||||
) -> Self {
|
||||
let (tx, rx) = mpsc::channel(1024);
|
||||
|
||||
let mut countdown_task = CountdownTask {
|
||||
region_server,
|
||||
handler_ext,
|
||||
region_id,
|
||||
rx,
|
||||
};
|
||||
@@ -323,10 +352,15 @@ impl CountdownTaskHandle {
|
||||
None
|
||||
}
|
||||
|
||||
async fn reset_deadline(&self, role: RegionRole, deadline: Instant) {
|
||||
async fn reset_deadline(
|
||||
&self,
|
||||
role: RegionRole,
|
||||
deadline: Instant,
|
||||
extension_info: HashMap<String, Vec<u8>>,
|
||||
) {
|
||||
if let Err(e) = self
|
||||
.tx
|
||||
.send(CountdownCommand::Reset((role, deadline)))
|
||||
.send(CountdownCommand::Reset((role, deadline, extension_info)))
|
||||
.await
|
||||
{
|
||||
warn!(
|
||||
@@ -350,6 +384,7 @@ impl Drop for CountdownTaskHandle {
|
||||
struct CountdownTask {
|
||||
region_server: RegionServer,
|
||||
region_id: RegionId,
|
||||
handler_ext: Option<CountdownTaskHandlerExtRef>,
|
||||
rx: mpsc::Receiver<CountdownCommand>,
|
||||
}
|
||||
|
||||
@@ -379,8 +414,18 @@ impl CountdownTask {
|
||||
started = true;
|
||||
}
|
||||
},
|
||||
Some(CountdownCommand::Reset((role, deadline))) => {
|
||||
let _ = self.region_server.set_region_role(self.region_id, role);
|
||||
Some(CountdownCommand::Reset((role, deadline, extension_info))) => {
|
||||
if let Err(err) = self.region_server.set_region_role(self.region_id, role) {
|
||||
error!(err; "Failed to set region role to {role} for region {region_id}");
|
||||
}
|
||||
if let Some(ext_handler) = self.handler_ext.as_ref() {
|
||||
ext_handler.reset_deadline(
|
||||
&self.region_server,
|
||||
role,
|
||||
deadline,
|
||||
extension_info,
|
||||
).await;
|
||||
}
|
||||
trace!(
|
||||
"Reset deadline of region {region_id} to approximately {} seconds later.",
|
||||
(deadline - Instant::now()).as_secs_f32(),
|
||||
@@ -402,7 +447,9 @@ impl CountdownTask {
|
||||
}
|
||||
() = &mut countdown => {
|
||||
warn!("The region {region_id} lease is expired, convert region to follower.");
|
||||
let _ = self.region_server.set_region_role(self.region_id, RegionRole::Follower);
|
||||
if let Err(err) = self.region_server.set_region_role(self.region_id, RegionRole::Follower) {
|
||||
error!(err; "Failed to set region role to follower for region {region_id}");
|
||||
}
|
||||
// resets the countdown.
|
||||
let far_future = Instant::now() + Duration::from_secs(86400 * 365 * 30);
|
||||
countdown.as_mut().reset(far_future);
|
||||
@@ -431,7 +478,7 @@ mod test {
|
||||
let engine = Arc::new(engine);
|
||||
region_server.register_engine(engine.clone());
|
||||
|
||||
let alive_keeper = Arc::new(RegionAliveKeeper::new(region_server.clone(), 100));
|
||||
let alive_keeper = Arc::new(RegionAliveKeeper::new(region_server.clone(), None, 100));
|
||||
|
||||
let region_id = RegionId::new(1024, 1);
|
||||
let builder = CreateRequestBuilder::new();
|
||||
@@ -468,6 +515,7 @@ mod test {
|
||||
&[GrantedRegion {
|
||||
region_id: region_id.as_u64(),
|
||||
role: api::v1::meta::RegionRole::Leader.into(),
|
||||
extensions: HashMap::new(),
|
||||
}],
|
||||
Instant::now() + Duration::from_millis(200),
|
||||
)
|
||||
@@ -492,7 +540,8 @@ mod test {
|
||||
async fn countdown_task() {
|
||||
let region_server = mock_region_server();
|
||||
|
||||
let countdown_handle = CountdownTaskHandle::new(region_server, RegionId::new(9999, 2));
|
||||
let countdown_handle =
|
||||
CountdownTaskHandle::new(region_server, None, RegionId::new(9999, 2));
|
||||
|
||||
// If countdown task is not started, its deadline is set to far future.
|
||||
assert!(
|
||||
@@ -522,6 +571,7 @@ mod test {
|
||||
.reset_deadline(
|
||||
RegionRole::Leader,
|
||||
Instant::now() + Duration::from_millis(heartbeat_interval_millis * 5),
|
||||
HashMap::new(),
|
||||
)
|
||||
.await;
|
||||
assert!(
|
||||
|
||||
@@ -31,7 +31,6 @@ use servers::export_metrics::ExportMetricsOption;
|
||||
use servers::grpc::GrpcOptions;
|
||||
use servers::heartbeat_options::HeartbeatOptions;
|
||||
use servers::http::HttpOptions;
|
||||
use servers::Mode;
|
||||
|
||||
pub const DEFAULT_OBJECT_STORE_CACHE_SIZE: ReadableSize = ReadableSize::gb(5);
|
||||
|
||||
@@ -359,7 +358,6 @@ impl Default for ObjectStoreConfig {
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
|
||||
#[serde(default)]
|
||||
pub struct DatanodeOptions {
|
||||
pub mode: Mode,
|
||||
pub node_id: Option<u64>,
|
||||
pub require_lease_before_startup: bool,
|
||||
pub init_regions_in_background: bool,
|
||||
@@ -395,7 +393,6 @@ impl Default for DatanodeOptions {
|
||||
#[allow(deprecated)]
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
mode: Mode::Standalone,
|
||||
node_id: None,
|
||||
require_lease_before_startup: false,
|
||||
init_regions_in_background: false,
|
||||
|
||||
@@ -157,6 +157,7 @@ impl Datanode {
|
||||
|
||||
pub struct DatanodeBuilder {
|
||||
opts: DatanodeOptions,
|
||||
mode: Mode,
|
||||
plugins: Plugins,
|
||||
meta_client: Option<MetaClientRef>,
|
||||
kv_backend: Option<KvBackendRef>,
|
||||
@@ -166,9 +167,10 @@ pub struct DatanodeBuilder {
|
||||
impl DatanodeBuilder {
|
||||
/// `kv_backend` is optional. If absent, the builder will try to build one
|
||||
/// by using the given `opts`
|
||||
pub fn new(opts: DatanodeOptions, plugins: Plugins) -> Self {
|
||||
pub fn new(opts: DatanodeOptions, plugins: Plugins, mode: Mode) -> Self {
|
||||
Self {
|
||||
opts,
|
||||
mode,
|
||||
plugins,
|
||||
meta_client: None,
|
||||
kv_backend: None,
|
||||
@@ -198,7 +200,7 @@ impl DatanodeBuilder {
|
||||
}
|
||||
|
||||
pub async fn build(mut self) -> Result<Datanode> {
|
||||
let mode = &self.opts.mode;
|
||||
let mode = &self.mode;
|
||||
let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?;
|
||||
|
||||
let meta_client = self.meta_client.take();
|
||||
@@ -263,6 +265,7 @@ impl DatanodeBuilder {
|
||||
region_server.clone(),
|
||||
meta_client,
|
||||
cache_registry,
|
||||
self.plugins.clone(),
|
||||
)
|
||||
.await?,
|
||||
)
|
||||
@@ -629,6 +632,7 @@ mod tests {
|
||||
use common_meta::kv_backend::memory::MemoryKvBackend;
|
||||
use common_meta::kv_backend::KvBackendRef;
|
||||
use mito2::engine::MITO_ENGINE_NAME;
|
||||
use servers::Mode;
|
||||
use store_api::region_request::RegionRequest;
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
@@ -674,6 +678,7 @@ mod tests {
|
||||
..Default::default()
|
||||
},
|
||||
Plugins::default(),
|
||||
Mode::Standalone,
|
||||
)
|
||||
.with_cache_registry(layered_cache_registry);
|
||||
|
||||
|
||||
@@ -18,6 +18,7 @@ use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use api::v1::meta::{HeartbeatRequest, NodeInfo, Peer, RegionRole, RegionStat};
|
||||
use common_base::Plugins;
|
||||
use common_meta::cache_invalidator::CacheInvalidatorRef;
|
||||
use common_meta::datanode::REGION_STATISTIC_KEY;
|
||||
use common_meta::distributed_time_constants::META_KEEP_ALIVE_INTERVAL_SECS;
|
||||
@@ -37,7 +38,7 @@ use tokio::sync::{mpsc, Notify};
|
||||
use tokio::time::Instant;
|
||||
|
||||
use self::handler::RegionHeartbeatResponseHandler;
|
||||
use crate::alive_keeper::RegionAliveKeeper;
|
||||
use crate::alive_keeper::{CountdownTaskHandlerExtRef, RegionAliveKeeper};
|
||||
use crate::config::DatanodeOptions;
|
||||
use crate::error::{self, MetaClientInitSnafu, Result};
|
||||
use crate::event_listener::RegionServerEventReceiver;
|
||||
@@ -73,9 +74,12 @@ impl HeartbeatTask {
|
||||
region_server: RegionServer,
|
||||
meta_client: MetaClientRef,
|
||||
cache_invalidator: CacheInvalidatorRef,
|
||||
plugins: Plugins,
|
||||
) -> Result<Self> {
|
||||
let countdown_task_handler_ext = plugins.get::<CountdownTaskHandlerExtRef>();
|
||||
let region_alive_keeper = Arc::new(RegionAliveKeeper::new(
|
||||
region_server.clone(),
|
||||
countdown_task_handler_ext,
|
||||
opts.heartbeat.interval.as_millis() as u64,
|
||||
));
|
||||
let resp_handler_executor = Arc::new(HandlerGroupExecutor::new(vec![
|
||||
|
||||
@@ -25,6 +25,6 @@ pub mod heartbeat;
|
||||
pub mod metrics;
|
||||
pub mod region_server;
|
||||
pub mod service;
|
||||
mod store;
|
||||
pub mod store;
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
pub mod tests;
|
||||
|
||||
@@ -55,7 +55,7 @@ use store_api::metric_engine_consts::{
|
||||
FILE_ENGINE_NAME, LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME,
|
||||
};
|
||||
use store_api::region_engine::{
|
||||
RegionEngineRef, RegionRole, RegionStatistic, SetRegionRoleStateResponse,
|
||||
RegionEngineRef, RegionManifestInfo, RegionRole, RegionStatistic, SetRegionRoleStateResponse,
|
||||
SettableRegionRoleState,
|
||||
};
|
||||
use store_api::region_request::{
|
||||
@@ -308,6 +308,22 @@ impl RegionServer {
|
||||
.with_context(|_| HandleRegionRequestSnafu { region_id })
|
||||
}
|
||||
|
||||
pub async fn sync_region_manifest(
|
||||
&self,
|
||||
region_id: RegionId,
|
||||
manifest_info: RegionManifestInfo,
|
||||
) -> Result<()> {
|
||||
let engine = self
|
||||
.inner
|
||||
.region_map
|
||||
.get(®ion_id)
|
||||
.with_context(|| RegionNotFoundSnafu { region_id })?;
|
||||
engine
|
||||
.sync_region(region_id, manifest_info)
|
||||
.await
|
||||
.with_context(|_| HandleRegionRequestSnafu { region_id })
|
||||
}
|
||||
|
||||
/// Set region role state gracefully.
|
||||
///
|
||||
/// For [SettableRegionRoleState::Follower]:
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
//! object storage utilities
|
||||
|
||||
mod azblob;
|
||||
mod fs;
|
||||
pub mod fs;
|
||||
mod gcs;
|
||||
mod oss;
|
||||
mod s3;
|
||||
|
||||
@@ -24,7 +24,8 @@ use crate::config::FileConfig;
|
||||
use crate::error::{self, Result};
|
||||
use crate::store;
|
||||
|
||||
pub(crate) async fn new_fs_object_store(
|
||||
/// A helper function to create a file system object store.
|
||||
pub async fn new_fs_object_store(
|
||||
data_home: &str,
|
||||
_file_config: &FileConfig,
|
||||
) -> Result<ObjectStore> {
|
||||
|
||||
@@ -32,8 +32,8 @@ use query::{QueryEngine, QueryEngineContext};
|
||||
use session::context::QueryContextRef;
|
||||
use store_api::metadata::RegionMetadataRef;
|
||||
use store_api::region_engine::{
|
||||
RegionEngine, RegionRole, RegionScannerRef, RegionStatistic, SetRegionRoleStateResponse,
|
||||
SettableRegionRoleState,
|
||||
RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef, RegionStatistic,
|
||||
SetRegionRoleStateResponse, SettableRegionRoleState,
|
||||
};
|
||||
use store_api::region_request::{AffectedRows, RegionRequest};
|
||||
use store_api::storage::{RegionId, ScanRequest, SequenceNumber};
|
||||
@@ -246,6 +246,14 @@ impl RegionEngine for MockRegionEngine {
|
||||
Some(RegionRole::Leader)
|
||||
}
|
||||
|
||||
async fn sync_region(
|
||||
&self,
|
||||
_region_id: RegionId,
|
||||
_manifest_info: RegionManifestInfo,
|
||||
) -> Result<(), BoxedError> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
fn as_any(&self) -> &dyn Any {
|
||||
self
|
||||
}
|
||||
|
||||
@@ -28,8 +28,9 @@ use snafu::{ensure, ResultExt};
|
||||
use crate::error::{self, DuplicateColumnSnafu, Error, ProjectArrowSchemaSnafu, Result};
|
||||
use crate::prelude::ConcreteDataType;
|
||||
pub use crate::schema::column_schema::{
|
||||
ColumnSchema, FulltextAnalyzer, FulltextOptions, Metadata, SkippingIndexOptions,
|
||||
SkippingIndexType, COLUMN_FULLTEXT_CHANGE_OPT_KEY_ENABLE, COLUMN_FULLTEXT_OPT_KEY_ANALYZER,
|
||||
ColumnSchema, FulltextAnalyzer, FulltextBackend, FulltextOptions, Metadata,
|
||||
SkippingIndexOptions, SkippingIndexType, COLUMN_FULLTEXT_CHANGE_OPT_KEY_ENABLE,
|
||||
COLUMN_FULLTEXT_OPT_KEY_ANALYZER, COLUMN_FULLTEXT_OPT_KEY_BACKEND,
|
||||
COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE, COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY,
|
||||
COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE, COMMENT_KEY, FULLTEXT_KEY, INVERTED_INDEX_KEY,
|
||||
SKIPPING_INDEX_KEY, TIME_INDEX_KEY,
|
||||
|
||||
@@ -46,6 +46,7 @@ pub const SKIPPING_INDEX_KEY: &str = "greptime:skipping_index";
|
||||
pub const COLUMN_FULLTEXT_CHANGE_OPT_KEY_ENABLE: &str = "enable";
|
||||
pub const COLUMN_FULLTEXT_OPT_KEY_ANALYZER: &str = "analyzer";
|
||||
pub const COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE: &str = "case_sensitive";
|
||||
pub const COLUMN_FULLTEXT_OPT_KEY_BACKEND: &str = "backend";
|
||||
|
||||
/// Keys used in SKIPPING index options
|
||||
pub const COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY: &str = "granularity";
|
||||
@@ -514,6 +515,9 @@ pub struct FulltextOptions {
|
||||
/// Whether the fulltext index is case-sensitive.
|
||||
#[serde(default)]
|
||||
pub case_sensitive: bool,
|
||||
/// The fulltext backend to use.
|
||||
#[serde(default)]
|
||||
pub backend: FulltextBackend,
|
||||
}
|
||||
|
||||
impl fmt::Display for FulltextOptions {
|
||||
@@ -522,11 +526,30 @@ impl fmt::Display for FulltextOptions {
|
||||
if self.enable {
|
||||
write!(f, ", analyzer={}", self.analyzer)?;
|
||||
write!(f, ", case_sensitive={}", self.case_sensitive)?;
|
||||
write!(f, ", backend={}", self.backend)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// The backend of the fulltext index.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default, Visit, VisitMut)]
|
||||
#[serde(rename_all = "kebab-case")]
|
||||
pub enum FulltextBackend {
|
||||
#[default]
|
||||
Tantivy,
|
||||
Bloom, // TODO(zhongzc): when bloom is ready, use it as default
|
||||
}
|
||||
|
||||
impl fmt::Display for FulltextBackend {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match self {
|
||||
FulltextBackend::Tantivy => write!(f, "tantivy"),
|
||||
FulltextBackend::Bloom => write!(f, "bloom"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<HashMap<String, String>> for FulltextOptions {
|
||||
type Error = Error;
|
||||
|
||||
@@ -575,6 +598,19 @@ impl TryFrom<HashMap<String, String>> for FulltextOptions {
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(backend) = options.get(COLUMN_FULLTEXT_OPT_KEY_BACKEND) {
|
||||
match backend.to_ascii_lowercase().as_str() {
|
||||
"bloom" => fulltext_options.backend = FulltextBackend::Bloom,
|
||||
"tantivy" => fulltext_options.backend = FulltextBackend::Tantivy,
|
||||
_ => {
|
||||
return InvalidFulltextOptionSnafu {
|
||||
msg: format!("{backend}, expected: 'bloom' | 'tantivy'"),
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(fulltext_options)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -30,6 +30,7 @@ mod boolean;
|
||||
mod constant;
|
||||
mod date;
|
||||
mod decimal;
|
||||
mod dictionary;
|
||||
mod duration;
|
||||
mod eq;
|
||||
mod helper;
|
||||
@@ -48,6 +49,7 @@ pub use boolean::{BooleanVector, BooleanVectorBuilder};
|
||||
pub use constant::ConstantVector;
|
||||
pub use date::{DateVector, DateVectorBuilder};
|
||||
pub use decimal::{Decimal128Vector, Decimal128VectorBuilder};
|
||||
pub use dictionary::{DictionaryIter, DictionaryVector};
|
||||
pub use duration::{
|
||||
DurationMicrosecondVector, DurationMicrosecondVectorBuilder, DurationMillisecondVector,
|
||||
DurationMillisecondVectorBuilder, DurationNanosecondVector, DurationNanosecondVectorBuilder,
|
||||
|
||||
438
src/datatypes/src/vectors/dictionary.rs
Normal file
438
src/datatypes/src/vectors/dictionary.rs
Normal file
@@ -0,0 +1,438 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::any::Any;
|
||||
use std::sync::Arc;
|
||||
|
||||
use arrow::array::Array;
|
||||
use arrow::datatypes::Int32Type;
|
||||
use arrow_array::{ArrayRef, DictionaryArray, Int32Array};
|
||||
use serde_json::Value as JsonValue;
|
||||
use snafu::ResultExt;
|
||||
|
||||
use super::operations::VectorOp;
|
||||
use crate::data_type::ConcreteDataType;
|
||||
use crate::error::{self, Result};
|
||||
use crate::serialize::Serializable;
|
||||
use crate::types::DictionaryType;
|
||||
use crate::value::{Value, ValueRef};
|
||||
use crate::vectors::{self, Helper, Validity, Vector, VectorRef};
|
||||
|
||||
/// Vector of dictionaries, basically backed by Arrow's `DictionaryArray`.
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub struct DictionaryVector {
|
||||
array: DictionaryArray<Int32Type>,
|
||||
/// The datatype of the items in the dictionary.
|
||||
item_type: ConcreteDataType,
|
||||
/// The vector of items in the dictionary.
|
||||
item_vector: VectorRef,
|
||||
}
|
||||
|
||||
impl DictionaryVector {
|
||||
/// Create a new instance of `DictionaryVector` from a dictionary array and item type
|
||||
pub fn new(array: DictionaryArray<Int32Type>, item_type: ConcreteDataType) -> Result<Self> {
|
||||
let item_vector = Helper::try_into_vector(array.values())?;
|
||||
|
||||
Ok(Self {
|
||||
array,
|
||||
item_type,
|
||||
item_vector,
|
||||
})
|
||||
}
|
||||
|
||||
/// Returns the underlying Arrow dictionary array
|
||||
pub fn array(&self) -> &DictionaryArray<Int32Type> {
|
||||
&self.array
|
||||
}
|
||||
|
||||
/// Returns the keys array of this dictionary
|
||||
pub fn keys(&self) -> &arrow_array::PrimitiveArray<Int32Type> {
|
||||
self.array.keys()
|
||||
}
|
||||
|
||||
/// Returns the values array of this dictionary
|
||||
pub fn values(&self) -> &ArrayRef {
|
||||
self.array.values()
|
||||
}
|
||||
|
||||
pub fn as_arrow(&self) -> &dyn Array {
|
||||
&self.array
|
||||
}
|
||||
}
|
||||
|
||||
impl Vector for DictionaryVector {
|
||||
fn data_type(&self) -> ConcreteDataType {
|
||||
ConcreteDataType::Dictionary(DictionaryType::new(
|
||||
ConcreteDataType::int32_datatype(),
|
||||
self.item_type.clone(),
|
||||
))
|
||||
}
|
||||
|
||||
fn vector_type_name(&self) -> String {
|
||||
"DictionaryVector".to_string()
|
||||
}
|
||||
|
||||
fn as_any(&self) -> &dyn Any {
|
||||
self
|
||||
}
|
||||
|
||||
fn len(&self) -> usize {
|
||||
self.array.len()
|
||||
}
|
||||
|
||||
fn to_arrow_array(&self) -> ArrayRef {
|
||||
Arc::new(self.array.clone())
|
||||
}
|
||||
|
||||
fn to_boxed_arrow_array(&self) -> Box<dyn Array> {
|
||||
Box::new(self.array.clone())
|
||||
}
|
||||
|
||||
fn validity(&self) -> Validity {
|
||||
vectors::impl_validity_for_vector!(self.array)
|
||||
}
|
||||
|
||||
fn memory_size(&self) -> usize {
|
||||
self.array.get_buffer_memory_size()
|
||||
}
|
||||
|
||||
fn null_count(&self) -> usize {
|
||||
self.array.null_count()
|
||||
}
|
||||
|
||||
fn is_null(&self, row: usize) -> bool {
|
||||
self.array.is_null(row)
|
||||
}
|
||||
|
||||
fn slice(&self, offset: usize, length: usize) -> VectorRef {
|
||||
Arc::new(Self {
|
||||
array: self.array.slice(offset, length),
|
||||
item_type: self.item_type.clone(),
|
||||
item_vector: self.item_vector.clone(),
|
||||
})
|
||||
}
|
||||
|
||||
fn get(&self, index: usize) -> Value {
|
||||
if !self.array.is_valid(index) {
|
||||
return Value::Null;
|
||||
}
|
||||
|
||||
let key = self.array.keys().value(index);
|
||||
self.item_vector.get(key as usize)
|
||||
}
|
||||
|
||||
fn get_ref(&self, index: usize) -> ValueRef {
|
||||
if !self.array.is_valid(index) {
|
||||
return ValueRef::Null;
|
||||
}
|
||||
|
||||
let key = self.array.keys().value(index);
|
||||
self.item_vector.get_ref(key as usize)
|
||||
}
|
||||
}
|
||||
|
||||
impl Serializable for DictionaryVector {
|
||||
fn serialize_to_json(&self) -> Result<Vec<JsonValue>> {
|
||||
// Convert the dictionary array to JSON, where each element is either null or
|
||||
// the value it refers to in the dictionary
|
||||
let mut result = Vec::with_capacity(self.len());
|
||||
|
||||
for i in 0..self.len() {
|
||||
if self.is_null(i) {
|
||||
result.push(JsonValue::Null);
|
||||
} else {
|
||||
let key = self.array.keys().value(i);
|
||||
let value = self.item_vector.get(key as usize);
|
||||
let json_value = serde_json::to_value(value).context(error::SerializeSnafu)?;
|
||||
result.push(json_value);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<DictionaryArray<Int32Type>> for DictionaryVector {
|
||||
type Error = crate::error::Error;
|
||||
|
||||
fn try_from(array: DictionaryArray<Int32Type>) -> Result<Self> {
|
||||
let item_type = ConcreteDataType::from_arrow_type(array.values().data_type());
|
||||
let item_vector = Helper::try_into_vector(array.values())?;
|
||||
|
||||
Ok(Self {
|
||||
array,
|
||||
item_type,
|
||||
item_vector,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub struct DictionaryIter<'a> {
|
||||
vector: &'a DictionaryVector,
|
||||
idx: usize,
|
||||
}
|
||||
|
||||
impl<'a> DictionaryIter<'a> {
|
||||
pub fn new(vector: &'a DictionaryVector) -> DictionaryIter<'a> {
|
||||
DictionaryIter { vector, idx: 0 }
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> Iterator for DictionaryIter<'a> {
|
||||
type Item = Option<ValueRef<'a>>;
|
||||
|
||||
#[inline]
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
if self.idx >= self.vector.len() {
|
||||
return None;
|
||||
}
|
||||
|
||||
let idx = self.idx;
|
||||
self.idx += 1;
|
||||
|
||||
if self.vector.is_null(idx) {
|
||||
return Some(None);
|
||||
}
|
||||
|
||||
Some(Some(self.vector.item_vector.get_ref(self.idx)))
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn size_hint(&self) -> (usize, Option<usize>) {
|
||||
(
|
||||
self.vector.len() - self.idx,
|
||||
Some(self.vector.len() - self.idx),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl VectorOp for DictionaryVector {
|
||||
fn replicate(&self, offsets: &[usize]) -> VectorRef {
|
||||
let keys = self.array.keys();
|
||||
let mut replicated_keys = Vec::with_capacity(offsets.len());
|
||||
|
||||
let mut previous_offset = 0;
|
||||
for (i, &offset) in offsets.iter().enumerate() {
|
||||
let key = if i < self.len() {
|
||||
if keys.is_valid(i) {
|
||||
Some(keys.value(i))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
// repeat this key (offset - previous_offset) times
|
||||
let repeat_count = offset - previous_offset;
|
||||
if repeat_count > 0 {
|
||||
replicated_keys.resize(replicated_keys.len() + repeat_count, key);
|
||||
}
|
||||
|
||||
previous_offset = offset;
|
||||
}
|
||||
|
||||
let new_keys = Int32Array::from(replicated_keys);
|
||||
let new_array = DictionaryArray::try_new(new_keys, self.values().clone())
|
||||
.expect("Failed to create replicated dictionary array");
|
||||
|
||||
Arc::new(Self {
|
||||
array: new_array,
|
||||
item_type: self.item_type.clone(),
|
||||
item_vector: self.item_vector.clone(),
|
||||
})
|
||||
}
|
||||
|
||||
fn filter(&self, filter: &vectors::BooleanVector) -> Result<VectorRef> {
|
||||
let key_array: ArrayRef = Arc::new(self.array.keys().clone());
|
||||
let key_vector = Helper::try_into_vector(&key_array)?;
|
||||
let filtered_key_vector = key_vector.filter(filter)?;
|
||||
let filtered_key_array = filtered_key_vector.to_arrow_array();
|
||||
let filtered_key_array = filtered_key_array
|
||||
.as_any()
|
||||
.downcast_ref::<Int32Array>()
|
||||
.unwrap();
|
||||
|
||||
let new_array = DictionaryArray::try_new(filtered_key_array.clone(), self.values().clone())
|
||||
.expect("Failed to create filtered dictionary array");
|
||||
|
||||
Ok(Arc::new(Self {
|
||||
array: new_array,
|
||||
item_type: self.item_type.clone(),
|
||||
item_vector: self.item_vector.clone(),
|
||||
}))
|
||||
}
|
||||
|
||||
fn cast(&self, to_type: &ConcreteDataType) -> Result<VectorRef> {
|
||||
let new_items = self.item_vector.cast(to_type)?;
|
||||
let new_array =
|
||||
DictionaryArray::try_new(self.array.keys().clone(), new_items.to_arrow_array())
|
||||
.expect("Failed to create casted dictionary array");
|
||||
Ok(Arc::new(Self {
|
||||
array: new_array,
|
||||
item_type: to_type.clone(),
|
||||
item_vector: self.item_vector.clone(),
|
||||
}))
|
||||
}
|
||||
|
||||
fn take(&self, indices: &vectors::UInt32Vector) -> Result<VectorRef> {
|
||||
let key_array: ArrayRef = Arc::new(self.array.keys().clone());
|
||||
let key_vector = Helper::try_into_vector(&key_array)?;
|
||||
let new_key_vector = key_vector.take(indices)?;
|
||||
let new_key_array = new_key_vector.to_arrow_array();
|
||||
let new_key_array = new_key_array.as_any().downcast_ref::<Int32Array>().unwrap();
|
||||
|
||||
let new_array = DictionaryArray::try_new(new_key_array.clone(), self.values().clone())
|
||||
.expect("Failed to create filtered dictionary array");
|
||||
|
||||
Ok(Arc::new(Self {
|
||||
array: new_array,
|
||||
item_type: self.item_type.clone(),
|
||||
item_vector: self.item_vector.clone(),
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use arrow_array::StringArray;
|
||||
|
||||
use super::*;
|
||||
|
||||
// Helper function to create a test dictionary vector with string values
|
||||
fn create_test_dictionary() -> DictionaryVector {
|
||||
// Dictionary values: ["a", "b", "c", "d"]
|
||||
// Keys: [0, 1, 2, null, 1, 3]
|
||||
// Resulting in: ["a", "b", "c", null, "b", "d"]
|
||||
let values = StringArray::from(vec!["a", "b", "c", "d"]);
|
||||
let keys = Int32Array::from(vec![Some(0), Some(1), Some(2), None, Some(1), Some(3)]);
|
||||
let dict_array = DictionaryArray::new(keys, Arc::new(values));
|
||||
DictionaryVector::try_from(dict_array).unwrap()
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_dictionary_vector_basics() {
|
||||
let dict_vec = create_test_dictionary();
|
||||
|
||||
// Test length and null count
|
||||
assert_eq!(dict_vec.len(), 6);
|
||||
assert_eq!(dict_vec.null_count(), 1);
|
||||
|
||||
// Test data type
|
||||
let data_type = dict_vec.data_type();
|
||||
if let ConcreteDataType::Dictionary(dict_type) = data_type {
|
||||
assert_eq!(*dict_type.value_type(), ConcreteDataType::string_datatype());
|
||||
} else {
|
||||
panic!("Expected Dictionary data type");
|
||||
}
|
||||
|
||||
// Test is_null
|
||||
assert!(!dict_vec.is_null(0));
|
||||
assert!(dict_vec.is_null(3));
|
||||
|
||||
// Test get values
|
||||
assert_eq!(dict_vec.get(0), Value::String("a".to_string().into()));
|
||||
assert_eq!(dict_vec.get(1), Value::String("b".to_string().into()));
|
||||
assert_eq!(dict_vec.get(3), Value::Null);
|
||||
assert_eq!(dict_vec.get(4), Value::String("b".to_string().into()));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_slice() {
|
||||
let dict_vec = create_test_dictionary();
|
||||
let sliced = dict_vec.slice(1, 3);
|
||||
|
||||
assert_eq!(sliced.len(), 3);
|
||||
assert_eq!(sliced.get(0), Value::String("b".to_string().into()));
|
||||
assert_eq!(sliced.get(1), Value::String("c".to_string().into()));
|
||||
assert_eq!(sliced.get(2), Value::Null);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_replicate() {
|
||||
let dict_vec = create_test_dictionary();
|
||||
|
||||
// Replicate with offsets [0, 2, 5] - should get values at these indices
|
||||
let offsets = vec![0, 2, 5];
|
||||
let replicated = dict_vec.replicate(&offsets);
|
||||
assert_eq!(replicated.len(), 5);
|
||||
assert_eq!(replicated.get(0), Value::String("b".to_string().into()));
|
||||
assert_eq!(replicated.get(1), Value::String("b".to_string().into()));
|
||||
assert_eq!(replicated.get(2), Value::String("c".to_string().into()));
|
||||
assert_eq!(replicated.get(3), Value::String("c".to_string().into()));
|
||||
assert_eq!(replicated.get(4), Value::String("c".to_string().into()));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_filter() {
|
||||
let dict_vec = create_test_dictionary();
|
||||
|
||||
// Keep only indices 0, 2, 4
|
||||
let filter_values = vec![true, false, true, false, true, false];
|
||||
let filter = vectors::BooleanVector::from(filter_values);
|
||||
|
||||
let filtered = dict_vec.filter(&filter).unwrap();
|
||||
assert_eq!(filtered.len(), 3);
|
||||
|
||||
// Check the values
|
||||
assert_eq!(filtered.get(0), Value::String("a".to_string().into()));
|
||||
assert_eq!(filtered.get(1), Value::String("c".to_string().into()));
|
||||
assert_eq!(filtered.get(2), Value::String("b".to_string().into()));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_cast() {
|
||||
let dict_vec = create_test_dictionary();
|
||||
|
||||
// Cast to the same type should return an equivalent vector
|
||||
let casted = dict_vec.cast(&ConcreteDataType::string_datatype()).unwrap();
|
||||
|
||||
// The returned vector should have string values
|
||||
assert_eq!(
|
||||
casted.data_type(),
|
||||
ConcreteDataType::Dictionary(DictionaryType::new(
|
||||
ConcreteDataType::int32_datatype(),
|
||||
ConcreteDataType::string_datatype(),
|
||||
))
|
||||
);
|
||||
assert_eq!(casted.len(), dict_vec.len());
|
||||
|
||||
// Values should match the original dictionary lookups
|
||||
assert_eq!(casted.get(0), Value::String("a".to_string().into()));
|
||||
assert_eq!(casted.get(1), Value::String("b".to_string().into()));
|
||||
assert_eq!(casted.get(2), Value::String("c".to_string().into()));
|
||||
assert_eq!(casted.get(3), Value::Null);
|
||||
assert_eq!(casted.get(4), Value::String("b".to_string().into()));
|
||||
assert_eq!(casted.get(5), Value::String("d".to_string().into()));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_take() {
|
||||
let dict_vec = create_test_dictionary();
|
||||
|
||||
// Take indices 2, 0, 4
|
||||
let indices_vec = vec![Some(2u32), Some(0), Some(4)];
|
||||
let indices = vectors::UInt32Vector::from(indices_vec);
|
||||
|
||||
let taken = dict_vec.take(&indices).unwrap();
|
||||
assert_eq!(taken.len(), 3);
|
||||
|
||||
// Check the values
|
||||
assert_eq!(taken.get(0), Value::String("c".to_string().into()));
|
||||
assert_eq!(taken.get(1), Value::String("a".to_string().into()));
|
||||
assert_eq!(taken.get(2), Value::String("b".to_string().into()));
|
||||
}
|
||||
}
|
||||
@@ -20,7 +20,8 @@ use std::sync::Arc;
|
||||
use arrow::array::{Array, ArrayRef, StringArray};
|
||||
use arrow::compute;
|
||||
use arrow::compute::kernels::comparison;
|
||||
use arrow::datatypes::{DataType as ArrowDataType, TimeUnit};
|
||||
use arrow::datatypes::{DataType as ArrowDataType, Int32Type, TimeUnit};
|
||||
use arrow_array::DictionaryArray;
|
||||
use arrow_schema::IntervalUnit;
|
||||
use datafusion_common::ScalarValue;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
@@ -31,7 +32,7 @@ use crate::prelude::DataType;
|
||||
use crate::scalars::{Scalar, ScalarVectorBuilder};
|
||||
use crate::value::{ListValue, ListValueRef, Value};
|
||||
use crate::vectors::{
|
||||
BinaryVector, BooleanVector, ConstantVector, DateVector, Decimal128Vector,
|
||||
BinaryVector, BooleanVector, ConstantVector, DateVector, Decimal128Vector, DictionaryVector,
|
||||
DurationMicrosecondVector, DurationMillisecondVector, DurationNanosecondVector,
|
||||
DurationSecondVector, Float32Vector, Float64Vector, Int16Vector, Int32Vector, Int64Vector,
|
||||
Int8Vector, IntervalDayTimeVector, IntervalMonthDayNanoVector, IntervalYearMonthVector,
|
||||
@@ -347,6 +348,17 @@ impl Helper {
|
||||
ArrowDataType::Decimal128(_, _) => {
|
||||
Arc::new(Decimal128Vector::try_from_arrow_array(array)?)
|
||||
}
|
||||
ArrowDataType::Dictionary(key, value) if matches!(&**key, ArrowDataType::Int32) => {
|
||||
let array = array
|
||||
.as_ref()
|
||||
.as_any()
|
||||
.downcast_ref::<DictionaryArray<Int32Type>>()
|
||||
.unwrap(); // Safety: the type is guarded by match arm condition
|
||||
Arc::new(DictionaryVector::new(
|
||||
array.clone(),
|
||||
ConcreteDataType::try_from(value.as_ref())?,
|
||||
)?)
|
||||
}
|
||||
ArrowDataType::Float16
|
||||
| ArrowDataType::LargeList(_)
|
||||
| ArrowDataType::FixedSizeList(_, _)
|
||||
|
||||
@@ -14,14 +14,11 @@
|
||||
|
||||
mod cast;
|
||||
mod filter;
|
||||
mod find_unique;
|
||||
mod replicate;
|
||||
mod take;
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_base::BitVec;
|
||||
|
||||
use crate::error::{self, Result};
|
||||
use crate::types::LogicalPrimitiveType;
|
||||
use crate::vectors::constant::ConstantVector;
|
||||
@@ -40,23 +37,6 @@ pub trait VectorOp {
|
||||
/// Panics if `offsets.len() != self.len()`.
|
||||
fn replicate(&self, offsets: &[usize]) -> VectorRef;
|
||||
|
||||
/// Mark `i-th` bit of `selected` to `true` if the `i-th` element of `self` is unique, which
|
||||
/// means there is no elements behind it have same value as it.
|
||||
///
|
||||
/// The caller should ensure
|
||||
/// 1. the length of `selected` bitmap is equal to `vector.len()`.
|
||||
/// 2. `vector` and `prev_vector` are sorted.
|
||||
///
|
||||
/// If there are multiple duplicate elements, this function retains the **first** element.
|
||||
/// The first element is considered as unique if the first element of `self` is different
|
||||
/// from its previous element, that is the last element of `prev_vector`.
|
||||
///
|
||||
/// # Panics
|
||||
/// Panics if
|
||||
/// - `selected.len() < self.len()`.
|
||||
/// - `prev_vector` and `self` have different data types.
|
||||
fn find_unique(&self, selected: &mut BitVec, prev_vector: Option<&dyn Vector>);
|
||||
|
||||
/// Filters the vector, returns elements matching the `filter` (i.e. where the values are true).
|
||||
///
|
||||
/// Note that the nulls of `filter` are interpreted as `false` will lead to these elements being masked out.
|
||||
@@ -81,11 +61,6 @@ macro_rules! impl_scalar_vector_op {
|
||||
replicate::replicate_scalar(self, offsets)
|
||||
}
|
||||
|
||||
fn find_unique(&self, selected: &mut BitVec, prev_vector: Option<&dyn Vector>) {
|
||||
let prev_vector = prev_vector.map(|pv| pv.as_any().downcast_ref::<$VectorType>().unwrap());
|
||||
find_unique::find_unique_scalar(self, selected, prev_vector);
|
||||
}
|
||||
|
||||
fn filter(&self, filter: &BooleanVector) -> Result<VectorRef> {
|
||||
filter::filter_non_constant!(self, $VectorType, filter)
|
||||
}
|
||||
@@ -121,11 +96,6 @@ impl VectorOp for Decimal128Vector {
|
||||
std::sync::Arc::new(replicate::replicate_decimal128(self, offsets))
|
||||
}
|
||||
|
||||
fn find_unique(&self, selected: &mut BitVec, prev_vector: Option<&dyn Vector>) {
|
||||
let prev_vector = prev_vector.and_then(|pv| pv.as_any().downcast_ref::<Decimal128Vector>());
|
||||
find_unique::find_unique_scalar(self, selected, prev_vector);
|
||||
}
|
||||
|
||||
fn filter(&self, filter: &BooleanVector) -> Result<VectorRef> {
|
||||
filter::filter_non_constant!(self, Decimal128Vector, filter)
|
||||
}
|
||||
@@ -144,12 +114,6 @@ impl<T: LogicalPrimitiveType> VectorOp for PrimitiveVector<T> {
|
||||
std::sync::Arc::new(replicate::replicate_primitive(self, offsets))
|
||||
}
|
||||
|
||||
fn find_unique(&self, selected: &mut BitVec, prev_vector: Option<&dyn Vector>) {
|
||||
let prev_vector =
|
||||
prev_vector.and_then(|pv| pv.as_any().downcast_ref::<PrimitiveVector<T>>());
|
||||
find_unique::find_unique_scalar(self, selected, prev_vector);
|
||||
}
|
||||
|
||||
fn filter(&self, filter: &BooleanVector) -> Result<VectorRef> {
|
||||
filter::filter_non_constant!(self, PrimitiveVector<T>, filter)
|
||||
}
|
||||
@@ -168,11 +132,6 @@ impl VectorOp for NullVector {
|
||||
replicate::replicate_null(self, offsets)
|
||||
}
|
||||
|
||||
fn find_unique(&self, selected: &mut BitVec, prev_vector: Option<&dyn Vector>) {
|
||||
let prev_vector = prev_vector.and_then(|pv| pv.as_any().downcast_ref::<NullVector>());
|
||||
find_unique::find_unique_null(self, selected, prev_vector);
|
||||
}
|
||||
|
||||
fn filter(&self, filter: &BooleanVector) -> Result<VectorRef> {
|
||||
filter::filter_non_constant!(self, NullVector, filter)
|
||||
}
|
||||
@@ -195,11 +154,6 @@ impl VectorOp for ConstantVector {
|
||||
self.replicate_vector(offsets)
|
||||
}
|
||||
|
||||
fn find_unique(&self, selected: &mut BitVec, prev_vector: Option<&dyn Vector>) {
|
||||
let prev_vector = prev_vector.and_then(|pv| pv.as_any().downcast_ref::<ConstantVector>());
|
||||
find_unique::find_unique_constant(self, selected, prev_vector);
|
||||
}
|
||||
|
||||
fn filter(&self, filter: &BooleanVector) -> Result<VectorRef> {
|
||||
self.filter_vector(filter)
|
||||
}
|
||||
|
||||
@@ -1,366 +0,0 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use common_base::BitVec;
|
||||
|
||||
use crate::scalars::ScalarVector;
|
||||
use crate::vectors::constant::ConstantVector;
|
||||
use crate::vectors::{NullVector, Vector};
|
||||
|
||||
// To implement `find_unique()` correctly, we need to keep in mind that always marks an element as
|
||||
// selected when it is different from the previous one, and leaves the `selected` unchanged
|
||||
// in any other case.
|
||||
pub(crate) fn find_unique_scalar<'a, T: ScalarVector>(
|
||||
vector: &'a T,
|
||||
selected: &'a mut BitVec,
|
||||
prev_vector: Option<&'a T>,
|
||||
) where
|
||||
T::RefItem<'a>: PartialEq,
|
||||
{
|
||||
assert!(selected.len() >= vector.len());
|
||||
|
||||
if vector.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
||||
for ((i, current), next) in vector
|
||||
.iter_data()
|
||||
.enumerate()
|
||||
.zip(vector.iter_data().skip(1))
|
||||
{
|
||||
if current != next {
|
||||
// If next element is a different element, we mark it as selected.
|
||||
selected.set(i + 1, true);
|
||||
}
|
||||
}
|
||||
|
||||
// Marks first element as selected if it is different from previous element, otherwise
|
||||
// keep selected bitmap unchanged.
|
||||
let is_first_not_duplicate = prev_vector
|
||||
.map(|pv| {
|
||||
if pv.is_empty() {
|
||||
true
|
||||
} else {
|
||||
let last = pv.get_data(pv.len() - 1);
|
||||
last != vector.get_data(0)
|
||||
}
|
||||
})
|
||||
.unwrap_or(true);
|
||||
if is_first_not_duplicate {
|
||||
selected.set(0, true);
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn find_unique_null(
|
||||
vector: &NullVector,
|
||||
selected: &mut BitVec,
|
||||
prev_vector: Option<&NullVector>,
|
||||
) {
|
||||
if vector.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
||||
let is_first_not_duplicate = prev_vector.map(NullVector::is_empty).unwrap_or(true);
|
||||
if is_first_not_duplicate {
|
||||
selected.set(0, true);
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn find_unique_constant(
|
||||
vector: &ConstantVector,
|
||||
selected: &mut BitVec,
|
||||
prev_vector: Option<&ConstantVector>,
|
||||
) {
|
||||
if vector.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
||||
let is_first_not_duplicate = prev_vector
|
||||
.map(|pv| {
|
||||
if pv.is_empty() {
|
||||
true
|
||||
} else {
|
||||
vector.get_constant_ref() != pv.get_constant_ref()
|
||||
}
|
||||
})
|
||||
.unwrap_or(true);
|
||||
|
||||
if is_first_not_duplicate {
|
||||
selected.set(0, true);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_time::Date;
|
||||
|
||||
use super::*;
|
||||
use crate::timestamp::*;
|
||||
use crate::vectors::{Int32Vector, StringVector, Vector, VectorOp};
|
||||
|
||||
fn check_bitmap(expect: &[bool], selected: &BitVec) {
|
||||
let actual = selected.iter().collect::<Vec<_>>();
|
||||
assert_eq!(expect, actual);
|
||||
}
|
||||
|
||||
fn check_find_unique_scalar(expect: &[bool], input: &[i32], prev: Option<&[i32]>) {
|
||||
check_find_unique_scalar_opt(expect, input.iter().map(|v| Some(*v)), prev);
|
||||
}
|
||||
|
||||
fn check_find_unique_scalar_opt(
|
||||
expect: &[bool],
|
||||
input: impl Iterator<Item = Option<i32>>,
|
||||
prev: Option<&[i32]>,
|
||||
) {
|
||||
let input = Int32Vector::from(input.collect::<Vec<_>>());
|
||||
let prev = prev.map(Int32Vector::from_slice);
|
||||
|
||||
let mut selected = BitVec::repeat(false, input.len());
|
||||
input.find_unique(&mut selected, prev.as_ref().map(|v| v as _));
|
||||
|
||||
check_bitmap(expect, &selected);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_find_unique_scalar() {
|
||||
check_find_unique_scalar(&[], &[], None);
|
||||
check_find_unique_scalar(&[true], &[1], None);
|
||||
check_find_unique_scalar(&[true, false], &[1, 1], None);
|
||||
check_find_unique_scalar(&[true, true], &[1, 2], None);
|
||||
check_find_unique_scalar(&[true, true, true, true], &[1, 2, 3, 4], None);
|
||||
check_find_unique_scalar(&[true, false, true, false], &[1, 1, 3, 3], None);
|
||||
check_find_unique_scalar(&[true, false, false, false, true], &[2, 2, 2, 2, 3], None);
|
||||
|
||||
check_find_unique_scalar(&[true], &[5], Some(&[]));
|
||||
check_find_unique_scalar(&[true], &[5], Some(&[3]));
|
||||
check_find_unique_scalar(&[false], &[5], Some(&[5]));
|
||||
check_find_unique_scalar(&[false], &[5], Some(&[4, 5]));
|
||||
check_find_unique_scalar(&[false, true], &[5, 6], Some(&[4, 5]));
|
||||
check_find_unique_scalar(&[false, true, false], &[5, 6, 6], Some(&[4, 5]));
|
||||
check_find_unique_scalar(
|
||||
&[false, true, false, true, true],
|
||||
&[5, 6, 6, 7, 8],
|
||||
Some(&[4, 5]),
|
||||
);
|
||||
|
||||
check_find_unique_scalar_opt(
|
||||
&[true, true, false, true, false],
|
||||
[Some(1), Some(2), Some(2), None, None].into_iter(),
|
||||
None,
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_find_unique_scalar_multi_times_with_prev() {
|
||||
let prev = Int32Vector::from_slice([1]);
|
||||
|
||||
let v1 = Int32Vector::from_slice([2, 3, 4]);
|
||||
let mut selected = BitVec::repeat(false, v1.len());
|
||||
v1.find_unique(&mut selected, Some(&prev));
|
||||
|
||||
// Though element in v2 are the same as prev, but we should still keep them.
|
||||
let v2 = Int32Vector::from_slice([1, 1, 1]);
|
||||
v2.find_unique(&mut selected, Some(&prev));
|
||||
|
||||
check_bitmap(&[true, true, true], &selected);
|
||||
}
|
||||
|
||||
fn new_bitmap(bits: &[bool]) -> BitVec {
|
||||
BitVec::from_iter(bits)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_find_unique_scalar_with_prev() {
|
||||
let prev = Int32Vector::from_slice([1]);
|
||||
|
||||
let mut selected = new_bitmap(&[true, false, true, false]);
|
||||
let v = Int32Vector::from_slice([2, 3, 4, 5]);
|
||||
v.find_unique(&mut selected, Some(&prev));
|
||||
// All elements are different.
|
||||
check_bitmap(&[true, true, true, true], &selected);
|
||||
|
||||
let mut selected = new_bitmap(&[true, false, true, false]);
|
||||
let v = Int32Vector::from_slice([1, 2, 3, 4]);
|
||||
v.find_unique(&mut selected, Some(&prev));
|
||||
// Though first element is duplicate, but we keep the flag unchanged.
|
||||
check_bitmap(&[true, true, true, true], &selected);
|
||||
|
||||
// Same case as above, but now `prev` is None.
|
||||
let mut selected = new_bitmap(&[true, false, true, false]);
|
||||
let v = Int32Vector::from_slice([1, 2, 3, 4]);
|
||||
v.find_unique(&mut selected, None);
|
||||
check_bitmap(&[true, true, true, true], &selected);
|
||||
|
||||
// Same case as above, but now `prev` is empty.
|
||||
let mut selected = new_bitmap(&[true, false, true, false]);
|
||||
let v = Int32Vector::from_slice([1, 2, 3, 4]);
|
||||
v.find_unique(&mut selected, Some(&Int32Vector::from_slice([])));
|
||||
check_bitmap(&[true, true, true, true], &selected);
|
||||
|
||||
let mut selected = new_bitmap(&[false, false, false, false]);
|
||||
let v = Int32Vector::from_slice([2, 2, 4, 5]);
|
||||
v.find_unique(&mut selected, Some(&prev));
|
||||
// only v[1] is duplicate.
|
||||
check_bitmap(&[true, false, true, true], &selected);
|
||||
}
|
||||
|
||||
fn check_find_unique_null(len: usize) {
|
||||
let input = NullVector::new(len);
|
||||
let mut selected = BitVec::repeat(false, input.len());
|
||||
input.find_unique(&mut selected, None);
|
||||
|
||||
let mut expect = vec![false; len];
|
||||
if !expect.is_empty() {
|
||||
expect[0] = true;
|
||||
}
|
||||
check_bitmap(&expect, &selected);
|
||||
|
||||
let mut selected = BitVec::repeat(false, input.len());
|
||||
let prev = Some(NullVector::new(1));
|
||||
input.find_unique(&mut selected, prev.as_ref().map(|v| v as _));
|
||||
let expect = vec![false; len];
|
||||
check_bitmap(&expect, &selected);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_find_unique_null() {
|
||||
for len in 0..5 {
|
||||
check_find_unique_null(len);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_find_unique_null_with_prev() {
|
||||
let prev = NullVector::new(1);
|
||||
|
||||
// Keep flags unchanged.
|
||||
let mut selected = new_bitmap(&[true, false, true, false]);
|
||||
let v = NullVector::new(4);
|
||||
v.find_unique(&mut selected, Some(&prev));
|
||||
check_bitmap(&[true, false, true, false], &selected);
|
||||
|
||||
// Keep flags unchanged.
|
||||
let mut selected = new_bitmap(&[false, false, true, false]);
|
||||
v.find_unique(&mut selected, Some(&prev));
|
||||
check_bitmap(&[false, false, true, false], &selected);
|
||||
|
||||
// Prev is None, select first element.
|
||||
let mut selected = new_bitmap(&[false, false, true, false]);
|
||||
v.find_unique(&mut selected, None);
|
||||
check_bitmap(&[true, false, true, false], &selected);
|
||||
|
||||
// Prev is empty, select first element.
|
||||
let mut selected = new_bitmap(&[false, false, true, false]);
|
||||
v.find_unique(&mut selected, Some(&NullVector::new(0)));
|
||||
check_bitmap(&[true, false, true, false], &selected);
|
||||
}
|
||||
|
||||
fn check_find_unique_constant(len: usize) {
|
||||
let input = ConstantVector::new(Arc::new(Int32Vector::from_slice([8])), len);
|
||||
let mut selected = BitVec::repeat(false, len);
|
||||
input.find_unique(&mut selected, None);
|
||||
|
||||
let mut expect = vec![false; len];
|
||||
if !expect.is_empty() {
|
||||
expect[0] = true;
|
||||
}
|
||||
check_bitmap(&expect, &selected);
|
||||
|
||||
let mut selected = BitVec::repeat(false, len);
|
||||
let prev = Some(ConstantVector::new(
|
||||
Arc::new(Int32Vector::from_slice([8])),
|
||||
1,
|
||||
));
|
||||
input.find_unique(&mut selected, prev.as_ref().map(|v| v as _));
|
||||
let expect = vec![false; len];
|
||||
check_bitmap(&expect, &selected);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_find_unique_constant() {
|
||||
for len in 0..5 {
|
||||
check_find_unique_constant(len);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_find_unique_constant_with_prev() {
|
||||
let prev = ConstantVector::new(Arc::new(Int32Vector::from_slice([1])), 1);
|
||||
|
||||
// Keep flags unchanged.
|
||||
let mut selected = new_bitmap(&[true, false, true, false]);
|
||||
let v = ConstantVector::new(Arc::new(Int32Vector::from_slice([1])), 4);
|
||||
v.find_unique(&mut selected, Some(&prev));
|
||||
check_bitmap(&[true, false, true, false], &selected);
|
||||
|
||||
// Keep flags unchanged.
|
||||
let mut selected = new_bitmap(&[false, false, true, false]);
|
||||
v.find_unique(&mut selected, Some(&prev));
|
||||
check_bitmap(&[false, false, true, false], &selected);
|
||||
|
||||
// Prev is None, select first element.
|
||||
let mut selected = new_bitmap(&[false, false, true, false]);
|
||||
v.find_unique(&mut selected, None);
|
||||
check_bitmap(&[true, false, true, false], &selected);
|
||||
|
||||
// Prev is empty, select first element.
|
||||
let mut selected = new_bitmap(&[false, false, true, false]);
|
||||
v.find_unique(
|
||||
&mut selected,
|
||||
Some(&ConstantVector::new(
|
||||
Arc::new(Int32Vector::from_slice([1])),
|
||||
0,
|
||||
)),
|
||||
);
|
||||
check_bitmap(&[true, false, true, false], &selected);
|
||||
|
||||
// Different constant vector.
|
||||
let mut selected = new_bitmap(&[false, false, true, false]);
|
||||
let v = ConstantVector::new(Arc::new(Int32Vector::from_slice([2])), 4);
|
||||
v.find_unique(&mut selected, Some(&prev));
|
||||
check_bitmap(&[true, false, true, false], &selected);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_find_unique_string() {
|
||||
let input = StringVector::from_slice(&["a", "a", "b", "c"]);
|
||||
let mut selected = BitVec::repeat(false, 4);
|
||||
input.find_unique(&mut selected, None);
|
||||
let expect = vec![true, false, true, true];
|
||||
check_bitmap(&expect, &selected);
|
||||
}
|
||||
|
||||
macro_rules! impl_find_unique_date_like_test {
|
||||
($VectorType: ident, $ValueType: ident, $method: ident) => {{
|
||||
use $crate::vectors::$VectorType;
|
||||
|
||||
let v = $VectorType::from_iterator([8, 8, 9, 10].into_iter().map($ValueType::$method));
|
||||
let mut selected = BitVec::repeat(false, 4);
|
||||
v.find_unique(&mut selected, None);
|
||||
let expect = vec![true, false, true, true];
|
||||
check_bitmap(&expect, &selected);
|
||||
}};
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_find_unique_date_like() {
|
||||
impl_find_unique_date_like_test!(DateVector, Date, new);
|
||||
impl_find_unique_date_like_test!(TimestampSecondVector, TimestampSecond, from);
|
||||
impl_find_unique_date_like_test!(TimestampMillisecondVector, TimestampMillisecond, from);
|
||||
impl_find_unique_date_like_test!(TimestampMicrosecondVector, TimestampMicrosecond, from);
|
||||
impl_find_unique_date_like_test!(TimestampNanosecondVector, TimestampNanosecond, from);
|
||||
}
|
||||
}
|
||||
@@ -26,8 +26,8 @@ use object_store::ObjectStore;
|
||||
use snafu::{ensure, OptionExt};
|
||||
use store_api::metadata::RegionMetadataRef;
|
||||
use store_api::region_engine::{
|
||||
RegionEngine, RegionRole, RegionScannerRef, RegionStatistic, SetRegionRoleStateResponse,
|
||||
SettableRegionRoleState, SinglePartitionScanner,
|
||||
RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef, RegionStatistic,
|
||||
SetRegionRoleStateResponse, SettableRegionRoleState, SinglePartitionScanner,
|
||||
};
|
||||
use store_api::region_request::{
|
||||
AffectedRows, RegionCloseRequest, RegionCreateRequest, RegionDropRequest, RegionOpenRequest,
|
||||
@@ -138,6 +138,15 @@ impl RegionEngine for FileRegionEngine {
|
||||
}
|
||||
}
|
||||
|
||||
async fn sync_region(
|
||||
&self,
|
||||
_region_id: RegionId,
|
||||
_manifest_info: RegionManifestInfo,
|
||||
) -> Result<(), BoxedError> {
|
||||
// File engine doesn't need to sync region manifest.
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn role(&self, region_id: RegionId) -> Option<RegionRole> {
|
||||
self.inner.state(region_id)
|
||||
}
|
||||
|
||||
@@ -37,7 +37,6 @@ use serde::{Deserialize, Serialize};
|
||||
use servers::grpc::GrpcOptions;
|
||||
use servers::heartbeat_options::HeartbeatOptions;
|
||||
use servers::http::HttpOptions;
|
||||
use servers::Mode;
|
||||
use session::context::QueryContext;
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use store_api::storage::{ConcreteDataType, RegionId};
|
||||
@@ -63,7 +62,7 @@ pub(crate) mod refill;
|
||||
mod stat;
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
mod util;
|
||||
pub(crate) mod util;
|
||||
mod worker;
|
||||
|
||||
pub(crate) mod node_context;
|
||||
@@ -102,7 +101,6 @@ impl Default for FlowConfig {
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
#[serde(default)]
|
||||
pub struct FlownodeOptions {
|
||||
pub mode: Mode,
|
||||
pub node_id: Option<u64>,
|
||||
pub flow: FlowConfig,
|
||||
pub grpc: GrpcOptions,
|
||||
@@ -116,7 +114,6 @@ pub struct FlownodeOptions {
|
||||
impl Default for FlownodeOptions {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
mode: servers::Mode::Standalone,
|
||||
node_id: None,
|
||||
flow: FlowConfig::default(),
|
||||
grpc: GrpcOptions::default().with_bind_addr("127.0.0.1:3004"),
|
||||
|
||||
@@ -12,6 +12,8 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//! Util functions for adapter
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::helper::ColumnDataTypeWrapper;
|
||||
@@ -20,7 +22,7 @@ use api::v1::{ColumnDataType, ColumnDataTypeExtension, CreateTableExpr, Semantic
|
||||
use common_error::ext::BoxedError;
|
||||
use common_meta::key::table_info::TableInfoValue;
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::schema::ColumnSchema;
|
||||
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema};
|
||||
use itertools::Itertools;
|
||||
use operator::expr_helper;
|
||||
use session::context::QueryContextBuilder;
|
||||
@@ -174,7 +176,15 @@ pub fn table_info_value_to_relation_desc(
|
||||
let default_values = raw_schema
|
||||
.column_schemas
|
||||
.iter()
|
||||
.map(|c| c.default_constraint().cloned())
|
||||
.map(|c| {
|
||||
c.default_constraint().cloned().or_else(|| {
|
||||
if c.is_nullable() {
|
||||
Some(ColumnDefaultConstraint::null_value())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
})
|
||||
.collect_vec();
|
||||
|
||||
Ok(TableDesc::new(relation_desc, default_values))
|
||||
|
||||
@@ -179,7 +179,7 @@ impl Context<'_, '_> {
|
||||
) -> CollectionBundle<Batch> {
|
||||
let (send_port, recv_port) = self.df.make_edge::<_, Toff<Batch>>("constant_batch");
|
||||
let mut per_time: BTreeMap<repr::Timestamp, Vec<DiffRow>> = Default::default();
|
||||
for (key, group) in &rows.into_iter().group_by(|(_row, ts, _diff)| *ts) {
|
||||
for (key, group) in &rows.into_iter().chunk_by(|(_row, ts, _diff)| *ts) {
|
||||
per_time.entry(key).or_default().extend(group);
|
||||
}
|
||||
|
||||
@@ -233,7 +233,7 @@ impl Context<'_, '_> {
|
||||
pub fn render_constant(&mut self, rows: Vec<DiffRow>) -> CollectionBundle {
|
||||
let (send_port, recv_port) = self.df.make_edge::<_, Toff>("constant");
|
||||
let mut per_time: BTreeMap<repr::Timestamp, Vec<DiffRow>> = Default::default();
|
||||
for (key, group) in &rows.into_iter().group_by(|(_row, ts, _diff)| *ts) {
|
||||
for (key, group) in &rows.into_iter().chunk_by(|(_row, ts, _diff)| *ts) {
|
||||
per_time.entry(key).or_default().extend(group);
|
||||
}
|
||||
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
|
||||
use std::any::Any;
|
||||
|
||||
use arrow_schema::ArrowError;
|
||||
use common_error::ext::BoxedError;
|
||||
use common_error::{define_into_tonic_status, from_err_code_msg_to_header};
|
||||
use common_macro::stack_trace_debug;
|
||||
@@ -156,6 +157,15 @@ pub enum Error {
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Arrow error: {raw:?} in context: {context}"))]
|
||||
Arrow {
|
||||
#[snafu(source)]
|
||||
raw: ArrowError,
|
||||
context: String,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Datafusion error: {raw:?} in context: {context}"))]
|
||||
Datafusion {
|
||||
#[snafu(source)]
|
||||
@@ -238,7 +248,9 @@ impl ErrorExt for Error {
|
||||
| Self::FlowNotFound { .. }
|
||||
| Self::ListFlows { .. } => StatusCode::TableNotFound,
|
||||
Self::Plan { .. } | Self::Datatypes { .. } => StatusCode::PlanQuery,
|
||||
Self::InvalidQuery { .. } | Self::CreateFlow { .. } => StatusCode::EngineExecuteQuery,
|
||||
Self::InvalidQuery { .. } | Self::CreateFlow { .. } | Self::Arrow { .. } => {
|
||||
StatusCode::EngineExecuteQuery
|
||||
}
|
||||
Self::Unexpected { .. } => StatusCode::Unexpected,
|
||||
Self::NotImplemented { .. } | Self::UnsupportedTemporalFilter { .. } => {
|
||||
StatusCode::Unsupported
|
||||
|
||||
@@ -151,12 +151,12 @@ impl ScalarExpr {
|
||||
|
||||
/// apply optimization to the expression, like flatten variadic function
|
||||
pub fn optimize(&mut self) {
|
||||
self.flatten_varidic_fn();
|
||||
self.flatten_variadic_fn();
|
||||
}
|
||||
|
||||
/// Because Substrait's `And`/`Or` function is binary, but FlowPlan's
|
||||
/// `And`/`Or` function is variadic, we need to flatten the `And` function if multiple `And`/`Or` functions are nested.
|
||||
fn flatten_varidic_fn(&mut self) {
|
||||
fn flatten_variadic_fn(&mut self) {
|
||||
if let ScalarExpr::CallVariadic { func, exprs } = self {
|
||||
let mut new_exprs = vec![];
|
||||
for expr in std::mem::take(exprs) {
|
||||
@@ -167,7 +167,7 @@ impl ScalarExpr {
|
||||
{
|
||||
if *func == inner_func {
|
||||
for inner_expr in inner_exprs.iter_mut() {
|
||||
inner_expr.flatten_varidic_fn();
|
||||
inner_expr.flatten_variadic_fn();
|
||||
}
|
||||
new_exprs.extend(inner_exprs);
|
||||
}
|
||||
|
||||
@@ -33,6 +33,7 @@ mod expr;
|
||||
pub mod heartbeat;
|
||||
mod metrics;
|
||||
mod plan;
|
||||
mod recording_rules;
|
||||
mod repr;
|
||||
mod server;
|
||||
mod transform;
|
||||
|
||||
25
src/flow/src/recording_rules.rs
Normal file
25
src/flow/src/recording_rules.rs
Normal file
@@ -0,0 +1,25 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//! Run flow as recording rule which is time-window-aware normal query triggered when new data arrives
|
||||
|
||||
use std::time::Duration;
|
||||
|
||||
mod frontend_client;
|
||||
mod time_window;
|
||||
mod utils;
|
||||
|
||||
/// TODO(discord9): make those constants configurable
|
||||
/// The default rule engine query timeout is 10 minutes
|
||||
pub const DEFAULT_RULE_ENGINE_QUERY_TIMEOUT: Duration = Duration::from_secs(10 * 60);
|
||||
148
src/flow/src/recording_rules/frontend_client.rs
Normal file
148
src/flow/src/recording_rules/frontend_client.rs
Normal file
@@ -0,0 +1,148 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//! Frontend client to run flow as recording rule which is time-window-aware normal query triggered every tick set by user
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
|
||||
use common_error::ext::BoxedError;
|
||||
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
|
||||
use common_meta::cluster::{NodeInfo, NodeInfoKey, Role};
|
||||
use common_meta::peer::Peer;
|
||||
use common_meta::rpc::store::RangeRequest;
|
||||
use meta_client::client::MetaClient;
|
||||
use snafu::ResultExt;
|
||||
|
||||
use crate::error::{ExternalSnafu, UnexpectedSnafu};
|
||||
use crate::recording_rules::DEFAULT_RULE_ENGINE_QUERY_TIMEOUT;
|
||||
use crate::Error;
|
||||
|
||||
fn default_channel_mgr() -> ChannelManager {
|
||||
let cfg = ChannelConfig::new().timeout(DEFAULT_RULE_ENGINE_QUERY_TIMEOUT);
|
||||
ChannelManager::with_config(cfg)
|
||||
}
|
||||
|
||||
fn client_from_urls(addrs: Vec<String>) -> Client {
|
||||
Client::with_manager_and_urls(default_channel_mgr(), addrs)
|
||||
}
|
||||
|
||||
/// A simple frontend client able to execute sql using grpc protocol
|
||||
#[derive(Debug)]
|
||||
pub enum FrontendClient {
|
||||
Distributed {
|
||||
meta_client: Arc<MetaClient>,
|
||||
},
|
||||
Standalone {
|
||||
/// for the sake of simplicity still use grpc even in standalone mode
|
||||
/// notice the client here should all be lazy, so that can wait after frontend is booted then make conn
|
||||
/// TODO(discord9): not use grpc under standalone mode
|
||||
database_client: DatabaseWithPeer,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct DatabaseWithPeer {
|
||||
pub database: Database,
|
||||
pub peer: Peer,
|
||||
}
|
||||
|
||||
impl DatabaseWithPeer {
|
||||
fn new(database: Database, peer: Peer) -> Self {
|
||||
Self { database, peer }
|
||||
}
|
||||
}
|
||||
|
||||
impl FrontendClient {
|
||||
pub fn from_meta_client(meta_client: Arc<MetaClient>) -> Self {
|
||||
Self::Distributed { meta_client }
|
||||
}
|
||||
|
||||
pub fn from_static_grpc_addr(addr: String) -> Self {
|
||||
let peer = Peer {
|
||||
id: 0,
|
||||
addr: addr.clone(),
|
||||
};
|
||||
|
||||
let client = client_from_urls(vec![addr]);
|
||||
let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
|
||||
Self::Standalone {
|
||||
database_client: DatabaseWithPeer::new(database, peer),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl FrontendClient {
|
||||
async fn scan_for_frontend(&self) -> Result<Vec<(NodeInfoKey, NodeInfo)>, Error> {
|
||||
let Self::Distributed { meta_client, .. } = self else {
|
||||
return Ok(vec![]);
|
||||
};
|
||||
let cluster_client = meta_client
|
||||
.cluster_client()
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)?;
|
||||
|
||||
let prefix = NodeInfoKey::key_prefix_with_role(Role::Frontend);
|
||||
let req = RangeRequest::new().with_prefix(prefix);
|
||||
let resp = cluster_client
|
||||
.range(req)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)?;
|
||||
let mut res = Vec::with_capacity(resp.kvs.len());
|
||||
for kv in resp.kvs {
|
||||
let key = NodeInfoKey::try_from(kv.key)
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)?;
|
||||
|
||||
let val = NodeInfo::try_from(kv.value)
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)?;
|
||||
res.push((key, val));
|
||||
}
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
/// Get the database with max `last_activity_ts`
|
||||
async fn get_last_active_frontend(&self) -> Result<DatabaseWithPeer, Error> {
|
||||
if let Self::Standalone { database_client } = self {
|
||||
return Ok(database_client.clone());
|
||||
}
|
||||
|
||||
let frontends = self.scan_for_frontend().await?;
|
||||
let mut peer = None;
|
||||
|
||||
if let Some((_, val)) = frontends.iter().max_by_key(|(_, val)| val.last_activity_ts) {
|
||||
peer = Some(val.peer.clone());
|
||||
}
|
||||
|
||||
let Some(peer) = peer else {
|
||||
UnexpectedSnafu {
|
||||
reason: format!("No frontend available: {:?}", frontends),
|
||||
}
|
||||
.fail()?
|
||||
};
|
||||
let client = client_from_urls(vec![peer.addr.clone()]);
|
||||
let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
|
||||
Ok(DatabaseWithPeer::new(database, peer))
|
||||
}
|
||||
|
||||
/// Get a database client, and possibly update it before returning.
|
||||
pub async fn get_database_client(&self) -> Result<DatabaseWithPeer, Error> {
|
||||
match self {
|
||||
Self::Standalone { database_client } => Ok(database_client.clone()),
|
||||
Self::Distributed { meta_client: _ } => self.get_last_active_frontend().await,
|
||||
}
|
||||
}
|
||||
}
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user