Compare commits

...

14 Commits

Author SHA1 Message Date
Weny Xu
8f9674440f chore: pick #7148 into release/v0.16 (#7455)
refactor: add test feature gate to numbers table (#7148)

* refactor: add test feature gate to numbers table

* chore: add debug_assertions

* refactor: extract numbers table provider

* chore: address CR issues

---------

Signed-off-by: shuiyisong <xixing.sys@gmail.com>
Signed-off-by: WenyXu <wenymedia@gmail.com>
Co-authored-by: shuiyisong <113876041+shuiyisong@users.noreply.github.com>
2025-12-22 19:34:19 +08:00
Weny Xu
a29681eb12 chore: pick #7101 into release/v0.16 (#7453) 2025-12-22 07:14:48 +00:00
Weny Xu
c112cdf241 feat: make distributed time constants and client timeouts configurable (#7429)
* chore: update rskafka

Signed-off-by: WenyXu <wenymedia@gmail.com>

* feat: make distributed time constants and client timeouts configurable

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: change etcd endpoints to array in the test scripts (#7419)

chore: change etcd endpoint

Signed-off-by: liyang <daviderli614@gmail.com>

* fix: fix tests

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
Signed-off-by: liyang <daviderli614@gmail.com>
Co-authored-by: liyang <daviderli614@gmail.com>
2025-12-17 11:40:16 +00:00
Weny Xu
4d33b9687a fix: improve network failure detection (#7367) 2025-12-09 11:47:45 +00:00
Weny Xu
b0d2d26ad8 chore: pick #6845 #6821 into release/v0.16 (#7368) 2025-12-09 12:45:47 +08:00
Weny Xu
4ed048c735 fix(meta): add default etcd client options with keep-alive settings (#7363) 2025-12-08 12:39:18 +00:00
Weny Xu
af4465e543 chore: pick #7344 into release v0.16 (#7345) 2025-12-04 08:34:23 +00:00
liyang
7c2cdccb22 chore: use greptime dockerhub image (#6865)
Signed-off-by: liyang <daviderli614@gmail.com>
2025-12-03 18:33:56 +08:00
liyang
3655bbe032 chore: update bitnami config (#6847)
* chore: update bitnami config

Signed-off-by: liyang <daviderli614@gmail.com>

* update postgresql chart version

Signed-off-by: liyang <daviderli614@gmail.com>

* fix ci

Signed-off-by: liyang <daviderli614@gmail.com>

* refactor: add pull-test-deps-images.sh to pull images one by one to avoid rate limit

Signed-off-by: zyy17 <zyylsxm@gmail.com>

---------

Signed-off-by: liyang <daviderli614@gmail.com>
Signed-off-by: zyy17 <zyylsxm@gmail.com>
Co-authored-by: zyy17 <zyylsxm@gmail.com>
2025-12-03 18:33:56 +08:00
jeremyhi
5501f63a71 fix: reset cached channel on errors with VIP (#7335)
Signed-off-by: jeremyhi <fengjiachun@gmail.com>
2025-12-03 18:33:56 +08:00
Ruihang Xia
c5c9b263e1 chore: fix typo (#6885)
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-12-03 18:33:56 +08:00
Ning Sun
9e6b19d301 chore: fix typo (#7169)
Signed-off-by: WenyXu <wenymedia@gmail.com>
2025-12-03 18:33:56 +08:00
Ning Sun
f7f52592b4 fix: various typos reported by CI (#7047)
* fix: various typos reported by CI

* fix: additional typo

Signed-off-by: WenyXu <wenymedia@gmail.com>
2025-12-03 18:33:56 +08:00
LFC
4a936d7320 chore: pub access layer (#6670) (#6842)
(cherry picked from commit 7bb765af1d)

Signed-off-by: evenyag <realevenyag@gmail.com>
Co-authored-by: Yingwen <realevenyag@gmail.com>
2025-08-28 09:42:16 +00:00
80 changed files with 598 additions and 294 deletions


@@ -12,7 +12,7 @@ runs:
steps:
- name: Install Etcd cluster
shell: bash
run: |
run: |
helm upgrade \
--install etcd oci://registry-1.docker.io/bitnamicharts/etcd \
--set replicaCount=${{ inputs.etcd-replicas }} \
@@ -24,4 +24,9 @@ runs:
--set auth.rbac.token.enabled=false \
--set persistence.size=2Gi \
--create-namespace \
--set global.security.allowInsecureImages=true \
--set image.registry=docker.io \
--set image.repository=greptime/etcd \
--set image.tag=3.6.1-debian-12-r3 \
--version 12.0.8 \
-n ${{ inputs.namespace }}


@@ -51,7 +51,7 @@ runs:
run: |
helm upgrade \
--install my-greptimedb \
--set meta.backendStorage.etcd.endpoints=${{ inputs.etcd-endpoints }} \
--set 'meta.backendStorage.etcd.endpoints[0]=${{ inputs.etcd-endpoints }}' \
--set meta.enableRegionFailover=${{ inputs.enable-region-failover }} \
--set image.registry=${{ inputs.image-registry }} \
--set image.repository=${{ inputs.image-repository }} \


@@ -12,7 +12,7 @@ runs:
steps:
- name: Install Kafka cluster
shell: bash
run: |
run: |
helm upgrade \
--install kafka oci://registry-1.docker.io/bitnamicharts/kafka \
--set controller.replicaCount=${{ inputs.controller-replicas }} \
@@ -23,4 +23,8 @@ runs:
--set listeners.controller.protocol=PLAINTEXT \
--set listeners.client.protocol=PLAINTEXT \
--create-namespace \
--set image.registry=docker.io \
--set image.repository=greptime/kafka \
--set image.tag=3.9.0-debian-12-r1 \
--version 31.0.0 \
-n ${{ inputs.namespace }}


@@ -6,9 +6,7 @@ inputs:
description: "Number of PostgreSQL replicas"
namespace:
default: "postgres-namespace"
postgres-version:
default: "14.2"
description: "PostgreSQL version"
description: "The PostgreSQL namespace"
storage-size:
default: "1Gi"
description: "Storage size for PostgreSQL"
@@ -22,7 +20,11 @@ runs:
helm upgrade \
--install postgresql oci://registry-1.docker.io/bitnamicharts/postgresql \
--set replicaCount=${{ inputs.postgres-replicas }} \
--set image.tag=${{ inputs.postgres-version }} \
--set global.security.allowInsecureImages=true \
--set image.registry=docker.io \
--set image.repository=greptime/postgresql \
--set image.tag=17.5.0-debian-12-r3 \
--version 16.7.4 \
--set persistence.size=${{ inputs.storage-size }} \
--set postgresql.username=greptimedb \
--set postgresql.password=admin \


@@ -68,7 +68,7 @@ function deploy_greptimedb_cluster() {
helm install "$cluster_name" greptime/greptimedb-cluster \
--set image.tag="$GREPTIMEDB_IMAGE_TAG" \
--set meta.backendStorage.etcd.endpoints="etcd.$install_namespace:2379" \
--set meta.backendStorage.etcd.endpoints[0]="etcd.$install_namespace:2379" \
-n "$install_namespace"
# Wait for greptimedb cluster to be ready.
@@ -103,14 +103,13 @@ function deploy_greptimedb_cluster_with_s3_storage() {
helm install "$cluster_name" greptime/greptimedb-cluster -n "$install_namespace" \
--set image.tag="$GREPTIMEDB_IMAGE_TAG" \
--set meta.backendStorage.etcd.endpoints="etcd.$install_namespace:2379" \
--set meta.backendStorage.etcd.endpoints[0]="etcd.$install_namespace:2379" \
--set storage.s3.bucket="$AWS_CI_TEST_BUCKET" \
--set storage.s3.region="$AWS_REGION" \
--set storage.s3.root="$DATA_ROOT" \
--set storage.credentials.secretName=s3-credentials \
--set storage.credentials.accessKeyId="$AWS_ACCESS_KEY_ID" \
--set storage.credentials.secretAccessKey="$AWS_SECRET_ACCESS_KEY"
# Wait for greptimedb cluster to be ready.
while true; do
PHASE=$(kubectl -n "$install_namespace" get gtc "$cluster_name" -o jsonpath='{.status.clusterPhase}')

.github/scripts/pull-test-deps-images.sh vendored Executable file

@@ -0,0 +1,34 @@
#!/bin/bash
# This script is used to pull the test dependency images that are stored in public ECR one by one to avoid rate limiting.
set -e
MAX_RETRIES=3
IMAGES=(
"greptime/zookeeper:3.7"
"greptime/kafka:3.9.0-debian-12-r1"
"greptime/etcd:3.6.1-debian-12-r3"
"greptime/minio:2024"
"greptime/mysql:5.7"
)
for image in "${IMAGES[@]}"; do
for ((attempt=1; attempt<=MAX_RETRIES; attempt++)); do
if docker pull "$image"; then
# Successfully pulled the image.
break
else
# Use some simple exponential backoff to avoid rate limiting.
if [ $attempt -lt $MAX_RETRIES ]; then
sleep_seconds=$((attempt * 5))
echo "Attempt $attempt failed for $image, waiting $sleep_seconds seconds"
sleep $sleep_seconds # 5s, 10s delays
else
echo "Failed to pull $image after $MAX_RETRIES attempts"
exit 1
fi
fi
done
done


@@ -719,6 +719,10 @@ jobs:
save-if: ${{ github.ref == 'refs/heads/main' }}
- name: Install latest nextest release
uses: taiki-e/install-action@nextest
- name: Pull test dependencies images
run: ./.github/scripts/pull-test-deps-images.sh
- name: Setup external services
working-directory: tests-integration/fixtures
run: docker compose up -d --wait

Cargo.lock generated

@@ -6870,9 +6870,9 @@ dependencies = [
[[package]]
name = "libc"
version = "0.2.171"
version = "0.2.178"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c19937216e9d3aa9956d9bb8dfc0b0c8beb6058fc4f7a4dc4d850edf86a237d6"
checksum = "37c93d8daa9d8a012fd8ab92f088405fb202ea0b6ab73ee2482ae66af4f42091"
[[package]]
name = "libflate"
@@ -6928,7 +6928,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4979f22fdb869068da03c9f7528f8297c6fd2606bc3a4affe42e6a823fdb8da4"
dependencies = [
"cfg-if",
"windows-targets 0.52.6",
"windows-targets 0.48.5",
]
[[package]]
@@ -7005,13 +7005,13 @@ checksum = "b4ce301924b7887e9d637144fdade93f9dfff9b60981d4ac161db09720d39aa5"
[[package]]
name = "local-ip-address"
version = "0.6.3"
version = "0.6.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3669cf5561f8d27e8fc84cc15e58350e70f557d4d65f70e3154e54cd2f8e1782"
checksum = "656b3b27f8893f7bbf9485148ff9a65f019e3f33bd5cdc87c83cab16b3fd9ec8"
dependencies = [
"libc",
"neli",
"thiserror 1.0.64",
"thiserror 2.0.12",
"windows-sys 0.59.0",
]
@@ -10849,7 +10849,7 @@ dependencies = [
[[package]]
name = "rskafka"
version = "0.6.0"
source = "git+https://github.com/influxdata/rskafka.git?rev=8dbd01ed809f5a791833a594e85b144e36e45820#8dbd01ed809f5a791833a594e85b144e36e45820"
source = "git+https://github.com/WenyXu/rskafka.git?rev=9494304ae3947b07e660b5d08549ad4a39c84a26#9494304ae3947b07e660b5d08549ad4a39c84a26"
dependencies = [
"bytes",
"chrono",
@@ -12331,7 +12331,7 @@ dependencies = [
"cfg-if",
"libc",
"psm",
"windows-sys 0.59.0",
"windows-sys 0.52.0",
]
[[package]]
@@ -14598,7 +14598,7 @@ version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb"
dependencies = [
"windows-sys 0.59.0",
"windows-sys 0.48.0",
]
[[package]]


@@ -188,7 +188,8 @@ reqwest = { version = "0.12", default-features = false, features = [
"stream",
"multipart",
] }
rskafka = { git = "https://github.com/influxdata/rskafka.git", rev = "8dbd01ed809f5a791833a594e85b144e36e45820", features = [
# Branch: feat/request-timeout-port
rskafka = { git = "https://github.com/WenyXu/rskafka.git", rev = "9494304ae3947b07e660b5d08549ad4a39c84a26", features = [
"transport-tls",
] }
rstest = "0.25"


@@ -78,6 +78,8 @@
| `wal.sync_period` | String | `10s` | Duration for fsyncing log files.<br/>**It's only used when the provider is `raft_engine`**. |
| `wal.recovery_parallelism` | Integer | `2` | Parallelism during WAL recovery. |
| `wal.broker_endpoints` | Array | -- | The Kafka broker endpoints.<br/>**It's only used when the provider is `kafka`**. |
| `wal.connect_timeout` | String | `3s` | The connect timeout for kafka client.<br/>**It's only used when the provider is `kafka`**. |
| `wal.timeout` | String | `3s` | The timeout for kafka client.<br/>**It's only used when the provider is `kafka`**. |
| `wal.auto_create_topics` | Bool | `true` | Automatically create topics for WAL.<br/>Set to `true` to automatically create topics for WAL.<br/>Otherwise, use topics named `topic_name_prefix_[0..num_topics)` |
| `wal.num_topics` | Integer | `64` | Number of topics.<br/>**It's only used when the provider is `kafka`**. |
| `wal.selector_type` | String | `round_robin` | Topic selector type.<br/>Available selector types:<br/>- `round_robin` (default)<br/>**It's only used when the provider is `kafka`**. |
@@ -275,7 +277,6 @@
| `meta_client` | -- | -- | The metasrv client options. |
| `meta_client.metasrv_addrs` | Array | -- | The addresses of the metasrv. |
| `meta_client.timeout` | String | `3s` | Operation timeout. |
| `meta_client.heartbeat_timeout` | String | `500ms` | Heartbeat timeout. |
| `meta_client.ddl_timeout` | String | `10s` | DDL timeout. |
| `meta_client.connect_timeout` | String | `1s` | Connect server timeout. |
| `meta_client.tcp_nodelay` | Bool | `true` | `TCP_NODELAY` option for accepted connections. |
@@ -334,6 +335,7 @@
| `region_failure_detector_initialization_delay` | String | `10m` | The delay before starting region failure detection.<br/>This delay helps prevent Metasrv from triggering unnecessary region failovers before all Datanodes are fully started.<br/>Especially useful when the cluster is not deployed with GreptimeDB Operator and maintenance mode is not enabled. |
| `allow_region_failover_on_local_wal` | Bool | `false` | Whether to allow region failover on local WAL.<br/>**This option is not recommended to be set to true, because it may lead to data loss during failover.** |
| `node_max_idle_time` | String | `24hours` | Max allowed idle time before removing node info from metasrv memory. |
| `heartbeat_interval` | String | `3s` | Base heartbeat interval for calculating distributed time constants.<br/>The frontend heartbeat interval is 6 times of the base heartbeat interval.<br/>The flownode/datanode heartbeat interval is 1 times of the base heartbeat interval.<br/>e.g., If the base heartbeat interval is 3s, the frontend heartbeat interval is 18s, the flownode/datanode heartbeat interval is 3s.<br/>If you change this value, you need to change the heartbeat interval of the flownode/frontend/datanode accordingly. |
| `enable_telemetry` | Bool | `true` | Whether to enable greptimedb telemetry. Enabled by default. |
| `runtime` | -- | -- | The runtime options. |
| `runtime.global_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. |
@@ -344,12 +346,18 @@
| `backend_tls.key_path` | String | `""` | Path to client private key file (for client authentication)<br/>Like "/path/to/client.key" |
| `backend_tls.ca_cert_path` | String | `""` | Path to CA certificate file (for server certificate verification)<br/>Required when using custom CAs or self-signed certificates<br/>Leave empty to use system root certificates only<br/>Like "/path/to/ca.crt" |
| `backend_tls.watch` | Bool | `false` | Watch for certificate file changes and auto reload |
| `backend_client` | -- | -- | The backend client options.<br/>Currently, only applicable when using etcd as the metadata store. |
| `backend_client.keep_alive_timeout` | String | `3s` | The keep alive timeout for backend client. |
| `backend_client.keep_alive_interval` | String | `10s` | The keep alive interval for backend client. |
| `backend_client.connect_timeout` | String | `3s` | The connect timeout for backend client. |
| `grpc` | -- | -- | The gRPC server options. |
| `grpc.bind_addr` | String | `127.0.0.1:3002` | The address to bind the gRPC server. |
| `grpc.server_addr` | String | `127.0.0.1:3002` | The communication server address for the frontend and datanode to connect to metasrv.<br/>If left empty or unset, the server will automatically use the IP address of the first network interface<br/>on the host, with the same port number as the one specified in `bind_addr`. |
| `grpc.runtime_size` | Integer | `8` | The number of server worker threads. |
| `grpc.max_recv_message_size` | String | `512MB` | The maximum receive message size for gRPC server. |
| `grpc.max_send_message_size` | String | `512MB` | The maximum send message size for gRPC server. |
| `grpc.http2_keep_alive_interval` | String | `10s` | The server side HTTP/2 keep-alive interval |
| `grpc.http2_keep_alive_timeout` | String | `3s` | The server side HTTP/2 keep-alive timeout. |
| `http` | -- | -- | The HTTP server options. |
| `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. |
| `http.timeout` | String | `0s` | HTTP request timeout. Set to 0 to disable timeout. |
@@ -441,7 +449,6 @@
| `meta_client` | -- | -- | The metasrv client options. |
| `meta_client.metasrv_addrs` | Array | -- | The addresses of the metasrv. |
| `meta_client.timeout` | String | `3s` | Operation timeout. |
| `meta_client.heartbeat_timeout` | String | `500ms` | Heartbeat timeout. |
| `meta_client.ddl_timeout` | String | `10s` | DDL timeout. |
| `meta_client.connect_timeout` | String | `1s` | Connect server timeout. |
| `meta_client.tcp_nodelay` | Bool | `true` | `TCP_NODELAY` option for accepted connections. |
@@ -461,6 +468,8 @@
| `wal.sync_period` | String | `10s` | Duration for fsyncing log files.<br/>**It's only used when the provider is `raft_engine`**. |
| `wal.recovery_parallelism` | Integer | `2` | Parallelism during WAL recovery. |
| `wal.broker_endpoints` | Array | -- | The Kafka broker endpoints.<br/>**It's only used when the provider is `kafka`**. |
| `wal.connect_timeout` | String | `3s` | The connect timeout for kafka client.<br/>**It's only used when the provider is `kafka`**. |
| `wal.timeout` | String | `3s` | The timeout for kafka client.<br/>**It's only used when the provider is `kafka`**. |
| `wal.max_batch_bytes` | String | `1MB` | The max size of a single producer batch.<br/>Warning: Kafka has a default limit of 1MB per message in a topic.<br/>**It's only used when the provider is `kafka`**. |
| `wal.consumer_wait_timeout` | String | `100ms` | The consumer wait timeout.<br/>**It's only used when the provider is `kafka`**. |
| `wal.create_index` | Bool | `true` | Whether to enable WAL index creation.<br/>**It's only used when the provider is `kafka`**. |
@@ -604,7 +613,6 @@
| `meta_client` | -- | -- | The metasrv client options. |
| `meta_client.metasrv_addrs` | Array | -- | The addresses of the metasrv. |
| `meta_client.timeout` | String | `3s` | Operation timeout. |
| `meta_client.heartbeat_timeout` | String | `500ms` | Heartbeat timeout. |
| `meta_client.ddl_timeout` | String | `10s` | DDL timeout. |
| `meta_client.connect_timeout` | String | `1s` | Connect server timeout. |
| `meta_client.tcp_nodelay` | Bool | `true` | `TCP_NODELAY` option for accepted connections. |


@@ -92,9 +92,6 @@ metasrv_addrs = ["127.0.0.1:3002"]
## Operation timeout.
timeout = "3s"
## Heartbeat timeout.
heartbeat_timeout = "500ms"
## DDL timeout.
ddl_timeout = "10s"
@@ -164,6 +161,14 @@ recovery_parallelism = 2
## **It's only used when the provider is `kafka`**.
broker_endpoints = ["127.0.0.1:9092"]
## The connect timeout for kafka client.
## **It's only used when the provider is `kafka`**.
#+ connect_timeout = "3s"
## The timeout for kafka client.
## **It's only used when the provider is `kafka`**.
#+ timeout = "3s"
## The max size of a single producer batch.
## Warning: Kafka has a default limit of 1MB per message in a topic.
## **It's only used when the provider is `kafka`**.


@@ -64,9 +64,6 @@ metasrv_addrs = ["127.0.0.1:3002"]
## Operation timeout.
timeout = "3s"
## Heartbeat timeout.
heartbeat_timeout = "500ms"
## DDL timeout.
ddl_timeout = "10s"


@@ -171,9 +171,6 @@ metasrv_addrs = ["127.0.0.1:3002"]
## Operation timeout.
timeout = "3s"
## Heartbeat timeout.
heartbeat_timeout = "500ms"
## DDL timeout.
ddl_timeout = "10s"


@@ -55,6 +55,13 @@ allow_region_failover_on_local_wal = false
## Max allowed idle time before removing node info from metasrv memory.
node_max_idle_time = "24hours"
## Base heartbeat interval for calculating distributed time constants.
## The frontend heartbeat interval is 6 times of the base heartbeat interval.
## The flownode/datanode heartbeat interval is 1 times of the base heartbeat interval.
## e.g., If the base heartbeat interval is 3s, the frontend heartbeat interval is 18s, the flownode/datanode heartbeat interval is 3s.
## If you change this value, you need to change the heartbeat interval of the flownode/frontend/datanode accordingly.
#+ heartbeat_interval = "3s"
## Whether to enable greptimedb telemetry. Enabled by default.
#+ enable_telemetry = true
@@ -93,6 +100,16 @@ ca_cert_path = ""
## Watch for certificate file changes and auto reload
watch = false
## The backend client options.
## Currently, only applicable when using etcd as the metadata store.
#+ [backend_client]
## The keep alive timeout for backend client.
#+ keep_alive_timeout = "3s"
## The keep alive interval for backend client.
#+ keep_alive_interval = "10s"
## The connect timeout for backend client.
#+ connect_timeout = "3s"
## The gRPC server options.
[grpc]
## The address to bind the gRPC server.
@@ -107,6 +124,10 @@ runtime_size = 8
max_recv_message_size = "512MB"
## The maximum send message size for gRPC server.
max_send_message_size = "512MB"
## The server side HTTP/2 keep-alive interval
#+ http2_keep_alive_interval = "10s"
## The server side HTTP/2 keep-alive timeout.
#+ http2_keep_alive_timeout = "3s"
## The HTTP server options.
[http]


@@ -209,6 +209,14 @@ recovery_parallelism = 2
## **It's only used when the provider is `kafka`**.
broker_endpoints = ["127.0.0.1:9092"]
## The connect timeout for kafka client.
## **It's only used when the provider is `kafka`**.
#+ connect_timeout = "3s"
## The timeout for kafka client.
## **It's only used when the provider is `kafka`**.
#+ timeout = "3s"
## Automatically create topics for WAL.
## Set to `true` to automatically create topics for WAL.
## Otherwise, use topics named `topic_name_prefix_[0..num_topics)`


@@ -29,6 +29,7 @@ use crate::information_schema::{InformationExtensionRef, InformationSchemaProvid
use crate::kvbackend::manager::{SystemCatalog, CATALOG_CACHE_MAX_CAPACITY};
use crate::kvbackend::KvBackendCatalogManager;
use crate::process_manager::ProcessManagerRef;
use crate::system_schema::numbers_table_provider::NumbersTableProvider;
use crate::system_schema::pg_catalog::PGCatalogProvider;
pub struct KvBackendCatalogManagerBuilder {
@@ -119,6 +120,7 @@ impl KvBackendCatalogManagerBuilder {
DEFAULT_CATALOG_NAME.to_string(),
me.clone(),
)),
numbers_table_provider: NumbersTableProvider,
backend,
process_manager,
#[cfg(feature = "enterprise")]


@@ -18,8 +18,7 @@ use std::sync::{Arc, Weak};
use async_stream::try_stream;
use common_catalog::consts::{
DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, NUMBERS_TABLE_ID,
PG_CATALOG_NAME,
DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, PG_CATALOG_NAME,
};
use common_error::ext::BoxedError;
use common_meta::cache::{
@@ -43,7 +42,6 @@ use snafu::prelude::*;
use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
use table::dist_table::DistTable;
use table::metadata::{TableId, TableInfoRef};
use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME};
use table::table_name::TableName;
use table::TableRef;
use tokio::sync::Semaphore;
@@ -58,6 +56,7 @@ use crate::information_schema::InformationSchemaTableFactoryRef;
use crate::information_schema::{InformationExtensionRef, InformationSchemaProvider};
use crate::kvbackend::TableCacheRef;
use crate::process_manager::ProcessManagerRef;
use crate::system_schema::numbers_table_provider::NumbersTableProvider;
use crate::system_schema::pg_catalog::PGCatalogProvider;
use crate::system_schema::SystemSchemaProvider;
use crate::CatalogManager;
@@ -537,6 +536,7 @@ pub(super) struct SystemCatalog {
// system_schema_provider for default catalog
pub(super) information_schema_provider: Arc<InformationSchemaProvider>,
pub(super) pg_catalog_provider: Arc<PGCatalogProvider>,
pub(super) numbers_table_provider: NumbersTableProvider,
pub(super) backend: KvBackendRef,
pub(super) process_manager: Option<ProcessManagerRef>,
#[cfg(feature = "enterprise")]
@@ -566,9 +566,7 @@ impl SystemCatalog {
PG_CATALOG_NAME if channel == Channel::Postgres => {
self.pg_catalog_provider.table_names()
}
DEFAULT_SCHEMA_NAME => {
vec![NUMBERS_TABLE_NAME.to_string()]
}
DEFAULT_SCHEMA_NAME => self.numbers_table_provider.table_names(),
_ => vec![],
}
}
@@ -586,7 +584,7 @@ impl SystemCatalog {
if schema == INFORMATION_SCHEMA_NAME {
self.information_schema_provider.table(table).is_some()
} else if schema == DEFAULT_SCHEMA_NAME {
table == NUMBERS_TABLE_NAME
self.numbers_table_provider.table_exists(table)
} else if schema == PG_CATALOG_NAME && channel == Channel::Postgres {
self.pg_catalog_provider.table(table).is_some()
} else {
@@ -631,8 +629,8 @@ impl SystemCatalog {
});
pg_catalog_provider.table(table_name)
}
} else if schema == DEFAULT_SCHEMA_NAME && table_name == NUMBERS_TABLE_NAME {
Some(NumbersTable::table(NUMBERS_TABLE_ID))
} else if schema == DEFAULT_SCHEMA_NAME {
self.numbers_table_provider.table(table_name)
} else {
None
}


@@ -14,6 +14,7 @@
pub mod information_schema;
mod memory_table;
pub mod numbers_table_provider;
pub mod pg_catalog;
pub mod predicate;
mod utils;


@@ -0,0 +1,59 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#[cfg(any(test, feature = "testing", debug_assertions))]
use common_catalog::consts::NUMBERS_TABLE_ID;
#[cfg(any(test, feature = "testing", debug_assertions))]
use table::table::numbers::NumbersTable;
#[cfg(any(test, feature = "testing", debug_assertions))]
use table::table::numbers::NUMBERS_TABLE_NAME;
use table::TableRef;
// NumbersTableProvider is a dedicated provider for feature-gating the numbers table.
#[derive(Clone)]
pub struct NumbersTableProvider;
#[cfg(any(test, feature = "testing", debug_assertions))]
impl NumbersTableProvider {
pub(crate) fn table_exists(&self, name: &str) -> bool {
name == NUMBERS_TABLE_NAME
}
pub(crate) fn table_names(&self) -> Vec<String> {
vec![NUMBERS_TABLE_NAME.to_string()]
}
pub(crate) fn table(&self, name: &str) -> Option<TableRef> {
if name == NUMBERS_TABLE_NAME {
Some(NumbersTable::table(NUMBERS_TABLE_ID))
} else {
None
}
}
}
#[cfg(not(any(test, feature = "testing", debug_assertions)))]
impl NumbersTableProvider {
pub(crate) fn table_exists(&self, _name: &str) -> bool {
false
}
pub(crate) fn table_names(&self) -> Vec<String> {
vec![]
}
pub(crate) fn table(&self, _name: &str) -> Option<TableRef> {
None
}
}
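
The net effect is that the numbers table is only wired up in test, `testing`-feature, or debug builds. A minimal sketch of the resulting behavior at a hypothetical call site (assuming `NUMBERS_TABLE_NAME` is "numbers"; not part of the diff):

let provider = NumbersTableProvider;
// With `test`, the `testing` feature, or `debug_assertions` enabled:
assert!(provider.table_exists("numbers"));
assert_eq!(provider.table_names(), vec!["numbers".to_string()]);
assert!(provider.table("numbers").is_some());
// In a plain release build the same calls return false / an empty list / None,
// so the table is no longer exposed through the default schema.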


@@ -20,7 +20,7 @@ use common_meta::kv_backend::chroot::ChrootKvBackend;
use common_meta::kv_backend::etcd::EtcdStore;
use common_meta::kv_backend::KvBackendRef;
use meta_srv::bootstrap::create_etcd_client;
use meta_srv::metasrv::BackendImpl;
use meta_srv::metasrv::{BackendClientOptions, BackendImpl};
use crate::error::{EmptyStoreAddrsSnafu, UnsupportedMemoryBackendSnafu};
@@ -67,9 +67,10 @@ impl StoreConfig {
} else {
let kvbackend = match self.backend {
BackendImpl::EtcdStore => {
let etcd_client = create_etcd_client(store_addrs)
.await
.map_err(BoxedError::new)?;
let etcd_client =
create_etcd_client(store_addrs, &BackendClientOptions::default())
.await
.map_err(BoxedError::new)?;
Ok(EtcdStore::with_etcd_client(etcd_client, max_txn_ops))
}
#[cfg(feature = "pg_kvbackend")]


@@ -29,7 +29,7 @@ use futures::TryStreamExt;
use crate::error::InvalidArgumentsSnafu;
use crate::metadata::common::StoreConfig;
use crate::metadata::control::utils::{decode_key_value, get_table_id_by_name, json_fromatter};
use crate::metadata::control::utils::{decode_key_value, get_table_id_by_name, json_formatter};
use crate::Tool;
/// Getting metadata from metadata store.
@@ -206,7 +206,7 @@ impl Tool for GetTableTool {
println!(
"{}\n{}",
TableInfoKey::new(table_id),
json_fromatter(self.pretty, &*table_info)
json_formatter(self.pretty, &*table_info)
);
} else {
println!("Table info not found");
@@ -221,7 +221,7 @@ impl Tool for GetTableTool {
println!(
"{}\n{}",
TableRouteKey::new(table_id),
json_fromatter(self.pretty, &table_route)
json_formatter(self.pretty, &table_route)
);
} else {
println!("Table route not found");


@@ -27,7 +27,7 @@ pub fn decode_key_value(kv: KeyValue) -> CommonMetaResult<(String, String)> {
}
/// Formats a value as a JSON string.
pub fn json_fromatter<T>(pretty: bool, value: &T) -> String
pub fn json_formatter<T>(pretty: bool, value: &T) -> String
where
T: Serialize,
{


@@ -20,6 +20,7 @@ use async_trait::async_trait;
use clap::Parser;
use common_base::Plugins;
use common_config::Configurable;
use common_meta::distributed_time_constants::init_distributed_time_constants;
use common_telemetry::info;
use common_telemetry::logging::{TracingOptions, DEFAULT_LOGGING_DIR};
use common_version::{short_version, verbose_version};
@@ -327,6 +328,7 @@ impl StartCommand {
log_versions(verbose_version(), short_version(), APP_NAME);
maybe_activate_heap_profile(&opts.component.memory);
create_resource_limit_metrics(APP_NAME);
init_distributed_time_constants(opts.component.heartbeat_interval);
info!("Metasrv start command: {:#?}", self);


@@ -51,7 +51,6 @@ fn test_load_datanode_example_config() {
meta_client: Some(MetaClientOptions {
metasrv_addrs: vec!["127.0.0.1:3002".to_string()],
timeout: Duration::from_secs(3),
heartbeat_timeout: Duration::from_millis(500),
ddl_timeout: Duration::from_secs(10),
connect_timeout: Duration::from_secs(1),
tcp_nodelay: true,
@@ -116,7 +115,6 @@ fn test_load_frontend_example_config() {
meta_client: Some(MetaClientOptions {
metasrv_addrs: vec!["127.0.0.1:3002".to_string()],
timeout: Duration::from_secs(3),
heartbeat_timeout: Duration::from_millis(500),
ddl_timeout: Duration::from_secs(10),
connect_timeout: Duration::from_secs(1),
tcp_nodelay: true,
@@ -240,7 +238,6 @@ fn test_load_flownode_example_config() {
meta_client: Some(MetaClientOptions {
metasrv_addrs: vec!["127.0.0.1:3002".to_string()],
timeout: Duration::from_secs(3),
heartbeat_timeout: Duration::from_millis(500),
ddl_timeout: Duration::from_secs(10),
connect_timeout: Duration::from_secs(1),
tcp_nodelay: true,


@@ -332,7 +332,7 @@ impl AggregateUDFImpl for StateWrapper {
self.inner.signature()
}
/// Coerce types also do nothing, as optimzer should be able to already make struct types
/// Coerce types also do nothing, as optimizer should be able to already make struct types
fn coerce_types(&self, arg_types: &[DataType]) -> datafusion_common::Result<Vec<DataType>> {
self.inner.coerce_types(arg_types)
}
@@ -486,7 +486,7 @@ impl AggregateUDFImpl for MergeWrapper {
&self.merge_signature
}
/// Coerce types also do nothing, as optimzer should be able to already make struct types
/// Coerce types also do nothing, as optimizer should be able to already make struct types
fn coerce_types(&self, arg_types: &[DataType]) -> datafusion_common::Result<Vec<DataType>> {
// just check if the arg_types are only one and is struct array
if arg_types.len() != 1 || !matches!(arg_types.first(), Some(DataType::Struct(_))) {


@@ -12,25 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::OnceLock;
use std::time::Duration;
/// Heartbeat interval time (is the basic unit of various time).
pub const HEARTBEAT_INTERVAL_MILLIS: u64 = 3000;
/// The frontend will also send heartbeats to Metasrv, sending an empty
/// heartbeat every HEARTBEAT_INTERVAL_MILLIS * 6 seconds.
pub const FRONTEND_HEARTBEAT_INTERVAL_MILLIS: u64 = HEARTBEAT_INTERVAL_MILLIS * 6;
/// The lease seconds of a region. It's set by 3 heartbeat intervals
/// (HEARTBEAT_INTERVAL_MILLIS × 3), plus some extra buffer (1 second).
pub const REGION_LEASE_SECS: u64 =
Duration::from_millis(HEARTBEAT_INTERVAL_MILLIS * 3).as_secs() + 1;
/// When creating table or region failover, a target node needs to be selected.
/// If the node's lease has expired, the `Selector` will not select it.
pub const DATANODE_LEASE_SECS: u64 = REGION_LEASE_SECS;
pub const FLOWNODE_LEASE_SECS: u64 = DATANODE_LEASE_SECS;
pub const BASE_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(3);
/// The lease seconds of metasrv leader.
pub const META_LEASE_SECS: u64 = 5;
@@ -41,5 +26,73 @@ pub const POSTGRES_KEEP_ALIVE_SECS: u64 = 30;
/// In a lease, there are two opportunities for renewal.
pub const META_KEEP_ALIVE_INTERVAL_SECS: u64 = META_LEASE_SECS / 2;
/// The timeout of the heartbeat request.
pub const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS + 1);
/// The keep-alive interval of the heartbeat channel.
pub const HEARTBEAT_CHANNEL_KEEP_ALIVE_INTERVAL_SECS: Duration =
Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS + 1);
/// The keep-alive timeout of the heartbeat channel.
pub const HEARTBEAT_CHANNEL_KEEP_ALIVE_TIMEOUT_SECS: Duration =
Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS + 1);
/// The default mailbox round-trip timeout.
pub const MAILBOX_RTT_SECS: u64 = 1;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
/// The distributed time constants.
pub struct DistributedTimeConstants {
pub heartbeat_interval: Duration,
pub frontend_heartbeat_interval: Duration,
pub region_lease: Duration,
pub datanode_lease: Duration,
pub flownode_lease: Duration,
}
/// The frontend heartbeat interval is 6 times of the base heartbeat interval.
pub fn frontend_heartbeat_interval(base_heartbeat_interval: Duration) -> Duration {
base_heartbeat_interval * 6
}
impl DistributedTimeConstants {
/// Create a new DistributedTimeConstants from the heartbeat interval.
pub fn from_heartbeat_interval(heartbeat_interval: Duration) -> Self {
let region_lease = heartbeat_interval * 3 + Duration::from_secs(1);
let datanode_lease = region_lease;
let flownode_lease = datanode_lease;
Self {
heartbeat_interval,
frontend_heartbeat_interval: frontend_heartbeat_interval(heartbeat_interval),
region_lease,
datanode_lease,
flownode_lease,
}
}
}
impl Default for DistributedTimeConstants {
fn default() -> Self {
Self::from_heartbeat_interval(BASE_HEARTBEAT_INTERVAL)
}
}
static DEFAULT_DISTRIBUTED_TIME_CONSTANTS: OnceLock<DistributedTimeConstants> = OnceLock::new();
/// Get the default distributed time constants.
pub fn default_distributed_time_constants() -> &'static DistributedTimeConstants {
DEFAULT_DISTRIBUTED_TIME_CONSTANTS.get_or_init(Default::default)
}
/// Initialize the default distributed time constants.
pub fn init_distributed_time_constants(base_heartbeat_interval: Duration) {
let distributed_time_constants =
DistributedTimeConstants::from_heartbeat_interval(base_heartbeat_interval);
DEFAULT_DISTRIBUTED_TIME_CONSTANTS
.set(distributed_time_constants)
.expect("Failed to set default distributed time constants");
common_telemetry::info!(
"Initialized default distributed time constants: {:#?}",
distributed_time_constants
);
}
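
As a quick check of the derivation above, a sketch with a non-default base interval (5s is an illustrative value, not a default shipped in this change):

use std::time::Duration;

let constants = DistributedTimeConstants::from_heartbeat_interval(Duration::from_secs(5));
assert_eq!(constants.frontend_heartbeat_interval, Duration::from_secs(30)); // 5s * 6
assert_eq!(constants.region_lease, Duration::from_secs(16));                // 5s * 3 + 1s
assert_eq!(constants.datanode_lease, constants.region_lease);
assert_eq!(constants.flownode_lease, constants.datanode_lease);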


@@ -528,9 +528,6 @@ pub enum Error {
source: common_wal::error::Error,
},
#[snafu(display("Failed to resolve Kafka broker endpoint."))]
ResolveKafkaEndpoint { source: common_wal::error::Error },
#[snafu(display("Failed to build a Kafka controller client"))]
BuildKafkaCtrlClient {
#[snafu(implicit)]
@@ -1040,7 +1037,6 @@ impl ErrorExt for Error {
| BuildKafkaClient { .. }
| BuildKafkaCtrlClient { .. }
| KafkaPartitionClient { .. }
| ResolveKafkaEndpoint { .. }
| ProduceRecord { .. }
| CreateKafkaWalTopic { .. }
| EmptyTopicPool { .. }


@@ -25,8 +25,7 @@ use snafu::ResultExt;
use crate::error::{
BuildKafkaClientSnafu, BuildKafkaCtrlClientSnafu, CreateKafkaWalTopicSnafu,
KafkaGetOffsetSnafu, KafkaPartitionClientSnafu, ProduceRecordSnafu, ResolveKafkaEndpointSnafu,
Result, TlsConfigSnafu,
KafkaGetOffsetSnafu, KafkaPartitionClientSnafu, ProduceRecordSnafu, Result, TlsConfigSnafu,
};
// Each topic only has one partition for now.
@@ -209,10 +208,10 @@ impl KafkaTopicCreator {
/// Builds a kafka [Client](rskafka::client::Client).
pub async fn build_kafka_client(connection: &KafkaConnectionConfig) -> Result<Client> {
// Builds an kafka controller client for creating topics.
let broker_endpoints = common_wal::resolve_to_ipv4(&connection.broker_endpoints)
.await
.context(ResolveKafkaEndpointSnafu)?;
let mut builder = ClientBuilder::new(broker_endpoints).backoff_config(DEFAULT_BACKOFF_CONFIG);
let mut builder = ClientBuilder::new(connection.broker_endpoints.clone())
.backoff_config(DEFAULT_BACKOFF_CONFIG)
.connect_timeout(Some(connection.connect_timeout))
.timeout(Some(connection.timeout));
if let Some(sasl) = &connection.sasl {
builder = builder.sasl_config(sasl.config.clone().into_sasl_config());
};


@@ -19,7 +19,7 @@ use opentelemetry::propagation::TextMapPropagator;
use opentelemetry_sdk::propagation::TraceContextPropagator;
use tracing_opentelemetry::OpenTelemetrySpanExt;
// An wapper for `Futures` that provides tracing instrument adapters.
// An wrapper for `Futures` that provides tracing instrument adapters.
pub trait FutureExt: std::future::Future + Sized {
fn trace(self, span: tracing::span::Span) -> tracing::instrument::Instrumented<Self>;
}


@@ -189,6 +189,8 @@ mod tests {
client_cert_path: None,
client_key_path: None,
}),
connect_timeout: Duration::from_secs(3),
timeout: Duration::from_secs(3),
},
kafka_topic: KafkaTopicConfig {
num_topics: 32,
@@ -221,6 +223,8 @@ mod tests {
client_cert_path: None,
client_key_path: None,
}),
connect_timeout: Duration::from_secs(3),
timeout: Duration::from_secs(3),
},
max_batch_bytes: ReadableSize::mb(1),
consumer_wait_timeout: Duration::from_millis(100),


@@ -161,6 +161,12 @@ pub struct KafkaConnectionConfig {
pub sasl: Option<KafkaClientSasl>,
/// Client TLS config
pub tls: Option<KafkaClientTls>,
/// The connect timeout for kafka client.
#[serde(with = "humantime_serde")]
pub connect_timeout: Duration,
/// The timeout for kafka client.
#[serde(with = "humantime_serde")]
pub timeout: Duration,
}
impl Default for KafkaConnectionConfig {
@@ -169,6 +175,8 @@ impl Default for KafkaConnectionConfig {
broker_endpoints: vec![BROKER_ENDPOINT.to_string()],
sasl: None,
tls: None,
connect_timeout: Duration::from_secs(3),
timeout: Duration::from_secs(3),
}
}
}
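
Besides the `connect_timeout`/`timeout` keys added to the TOML examples, the same values can be set programmatically. A minimal sketch, assuming struct-update syntax on the defaults shown above (hypothetical call site, not part of the diff):

use std::time::Duration;

let connection = KafkaConnectionConfig {
    connect_timeout: Duration::from_secs(5),
    timeout: Duration::from_secs(10),
    ..Default::default()
};
// The topic creator and log-store client manager forward these to rskafka's
// ClientBuilder via `.connect_timeout(Some(..))` and `.timeout(Some(..))`,
// as shown in the diffs above.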


@@ -218,6 +218,7 @@ impl HeartbeatTask {
if let Some(message) = message {
Self::new_heartbeat_request(&heartbeat_request, Some(message), &latest_report)
} else {
warn!("Sender has been dropped, exiting the heartbeat loop");
// Receives None that means Sender was dropped, we need to break the current loop
break
}
@@ -259,7 +260,11 @@ impl HeartbeatTask {
error!(e; "Error while handling heartbeat response");
}
}
Ok(None) => break,
Ok(None) => {
warn!("Heartbeat response stream closed");
capture_self.start_with_retry(retry_interval).await;
break;
}
Err(e) => {
error!(e; "Occur error while reading heartbeat response");
capture_self.start_with_retry(retry_interval).await;


@@ -71,6 +71,6 @@ pub struct LinearStagePlan {
/// The key expressions to use for the lookup relation.
pub lookup_key: Vec<ScalarExpr>,
/// The closure to apply to the concatenation of the key columns,
/// the stream value columns, and the lookup value colunms.
/// the stream value columns, and the lookup value columns.
pub closure: JoinFilter,
}


@@ -23,7 +23,7 @@ use common_meta::heartbeat::handler::{
};
use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef, OutgoingMessage};
use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
use common_telemetry::{debug, error, info};
use common_telemetry::{debug, error, info, warn};
use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient};
use servers::addrs;
use servers::heartbeat_options::HeartbeatOptions;
@@ -42,8 +42,8 @@ use crate::metrics::{HEARTBEAT_RECV_COUNT, HEARTBEAT_SENT_COUNT};
pub struct HeartbeatTask {
peer_addr: String,
meta_client: Arc<MetaClient>,
report_interval: u64,
retry_interval: u64,
report_interval: Duration,
retry_interval: Duration,
resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
start_time_ms: u64,
}
@@ -58,8 +58,8 @@ impl HeartbeatTask {
HeartbeatTask {
peer_addr: addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)),
meta_client,
report_interval: heartbeat_opts.interval.as_millis() as u64,
retry_interval: heartbeat_opts.retry_interval.as_millis() as u64,
report_interval: heartbeat_opts.interval,
retry_interval: heartbeat_opts.retry_interval,
resp_handler_executor,
start_time_ms: common_time::util::current_time_millis() as u64,
}
@@ -103,13 +103,15 @@ impl HeartbeatTask {
HEARTBEAT_RECV_COUNT.with_label_values(&["success"]).inc();
}
}
Ok(None) => break,
Ok(None) => {
warn!("Heartbeat response stream closed");
capture_self.start_with_retry(retry_interval).await;
break;
}
Err(e) => {
HEARTBEAT_RECV_COUNT.with_label_values(&["error"]).inc();
error!(e; "Occur error while reading heartbeat response");
capture_self
.start_with_retry(Duration::from_millis(retry_interval))
.await;
capture_self.start_with_retry(retry_interval).await;
break;
}
@@ -177,12 +179,13 @@ impl HeartbeatTask {
if let Some(message) = message {
Self::new_heartbeat_request(&heartbeat_request, Some(message))
} else {
warn!("Sender has been dropped, exiting the heartbeat loop");
// Receives None that means Sender was dropped, we need to break the current loop
break
}
}
_ = &mut sleep => {
sleep.as_mut().reset(Instant::now() + Duration::from_millis(report_interval));
sleep.as_mut().reset(Instant::now() + report_interval);
Self::new_heartbeat_request(&heartbeat_request, None)
}
};


@@ -139,9 +139,6 @@ pub enum Error {
error: rskafka::client::error::Error,
},
#[snafu(display("Failed to resolve Kafka broker endpoint."))]
ResolveKafkaEndpoint { source: common_wal::error::Error },
#[snafu(display(
"Failed to build a Kafka partition client, topic: {}, partition: {}",
topic,
@@ -343,7 +340,6 @@ impl ErrorExt for Error {
StartWalTask { .. }
| StopWalTask { .. }
| IllegalState { .. }
| ResolveKafkaEndpoint { .. }
| NoMaxValue { .. }
| Cast { .. }
| EncodeJson { .. }


@@ -24,9 +24,7 @@ use snafu::ResultExt;
use store_api::logstore::provider::KafkaProvider;
use tokio::sync::{Mutex, RwLock};
use crate::error::{
BuildClientSnafu, BuildPartitionClientSnafu, ResolveKafkaEndpointSnafu, Result, TlsConfigSnafu,
};
use crate::error::{BuildClientSnafu, BuildPartitionClientSnafu, Result, TlsConfigSnafu};
use crate::kafka::index::{GlobalIndexCollector, NoopCollector};
use crate::kafka::producer::{OrderedBatchProducer, OrderedBatchProducerRef};
@@ -78,11 +76,10 @@ impl ClientManager {
high_watermark: Arc<DashMap<Arc<KafkaProvider>, u64>>,
) -> Result<Self> {
// Sets backoff config for the top-level kafka client and all clients constructed by it.
let broker_endpoints = common_wal::resolve_to_ipv4(&config.connection.broker_endpoints)
.await
.context(ResolveKafkaEndpointSnafu)?;
let mut builder =
ClientBuilder::new(broker_endpoints).backoff_config(DEFAULT_BACKOFF_CONFIG);
let mut builder = ClientBuilder::new(config.connection.broker_endpoints.clone())
.backoff_config(DEFAULT_BACKOFF_CONFIG)
.connect_timeout(Some(config.connection.connect_timeout))
.timeout(Some(config.connection.timeout));
if let Some(sasl) = &config.connection.sasl {
builder = builder.sasl_config(sasl.config.clone().into_sasl_config());
};


@@ -186,6 +186,9 @@ impl MetaClientBuilder {
let mgr = client.channel_manager.clone();
if self.enable_heartbeat {
if self.heartbeat_channel_manager.is_some() {
info!("Enable heartbeat channel using the heartbeat channel manager.");
}
let mgr = self.heartbeat_channel_manager.unwrap_or(mgr.clone());
client.heartbeat = Some(HeartbeatClient::new(
self.id,
@@ -525,7 +528,7 @@ impl MetaClient {
self.heartbeat_client()?.ask_leader().await
}
/// Returns a heartbeat bidirectional streaming: (sender, recever), the
/// Returns a heartbeat bidirectional streaming: (sender, receiver), the
/// other end is the leader of `metasrv`.
///
/// The `datanode` needs to use the sender to continuously send heartbeat


@@ -24,7 +24,7 @@ use common_meta::distributed_time_constants::META_KEEP_ALIVE_INTERVAL_SECS;
use common_telemetry::tracing_context::TracingContext;
use common_telemetry::warn;
use rand::seq::SliceRandom;
use snafu::{OptionExt, ResultExt};
use snafu::ResultExt;
use tokio::time::timeout;
use tonic::transport::Channel;
@@ -101,12 +101,14 @@ impl AskLeader {
};
let (tx, mut rx) = tokio::sync::mpsc::channel(peers.len());
let channel_manager = self.channel_manager.clone();
for addr in &peers {
let mut client = self.create_asker(addr)?;
let tx_clone = tx.clone();
let req = req.clone();
let addr = addr.to_string();
let addr = addr.clone();
let channel_manager = channel_manager.clone();
tokio::spawn(async move {
match client.ask_leader(req).await {
Ok(res) => {
@@ -117,13 +119,19 @@ impl AskLeader {
};
}
Err(status) => {
// Reset cached channel even on generic errors: the VIP may keep us on a dead
// backend, so forcing a reconnect gives us a chance to hit a healthy peer.
Self::reset_channels_with_manager(
&channel_manager,
std::slice::from_ref(&addr),
);
warn!("Failed to ask leader from: {addr}, {status}");
}
}
});
}
let leader = timeout(
let leader = match timeout(
self.channel_manager
.config()
.timeout
@@ -131,8 +139,16 @@ impl AskLeader {
rx.recv(),
)
.await
.context(error::AskLeaderTimeoutSnafu)?
.context(error::NoLeaderSnafu)?;
{
Ok(Some(leader)) => leader,
Ok(None) => return error::NoLeaderSnafu.fail(),
Err(e) => {
// All peers timed out. Reset channels to force reconnection,
// which may help escape dead backends in VIP/LB scenarios.
Self::reset_channels_with_manager(&self.channel_manager, &peers);
return Err(e).context(error::AskLeaderTimeoutSnafu);
}
};
let mut leadership_group = self.leadership_group.write().unwrap();
leadership_group.leader = Some(leader.clone());
@@ -169,6 +185,15 @@ impl AskLeader {
.context(error::CreateChannelSnafu)?,
))
}
/// Drop cached channels for the given peers so a fresh connection is used next time.
fn reset_channels_with_manager(channel_manager: &ChannelManager, peers: &[String]) {
if peers.is_empty() {
return;
}
channel_manager.retain_channel(|addr, _| !peers.iter().any(|peer| peer == addr));
}
}
#[async_trait]


@@ -18,6 +18,10 @@ use std::time::Duration;
use client::RegionFollowerClientRef;
use common_base::Plugins;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_meta::distributed_time_constants::{
HEARTBEAT_CHANNEL_KEEP_ALIVE_INTERVAL_SECS, HEARTBEAT_CHANNEL_KEEP_ALIVE_TIMEOUT_SECS,
HEARTBEAT_TIMEOUT,
};
use common_telemetry::{debug, info};
use serde::{Deserialize, Serialize};
@@ -34,8 +38,6 @@ pub struct MetaClientOptions {
#[serde(with = "humantime_serde")]
pub timeout: Duration,
#[serde(with = "humantime_serde")]
pub heartbeat_timeout: Duration,
#[serde(with = "humantime_serde")]
pub ddl_timeout: Duration,
#[serde(with = "humantime_serde")]
pub connect_timeout: Duration,
@@ -52,7 +54,6 @@ impl Default for MetaClientOptions {
Self {
metasrv_addrs: vec!["127.0.0.1:3002".to_string()],
timeout: Duration::from_millis(3_000u64),
heartbeat_timeout: Duration::from_millis(500u64),
ddl_timeout: Duration::from_millis(10_000u64),
connect_timeout: Duration::from_millis(1_000u64),
tcp_nodelay: true,
@@ -97,7 +98,11 @@ pub async fn create_meta_client(
.timeout(meta_client_options.timeout)
.connect_timeout(meta_client_options.connect_timeout)
.tcp_nodelay(meta_client_options.tcp_nodelay);
let heartbeat_config = base_config.clone();
let heartbeat_config = base_config
.clone()
.timeout(HEARTBEAT_TIMEOUT)
.http2_keep_alive_interval(HEARTBEAT_CHANNEL_KEEP_ALIVE_INTERVAL_SECS)
.http2_keep_alive_timeout(HEARTBEAT_CHANNEL_KEEP_ALIVE_TIMEOUT_SECS);
if let MetaClientType::Frontend = client_type {
let ddl_config = base_config.clone().timeout(meta_client_options.ddl_timeout);


@@ -40,7 +40,7 @@ use common_telemetry::info;
#[cfg(feature = "pg_kvbackend")]
use deadpool_postgres::{Config, Runtime};
use either::Either;
use etcd_client::Client;
use etcd_client::{Client, ConnectOptions};
use servers::configurator::ConfiguratorRef;
use servers::export_metrics::ExportMetricsTask;
use servers::http::{HttpServer, HttpServerBuilder};
@@ -70,7 +70,9 @@ use crate::election::rds::postgres::PgElection;
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
use crate::election::CANDIDATE_LEASE_SECS;
use crate::metasrv::builder::MetasrvBuilder;
use crate::metasrv::{BackendImpl, Metasrv, MetasrvOptions, SelectTarget, SelectorRef};
use crate::metasrv::{
BackendClientOptions, BackendImpl, Metasrv, MetasrvOptions, SelectTarget, SelectorRef,
};
use crate::node_excluder::NodeExcluderRef;
use crate::selector::lease_based::LeaseBasedSelector;
use crate::selector::load_based::LoadBasedSelector;
@@ -270,7 +272,12 @@ macro_rules! add_compressed_service {
}
pub fn router(metasrv: Arc<Metasrv>) -> Router {
let mut router = tonic::transport::Server::builder().accept_http1(true); // for admin services
let mut router = tonic::transport::Server::builder()
// for admin services
.accept_http1(true)
// For quick network failures detection.
.http2_keepalive_interval(Some(metasrv.options().grpc.http2_keep_alive_interval))
.http2_keepalive_timeout(Some(metasrv.options().grpc.http2_keep_alive_timeout));
let router = add_compressed_service!(router, HeartbeatServer::from_arc(metasrv.clone()));
let router = add_compressed_service!(router, StoreServer::from_arc(metasrv.clone()));
let router = add_compressed_service!(router, ClusterServer::from_arc(metasrv.clone()));
@@ -287,7 +294,7 @@ pub async fn metasrv_builder(
(Some(kv_backend), _) => (kv_backend, None),
(None, BackendImpl::MemoryStore) => (Arc::new(MemoryKvBackend::new()) as _, None),
(None, BackendImpl::EtcdStore) => {
let etcd_client = create_etcd_client(&opts.store_addrs).await?;
let etcd_client = create_etcd_client(&opts.store_addrs, &opts.backend_client).await?;
let kv_backend = EtcdStore::with_etcd_client(etcd_client.clone(), opts.max_txn_ops);
let election = EtcdElection::with_etcd_client(
&opts.grpc.server_addr,
@@ -435,13 +442,20 @@ pub async fn metasrv_builder(
.plugins(plugins))
}
pub async fn create_etcd_client(store_addrs: &[String]) -> Result<Client> {
pub async fn create_etcd_client(
store_addrs: &[String],
options: &BackendClientOptions,
) -> Result<Client> {
let etcd_endpoints = store_addrs
.iter()
.map(|x| x.trim())
.filter(|x| !x.is_empty())
.collect::<Vec<_>>();
Client::connect(&etcd_endpoints, None)
let options = ConnectOptions::new()
.with_keep_alive_while_idle(true)
.with_keep_alive(options.keep_alive_interval, options.keep_alive_timeout)
.with_connect_timeout(options.connect_timeout);
Client::connect(&etcd_endpoints, Some(options))
.await
.context(error::ConnectEtcdSnafu)
}


@@ -63,22 +63,6 @@ pub struct EtcdElection {
}
impl EtcdElection {
pub async fn with_endpoints<E, S>(
leader_value: E,
endpoints: S,
store_key_prefix: String,
) -> Result<ElectionRef>
where
E: AsRef<str>,
S: AsRef<[E]>,
{
let client = Client::connect(endpoints, None)
.await
.context(error::ConnectEtcdSnafu)?;
Self::with_etcd_client(leader_value, client, store_key_prefix).await
}
pub async fn with_etcd_client<E>(
leader_value: E,
client: Client,


@@ -1190,7 +1190,7 @@ mod tests {
));
handles.push(handle);
}
// Wait for candidates to registrate themselves and renew their leases at least once.
// Wait for candidates to register themselves and renew their leases at least once.
tokio::time::sleep(candidate_lease_ttl / 2 + Duration::from_secs(1)).await;
let (tx, _) = broadcast::channel(100);


@@ -1012,7 +1012,7 @@ mod tests {
));
handles.push(handle);
}
// Wait for candidates to registrate themselves and renew their leases at least once.
// Wait for candidates to register themselves and renew their leases at least once.
tokio::time::sleep(Duration::from_secs(3)).await;
let (tx, _) = broadcast::channel(100);


@@ -15,7 +15,6 @@
use std::collections::VecDeque;
use std::time::Duration;
use common_meta::distributed_time_constants;
use serde::{Deserialize, Serialize};
/// This is our port of Akka's "[PhiAccrualFailureDetector](https://github.com/akka/akka/blob/v2.6.21/akka-remote/src/main/scala/akka/remote/PhiAccrualFailureDetector.scala)"
@@ -83,9 +82,7 @@ impl Default for PhiAccrualFailureDetectorOptions {
Self {
threshold: 8_f32,
min_std_deviation: Duration::from_millis(100),
acceptable_heartbeat_pause: Duration::from_secs(
distributed_time_constants::DATANODE_LEASE_SECS,
),
acceptable_heartbeat_pause: Duration::from_secs(10),
first_heartbeat_estimate: Duration::from_millis(1000),
}
}


@@ -268,6 +268,15 @@ impl Pushers {
async fn remove(&self, pusher_id: &str) -> Option<Pusher> {
self.0.write().await.remove(pusher_id)
}
pub(crate) async fn clear(&self) -> Vec<String> {
let mut pushers = self.0.write().await;
let keys = pushers.keys().cloned().collect::<Vec<_>>();
if !keys.is_empty() {
pushers.clear();
}
keys
}
}
#[derive(Clone)]
@@ -304,10 +313,11 @@ impl HeartbeatHandlerGroup {
/// Deregisters the heartbeat response [`Pusher`] with the given key from the group.
///
/// Returns the [`Pusher`] if it exists.
pub async fn deregister_push(&self, pusher_id: PusherId) -> Option<Pusher> {
METRIC_META_HEARTBEAT_CONNECTION_NUM.dec();
pub async fn deregister_push(&self, pusher_id: PusherId) {
info!("Pusher unregister: {}", pusher_id);
self.pushers.remove(&pusher_id.string_key()).await
if self.pushers.remove(&pusher_id.string_key()).await.is_some() {
METRIC_META_HEARTBEAT_CONNECTION_NUM.dec();
}
}
/// Returns the [`Pushers`] of the group.
@@ -516,6 +526,14 @@ impl Mailbox for HeartbeatMailbox {
Ok(())
}
async fn reset(&self) {
let keys = self.pushers.clear().await;
if !keys.is_empty() {
info!("Reset mailbox, deregister pushers: {:?}", keys);
METRIC_META_HEARTBEAT_CONNECTION_NUM.sub(keys.len() as i64);
}
}
}
/// The builder to build the group of heartbeat handlers.


@@ -124,7 +124,7 @@ mod test {
use std::sync::Arc;
use common_meta::datanode::{RegionManifestInfo, RegionStat, Stat};
use common_meta::distributed_time_constants;
use common_meta::distributed_time_constants::default_distributed_time_constants;
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::test_utils::new_test_table_info;
use common_meta::key::TableMetadataManager;
@@ -223,7 +223,7 @@ mod test {
let opening_region_keeper = Arc::new(MemoryRegionKeeper::default());
let handler = RegionLeaseHandler::new(
distributed_time_constants::REGION_LEASE_SECS,
default_distributed_time_constants().region_lease.as_secs() as u64,
table_metadata_manager.clone(),
opening_region_keeper.clone(),
None,
@@ -253,7 +253,7 @@ mod test {
assert_eq!(
acc.region_lease.as_ref().unwrap().lease_seconds,
distributed_time_constants::REGION_LEASE_SECS
default_distributed_time_constants().region_lease.as_secs()
);
assert_region_lease(
@@ -287,7 +287,7 @@ mod test {
assert_eq!(
acc.region_lease.as_ref().unwrap().lease_seconds,
distributed_time_constants::REGION_LEASE_SECS
default_distributed_time_constants().region_lease.as_secs()
);
assert_region_lease(
@@ -366,7 +366,7 @@ mod test {
});
let handler = RegionLeaseHandler::new(
distributed_time_constants::REGION_LEASE_SECS,
default_distributed_time_constants().region_lease.as_secs(),
table_metadata_manager.clone(),
Default::default(),
None,


@@ -21,7 +21,7 @@ use std::task::{Context, Poll};
use api::v1::meta::heartbeat_request::NodeWorkloads;
use common_error::ext::BoxedError;
use common_meta::cluster::{NodeInfo, NodeInfoKey, Role as ClusterRole};
use common_meta::distributed_time_constants::FRONTEND_HEARTBEAT_INTERVAL_MILLIS;
use common_meta::distributed_time_constants::default_distributed_time_constants;
use common_meta::kv_backend::{KvBackend, ResettableKvBackendRef};
use common_meta::peer::{Peer, PeerLookupService};
use common_meta::rpc::store::RangeRequest;
@@ -312,7 +312,9 @@ impl PeerLookupService for MetaPeerLookupService {
lookup_frontends(
&self.meta_peer_client,
// TODO(zyy17): How to get the heartbeat interval of the frontend if it uses a custom heartbeat interval?
FRONTEND_HEARTBEAT_INTERVAL_MILLIS,
default_distributed_time_constants()
.frontend_heartbeat_interval
.as_secs(),
)
.await
.map_err(BoxedError::new)


@@ -27,7 +27,7 @@ use common_event_recorder::EventRecorderOptions;
use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::ddl_manager::DdlManagerRef;
use common_meta::distributed_time_constants;
use common_meta::distributed_time_constants::{self, default_distributed_time_constants};
use common_meta::key::runtime_switch::RuntimeSwitchManagerRef;
use common_meta::key::TableMetadataManagerRef;
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackend, ResettableKvBackendRef};
@@ -98,6 +98,27 @@ pub enum BackendImpl {
MysqlStore,
}
#[derive(Clone, PartialEq, Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct BackendClientOptions {
#[serde(with = "humantime_serde")]
pub keep_alive_timeout: Duration,
#[serde(with = "humantime_serde")]
pub keep_alive_interval: Duration,
#[serde(with = "humantime_serde")]
pub connect_timeout: Duration,
}
impl Default for BackendClientOptions {
fn default() -> Self {
Self {
keep_alive_interval: Duration::from_secs(10),
keep_alive_timeout: Duration::from_secs(3),
connect_timeout: Duration::from_secs(3),
}
}
}
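These client options only affect the etcd metadata backend. As a rough sketch of the intent (not part of this diff, and assuming the etcd-client crate's ConnectOptions API), the options could map onto the client connection like this; the helper name is hypothetical:
use etcd_client::ConnectOptions;
// Hypothetical helper: translate `BackendClientOptions` (defined above) into
// etcd connect options.
fn to_etcd_connect_options(opts: &BackendClientOptions) -> ConnectOptions {
    ConnectOptions::new()
        // Send keep-alive pings every `keep_alive_interval`; drop the connection
        // if no acknowledgement arrives within `keep_alive_timeout`.
        .with_keep_alive(opts.keep_alive_interval, opts.keep_alive_timeout)
        // Fail fast when the endpoint is unreachable.
        .with_connect_timeout(opts.connect_timeout)
}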
#[derive(Clone, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct MetasrvOptions {
@@ -113,12 +134,22 @@ pub struct MetasrvOptions {
/// Only applicable when using PostgreSQL or MySQL as the metadata store
#[serde(default)]
pub backend_tls: Option<TlsOption>,
/// The backend client options.
/// Currently, only applicable when using etcd as the metadata store.
#[serde(default)]
pub backend_client: BackendClientOptions,
/// The type of selector.
pub selector: SelectorType,
/// Whether to use the memory store.
pub use_memory_store: bool,
/// Whether to enable region failover.
pub enable_region_failover: bool,
/// The base heartbeat interval.
///
/// This value is used to calculate the distributed time constants for components.
/// e.g., the region lease time is `heartbeat_interval * 3 + Duration::from_secs(1)`.
#[serde(with = "humantime_serde")]
pub heartbeat_interval: Duration,
/// The delay before starting region failure detection.
/// This delay helps prevent Metasrv from triggering unnecessary region failovers before all Datanodes are fully started.
/// Especially useful when the cluster is not deployed with GreptimeDB Operator and maintenance mode is not enabled.
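The doc comment above ties the other distributed time constants to this single base interval. A minimal sketch of that arithmetic, using only the formula stated in the comment (the function name is illustrative):
use std::time::Duration;
// Illustrative only: derive the region lease from the base heartbeat interval,
// per the doc comment (`heartbeat_interval * 3 + 1s`).
fn region_lease_from(heartbeat_interval: Duration) -> Duration {
    heartbeat_interval * 3 + Duration::from_secs(1)
}
// e.g. a 3s base interval yields a 10s region lease.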
@@ -211,7 +242,9 @@ impl fmt::Debug for MetasrvOptions {
.field("max_txn_ops", &self.max_txn_ops)
.field("flush_stats_factor", &self.flush_stats_factor)
.field("tracing", &self.tracing)
.field("backend", &self.backend);
.field("backend", &self.backend)
.field("heartbeat_interval", &self.heartbeat_interval)
.field("backend_client", &self.backend_client);
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
debug_struct.field("meta_table_name", &self.meta_table_name);
@@ -239,6 +272,7 @@ impl Default for MetasrvOptions {
selector: SelectorType::default(),
use_memory_store: false,
enable_region_failover: false,
heartbeat_interval: distributed_time_constants::BASE_HEARTBEAT_INTERVAL,
region_failure_detector_initialization_delay: Duration::from_secs(10 * 60),
allow_region_failover_on_local_wal: false,
grpc: GrpcOptions {
@@ -273,6 +307,7 @@ impl Default for MetasrvOptions {
meta_election_lock_id: common_meta::kv_backend::DEFAULT_META_ELECTION_LOCK_ID,
node_max_idle_time: Duration::from_secs(24 * 60 * 60),
event_recorder: EventRecorderOptions::default(),
backend_client: BackendClientOptions::default(),
}
}
}
@@ -388,6 +423,7 @@ pub struct MetaStateHandler {
greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
leader_cached_kv_backend: Arc<LeaderCachedKvBackend>,
leadership_change_notifier: LeadershipChangeNotifier,
mailbox: MailboxRef,
state: StateRef,
}
@@ -411,6 +447,9 @@ impl MetaStateHandler {
pub async fn on_leader_stop(&self) {
self.state.write().unwrap().next_state(become_follower());
// Enforces the mailbox to clear all pushers.
// The remaining heartbeat connections will be closed by the remote peer or keep-alive detection.
self.mailbox.reset().await;
self.leadership_change_notifier
.notify_on_leader_stop()
.await;
@@ -528,6 +567,7 @@ impl Metasrv {
state: self.state.clone(),
leader_cached_kv_backend: leader_cached_kv_backend.clone(),
leadership_change_notifier,
mailbox: self.mailbox.clone(),
};
let _handle = common_runtime::spawn_global(async move {
loop {
@@ -655,7 +695,9 @@ impl Metasrv {
lookup_datanode_peer(
peer_id,
&self.meta_peer_client,
distributed_time_constants::DATANODE_LEASE_SECS,
default_distributed_time_constants()
.datanode_lease
.as_secs(),
)
.await
}

View File

@@ -27,7 +27,7 @@ use common_meta::ddl::{
DdlContext, NoopRegionFailureDetectorControl, RegionFailureDetectorControllerRef,
};
use common_meta::ddl_manager::DdlManager;
use common_meta::distributed_time_constants;
use common_meta::distributed_time_constants::default_distributed_time_constants;
use common_meta::key::flow::flow_state::FlowStateManager;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::runtime_switch::{RuntimeSwitchManager, RuntimeSwitchManagerRef};
@@ -220,8 +220,12 @@ impl MetasrvBuilder {
let selector_ctx = SelectorContext {
server_addr: options.grpc.server_addr.clone(),
datanode_lease_secs: distributed_time_constants::DATANODE_LEASE_SECS,
flownode_lease_secs: distributed_time_constants::FLOWNODE_LEASE_SECS,
datanode_lease_secs: default_distributed_time_constants()
.datanode_lease
.as_secs(),
flownode_lease_secs: default_distributed_time_constants()
.flownode_lease
.as_secs(),
kv_backend: kv_backend.clone(),
meta_peer_client: meta_peer_client.clone(),
table_id: None,
@@ -438,7 +442,7 @@ impl MetasrvBuilder {
Some(handler_group_builder) => handler_group_builder,
None => {
let region_lease_handler = RegionLeaseHandler::new(
distributed_time_constants::REGION_LEASE_SECS,
default_distributed_time_constants().region_lease.as_secs(),
table_metadata_manager.clone(),
memory_region_keeper.clone(),
customized_region_lease_renewer,

View File

@@ -770,7 +770,7 @@ mod tests {
use std::assert_matches::assert_matches;
use std::sync::Arc;
use common_meta::distributed_time_constants::REGION_LEASE_SECS;
use common_meta::distributed_time_constants::default_distributed_time_constants;
use common_meta::instruction::Instruction;
use common_meta::key::test_utils::new_test_table_info;
use common_meta::rpc::router::{Region, RegionRoute};
@@ -1004,8 +1004,10 @@ mod tests {
.run_once()
.await;
let region_lease = default_distributed_time_constants().region_lease.as_secs();
// Ensure it didn't run into the slow path.
assert!(timer.elapsed().as_secs() < REGION_LEASE_SECS / 2);
assert!(timer.elapsed().as_secs() < region_lease / 2);
runner.suite.verify_table_metadata().await;
}
@@ -1059,8 +1061,9 @@ mod tests {
.run_once()
.await;
let region_lease = default_distributed_time_constants().region_lease.as_secs();
// Ensure it didn't run into the slow path.
assert!(timer.elapsed().as_secs() < REGION_LEASE_SECS / 2);
assert!(timer.elapsed().as_secs() < region_lease / 2);
runner.suite.verify_table_metadata().await;
}
@@ -1380,8 +1383,9 @@ mod tests {
.run_once()
.await;
let region_lease = default_distributed_time_constants().region_lease.as_secs();
// Ensure it didn't run into the slow path.
assert!(timer.elapsed().as_secs() < REGION_LEASE_SECS);
assert!(timer.elapsed().as_secs() < region_lease);
runner.suite.verify_table_metadata().await;
}
}

View File

@@ -13,10 +13,9 @@
// limitations under the License.
use std::any::Any;
use std::time::Duration;
use api::v1::meta::MailboxMessage;
use common_meta::distributed_time_constants::REGION_LEASE_SECS;
use common_meta::distributed_time_constants::default_distributed_time_constants;
use common_meta::instruction::{Instruction, InstructionReply, SimpleReply};
use common_meta::key::datanode_table::RegionInfo;
use common_meta::RegionIdent;
@@ -31,9 +30,6 @@ use crate::procedure::region_migration::migration_end::RegionMigrationEnd;
use crate::procedure::region_migration::{Context, State};
use crate::service::mailbox::Channel;
/// Uses lease time of a region as the timeout of closing a downgraded region.
const CLOSE_DOWNGRADED_REGION_TIMEOUT: Duration = Duration::from_secs(REGION_LEASE_SECS);
#[derive(Debug, Serialize, Deserialize)]
pub struct CloseDowngradedRegion;
@@ -111,7 +107,7 @@ impl CloseDowngradedRegion {
let ch = Channel::Datanode(downgrade_leader_datanode.id);
let receiver = ctx
.mailbox
.send(&ch, msg, CLOSE_DOWNGRADED_REGION_TIMEOUT)
.send(&ch, msg, default_distributed_time_constants().region_lease)
.await?;
match receiver.await {

View File

@@ -17,7 +17,7 @@ use std::time::Duration;
use api::v1::meta::MailboxMessage;
use common_error::ext::BoxedError;
use common_meta::distributed_time_constants::REGION_LEASE_SECS;
use common_meta::distributed_time_constants::default_distributed_time_constants;
use common_meta::instruction::{
DowngradeRegion, DowngradeRegionReply, Instruction, InstructionReply,
};
@@ -64,7 +64,7 @@ impl State for DowngradeLeaderRegion {
let now = Instant::now();
// Ensures the `leader_region_lease_deadline` must exist after recovering.
ctx.volatile_ctx
.set_leader_region_lease_deadline(Duration::from_secs(REGION_LEASE_SECS));
.set_leader_region_lease_deadline(default_distributed_time_constants().region_lease);
match self.downgrade_region_with_retry(ctx).await {
Ok(_) => {
@@ -250,14 +250,14 @@ impl DowngradeLeaderRegion {
if let Some(last_connection_at) = last_connection_at {
let now = current_time_millis();
let elapsed = now - last_connection_at;
let region_lease = Duration::from_secs(REGION_LEASE_SECS);
let region_lease = default_distributed_time_constants().region_lease;
// It's safe to update the region leader lease deadline here because:
// 1. The old region leader has already been marked as downgraded in metadata,
// which means any attempts to renew its lease will be rejected.
// 2. The pusher disconnect time record only gets removed when the datanode (from_peer)
// establishes a new heartbeat connection stream.
if elapsed >= (REGION_LEASE_SECS * 1000) as i64 {
if elapsed >= (region_lease.as_secs() * 1000) as i64 {
ctx.volatile_ctx.reset_leader_region_lease_deadline();
info!(
"Datanode {}({}) has been disconnected for longer than the region lease period ({:?}), reset leader region lease deadline to None, region: {}",
@@ -663,7 +663,8 @@ mod tests {
let procedure_ctx = new_procedure_context();
let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
let elapsed = timer.elapsed().as_secs();
assert!(elapsed < REGION_LEASE_SECS / 2);
let region_lease = default_distributed_time_constants().region_lease.as_secs();
assert!(elapsed < region_lease / 2);
assert_eq!(ctx.volatile_ctx.leader_region_last_entry_id, Some(1));
assert!(ctx.volatile_ctx.leader_region_lease_deadline.is_none());

View File

@@ -13,10 +13,9 @@
// limitations under the License.
use std::any::Any;
use std::time::Duration;
use api::v1::meta::MailboxMessage;
use common_meta::distributed_time_constants::REGION_LEASE_SECS;
use common_meta::distributed_time_constants::default_distributed_time_constants;
use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, SimpleReply};
use common_meta::key::datanode_table::RegionInfo;
use common_meta::RegionIdent;
@@ -32,9 +31,6 @@ use crate::procedure::region_migration::flush_leader_region::PreFlushRegion;
use crate::procedure::region_migration::{Context, State};
use crate::service::mailbox::Channel;
/// Uses lease time of a region as the timeout of opening a candidate region.
const OPEN_CANDIDATE_REGION_TIMEOUT: Duration = Duration::from_secs(REGION_LEASE_SECS);
#[derive(Debug, Serialize, Deserialize)]
pub struct OpenCandidateRegion;
@@ -143,7 +139,7 @@ impl OpenCandidateRegion {
let now = Instant::now();
let receiver = ctx
.mailbox
.send(&ch, msg, OPEN_CANDIDATE_REGION_TIMEOUT)
.send(&ch, msg, default_distributed_time_constants().region_lease)
.await?;
match receiver.await {

View File

@@ -14,7 +14,7 @@
use std::sync::Arc;
use common_meta::distributed_time_constants::{FLOWNODE_LEASE_SECS, REGION_LEASE_SECS};
use common_meta::distributed_time_constants::default_distributed_time_constants;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::peer::Peer;
use rand::prelude::SliceRandom;
@@ -36,8 +36,10 @@ pub fn new_test_selector_context() -> SelectorContext {
SelectorContext {
server_addr: "127.0.0.1:3002".to_string(),
datanode_lease_secs: REGION_LEASE_SECS,
flownode_lease_secs: FLOWNODE_LEASE_SECS,
datanode_lease_secs: default_distributed_time_constants().region_lease.as_secs(),
flownode_lease_secs: default_distributed_time_constants()
.flownode_lease
.as_secs(),
kv_backend,
meta_peer_client,
table_id: None,

View File

@@ -27,10 +27,9 @@ use snafu::OptionExt;
use tokio::sync::mpsc;
use tokio::sync::mpsc::Sender;
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Request, Response, Streaming};
use tonic::{Request, Response, Status, Streaming};
use crate::error;
use crate::error::Result;
use crate::error::{self, Result};
use crate::handler::{HeartbeatHandlerGroup, Pusher, PusherId};
use crate::metasrv::{Context, Metasrv};
use crate::metrics::METRIC_META_HEARTBEAT_RECV;
@@ -99,6 +98,7 @@ impl heartbeat_server::Heartbeat for Metasrv {
break;
}
}
error!(err; "Sending heartbeat response error");
if tx.send(Err(err)).await.is_err() {
info!("ReceiverStream was dropped; shutting down");
@@ -109,6 +109,12 @@ impl heartbeat_server::Heartbeat for Metasrv {
if is_not_leader {
warn!("Quit because it is no longer the leader");
let _ = tx
.send(Err(Status::aborted(format!(
"The requested metasrv node is not leader, node addr: {}",
ctx.server_addr
))))
.await;
break;
}
}

View File

@@ -207,6 +207,9 @@ pub trait Mailbox: Send + Sync {
async fn broadcast(&self, ch: &BroadcastChannel, msg: &MailboxMessage) -> Result<()>;
async fn on_recv(&self, id: MessageId, maybe_msg: Result<MailboxMessage>) -> Result<()>;
/// Reset all pushers of the mailbox.
async fn reset(&self);
}
#[cfg(test)]

View File

@@ -90,7 +90,8 @@ pub struct CompactionRegion {
pub(crate) engine_config: Arc<MitoConfig>,
pub(crate) region_metadata: RegionMetadataRef,
pub(crate) cache_manager: CacheManagerRef,
pub(crate) access_layer: AccessLayerRef,
/// Access layer to get the table path and path type.
pub access_layer: AccessLayerRef,
pub(crate) manifest_ctx: Arc<ManifestContext>,
pub(crate) current_version: CompactionVersion,
pub(crate) file_purger: Option<Arc<LocalFilePurger>>,

View File

@@ -76,7 +76,7 @@ pub struct RegionManifestOptions {
/// -RegionMetadataRef metadata
/// }
/// class RegionEdit {
/// -VersionNumber regoin_version
/// -VersionNumber region_version
/// -Vec~FileMeta~ files_to_add
/// -Vec~FileMeta~ files_to_remove
/// -SequenceNumber flushed_sequence

View File

@@ -371,7 +371,7 @@ impl VersionBuilder {
self
}
/// Sets truncated entty id.
/// Sets truncated entry id.
pub(crate) fn truncated_entry_id(mut self, entry_id: Option<EntryId>) -> Self {
self.truncated_entry_id = entry_id;
self

View File

@@ -344,7 +344,7 @@ mod tests {
#[test]
fn test_collect_and_basic() {
let (_d, facotry) = PuffinManagerFactory::new_for_test_block("test_collect_and_basic_");
let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_collect_and_basic_");
let metadata = test_region_metadata();
let mut builder = InvertedIndexApplierBuilder::new(
@@ -353,7 +353,7 @@ mod tests {
test_object_store(),
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
factory,
);
let expr = Expr::BinaryExpr(BinaryExpr {

View File

@@ -72,7 +72,7 @@ mod tests {
#[test]
fn test_collect_between_basic() {
let (_d, facotry) = PuffinManagerFactory::new_for_test_block("test_collect_between_basic_");
let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_collect_between_basic_");
let metadata = test_region_metadata();
let mut builder = InvertedIndexApplierBuilder::new(
"test".to_string(),
@@ -80,7 +80,7 @@ mod tests {
test_object_store(),
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
factory,
);
let between = Between {
@@ -113,7 +113,7 @@ mod tests {
#[test]
fn test_collect_between_negated() {
let (_d, facotry) =
let (_d, factory) =
PuffinManagerFactory::new_for_test_block("test_collect_between_negated_");
let metadata = test_region_metadata();
let mut builder = InvertedIndexApplierBuilder::new(
@@ -122,7 +122,7 @@ mod tests {
test_object_store(),
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
factory,
);
let between = Between {
@@ -138,7 +138,7 @@ mod tests {
#[test]
fn test_collect_between_field_column() {
let (_d, facotry) =
let (_d, factory) =
PuffinManagerFactory::new_for_test_block("test_collect_between_field_column_");
let metadata = test_region_metadata();
let mut builder = InvertedIndexApplierBuilder::new(
@@ -147,7 +147,7 @@ mod tests {
test_object_store(),
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
factory,
);
let between = Between {
@@ -180,7 +180,7 @@ mod tests {
#[test]
fn test_collect_between_type_mismatch() {
let (_d, facotry) =
let (_d, factory) =
PuffinManagerFactory::new_for_test_block("test_collect_between_type_mismatch_");
let metadata = test_region_metadata();
let mut builder = InvertedIndexApplierBuilder::new(
@@ -189,7 +189,7 @@ mod tests {
test_object_store(),
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
factory,
);
let between = Between {
@@ -206,7 +206,7 @@ mod tests {
#[test]
fn test_collect_between_nonexistent_column() {
let (_d, facotry) =
let (_d, factory) =
PuffinManagerFactory::new_for_test_block("test_collect_between_nonexistent_column_");
let metadata = test_region_metadata();
let mut builder = InvertedIndexApplierBuilder::new(
@@ -215,7 +215,7 @@ mod tests {
test_object_store(),
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
factory,
);
let between = Between {

View File

@@ -227,7 +227,7 @@ mod tests {
),
];
let (_d, facotry) =
let (_d, factory) =
PuffinManagerFactory::new_for_test_block("test_collect_comparison_basic_");
let metadata = test_region_metadata();
let mut builder = InvertedIndexApplierBuilder::new(
@@ -236,7 +236,7 @@ mod tests {
test_object_store(),
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
factory,
);
for ((left, op, right), _) in &cases {
@@ -255,7 +255,7 @@ mod tests {
#[test]
fn test_collect_comparison_type_mismatch() {
let (_d, facotry) =
let (_d, factory) =
PuffinManagerFactory::new_for_test_block("test_collect_comparison_type_mismatch_");
let metadata = test_region_metadata();
let mut builder = InvertedIndexApplierBuilder::new(
@@ -264,7 +264,7 @@ mod tests {
test_object_store(),
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
factory,
);
let res = builder.collect_comparison_expr(&tag_column(), &Operator::Lt, &int64_lit(10));
@@ -274,7 +274,7 @@ mod tests {
#[test]
fn test_collect_comparison_field_column() {
let (_d, facotry) =
let (_d, factory) =
PuffinManagerFactory::new_for_test_block("test_collect_comparison_field_column_");
let metadata = test_region_metadata();
let mut builder = InvertedIndexApplierBuilder::new(
@@ -283,7 +283,7 @@ mod tests {
test_object_store(),
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
factory,
);
builder
@@ -308,7 +308,7 @@ mod tests {
#[test]
fn test_collect_comparison_nonexistent_column() {
let (_d, facotry) =
let (_d, factory) =
PuffinManagerFactory::new_for_test_block("test_collect_comparison_nonexistent_column_");
let metadata = test_region_metadata();
let mut builder = InvertedIndexApplierBuilder::new(
@@ -317,7 +317,7 @@ mod tests {
test_object_store(),
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
factory,
);
let res = builder.collect_comparison_expr(

View File

@@ -136,7 +136,7 @@ mod tests {
#[test]
fn test_collect_eq_basic() {
let (_d, facotry) = PuffinManagerFactory::new_for_test_block("test_collect_eq_basic_");
let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_collect_eq_basic_");
let metadata = test_region_metadata();
let mut builder = InvertedIndexApplierBuilder::new(
"test".to_string(),
@@ -144,7 +144,7 @@ mod tests {
test_object_store(),
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
factory,
);
builder
@@ -172,7 +172,7 @@ mod tests {
#[test]
fn test_collect_eq_field_column() {
let (_d, facotry) =
let (_d, factory) =
PuffinManagerFactory::new_for_test_block("test_collect_eq_field_column_");
let metadata = test_region_metadata();
let mut builder = InvertedIndexApplierBuilder::new(
@@ -181,7 +181,7 @@ mod tests {
test_object_store(),
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
factory,
);
builder
@@ -200,7 +200,7 @@ mod tests {
#[test]
fn test_collect_eq_nonexistent_column() {
let (_d, facotry) =
let (_d, factory) =
PuffinManagerFactory::new_for_test_block("test_collect_eq_nonexistent_column_");
let metadata = test_region_metadata();
let mut builder = InvertedIndexApplierBuilder::new(
@@ -209,7 +209,7 @@ mod tests {
test_object_store(),
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
factory,
);
let res = builder.collect_eq(&nonexistent_column(), &string_lit("abc"));
@@ -219,7 +219,7 @@ mod tests {
#[test]
fn test_collect_eq_type_mismatch() {
let (_d, facotry) =
let (_d, factory) =
PuffinManagerFactory::new_for_test_block("test_collect_eq_type_mismatch_");
let metadata = test_region_metadata();
let mut builder = InvertedIndexApplierBuilder::new(
@@ -228,7 +228,7 @@ mod tests {
test_object_store(),
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
factory,
);
let res = builder.collect_eq(&tag_column(), &int64_lit(1));
@@ -238,7 +238,7 @@ mod tests {
#[test]
fn test_collect_or_eq_list_basic() {
let (_d, facotry) =
let (_d, factory) =
PuffinManagerFactory::new_for_test_block("test_collect_or_eq_list_basic_");
let metadata = test_region_metadata();
let mut builder = InvertedIndexApplierBuilder::new(
@@ -247,7 +247,7 @@ mod tests {
test_object_store(),
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
factory,
);
let eq_expr = DfExpr::BinaryExpr(BinaryExpr {
@@ -296,7 +296,7 @@ mod tests {
#[test]
fn test_collect_or_eq_list_invalid_op() {
let (_d, facotry) =
let (_d, factory) =
PuffinManagerFactory::new_for_test_block("test_collect_or_eq_list_invalid_op_");
let metadata = test_region_metadata();
let mut builder = InvertedIndexApplierBuilder::new(
@@ -305,7 +305,7 @@ mod tests {
test_object_store(),
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
factory,
);
let eq_expr = DfExpr::BinaryExpr(BinaryExpr {
@@ -333,7 +333,7 @@ mod tests {
#[test]
fn test_collect_or_eq_list_multiple_columns() {
let (_d, facotry) =
let (_d, factory) =
PuffinManagerFactory::new_for_test_block("test_collect_or_eq_list_multiple_columns_");
let metadata = test_region_metadata();
let mut builder = InvertedIndexApplierBuilder::new(
@@ -342,7 +342,7 @@ mod tests {
test_object_store(),
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
factory,
);
let eq_expr = DfExpr::BinaryExpr(BinaryExpr {

View File

@@ -67,7 +67,7 @@ mod tests {
#[test]
fn test_collect_in_list_basic() {
let (_d, facotry) = PuffinManagerFactory::new_for_test_block("test_collect_in_list_basic_");
let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_collect_in_list_basic_");
let metadata = test_region_metadata();
let mut builder = InvertedIndexApplierBuilder::new(
"test".to_string(),
@@ -75,7 +75,7 @@ mod tests {
test_object_store(),
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
factory,
);
let in_list = InList {
@@ -98,7 +98,7 @@ mod tests {
#[test]
fn test_collect_in_list_negated() {
let (_d, facotry) =
let (_d, factory) =
PuffinManagerFactory::new_for_test_block("test_collect_in_list_negated_");
let metadata = test_region_metadata();
let mut builder = InvertedIndexApplierBuilder::new(
@@ -107,7 +107,7 @@ mod tests {
test_object_store(),
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
factory,
);
let in_list = InList {
@@ -122,7 +122,7 @@ mod tests {
#[test]
fn test_collect_in_list_field_column() {
let (_d, facotry) =
let (_d, factory) =
PuffinManagerFactory::new_for_test_block("test_collect_in_list_field_column_");
let metadata = test_region_metadata();
let mut builder = InvertedIndexApplierBuilder::new(
@@ -131,7 +131,7 @@ mod tests {
test_object_store(),
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
factory,
);
let in_list = InList {
@@ -154,7 +154,7 @@ mod tests {
#[test]
fn test_collect_in_list_type_mismatch() {
let (_d, facotry) =
let (_d, factory) =
PuffinManagerFactory::new_for_test_block("test_collect_in_list_type_mismatch_");
let metadata = test_region_metadata();
let mut builder = InvertedIndexApplierBuilder::new(
@@ -163,7 +163,7 @@ mod tests {
test_object_store(),
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
factory,
);
let in_list = InList {
@@ -179,7 +179,7 @@ mod tests {
#[test]
fn test_collect_in_list_nonexistent_column() {
let (_d, facotry) =
let (_d, factory) =
PuffinManagerFactory::new_for_test_block("test_collect_in_list_nonexistent_column_");
let metadata = test_region_metadata();
@@ -189,7 +189,7 @@ mod tests {
test_object_store(),
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
factory,
);
let in_list = InList {

View File

@@ -59,7 +59,7 @@ mod tests {
#[test]
fn test_regex_match_basic() {
let (_d, facotry) = PuffinManagerFactory::new_for_test_block("test_regex_match_basic_");
let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_regex_match_basic_");
let metadata = test_region_metadata();
let mut builder = InvertedIndexApplierBuilder::new(
"test".to_string(),
@@ -67,7 +67,7 @@ mod tests {
test_object_store(),
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
factory,
);
builder
@@ -86,7 +86,7 @@ mod tests {
#[test]
fn test_regex_match_field_column() {
let (_d, facotry) =
let (_d, factory) =
PuffinManagerFactory::new_for_test_block("test_regex_match_field_column_");
let metadata = test_region_metadata();
let mut builder = InvertedIndexApplierBuilder::new(
@@ -95,7 +95,7 @@ mod tests {
test_object_store(),
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
factory,
);
builder
@@ -114,7 +114,7 @@ mod tests {
#[test]
fn test_regex_match_type_mismatch() {
let (_d, facotry) =
let (_d, factory) =
PuffinManagerFactory::new_for_test_block("test_regex_match_type_mismatch_");
let metadata = test_region_metadata();
let mut builder = InvertedIndexApplierBuilder::new(
@@ -123,7 +123,7 @@ mod tests {
test_object_store(),
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
factory,
);
builder
@@ -135,7 +135,7 @@ mod tests {
#[test]
fn test_regex_match_type_nonexist_column() {
let (_d, facotry) =
let (_d, factory) =
PuffinManagerFactory::new_for_test_block("test_regex_match_type_nonexist_column_");
let metadata = test_region_metadata();
let mut builder = InvertedIndexApplierBuilder::new(
@@ -144,7 +144,7 @@ mod tests {
test_object_store(),
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
factory,
);
let res = builder.collect_regex_match(&nonexistent_column(), &string_lit("abc"));

View File

@@ -70,7 +70,7 @@ impl ScalarCalculate {
interval: Millisecond,
input: LogicalPlan,
time_index: &str,
tag_colunms: &[String],
tag_columns: &[String],
field_column: &str,
table_name: Option<&str>,
) -> Result<Self> {
@@ -97,7 +97,7 @@ impl ScalarCalculate {
end,
interval,
time_index: time_index.to_string(),
tag_columns: tag_colunms.to_vec(),
tag_columns: tag_columns.to_vec(),
field_column: field_column.to_string(),
input,
output_schema: Arc::new(schema),

View File

@@ -82,7 +82,7 @@ impl ExtensionPlanner for MergeSortExtensionPlanner {
// and we only need to do a merge sort, otherwise fallback to quick sort
let can_merge_sort = partition_cnt >= region_cnt;
if can_merge_sort {
// TODO(discord9): use `SortPreversingMergeExec here`
// TODO(discord9): use `SortPreservingMergeExec here`
}
// for now merge sort only exist in logical plan, and have the same effect as `Sort`
// doesn't change the execution plan, this will change in the future

View File

@@ -23,6 +23,7 @@ pub mod prom_query_gateway;
pub mod region_server;
use std::net::SocketAddr;
use std::time::Duration;
use api::v1::health_check_server::{HealthCheck, HealthCheckServer};
use api::v1::{HealthCheckRequest, HealthCheckResponse};
@@ -72,6 +73,12 @@ pub struct GrpcOptions {
pub runtime_size: usize,
#[serde(default = "Default::default")]
pub tls: TlsOption,
/// The HTTP/2 keep-alive interval.
#[serde(with = "humantime_serde")]
pub http2_keep_alive_interval: Duration,
/// The HTTP/2 keep-alive timeout.
#[serde(with = "humantime_serde")]
pub http2_keep_alive_timeout: Duration,
}
impl GrpcOptions {
@@ -129,6 +136,8 @@ impl Default for GrpcOptions {
flight_compression: FlightCompression::ArrowIpc,
runtime_size: 8,
tls: TlsOption::default(),
http2_keep_alive_interval: Duration::from_secs(10),
http2_keep_alive_timeout: Duration::from_secs(3),
}
}
}
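The two new fields expose HTTP/2-level keep-alive for the gRPC server. A sketch of how such settings are typically applied to a tonic server builder (this wiring is an assumption, not taken from this diff):
use std::time::Duration;
use tonic::transport::Server;
// Illustrative: apply keep-alive options when building the gRPC server.
fn server_with_keep_alive(interval: Duration, timeout: Duration) -> Server {
    Server::builder()
        // Ping idle HTTP/2 connections every `interval` ...
        .http2_keepalive_interval(Some(interval))
        // ... and treat them as dead if no ack arrives within `timeout`.
        .http2_keepalive_timeout(Some(timeout))
}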

View File

@@ -34,12 +34,10 @@ impl HeartbeatOptions {
pub fn frontend_default() -> Self {
Self {
// Frontend can send heartbeat with a longer interval.
interval: Duration::from_millis(
distributed_time_constants::FRONTEND_HEARTBEAT_INTERVAL_MILLIS,
),
retry_interval: Duration::from_millis(
distributed_time_constants::HEARTBEAT_INTERVAL_MILLIS,
interval: distributed_time_constants::frontend_heartbeat_interval(
distributed_time_constants::BASE_HEARTBEAT_INTERVAL,
),
retry_interval: distributed_time_constants::BASE_HEARTBEAT_INTERVAL,
}
}
}
@@ -47,10 +45,8 @@ impl HeartbeatOptions {
impl Default for HeartbeatOptions {
fn default() -> Self {
Self {
interval: Duration::from_millis(distributed_time_constants::HEARTBEAT_INTERVAL_MILLIS),
retry_interval: Duration::from_millis(
distributed_time_constants::HEARTBEAT_INTERVAL_MILLIS,
),
interval: distributed_time_constants::BASE_HEARTBEAT_INTERVAL,
retry_interval: distributed_time_constants::BASE_HEARTBEAT_INTERVAL,
}
}
}

View File

@@ -1066,7 +1066,7 @@ impl HttpServer {
/// Route Prometheus [HTTP API].
///
/// [HTTP API]: https://prometheus.io/docs/prometheus/latest/querying/api/
fn route_prometheus<S>(prometheus_handler: PrometheusHandlerRef) -> Router<S> {
pub fn route_prometheus<S>(prometheus_handler: PrometheusHandlerRef) -> Router<S> {
Router::new()
.route(
"/format_query",

View File

@@ -352,7 +352,7 @@ async fn dryrun_pipeline_inner(
)
.await?;
let colume_type_key = "colume_type";
let column_type_key = "column_type";
let data_type_key = "data_type";
let name_key = "name";
@@ -376,7 +376,7 @@ async fn dryrun_pipeline_inner(
JsonValue::String(cs.datatype().as_str_name().to_string()),
);
map.insert(
colume_type_key.to_string(),
column_type_key.to_string(),
JsonValue::String(cs.semantic_type().as_str_name().to_string()),
);
map.insert(
@@ -409,7 +409,7 @@ async fn dryrun_pipeline_inner(
);
map.insert(
"semantic_type".to_string(),
schema[idx][colume_type_key].clone(),
schema[idx][column_type_key].clone(),
);
map.insert(
"data_type".to_string(),

View File

@@ -105,7 +105,7 @@ mod tests {
use crate::statements::statement::Statement;
#[test]
fn test_display_for_tuncate_table() {
fn test_display_for_truncate_table() {
let sql = r"truncate table t1;";
let stmts: Vec<Statement> =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())

View File

@@ -19,7 +19,7 @@ use std::sync::Arc;
use std::time::Duration;
use arbitrary::{Arbitrary, Unstructured};
use common_meta::distributed_time_constants;
use common_meta::distributed_time_constants::default_distributed_time_constants;
use common_telemetry::info;
use libfuzzer_sys::fuzz_target;
use rand::{Rng, SeedableRng};
@@ -252,10 +252,7 @@ async fn execute_failover(ctx: FuzzContext, input: FuzzInput) -> Result<()> {
recover_pod_failure(ctx.kube.clone(), &ctx.namespace, &chaos_name).await?;
wait_for_all_datanode_online(ctx.greptime.clone(), Duration::from_secs(60)).await;
tokio::time::sleep(Duration::from_secs(
distributed_time_constants::REGION_LEASE_SECS,
))
.await;
tokio::time::sleep(default_distributed_time_constants().region_lease).await;
// Validates value rows
info!("Validates num of rows");

View File

@@ -19,7 +19,7 @@ use std::sync::Arc;
use std::time::Duration;
use arbitrary::{Arbitrary, Unstructured};
use common_meta::distributed_time_constants;
use common_meta::distributed_time_constants::default_distributed_time_constants;
use common_telemetry::info;
use common_time::util::current_time_millis;
use futures::future::try_join_all;
@@ -319,10 +319,7 @@ async fn execute_failover(ctx: FuzzContext, input: FuzzInput) -> Result<()> {
recover_pod_failure(ctx.kube.clone(), &ctx.namespace, &chaos_name).await?;
wait_for_all_datanode_online(ctx.greptime.clone(), Duration::from_secs(60)).await;
tokio::time::sleep(Duration::from_secs(
distributed_time_constants::REGION_LEASE_SECS,
))
.await;
tokio::time::sleep(default_distributed_time_constants().region_lease).await;
// Validates value rows
info!("Validates num of rows");
for (table_ctx, expected_rows) in table_ctxs.iter().zip(affected_rows) {

View File

@@ -19,7 +19,7 @@ use std::sync::Arc;
use std::time::Duration;
use arbitrary::{Arbitrary, Unstructured};
use common_meta::distributed_time_constants;
use common_meta::distributed_time_constants::default_distributed_time_constants;
use common_telemetry::info;
use libfuzzer_sys::fuzz_target;
use rand::{Rng, SeedableRng};
@@ -271,10 +271,7 @@ async fn migrate_regions(ctx: &FuzzContext, migrations: &[Migration]) -> Result<
wait_for_migration(ctx, migration, &procedure_id).await;
}
tokio::time::sleep(Duration::from_secs(
distributed_time_constants::REGION_LEASE_SECS,
))
.await;
tokio::time::sleep(default_distributed_time_constants().region_lease).await;
Ok(())
}

View File

@@ -19,7 +19,7 @@ use std::sync::Arc;
use std::time::Duration;
use arbitrary::{Arbitrary, Unstructured};
use common_meta::distributed_time_constants;
use common_meta::distributed_time_constants::default_distributed_time_constants;
use common_telemetry::info;
use libfuzzer_sys::fuzz_target;
use rand::{Rng, SeedableRng};
@@ -262,10 +262,7 @@ async fn migrate_regions(ctx: &FuzzContext, migrations: &[Migration]) -> Result<
.await;
}
tokio::time::sleep(Duration::from_secs(
distributed_time_constants::REGION_LEASE_SECS,
))
.await;
tokio::time::sleep(default_distributed_time_constants().region_lease).await;
Ok(())
}

View File

@@ -1,14 +1,14 @@
services:
zookeeper:
image: docker.io/bitnami/zookeeper:3.7
image: greptime/zookeeper:3.7
ports:
- '2181:2181'
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
kafka:
image: docker.io/bitnami/kafka:3.9.0
image: greptime/kafka:3.9.0-debian-12-r1
container_name: kafka
ports:
- 9092:9092
@@ -32,7 +32,7 @@ services:
condition: service_started
etcd:
image: docker.io/bitnami/etcd:3.5
image: greptime/etcd:3.6.1-debian-12-r3
ports:
- "2379:2379"
- "2380:2380"
@@ -44,7 +44,7 @@ services:
ETCD_MAX_REQUEST_BYTES: 10485760
minio:
image: docker.io/bitnami/minio:2024
image: greptime/minio:2024
ports:
- '9000:9000'
- '9001:9001'
@@ -68,7 +68,7 @@ services:
- POSTGRES_PASSWORD=admin
mysql:
image: bitnami/mysql:5.7
image: greptime/mysql:5.7
ports:
- 3306:3306
volumes:

View File

@@ -1188,6 +1188,8 @@ max_recv_message_size = "512MiB"
max_send_message_size = "512MiB"
flight_compression = "arrow_ipc"
runtime_size = 8
http2_keep_alive_interval = "10s"
http2_keep_alive_timeout = "3s"
[grpc.tls]
mode = "disable"
@@ -3212,37 +3214,37 @@ transform:
let dryrun_schema = json!([
{
"colume_type": "FIELD",
"column_type": "FIELD",
"data_type": "INT32",
"fulltext": false,
"name": "id1"
},
{
"colume_type": "FIELD",
"column_type": "FIELD",
"data_type": "INT32",
"fulltext": false,
"name": "id2"
},
{
"colume_type": "FIELD",
"column_type": "FIELD",
"data_type": "STRING",
"fulltext": false,
"name": "type"
},
{
"colume_type": "FIELD",
"column_type": "FIELD",
"data_type": "STRING",
"fulltext": false,
"name": "log"
},
{
"colume_type": "FIELD",
"column_type": "FIELD",
"data_type": "STRING",
"fulltext": false,
"name": "logger"
},
{
"colume_type": "TIMESTAMP",
"column_type": "TIMESTAMP",
"data_type": "TIMESTAMP_NANOSECOND",
"fulltext": false,
"name": "time"

View File

@@ -2,7 +2,7 @@
version: '3.8'
services:
kafka:
image: bitnami/kafka:3.6.0
image: greptime/kafka:3.9.0-debian-12-r1
container_name: kafka
ports:
- 9092:9092

View File

@@ -363,9 +363,9 @@ pub fn setup_mysql(mysql_port: u16, mysql_version: Option<&str>) {
}
let mysql_image = if let Some(mysql_version) = mysql_version {
format!("bitnami/mysql:{mysql_version}")
format!("greptime/mysql:{mysql_version}")
} else {
"bitnami/mysql:5.7".to_string()
"greptime/mysql:5.7".to_string()
};
let mysql_password = "admin";
let mysql_user = "greptimedb";