Compare commits

...

18 Commits

Author SHA1 Message Date
discord9
13582c9efb bytes trace
Signed-off-by: discord9 <discord9@163.com>
2025-11-04 11:19:07 +08:00
liyang
5d0ef376de fix: initializer container does not work (#7152)
* fix: initializer does not work

Signed-off-by: liyang <daviderli614@gmail.com>

* use one version of the operator

Signed-off-by: liyang <daviderli614@gmail.com>

---------

Signed-off-by: liyang <daviderli614@gmail.com>
2025-10-29 18:11:55 +00:00
shuiyisong
11c0381fc1 chore: set default catalog using build env (#7156)
* chore: update reference to const

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: use option_env to set default catalog

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: use const_format

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: update reference in cli

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: introduce a build.rs to set default catalog

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: remove unused feature gate

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

---------

Signed-off-by: shuiyisong <xixing.sys@gmail.com>
2025-10-29 18:10:58 +00:00
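A quick sketch of the catalog-name mechanics this commit describes; the `option_env` step is reconstructed here as an assumption, while the final code in this compare uses a `build.rs` plus `env!` (shown further down):

```rust
// Compile-time catalog name with a fallback when the variable is not set at build
// time. The final implementation in this compare instead uses a build.rs that always
// emits DEFAULT_CATALOG_NAME via `cargo:rustc-env`, so a plain `env!` never fails.
use const_format::concatcp;

pub const DEFAULT_CATALOG_NAME: &str = match option_env!("DEFAULT_CATALOG_NAME") {
    Some(name) => name,
    None => "greptime",
};

// Derived constants stay consistent with whatever catalog name was baked in.
pub const DEFAULT_PRIVATE_SCHEMA_NAME: &str = concatcp!(DEFAULT_CATALOG_NAME, "_private");
```

Overriding the catalog then only requires setting `DEFAULT_CATALOG_NAME` in the build environment; the `build.rs` later in this compare forwards it via `cargo:rustc-env`.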
LFC
e8b7b0ad16 fix: memtable value push result was ignored (#7136)
* fix: memtable value push result was ignored

Signed-off-by: luofucong <luofc@foxmail.com>

* chore: apply suggestion

Co-authored-by: Yingwen <realevenyag@gmail.com>

---------

Signed-off-by: luofucong <luofc@foxmail.com>
Co-authored-by: dennis zhuang <killme2008@gmail.com>
Co-authored-by: Yingwen <realevenyag@gmail.com>
2025-10-29 13:44:36 +00:00
Weny Xu
6efffa427d fix: missing flamegraph feature in pprof dependency (#7158)
fix: fix pprof deps

Signed-off-by: WenyXu <wenymedia@gmail.com>
2025-10-29 11:41:21 +00:00
Ruihang Xia
6576e3555d fix: cache estimate methods (#7157)
* fix: cache estimate methods

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* revert page value change

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* Apply suggestion from @evenyag

Co-authored-by: Yingwen <realevenyag@gmail.com>

* update test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Yingwen <realevenyag@gmail.com>
2025-10-29 09:57:28 +00:00
Lei, HUANG
f0afd675e3 feat: objbench sub command for datanode (#7114)
* feat/objbench-subcmd:
 ### Add Object Storage Benchmark Tool and Update Dependencies

 - **`Cargo.lock` & `Cargo.toml`**: Added dependencies for `colored`, `parquet`, and `pprof` to support new features.
 - **`datanode.rs`**: Introduced `ObjbenchCommand` for benchmarking object storage, including command-line options for configuration and execution. Added `StorageConfig` and `StorageConfigWrapper` for storage engine configuration.
 - **`datanode.rs`**: Implemented a stub for `build_object_store` function to initialize object storage.

 These changes introduce a new subcommand for object storage benchmarking and update dependencies to support additional functionality.
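As a rough illustration of the kind of measurement such a benchmark performs (this is not the objbench code, which rewrites an existing SST through the mito2 access layer; it is a minimal sketch against opendal, which the repository's object-store crate wraps):

```rust
// Illustrative sketch only: time a write + read roundtrip against an in-memory
// opendal backend and report throughput. The real subcommand reads an SST file
// from the configured object store and rewrites it via the mito2 access layer.
use std::time::Instant;

use opendal::services::Memory;
use opendal::Operator;

#[tokio::main]
async fn main() -> opendal::Result<()> {
    let store = Operator::new(Memory::default())?.finish();
    let payload = vec![0u8; 8 * 1024 * 1024]; // 8 MiB of dummy data

    let start = Instant::now();
    store.write("bench/object.bin", payload).await?;
    let bytes_read = store.read("bench/object.bin").await?.len();
    let elapsed = start.elapsed();

    println!(
        "roundtrip of {bytes_read} bytes took {elapsed:?} ({:.1} MiB/s)",
        bytes_read as f64 / (1024.0 * 1024.0) / elapsed.as_secs_f64()
    );
    Ok(())
}
```

A real invocation instead points `--config` at a datanode TOML file and `--source` at an existing SST path, matching the `ObjbenchCommand` flags shown later in this compare.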

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* init

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* fix: code style and clippy

* feat/objbench-subcmd:
 Improve error handling in `objbench.rs`

 - Enhanced error handling in `parse_config` and `parse_file_dir_components` functions by replacing `unwrap` with `OptionExt` and `context` for better error messages.
 - Updated `build_access_layer_simple` and `build_cache_manager` functions to use `map_err` for more descriptive error handling.
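The pattern behind that change, as a self-contained sketch (the error type and message here are placeholders, not the CLI crate's actual `IllegalConfig` error):

```rust
// Before: `.unwrap()` on a missing engine section panicked with no context.
// After: the absence becomes a descriptive error via snafu's OptionExt::context.
use snafu::{OptionExt, Snafu};

#[derive(Debug, Snafu)]
#[snafu(display("Illegal config: {msg}"))]
pub struct IllegalConfigError {
    msg: String,
}

pub fn first_mito_engine<'a>(engines: &'a [String]) -> Result<&'a String, IllegalConfigError> {
    engines
        .iter()
        .find(|name| name.as_str() == "mito")
        .context(IllegalConfigSnafu {
            msg: "Engine config not found".to_string(),
        })
}
```

The analogous change for fallible calls is `map_err` into the same error type, so each failing step reports which operation went wrong.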

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* chore: rebase main

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

---------

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
2025-10-29 05:26:29 +00:00
discord9
37bc2e6b07 feat: gc worker heartbeat instruction (#7118)
again
false by default
test: config api
refactor: per code review
less info!
even less info!!
docs: gc regions instr
refactor: grp by region id
per code review
per review
error handling?
test: fix
todos
aft rebase fix
after refactor

Signed-off-by: discord9 <discord9@163.com>
2025-10-29 02:59:36 +00:00
Ning Sun
a9d1d33138 feat: update datafusion-pg-catalog for better dbeaver support (#7143)
* chore: update datafusion-pg-catalog to 0.12.1

* feat: import more udfs
2025-10-28 18:42:03 +00:00
discord9
22d9eb6930 feat: part sort provide dyn filter (#7140)
* feat: part sort provide dyn filter

Signed-off-by: discord9 <discord9@163.com>

* fix: reset_state reset dynamic filter

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
2025-10-28 02:44:29 +00:00
shuiyisong
da976e534d refactor: add test feature gate to numbers table (#7148)
* refactor: add test feature gate to numbers table

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: add debug_assertions

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* refactor: extract numbers table provider

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: address CR issues

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

---------

Signed-off-by: shuiyisong <xixing.sys@gmail.com>
2025-10-27 10:16:00 +00:00
discord9
f2bc92b9e6 refactor: use generic for heartbeat instruction handler (#7149)
* refactor: use generic

Signed-off-by: discord9 <discord9@163.com>

* w

Signed-off-by: discord9 <discord9@163.com>

* per review

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
2025-10-27 09:09:48 +00:00
Weny Xu
785f9d7fd7 fix: add delays in reconcile tests for async cache invalidation (#7147)
Signed-off-by: WenyXu <wenymedia@gmail.com>
2025-10-27 08:07:51 +00:00
shuiyisong
a20ac4f9e5 feat: prefix option for timestamp index and value column (#7125)
* refactor: use GREPTIME_TIMESTAMP const

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* feat: add config for default ts col name

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* refactor: replace GREPTIME_TIMESTAMP with function get

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: update config doc

* fix: test

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: remove opts on flownode and metasrv

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: add validation for ts column name

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: use get_or_init to avoid test error

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: fmt

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: update docs

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: using empty string to disable prefix

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: update comment

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: address CR issues

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

---------

Signed-off-by: shuiyisong <xixing.sys@gmail.com>
2025-10-27 08:00:03 +00:00
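A minimal sketch of the "set once, read everywhere" prefix handling described in the commit above; the names, validation, and fallback behavior are assumptions, not the actual common-query implementation:

```rust
// Process-wide default column prefix for auto-created time index and value columns.
use std::sync::OnceLock;

static DEFAULT_COLUMN_PREFIX: OnceLock<String> = OnceLock::new();

/// Record the configured prefix; an empty string disables prefixing (assumed semantics).
pub fn set_default_prefix(prefix: Option<&str>) -> Result<(), String> {
    let prefix = prefix.unwrap_or("greptime").to_string();
    if !prefix.is_empty() && !prefix.chars().all(|c| c.is_ascii_alphanumeric() || c == '_') {
        return Err(format!("invalid column prefix: {prefix}"));
    }
    // get_or_init (rather than a failing set) tolerates repeated initialization,
    // e.g. across tests, which is what "use get_or_init to avoid test error" refers to.
    let _ = DEFAULT_COLUMN_PREFIX.get_or_init(|| prefix);
    Ok(())
}

/// Name of the auto-created time index column, e.g. "greptime_timestamp".
pub fn default_timestamp_column() -> String {
    let prefix = DEFAULT_COLUMN_PREFIX.get_or_init(|| "greptime".to_string());
    if prefix.is_empty() {
        "timestamp".to_string()
    } else {
        format!("{prefix}_timestamp")
    }
}
```

The `NAME_PATTERN` regex added to common-base later in this compare appears to serve the real validation.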
zyy17
0a3961927d refactor!: add an opentelemetry_traces_operations table to aggregate (service_name, span_name, span_kind) to improve query performance (#7144)
refactor: add a `*_operations` table to aggregate `(service_name, span_name, span_kind)` to improve query performance

Signed-off-by: zyy17 <zyylsxm@gmail.com>
2025-10-27 03:36:22 +00:00
LFC
d7ed6a69ab feat: merge json datatype (#7142)
* feat: merge json datatype

Signed-off-by: luofucong <luofc@foxmail.com>

* resolve PR comments

Signed-off-by: luofucong <luofc@foxmail.com>

---------

Signed-off-by: luofucong <luofc@foxmail.com>
2025-10-27 03:30:52 +00:00
discord9
68247fc9b1 fix: count_state use stat to eval&predicate w/out region (#7116)
* fix: count_state use stat to eval

Signed-off-by: discord9 <discord9@163.com>

* cleanup

Signed-off-by: discord9 <discord9@163.com>

* fix: use predicate without region

Signed-off-by: discord9 <discord9@163.com>

* test: diverge standalone/dist impl

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
2025-10-27 02:14:45 +00:00
Lei, HUANG
e386a366d0 feat: add HTTP endpoint to control prof.gdump feature (#6999)
* feat/gdump:
 ### Add Support for Jemalloc Gdump Flag

 - **`jemalloc.rs`**: Introduced `PROF_GDUMP` constant and added functions `set_gdump_active` and `is_gdump_active` to manage the gdump flag.
 - **`error.rs`**: Added error handling for reading and updating the jemalloc gdump flag with `ReadGdump` and `UpdateGdump` errors.
 - **`lib.rs`**: Exposed `is_gdump_active` and `set_gdump_active` functions for non-Windows platforms.
 - **`http.rs`**: Added HTTP routes for checking and toggling the jemalloc gdump flag status.
 - **`mem_prof.rs`**: Implemented handlers `gdump_toggle_handler` and `gdump_status_handler` for managing gdump flag via HTTP requests.
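A sketch of what the jemalloc side of this could look like, assuming the standard `tikv-jemalloc-ctl` raw mallctl API (the repository's actual wrapper and error handling may differ):

```rust
// Toggle and read jemalloc's "prof.gdump" flag. When active, jemalloc writes a
// heap profile every time total virtual memory reaches a new maximum. Requires
// jemalloc profiling to be enabled at runtime (e.g. MALLOC_CONF=prof:true).
#[cfg(not(windows))]
mod gdump {
    use tikv_jemalloc_ctl::{raw, Error};

    // mallctl names must be NUL-terminated byte strings.
    const PROF_GDUMP: &[u8] = b"prof.gdump\0";

    pub fn set_gdump_active(active: bool) -> Result<bool, Error> {
        // `update` writes the new value and returns the previous one.
        unsafe { raw::update(PROF_GDUMP, active) }
    }

    pub fn is_gdump_active() -> Result<bool, Error> {
        unsafe { raw::read::<bool>(PROF_GDUMP) }
    }
}
```

The `/debug/prof/mem/gdump` routes listed in the docs diff further down would then simply call these two functions.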

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* Update docs/how-to/how-to-profile-memory.md

Co-authored-by: shuiyisong <113876041+shuiyisong@users.noreply.github.com>

* fix: typo in docs

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

---------

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
Co-authored-by: shuiyisong <113876041+shuiyisong@users.noreply.github.com>
2025-10-27 01:41:19 +00:00
142 changed files with 4227 additions and 827 deletions

View File

@@ -7,6 +7,8 @@ KUBERNETES_VERSION="${KUBERNETES_VERSION:-v1.32.0}"
ENABLE_STANDALONE_MODE="${ENABLE_STANDALONE_MODE:-true}"
DEFAULT_INSTALL_NAMESPACE=${DEFAULT_INSTALL_NAMESPACE:-default}
GREPTIMEDB_IMAGE_TAG=${GREPTIMEDB_IMAGE_TAG:-latest}
GREPTIMEDB_OPERATOR_IMAGE_TAG=${GREPTIMEDB_OPERATOR_IMAGE_TAG:-v0.5.1}
GREPTIMEDB_INITIALIZER_IMAGE_TAG="${GREPTIMEDB_OPERATOR_IMAGE_TAG}"
GREPTIME_CHART="https://greptimeteam.github.io/helm-charts/"
ETCD_CHART="oci://registry-1.docker.io/bitnamicharts/etcd"
ETCD_CHART_VERSION="${ETCD_CHART_VERSION:-12.0.8}"
@@ -58,7 +60,7 @@ function deploy_greptimedb_operator() {
# Use the latest chart and image.
helm upgrade --install greptimedb-operator greptime/greptimedb-operator \
--create-namespace \
--set image.tag=latest \
--set image.tag="$GREPTIMEDB_OPERATOR_IMAGE_TAG" \
-n "$DEFAULT_INSTALL_NAMESPACE"
# Wait for greptimedb-operator to be ready.
@@ -78,6 +80,7 @@ function deploy_greptimedb_cluster() {
helm upgrade --install "$cluster_name" greptime/greptimedb-cluster \
--create-namespace \
--set image.tag="$GREPTIMEDB_IMAGE_TAG" \
--set initializer.tag="$GREPTIMEDB_INITIALIZER_IMAGE_TAG" \
--set meta.backendStorage.etcd.endpoints="etcd.$install_namespace:2379" \
--set meta.backendStorage.etcd.storeKeyPrefix="$cluster_name" \
-n "$install_namespace"
@@ -115,6 +118,7 @@ function deploy_greptimedb_cluster_with_s3_storage() {
helm upgrade --install "$cluster_name" greptime/greptimedb-cluster -n "$install_namespace" \
--create-namespace \
--set image.tag="$GREPTIMEDB_IMAGE_TAG" \
--set initializer.tag="$GREPTIMEDB_INITIALIZER_IMAGE_TAG" \
--set meta.backendStorage.etcd.endpoints="etcd.$install_namespace:2379" \
--set meta.backendStorage.etcd.storeKeyPrefix="$cluster_name" \
--set objectStorage.s3.bucket="$AWS_CI_TEST_BUCKET" \

Cargo.lock generated
View File

@@ -1336,9 +1336,13 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
[[package]]
name = "bytes"
version = "1.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a"
source = "git+https://github.com/discord9/bytes?rev=1572ab22c3cbad0e9b6681d1f68eca4139322a2a#1572ab22c3cbad0e9b6681d1f68eca4139322a2a"
dependencies = [
"backtrace",
"crossbeam-channel",
"inferno 0.12.2",
"papaya",
"quanta",
"serde",
]
@@ -1896,6 +1900,7 @@ dependencies = [
"clap 4.5.40",
"cli",
"client",
"colored",
"common-base",
"common-catalog",
"common-config",
@@ -1917,6 +1922,7 @@ dependencies = [
"common-wal",
"datanode",
"datatypes",
"either",
"etcd-client",
"file-engine",
"flow",
@@ -1932,7 +1938,9 @@ dependencies = [
"moka",
"nu-ansi-term",
"object-store",
"parquet",
"plugins",
"pprof",
"prometheus",
"prost 0.13.5",
"query",
@@ -1975,6 +1983,16 @@ version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75"
[[package]]
name = "colored"
version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "117725a109d387c937a1533ce01b450cbde6b88abceea8473c4d7a85853cda3c"
dependencies = [
"lazy_static",
"windows-sys 0.59.0",
]
[[package]]
name = "comfy-table"
version = "7.1.2"
@@ -2004,9 +2022,11 @@ dependencies = [
"common-macro",
"common-test-util",
"futures",
"lazy_static",
"paste",
"pin-project",
"rand 0.9.1",
"regex",
"serde",
"snafu 0.8.6",
"tokio",
@@ -2017,6 +2037,9 @@ dependencies = [
[[package]]
name = "common-catalog"
version = "0.18.0"
dependencies = [
"const_format",
]
[[package]]
name = "common-config"
@@ -2454,6 +2477,7 @@ dependencies = [
"datafusion-expr",
"datatypes",
"futures-util",
"once_cell",
"serde",
"snafu 0.8.6",
"sqlparser",
@@ -3714,9 +3738,9 @@ dependencies = [
[[package]]
name = "datafusion-pg-catalog"
version = "0.11.0"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f258caedd1593e7dca3bf53912249de6685fa224bcce897ede1fbb7b040ac6f6"
checksum = "15824c98ff2009c23b0398d441499b147f7c5ac0e5ee993e7a473d79040e3626"
dependencies = [
"async-trait",
"datafusion",
@@ -6304,17 +6328,6 @@ dependencies = [
"derive_utils",
]
[[package]]
name = "io-uring"
version = "0.7.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d93587f37623a1a17d94ef2bc9ada592f5465fe7732084ab7beefabe5c77c0c4"
dependencies = [
"bitflags 2.9.1",
"cfg-if",
"libc",
]
[[package]]
name = "ipnet"
version = "2.11.0"
@@ -7579,6 +7592,7 @@ dependencies = [
"common-decimal",
"common-error",
"common-macro",
"common-query",
"common-recordbatch",
"common-telemetry",
"common-time",
@@ -8852,6 +8866,16 @@ dependencies = [
"unicode-width 0.1.14",
]
[[package]]
name = "papaya"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f92dd0b07c53a0a0c764db2ace8c541dc47320dad97c2200c2a637ab9dd2328f"
dependencies = [
"equivalent",
"seize",
]
[[package]]
name = "parking"
version = "2.2.1"
@@ -10085,6 +10109,21 @@ dependencies = [
"variadics",
]
[[package]]
name = "quanta"
version = "0.12.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f3ab5a9d756f0d97bdc89019bd2e4ea098cf9cde50ee7564dde6b81ccc8f06c7"
dependencies = [
"crossbeam-utils",
"libc",
"once_cell",
"raw-cpuid",
"wasi 0.11.1+wasi-snapshot-preview1",
"web-sys",
"winapi",
]
[[package]]
name = "query"
version = "0.18.0"
@@ -10384,6 +10423,15 @@ dependencies = [
"thiserror 1.0.69",
]
[[package]]
name = "raw-cpuid"
version = "11.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "498cd0dc59d73224351ee52a95fee0f1a617a2eae0e7d9d720cc622c73a54186"
dependencies = [
"bitflags 2.9.1",
]
[[package]]
name = "rawpointer"
version = "0.2.1"
@@ -11324,6 +11372,16 @@ dependencies = [
"libc",
]
[[package]]
name = "seize"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b55fb86dfd3a2f5f76ea78310a88f96c4ea21a3031f8d212443d56123fd0521"
dependencies = [
"libc",
"windows-sys 0.52.0",
]
[[package]]
name = "semver"
version = "1.0.26"
@@ -13252,23 +13310,20 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]]
name = "tokio"
version = "1.47.1"
version = "1.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "89e49afdadebb872d3145a5638b59eb0691ea23e46ca484037cfab3b76b95038"
checksum = "ff360e02eab121e0bc37a2d3b4d4dc622e6eda3a8e5253d5435ecf5bd4c68408"
dependencies = [
"backtrace",
"bytes",
"io-uring",
"libc",
"mio",
"parking_lot 0.12.4",
"pin-project-lite",
"signal-hook-registry",
"slab",
"socket2 0.6.0",
"tokio-macros",
"tracing",
"windows-sys 0.59.0",
"windows-sys 0.61.2",
]
[[package]]
@@ -13283,9 +13338,9 @@ dependencies = [
[[package]]
name = "tokio-macros"
version = "2.5.0"
version = "2.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e06d43f1345a3bcd39f6a56dbb7dcab2ba47e68e8ac134855e7e2bdbaf8cab8"
checksum = "af407857209536a95c8e56f8231ef2c2e2aff839b22e07a1ffcbc617e9db9fa5"
dependencies = [
"proc-macro2",
"quote",
@@ -14707,6 +14762,15 @@ dependencies = [
"windows-targets 0.52.6",
]
[[package]]
name = "windows-sys"
version = "0.61.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ae137229bcbd6cdf0f7b80a31df61766145077ddf49416a728b02cb3921ff3fc"
dependencies = [
"windows-link 0.2.1",
]
[[package]]
name = "windows-targets"
version = "0.48.5"

View File

@@ -121,6 +121,7 @@ chrono = { version = "0.4", features = ["serde"] }
chrono-tz = "0.10.1"
clap = { version = "4.4", features = ["derive"] }
config = "0.13.0"
const_format = "0.2"
crossbeam-utils = "0.8"
dashmap = "6.1"
datafusion = "50"
@@ -130,7 +131,7 @@ datafusion-functions = "50"
datafusion-functions-aggregate-common = "50"
datafusion-optimizer = "50"
datafusion-orc = "0.5"
datafusion-pg-catalog = "0.11"
datafusion-pg-catalog = "0.12.1"
datafusion-physical-expr = "50"
datafusion-physical-plan = "50"
datafusion-sql = "50"
@@ -332,6 +333,7 @@ datafusion-datasource = { git = "https://github.com/GreptimeTeam/datafusion.git"
datafusion-sql = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-substrait = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "4b519a5caa95472cc3988f5556813a583dd35af1" } # branch = "v0.58.x"
bytes = { git = "https://github.com/discord9/bytes", rev = "1572ab22c3cbad0e9b6681d1f68eca4139322a2a" }
[profile.release]
debug = 1

View File

@@ -13,6 +13,7 @@
| Key | Type | Default | Descriptions |
| --- | -----| ------- | ----------- |
| `default_timezone` | String | Unset | The default timezone of the server. |
| `default_column_prefix` | String | Unset | The default column prefix for auto-created time index and value columns. |
| `init_regions_in_background` | Bool | `false` | Initialize all regions in the background during the startup.<br/>By default, it provides services after all regions have been initialized. |
| `init_regions_parallelism` | Integer | `16` | Parallelism of initializing regions. |
| `max_concurrent_queries` | Integer | `0` | The maximum number of concurrent queries allowed to be executed. Zero means unlimited. |
@@ -226,6 +227,7 @@
| Key | Type | Default | Descriptions |
| --- | -----| ------- | ----------- |
| `default_timezone` | String | Unset | The default timezone of the server. |
| `default_column_prefix` | String | Unset | The default column prefix for auto-created time index and value columns. |
| `max_in_flight_write_bytes` | String | Unset | The maximum in-flight write bytes. |
| `runtime` | -- | -- | The runtime options. |
| `runtime.global_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. |
@@ -440,6 +442,7 @@
| Key | Type | Default | Descriptions |
| --- | -----| ------- | ----------- |
| `node_id` | Integer | Unset | The datanode identifier; it should be unique in the cluster. |
| `default_column_prefix` | String | Unset | The default column prefix for auto-created time index and value columns. |
| `require_lease_before_startup` | Bool | `false` | Start services after regions have obtained leases.<br/>It will block the datanode start if it can't receive leases in the heartbeat from metasrv. |
| `init_regions_in_background` | Bool | `false` | Initialize all regions in the background during the startup.<br/>By default, it provides services after all regions have been initialized. |
| `init_regions_parallelism` | Integer | `16` | Parallelism of initializing regions. |

View File

@@ -2,6 +2,10 @@
## @toml2docs:none-default
node_id = 42
## The default column prefix for auto-created time index and value columns.
## @toml2docs:none-default
default_column_prefix = "greptime"
## Start services after regions have obtained leases.
## It will block the datanode start if it can't receive leases in the heartbeat from metasrv.
require_lease_before_startup = false

View File

@@ -2,6 +2,10 @@
## @toml2docs:none-default
default_timezone = "UTC"
## The default column prefix for auto-created time index and value columns.
## @toml2docs:none-default
default_column_prefix = "greptime"
## The maximum in-flight write bytes.
## @toml2docs:none-default
#+ max_in_flight_write_bytes = "500MB"

View File

@@ -2,6 +2,10 @@
## @toml2docs:none-default
default_timezone = "UTC"
## The default column prefix for auto-created time index and value columns.
## @toml2docs:none-default
default_column_prefix = "greptime"
## Initialize all regions in the background during the startup.
## By default, it provides services after all regions have been initialized.
init_regions_in_background = false

View File

@@ -71,6 +71,15 @@ curl -X POST localhost:4000/debug/prof/mem/activate
# Deactivate heap profiling
curl -X POST localhost:4000/debug/prof/mem/deactivate
# Activate the gdump feature, which dumps memory profiling data every time virtual memory usage exceeds the previous maximum value.
curl -X POST localhost:4000/debug/prof/mem/gdump -d 'activate=true'
# Deactivate gdump.
curl -X POST localhost:4000/debug/prof/mem/gdump -d 'activate=false'
# Retrieve current gdump status.
curl -X GET localhost:4000/debug/prof/mem/gdump
```
### Dump memory profiling data
@@ -83,6 +92,9 @@ curl -X POST localhost:4000/debug/prof/mem > greptime.hprof
curl -X POST "localhost:4000/debug/prof/mem?output=flamegraph" > greptime.svg
# or output pprof format
curl -X POST "localhost:4000/debug/prof/mem?output=proto" > greptime.pprof
curl -X POST "localhost:4000/debug/prof/bytes" > greptime.svg
```
You can periodically dump profiling data and compare them to find the delta memory usage.

View File

@@ -29,6 +29,7 @@ use crate::information_schema::{InformationExtensionRef, InformationSchemaProvid
use crate::kvbackend::KvBackendCatalogManager;
use crate::kvbackend::manager::{CATALOG_CACHE_MAX_CAPACITY, SystemCatalog};
use crate::process_manager::ProcessManagerRef;
use crate::system_schema::numbers_table_provider::NumbersTableProvider;
use crate::system_schema::pg_catalog::PGCatalogProvider;
pub struct KvBackendCatalogManagerBuilder {
@@ -119,6 +120,7 @@ impl KvBackendCatalogManagerBuilder {
DEFAULT_CATALOG_NAME.to_string(),
me.clone(),
)),
numbers_table_provider: NumbersTableProvider,
backend,
process_manager,
#[cfg(feature = "enterprise")]

View File

@@ -18,8 +18,7 @@ use std::sync::{Arc, Weak};
use async_stream::try_stream;
use common_catalog::consts::{
DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, NUMBERS_TABLE_ID,
PG_CATALOG_NAME,
DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, PG_CATALOG_NAME,
};
use common_error::ext::BoxedError;
use common_meta::cache::{
@@ -45,7 +44,6 @@ use table::TableRef;
use table::dist_table::DistTable;
use table::metadata::{TableId, TableInfoRef};
use table::table::PartitionRules;
use table::table::numbers::{NUMBERS_TABLE_NAME, NumbersTable};
use table::table_name::TableName;
use tokio::sync::Semaphore;
use tokio_stream::wrappers::ReceiverStream;
@@ -61,6 +59,7 @@ use crate::information_schema::{InformationExtensionRef, InformationSchemaProvid
use crate::kvbackend::TableCacheRef;
use crate::process_manager::ProcessManagerRef;
use crate::system_schema::SystemSchemaProvider;
use crate::system_schema::numbers_table_provider::NumbersTableProvider;
use crate::system_schema::pg_catalog::PGCatalogProvider;
/// Access all existing catalog, schema and tables.
@@ -555,6 +554,7 @@ pub(super) struct SystemCatalog {
// system_schema_provider for default catalog
pub(super) information_schema_provider: Arc<InformationSchemaProvider>,
pub(super) pg_catalog_provider: Arc<PGCatalogProvider>,
pub(super) numbers_table_provider: NumbersTableProvider,
pub(super) backend: KvBackendRef,
pub(super) process_manager: Option<ProcessManagerRef>,
#[cfg(feature = "enterprise")]
@@ -584,9 +584,7 @@ impl SystemCatalog {
PG_CATALOG_NAME if channel == Channel::Postgres => {
self.pg_catalog_provider.table_names()
}
DEFAULT_SCHEMA_NAME => {
vec![NUMBERS_TABLE_NAME.to_string()]
}
DEFAULT_SCHEMA_NAME => self.numbers_table_provider.table_names(),
_ => vec![],
}
}
@@ -604,7 +602,7 @@ impl SystemCatalog {
if schema == INFORMATION_SCHEMA_NAME {
self.information_schema_provider.table(table).is_some()
} else if schema == DEFAULT_SCHEMA_NAME {
table == NUMBERS_TABLE_NAME
self.numbers_table_provider.table_exists(table)
} else if schema == PG_CATALOG_NAME && channel == Channel::Postgres {
self.pg_catalog_provider.table(table).is_some()
} else {
@@ -649,8 +647,8 @@ impl SystemCatalog {
});
pg_catalog_provider.table(table_name)
}
} else if schema == DEFAULT_SCHEMA_NAME && table_name == NUMBERS_TABLE_NAME {
Some(NumbersTable::table(NUMBERS_TABLE_ID))
} else if schema == DEFAULT_SCHEMA_NAME {
self.numbers_table_provider.table(table_name)
} else {
None
}

View File

@@ -14,6 +14,7 @@
pub mod information_schema;
mod memory_table;
pub mod numbers_table_provider;
pub mod pg_catalog;
pub mod predicate;
mod utils;

View File

@@ -0,0 +1,59 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#[cfg(any(test, feature = "testing", debug_assertions))]
use common_catalog::consts::NUMBERS_TABLE_ID;
use table::TableRef;
#[cfg(any(test, feature = "testing", debug_assertions))]
use table::table::numbers::NUMBERS_TABLE_NAME;
#[cfg(any(test, feature = "testing", debug_assertions))]
use table::table::numbers::NumbersTable;
// NumbersTableProvider is a dedicated provider for feature-gating the numbers table.
#[derive(Clone)]
pub struct NumbersTableProvider;
#[cfg(any(test, feature = "testing", debug_assertions))]
impl NumbersTableProvider {
pub(crate) fn table_exists(&self, name: &str) -> bool {
name == NUMBERS_TABLE_NAME
}
pub(crate) fn table_names(&self) -> Vec<String> {
vec![NUMBERS_TABLE_NAME.to_string()]
}
pub(crate) fn table(&self, name: &str) -> Option<TableRef> {
if name == NUMBERS_TABLE_NAME {
Some(NumbersTable::table(NUMBERS_TABLE_ID))
} else {
None
}
}
}
#[cfg(not(any(test, feature = "testing", debug_assertions)))]
impl NumbersTableProvider {
pub(crate) fn table_exists(&self, _name: &str) -> bool {
false
}
pub(crate) fn table_names(&self) -> Vec<String> {
vec![]
}
pub(crate) fn table(&self, _name: &str) -> Option<TableRef> {
None
}
}

View File

@@ -16,6 +16,7 @@ mod export;
mod import;
use clap::Subcommand;
use client::DEFAULT_CATALOG_NAME;
use common_error::ext::BoxedError;
use crate::Tool;
@@ -37,3 +38,7 @@ impl DataCommand {
}
}
}
pub(crate) fn default_database() -> String {
format!("{DEFAULT_CATALOG_NAME}-*")
}

View File

@@ -30,6 +30,7 @@ use snafu::{OptionExt, ResultExt};
use tokio::sync::Semaphore;
use tokio::time::Instant;
use crate::data::default_database;
use crate::database::{DatabaseClient, parse_proxy_opts};
use crate::error::{
EmptyResultSnafu, Error, OpenDalSnafu, OutputDirNotSetSnafu, Result, S3ConfigNotSetSnafu,
@@ -63,7 +64,7 @@ pub struct ExportCommand {
output_dir: Option<String>,
/// The name of the catalog to export.
#[clap(long, default_value = "greptime-*")]
#[clap(long, default_value_t = default_database())]
database: String,
/// Parallelism of the export.

View File

@@ -25,6 +25,7 @@ use snafu::{OptionExt, ResultExt};
use tokio::sync::Semaphore;
use tokio::time::Instant;
use crate::data::default_database;
use crate::database::{DatabaseClient, parse_proxy_opts};
use crate::error::{Error, FileIoSnafu, Result, SchemaNotFoundSnafu};
use crate::{Tool, database};
@@ -52,7 +53,7 @@ pub struct ImportCommand {
input_dir: String,
/// The name of the catalog to import.
#[clap(long, default_value = "greptime-*")]
#[clap(long, default_value_t = default_database())]
database: String,
/// Parallelism of the import.

View File

@@ -29,9 +29,11 @@ base64.workspace = true
cache.workspace = true
catalog.workspace = true
chrono.workspace = true
either = "1.15"
clap.workspace = true
cli.workspace = true
client.workspace = true
colored = "2.1.0"
common-base.workspace = true
common-catalog.workspace = true
common-config.workspace = true
@@ -63,9 +65,11 @@ lazy_static.workspace = true
meta-client.workspace = true
meta-srv.workspace = true
metric-engine.workspace = true
mito2.workspace = true
moka.workspace = true
nu-ansi-term = "0.46"
object-store.workspace = true
parquet = { workspace = true, features = ["object_store"] }
plugins.workspace = true
prometheus.workspace = true
prost.workspace = true
@@ -88,6 +92,11 @@ toml.workspace = true
tonic.workspace = true
tracing-appender.workspace = true
[target.'cfg(unix)'.dependencies]
pprof = { version = "0.14", features = [
"flamegraph",
] }
[target.'cfg(not(windows))'.dependencies]
tikv-jemallocator = "0.6"

View File

@@ -103,12 +103,15 @@ async fn main_body() -> Result<()> {
async fn start(cli: Command) -> Result<()> {
match cli.subcmd {
SubCommand::Datanode(cmd) => {
let opts = cmd.load_options(&cli.global_options)?;
let plugins = Plugins::new();
let builder = InstanceBuilder::try_new_with_init(opts, plugins).await?;
cmd.build_with(builder).await?.run().await
}
SubCommand::Datanode(cmd) => match cmd.subcmd {
datanode::SubCommand::Start(ref start) => {
let opts = start.load_options(&cli.global_options)?;
let plugins = Plugins::new();
let builder = InstanceBuilder::try_new_with_init(opts, plugins).await?;
cmd.build_with(builder).await?.run().await
}
datanode::SubCommand::Objbench(ref bench) => bench.run().await,
},
SubCommand::Flownode(cmd) => {
cmd.build(cmd.load_options(&cli.global_options)?)
.await?

View File

@@ -13,6 +13,8 @@
// limitations under the License.
pub mod builder;
#[allow(clippy::print_stdout)]
mod objbench;
use std::path::Path;
use std::time::Duration;
@@ -23,13 +25,16 @@ use common_config::Configurable;
use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions};
use common_telemetry::{info, warn};
use common_wal::config::DatanodeWalConfig;
use datanode::config::RegionEngineConfig;
use datanode::datanode::Datanode;
use meta_client::MetaClientOptions;
use serde::{Deserialize, Serialize};
use snafu::{ResultExt, ensure};
use tracing_appender::non_blocking::WorkerGuard;
use crate::App;
use crate::datanode::builder::InstanceBuilder;
use crate::datanode::objbench::ObjbenchCommand;
use crate::error::{
LoadLayeredConfigSnafu, MissingConfigSnafu, Result, ShutdownDatanodeSnafu, StartDatanodeSnafu,
};
@@ -89,7 +94,7 @@ impl App for Instance {
#[derive(Parser)]
pub struct Command {
#[clap(subcommand)]
subcmd: SubCommand,
pub subcmd: SubCommand,
}
impl Command {
@@ -100,13 +105,26 @@ impl Command {
pub fn load_options(&self, global_options: &GlobalOptions) -> Result<DatanodeOptions> {
match &self.subcmd {
SubCommand::Start(cmd) => cmd.load_options(global_options),
SubCommand::Objbench(_) => {
// For objbench command, we don't need to load DatanodeOptions
// It's a standalone utility command
let mut opts = datanode::config::DatanodeOptions::default();
opts.sanitize();
Ok(DatanodeOptions {
runtime: Default::default(),
plugins: Default::default(),
component: opts,
})
}
}
}
}
#[derive(Parser)]
enum SubCommand {
pub enum SubCommand {
Start(StartCommand),
/// Object storage benchmark tool
Objbench(ObjbenchCommand),
}
impl SubCommand {
@@ -116,12 +134,33 @@ impl SubCommand {
info!("Building datanode with {:#?}", cmd);
builder.build().await
}
SubCommand::Objbench(cmd) => {
cmd.run().await?;
std::process::exit(0);
}
}
}
}
/// Storage engine config
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
#[serde(default)]
pub struct StorageConfig {
/// The working directory of database
pub data_home: String,
#[serde(flatten)]
pub store: object_store::config::ObjectStoreConfig,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
#[serde(default)]
struct StorageConfigWrapper {
storage: StorageConfig,
region_engine: Vec<RegionEngineConfig>,
}
#[derive(Debug, Parser, Default)]
struct StartCommand {
pub struct StartCommand {
#[clap(long)]
node_id: Option<u64>,
/// The address to bind the gRPC server.
@@ -149,7 +188,7 @@ struct StartCommand {
}
impl StartCommand {
fn load_options(&self, global_options: &GlobalOptions) -> Result<DatanodeOptions> {
pub fn load_options(&self, global_options: &GlobalOptions) -> Result<DatanodeOptions> {
let mut opts = DatanodeOptions::load_layered_options(
self.config_file.as_deref(),
self.env_prefix.as_ref(),

View File

@@ -0,0 +1,676 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Instant;
use clap::Parser;
use colored::Colorize;
use datanode::config::RegionEngineConfig;
use datanode::store;
use either::Either;
use mito2::access_layer::{
AccessLayer, AccessLayerRef, Metrics, OperationType, SstWriteRequest, WriteType,
};
use mito2::cache::{CacheManager, CacheManagerRef};
use mito2::config::{FulltextIndexConfig, MitoConfig, Mode};
use mito2::read::Source;
use mito2::sst::file::{FileHandle, FileMeta};
use mito2::sst::file_purger::{FilePurger, FilePurgerRef};
use mito2::sst::index::intermediate::IntermediateManager;
use mito2::sst::index::puffin_manager::PuffinManagerFactory;
use mito2::sst::parquet::reader::ParquetReaderBuilder;
use mito2::sst::parquet::{PARQUET_METADATA_KEY, WriteOptions};
use mito2::worker::write_cache_from_config;
use object_store::ObjectStore;
use regex::Regex;
use snafu::OptionExt;
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::path_utils::region_name;
use store_api::region_request::PathType;
use store_api::storage::FileId;
use crate::datanode::{StorageConfig, StorageConfigWrapper};
use crate::error;
/// Object storage benchmark command
#[derive(Debug, Parser)]
pub struct ObjbenchCommand {
/// Path to the object-store config file (TOML). Must deserialize into object_store::config::ObjectStoreConfig.
#[clap(long, value_name = "FILE")]
pub config: PathBuf,
/// Source SST file path in object-store (e.g. "region_dir/<uuid>.parquet").
#[clap(long, value_name = "PATH")]
pub source: String,
/// Verbose output
#[clap(short, long, default_value_t = false)]
pub verbose: bool,
/// Output file path for pprof flamegraph (enables profiling)
#[clap(long, value_name = "FILE")]
pub pprof_file: Option<PathBuf>,
}
fn parse_config(config_path: &PathBuf) -> error::Result<(StorageConfig, MitoConfig)> {
let cfg_str = std::fs::read_to_string(config_path).map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("failed to read config {}: {e}", config_path.display()),
}
.build()
})?;
let store_cfg: StorageConfigWrapper = toml::from_str(&cfg_str).map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("failed to parse config {}: {e}", config_path.display()),
}
.build()
})?;
let storage_config = store_cfg.storage;
let mito_engine_config = store_cfg
.region_engine
.into_iter()
.filter_map(|c| {
if let RegionEngineConfig::Mito(mito) = c {
Some(mito)
} else {
None
}
})
.next()
.with_context(|| error::IllegalConfigSnafu {
msg: format!("Engine config not found in {:?}", config_path),
})?;
Ok((storage_config, mito_engine_config))
}
impl ObjbenchCommand {
pub async fn run(&self) -> error::Result<()> {
if self.verbose {
common_telemetry::init_default_ut_logging();
}
println!("{}", "Starting objbench with config:".cyan().bold());
// Build object store from config
let (store_cfg, mut mito_engine_config) = parse_config(&self.config)?;
let object_store = build_object_store(&store_cfg).await?;
println!("{} Object store initialized", "".green());
// Prepare source identifiers
let components = parse_file_dir_components(&self.source)?;
println!(
"{} Source path parsed: {}, components: {:?}",
"".green(),
self.source,
components
);
// Load parquet metadata to extract RegionMetadata and file stats
println!("{}", "Loading parquet metadata...".yellow());
let file_size = object_store
.stat(&self.source)
.await
.map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("stat failed: {e}"),
}
.build()
})?
.content_length();
let parquet_meta = load_parquet_metadata(object_store.clone(), &self.source, file_size)
.await
.map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("read parquet metadata failed: {e}"),
}
.build()
})?;
let region_meta = extract_region_metadata(&self.source, &parquet_meta)?;
let num_rows = parquet_meta.file_metadata().num_rows() as u64;
let num_row_groups = parquet_meta.num_row_groups() as u64;
println!(
"{} Metadata loaded - rows: {}, size: {} bytes",
"".green(),
num_rows,
file_size
);
// Build a FileHandle for the source file
let file_meta = FileMeta {
region_id: region_meta.region_id,
file_id: components.file_id,
time_range: Default::default(),
level: 0,
file_size,
available_indexes: Default::default(),
index_file_size: 0,
num_rows,
num_row_groups,
sequence: None,
partition_expr: None,
num_series: 0,
};
let src_handle = FileHandle::new(file_meta, new_noop_file_purger());
// Build the reader for a single file via ParquetReaderBuilder
let table_dir = components.table_dir();
let (src_access_layer, cache_manager) = build_access_layer_simple(
&components,
object_store.clone(),
&mut mito_engine_config,
&store_cfg.data_home,
)
.await?;
let reader_build_start = Instant::now();
let reader = ParquetReaderBuilder::new(
table_dir,
components.path_type,
src_handle.clone(),
object_store.clone(),
)
.expected_metadata(Some(region_meta.clone()))
.build()
.await
.map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("build reader failed: {e:?}"),
}
.build()
})?;
let reader_build_elapsed = reader_build_start.elapsed();
let total_rows = reader.parquet_metadata().file_metadata().num_rows();
println!("{} Reader built in {:?}", "".green(), reader_build_elapsed);
// Build write request
let fulltext_index_config = FulltextIndexConfig {
create_on_compaction: Mode::Disable,
..Default::default()
};
let write_req = SstWriteRequest {
op_type: OperationType::Flush,
metadata: region_meta,
source: Either::Left(Source::Reader(Box::new(reader))),
cache_manager,
storage: None,
max_sequence: None,
index_options: Default::default(),
index_config: mito_engine_config.index.clone(),
inverted_index_config: MitoConfig::default().inverted_index,
fulltext_index_config,
bloom_filter_index_config: MitoConfig::default().bloom_filter_index,
};
// Write SST
println!("{}", "Writing SST...".yellow());
// Start profiling if pprof_file is specified
#[cfg(unix)]
let profiler_guard = if self.pprof_file.is_some() {
println!("{} Starting profiling...", "".yellow());
Some(
pprof::ProfilerGuardBuilder::default()
.frequency(99)
.blocklist(&["libc", "libgcc", "pthread", "vdso"])
.build()
.map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("Failed to start profiler: {e}"),
}
.build()
})?,
)
} else {
None
};
#[cfg(not(unix))]
if self.pprof_file.is_some() {
eprintln!(
"{}: Profiling is not supported on this platform",
"Warning".yellow()
);
}
let write_start = Instant::now();
let mut metrics = Metrics::new(WriteType::Flush);
let infos = src_access_layer
.write_sst(write_req, &WriteOptions::default(), &mut metrics)
.await
.map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("write_sst failed: {e:?}"),
}
.build()
})?;
let write_elapsed = write_start.elapsed();
// Stop profiling and generate flamegraph if enabled
#[cfg(unix)]
if let (Some(guard), Some(pprof_file)) = (profiler_guard, &self.pprof_file) {
println!("{} Generating flamegraph...", "🔥".yellow());
match guard.report().build() {
Ok(report) => {
let mut flamegraph_data = Vec::new();
if let Err(e) = report.flamegraph(&mut flamegraph_data) {
println!("{}: Failed to generate flamegraph: {}", "Error".red(), e);
} else if let Err(e) = std::fs::write(pprof_file, flamegraph_data) {
println!(
"{}: Failed to write flamegraph to {}: {}",
"Error".red(),
pprof_file.display(),
e
);
} else {
println!(
"{} Flamegraph saved to {}",
"".green(),
pprof_file.display().to_string().cyan()
);
}
}
Err(e) => {
println!("{}: Failed to generate pprof report: {}", "Error".red(), e);
}
}
}
assert_eq!(infos.len(), 1);
let dst_file_id = infos[0].file_id;
let dst_file_path = format!("{}/{}.parquet", components.region_dir(), dst_file_id);
let mut dst_index_path = None;
if infos[0].index_metadata.file_size > 0 {
dst_index_path = Some(format!(
"{}/index/{}.puffin",
components.region_dir(),
dst_file_id
));
}
// Report results with ANSI colors
println!("\n{} {}", "Write complete!".green().bold(), "".green());
println!(" {}: {}", "Destination file".bold(), dst_file_path.cyan());
println!(" {}: {}", "Rows".bold(), total_rows.to_string().cyan());
println!(
" {}: {}",
"File size".bold(),
format!("{} bytes", file_size).cyan()
);
println!(
" {}: {:?}",
"Reader build time".bold(),
reader_build_elapsed
);
println!(" {}: {:?}", "Total time".bold(), write_elapsed);
// Print metrics in a formatted way
println!(" {}: {:?}", "Metrics".bold(), metrics,);
// Print infos
println!(" {}: {:?}", "Index".bold(), infos[0].index_metadata);
// Cleanup
println!("\n{}", "Cleaning up...".yellow());
object_store.delete(&dst_file_path).await.map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("Failed to delete dest file {}: {}", dst_file_path, e),
}
.build()
})?;
println!("{} Temporary file {} deleted", "".green(), dst_file_path);
if let Some(index_path) = dst_index_path {
object_store.delete(&index_path).await.map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("Failed to delete dest index file {}: {}", index_path, e),
}
.build()
})?;
println!(
"{} Temporary index file {} deleted",
"".green(),
index_path
);
}
println!("\n{}", "Benchmark completed successfully!".green().bold());
Ok(())
}
}
#[derive(Debug)]
struct FileDirComponents {
catalog: String,
schema: String,
table_id: u32,
region_sequence: u32,
path_type: PathType,
file_id: FileId,
}
impl FileDirComponents {
fn table_dir(&self) -> String {
format!("data/{}/{}/{}", self.catalog, self.schema, self.table_id)
}
fn region_dir(&self) -> String {
let region_name = region_name(self.table_id, self.region_sequence);
match self.path_type {
PathType::Bare => {
format!(
"data/{}/{}/{}/{}",
self.catalog, self.schema, self.table_id, region_name
)
}
PathType::Data => {
format!(
"data/{}/{}/{}/{}/data",
self.catalog, self.schema, self.table_id, region_name
)
}
PathType::Metadata => {
format!(
"data/{}/{}/{}/{}/metadata",
self.catalog, self.schema, self.table_id, region_name
)
}
}
}
}
fn parse_file_dir_components(path: &str) -> error::Result<FileDirComponents> {
// Define the regex pattern to match all three path styles
let pattern =
r"^data/([^/]+)/([^/]+)/([^/]+)/([^/]+)_([^/]+)(?:/data|/metadata)?/(.+).parquet$";
// Compile the regex
let re = Regex::new(pattern).expect("Invalid regex pattern");
// Determine the path type
let path_type = if path.contains("/data/") {
PathType::Data
} else if path.contains("/metadata/") {
PathType::Metadata
} else {
PathType::Bare
};
// Try to match the path
let components = (|| {
let captures = re.captures(path)?;
if captures.len() != 7 {
return None;
}
let mut components = FileDirComponents {
catalog: "".to_string(),
schema: "".to_string(),
table_id: 0,
region_sequence: 0,
path_type,
file_id: FileId::default(),
};
// Extract the components
components.catalog = captures.get(1)?.as_str().to_string();
components.schema = captures.get(2)?.as_str().to_string();
components.table_id = captures[3].parse().ok()?;
components.region_sequence = captures[5].parse().ok()?;
let file_id_str = &captures[6];
components.file_id = FileId::parse_str(file_id_str).ok()?;
Some(components)
})();
components.context(error::IllegalConfigSnafu {
msg: format!("Expect valid source file path, got: {}", path),
})
}
fn extract_region_metadata(
file_path: &str,
meta: &parquet::file::metadata::ParquetMetaData,
) -> error::Result<RegionMetadataRef> {
use parquet::format::KeyValue;
let kvs: Option<&Vec<KeyValue>> = meta.file_metadata().key_value_metadata();
let Some(kvs) = kvs else {
return Err(error::IllegalConfigSnafu {
msg: format!("{file_path}: missing parquet key_value metadata"),
}
.build());
};
let json = kvs
.iter()
.find(|kv| kv.key == PARQUET_METADATA_KEY)
.and_then(|kv| kv.value.as_ref())
.ok_or_else(|| {
error::IllegalConfigSnafu {
msg: format!("{file_path}: key {PARQUET_METADATA_KEY} not found or empty"),
}
.build()
})?;
let region: RegionMetadata = RegionMetadata::from_json(json).map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("invalid region metadata json: {e}"),
}
.build()
})?;
Ok(Arc::new(region))
}
async fn build_object_store(sc: &StorageConfig) -> error::Result<ObjectStore> {
store::new_object_store(sc.store.clone(), &sc.data_home)
.await
.map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("Failed to build object store: {e:?}"),
}
.build()
})
}
async fn build_access_layer_simple(
components: &FileDirComponents,
object_store: ObjectStore,
config: &mut MitoConfig,
data_home: &str,
) -> error::Result<(AccessLayerRef, CacheManagerRef)> {
let _ = config.index.sanitize(data_home, &config.inverted_index);
let puffin_manager = PuffinManagerFactory::new(
&config.index.aux_path,
config.index.staging_size.as_bytes(),
Some(config.index.write_buffer_size.as_bytes() as _),
config.index.staging_ttl,
)
.await
.map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("Failed to build access layer: {e:?}"),
}
.build()
})?;
let intermediate_manager = IntermediateManager::init_fs(&config.index.aux_path)
.await
.map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("Failed to build IntermediateManager: {e:?}"),
}
.build()
})?
.with_buffer_size(Some(config.index.write_buffer_size.as_bytes() as _));
let cache_manager =
build_cache_manager(config, puffin_manager.clone(), intermediate_manager.clone()).await?;
let layer = AccessLayer::new(
components.table_dir(),
components.path_type,
object_store,
puffin_manager,
intermediate_manager,
);
Ok((Arc::new(layer), cache_manager))
}
async fn build_cache_manager(
config: &MitoConfig,
puffin_manager: PuffinManagerFactory,
intermediate_manager: IntermediateManager,
) -> error::Result<CacheManagerRef> {
let write_cache = write_cache_from_config(config, puffin_manager, intermediate_manager)
.await
.map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("Failed to build write cache: {e:?}"),
}
.build()
})?;
let cache_manager = Arc::new(
CacheManager::builder()
.sst_meta_cache_size(config.sst_meta_cache_size.as_bytes())
.vector_cache_size(config.vector_cache_size.as_bytes())
.page_cache_size(config.page_cache_size.as_bytes())
.selector_result_cache_size(config.selector_result_cache_size.as_bytes())
.index_metadata_size(config.index.metadata_cache_size.as_bytes())
.index_content_size(config.index.content_cache_size.as_bytes())
.index_content_page_size(config.index.content_cache_page_size.as_bytes())
.index_result_cache_size(config.index.result_cache_size.as_bytes())
.puffin_metadata_size(config.index.metadata_cache_size.as_bytes())
.write_cache(write_cache)
.build(),
);
Ok(cache_manager)
}
fn new_noop_file_purger() -> FilePurgerRef {
#[derive(Debug)]
struct Noop;
impl FilePurger for Noop {
fn remove_file(&self, _file_meta: FileMeta, _is_delete: bool) {}
}
Arc::new(Noop)
}
async fn load_parquet_metadata(
object_store: ObjectStore,
path: &str,
file_size: u64,
) -> Result<parquet::file::metadata::ParquetMetaData, Box<dyn std::error::Error + Send + Sync>> {
use parquet::file::FOOTER_SIZE;
use parquet::file::metadata::ParquetMetaDataReader;
let actual_size = if file_size == 0 {
object_store.stat(path).await?.content_length()
} else {
file_size
};
if actual_size < FOOTER_SIZE as u64 {
return Err("file too small".into());
}
let prefetch: u64 = 64 * 1024;
let start = actual_size.saturating_sub(prefetch);
let buffer = object_store
.read_with(path)
.range(start..actual_size)
.await?
.to_vec();
let buffer_len = buffer.len();
let mut footer = [0; 8];
footer.copy_from_slice(&buffer[buffer_len - FOOTER_SIZE..]);
let footer = ParquetMetaDataReader::decode_footer_tail(&footer)?;
let metadata_len = footer.metadata_length() as u64;
if actual_size - (FOOTER_SIZE as u64) < metadata_len {
return Err("invalid footer/metadata length".into());
}
if (metadata_len as usize) <= buffer_len - FOOTER_SIZE {
let metadata_start = buffer_len - metadata_len as usize - FOOTER_SIZE;
let meta = ParquetMetaDataReader::decode_metadata(
&buffer[metadata_start..buffer_len - FOOTER_SIZE],
)?;
Ok(meta)
} else {
let metadata_start = actual_size - metadata_len - FOOTER_SIZE as u64;
let data = object_store
.read_with(path)
.range(metadata_start..(actual_size - FOOTER_SIZE as u64))
.await?
.to_vec();
let meta = ParquetMetaDataReader::decode_metadata(&data)?;
Ok(meta)
}
}
#[cfg(test)]
mod tests {
use std::path::PathBuf;
use std::str::FromStr;
use common_base::readable_size::ReadableSize;
use store_api::region_request::PathType;
use crate::datanode::objbench::{parse_config, parse_file_dir_components};
#[test]
fn test_parse_dir() {
let meta_path = "data/greptime/public/1024/1024_0000000000/metadata/00020380-009c-426d-953e-b4e34c15af34.parquet";
let c = parse_file_dir_components(meta_path).unwrap();
assert_eq!(
c.file_id.to_string(),
"00020380-009c-426d-953e-b4e34c15af34"
);
assert_eq!(c.catalog, "greptime");
assert_eq!(c.schema, "public");
assert_eq!(c.table_id, 1024);
assert_eq!(c.region_sequence, 0);
assert_eq!(c.path_type, PathType::Metadata);
let c = parse_file_dir_components(
"data/greptime/public/1024/1024_0000000000/data/00020380-009c-426d-953e-b4e34c15af34.parquet",
).unwrap();
assert_eq!(
c.file_id.to_string(),
"00020380-009c-426d-953e-b4e34c15af34"
);
assert_eq!(c.catalog, "greptime");
assert_eq!(c.schema, "public");
assert_eq!(c.table_id, 1024);
assert_eq!(c.region_sequence, 0);
assert_eq!(c.path_type, PathType::Data);
let c = parse_file_dir_components(
"data/greptime/public/1024/1024_0000000000/00020380-009c-426d-953e-b4e34c15af34.parquet",
).unwrap();
assert_eq!(
c.file_id.to_string(),
"00020380-009c-426d-953e-b4e34c15af34"
);
assert_eq!(c.catalog, "greptime");
assert_eq!(c.schema, "public");
assert_eq!(c.table_id, 1024);
assert_eq!(c.region_sequence, 0);
assert_eq!(c.path_type, PathType::Bare);
}
#[test]
fn test_parse_config() {
let path = "../../config/datanode.example.toml";
let (storage, engine) = parse_config(&PathBuf::from_str(path).unwrap()).unwrap();
assert_eq!(storage.data_home, "./greptimedb_data");
assert_eq!(engine.index.staging_size, ReadableSize::gb(2));
}
}

View File

@@ -25,11 +25,13 @@ use clap::Parser;
use client::client_manager::NodeClients;
use common_base::Plugins;
use common_config::{Configurable, DEFAULT_DATA_HOME};
use common_error::ext::BoxedError;
use common_grpc::channel_manager::ChannelConfig;
use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
use common_meta::heartbeat::handler::HandlerGroupExecutor;
use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
use common_query::prelude::set_default_prefix;
use common_stat::ResourceStatImpl;
use common_telemetry::info;
use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions};
@@ -333,6 +335,9 @@ impl StartCommand {
.context(error::StartFrontendSnafu)?;
set_default_timezone(opts.default_timezone.as_deref()).context(error::InitTimezoneSnafu)?;
set_default_prefix(opts.default_column_prefix.as_deref())
.map_err(BoxedError::new)
.context(error::BuildCliSnafu)?;
let meta_client_options = opts
.meta_client

View File

@@ -41,6 +41,7 @@ use common_meta::region_registry::LeaderRegionRegistry;
use common_meta::sequence::SequenceBuilder;
use common_meta::wal_options_allocator::{WalOptionsAllocatorRef, build_wal_options_allocator};
use common_procedure::ProcedureManagerRef;
use common_query::prelude::set_default_prefix;
use common_telemetry::info;
use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions};
use common_time::timezone::set_default_timezone;
@@ -355,6 +356,10 @@ impl StartCommand {
let mut plugins = Plugins::new();
let plugin_opts = opts.plugins;
let mut opts = opts.component;
set_default_prefix(opts.default_column_prefix.as_deref())
.map_err(BoxedError::new)
.context(error::BuildCliSnafu)?;
opts.grpc.detect_server_addr();
let fe_opts = opts.frontend_options();
let dn_opts = opts.datanode_options();

View File

@@ -48,6 +48,7 @@ fn test_load_datanode_example_config() {
let expected = GreptimeOptions::<DatanodeOptions> {
component: DatanodeOptions {
node_id: Some(42),
default_column_prefix: Some("greptime".to_string()),
meta_client: Some(MetaClientOptions {
metasrv_addrs: vec!["127.0.0.1:3002".to_string()],
timeout: Duration::from_secs(3),
@@ -113,6 +114,7 @@ fn test_load_frontend_example_config() {
let expected = GreptimeOptions::<FrontendOptions> {
component: FrontendOptions {
default_timezone: Some("UTC".to_string()),
default_column_prefix: Some("greptime".to_string()),
meta_client: Some(MetaClientOptions {
metasrv_addrs: vec!["127.0.0.1:3002".to_string()],
timeout: Duration::from_secs(3),
@@ -273,6 +275,7 @@ fn test_load_standalone_example_config() {
let expected = GreptimeOptions::<StandaloneOptions> {
component: StandaloneOptions {
default_timezone: Some("UTC".to_string()),
default_column_prefix: Some("greptime".to_string()),
wal: DatanodeWalConfig::RaftEngine(RaftEngineConfig {
dir: Some(format!("{}/{}", DEFAULT_DATA_HOME, WAL_DIR)),
sync_period: Some(Duration::from_secs(10)),

View File

@@ -18,9 +18,11 @@ bytes.workspace = true
common-error.workspace = true
common-macro.workspace = true
futures.workspace = true
lazy_static.workspace = true
paste.workspace = true
pin-project.workspace = true
rand.workspace = true
regex.workspace = true
serde = { version = "1.0", features = ["derive"] }
snafu.workspace = true
tokio.workspace = true

View File

@@ -19,6 +19,7 @@ pub mod plugins;
pub mod range_read;
#[allow(clippy::all)]
pub mod readable_size;
pub mod regex_pattern;
pub mod secrets;
pub mod serde;

View File

@@ -0,0 +1,22 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use lazy_static::lazy_static;
use regex::Regex;
pub const NAME_PATTERN: &str = r"[a-zA-Z_:-][a-zA-Z0-9_:\-\.@#]*";
lazy_static! {
pub static ref NAME_PATTERN_REG: Regex = Regex::new(&format!("^{NAME_PATTERN}$")).unwrap();
}

View File

@@ -8,5 +8,6 @@ license.workspace = true
workspace = true
[dependencies]
const_format.workspace = true
[dev-dependencies]

View File

@@ -0,0 +1,27 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
fn main() {
// Set DEFAULT_CATALOG_NAME from environment variable or use default value
let default_catalog_name =
std::env::var("DEFAULT_CATALOG_NAME").unwrap_or_else(|_| "greptime".to_string());
println!(
"cargo:rustc-env=DEFAULT_CATALOG_NAME={}",
default_catalog_name
);
// Rerun build script if the environment variable changes
println!("cargo:rerun-if-env-changed=DEFAULT_CATALOG_NAME");
}

View File

@@ -12,13 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use const_format::concatcp;
pub const SYSTEM_CATALOG_NAME: &str = "system";
pub const INFORMATION_SCHEMA_NAME: &str = "information_schema";
pub const PG_CATALOG_NAME: &str = "pg_catalog";
pub const SYSTEM_CATALOG_TABLE_NAME: &str = "system_catalog";
pub const DEFAULT_CATALOG_NAME: &str = "greptime";
pub const DEFAULT_CATALOG_NAME: &str = env!("DEFAULT_CATALOG_NAME");
pub const DEFAULT_SCHEMA_NAME: &str = "public";
pub const DEFAULT_PRIVATE_SCHEMA_NAME: &str = "greptime_private";
pub const DEFAULT_PRIVATE_SCHEMA_NAME: &str = concatcp!(DEFAULT_CATALOG_NAME, "_private");
/// Reserves [0,MIN_USER_FLOW_ID) for internal usage.
/// User defined table id starts from this value.
@@ -150,4 +152,9 @@ pub const TRACE_TABLE_NAME_SESSION_KEY: &str = "trace_table_name";
pub fn trace_services_table_name(trace_table_name: &str) -> String {
format!("{}_services", trace_table_name)
}
/// Generate the trace operations table name from the trace table name by adding `_operations` suffix.
pub fn trace_operations_table_name(trace_table_name: &str) -> String {
format!("{}_operations", trace_table_name)
}
// ---- End of special table and fields ----
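
A compact, standalone sketch of how these pieces fit together (hypothetical crate, compile-time only): the build script exports `DEFAULT_CATALOG_NAME` to rustc, a const reads it, and `const_format::concatcp!` derives dependent names from it. `option_env!` with a fallback is used here so the sketch compiles even without the build script; with the build script in place, a plain `env!` is enough.

```rust
use const_format::concatcp;

// Read the catalog name baked in at compile time, defaulting to "greptime"
// when the variable is not provided by a build script.
pub const DEFAULT_CATALOG_NAME: &str = match option_env!("DEFAULT_CATALOG_NAME") {
    Some(name) => name,
    None => "greptime",
};

// Derived names stay consistent with whatever catalog name was baked in.
pub const DEFAULT_PRIVATE_SCHEMA_NAME: &str = concatcp!(DEFAULT_CATALOG_NAME, "_private");

fn main() {
    // With the default catalog this prints "greptime_private".
    println!("{}", DEFAULT_PRIVATE_SCHEMA_NAME);
}
```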

View File

@@ -29,6 +29,8 @@ use arrow::array::StructArray;
use arrow_schema::{FieldRef, Fields};
use common_telemetry::debug;
use datafusion::functions_aggregate::all_default_aggregate_functions;
use datafusion::functions_aggregate::count::Count;
use datafusion::functions_aggregate::min_max::{Max, Min};
use datafusion::optimizer::AnalyzerRule;
use datafusion::optimizer::analyzer::type_coercion::TypeCoercion;
use datafusion::physical_planner::create_aggregate_expr_and_maybe_filter;
@@ -413,6 +415,51 @@ impl AggregateUDFImpl for StateWrapper {
fn coerce_types(&self, arg_types: &[DataType]) -> datafusion_common::Result<Vec<DataType>> {
self.inner.coerce_types(arg_types)
}
fn value_from_stats(
&self,
statistics_args: &datafusion_expr::StatisticsArgs,
) -> Option<ScalarValue> {
let inner = self.inner().inner().as_any();
// Only count/min/max can derive their result from statistics here;
// their plain result equals the corresponding `*_state` result, so it can be returned directly.
let can_use_stat = inner.is::<Count>() || inner.is::<Max>() || inner.is::<Min>();
if !can_use_stat {
return None;
}
// Fix the return type by extracting the first field's data type from the struct type.
let state_type = if let DataType::Struct(fields) = &statistics_args.return_type {
if fields.is_empty() {
return None;
}
fields[0].data_type().clone()
} else {
return None;
};
let fixed_args = datafusion_expr::StatisticsArgs {
statistics: statistics_args.statistics,
return_type: &state_type,
is_distinct: statistics_args.is_distinct,
exprs: statistics_args.exprs,
};
let ret = self.inner().value_from_stats(&fixed_args)?;
// wrap the result into struct scalar value
let fields = if let DataType::Struct(fields) = &statistics_args.return_type {
fields
} else {
return None;
};
let array = ret.to_array().ok()?;
let struct_array = StructArray::new(fields.clone(), vec![array], None);
let ret = ScalarValue::Struct(Arc::new(struct_array));
Some(ret)
}
}
/// The wrapper's input is the same as the original aggregate function's input,
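
A minimal, self-contained sketch of the wrapping step above, assuming recent arrow/datafusion-common APIs: a plain scalar derived from statistics is placed into a one-field struct scalar so it matches the `_state` return type. The `field_name` parameter is illustrative; the real code takes the field layout from the statistics args.

```rust
use std::sync::Arc;

use arrow::array::StructArray;
use arrow_schema::{Field, Fields};
use datafusion_common::ScalarValue;

/// Wraps a plain scalar into a one-field struct scalar.
fn wrap_in_struct(value: ScalarValue, field_name: &str) -> Option<ScalarValue> {
    let fields = Fields::from(vec![Field::new(field_name, value.data_type(), true)]);
    let array = value.to_array().ok()?; // a single-element array
    let struct_array = StructArray::new(fields, vec![array], None);
    Some(ScalarValue::Struct(Arc::new(struct_array)))
}

fn main() {
    let wrapped = wrap_in_struct(ScalarValue::Int64(Some(3)), "count").unwrap();
    println!("{wrapped:?}");
}
```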

View File

@@ -16,6 +16,9 @@ mod version;
use std::sync::Arc;
use common_catalog::consts::{
DEFAULT_PRIVATE_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, PG_CATALOG_NAME,
};
use datafusion::arrow::array::{ArrayRef, StringArray, as_boolean_array};
use datafusion::catalog::TableFunction;
use datafusion::common::ScalarValue;
@@ -143,9 +146,9 @@ impl Function for CurrentSchemasFunction {
let mut values = vec!["public"];
// include implicit schemas
if input.value(0) {
values.push("information_schema");
values.push("pg_catalog");
values.push("greptime_private");
values.push(INFORMATION_SCHEMA_NAME);
values.push(PG_CATALOG_NAME);
values.push(DEFAULT_PRIVATE_SCHEMA_NAME);
}
let list_array = SingleRowListArrayBuilder::new(Arc::new(StringArray::from(values)));
@@ -191,7 +194,10 @@ impl PGCatalogFunction {
registry.register(pg_catalog::create_pg_get_userbyid_udf());
registry.register(pg_catalog::create_pg_table_is_visible());
registry.register(pg_catalog::pg_get_expr_udf::create_pg_get_expr_udf());
// TODO(sunng87): upgrade datafusion to add
//registry.register(pg_catalog::create_pg_encoding_to_char_udf());
registry.register(pg_catalog::create_pg_encoding_to_char_udf());
registry.register(pg_catalog::create_pg_relation_size_udf());
registry.register(pg_catalog::create_pg_total_relation_size_udf());
registry.register(pg_catalog::create_pg_stat_get_numscans());
registry.register(pg_catalog::create_pg_get_constraintdef());
}
}

View File

@@ -32,6 +32,7 @@ use crate::error::{FlamegraphSnafu, ParseJeHeapSnafu, Result};
const PROF_DUMP: &[u8] = b"prof.dump\0";
const OPT_PROF: &[u8] = b"opt.prof\0";
const PROF_ACTIVE: &[u8] = b"prof.active\0";
const PROF_GDUMP: &[u8] = b"prof.gdump\0";
pub async fn dump_profile() -> Result<Vec<u8>> {
ensure!(is_prof_enabled()?, ProfilingNotEnabledSnafu);
@@ -119,3 +120,16 @@ fn is_prof_enabled() -> Result<bool> {
// safety: OPT_PROF variable, if present, is always a boolean value.
Ok(unsafe { tikv_jemalloc_ctl::raw::read::<bool>(OPT_PROF).context(ReadOptProfSnafu)? })
}
pub fn set_gdump_active(active: bool) -> Result<()> {
ensure!(is_prof_enabled()?, ProfilingNotEnabledSnafu);
unsafe {
tikv_jemalloc_ctl::raw::update(PROF_GDUMP, active).context(error::UpdateGdumpSnafu)?;
}
Ok(())
}
pub fn is_gdump_active() -> Result<bool> {
// safety: PROF_GDUMP, if present, is a boolean value.
unsafe { Ok(tikv_jemalloc_ctl::raw::read::<bool>(PROF_GDUMP).context(error::ReadGdumpSnafu)?) }
}
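
A standalone sketch of the underlying mallctl calls, assuming a jemalloc built with profiling enabled (`opt.prof`): `prof.gdump` is a boolean entry that can be read and flipped through the same raw interface used above.

```rust
use tikv_jemalloc_ctl::raw;

// NUL-terminated mallctl key, as jemalloc expects.
const PROF_GDUMP: &[u8] = b"prof.gdump\0";

/// Reads and flips the gdump flag, returning the new value.
fn flip_gdump() -> Result<bool, tikv_jemalloc_ctl::Error> {
    // Safety: `prof.gdump` is a boolean mallctl entry.
    let current = unsafe { raw::read::<bool>(PROF_GDUMP)? };
    unsafe { raw::update(PROF_GDUMP, !current)? };
    Ok(!current)
}

fn main() {
    match flip_gdump() {
        Ok(now) => println!("prof.gdump is now {now}"),
        Err(e) => eprintln!("profiling not available: {e}"),
    }
}
```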

View File

@@ -71,6 +71,18 @@ pub enum Error {
#[snafu(source)]
error: tikv_jemalloc_ctl::Error,
},
#[snafu(display("Failed to read jemalloc gdump flag"))]
ReadGdump {
#[snafu(source)]
error: tikv_jemalloc_ctl::Error,
},
#[snafu(display("Failed to update jemalloc gdump flag"))]
UpdateGdump {
#[snafu(source)]
error: tikv_jemalloc_ctl::Error,
},
}
impl ErrorExt for Error {
@@ -84,6 +96,8 @@ impl ErrorExt for Error {
Error::ActivateProf { .. } => StatusCode::Internal,
Error::DeactivateProf { .. } => StatusCode::Internal,
Error::ReadProfActive { .. } => StatusCode::Internal,
Error::ReadGdump { .. } => StatusCode::Internal,
Error::UpdateGdump { .. } => StatusCode::Internal,
}
}

View File

@@ -19,7 +19,7 @@ mod jemalloc;
#[cfg(not(windows))]
pub use jemalloc::{
activate_heap_profile, deactivate_heap_profile, dump_flamegraph, dump_pprof, dump_profile,
is_heap_profile_active,
is_gdump_active, is_heap_profile_active, set_gdump_active,
};
#[cfg(windows)]
@@ -51,3 +51,13 @@ pub fn deactivate_heap_profile() -> error::Result<()> {
pub fn is_heap_profile_active() -> error::Result<bool> {
error::ProfilingNotSupportedSnafu.fail()
}
#[cfg(windows)]
pub fn is_gdump_active() -> error::Result<bool> {
error::ProfilingNotSupportedSnafu.fail()
}
#[cfg(windows)]
pub fn set_gdump_active(_: bool) -> error::Result<()> {
error::ProfilingNotSupportedSnafu.fail()
}

View File

@@ -25,8 +25,7 @@ use store_api::region_engine::{RegionRole, RegionStatistic};
use store_api::storage::RegionId;
use table::metadata::TableId;
use crate::error;
use crate::error::Result;
use crate::error::{self, DeserializeFromJsonSnafu, Result};
use crate::heartbeat::utils::get_datanode_workloads;
const DATANODE_STAT_PREFIX: &str = "__meta_datanode_stat";
@@ -66,10 +65,12 @@ pub struct Stat {
pub node_epoch: u64,
/// The datanode workloads.
pub datanode_workloads: DatanodeWorkloads,
/// The GC statistics of the datanode.
pub gc_stat: Option<GcStat>,
}
/// The statistics of a region.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct RegionStat {
/// The region_id.
pub id: RegionId,
@@ -126,7 +127,7 @@ pub trait TopicStatsReporter: Send + Sync {
fn reportable_topics(&mut self) -> Vec<TopicStat>;
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub enum RegionManifestInfo {
Mito {
manifest_version: u64,
@@ -222,11 +223,12 @@ impl TryFrom<&HeartbeatRequest> for Stat {
node_epoch,
node_workloads,
topic_stats,
extensions,
..
} = value;
match (header, peer) {
(Some(_header), Some(peer)) => {
(Some(header), Some(peer)) => {
let region_stats = region_stats
.iter()
.map(RegionStat::from)
@@ -234,6 +236,14 @@ impl TryFrom<&HeartbeatRequest> for Stat {
let topic_stats = topic_stats.iter().map(TopicStat::from).collect::<Vec<_>>();
let datanode_workloads = get_datanode_workloads(node_workloads.as_ref());
let gc_stat = GcStat::from_extensions(extensions).map_err(|err| {
common_telemetry::error!(
"Failed to deserialize GcStat from extensions: {}",
err
);
header.clone()
})?;
Ok(Self {
timestamp_millis: time_util::current_time_millis(),
// datanode id
@@ -247,6 +257,7 @@ impl TryFrom<&HeartbeatRequest> for Stat {
topic_stats,
node_epoch: *node_epoch,
datanode_workloads,
gc_stat,
})
}
(header, _) => Err(header.clone()),
@@ -319,6 +330,43 @@ impl From<&api::v1::meta::TopicStat> for TopicStat {
}
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct GcStat {
/// Number of GC tasks currently running on the datanode.
pub running_gc_tasks: u32,
/// The maximum number of concurrent GC tasks the datanode can handle.
pub gc_concurrency: u32,
}
impl GcStat {
pub const GC_STAT_KEY: &str = "__gc_stat";
pub fn new(running_gc_tasks: u32, gc_concurrency: u32) -> Self {
Self {
running_gc_tasks,
gc_concurrency,
}
}
pub fn into_extensions(&self, extensions: &mut std::collections::HashMap<String, Vec<u8>>) {
let bytes = serde_json::to_vec(self).unwrap_or_default();
extensions.insert(Self::GC_STAT_KEY.to_string(), bytes);
}
pub fn from_extensions(
extensions: &std::collections::HashMap<String, Vec<u8>>,
) -> Result<Option<Self>> {
extensions
.get(Self::GC_STAT_KEY)
.map(|bytes| {
serde_json::from_slice(bytes).with_context(|_| DeserializeFromJsonSnafu {
input: String::from_utf8_lossy(bytes).to_string(),
})
})
.transpose()
}
}
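
For clarity, a self-contained round trip of the extensions mechanism (the struct here mirrors `GcStat`; the key and field names follow the definitions above):

```rust
use std::collections::HashMap;

use serde::{Deserialize, Serialize};

// Mirrors `GcStat`; defined locally so the sketch compiles on its own.
#[derive(Debug, Default, PartialEq, Serialize, Deserialize)]
struct GcStatSketch {
    running_gc_tasks: u32,
    gc_concurrency: u32,
}

const GC_STAT_KEY: &str = "__gc_stat";

fn main() {
    // Datanode side: serialize into the heartbeat's opaque extensions map.
    let stat = GcStatSketch { running_gc_tasks: 1, gc_concurrency: 4 };
    let mut extensions: HashMap<String, Vec<u8>> = HashMap::new();
    extensions.insert(GC_STAT_KEY.to_string(), serde_json::to_vec(&stat).unwrap());

    // Metasrv side: the key may be absent (older datanodes), hence the Option.
    let decoded: Option<GcStatSketch> = extensions
        .get(GC_STAT_KEY)
        .map(|bytes| serde_json::from_slice(bytes).unwrap());
    assert_eq!(decoded, Some(stat));
}
```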
/// The key of the datanode stat in the memory store.
///
/// The format is `__meta_datanode_stat-0-{node_id}`.

View File

@@ -17,7 +17,7 @@ use std::fmt::{Display, Formatter};
use std::time::Duration;
use serde::{Deserialize, Deserializer, Serialize};
use store_api::storage::{RegionId, RegionNumber};
use store_api::storage::{FileRefsManifest, GcReport, RegionId, RegionNumber};
use strum::Display;
use table::metadata::TableId;
use table::table_name::TableName;
@@ -417,6 +417,88 @@ where
})
}
/// Instruction to get file references for specified regions.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct GetFileRefs {
/// List of region IDs to get file references for.
pub region_ids: Vec<RegionId>,
}
impl Display for GetFileRefs {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "GetFileRefs(region_ids={:?})", self.region_ids)
}
}
/// Instruction to trigger garbage collection for regions.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct GcRegions {
/// The region IDs to perform GC on.
pub regions: Vec<RegionId>,
/// The file references manifest containing temporary file references.
pub file_refs_manifest: FileRefsManifest,
/// Whether to perform a full file listing to find orphan files.
pub full_file_listing: bool,
}
impl Display for GcRegions {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"GcRegion(regions={:?}, file_refs_count={}, full_file_listing={})",
self.regions,
self.file_refs_manifest.file_refs.len(),
self.full_file_listing
)
}
}
/// Reply for GetFileRefs instruction.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct GetFileRefsReply {
/// The file references manifest.
pub file_refs_manifest: FileRefsManifest,
/// Whether the operation was successful.
pub success: bool,
/// Error message if any.
pub error: Option<String>,
}
impl Display for GetFileRefsReply {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"GetFileRefsReply(success={}, file_refs_count={}, error={:?})",
self.success,
self.file_refs_manifest.file_refs.len(),
self.error
)
}
}
/// Reply for GC instruction.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct GcRegionsReply {
pub result: Result<GcReport, String>,
}
impl Display for GcRegionsReply {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"GcReply(result={})",
match &self.result {
Ok(report) => format!(
"GcReport(deleted_files_count={}, need_retry_regions_count={})",
report.deleted_files.len(),
report.need_retry_regions.len()
),
Err(err) => format!("Err({})", err),
}
)
}
}
#[derive(Debug, Clone, Serialize, Deserialize, Display, PartialEq)]
pub enum Instruction {
/// Opens regions.
@@ -437,6 +519,10 @@ pub enum Instruction {
InvalidateCaches(Vec<CacheIdent>),
/// Flushes regions.
FlushRegions(FlushRegions),
/// Gets file references for regions.
GetFileRefs(GetFileRefs),
/// Triggers garbage collection for a region.
GcRegions(GcRegions),
}
impl Instruction {
@@ -479,6 +565,20 @@ impl Instruction {
_ => None,
}
}
pub fn into_get_file_refs(self) -> Option<GetFileRefs> {
match self {
Self::GetFileRefs(get_file_refs) => Some(get_file_refs),
_ => None,
}
}
pub fn into_gc_regions(self) -> Option<GcRegions> {
match self {
Self::GcRegions(gc_regions) => Some(gc_regions),
_ => None,
}
}
}
/// The reply of [UpgradeRegion].
@@ -549,6 +649,8 @@ pub enum InstructionReply {
)]
DowngradeRegions(DowngradeRegionsReply),
FlushRegions(FlushRegionReply),
GetFileRefs(GetFileRefsReply),
GcRegions(GcRegionsReply),
}
impl Display for InstructionReply {
@@ -561,6 +663,8 @@ impl Display for InstructionReply {
write!(f, "InstructionReply::DowngradeRegions({:?})", reply)
}
Self::FlushRegions(reply) => write!(f, "InstructionReply::FlushRegions({})", reply),
Self::GetFileRefs(reply) => write!(f, "InstructionReply::GetFileRefs({})", reply),
Self::GcRegions(reply) => write!(f, "InstructionReply::GcRegion({})", reply),
}
}
}
@@ -605,6 +709,10 @@ impl InstructionReply {
#[cfg(test)]
mod tests {
use std::collections::HashSet;
use store_api::storage::FileId;
use super::*;
#[test]
@@ -903,4 +1011,30 @@ mod tests {
_ => panic!("Expected FlushRegions instruction"),
}
}
#[test]
fn test_serialize_get_file_refs_instruction_reply() {
let mut manifest = FileRefsManifest::default();
let r0 = RegionId::new(1024, 1);
let r1 = RegionId::new(1024, 2);
manifest
.file_refs
.insert(r0, HashSet::from([FileId::random()]));
manifest
.file_refs
.insert(r1, HashSet::from([FileId::random()]));
manifest.manifest_version.insert(r0, 10);
manifest.manifest_version.insert(r1, 20);
let instruction_reply = InstructionReply::GetFileRefs(GetFileRefsReply {
file_refs_manifest: manifest,
success: true,
error: None,
});
let serialized = serde_json::to_string(&instruction_reply).unwrap();
let deserialized = serde_json::from_str(&serialized).unwrap();
assert_eq!(instruction_reply, deserialized);
}
}
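
A hedged sketch of how metasrv might construct and serialize the new GC instruction, reusing only types shown above (assumes the `common_meta`, `store_api` and `serde_json` crates as dependencies):

```rust
use common_meta::instruction::{GcRegions, Instruction};
use store_api::storage::{FileRefsManifest, RegionId};

fn main() {
    // Two regions of the same table (table id 1024), no extra file refs,
    // fast path without a full file listing.
    let instruction = Instruction::GcRegions(GcRegions {
        regions: vec![RegionId::new(1024, 1), RegionId::new(1024, 2)],
        file_refs_manifest: FileRefsManifest::default(),
        full_file_listing: false,
    });

    // Instructions travel through the heartbeat mailbox as JSON.
    let json = serde_json::to_string(&instruction).unwrap();
    let decoded: Instruction = serde_json::from_str(&json).unwrap();
    assert_eq!(instruction, decoded);
}
```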

View File

@@ -121,6 +121,7 @@ use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use bytes::Bytes;
use common_base::regex_pattern::NAME_PATTERN;
use common_catalog::consts::{
DEFAULT_CATALOG_NAME, DEFAULT_PRIVATE_SCHEMA_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME,
};
@@ -164,7 +165,6 @@ use crate::rpc::router::{LeaderState, RegionRoute, region_distribution};
use crate::rpc::store::BatchDeleteRequest;
use crate::state_store::PoisonValue;
pub const NAME_PATTERN: &str = r"[a-zA-Z_:-][a-zA-Z0-9_:\-\.@#]*";
pub const TOPIC_NAME_PATTERN: &str = r"[a-zA-Z0-9_:-][a-zA-Z0-9_:\-\.@#]*";
pub const LEGACY_MAINTENANCE_KEY: &str = "__maintenance";
pub const MAINTENANCE_KEY: &str = "__switches/maintenance";
@@ -269,10 +269,6 @@ pub type FlowId = u32;
/// The partition of flow.
pub type FlowPartitionId = u32;
lazy_static! {
pub static ref NAME_PATTERN_REGEX: Regex = Regex::new(NAME_PATTERN).unwrap();
}
lazy_static! {
pub static ref TOPIC_NAME_PATTERN_REGEX: Regex = Regex::new(TOPIC_NAME_PATTERN).unwrap();
}

View File

@@ -14,6 +14,7 @@ workspace = true
api.workspace = true
async-trait.workspace = true
bytes.workspace = true
common-base.workspace = true
common-error.workspace = true
common-macro.workspace = true
common-recordbatch.workspace = true
@@ -22,6 +23,7 @@ datafusion.workspace = true
datafusion-common.workspace = true
datafusion-expr.workspace = true
datatypes.workspace = true
once_cell.workspace = true
serde.workspace = true
snafu.workspace = true
sqlparser.workspace = true

View File

@@ -199,6 +199,9 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid character in prefix config: {}", prefix))]
InvalidColumnPrefix { prefix: String },
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -227,7 +230,8 @@ impl ErrorExt for Error {
Error::UnsupportedInputDataType { .. }
| Error::TypeCast { .. }
| Error::InvalidFuncArgs { .. } => StatusCode::InvalidArguments,
| Error::InvalidFuncArgs { .. }
| Error::InvalidColumnPrefix { .. } => StatusCode::InvalidArguments,
Error::ConvertDfRecordBatchStream { source, .. } => source.status_code(),

View File

@@ -12,15 +12,61 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_base::regex_pattern::NAME_PATTERN_REG;
pub use datafusion_common::ScalarValue;
use once_cell::sync::OnceCell;
use snafu::ensure;
pub use crate::columnar_value::ColumnarValue;
use crate::error::{InvalidColumnPrefixSnafu, Result};
/// Default timestamp column name for Prometheus metrics.
pub const GREPTIME_TIMESTAMP: &str = "greptime_timestamp";
/// Default value column name for Prometheus metrics.
pub const GREPTIME_VALUE: &str = "greptime_value";
/// Default counter column name for OTLP metrics.
/// Default time index column name.
static GREPTIME_TIMESTAMP_CELL: OnceCell<String> = OnceCell::new();
/// Default value column name.
static GREPTIME_VALUE_CELL: OnceCell<String> = OnceCell::new();
pub fn set_default_prefix(prefix: Option<&str>) -> Result<()> {
match prefix {
None => {
// use default greptime prefix
GREPTIME_TIMESTAMP_CELL.get_or_init(|| GREPTIME_TIMESTAMP.to_string());
GREPTIME_VALUE_CELL.get_or_init(|| GREPTIME_VALUE.to_string());
}
Some(s) if s.trim().is_empty() => {
// use "" to disable prefix
GREPTIME_TIMESTAMP_CELL.get_or_init(|| "timestamp".to_string());
GREPTIME_VALUE_CELL.get_or_init(|| "value".to_string());
}
Some(x) => {
ensure!(
NAME_PATTERN_REG.is_match(x),
InvalidColumnPrefixSnafu { prefix: x }
);
GREPTIME_TIMESTAMP_CELL.get_or_init(|| format!("{}_timestamp", x));
GREPTIME_VALUE_CELL.get_or_init(|| format!("{}_value", x));
}
}
Ok(())
}
/// Get the default timestamp column name.
/// Returns the configured value, or `greptime_timestamp` if not set.
pub fn greptime_timestamp() -> &'static str {
GREPTIME_TIMESTAMP_CELL.get_or_init(|| GREPTIME_TIMESTAMP.to_string())
}
/// Get the default value column name.
/// Returns the configured value, or `greptime_value` if not set.
pub fn greptime_value() -> &'static str {
GREPTIME_VALUE_CELL.get_or_init(|| GREPTIME_VALUE.to_string())
}
/// Default timestamp column name constant for backward compatibility.
const GREPTIME_TIMESTAMP: &str = "greptime_timestamp";
/// Default value column name constant for backward compatibility.
const GREPTIME_VALUE: &str = "greptime_value";
/// Default counter column name for OTLP metrics (legacy mode).
pub const GREPTIME_COUNT: &str = "greptime_count";
/// Default physical table name
pub const GREPTIME_PHYSICAL_TABLE: &str = "greptime_physical_table";
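
A usage sketch of the new API (the `acme` prefix is an illustrative value, and the re-exports are assumed to live in `common_query::prelude` as the datanode import below suggests): the prefix must be configured once, early at startup, before anything reads the default column names, because the `OnceCell`s latch the first value they see.

```rust
use common_query::prelude::{greptime_timestamp, greptime_value, set_default_prefix};

fn main() {
    // The prefix must match NAME_PATTERN_REG, otherwise this returns an error.
    set_default_prefix(Some("acme")).unwrap();

    // Subsequent lookups see the prefixed names; later calls to
    // `set_default_prefix` cannot change them because the cells are already set.
    assert_eq!(greptime_timestamp(), "acme_timestamp");
    assert_eq!(greptime_value(), "acme_value");
}
```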

View File

@@ -11,7 +11,7 @@ workspace = true
codec = ["dep:serde"]
[dependencies]
const_format = "0.2"
const_format.workspace = true
serde = { workspace = true, optional = true }
shadow-rs = { version = "1.2.1", default-features = false }

View File

@@ -66,6 +66,7 @@ impl Default for StorageConfig {
#[serde(default)]
pub struct DatanodeOptions {
pub node_id: Option<u64>,
pub default_column_prefix: Option<String>,
pub workload_types: Vec<DatanodeWorkloadType>,
pub require_lease_before_startup: bool,
pub init_regions_in_background: bool,
@@ -119,6 +120,7 @@ impl Default for DatanodeOptions {
fn default() -> Self {
Self {
node_id: None,
default_column_prefix: None,
workload_types: vec![DatanodeWorkloadType::Hybrid],
require_lease_before_startup: false,
init_regions_in_background: false,

View File

@@ -27,6 +27,7 @@ use common_meta::key::runtime_switch::RuntimeSwitchManager;
use common_meta::key::{SchemaMetadataManager, SchemaMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
pub use common_procedure::options::ProcedureConfig;
use common_query::prelude::set_default_prefix;
use common_stat::ResourceStatImpl;
use common_telemetry::{error, info, warn};
use common_wal::config::DatanodeWalConfig;
@@ -59,9 +60,9 @@ use tokio::sync::Notify;
use crate::config::{DatanodeOptions, RegionEngineConfig, StorageConfig};
use crate::error::{
self, BuildMetricEngineSnafu, BuildMitoEngineSnafu, CreateDirSnafu, GetMetadataSnafu,
MissingCacheSnafu, MissingNodeIdSnafu, OpenLogStoreSnafu, Result, ShutdownInstanceSnafu,
ShutdownServerSnafu, StartServerSnafu,
self, BuildDatanodeSnafu, BuildMetricEngineSnafu, BuildMitoEngineSnafu, CreateDirSnafu,
GetMetadataSnafu, MissingCacheSnafu, MissingNodeIdSnafu, OpenLogStoreSnafu, Result,
ShutdownInstanceSnafu, ShutdownServerSnafu, StartServerSnafu,
};
use crate::event_listener::{
NoopRegionServerEventListener, RegionServerEventListenerRef, RegionServerEventReceiver,
@@ -220,6 +221,9 @@ impl DatanodeBuilder {
pub async fn build(mut self) -> Result<Datanode> {
let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?;
set_default_prefix(self.opts.default_column_prefix.as_deref())
.map_err(BoxedError::new)
.context(BuildDatanodeSnafu)?;
let meta_client = self.meta_client.take();

View File

@@ -165,6 +165,13 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to build datanode"))]
BuildDatanode {
#[snafu(implicit)]
location: Location,
source: BoxedError,
},
#[snafu(display("Failed to build http client"))]
BuildHttpClient {
#[snafu(implicit)]
@@ -315,6 +322,21 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to run gc for region {}", region_id))]
GcMitoEngine {
region_id: RegionId,
source: mito2::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid arguments for GC: {}", msg))]
InvalidGcArgs {
msg: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to list SST entries from storage"))]
ListStorageSsts {
#[snafu(implicit)]
@@ -429,7 +451,8 @@ impl ErrorExt for Error {
| MissingRequiredField { .. }
| RegionEngineNotFound { .. }
| ParseAddr { .. }
| TomlFormat { .. } => StatusCode::InvalidArguments,
| TomlFormat { .. }
| BuildDatanode { .. } => StatusCode::InvalidArguments,
PayloadNotExist { .. }
| Unexpected { .. }
@@ -438,9 +461,11 @@ impl ErrorExt for Error {
AsyncTaskExecute { source, .. } => source.status_code(),
CreateDir { .. } | RemoveDir { .. } | ShutdownInstance { .. } | DataFusion { .. } => {
StatusCode::Internal
}
CreateDir { .. }
| RemoveDir { .. }
| ShutdownInstance { .. }
| DataFusion { .. }
| InvalidGcArgs { .. } => StatusCode::Internal,
RegionNotFound { .. } => StatusCode::RegionNotFound,
RegionNotReady { .. } => StatusCode::RegionNotReady,
@@ -458,7 +483,7 @@ impl ErrorExt for Error {
StopRegionEngine { source, .. } => source.status_code(),
FindLogicalRegions { source, .. } => source.status_code(),
BuildMitoEngine { source, .. } => source.status_code(),
BuildMitoEngine { source, .. } | GcMitoEngine { source, .. } => source.status_code(),
BuildMetricEngine { source, .. } => source.status_code(),
ListStorageSsts { source, .. } => source.status_code(),
ConcurrentQueryLimiterClosed { .. } | ConcurrentQueryLimiterTimeout { .. } => {

View File

@@ -36,14 +36,14 @@ use common_workload::DatanodeWorkloadType;
use meta_client::MetaClientRef;
use meta_client::client::{HeartbeatSender, MetaClient};
use servers::addrs;
use snafu::ResultExt;
use snafu::{OptionExt as _, ResultExt};
use tokio::sync::{Notify, mpsc};
use tokio::time::Instant;
use self::handler::RegionHeartbeatResponseHandler;
use crate::alive_keeper::{CountdownTaskHandlerExtRef, RegionAliveKeeper};
use crate::config::DatanodeOptions;
use crate::error::{self, MetaClientInitSnafu, Result};
use crate::error::{self, MetaClientInitSnafu, RegionEngineNotFoundSnafu, Result};
use crate::event_listener::RegionServerEventReceiver;
use crate::metrics::{self, HEARTBEAT_RECV_COUNT, HEARTBEAT_SENT_COUNT};
use crate::region_server::RegionServer;
@@ -242,12 +242,18 @@ impl HeartbeatTask {
let total_cpu_millicores = self.resource_stat.get_total_cpu_millicores();
let total_memory_bytes = self.resource_stat.get_total_memory_bytes();
let resource_stat = self.resource_stat.clone();
let gc_limiter = self
.region_server
.mito_engine()
.context(RegionEngineNotFoundSnafu { name: "mito" })?
.gc_limiter();
common_runtime::spawn_hb(async move {
let sleep = tokio::time::sleep(Duration::from_millis(0));
tokio::pin!(sleep);
let build_info = common_version::build_info();
let heartbeat_request = HeartbeatRequest {
peer: self_peer,
node_epoch,
@@ -283,8 +289,13 @@ impl HeartbeatTask {
if let Some(message) = message {
match outgoing_message_to_mailbox_message(message) {
Ok(message) => {
let mut extensions = heartbeat_request.extensions.clone();
let gc_stat = gc_limiter.gc_stat();
gc_stat.into_extensions(&mut extensions);
let req = HeartbeatRequest {
mailbox_message: Some(message),
extensions,
..heartbeat_request.clone()
};
HEARTBEAT_RECV_COUNT.with_label_values(&["success"]).inc();
@@ -305,10 +316,16 @@ impl HeartbeatTask {
let topic_stats = region_server_clone.topic_stats();
let now = Instant::now();
let duration_since_epoch = (now - epoch).as_millis() as u64;
let mut extensions = heartbeat_request.extensions.clone();
let gc_stat = gc_limiter.gc_stat();
gc_stat.into_extensions(&mut extensions);
let mut req = HeartbeatRequest {
region_stats,
topic_stats,
duration_since_epoch,
extensions,
..heartbeat_request.clone()
};

View File

@@ -20,16 +20,21 @@ use common_meta::heartbeat::handler::{
use common_meta::instruction::{Instruction, InstructionReply};
use common_telemetry::error;
use snafu::OptionExt;
use store_api::storage::GcReport;
mod close_region;
mod downgrade_region;
mod file_ref;
mod flush_region;
mod gc_worker;
mod open_region;
mod upgrade_region;
use crate::heartbeat::handler::close_region::CloseRegionsHandler;
use crate::heartbeat::handler::downgrade_region::DowngradeRegionsHandler;
use crate::heartbeat::handler::file_ref::GetFileRefsHandler;
use crate::heartbeat::handler::flush_region::FlushRegionsHandler;
use crate::heartbeat::handler::gc_worker::GcRegionsHandler;
use crate::heartbeat::handler::open_region::OpenRegionsHandler;
use crate::heartbeat::handler::upgrade_region::UpgradeRegionsHandler;
use crate::heartbeat::task_tracker::TaskTracker;
@@ -43,14 +48,16 @@ pub struct RegionHeartbeatResponseHandler {
downgrade_tasks: TaskTracker<()>,
flush_tasks: TaskTracker<()>,
open_region_parallelism: usize,
gc_tasks: TaskTracker<GcReport>,
}
#[async_trait::async_trait]
pub trait InstructionHandler: Send + Sync {
type Instruction;
async fn handle(
&self,
ctx: &HandlerContext,
instruction: Instruction,
instruction: Self::Instruction,
) -> Option<InstructionReply>;
}
@@ -60,6 +67,7 @@ pub struct HandlerContext {
catchup_tasks: TaskTracker<()>,
downgrade_tasks: TaskTracker<()>,
flush_tasks: TaskTracker<()>,
gc_tasks: TaskTracker<GcReport>,
}
impl HandlerContext {
@@ -70,6 +78,7 @@ impl HandlerContext {
catchup_tasks: TaskTracker::new(),
downgrade_tasks: TaskTracker::new(),
flush_tasks: TaskTracker::new(),
gc_tasks: TaskTracker::new(),
}
}
}
@@ -84,6 +93,7 @@ impl RegionHeartbeatResponseHandler {
flush_tasks: TaskTracker::new(),
// Default to half of the number of CPUs.
open_region_parallelism: (num_cpus::get() / 2).max(1),
gc_tasks: TaskTracker::new(),
}
}
@@ -93,39 +103,109 @@ impl RegionHeartbeatResponseHandler {
self
}
fn build_handler(&self, instruction: &Instruction) -> MetaResult<Box<dyn InstructionHandler>> {
fn build_handler(&self, instruction: &Instruction) -> MetaResult<Box<InstructionHandlers>> {
match instruction {
Instruction::CloseRegions(_) => Ok(Box::new(CloseRegionsHandler)),
Instruction::OpenRegions(_) => Ok(Box::new(OpenRegionsHandler {
open_region_parallelism: self.open_region_parallelism,
})),
Instruction::FlushRegions(_) => Ok(Box::new(FlushRegionsHandler)),
Instruction::DowngradeRegions(_) => Ok(Box::new(DowngradeRegionsHandler)),
Instruction::UpgradeRegion(_) => Ok(Box::new(UpgradeRegionsHandler)),
Instruction::CloseRegions(_) => Ok(Box::new(CloseRegionsHandler.into())),
Instruction::OpenRegions(_) => Ok(Box::new(
OpenRegionsHandler {
open_region_parallelism: self.open_region_parallelism,
}
.into(),
)),
Instruction::FlushRegions(_) => Ok(Box::new(FlushRegionsHandler.into())),
Instruction::DowngradeRegions(_) => Ok(Box::new(DowngradeRegionsHandler.into())),
Instruction::UpgradeRegion(_) => Ok(Box::new(UpgradeRegionsHandler.into())),
Instruction::GetFileRefs(_) => Ok(Box::new(GetFileRefsHandler.into())),
Instruction::GcRegions(_) => Ok(Box::new(GcRegionsHandler.into())),
Instruction::InvalidateCaches(_) => InvalidHeartbeatResponseSnafu.fail(),
}
}
}
#[allow(clippy::enum_variant_names)]
pub enum InstructionHandlers {
CloseRegions(CloseRegionsHandler),
OpenRegions(OpenRegionsHandler),
FlushRegions(FlushRegionsHandler),
DowngradeRegions(DowngradeRegionsHandler),
UpgradeRegions(UpgradeRegionsHandler),
GetFileRefs(GetFileRefsHandler),
GcRegions(GcRegionsHandler),
}
macro_rules! impl_from_handler {
($($handler:ident => $variant:ident),*) => {
$(
impl From<$handler> for InstructionHandlers {
fn from(handler: $handler) -> Self {
InstructionHandlers::$variant(handler)
}
}
)*
};
}
impl_from_handler!(
CloseRegionsHandler => CloseRegions,
OpenRegionsHandler => OpenRegions,
FlushRegionsHandler => FlushRegions,
DowngradeRegionsHandler => DowngradeRegions,
UpgradeRegionsHandler => UpgradeRegions,
GetFileRefsHandler => GetFileRefs,
GcRegionsHandler => GcRegions
);
macro_rules! dispatch_instr {
(
$( $instr_variant:ident => $handler_variant:ident ),* $(,)?
) => {
impl InstructionHandlers {
pub async fn handle(
&self,
ctx: &HandlerContext,
instruction: Instruction,
) -> Option<InstructionReply> {
match (self, instruction) {
$(
(
InstructionHandlers::$handler_variant(handler),
Instruction::$instr_variant(instr),
) => handler.handle(ctx, instr).await,
)*
// Safety: must be used in pairs with `build_handler`.
_ => unreachable!(),
}
}
/// Checks whether this instruction can be handled by one of the handlers.
pub fn is_acceptable(instruction: &Instruction) -> bool {
matches!(
instruction,
$(
Instruction::$instr_variant { .. }
)|*
)
}
}
};
}
dispatch_instr!(
CloseRegions => CloseRegions,
OpenRegions => OpenRegions,
FlushRegions => FlushRegions,
DowngradeRegions => DowngradeRegions,
UpgradeRegion => UpgradeRegions,
GetFileRefs => GetFileRefs,
GcRegions => GcRegions,
);
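
The handler registry above replaces `Box<dyn InstructionHandler>` trait objects with a plain enum plus macro-generated `From` impls and a match-based dispatcher. A stripped-down, self-contained sketch of the same pattern (names here are illustrative only):

```rust
// Each concrete handler becomes an enum variant; `From` keeps construction terse.
struct OpenHandler;
struct CloseHandler;

enum Handlers {
    Open(OpenHandler),
    Close(CloseHandler),
}

enum Instr {
    Open(u32),
    Close(u32),
}

impl From<OpenHandler> for Handlers {
    fn from(h: OpenHandler) -> Self {
        Handlers::Open(h)
    }
}

impl From<CloseHandler> for Handlers {
    fn from(h: CloseHandler) -> Self {
        Handlers::Close(h)
    }
}

impl Handlers {
    // The (handler, instruction) pairing is established by the builder, so the
    // catch-all mirrors the `unreachable!()` in the macro-generated code above.
    fn handle(&self, instr: Instr) -> String {
        match (self, instr) {
            (Handlers::Open(_), Instr::Open(id)) => format!("open {id}"),
            (Handlers::Close(_), Instr::Close(id)) => format!("close {id}"),
            _ => unreachable!("handler/instruction mismatch"),
        }
    }
}

fn main() {
    let handler: Handlers = OpenHandler.into();
    assert_eq!(handler.handle(Instr::Open(7)), "open 7");
}
```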
#[async_trait]
impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler {
fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool {
matches!(ctx.incoming_message.as_ref(), |Some((
_,
Instruction::DowngradeRegions { .. },
))| Some((
_,
Instruction::UpgradeRegion { .. }
)) | Some((
_,
Instruction::FlushRegions { .. }
)) | Some((
_,
Instruction::OpenRegions { .. }
)) | Some((
_,
Instruction::CloseRegions { .. }
)))
if let Some((_, instruction)) = ctx.incoming_message.as_ref() {
return InstructionHandlers::is_acceptable(instruction);
}
false
}
async fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> MetaResult<HandleControl> {
@@ -139,6 +219,7 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler {
let catchup_tasks = self.catchup_tasks.clone();
let downgrade_tasks = self.downgrade_tasks.clone();
let flush_tasks = self.flush_tasks.clone();
let gc_tasks = self.gc_tasks.clone();
let handler = self.build_handler(&instruction)?;
let _handle = common_runtime::spawn_global(async move {
let reply = handler
@@ -148,6 +229,7 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler {
catchup_tasks,
downgrade_tasks,
flush_tasks,
gc_tasks,
},
instruction,
)

View File

@@ -12,7 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_meta::instruction::{Instruction, InstructionReply, SimpleReply};
use common_meta::RegionIdent;
use common_meta::instruction::{InstructionReply, SimpleReply};
use common_telemetry::warn;
use futures::future::join_all;
use store_api::region_request::{RegionCloseRequest, RegionRequest};
@@ -26,13 +27,13 @@ pub struct CloseRegionsHandler;
#[async_trait::async_trait]
impl InstructionHandler for CloseRegionsHandler {
type Instruction = Vec<RegionIdent>;
async fn handle(
&self,
ctx: &HandlerContext,
instruction: Instruction,
region_idents: Self::Instruction,
) -> Option<InstructionReply> {
// Safety: must be `Instruction::CloseRegions` instruction.
let region_idents = instruction.into_close_regions().unwrap();
let region_ids = region_idents
.into_iter()
.map(|region_ident| RegionId::new(region_ident.table_id, region_ident.region_number))

View File

@@ -13,7 +13,7 @@
// limitations under the License.
use common_meta::instruction::{
DowngradeRegion, DowngradeRegionReply, DowngradeRegionsReply, Instruction, InstructionReply,
DowngradeRegion, DowngradeRegionReply, DowngradeRegionsReply, InstructionReply,
};
use common_telemetry::tracing::info;
use common_telemetry::{error, warn};
@@ -156,13 +156,13 @@ impl DowngradeRegionsHandler {
#[async_trait::async_trait]
impl InstructionHandler for DowngradeRegionsHandler {
type Instruction = Vec<DowngradeRegion>;
async fn handle(
&self,
ctx: &HandlerContext,
instruction: Instruction,
downgrade_regions: Self::Instruction,
) -> Option<InstructionReply> {
// Safety: must be `Instruction::DowngradeRegion` instruction.
let downgrade_regions = instruction.into_downgrade_regions().unwrap();
let futures = downgrade_regions
.into_iter()
.map(|downgrade_region| Self::handle_downgrade_region(ctx, downgrade_region));
@@ -263,10 +263,10 @@ mod tests {
let reply = DowngradeRegionsHandler
.handle(
&handler_context,
Instruction::DowngradeRegions(vec![DowngradeRegion {
vec![DowngradeRegion {
region_id,
flush_timeout,
}]),
}],
)
.await;
@@ -306,10 +306,10 @@ mod tests {
let reply = DowngradeRegionsHandler
.handle(
&handler_context,
Instruction::DowngradeRegions(vec![DowngradeRegion {
vec![DowngradeRegion {
region_id,
flush_timeout,
}]),
}],
)
.await;
@@ -341,10 +341,10 @@ mod tests {
let reply = DowngradeRegionsHandler
.handle(
&handler_context,
Instruction::DowngradeRegions(vec![DowngradeRegion {
vec![DowngradeRegion {
region_id,
flush_timeout: Some(flush_timeout),
}]),
}],
)
.await;
@@ -380,10 +380,10 @@ mod tests {
let reply = DowngradeRegionsHandler
.handle(
&handler_context,
Instruction::DowngradeRegions(vec![DowngradeRegion {
vec![DowngradeRegion {
region_id,
flush_timeout,
}]),
}],
)
.await;
@@ -396,10 +396,10 @@ mod tests {
let reply = DowngradeRegionsHandler
.handle(
&handler_context,
Instruction::DowngradeRegions(vec![DowngradeRegion {
vec![DowngradeRegion {
region_id,
flush_timeout: Some(Duration::from_millis(500)),
}]),
}],
)
.await;
// Must less than 300 ms.
@@ -443,10 +443,10 @@ mod tests {
let reply = DowngradeRegionsHandler
.handle(
&handler_context,
Instruction::DowngradeRegions(vec![DowngradeRegion {
vec![DowngradeRegion {
region_id,
flush_timeout,
}]),
}],
)
.await;
let reply = &reply.unwrap().expect_downgrade_regions_reply()[0];
@@ -458,10 +458,10 @@ mod tests {
let reply = DowngradeRegionsHandler
.handle(
&handler_context,
Instruction::DowngradeRegions(vec![DowngradeRegion {
vec![DowngradeRegion {
region_id,
flush_timeout: Some(Duration::from_millis(500)),
}]),
}],
)
.await;
// Must less than 300 ms.
@@ -487,10 +487,10 @@ mod tests {
let reply = DowngradeRegionsHandler
.handle(
&handler_context,
Instruction::DowngradeRegions(vec![DowngradeRegion {
vec![DowngradeRegion {
region_id,
flush_timeout: None,
}]),
}],
)
.await;
let reply = &reply.unwrap().expect_downgrade_regions_reply()[0];
@@ -518,10 +518,10 @@ mod tests {
let reply = DowngradeRegionsHandler
.handle(
&handler_context,
Instruction::DowngradeRegions(vec![DowngradeRegion {
vec![DowngradeRegion {
region_id,
flush_timeout: None,
}]),
}],
)
.await;
let reply = &reply.unwrap().expect_downgrade_regions_reply()[0];

View File

@@ -0,0 +1,62 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_error::ext::ErrorExt;
use common_meta::instruction::{GetFileRefs, GetFileRefsReply, InstructionReply};
use store_api::storage::FileRefsManifest;
use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
pub struct GetFileRefsHandler;
#[async_trait::async_trait]
impl InstructionHandler for GetFileRefsHandler {
type Instruction = GetFileRefs;
async fn handle(
&self,
ctx: &HandlerContext,
get_file_refs: Self::Instruction,
) -> Option<InstructionReply> {
let region_server = &ctx.region_server;
// Get the MitoEngine
let Some(mito_engine) = region_server.mito_engine() else {
return Some(InstructionReply::GetFileRefs(GetFileRefsReply {
file_refs_manifest: FileRefsManifest::default(),
success: false,
error: Some("MitoEngine not found".to_string()),
}));
};
match mito_engine
.get_snapshot_of_unmanifested_refs(get_file_refs.region_ids)
.await
{
Ok(all_file_refs) => {
// Return the file references
Some(InstructionReply::GetFileRefs(GetFileRefsReply {
file_refs_manifest: all_file_refs,
success: true,
error: None,
}))
}
Err(e) => Some(InstructionReply::GetFileRefs(GetFileRefsReply {
file_refs_manifest: FileRefsManifest::default(),
success: false,
error: Some(format!("Failed to get file refs: {}", e.output_msg())),
})),
}
}
}

View File

@@ -15,7 +15,7 @@
use std::time::Instant;
use common_meta::instruction::{
FlushErrorStrategy, FlushRegionReply, FlushStrategy, Instruction, InstructionReply,
FlushErrorStrategy, FlushRegionReply, FlushRegions, FlushStrategy, InstructionReply,
};
use common_telemetry::{debug, warn};
use store_api::region_request::{RegionFlushRequest, RegionRequest};
@@ -28,13 +28,14 @@ pub struct FlushRegionsHandler;
#[async_trait::async_trait]
impl InstructionHandler for FlushRegionsHandler {
type Instruction = FlushRegions;
async fn handle(
&self,
ctx: &HandlerContext,
instruction: Instruction,
flush_regions: FlushRegions,
) -> Option<InstructionReply> {
let start_time = Instant::now();
let flush_regions = instruction.into_flush_regions().unwrap();
let strategy = flush_regions.strategy;
let region_ids = flush_regions.region_ids;
let error_strategy = flush_regions.error_strategy;
@@ -205,10 +206,7 @@ mod tests {
// Async hint mode
let flush_instruction = FlushRegions::async_batch(region_ids.clone());
let reply = FlushRegionsHandler
.handle(
&handler_context,
Instruction::FlushRegions(flush_instruction),
)
.handle(&handler_context, flush_instruction)
.await;
assert!(reply.is_none()); // Hint mode returns no reply
assert_eq!(*flushed_region_ids.read().unwrap(), region_ids);
@@ -218,10 +216,7 @@ mod tests {
let not_found_region_ids = (0..2).map(|i| RegionId::new(2048, i)).collect::<Vec<_>>();
let flush_instruction = FlushRegions::async_batch(not_found_region_ids);
let reply = FlushRegionsHandler
.handle(
&handler_context,
Instruction::FlushRegions(flush_instruction),
)
.handle(&handler_context, flush_instruction)
.await;
assert!(reply.is_none());
assert!(flushed_region_ids.read().unwrap().is_empty());
@@ -247,10 +242,7 @@ mod tests {
let flush_instruction = FlushRegions::sync_single(region_id);
let reply = FlushRegionsHandler
.handle(
&handler_context,
Instruction::FlushRegions(flush_instruction),
)
.handle(&handler_context, flush_instruction)
.await;
let flush_reply = reply.unwrap().expect_flush_regions_reply();
assert!(flush_reply.overall_success);
@@ -287,10 +279,7 @@ mod tests {
let flush_instruction =
FlushRegions::sync_batch(region_ids.clone(), FlushErrorStrategy::FailFast);
let reply = FlushRegionsHandler
.handle(
&handler_context,
Instruction::FlushRegions(flush_instruction),
)
.handle(&handler_context, flush_instruction)
.await;
let flush_reply = reply.unwrap().expect_flush_regions_reply();
assert!(!flush_reply.overall_success); // Should fail due to non-existent regions
@@ -321,10 +310,7 @@ mod tests {
let flush_instruction =
FlushRegions::sync_batch(region_ids.clone(), FlushErrorStrategy::TryAll);
let reply = FlushRegionsHandler
.handle(
&handler_context,
Instruction::FlushRegions(flush_instruction),
)
.handle(&handler_context, flush_instruction)
.await;
let flush_reply = reply.unwrap().expect_flush_regions_reply();
assert!(!flush_reply.overall_success); // Should fail due to one non-existent region

View File

@@ -0,0 +1,156 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_meta::instruction::{GcRegions, GcRegionsReply, InstructionReply};
use common_telemetry::{debug, warn};
use mito2::gc::LocalGcWorker;
use snafu::{OptionExt, ResultExt};
use store_api::storage::{FileRefsManifest, RegionId};
use crate::error::{GcMitoEngineSnafu, InvalidGcArgsSnafu, Result, UnexpectedSnafu};
use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
pub struct GcRegionsHandler;
#[async_trait::async_trait]
impl InstructionHandler for GcRegionsHandler {
type Instruction = GcRegions;
async fn handle(
&self,
ctx: &HandlerContext,
gc_regions: Self::Instruction,
) -> Option<InstructionReply> {
let region_ids = gc_regions.regions.clone();
debug!("Received gc regions instruction: {:?}", region_ids);
let is_same_table = region_ids.windows(2).all(|w| {
let t1 = w[0].table_id();
let t2 = w[1].table_id();
t1 == t2
});
if !is_same_table {
return Some(InstructionReply::GcRegions(GcRegionsReply {
result: Err(format!(
"Regions to GC should belong to the same table, found: {:?}",
region_ids
)),
}));
}
let (region_id, gc_worker) = match self
.create_gc_worker(
ctx,
region_ids,
&gc_regions.file_refs_manifest,
gc_regions.full_file_listing,
)
.await
{
Ok(worker) => worker,
Err(e) => {
return Some(InstructionReply::GcRegions(GcRegionsReply {
result: Err(format!("Failed to create GC worker: {}", e)),
}));
}
};
let register_result = ctx
.gc_tasks
.try_register(
region_id,
Box::pin(async move {
debug!("Starting gc worker for region {}", region_id);
let report = gc_worker
.run()
.await
.context(GcMitoEngineSnafu { region_id })?;
debug!("Gc worker for region {} finished", region_id);
Ok(report)
}),
)
.await;
if register_result.is_busy() {
warn!("Another gc task is running for the region: {region_id}");
return Some(InstructionReply::GcRegions(GcRegionsReply {
result: Err(format!(
"Another gc task is running for the region: {region_id}"
)),
}));
}
let mut watcher = register_result.into_watcher();
let result = ctx.gc_tasks.wait_until_finish(&mut watcher).await;
match result {
Ok(report) => Some(InstructionReply::GcRegions(GcRegionsReply {
result: Ok(report),
})),
Err(err) => Some(InstructionReply::GcRegions(GcRegionsReply {
result: Err(format!("{err:?}")),
})),
}
}
}
impl GcRegionsHandler {
async fn create_gc_worker(
&self,
ctx: &HandlerContext,
mut region_ids: Vec<RegionId>,
file_ref_manifest: &FileRefsManifest,
full_file_listing: bool,
) -> Result<(RegionId, LocalGcWorker)> {
// Always use the smallest region id as the target; the regions share a table id, so sorting by region number suffices.
region_ids.sort_by_key(|r| r.region_number());
let mito_engine = ctx
.region_server
.mito_engine()
.with_context(|| UnexpectedSnafu {
violated: "MitoEngine not found".to_string(),
})?;
let region_id = *region_ids.first().with_context(|| UnexpectedSnafu {
violated: "No region ids provided".to_string(),
})?;
let mito_config = mito_engine.mito_config();
// Find the access layer from any of the requested regions that exist on this datanode.
let access_layer = region_ids
.iter()
.find_map(|rid| mito_engine.find_region(*rid))
.with_context(|| InvalidGcArgsSnafu {
msg: format!(
"None of the regions is on current datanode:{:?}",
region_ids
),
})?
.access_layer();
let cache_manager = mito_engine.cache_manager();
let gc_worker = LocalGcWorker::try_new(
access_layer.clone(),
Some(cache_manager),
region_ids.into_iter().collect(),
Default::default(),
mito_config.clone().into(),
file_ref_manifest.clone(),
&mito_engine.gc_limiter(),
full_file_listing,
)
.await
.context(GcMitoEngineSnafu { region_id })?;
Ok((region_id, gc_worker))
}
}

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, SimpleReply};
use common_meta::instruction::{InstructionReply, OpenRegion, SimpleReply};
use common_meta::wal_options_allocator::prepare_wal_options;
use store_api::path_utils::table_dir;
use store_api::region_request::{PathType, RegionOpenRequest};
@@ -26,13 +26,12 @@ pub struct OpenRegionsHandler {
#[async_trait::async_trait]
impl InstructionHandler for OpenRegionsHandler {
type Instruction = Vec<OpenRegion>;
async fn handle(
&self,
ctx: &HandlerContext,
instruction: Instruction,
open_regions: Self::Instruction,
) -> Option<InstructionReply> {
let open_regions = instruction.into_open_regions().unwrap();
let requests = open_regions
.into_iter()
.map(|open_region| {

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_meta::instruction::{Instruction, InstructionReply, UpgradeRegion, UpgradeRegionReply};
use common_meta::instruction::{InstructionReply, UpgradeRegion, UpgradeRegionReply};
use common_telemetry::{info, warn};
use store_api::region_request::{RegionCatchupRequest, RegionRequest, ReplayCheckpoint};
@@ -24,12 +24,12 @@ pub struct UpgradeRegionsHandler;
#[async_trait::async_trait]
impl InstructionHandler for UpgradeRegionsHandler {
type Instruction = UpgradeRegion;
async fn handle(
&self,
ctx: &HandlerContext,
instruction: Instruction,
) -> Option<InstructionReply> {
let UpgradeRegion {
UpgradeRegion {
region_id,
last_entry_id,
metadata_last_entry_id,
@@ -37,8 +37,8 @@ impl InstructionHandler for UpgradeRegionsHandler {
location_id,
replay_entry_id,
metadata_replay_entry_id,
} = instruction.into_upgrade_regions().unwrap();
}: UpgradeRegion,
) -> Option<InstructionReply> {
let Some(writable) = ctx.region_server.is_region_leader(region_id) else {
return Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
ready: false,
@@ -138,7 +138,7 @@ impl InstructionHandler for UpgradeRegionsHandler {
mod tests {
use std::time::Duration;
use common_meta::instruction::{Instruction, UpgradeRegion};
use common_meta::instruction::UpgradeRegion;
use mito2::engine::MITO_ENGINE_NAME;
use store_api::region_engine::RegionRole;
use store_api::storage::RegionId;
@@ -164,11 +164,11 @@ mod tests {
let reply = UpgradeRegionsHandler
.handle(
&handler_context,
Instruction::UpgradeRegion(UpgradeRegion {
UpgradeRegion {
region_id,
replay_timeout,
..Default::default()
}),
},
)
.await;
@@ -201,11 +201,11 @@ mod tests {
let reply = UpgradeRegionsHandler
.handle(
&handler_context,
Instruction::UpgradeRegion(UpgradeRegion {
UpgradeRegion {
region_id,
replay_timeout,
..Default::default()
}),
},
)
.await;
@@ -239,11 +239,11 @@ mod tests {
let reply = UpgradeRegionsHandler
.handle(
&handler_context,
Instruction::UpgradeRegion(UpgradeRegion {
UpgradeRegion {
region_id,
replay_timeout,
..Default::default()
}),
},
)
.await;
@@ -280,11 +280,11 @@ mod tests {
let reply = UpgradeRegionsHandler
.handle(
&handler_context,
Instruction::UpgradeRegion(UpgradeRegion {
UpgradeRegion {
region_id,
replay_timeout,
..Default::default()
}),
},
)
.await;
@@ -298,11 +298,11 @@ mod tests {
let reply = UpgradeRegionsHandler
.handle(
&handler_context,
Instruction::UpgradeRegion(UpgradeRegion {
UpgradeRegion {
region_id,
replay_timeout: Some(Duration::from_millis(500)),
..Default::default()
}),
},
)
.await;
// Must less than 300 ms.
@@ -339,10 +339,10 @@ mod tests {
let reply = UpgradeRegionsHandler
.handle(
&handler_context,
Instruction::UpgradeRegion(UpgradeRegion {
UpgradeRegion {
region_id,
..Default::default()
}),
},
)
.await;
@@ -355,11 +355,11 @@ mod tests {
let reply = UpgradeRegionsHandler
.handle(
&handler_context,
Instruction::UpgradeRegion(UpgradeRegion {
UpgradeRegion {
region_id,
replay_timeout: Some(Duration::from_millis(200)),
..Default::default()
}),
},
)
.await;

View File

@@ -158,6 +158,27 @@ impl RegionServer {
}
}
/// Gets the MitoEngine if it's registered.
pub fn mito_engine(&self) -> Option<MitoEngine> {
if let Some(mito) = self.inner.mito_engine.read().unwrap().clone() {
Some(mito)
} else {
self.inner
.engines
.read()
.unwrap()
.get(MITO_ENGINE_NAME)
.cloned()
.and_then(|e| {
let mito = e.as_any().downcast_ref::<MitoEngine>().cloned();
if mito.is_none() {
warn!("Mito engine not found in region server engines");
}
mito
})
}
}
#[tracing::instrument(skip_all)]
pub async fn handle_batch_open_requests(
&self,
@@ -676,14 +697,14 @@ struct RegionServerInner {
runtime: Runtime,
event_listener: RegionServerEventListenerRef,
table_provider_factory: TableProviderFactoryRef,
// The number of queries allowed to be executed at the same time.
// Act as last line of defense on datanode to prevent query overloading.
/// The number of queries allowed to be executed at the same time.
/// Act as last line of defense on datanode to prevent query overloading.
parallelism: Option<RegionServerParallelism>,
// The topic stats reporter.
/// The topic stats reporter.
topic_stats_reporter: RwLock<Option<Box<dyn TopicStatsReporter>>>,
// HACK(zhongzc): Direct MitoEngine handle for diagnostics. This couples the
// server with a concrete engine; acceptable for now to fetch Mito-specific
// info (e.g., list SSTs). Consider a diagnostics trait later.
/// HACK(zhongzc): Direct MitoEngine handle for diagnostics. This couples the
/// server with a concrete engine; acceptable for now to fetch Mito-specific
/// info (e.g., list SSTs). Consider a diagnostics trait later.
mito_engine: RwLock<Option<MitoEngine>>,
}
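
The fallback lookup relies on the classic `Any` downcast idiom: engines are stored as trait objects keyed by name, and the concrete `MitoEngine` handle is recovered by downcasting. A standalone sketch with illustrative names:

```rust
use std::any::Any;
use std::sync::Arc;

trait RegionEngine {
    fn name(&self) -> &str;
    fn as_any(&self) -> &dyn Any;
}

#[derive(Clone)]
struct MitoLike;

impl RegionEngine for MitoLike {
    fn name(&self) -> &str {
        "mito"
    }
    fn as_any(&self) -> &dyn Any {
        self
    }
}

// Look up the engine by name, then recover the concrete type via `Any`.
fn find_mito(engines: &[Arc<dyn RegionEngine>]) -> Option<MitoLike> {
    engines
        .iter()
        .find(|e| e.name() == "mito")
        .and_then(|e| e.as_any().downcast_ref::<MitoLike>().cloned())
}

fn main() {
    let engines: Vec<Arc<dyn RegionEngine>> = vec![Arc::new(MitoLike)];
    assert!(find_mito(&engines).is_some());
}
```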

View File

@@ -47,10 +47,7 @@ pub(crate) async fn new_object_store_without_cache(
Ok(object_store)
}
pub(crate) async fn new_object_store(
store: ObjectStoreConfig,
data_home: &str,
) -> Result<ObjectStore> {
pub async fn new_object_store(store: ObjectStoreConfig, data_home: &str) -> Result<ObjectStore> {
let object_store = new_raw_object_store(&store, data_home)
.await
.context(error::ObjectStoreSnafu)?;
@@ -59,7 +56,7 @@ pub(crate) async fn new_object_store(
let object_store = {
// It's safe to unwrap here because we already checked above.
let cache_config = store.cache_config().unwrap();
if let Some(cache_layer) = build_cache_layer(cache_config).await? {
if let Some(cache_layer) = build_cache_layer(cache_config, data_home).await? {
// Adds cache layer
object_store.layer(cache_layer)
} else {
@@ -79,17 +76,22 @@ pub(crate) async fn new_object_store(
async fn build_cache_layer(
cache_config: &ObjectStorageCacheConfig,
data_home: &str,
) -> Result<Option<LruCacheLayer<impl Access>>> {
// No need to build cache layer if read cache is disabled.
if !cache_config.enable_read_cache {
return Ok(None);
}
let atomic_temp_dir = join_dir(&cache_config.cache_path, ATOMIC_WRITE_DIR);
let cache_base_dir = if cache_config.cache_path.is_empty() {
data_home
} else {
&cache_config.cache_path
};
let atomic_temp_dir = join_dir(cache_base_dir, ATOMIC_WRITE_DIR);
clean_temp_dir(&atomic_temp_dir).context(error::ObjectStoreSnafu)?;
let cache_store = Fs::default()
.root(&cache_config.cache_path)
.root(cache_base_dir)
.atomic_write_dir(&atomic_temp_dir)
.build()
.context(error::BuildCacheStoreSnafu)?;

View File

@@ -348,9 +348,9 @@ impl ConcreteDataType {
}
}
pub fn as_json(&self) -> Option<JsonType> {
pub fn as_json(&self) -> Option<&JsonType> {
match self {
ConcreteDataType::Json(j) => Some(j.clone()),
ConcreteDataType::Json(j) => Some(j),
_ => None,
}
}

View File

@@ -259,6 +259,13 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to merge JSON datatype: {reason}"))]
MergeJsonDatatype {
reason: String,
#[snafu(implicit)]
location: Location,
},
}
impl ErrorExt for Error {
@@ -281,7 +288,8 @@ impl ErrorExt for Error {
| InvalidJsonb { .. }
| InvalidVector { .. }
| InvalidFulltextOption { .. }
| InvalidSkippingIndexOption { .. } => StatusCode::InvalidArguments,
| InvalidSkippingIndexOption { .. }
| MergeJsonDatatype { .. } => StatusCode::InvalidArguments,
ValueExceedsPrecision { .. }
| CastType { .. }

View File

@@ -30,7 +30,7 @@ use snafu::{ResultExt, ensure};
use crate::data_type::{ConcreteDataType, DataType};
use crate::error::{self, Error};
use crate::types::{StructField, StructType};
use crate::types::{ListType, StructField, StructType};
use crate::value::{ListValue, StructValue, Value};
/// The configuration of JSON encoding
@@ -375,8 +375,8 @@ fn encode_json_value_with_context<'a>(
}
Json::Array(arr) => {
let list_value = encode_json_array_with_context(arr, expected_type, context)?;
let data_type = list_value.datatype().clone();
Ok((Value::List(list_value), (*data_type).clone()))
let datatype = ConcreteDataType::List(ListType::new(list_value.datatype()));
Ok((Value::List(list_value), datatype))
}
Json::Object(obj) => {
let struct_value = encode_json_object_with_context(obj, None, context)?;

View File

@@ -12,7 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::BTreeMap;
use std::str::FromStr;
use std::sync::Arc;
use arrow::datatypes::DataType as ArrowDataType;
use arrow_schema::Fields;
@@ -21,10 +23,13 @@ use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use crate::data_type::DataType;
use crate::error::{DeserializeSnafu, InvalidJsonSnafu, InvalidJsonbSnafu, Result};
use crate::error::{
DeserializeSnafu, InvalidJsonSnafu, InvalidJsonbSnafu, MergeJsonDatatypeSnafu, Result,
};
use crate::prelude::ConcreteDataType;
use crate::scalars::ScalarVectorBuilder;
use crate::type_id::LogicalTypeId;
use crate::types::{ListType, StructField, StructType};
use crate::value::Value;
use crate::vectors::{BinaryVectorBuilder, MutableVector};
@@ -48,11 +53,101 @@ impl JsonType {
pub fn new(format: JsonFormat) -> Self {
Self { format }
}
// TODO(LFC): remove "allow unused"
#[allow(unused)]
/// Makes this json type a struct type:
/// - if the json is an object, its entries map directly to struct fields;
/// - otherwise (a bool, number, string or array), the value becomes a single special
///   field named "__plain" in a struct containing only that field.
pub(crate) fn as_struct_type(&self) -> StructType {
match &self.format {
JsonFormat::Jsonb => StructType::default(),
JsonFormat::Native(inner) => match inner.as_ref() {
ConcreteDataType::Struct(t) => t.clone(),
x => StructType::new(Arc::new(vec![StructField::new(
"__plain".to_string(),
x.clone(),
true,
)])),
},
}
}
// TODO(LFC): remove "allow unused"
#[allow(unused)]
/// Tries to merge this json type with another, returning an error on datatype conflict.
pub(crate) fn merge(&mut self, other: &JsonType) -> Result<()> {
match (&self.format, &other.format) {
(JsonFormat::Jsonb, JsonFormat::Jsonb) => Ok(()),
(JsonFormat::Native(this), JsonFormat::Native(that)) => {
let merged = merge(this.as_ref(), that.as_ref())?;
self.format = JsonFormat::Native(Box::new(merged));
Ok(())
}
_ => MergeJsonDatatypeSnafu {
reason: "json format not match",
}
.fail(),
}
}
}
fn merge(this: &ConcreteDataType, that: &ConcreteDataType) -> Result<ConcreteDataType> {
match (this, that) {
(this, that) if this == that => Ok(this.clone()),
(ConcreteDataType::List(this), ConcreteDataType::List(that)) => {
merge_list(this, that).map(ConcreteDataType::List)
}
(ConcreteDataType::Struct(this), ConcreteDataType::Struct(that)) => {
merge_struct(this, that).map(ConcreteDataType::Struct)
}
(ConcreteDataType::Null(_), x) | (x, ConcreteDataType::Null(_)) => Ok(x.clone()),
_ => MergeJsonDatatypeSnafu {
reason: format!("datatypes have conflict, this: {this}, that: {that}"),
}
.fail(),
}
}
fn merge_list(this: &ListType, that: &ListType) -> Result<ListType> {
let merged = merge(this.item_type(), that.item_type())?;
Ok(ListType::new(Arc::new(merged)))
}
fn merge_struct(this: &StructType, that: &StructType) -> Result<StructType> {
let this = Arc::unwrap_or_clone(this.fields());
let that = Arc::unwrap_or_clone(that.fields());
let mut this: BTreeMap<String, StructField> = this
.into_iter()
.map(|x| (x.name().to_string(), x))
.collect();
// merge "that" into "this" directly:
for that_field in that {
let field_name = that_field.name().to_string();
if let Some(this_field) = this.get(&field_name) {
let merged_field = StructField::new(
field_name.clone(),
merge(this_field.data_type(), that_field.data_type())?,
true, // the value in a json object must always be nullable
);
this.insert(field_name, merged_field);
} else {
this.insert(field_name, that_field);
}
}
let fields = this.into_values().collect::<Vec<_>>();
Ok(StructType::new(Arc::new(fields)))
}
impl DataType for JsonType {
fn name(&self) -> String {
JSON_TYPE_NAME.to_string()
match &self.format {
JsonFormat::Jsonb => JSON_TYPE_NAME.to_string(),
JsonFormat::Native(x) => format!("Json<{x}>"),
}
}
fn logical_type_id(&self) -> LogicalTypeId {
@@ -106,3 +201,95 @@ pub fn parse_string_to_jsonb(s: &str) -> Result<Vec<u8>> {
.map_err(|_| InvalidJsonSnafu { value: s }.build())
.map(|json| json.to_vec())
}
#[cfg(test)]
mod tests {
use super::*;
use crate::json::JsonStructureSettings;
#[test]
fn test_merge_json_type() -> Result<()> {
fn test(
json: &str,
json_type: &mut JsonType,
expected: std::result::Result<&str, &str>,
) -> Result<()> {
let json: serde_json::Value = serde_json::from_str(json).unwrap();
let settings = JsonStructureSettings::Structured(None);
let value = settings.encode(json)?;
let value_type = value.data_type();
let Some(other) = value_type.as_json() else {
unreachable!()
};
let result = json_type.merge(other);
match (result, expected) {
(Ok(()), Ok(expected)) => {
assert_eq!(json_type.name(), expected)
}
(Err(err), Err(expected)) => {
assert_eq!(err.to_string(), expected)
}
_ => unreachable!(),
}
Ok(())
}
let json_type = &mut JsonType::new(JsonFormat::Native(Box::new(
ConcreteDataType::null_datatype(),
)));
// can merge with json object:
let json = r#"{
"hello": "world",
"list": [1, 2, 3],
"object": {"a": 1}
}"#;
let expected =
r#"Json<Struct<"hello": String, "list": List<Int64>, "object": Struct<"a": Int64>>>"#;
test(json, json_type, Ok(expected))?;
// cannot merge with other non-object json values:
let jsons = [r#""s""#, "1", "[1]"];
let expects = [
r#"Failed to merge JSON datatype: datatypes have conflict, this: Struct<"hello": String, "list": List<Int64>, "object": Struct<"a": Int64>>, that: String"#,
r#"Failed to merge JSON datatype: datatypes have conflict, this: Struct<"hello": String, "list": List<Int64>, "object": Struct<"a": Int64>>, that: Int64"#,
r#"Failed to merge JSON datatype: datatypes have conflict, this: Struct<"hello": String, "list": List<Int64>, "object": Struct<"a": Int64>>, that: List<Int64>"#,
];
for (json, expect) in jsons.into_iter().zip(expects.into_iter()) {
test(json, json_type, Err(expect))?;
}
// cannot merge with other json object with conflict field datatype:
let json = r#"{
"hello": 1,
"float": 0.123,
"no": 42
}"#;
let expected =
r#"Failed to merge JSON datatype: datatypes have conflict, this: String, that: Int64"#;
test(json, json_type, Err(expected))?;
// can merge with another json object:
let json = r#"{
"hello": "greptime",
"float": 0.123,
"int": 42
}"#;
let expected = r#"Json<Struct<"float": Float64, "hello": String, "int": Int64, "list": List<Int64>, "object": Struct<"a": Int64>>>"#;
test(json, json_type, Ok(expected))?;
// can merge with some complex nested json object:
let json = r#"{
"list": [4],
"object": {"foo": "bar", "l": ["x"], "o": {"key": "value"}},
"float": 0.456,
"int": 0
}"#;
let expected = r#"Json<Struct<"float": Float64, "hello": String, "int": Int64, "list": List<Int64>, "object": Struct<"a": Int64, "foo": String, "l": List<String>, "o": Struct<"key": String>>>>"#;
test(json, json_type, Ok(expected))?;
Ok(())
}
}
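The `as_struct_type` helper above is not covered by the merge test; below is a minimal sketch of the wrapping behavior its doc comment describes, reusing names that appear in this diff (`JsonType`, `JsonFormat`, `ConcreteDataType`). Exact signatures and visibility are assumptions, not verified API.
// Sketch only: a non-object JSON datatype is wrapped into a single-field
// struct named "__plain", as described in the doc comment above.
fn plain_json_wraps_into_struct() {
    let json_type = JsonType::new(JsonFormat::Native(Box::new(
        ConcreteDataType::null_datatype(),
    )));
    let struct_type = json_type.as_struct_type();
    // One field only, named "__plain", carrying the original non-struct datatype.
    assert_eq!(struct_type.fields().len(), 1);
    assert_eq!(struct_type.fields()[0].name(), "__plain");
}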

View File

@@ -52,7 +52,7 @@ impl DataType for StructType {
"Struct<{}>",
self.fields
.iter()
.map(|f| f.name())
.map(|f| format!(r#""{}": {}"#, f.name(), f.data_type()))
.collect::<Vec<_>>()
.join(", ")
)

View File

@@ -18,6 +18,7 @@ use std::collections::BTreeSet;
use std::sync::Arc;
use catalog::CatalogManagerRef;
use client::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_error::ext::BoxedError;
use common_meta::key::flow::FlowMetadataManagerRef;
use common_recordbatch::{RecordBatch, RecordBatches, SendableRecordBatchStream};
@@ -396,8 +397,8 @@ impl RefillTask {
// We don't need information from the query context in this query, so a default query context is enough
let query_ctx = Arc::new(
QueryContextBuilder::default()
.current_catalog("greptime".to_string())
.current_schema("public".to_string())
.current_catalog(DEFAULT_CATALOG_NAME.to_string())
.current_schema(DEFAULT_SCHEMA_NAME.to_string())
.build(),
);

View File

@@ -45,6 +45,7 @@ use crate::service_config::{
pub struct FrontendOptions {
pub node_id: Option<String>,
pub default_timezone: Option<String>,
pub default_column_prefix: Option<String>,
pub heartbeat: HeartbeatOptions,
pub http: HttpOptions,
pub grpc: GrpcOptions,
@@ -77,6 +78,7 @@ impl Default for FrontendOptions {
Self {
node_id: None,
default_timezone: None,
default_column_prefix: None,
heartbeat: HeartbeatOptions::frontend_default(),
http: HttpOptions::default(),
grpc: GrpcOptions::default(),

View File

@@ -104,6 +104,9 @@ impl HeartbeatTask {
match resp_stream.message().await {
Ok(Some(resp)) => {
debug!("Receiving heartbeat response: {:?}", resp);
if let Some(message) = &resp.mailbox_message {
info!("Received mailbox message: {message:?}");
}
let ctx = HeartbeatResponseHandlerContext::new(mailbox.clone(), resp);
if let Err(e) = capture_self.handle_response(ctx).await {
error!(e; "Error while handling heartbeat response");

View File

@@ -17,7 +17,9 @@ use std::sync::Arc;
use async_trait::async_trait;
use catalog::CatalogManagerRef;
use common_catalog::consts::{TRACE_TABLE_NAME, trace_services_table_name};
use common_catalog::consts::{
TRACE_TABLE_NAME, trace_operations_table_name, trace_services_table_name,
};
use common_function::function::FunctionRef;
use common_function::scalars::json::json_get::{
JsonGetBool, JsonGetFloat, JsonGetInt, JsonGetString,
@@ -76,8 +78,6 @@ impl JaegerQueryHandler for Instance {
ctx: QueryContextRef,
service_name: &str,
span_kind: Option<&str>,
start_time: Option<i64>,
end_time: Option<i64>,
) -> ServerResult<Output> {
let mut filters = vec![col(SERVICE_NAME_COLUMN).eq(lit(service_name))];
@@ -89,16 +89,6 @@ impl JaegerQueryHandler for Instance {
))));
}
if let Some(start_time) = start_time {
// Microseconds to nanoseconds.
filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time * 1_000)));
}
if let Some(end_time) = end_time {
// Microseconds to nanoseconds.
filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time * 1_000)));
}
// It's equivalent to the following SQL query:
//
// ```
@@ -107,8 +97,6 @@ impl JaegerQueryHandler for Instance {
// {db}.{trace_table}
// WHERE
// service_name = '{service_name}' AND
// timestamp >= {start_time} AND
// timestamp <= {end_time} AND
// span_kind = '{span_kind}'
// ORDER BY
// span_name ASC
@@ -301,12 +289,18 @@ async fn query_trace_table(
.unwrap_or(TRACE_TABLE_NAME);
// If only selecting the service name, use the trace services table.
// If querying operations (distinct by span_name and span_kind), use the trace operations table.
let table_name = {
if match selects.as_slice() {
[SelectExpr::Expression(x)] => x == &col(SERVICE_NAME_COLUMN),
_ => false,
} {
&trace_services_table_name(trace_table_name)
} else if !distincts.is_empty()
&& distincts.contains(&col(SPAN_NAME_COLUMN))
&& distincts.contains(&col(SPAN_KIND_COLUMN))
{
&trace_operations_table_name(trace_table_name)
} else {
trace_table_name
}

View File

@@ -77,6 +77,7 @@ struct PersistRegionStat<'a> {
sst_size: u64,
write_bytes_delta: u64,
#[col(
// This col name is for the information schema table, so we don't touch it
name = "greptime_timestamp",
semantic = "Timestamp",
datatype = "TimestampMillisecond"

View File

@@ -254,7 +254,7 @@ mod tests {
assert_eq!(status, http::StatusCode::OK);
assert_eq!(
body,
"[[{\"timestamp_millis\":3,\"id\":0,\"addr\":\"127.0.0.1:3001\",\"rcus\":0,\"wcus\":0,\"region_num\":0,\"region_stats\":[],\"topic_stats\":[],\"node_epoch\":0,\"datanode_workloads\":{\"types\":[]}}]]"
"[[{\"timestamp_millis\":3,\"id\":0,\"addr\":\"127.0.0.1:3001\",\"rcus\":0,\"wcus\":0,\"region_num\":0,\"region_stats\":[],\"topic_stats\":[],\"node_epoch\":0,\"datanode_workloads\":{\"types\":[]},\"gc_stat\":null}]]"
);
}
}

View File

@@ -240,6 +240,7 @@ impl DataRegion {
#[cfg(test)]
mod test {
use common_query::prelude::{greptime_timestamp, greptime_value};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
@@ -300,8 +301,8 @@ mod test {
.map(|c| &c.column_schema.name)
.collect::<Vec<_>>();
let expected = vec![
"greptime_timestamp",
"greptime_value",
greptime_timestamp(),
greptime_value(),
"__table_id",
"__tsid",
"job",

View File

@@ -224,6 +224,7 @@ mod test {
use api::v1::SemanticType;
use common_meta::ddl::test_util::assert_column_name_and_id;
use common_meta::ddl::utils::{parse_column_metadatas, parse_manifest_infos_from_extensions};
use common_query::prelude::{greptime_timestamp, greptime_value};
use store_api::metric_engine_consts::ALTER_PHYSICAL_EXTENSION_KEY;
use store_api::region_engine::RegionEngine;
use store_api::region_request::{
@@ -295,7 +296,7 @@ mod test {
.unwrap();
assert_eq!(semantic_type, SemanticType::Tag);
let timestamp_index = metadata_region
.column_semantic_type(physical_region_id, logical_region_id, "greptime_timestamp")
.column_semantic_type(physical_region_id, logical_region_id, greptime_timestamp())
.await
.unwrap()
.unwrap();
@@ -305,8 +306,8 @@ mod test {
assert_column_name_and_id(
&column_metadatas,
&[
("greptime_timestamp", 0),
("greptime_value", 1),
(greptime_timestamp(), 0),
(greptime_value(), 1),
("__table_id", ReservedColumnId::table_id()),
("__tsid", ReservedColumnId::tsid()),
("job", 2),
@@ -364,8 +365,8 @@ mod test {
assert_column_name_and_id(
&column_metadatas,
&[
("greptime_timestamp", 0),
("greptime_value", 1),
(greptime_timestamp(), 0),
(greptime_value(), 1),
("__table_id", ReservedColumnId::table_id()),
("__tsid", ReservedColumnId::tsid()),
("job", 2),

View File

@@ -619,6 +619,7 @@ pub(crate) fn region_options_for_metadata_region(
mod test {
use common_meta::ddl::test_util::assert_column_name_and_id;
use common_meta::ddl::utils::{parse_column_metadatas, parse_manifest_infos_from_extensions};
use common_query::prelude::{greptime_timestamp, greptime_value};
use store_api::metric_engine_consts::{METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY};
use store_api::region_request::BatchRegionDdlRequest;
@@ -856,8 +857,8 @@ mod test {
assert_column_name_and_id(
&column_metadatas,
&[
("greptime_timestamp", 0),
("greptime_value", 1),
(greptime_timestamp(), 0),
(greptime_value(), 1),
("__table_id", ReservedColumnId::table_id()),
("__tsid", ReservedColumnId::tsid()),
("job", 2),

View File

@@ -110,6 +110,7 @@ mod tests {
use std::collections::HashMap;
use api::v1::SemanticType;
use common_query::prelude::greptime_timestamp;
use common_telemetry::info;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::ColumnSchema;
@@ -243,7 +244,7 @@ mod tests {
.unwrap();
assert_eq!(semantic_type, SemanticType::Tag);
let timestamp_index = metadata_region
.column_semantic_type(physical_region_id, logical_region_id, "greptime_timestamp")
.column_semantic_type(physical_region_id, logical_region_id, greptime_timestamp())
.await
.unwrap()
.unwrap();

View File

@@ -17,6 +17,7 @@
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, ColumnSchema as PbColumnSchema, Row, SemanticType, Value};
use common_meta::ddl::utils::parse_column_metadatas;
use common_query::prelude::{greptime_timestamp, greptime_value};
use common_telemetry::debug;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
@@ -132,7 +133,7 @@ impl TestEnv {
column_id: 0,
semantic_type: SemanticType::Timestamp,
column_schema: ColumnSchema::new(
"greptime_timestamp",
greptime_timestamp(),
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
@@ -141,7 +142,7 @@ impl TestEnv {
column_id: 1,
semantic_type: SemanticType::Field,
column_schema: ColumnSchema::new(
"greptime_value",
greptime_value(),
ConcreteDataType::float64_datatype(),
false,
),
@@ -204,8 +205,8 @@ impl TestEnv {
assert_eq!(
column_names,
vec![
"greptime_timestamp",
"greptime_value",
greptime_timestamp(),
greptime_value(),
"__table_id",
"__tsid",
"job",
@@ -300,7 +301,7 @@ pub fn create_logical_region_request(
column_id: 0,
semantic_type: SemanticType::Timestamp,
column_schema: ColumnSchema::new(
"greptime_timestamp",
greptime_timestamp(),
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
@@ -309,7 +310,7 @@ pub fn create_logical_region_request(
column_id: 1,
semantic_type: SemanticType::Field,
column_schema: ColumnSchema::new(
"greptime_value",
greptime_value(),
ConcreteDataType::float64_datatype(),
false,
),
@@ -372,14 +373,14 @@ pub fn alter_logical_region_request(tags: &[&str]) -> RegionAlterRequest {
pub fn row_schema_with_tags(tags: &[&str]) -> Vec<PbColumnSchema> {
let mut schema = vec![
PbColumnSchema {
column_name: "greptime_timestamp".to_string(),
column_name: greptime_timestamp().to_string(),
datatype: ColumnDataType::TimestampMillisecond as i32,
semantic_type: SemanticType::Timestamp as _,
datatype_extension: None,
options: None,
},
PbColumnSchema {
column_name: "greptime_value".to_string(),
column_name: greptime_value().to_string(),
datatype: ColumnDataType::Float64 as i32,
semantic_type: SemanticType::Field as _,
datatype_extension: None,

View File

@@ -15,6 +15,7 @@ common-base.workspace = true
common-decimal.workspace = true
common-error.workspace = true
common-macro.workspace = true
common-query.workspace = true
common-recordbatch.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
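The `common-query` dependency added here matches the `greptime_timestamp()` / `greptime_value()` helpers imported from `common_query::prelude` in the surrounding test diffs. A hedged sketch of the expectation behind that substitution; the return values are inferred from the string literals they replace, not verified against the crate:
// Assumption from the replaced literals: the helpers yield the canonical
// default column names formerly hard-coded in these tests.
use common_query::prelude::{greptime_timestamp, greptime_value};

fn main() {
    assert_eq!(greptime_timestamp(), "greptime_timestamp");
    assert_eq!(greptime_value(), "greptime_value");
}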

View File

@@ -154,6 +154,7 @@ mod tests {
use std::sync::Arc;
use api::v1::SemanticType;
use common_query::prelude::{greptime_timestamp, greptime_value};
use datafusion_common::Column;
use datafusion_expr::{BinaryExpr, Expr, Literal, Operator};
use datatypes::prelude::ConcreteDataType;
@@ -193,7 +194,7 @@ mod tests {
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"greptime_value",
greptime_value(),
ConcreteDataType::float64_datatype(),
false,
),
@@ -202,7 +203,7 @@ mod tests {
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"greptime_timestamp",
greptime_timestamp(),
ConcreteDataType::timestamp_nanosecond_datatype(),
false,
),

View File

@@ -385,6 +385,7 @@ mod tests {
use std::sync::Arc;
use api::v1::SemanticType;
use common_query::prelude::{greptime_timestamp, greptime_value};
use common_time::Timestamp;
use common_time::timestamp::TimeUnit;
use datatypes::schema::ColumnSchema;
@@ -461,7 +462,7 @@ mod tests {
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"greptime_value",
greptime_value(),
ConcreteDataType::float64_datatype(),
false,
),
@@ -470,7 +471,7 @@ mod tests {
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"greptime_timestamp",
greptime_timestamp(),
ConcreteDataType::timestamp_nanosecond_datatype(),
false,
),

View File

@@ -72,7 +72,7 @@ pub struct Metrics {
}
impl Metrics {
pub(crate) fn new(write_type: WriteType) -> Self {
pub fn new(write_type: WriteType) -> Self {
Self {
write_type,
iter_source: Default::default(),
@@ -255,12 +255,12 @@ impl AccessLayer {
&self,
request: SstWriteRequest,
write_opts: &WriteOptions,
write_type: WriteType,
) -> Result<(SstInfoArray, Metrics)> {
metrics: &mut Metrics,
) -> Result<SstInfoArray> {
let region_id = request.metadata.region_id;
let cache_manager = request.cache_manager.clone();
let (sst_info, metrics) = if let Some(write_cache) = cache_manager.write_cache() {
let sst_info = if let Some(write_cache) = cache_manager.write_cache() {
// Write to the write cache.
write_cache
.write_and_upload_sst(
@@ -273,7 +273,7 @@ impl AccessLayer {
remote_store: self.object_store.clone(),
},
write_opts,
write_type,
metrics,
)
.await?
} else {
@@ -303,11 +303,11 @@ impl AccessLayer {
request.index_config,
indexer_builder,
path_provider,
Metrics::new(write_type),
metrics,
)
.await
.with_file_cleaner(cleaner);
let ssts = match request.source {
match request.source {
Either::Left(source) => {
writer
.write_all(source, request.max_sequence, write_opts)
@@ -316,9 +316,7 @@ impl AccessLayer {
Either::Right(flat_source) => {
writer.write_all_flat(flat_source, write_opts).await?
}
};
let metrics = writer.into_metrics();
(ssts, metrics)
}
};
// Put parquet metadata to cache manager.
@@ -333,7 +331,7 @@ impl AccessLayer {
}
}
Ok((sst_info, metrics))
Ok(sst_info)
}
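The refactor above moves `Metrics` ownership to the caller: `write_sst` now fills a borrowed `&mut Metrics` instead of returning one alongside the SST info. A self-contained sketch of that borrowing pattern with placeholder types (not the mito2 API):
// Placeholder types illustrating the caller-owned metrics pattern used above.
#[derive(Default, Debug)]
struct Metrics {
    bytes_written: u64,
}

struct Writer<'a> {
    metrics: &'a mut Metrics,
}

impl<'a> Writer<'a> {
    fn write(&mut self, buf: &[u8]) {
        // The writer updates the borrowed metrics in place.
        self.metrics.bytes_written += buf.len() as u64;
    }
}

fn main() {
    let mut metrics = Metrics::default();
    {
        let mut writer = Writer { metrics: &mut metrics };
        writer.write(b"data");
    } // writer is dropped here; no need to hand the metrics back in a return value.
    assert_eq!(metrics.bytes_written, 4);
    println!("{metrics:?}");
}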
/// Puts encoded SST bytes to the write cache (if enabled) and uploads it to the object store.

View File

@@ -15,7 +15,7 @@
use std::ops::Range;
use std::sync::Arc;
use api::v1::index::BloomFilterMeta;
use api::v1::index::{BloomFilterLoc, BloomFilterMeta};
use async_trait::async_trait;
use bytes::Bytes;
use index::bloom_filter::error::Result;
@@ -60,11 +60,17 @@ impl BloomFilterIndexCache {
/// Calculates weight for bloom filter index metadata.
fn bloom_filter_index_metadata_weight(
k: &(FileId, ColumnId, Tag),
_: &Arc<BloomFilterMeta>,
meta: &Arc<BloomFilterMeta>,
) -> u32 {
(k.0.as_bytes().len()
let base = k.0.as_bytes().len()
+ std::mem::size_of::<ColumnId>()
+ std::mem::size_of::<BloomFilterMeta>()) as u32
+ std::mem::size_of::<Tag>()
+ std::mem::size_of::<BloomFilterMeta>();
let vec_estimated = meta.segment_loc_indices.len() * std::mem::size_of::<u64>()
+ meta.bloom_filter_locs.len() * std::mem::size_of::<BloomFilterLoc>();
(base + vec_estimated) as u32
}
/// Calculates weight for bloom filter index content.
@@ -171,6 +177,45 @@ mod test {
const FUZZ_REPEAT_TIMES: usize = 100;
#[test]
fn bloom_filter_metadata_weight_counts_vec_contents() {
let file_id = FileId::parse_str("00000000-0000-0000-0000-000000000001").unwrap();
let column_id: ColumnId = 42;
let tag = Tag::Skipping;
let meta = BloomFilterMeta {
rows_per_segment: 128,
segment_count: 2,
row_count: 256,
bloom_filter_size: 1024,
segment_loc_indices: vec![0, 64, 128, 192],
bloom_filter_locs: vec![
BloomFilterLoc {
offset: 0,
size: 512,
element_count: 1000,
},
BloomFilterLoc {
offset: 512,
size: 512,
element_count: 1000,
},
],
};
let weight =
bloom_filter_index_metadata_weight(&(file_id, column_id, tag), &Arc::new(meta.clone()));
let base = file_id.as_bytes().len()
+ std::mem::size_of::<ColumnId>()
+ std::mem::size_of::<Tag>()
+ std::mem::size_of::<BloomFilterMeta>();
let expected_dynamic = meta.segment_loc_indices.len() * std::mem::size_of::<u64>()
+ meta.bloom_filter_locs.len() * std::mem::size_of::<BloomFilterLoc>();
assert_eq!(weight as usize, base + expected_dynamic);
}
#[test]
fn fuzz_index_calculation() {
let mut rng = rand::rng();

View File

@@ -169,8 +169,8 @@ impl WriteCache {
write_request: SstWriteRequest,
upload_request: SstUploadRequest,
write_opts: &WriteOptions,
write_type: WriteType,
) -> Result<(SstInfoArray, Metrics)> {
metrics: &mut Metrics,
) -> Result<SstInfoArray> {
let region_id = write_request.metadata.region_id;
let store = self.file_cache.local_store();
@@ -197,7 +197,7 @@ impl WriteCache {
write_request.index_config,
indexer,
path_provider.clone(),
Metrics::new(write_type),
metrics,
)
.await
.with_file_cleaner(cleaner);
@@ -210,11 +210,10 @@ impl WriteCache {
}
either::Right(flat_source) => writer.write_all_flat(flat_source, write_opts).await?,
};
let mut metrics = writer.into_metrics();
// Upload sst file to remote object store.
if sst_info.is_empty() {
return Ok((sst_info, metrics));
return Ok(sst_info);
}
let mut upload_tracker = UploadTracker::new(region_id);
@@ -256,7 +255,7 @@ impl WriteCache {
return Err(err);
}
Ok((sst_info, metrics))
Ok(sst_info)
}
/// Removes a file from the cache by `index_key`.
@@ -559,8 +558,9 @@ mod tests {
};
// Write to cache and upload sst to mock remote store
let (mut sst_infos, _) = write_cache
.write_and_upload_sst(write_request, upload_request, &write_opts, WriteType::Flush)
let mut metrics = Metrics::new(WriteType::Flush);
let mut sst_infos = write_cache
.write_and_upload_sst(write_request, upload_request, &write_opts, &mut metrics)
.await
.unwrap();
let sst_info = sst_infos.remove(0);
@@ -655,8 +655,9 @@ mod tests {
remote_store: mock_store.clone(),
};
let (mut sst_infos, _) = write_cache
.write_and_upload_sst(write_request, upload_request, &write_opts, WriteType::Flush)
let mut metrics = Metrics::new(WriteType::Flush);
let mut sst_infos = write_cache
.write_and_upload_sst(write_request, upload_request, &write_opts, &mut metrics)
.await
.unwrap();
let sst_info = sst_infos.remove(0);
@@ -735,8 +736,9 @@ mod tests {
remote_store: mock_store.clone(),
};
let mut metrics = Metrics::new(WriteType::Flush);
write_cache
.write_and_upload_sst(write_request, upload_request, &write_opts, WriteType::Flush)
.write_and_upload_sst(write_request, upload_request, &write_opts, &mut metrics)
.await
.unwrap_err();
let atomic_write_dir = write_cache_dir.path().join(ATOMIC_WRITE_DIR);

View File

@@ -30,7 +30,9 @@ use store_api::metadata::RegionMetadataRef;
use store_api::region_request::PathType;
use store_api::storage::RegionId;
use crate::access_layer::{AccessLayer, AccessLayerRef, OperationType, SstWriteRequest, WriteType};
use crate::access_layer::{
AccessLayer, AccessLayerRef, Metrics, OperationType, SstWriteRequest, WriteType,
};
use crate::cache::{CacheManager, CacheManagerRef};
use crate::compaction::picker::{PickerOutput, new_picker};
use crate::compaction::{CompactionSstReaderBuilder, find_ttl};
@@ -387,7 +389,8 @@ impl Compactor for DefaultCompactor {
let reader = builder.build_sst_reader().await?;
either::Left(Source::Reader(reader))
};
let (sst_infos, metrics) = sst_layer
let mut metrics = Metrics::new(WriteType::Compaction);
let sst_infos = sst_layer
.write_sst(
SstWriteRequest {
op_type: OperationType::Compact,
@@ -403,7 +406,7 @@ impl Compactor for DefaultCompactor {
bloom_filter_index_config,
},
&write_opts,
WriteType::Compaction,
&mut metrics,
)
.await?;
// Convert partition expression once outside the map

View File

@@ -25,6 +25,7 @@ use serde::{Deserialize, Serialize};
use serde_with::serde_as;
use crate::error::Result;
use crate::gc::GcConfig;
use crate::memtable::MemtableConfig;
use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
@@ -148,6 +149,8 @@ pub struct MitoConfig {
/// Whether to enable experimental flat format as the default format.
/// When enabled, forces using BulkMemtable and BulkMemtableBuilder.
pub default_experimental_flat_format: bool,
pub gc: GcConfig,
}
impl Default for MitoConfig {
@@ -186,6 +189,7 @@ impl Default for MitoConfig {
memtable: MemtableConfig::default(),
min_compaction_interval: Duration::from_secs(0),
default_experimental_flat_format: false,
gc: GcConfig::default(),
};
// Adjust buffer and cache size according to system memory if we can.

View File

@@ -102,7 +102,7 @@ use store_api::region_engine::{
};
use store_api::region_request::{AffectedRows, RegionOpenRequest, RegionRequest};
use store_api::sst_entry::{ManifestSstEntry, PuffinIndexMetaEntry, StorageSstEntry};
use store_api::storage::{FileId, RegionId, ScanRequest, SequenceNumber};
use store_api::storage::{FileId, FileRefsManifest, RegionId, ScanRequest, SequenceNumber};
use tokio::sync::{Semaphore, oneshot};
use crate::access_layer::RegionFilePathFactory;
@@ -115,6 +115,7 @@ use crate::error::{
};
#[cfg(feature = "enterprise")]
use crate::extension::BoxedExtensionRangeProviderFactory;
use crate::gc::GcLimiterRef;
use crate::manifest::action::RegionEdit;
use crate::memtable::MemtableStats;
use crate::metrics::HANDLE_REQUEST_ELAPSED;
@@ -261,6 +262,33 @@ impl MitoEngine {
self.inner.workers.file_ref_manager()
}
pub fn gc_limiter(&self) -> GcLimiterRef {
self.inner.workers.gc_limiter()
}
/// Get all tmp ref files for the given region ids, excluding files that are already in the manifest.
pub async fn get_snapshot_of_unmanifested_refs(
&self,
region_ids: impl IntoIterator<Item = RegionId>,
) -> Result<FileRefsManifest> {
let file_ref_mgr = self.file_ref_manager();
let region_ids = region_ids.into_iter().collect::<Vec<_>>();
// Convert region IDs to MitoRegionRef objects, error if any region doesn't exist
let regions: Vec<MitoRegionRef> = region_ids
.into_iter()
.map(|region_id| {
self.find_region(region_id)
.with_context(|| RegionNotFoundSnafu { region_id })
})
.collect::<Result<_>>()?;
file_ref_mgr
.get_snapshot_of_unmanifested_refs(regions)
.await
}
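A hedged usage sketch of the engine method added above; `Result` is assumed to be mito2's crate-level alias, and the setup (how the engine and region ids are obtained) is left out:
// Sketch only: snapshot unmanifested file references for some regions before a GC round.
async fn snapshot_refs_for_gc(
    engine: &MitoEngine,
    region_ids: Vec<RegionId>,
) -> Result<FileRefsManifest> {
    // Errors with RegionNotFound if any region id is unknown to this engine.
    engine.get_snapshot_of_unmanifested_refs(region_ids).await
}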
/// Returns true if the specific region exists.
pub fn is_region_exists(&self, region_id: RegionId) -> bool {
self.inner.workers.is_region_exists(region_id)
@@ -357,7 +385,7 @@ impl MitoEngine {
self.find_region(id)
}
pub(crate) fn find_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
pub fn find_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
self.inner.workers.get_region(region_id)
}

View File

@@ -1121,6 +1121,12 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("GC job permit exhausted"))]
TooManyGcJobs {
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -1291,7 +1297,7 @@ impl ErrorExt for Error {
InconsistentTimestampLength { .. } => StatusCode::InvalidArguments,
TooManyFilesToRead { .. } => StatusCode::RateLimited,
TooManyFilesToRead { .. } | TooManyGcJobs { .. } => StatusCode::RateLimited,
}
}

View File

@@ -525,21 +525,19 @@ impl RegionFlushTask {
let source = Either::Left(source);
let write_request = self.new_write_request(version, max_sequence, source);
let (ssts_written, metrics) = self
let mut metrics = Metrics::new(WriteType::Flush);
let ssts_written = self
.access_layer
.write_sst(write_request, &write_opts, WriteType::Flush)
.write_sst(write_request, &write_opts, &mut metrics)
.await?;
if ssts_written.is_empty() {
// No data written.
continue;
}
common_telemetry::debug!(
debug!(
"Region {} flush one memtable, num_mem_ranges: {}, num_rows: {}, metrics: {:?}",
self.region_id,
num_mem_ranges,
num_mem_rows,
metrics
self.region_id, num_mem_ranges, num_mem_rows, metrics
);
flush_metrics = flush_metrics.merge(metrics);
@@ -591,9 +589,11 @@ impl RegionFlushTask {
let semaphore = self.flush_semaphore.clone();
let task = common_runtime::spawn_global(async move {
let _permit = semaphore.acquire().await.unwrap();
access_layer
.write_sst(write_request, &write_opts, WriteType::Flush)
.await
let mut metrics = Metrics::new(WriteType::Flush);
let ssts = access_layer
.write_sst(write_request, &write_opts, &mut metrics)
.await?;
Ok((ssts, metrics))
});
tasks.push(task);
}

View File

@@ -22,14 +22,17 @@
//!
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;
use common_telemetry::{error, info, warn};
use common_meta::datanode::GcStat;
use common_telemetry::{debug, error, info, warn};
use common_time::Timestamp;
use object_store::{Entry, Lister};
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt as _, ensure};
use store_api::storage::{FileId, RegionId};
use store_api::storage::{FileId, FileRefsManifest, GcReport, RegionId};
use tokio::sync::{OwnedSemaphorePermit, TryAcquireError};
use tokio_stream::StreamExt;
use crate::access_layer::AccessLayerRef;
@@ -37,26 +40,64 @@ use crate::cache::CacheManagerRef;
use crate::config::MitoConfig;
use crate::error::{
DurationOutOfRangeSnafu, EmptyRegionDirSnafu, JoinSnafu, OpenDalSnafu, RegionNotFoundSnafu,
Result, UnexpectedSnafu,
Result, TooManyGcJobsSnafu, UnexpectedSnafu,
};
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions, RemoveFileOptions};
use crate::manifest::storage::manifest_compress_type;
use crate::metrics::GC_FILE_CNT;
use crate::metrics::GC_DEL_FILE_CNT;
use crate::region::opener::new_manifest_dir;
use crate::sst::file::delete_files;
use crate::sst::file_ref::TableFileRefsManifest;
use crate::sst::location::{self, region_dir_from_table_dir};
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct GcReport {
/// deleted files per region
pub deleted_files: HashMap<RegionId, Vec<FileId>>,
/// Regions that need retry in next gc round, usually because their tmp ref files are outdated
pub need_retry_regions: HashSet<RegionId>,
/// Limits the number of concurrent GC jobs on the datanode.
pub struct GcLimiter {
pub gc_job_limit: Arc<tokio::sync::Semaphore>,
gc_concurrency: usize,
}
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct FileGcOption {
pub type GcLimiterRef = Arc<GcLimiter>;
impl GcLimiter {
pub fn new(gc_concurrency: usize) -> Self {
Self {
gc_job_limit: Arc::new(tokio::sync::Semaphore::new(gc_concurrency)),
gc_concurrency,
}
}
pub fn running_gc_tasks(&self) -> u32 {
(self.gc_concurrency - self.gc_job_limit.available_permits()) as u32
}
pub fn gc_concurrency(&self) -> u32 {
self.gc_concurrency as u32
}
pub fn gc_stat(&self) -> GcStat {
GcStat::new(self.running_gc_tasks(), self.gc_concurrency())
}
/// Try to acquire a permit for a GC job.
///
/// If no permit is available, returns a `TooManyGcJobs` error.
pub fn permit(&self) -> Result<OwnedSemaphorePermit> {
self.gc_job_limit
.clone()
.try_acquire_owned()
.map_err(|e| match e {
TryAcquireError::Closed => UnexpectedSnafu {
reason: format!("Failed to acquire gc permit: {e}"),
}
.build(),
TryAcquireError::NoPermits => TooManyGcJobsSnafu {}.build(),
})
}
}
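A hedged sketch of how the permit is intended to gate GC work; the job body is a placeholder and `Result` is assumed to be mito2's crate-level alias:
// Sketch only: hold the permit for the lifetime of one GC job so that at most
// `max_concurrent_gc_job` jobs run on this datanode at a time.
async fn run_gc_job(limiter: &GcLimiterRef) -> Result<()> {
    // Fails fast with TooManyGcJobs when all permits are taken, instead of queueing.
    let _permit = limiter.permit()?;
    // ... do the actual GC work here; the permit is released when it drops ...
    Ok(())
}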
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct GcConfig {
/// Whether GC is enabled.
pub enable: bool,
/// Lingering time before deleting files.
/// Should be long enough to allow long-running queries to finish.
///
@@ -73,16 +114,22 @@ pub struct FileGcOption {
/// Maximum concurrent list operations per GC job.
/// This is used to limit the number of concurrent listing operations and speed up listing.
pub max_concurrent_lister_per_gc_job: usize,
/// Maximum concurrent GC jobs.
/// This is used to limit the number of concurrent GC jobs running on the datanode
/// to prevent too many concurrent GC jobs from overwhelming the datanode.
pub max_concurrent_gc_job: usize,
}
impl Default for FileGcOption {
impl Default for GcConfig {
fn default() -> Self {
Self {
enable: false,
// expect long running queries to be finished within a reasonable time
lingering_time: Duration::from_secs(60 * 5),
// 6 hours, for files whose expel time (i.e. when they were removed from the manifest) is unknown; this should rarely happen, so we can keep them longer
unknown_file_lingering_time: Duration::from_secs(60 * 60 * 6),
max_concurrent_lister_per_gc_job: 32,
max_concurrent_gc_job: 4,
}
}
}
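A hedged sketch of opting a datanode into GC via the `MitoConfig::gc` field added earlier in this change, combined with the `GcConfig` defaults shown above; the import paths and whether setting `enable` alone is sufficient are assumptions:
// Sketch only (assumed paths): use mito2::config::MitoConfig; use mito2::gc::GcConfig;
fn gc_enabled_config() -> MitoConfig {
    let mut config = MitoConfig::default();
    config.gc = GcConfig {
        enable: true,
        // Keep the defaults shown above: 5 min lingering, 6 h for unknown files,
        // 32 listers per GC job, at most 4 concurrent GC jobs per datanode.
        ..GcConfig::default()
    };
    config
}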
@@ -92,13 +139,23 @@ pub struct LocalGcWorker {
pub(crate) cache_manager: Option<CacheManagerRef>,
pub(crate) manifest_mgrs: HashMap<RegionId, RegionManifestManager>,
/// Lingering time before deleting files.
pub(crate) opt: FileGcOption,
pub(crate) opt: GcConfig,
pub(crate) manifest_open_config: ManifestOpenConfig,
/// Tmp ref files manifest, used to determine which files are still in use by ongoing queries.
///
/// Also contains manifest versions of regions when the tmp ref files are generated.
/// Used to determine whether the tmp ref files are outdated.
pub(crate) file_ref_manifest: TableFileRefsManifest,
pub(crate) file_ref_manifest: FileRefsManifest,
_permit: OwnedSemaphorePermit,
/// Whether to perform full file listing during GC.
/// When set to false, GC will only delete files that are tracked in the manifest's removed_files,
/// which can significantly improve performance by avoiding expensive list operations.
/// When set to true, GC will perform a full listing to find and delete orphan files
/// (files not tracked in the manifest).
///
/// Set to false for regular GC operations to optimize performance.
/// Set to true periodically or when you need to clean up orphan files.
pub full_file_listing: bool,
}
pub struct ManifestOpenConfig {
@@ -125,13 +182,16 @@ impl LocalGcWorker {
/// Create a new LocalGcWorker, with `regions_to_gc` regions to GC.
/// The regions are specified by their `RegionId` and should all belong to the same table.
///
#[allow(clippy::too_many_arguments)]
pub async fn try_new(
access_layer: AccessLayerRef,
cache_manager: Option<CacheManagerRef>,
regions_to_gc: BTreeSet<RegionId>,
opt: FileGcOption,
opt: GcConfig,
manifest_open_config: ManifestOpenConfig,
file_ref_manifest: TableFileRefsManifest,
file_ref_manifest: FileRefsManifest,
limiter: &GcLimiterRef,
full_file_listing: bool,
) -> Result<Self> {
let table_id = regions_to_gc
.first()
@@ -139,6 +199,7 @@ impl LocalGcWorker {
reason: "Expect at least one region, found none",
})?
.table_id();
let permit = limiter.permit()?;
let mut zelf = Self {
access_layer,
cache_manager,
@@ -146,6 +207,8 @@ impl LocalGcWorker {
opt,
manifest_open_config,
file_ref_manifest,
_permit: permit,
full_file_listing,
};
// dedup just in case
@@ -193,15 +256,15 @@ impl LocalGcWorker {
// TODO(discord9): verify manifest version before reading tmp ref files
let mut tmp_ref_files = HashMap::new();
for file_ref in &self.file_ref_manifest.file_refs {
if outdated_regions.contains(&file_ref.region_id) {
for (region_id, file_refs) in &self.file_ref_manifest.file_refs {
if outdated_regions.contains(region_id) {
// skip outdated regions
continue;
}
tmp_ref_files
.entry(file_ref.region_id)
.entry(*region_id)
.or_insert_with(HashSet::new)
.insert(file_ref.file_id);
.extend(file_refs.clone());
}
Ok(tmp_ref_files)
@@ -220,14 +283,14 @@ impl LocalGcWorker {
let mut deleted_files = HashMap::new();
let tmp_ref_files = self.read_tmp_ref_files(&mut outdated_regions).await?;
for region_id in self.manifest_mgrs.keys() {
info!("Doing gc for region {}", region_id);
debug!("Doing gc for region {}", region_id);
let tmp_ref_files = tmp_ref_files
.get(region_id)
.cloned()
.unwrap_or_else(HashSet::new);
let files = self.do_region_gc(*region_id, &tmp_ref_files).await?;
deleted_files.insert(*region_id, files);
info!("Gc for region {} finished", region_id);
debug!("Gc for region {} finished", region_id);
}
info!(
"LocalGcWorker finished after {} secs.",
@@ -244,7 +307,7 @@ impl LocalGcWorker {
impl LocalGcWorker {
/// concurrency of listing files per region.
/// This is used to limit the number of concurrent listing operations and speed up listing
pub const CONCURRENCY_LIST_PER_FILES: usize = 512;
pub const CONCURRENCY_LIST_PER_FILES: usize = 1024;
/// Perform GC for the region.
/// 1. Get all the removed files in delta manifest files and their expel times
@@ -259,7 +322,7 @@ impl LocalGcWorker {
region_id: RegionId,
tmp_ref_files: &HashSet<FileId>,
) -> Result<Vec<FileId>> {
info!("Doing gc for region {}", region_id);
debug!("Doing gc for region {}", region_id);
let manifest = self
.manifest_mgrs
.get(&region_id)
@@ -272,10 +335,10 @@ impl LocalGcWorker {
if recently_removed_files.is_empty() {
// no files to remove, skip
info!("No recently removed files to gc for region {}", region_id);
debug!("No recently removed files to gc for region {}", region_id);
}
info!(
debug!(
"Found {} recently removed files sets for region {}",
recently_removed_files.len(),
region_id
@@ -291,27 +354,20 @@ impl LocalGcWorker {
.chain(tmp_ref_files.clone().into_iter())
.collect();
let true_tmp_ref_files = tmp_ref_files
.iter()
.filter(|f| !current_files.contains_key(f))
.collect::<HashSet<_>>();
info!("True tmp ref files: {:?}", true_tmp_ref_files);
let unused_files = self
.list_to_be_deleted_files(region_id, in_used, recently_removed_files, concurrency)
.await?;
let unused_len = unused_files.len();
info!(
debug!(
"Found {} unused files to delete for region {}",
unused_len, region_id
);
self.delete_files(region_id, &unused_files).await?;
info!(
debug!(
"Successfully deleted {} unused files for region {}",
unused_len, region_id
);
@@ -329,7 +385,8 @@ impl LocalGcWorker {
)
.await?;
GC_FILE_CNT.add(file_ids.len() as i64);
// FIXME(discord9): if files are already deleted before calling delete_files, the metric will be inaccurate; there is no clean way to fix this for now
GC_DEL_FILE_CNT.add(file_ids.len() as i64);
Ok(())
}
@@ -491,7 +548,7 @@ impl LocalGcWorker {
entries: Vec<Entry>,
in_use_filenames: &HashSet<&FileId>,
may_linger_filenames: &HashSet<&FileId>,
all_files_appear_in_delta_manifests: &HashSet<&FileId>,
eligible_for_removal: &HashSet<&FileId>,
unknown_file_may_linger_until: chrono::DateTime<chrono::Utc>,
) -> (Vec<FileId>, HashSet<FileId>) {
let mut all_unused_files_ready_for_delete = vec![];
@@ -515,7 +572,7 @@ impl LocalGcWorker {
let should_delete = !in_use_filenames.contains(&file_id)
&& !may_linger_filenames.contains(&file_id)
&& {
if !all_files_appear_in_delta_manifests.contains(&file_id) {
if !eligible_for_removal.contains(&file_id) {
// if the file's expel time is unknown (because it does not appear in the delta manifests),
// we keep it for a while based on its last modified time.
// Note that unknown files use a different lingering time.
@@ -541,6 +598,11 @@ impl LocalGcWorker {
/// Concurrently list unused files in the region dir
/// because there may be a lot of files in the region dir
/// and listing them may take a long time.
///
/// When `full_file_listing` is false, this method will only delete files tracked in
/// `recently_removed_files` without performing expensive list operations, which significantly
/// improves performance. When `full_file_listing` is true, it performs a full listing to
/// find and delete orphan files.
pub async fn list_to_be_deleted_files(
&self,
region_id: RegionId,
@@ -548,6 +610,7 @@ impl LocalGcWorker {
recently_removed_files: BTreeMap<Timestamp, HashSet<FileId>>,
concurrency: usize,
) -> Result<Vec<FileId>> {
let start = tokio::time::Instant::now();
let now = chrono::Utc::now();
let may_linger_until = now
- chrono::Duration::from_std(self.opt.lingering_time).with_context(|_| {
@@ -569,7 +632,7 @@ impl LocalGcWorker {
let may_linger_files = recently_removed_files.split_off(&threshold);
let may_linger_filenames = may_linger_files.values().flatten().collect::<HashSet<_>>();
let all_files_appear_in_delta_manifests = recently_removed_files
let eligible_for_removal = recently_removed_files
.values()
.flatten()
.collect::<HashSet<_>>();
@@ -577,23 +640,56 @@ impl LocalGcWorker {
// in use filenames, include sst and index files
let in_use_filenames = in_used.iter().collect::<HashSet<_>>();
// When full_file_listing is false, skip expensive list operations and only delete
// files that are tracked in recently_removed_files
if !self.full_file_listing {
// Only delete files that:
// 1. Are in recently_removed_files (tracked in manifest)
// 2. Are not in use
// 3. Have passed the lingering time
let files_to_delete: Vec<FileId> = eligible_for_removal
.iter()
.filter(|file_id| !in_use_filenames.contains(*file_id))
.map(|&f| *f)
.collect();
info!(
"gc: fast mode (no full listing) cost {} secs for region {}, found {} files to delete from manifest",
start.elapsed().as_secs_f64(),
region_id,
files_to_delete.len()
);
return Ok(files_to_delete);
}
// Full file listing mode: perform expensive list operations to find orphan files
// Step 1: Create partitioned listers for concurrent processing
let listers = self.partition_region_files(region_id, concurrency).await?;
let lister_cnt = listers.len();
// Step 2: Concurrently list all files in the region directory
let all_entries = self.list_region_files_concurrent(listers).await?;
let cnt = all_entries.len();
// Step 3: Filter files to determine which ones can be deleted
let (all_unused_files_ready_for_delete, all_in_exist_linger_files) = self
.filter_deletable_files(
all_entries,
&in_use_filenames,
&may_linger_filenames,
&all_files_appear_in_delta_manifests,
&eligible_for_removal,
unknown_file_may_linger_until,
);
info!("All in exist linger files: {:?}", all_in_exist_linger_files);
info!(
"gc: full listing mode cost {} secs using {lister_cnt} lister for {cnt} files in region {}, found {} unused files to delete",
start.elapsed().as_secs_f64(),
region_id,
all_unused_files_ready_for_delete.len()
);
debug!("All in exist linger files: {:?}", all_in_exist_linger_files);
Ok(all_unused_files_ready_for_delete)
}

View File

@@ -47,7 +47,7 @@ pub mod schedule;
pub mod sst;
mod time_provider;
pub mod wal;
mod worker;
pub mod worker;
#[cfg_attr(doc, aquamarine::aquamarine)]
/// # Mito developer document

View File

@@ -384,6 +384,7 @@ mod tests {
use api::v1::helper::{field_column_schema, row, tag_column_schema, time_index_column_schema};
use api::v1::value::ValueData;
use api::v1::{Mutation, OpType, Rows, SemanticType};
use common_query::prelude::{greptime_timestamp, greptime_value};
use common_time::Timestamp;
use datafusion_common::Column;
use datafusion_expr::{BinaryExpr, Expr, Literal, Operator};
@@ -694,7 +695,7 @@ mod tests {
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"greptime_timestamp",
greptime_timestamp(),
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
@@ -703,7 +704,7 @@ mod tests {
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"greptime_value",
greptime_value(),
ConcreteDataType::float64_datatype(),
true,
),

View File

@@ -922,7 +922,9 @@ impl ValueBuilder {
)
};
mutable_vector.push_nulls(num_rows - 1);
let _ = mutable_vector.push(field_value);
mutable_vector
.push(field_value)
.unwrap_or_else(|e| panic!("unexpected field value: {e:?}"));
self.fields[idx] = Some(mutable_vector);
MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT.inc();
}

View File

@@ -437,7 +437,7 @@ lazy_static! {
"mito stalled write request in each worker",
&[WORKER_LABEL]
).unwrap();
/// Number of ref files per table
/// Number of ref files
pub static ref GC_REF_FILE_CNT: IntGauge = register_int_gauge!(
"greptime_gc_ref_file_count",
"gc ref file count",
@@ -458,9 +458,9 @@ lazy_static! {
.unwrap();
/// Counter for the number of files deleted by the GC worker.
pub static ref GC_FILE_CNT: IntGauge =
pub static ref GC_DEL_FILE_CNT: IntGauge =
register_int_gauge!(
"greptime_mito_gc_file_count",
"greptime_mito_gc_delete_file_count",
"mito gc deleted file count",
).unwrap();
}

View File

@@ -1106,9 +1106,8 @@ impl ScanInput {
rows
}
/// Returns table predicate of all exprs.
pub(crate) fn predicate(&self) -> Option<&Predicate> {
self.predicate.predicate()
pub(crate) fn predicate_group(&self) -> &PredicateGroup {
&self.predicate
}
/// Returns number of memtables to scan.

View File

@@ -632,8 +632,12 @@ impl RegionScanner for SeqScan {
Ok(())
}
fn has_predicate(&self) -> bool {
let predicate = self.stream_ctx.input.predicate();
fn has_predicate_without_region(&self) -> bool {
let predicate = self
.stream_ctx
.input
.predicate_group()
.predicate_without_region();
predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
}

View File

@@ -314,8 +314,12 @@ impl RegionScanner for SeriesScan {
Ok(())
}
fn has_predicate(&self) -> bool {
let predicate = self.stream_ctx.input.predicate();
fn has_predicate_without_region(&self) -> bool {
let predicate = self
.stream_ctx
.input
.predicate_group()
.predicate_without_region();
predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
}

View File

@@ -427,8 +427,14 @@ impl RegionScanner for UnorderedScan {
.map_err(BoxedError::new)
}
fn has_predicate(&self) -> bool {
let predicate = self.stream_ctx.input.predicate();
/// Returns true if this scanner has predicates other than the region partition exprs.
fn has_predicate_without_region(&self) -> bool {
let predicate = self
.stream_ctx
.input
.predicate_group()
.predicate_without_region();
predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
}

View File

@@ -565,6 +565,10 @@ impl MitoRegion {
Ok(())
}
pub fn access_layer(&self) -> AccessLayerRef {
self.access_layer.clone()
}
/// Returns the SST entries of the region.
pub async fn manifest_sst_entries(&self) -> Vec<ManifestSstEntry> {
let table_dir = self.table_dir();

View File

@@ -295,8 +295,8 @@ impl FileHandle {
}
/// Returns the complete file path of the file.
pub fn file_path(&self, file_dir: &str, path_type: PathType) -> String {
location::sst_file_path(file_dir, self.file_id(), path_type)
pub fn file_path(&self, table_dir: &str, path_type: PathType) -> String {
location::sst_file_path(table_dir, self.file_id(), path_type)
}
/// Returns the time range of the file.

View File

@@ -162,6 +162,7 @@ impl FilePurger for ObjectStoreFilePurger {
// Note that whether or not the file is deleted, we still need to remove the reference,
// because the file is no longer in use either way.
self.file_ref_manager.remove_file(&file_meta);
// TODO(discord9): consider implementing a .tombstone file to reduce the number of files to list
}
fn new_file(&self, file_meta: &FileMeta) {

View File

@@ -17,38 +17,23 @@ use std::sync::Arc;
use common_telemetry::debug;
use dashmap::{DashMap, Entry};
use serde::{Deserialize, Serialize};
use store_api::ManifestVersion;
use store_api::storage::{FileId, RegionId, TableId};
use store_api::storage::{FileRef, FileRefsManifest, RegionId};
use crate::error::Result;
use crate::metrics::GC_REF_FILE_CNT;
use crate::region::RegionMapRef;
use crate::region::MitoRegionRef;
use crate::sst::file::FileMeta;
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct FileRef {
pub region_id: RegionId,
pub file_id: FileId,
}
impl FileRef {
pub fn new(region_id: RegionId, file_id: FileId) -> Self {
Self { region_id, file_id }
}
}
/// File references for a table.
/// It contains all files referenced by the table.
/// File references for a region.
/// It contains all files referenced by the region.
#[derive(Debug, Clone, Default)]
pub struct TableFileRefs {
pub struct RegionFileRefs {
/// (FileRef, Ref Count), i.e. how many `FileHandleInner`s are currently open for this file.
pub files: HashMap<FileRef, usize>,
}
/// Manages all file references in one datanode.
/// It keeps track of which files are referenced, grouped by table id,
/// and periodically updates the references to tmp files in object storage.
/// This is useful for ensuring that files are not deleted while they are still in use by any
/// query.
#[derive(Debug)]
@@ -56,33 +41,24 @@ pub struct FileReferenceManager {
/// Datanode id. used to determine tmp ref file name.
node_id: Option<u64>,
/// TODO(discord9): use no hash hasher since table id is sequential.
files_per_table: DashMap<TableId, TableFileRefs>,
files_per_region: DashMap<RegionId, RegionFileRefs>,
}
pub type FileReferenceManagerRef = Arc<FileReferenceManager>;
/// The tmp file uploaded to object storage to record one table's file references.
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct TableFileRefsManifest {
pub file_refs: HashSet<FileRef>,
/// Manifest version when this manifest is read for it's files
pub manifest_version: HashMap<RegionId, ManifestVersion>,
}
impl FileReferenceManager {
pub fn new(node_id: Option<u64>) -> Self {
Self {
node_id,
files_per_table: Default::default(),
files_per_region: Default::default(),
}
}
fn ref_file_set(&self, table_id: TableId) -> Option<HashSet<FileRef>> {
let file_refs = if let Some(file_refs) = self.files_per_table.get(&table_id) {
fn ref_file_set(&self, region_id: RegionId) -> Option<HashSet<FileRef>> {
let file_refs = if let Some(file_refs) = self.files_per_region.get(&region_id) {
file_refs.clone()
} else {
// still return an empty manifest to indicate no files are referenced.
// and differentiate from error case where table_id not found.
// region id not found.
return None;
};
@@ -95,8 +71,8 @@ impl FileReferenceManager {
let ref_file_set: HashSet<FileRef> = file_refs.files.keys().cloned().collect();
debug!(
"Get file refs for table {}, node {:?}, {} files",
table_id,
"Get file refs for region {}, node {:?}, {} files",
region_id,
self.node_id,
ref_file_set.len(),
);
@@ -120,22 +96,19 @@ impl FileReferenceManager {
#[allow(unused)]
pub(crate) async fn get_snapshot_of_unmanifested_refs(
&self,
table_id: TableId,
region_map: &RegionMapRef,
) -> Result<TableFileRefsManifest> {
let Some(ref_files) = self.ref_file_set(table_id) else {
return Ok(Default::default());
};
let region_list = region_map.list_regions();
let table_regions = region_list
.iter()
.filter(|r| r.region_id().table_id() == table_id)
.collect::<Vec<_>>();
regions: Vec<MitoRegionRef>,
) -> Result<FileRefsManifest> {
let mut ref_files = HashMap::new();
for region_id in regions.iter().map(|r| r.region_id()) {
if let Some(files) = self.ref_file_set(region_id) {
ref_files.insert(region_id, files);
}
}
let mut in_manifest_files = HashSet::new();
let mut manifest_version = HashMap::new();
for r in &table_regions {
for r in &regions {
let manifest = r.manifest_ctx.manifest().await;
let files = manifest.files.keys().cloned().collect::<Vec<_>>();
in_manifest_files.extend(files);
@@ -144,11 +117,18 @@ impl FileReferenceManager {
let ref_files_excluding_in_manifest = ref_files
.iter()
.filter(|f| !in_manifest_files.contains(&f.file_id))
.cloned()
.collect::<HashSet<_>>();
Ok(TableFileRefsManifest {
.map(|(r, f)| {
(
*r,
f.iter()
.filter_map(|f| {
(!in_manifest_files.contains(&f.file_id)).then_some(f.file_id)
})
.collect::<HashSet<_>>(),
)
})
.collect();
Ok(FileRefsManifest {
file_refs: ref_files_excluding_in_manifest,
manifest_version,
})
@@ -158,12 +138,12 @@ impl FileReferenceManager {
/// Also records the access layer for the table if not exists.
/// The access layer will be used to upload ref file to object storage.
pub fn add_file(&self, file_meta: &FileMeta) {
let table_id = file_meta.region_id.table_id();
let region_id = file_meta.region_id;
let mut is_new = false;
{
let file_ref = FileRef::new(file_meta.region_id, file_meta.file_id);
self.files_per_table
.entry(table_id)
self.files_per_region
.entry(region_id)
.and_modify(|refs| {
refs.files
.entry(file_ref.clone())
@@ -173,7 +153,7 @@ impl FileReferenceManager {
1
});
})
.or_insert_with(|| TableFileRefs {
.or_insert_with(|| RegionFileRefs {
files: HashMap::from_iter([(file_ref, 1)]),
});
}
@@ -185,14 +165,14 @@ impl FileReferenceManager {
/// Removes a file reference.
/// If the reference count reaches zero, the file reference will be removed from the manager.
pub fn remove_file(&self, file_meta: &FileMeta) {
let table_id = file_meta.region_id.table_id();
let region_id = file_meta.region_id;
let file_ref = FileRef::new(file_meta.region_id, file_meta.file_id);
let mut remove_table_entry = false;
let mut remove_file_ref = false;
let mut file_cnt = 0;
let table_ref = self.files_per_table.entry(table_id).and_modify(|refs| {
let region_ref = self.files_per_region.entry(region_id).and_modify(|refs| {
let entry = refs.files.entry(file_ref.clone()).and_modify(|count| {
if *count > 0 {
*count -= 1;
@@ -214,7 +194,7 @@ impl FileReferenceManager {
}
});
if let Entry::Occupied(o) = table_ref
if let Entry::Occupied(o) = region_ref
&& remove_table_entry
{
o.remove_entry();
@@ -234,7 +214,7 @@ mod tests {
use std::num::NonZeroU64;
use smallvec::SmallVec;
use store_api::storage::RegionId;
use store_api::storage::{FileId, RegionId};
use super::*;
use crate::sst::file::{FileMeta, FileTimeRange, IndexType, RegionFileId};
@@ -265,54 +245,69 @@ mod tests {
file_ref_mgr.add_file(&file_meta);
assert_eq!(
file_ref_mgr.files_per_table.get(&0).unwrap().files,
file_ref_mgr
.files_per_region
.get(&file_meta.region_id)
.unwrap()
.files,
HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id), 1)])
);
file_ref_mgr.add_file(&file_meta);
let expected_table_ref_manifest =
let expected_region_ref_manifest =
HashSet::from_iter([FileRef::new(file_meta.region_id, file_meta.file_id)]);
assert_eq!(
file_ref_mgr.ref_file_set(0).unwrap(),
expected_table_ref_manifest
file_ref_mgr.ref_file_set(file_meta.region_id).unwrap(),
expected_region_ref_manifest
);
assert_eq!(
file_ref_mgr.files_per_table.get(&0).unwrap().files,
file_ref_mgr
.files_per_region
.get(&file_meta.region_id)
.unwrap()
.files,
HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id), 2)])
);
assert_eq!(
file_ref_mgr.ref_file_set(0).unwrap(),
expected_table_ref_manifest
file_ref_mgr.ref_file_set(file_meta.region_id).unwrap(),
expected_region_ref_manifest
);
file_ref_mgr.remove_file(&file_meta);
assert_eq!(
file_ref_mgr.files_per_table.get(&0).unwrap().files,
file_ref_mgr
.files_per_region
.get(&file_meta.region_id)
.unwrap()
.files,
HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id), 1)])
);
assert_eq!(
file_ref_mgr.ref_file_set(0).unwrap(),
expected_table_ref_manifest
file_ref_mgr.ref_file_set(file_meta.region_id).unwrap(),
expected_region_ref_manifest
);
file_ref_mgr.remove_file(&file_meta);
assert!(
file_ref_mgr.files_per_table.get(&0).is_none(),
file_ref_mgr
.files_per_region
.get(&file_meta.region_id)
.is_none(),
"{:?}",
file_ref_mgr.files_per_table
file_ref_mgr.files_per_region
);
assert!(
file_ref_mgr.ref_file_set(0).is_none(),
file_ref_mgr.ref_file_set(file_meta.region_id).is_none(),
"{:?}",
file_ref_mgr.files_per_table
file_ref_mgr.files_per_region
);
}
}
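The refactor above re-keys file references from table id to region id; a self-contained sketch of the resulting grouping shape (placeholder integer ids, not the real `RegionId`/`FileId` types):
// Placeholder sketch: references are now keyed by region rather than by table,
// and the manifest shape follows suit (HashMap<RegionId, HashSet<FileId>>
// instead of a flat HashSet<FileRef>).
use std::collections::{HashMap, HashSet};

type RegionId = u64; // stand-in for store_api::storage::RegionId
type FileId = u64; // stand-in for store_api::storage::FileId

fn group_refs(refs: &[(RegionId, FileId)]) -> HashMap<RegionId, HashSet<FileId>> {
    let mut grouped: HashMap<RegionId, HashSet<FileId>> = HashMap::new();
    for (region_id, file_id) in refs {
        grouped.entry(*region_id).or_default().insert(*file_id);
    }
    grouped
}

fn main() {
    let grouped = group_refs(&[(1, 10), (1, 11), (2, 20)]);
    assert_eq!(grouped[&1].len(), 2);
    assert_eq!(grouped[&2].len(), 1);
}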

View File

@@ -791,7 +791,7 @@ mod tests {
use tokio::sync::mpsc;
use super::*;
use crate::access_layer::{FilePathProvider, SstWriteRequest, WriteType};
use crate::access_layer::{FilePathProvider, Metrics, SstWriteRequest, WriteType};
use crate::cache::write_cache::WriteCache;
use crate::config::{FulltextIndexConfig, IndexBuildMode, MitoConfig, Mode};
use crate::memtable::time_partition::TimePartitions;
@@ -927,11 +927,11 @@ mod tests {
fulltext_index_config: Default::default(),
bloom_filter_index_config: Default::default(),
};
let mut metrics = Metrics::new(WriteType::Flush);
env.access_layer
.write_sst(write_request, &WriteOptions::default(), WriteType::Flush)
.write_sst(write_request, &WriteOptions::default(), &mut metrics)
.await
.unwrap()
.0
.remove(0)
}

View File

@@ -181,13 +181,14 @@ mod tests {
..Default::default()
};
let mut metrics = Metrics::new(WriteType::Flush);
let mut writer = ParquetWriter::new_with_object_store(
object_store.clone(),
metadata.clone(),
IndexConfig::default(),
NoopIndexBuilder,
file_path,
Metrics::new(WriteType::Flush),
&mut metrics,
)
.await;
@@ -243,6 +244,7 @@ mod tests {
..Default::default()
};
// Prepare data.
let mut metrics = Metrics::new(WriteType::Flush);
let mut writer = ParquetWriter::new_with_object_store(
object_store.clone(),
metadata.clone(),
@@ -251,7 +253,7 @@ mod tests {
FixedPathProvider {
region_file_id: handle.file_id(),
},
Metrics::new(WriteType::Flush),
&mut metrics,
)
.await;
@@ -329,6 +331,7 @@ mod tests {
// write the sst file and get sst info
// sst info contains the parquet metadata, which is converted from FileMetaData
let mut metrics = Metrics::new(WriteType::Flush);
let mut writer = ParquetWriter::new_with_object_store(
object_store.clone(),
metadata.clone(),
@@ -337,7 +340,7 @@ mod tests {
FixedPathProvider {
region_file_id: handle.file_id(),
},
Metrics::new(WriteType::Flush),
&mut metrics,
)
.await;
@@ -378,6 +381,7 @@ mod tests {
..Default::default()
};
// Prepare data.
let mut metrics = Metrics::new(WriteType::Flush);
let mut writer = ParquetWriter::new_with_object_store(
object_store.clone(),
metadata.clone(),
@@ -386,7 +390,7 @@ mod tests {
FixedPathProvider {
region_file_id: handle.file_id(),
},
Metrics::new(WriteType::Flush),
&mut metrics,
)
.await;
writer
@@ -437,6 +441,7 @@ mod tests {
..Default::default()
};
// Prepare data.
let mut metrics = Metrics::new(WriteType::Flush);
let mut writer = ParquetWriter::new_with_object_store(
object_store.clone(),
metadata.clone(),
@@ -445,7 +450,7 @@ mod tests {
FixedPathProvider {
region_file_id: handle.file_id(),
},
Metrics::new(WriteType::Flush),
&mut metrics,
)
.await;
writer
@@ -481,6 +486,7 @@ mod tests {
..Default::default()
};
// Prepare data.
let mut metrics = Metrics::new(WriteType::Flush);
let mut writer = ParquetWriter::new_with_object_store(
object_store.clone(),
metadata.clone(),
@@ -489,7 +495,7 @@ mod tests {
FixedPathProvider {
region_file_id: handle.file_id(),
},
Metrics::new(WriteType::Flush),
&mut metrics,
)
.await;
@@ -639,13 +645,14 @@ mod tests {
table_dir: "test".to_string(),
path_type: PathType::Bare,
};
let mut metrics = Metrics::new(WriteType::Flush);
let mut writer = ParquetWriter::new_with_object_store(
object_store.clone(),
metadata.clone(),
IndexConfig::default(),
NoopIndexBuilder,
path_provider,
Metrics::new(WriteType::Flush),
&mut metrics,
)
.await;
@@ -716,13 +723,14 @@ mod tests {
bloom_filter_index_config: Default::default(),
};
let mut metrics = Metrics::new(WriteType::Flush);
let mut writer = ParquetWriter::new_with_object_store(
object_store.clone(),
metadata.clone(),
IndexConfig::default(),
indexer_builder,
file_path.clone(),
Metrics::new(WriteType::Flush),
&mut metrics,
)
.await;
@@ -1092,13 +1100,14 @@ mod tests {
bloom_filter_index_config: Default::default(),
};
let mut metrics = Metrics::new(WriteType::Flush);
let mut writer = ParquetWriter::new_with_object_store(
object_store.clone(),
metadata.clone(),
IndexConfig::default(),
indexer_builder,
file_path.clone(),
Metrics::new(WriteType::Flush),
&mut metrics,
)
.await;
@@ -1148,13 +1157,14 @@ mod tests {
..Default::default()
};
let mut metrics = Metrics::new(WriteType::Flush);
let mut writer = ParquetWriter::new_with_object_store(
object_store.clone(),
metadata.clone(),
IndexConfig::default(),
NoopIndexBuilder,
file_path,
Metrics::new(WriteType::Flush),
&mut metrics,
)
.await;
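
All of the test hunks above follow the same pattern: the Metrics value is declared before the writer so that the mutable borrow handed to ParquetWriter::new_with_object_store lives as long as the writer does. A minimal sketch of that scoping, assuming the test-local object_store, metadata, and file_path values shown in the diff:

// Sketch only: the writer borrows `metrics` mutably, so the binding must
// outlive the writer; once the writer is dropped, the borrow ends and the
// test can assert on the collected metrics directly.
let mut metrics = Metrics::new(WriteType::Flush);
{
    let mut writer = ParquetWriter::new_with_object_store(
        object_store.clone(),
        metadata.clone(),
        IndexConfig::default(),
        NoopIndexBuilder,
        file_path,
        &mut metrics,
    )
    .await;
    // ... write batches and finish the writer as the tests above do ...
}
// The borrow has ended; `metrics` can be inspected here.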

View File

@@ -91,7 +91,7 @@ macro_rules! handle_index_error {
/// Parquet SST reader builder.
pub struct ParquetReaderBuilder {
/// SST directory.
file_dir: String,
table_dir: String,
/// Path type for generating file paths.
path_type: PathType,
file_handle: FileHandle,
@@ -122,13 +122,13 @@ pub struct ParquetReaderBuilder {
impl ParquetReaderBuilder {
/// Returns a new [ParquetReaderBuilder] to read specific SST.
pub fn new(
file_dir: String,
table_dir: String,
path_type: PathType,
file_handle: FileHandle,
object_store: ObjectStore,
) -> ParquetReaderBuilder {
ParquetReaderBuilder {
file_dir,
table_dir,
path_type,
file_handle,
object_store,
@@ -237,7 +237,7 @@ impl ParquetReaderBuilder {
) -> Result<(FileRangeContext, RowGroupSelection)> {
let start = Instant::now();
let file_path = self.file_handle.file_path(&self.file_dir, self.path_type);
let file_path = self.file_handle.file_path(&self.table_dir, self.path_type);
let file_size = self.file_handle.meta_ref().file_size;
// Loads parquet metadata of the file.
@@ -1227,7 +1227,6 @@ impl ParquetReader {
self.context.read_format().metadata()
}
#[cfg(test)]
pub fn parquet_metadata(&self) -> Arc<ParquetMetaData> {
self.context.reader_builder().parquet_meta.clone()
}
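
The rename only touches the field and constructor parameter: the directory passed in is the table directory, so the name now matches what callers supply. A minimal sketch of the constructor after the rename, assuming a file_handle and object_store from the surrounding module; the table_dir string and PathType value are the ones used by tests elsewhere in this diff:

// Sketch only: `file_handle` and `object_store` come from the surrounding code.
let builder = ParquetReaderBuilder::new(
    "test".to_string(), // table_dir (previously named file_dir)
    PathType::Bare,
    file_handle,
    object_store,
);

The final hunk also appears to drop the #[cfg(test)] attribute from parquet_metadata (the hunk shrinks by one line), which would make that accessor available outside tests.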

View File

@@ -62,7 +62,7 @@ use crate::sst::{
};
/// Parquet SST writer.
pub struct ParquetWriter<F: WriterFactory, I: IndexerBuilder, P: FilePathProvider> {
pub struct ParquetWriter<'a, F: WriterFactory, I: IndexerBuilder, P: FilePathProvider> {
/// Path provider that creates SST and index file paths according to file id.
path_provider: P,
writer: Option<AsyncArrowWriter<SizeAwareWriter<F::Writer>>>,
@@ -81,7 +81,7 @@ pub struct ParquetWriter<F: WriterFactory, I: IndexerBuilder, P: FilePathProvide
/// Cleaner to remove temp files on failure.
file_cleaner: Option<TempFileCleaner>,
/// Write metrics
metrics: Metrics,
metrics: &'a mut Metrics,
}
pub trait WriterFactory {
@@ -107,7 +107,7 @@ impl WriterFactory for ObjectStoreWriterFactory {
}
}
impl<I, P> ParquetWriter<ObjectStoreWriterFactory, I, P>
impl<'a, I, P> ParquetWriter<'a, ObjectStoreWriterFactory, I, P>
where
P: FilePathProvider,
I: IndexerBuilder,
@@ -118,8 +118,8 @@ where
index_config: IndexConfig,
indexer_builder: I,
path_provider: P,
metrics: Metrics,
) -> ParquetWriter<ObjectStoreWriterFactory, I, P> {
metrics: &'a mut Metrics,
) -> ParquetWriter<'a, ObjectStoreWriterFactory, I, P> {
ParquetWriter::new(
ObjectStoreWriterFactory { object_store },
metadata,
@@ -137,7 +137,7 @@ where
}
}
impl<F, I, P> ParquetWriter<F, I, P>
impl<'a, F, I, P> ParquetWriter<'a, F, I, P>
where
F: WriterFactory,
I: IndexerBuilder,
@@ -150,8 +150,8 @@ where
index_config: IndexConfig,
indexer_builder: I,
path_provider: P,
metrics: Metrics,
) -> ParquetWriter<F, I, P> {
metrics: &'a mut Metrics,
) -> ParquetWriter<'a, F, I, P> {
let init_file = FileId::random();
let indexer = indexer_builder.build(init_file).await;
@@ -487,11 +487,6 @@ where
Ok(self.writer.as_mut().unwrap())
}
}
/// Consumes write and return the collected metrics.
pub fn into_metrics(self) -> Metrics {
self.metrics
}
}
#[derive(Default)]

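With the new lifetime parameter the writer borrows its Metrics instead of owning it, which is why the consuming into_metrics accessor above is removed with no replacement: the caller simply keeps the value it lent out. A generic, self-contained illustration of that ownership pattern (toy types, not GreptimeDB code):

// Generic illustration only: a writer borrows a caller-owned accumulator
// instead of owning it and handing it back via a consuming accessor.
struct Accumulator {
    bytes_written: u64,
}

struct Writer<'a> {
    metrics: &'a mut Accumulator,
}

impl<'a> Writer<'a> {
    fn new(metrics: &'a mut Accumulator) -> Self {
        Self { metrics }
    }

    fn write(&mut self, buf: &[u8]) {
        self.metrics.bytes_written += buf.len() as u64;
    }
}

fn main() {
    let mut metrics = Accumulator { bytes_written: 0 };
    {
        let mut w = Writer::new(&mut metrics);
        w.write(b"hello");
    } // the mutable borrow ends when the writer goes out of scope
    assert_eq!(metrics.bytes_written, 5); // no into_metrics() needed
}
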
View File

@@ -58,6 +58,7 @@ use crate::compaction::CompactionScheduler;
use crate::config::MitoConfig;
use crate::error::{self, CreateDirSnafu, JoinSnafu, Result, WorkerStoppedSnafu};
use crate::flush::{FlushScheduler, WriteBufferManagerImpl, WriteBufferManagerRef};
use crate::gc::{GcLimiter, GcLimiterRef};
use crate::memtable::MemtableBuilderProvider;
use crate::metrics::{REGION_COUNT, REQUEST_WAIT_TIME, WRITE_STALLING};
use crate::region::opener::PartitionExprFetcherRef;
@@ -138,6 +139,8 @@ pub(crate) struct WorkerGroup {
cache_manager: CacheManagerRef,
/// File reference manager.
file_ref_manager: FileReferenceManagerRef,
/// Gc limiter to limit concurrent gc jobs.
gc_limiter: GcLimiterRef,
}
impl WorkerGroup {
@@ -196,6 +199,7 @@ impl WorkerGroup {
.build(),
);
let time_provider = Arc::new(StdTimeProvider);
let gc_limiter = Arc::new(GcLimiter::new(config.gc.max_concurrent_gc_job));
let workers = (0..config.num_workers)
.map(|id| {
@@ -234,6 +238,7 @@ impl WorkerGroup {
purge_scheduler,
cache_manager,
file_ref_manager,
gc_limiter,
})
}
@@ -291,6 +296,10 @@ impl WorkerGroup {
self.file_ref_manager.clone()
}
pub(crate) fn gc_limiter(&self) -> GcLimiterRef {
self.gc_limiter.clone()
}
/// Get worker for specific `region_id`.
pub(crate) fn worker(&self, region_id: RegionId) -> &RegionWorker {
let index = region_id_to_index(region_id, self.workers.len());
@@ -361,6 +370,7 @@ impl WorkerGroup {
.write_cache(write_cache)
.build(),
);
let gc_limiter = Arc::new(GcLimiter::new(config.gc.max_concurrent_gc_job));
let workers = (0..config.num_workers)
.map(|id| {
WorkerStarter {
@@ -398,6 +408,7 @@ impl WorkerGroup {
purge_scheduler,
cache_manager,
file_ref_manager,
gc_limiter,
})
}
@@ -412,7 +423,7 @@ fn region_id_to_index(id: RegionId, num_workers: usize) -> usize {
% num_workers
}
async fn write_cache_from_config(
pub async fn write_cache_from_config(
config: &MitoConfig,
puffin_manager_factory: PuffinManagerFactory,
intermediate_manager: IntermediateManager,

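The worker group now holds a shared GcLimiter built from config.gc.max_concurrent_gc_job and exposes it through gc_limiter(). The limiter's own definition is not part of this diff; the following is a purely hypothetical sketch of what a concurrency cap of this shape commonly looks like when built on tokio::sync::Semaphore, shown only to illustrate the idea and not the actual GcLimiter code:

// Hypothetical sketch, not the real implementation: each gc job holds a
// counted permit while it runs, so at most `max_concurrent_gc_job` run at once.
use std::sync::Arc;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};

pub struct GcLimiter {
    permits: Arc<Semaphore>,
}

pub type GcLimiterRef = Arc<GcLimiter>;

impl GcLimiter {
    pub fn new(max_concurrent_gc_job: usize) -> Self {
        Self {
            permits: Arc::new(Semaphore::new(max_concurrent_gc_job)),
        }
    }

    /// Waits until a gc slot is free; the permit is released when dropped.
    pub async fn acquire(&self) -> OwnedSemaphorePermit {
        self.permits
            .clone()
            .acquire_owned()
            .await
            .expect("gc limiter semaphore closed")
    }
}
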
View File

@@ -29,14 +29,15 @@ use catalog::CatalogManagerRef;
use client::{OutputData, OutputMeta};
use common_catalog::consts::{
PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN, TRACE_ID_COLUMN, TRACE_TABLE_NAME,
TRACE_TABLE_NAME_SESSION_KEY, default_engine, trace_services_table_name,
TRACE_TABLE_NAME_SESSION_KEY, default_engine, trace_operations_table_name,
trace_services_table_name,
};
use common_grpc_expr::util::ColumnExpr;
use common_meta::cache::TableFlownodeSetCacheRef;
use common_meta::node_manager::{AffectedRows, NodeManagerRef};
use common_meta::peer::Peer;
use common_query::Output;
use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
use common_query::prelude::{greptime_timestamp, greptime_value};
use common_telemetry::tracing_context::TracingContext;
use common_telemetry::{error, info, warn};
use datatypes::schema::SkippingIndexOptions;
@@ -618,8 +619,10 @@ impl Inserter {
// note that auto create table shouldn't be ttl instant table
// for it's a very unexpected behavior and should be set by user explicitly
for mut create_table in create_tables {
if create_table.table_name == trace_services_table_name(trace_table_name) {
// Disable append mode for trace services table since it requires upsert behavior.
if create_table.table_name == trace_services_table_name(trace_table_name)
|| create_table.table_name == trace_operations_table_name(trace_table_name)
{
// Disable append mode for auxiliary tables (services/operations) since they require upsert behavior.
create_table
.table_options
.insert(APPEND_MODE_KEY.to_string(), "false".to_string());
@@ -718,14 +721,14 @@ impl Inserter {
// schema with timestamp and field column
let default_schema = vec![
ColumnSchema {
column_name: GREPTIME_TIMESTAMP.to_string(),
column_name: greptime_timestamp().to_string(),
datatype: ColumnDataType::TimestampMillisecond as _,
semantic_type: SemanticType::Timestamp as _,
datatype_extension: None,
options: None,
},
ColumnSchema {
column_name: GREPTIME_VALUE.to_string(),
column_name: greptime_value().to_string(),
datatype: ColumnDataType::Float64 as _,
semantic_type: SemanticType::Field as _,
datatype_extension: None,

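The auto-create path now treats the trace operations table the same way as the trace services table: both are auxiliary tables that need upsert semantics, so append mode is disabled for them. A hypothetical predicate (not in the diff) condensing that check, assuming the two table-name helpers imported above take the trace table name as a string:

// Hypothetical helper, shown only to summarize the condition above.
fn is_trace_auxiliary_table(table_name: &str, trace_table_name: &str) -> bool {
    table_name == trace_services_table_name(trace_table_name)
        || table_name == trace_operations_table_name(trace_table_name)
}

The schema hunk above also replaces the GREPTIME_TIMESTAMP and GREPTIME_VALUE constants with the greptime_timestamp() and greptime_value() functions from common_query::prelude when building the default timestamp and field columns.
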
Some files were not shown because too many files have changed in this diff.