Mirror of https://github.com/GreptimeTeam/greptimedb.git, synced 2025-12-27 00:19:58 +00:00

Compare commits: 14 commits, v0.16.0...release/v0
Commits (SHA1):
8f9674440f
a29681eb12
c112cdf241
4d33b9687a
b0d2d26ad8
4ed048c735
af4465e543
7c2cdccb22
3655bbe032
5501f63a71
c5c9b263e1
9e6b19d301
f7f52592b4
4a936d7320
@@ -12,7 +12,7 @@ runs:
steps:
- name: Install Etcd cluster
shell: bash
run: |
run: |
helm upgrade \
--install etcd oci://registry-1.docker.io/bitnamicharts/etcd \
--set replicaCount=${{ inputs.etcd-replicas }} \
@@ -24,4 +24,9 @@ runs:
--set auth.rbac.token.enabled=false \
--set persistence.size=2Gi \
--create-namespace \
--set global.security.allowInsecureImages=true \
--set image.registry=docker.io \
--set image.repository=greptime/etcd \
--set image.tag=3.6.1-debian-12-r3 \
--version 12.0.8 \
-n ${{ inputs.namespace }}

@@ -51,7 +51,7 @@ runs:
run: |
helm upgrade \
--install my-greptimedb \
--set meta.backendStorage.etcd.endpoints=${{ inputs.etcd-endpoints }} \
--set 'meta.backendStorage.etcd.endpoints[0]=${{ inputs.etcd-endpoints }}' \
--set meta.enableRegionFailover=${{ inputs.enable-region-failover }} \
--set image.registry=${{ inputs.image-registry }} \
--set image.repository=${{ inputs.image-repository }} \

@@ -12,7 +12,7 @@ runs:
steps:
- name: Install Kafka cluster
shell: bash
run: |
run: |
helm upgrade \
--install kafka oci://registry-1.docker.io/bitnamicharts/kafka \
--set controller.replicaCount=${{ inputs.controller-replicas }} \
@@ -23,4 +23,8 @@ runs:
--set listeners.controller.protocol=PLAINTEXT \
--set listeners.client.protocol=PLAINTEXT \
--create-namespace \
--set image.registry=docker.io \
--set image.repository=greptime/kafka \
--set image.tag=3.9.0-debian-12-r1 \
--version 31.0.0 \
-n ${{ inputs.namespace }}

@@ -6,9 +6,7 @@ inputs:
description: "Number of PostgreSQL replicas"
namespace:
default: "postgres-namespace"
postgres-version:
default: "14.2"
description: "PostgreSQL version"
description: "The PostgreSQL namespace"
storage-size:
default: "1Gi"
description: "Storage size for PostgreSQL"
@@ -22,7 +20,11 @@ runs:
helm upgrade \
--install postgresql oci://registry-1.docker.io/bitnamicharts/postgresql \
--set replicaCount=${{ inputs.postgres-replicas }} \
--set image.tag=${{ inputs.postgres-version }} \
--set global.security.allowInsecureImages=true \
--set image.registry=docker.io \
--set image.repository=greptime/postgresql \
--set image.tag=17.5.0-debian-12-r3 \
--version 16.7.4 \
--set persistence.size=${{ inputs.storage-size }} \
--set postgresql.username=greptimedb \
--set postgresql.password=admin \
.github/scripts/deploy-greptimedb.sh (vendored): 5 changed lines
@@ -68,7 +68,7 @@ function deploy_greptimedb_cluster() {

helm install "$cluster_name" greptime/greptimedb-cluster \
--set image.tag="$GREPTIMEDB_IMAGE_TAG" \
--set meta.backendStorage.etcd.endpoints="etcd.$install_namespace:2379" \
--set meta.backendStorage.etcd.endpoints[0]="etcd.$install_namespace:2379" \
-n "$install_namespace"

# Wait for greptimedb cluster to be ready.
@@ -103,14 +103,13 @@ function deploy_greptimedb_cluster_with_s3_storage() {

helm install "$cluster_name" greptime/greptimedb-cluster -n "$install_namespace" \
--set image.tag="$GREPTIMEDB_IMAGE_TAG" \
--set meta.backendStorage.etcd.endpoints="etcd.$install_namespace:2379" \
--set meta.backendStorage.etcd.endpoints[0]="etcd.$install_namespace:2379" \
--set storage.s3.bucket="$AWS_CI_TEST_BUCKET" \
--set storage.s3.region="$AWS_REGION" \
--set storage.s3.root="$DATA_ROOT" \
--set storage.credentials.secretName=s3-credentials \
--set storage.credentials.accessKeyId="$AWS_ACCESS_KEY_ID" \
--set storage.credentials.secretAccessKey="$AWS_SECRET_ACCESS_KEY"

# Wait for greptimedb cluster to be ready.
while true; do
PHASE=$(kubectl -n "$install_namespace" get gtc "$cluster_name" -o jsonpath='{.status.clusterPhase}')
.github/scripts/pull-test-deps-images.sh (vendored, new executable file): 34 lines
@@ -0,0 +1,34 @@
#!/bin/bash

# This script pulls the test dependency images stored in the public ECR one by one to avoid rate limiting.

set -e

MAX_RETRIES=3

IMAGES=(
"greptime/zookeeper:3.7"
"greptime/kafka:3.9.0-debian-12-r1"
"greptime/etcd:3.6.1-debian-12-r3"
"greptime/minio:2024"
"greptime/mysql:5.7"
)

for image in "${IMAGES[@]}"; do
for ((attempt=1; attempt<=MAX_RETRIES; attempt++)); do
if docker pull "$image"; then
# Successfully pulled the image.
break
else
# Back off between attempts with a simple linear delay to avoid rate limiting.
if [ $attempt -lt $MAX_RETRIES ]; then
sleep_seconds=$((attempt * 5))
echo "Attempt $attempt failed for $image, waiting $sleep_seconds seconds"
sleep $sleep_seconds # 5s, 10s delays
else
echo "Failed to pull $image after $MAX_RETRIES attempts"
exit 1
fi
fi
done
done
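For readers porting this policy elsewhere, a minimal Rust sketch of the same retry loop; `pull` is a hypothetical stand-in for the `docker pull` invocation and is not part of the repository:

```rust
use std::{thread, time::Duration};

// Sketch of the script's retry policy: up to three attempts per image with
// linear 5s/10s delays between failures.
fn pull_with_retries(image: &str, pull: impl Fn(&str) -> bool) -> Result<(), String> {
    const MAX_RETRIES: u32 = 3;
    for attempt in 1..=MAX_RETRIES {
        if pull(image) {
            return Ok(()); // Successfully pulled the image.
        }
        if attempt < MAX_RETRIES {
            // Back off before the next attempt (5s, then 10s).
            let delay = Duration::from_secs(u64::from(attempt) * 5);
            eprintln!("attempt {attempt} failed for {image}, waiting {delay:?}");
            thread::sleep(delay);
        }
    }
    Err(format!("failed to pull {image} after {MAX_RETRIES} attempts"))
}
```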
.github/workflows/develop.yml (vendored): 4 changed lines
@@ -719,6 +719,10 @@ jobs:
save-if: ${{ github.ref == 'refs/heads/main' }}
- name: Install latest nextest release
uses: taiki-e/install-action@nextest

- name: Pull test dependencies images
run: ./.github/scripts/pull-test-deps-images.sh

- name: Setup external services
working-directory: tests-integration/fixtures
run: docker compose up -d --wait
Cargo.lock (generated): 18 changed lines
@@ -6870,9 +6870,9 @@ dependencies = [

[[package]]
name = "libc"
version = "0.2.171"
version = "0.2.178"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c19937216e9d3aa9956d9bb8dfc0b0c8beb6058fc4f7a4dc4d850edf86a237d6"
checksum = "37c93d8daa9d8a012fd8ab92f088405fb202ea0b6ab73ee2482ae66af4f42091"

[[package]]
name = "libflate"
@@ -6928,7 +6928,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4979f22fdb869068da03c9f7528f8297c6fd2606bc3a4affe42e6a823fdb8da4"
dependencies = [
"cfg-if",
"windows-targets 0.52.6",
"windows-targets 0.48.5",
]

[[package]]
@@ -7005,13 +7005,13 @@ checksum = "b4ce301924b7887e9d637144fdade93f9dfff9b60981d4ac161db09720d39aa5"

[[package]]
name = "local-ip-address"
version = "0.6.3"
version = "0.6.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3669cf5561f8d27e8fc84cc15e58350e70f557d4d65f70e3154e54cd2f8e1782"
checksum = "656b3b27f8893f7bbf9485148ff9a65f019e3f33bd5cdc87c83cab16b3fd9ec8"
dependencies = [
"libc",
"neli",
"thiserror 1.0.64",
"thiserror 2.0.12",
"windows-sys 0.59.0",
]

@@ -10849,7 +10849,7 @@ dependencies = [
[[package]]
name = "rskafka"
version = "0.6.0"
source = "git+https://github.com/influxdata/rskafka.git?rev=8dbd01ed809f5a791833a594e85b144e36e45820#8dbd01ed809f5a791833a594e85b144e36e45820"
source = "git+https://github.com/WenyXu/rskafka.git?rev=9494304ae3947b07e660b5d08549ad4a39c84a26#9494304ae3947b07e660b5d08549ad4a39c84a26"
dependencies = [
"bytes",
"chrono",
@@ -12331,7 +12331,7 @@ dependencies = [
"cfg-if",
"libc",
"psm",
"windows-sys 0.59.0",
"windows-sys 0.52.0",
]

[[package]]
@@ -14598,7 +14598,7 @@ version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb"
dependencies = [
"windows-sys 0.59.0",
"windows-sys 0.48.0",
]

[[package]]
@@ -188,7 +188,8 @@ reqwest = { version = "0.12", default-features = false, features = [
"stream",
"multipart",
] }
rskafka = { git = "https://github.com/influxdata/rskafka.git", rev = "8dbd01ed809f5a791833a594e85b144e36e45820", features = [
# Branch: feat/request-timeout-port
rskafka = { git = "https://github.com/WenyXu/rskafka.git", rev = "9494304ae3947b07e660b5d08549ad4a39c84a26", features = [
"transport-tls",
] }
rstest = "0.25"
@@ -78,6 +78,8 @@
| `wal.sync_period` | String | `10s` | Duration for fsyncing log files.<br/>**It's only used when the provider is `raft_engine`**. |
| `wal.recovery_parallelism` | Integer | `2` | Parallelism during WAL recovery. |
| `wal.broker_endpoints` | Array | -- | The Kafka broker endpoints.<br/>**It's only used when the provider is `kafka`**. |
| `wal.connect_timeout` | String | `3s` | The connect timeout for kafka client.<br/>**It's only used when the provider is `kafka`**. |
| `wal.timeout` | String | `3s` | The timeout for kafka client.<br/>**It's only used when the provider is `kafka`**. |
| `wal.auto_create_topics` | Bool | `true` | Automatically create topics for WAL.<br/>Set to `true` to automatically create topics for WAL.<br/>Otherwise, use topics named `topic_name_prefix_[0..num_topics)` |
| `wal.num_topics` | Integer | `64` | Number of topics.<br/>**It's only used when the provider is `kafka`**. |
| `wal.selector_type` | String | `round_robin` | Topic selector type.<br/>Available selector types:<br/>- `round_robin` (default)<br/>**It's only used when the provider is `kafka`**. |

@@ -275,7 +277,6 @@
| `meta_client` | -- | -- | The metasrv client options. |
| `meta_client.metasrv_addrs` | Array | -- | The addresses of the metasrv. |
| `meta_client.timeout` | String | `3s` | Operation timeout. |
| `meta_client.heartbeat_timeout` | String | `500ms` | Heartbeat timeout. |
| `meta_client.ddl_timeout` | String | `10s` | DDL timeout. |
| `meta_client.connect_timeout` | String | `1s` | Connect server timeout. |
| `meta_client.tcp_nodelay` | Bool | `true` | `TCP_NODELAY` option for accepted connections. |

@@ -334,6 +335,7 @@
| `region_failure_detector_initialization_delay` | String | `10m` | The delay before starting region failure detection.<br/>This delay helps prevent Metasrv from triggering unnecessary region failovers before all Datanodes are fully started.<br/>Especially useful when the cluster is not deployed with GreptimeDB Operator and maintenance mode is not enabled. |
| `allow_region_failover_on_local_wal` | Bool | `false` | Whether to allow region failover on local WAL.<br/>**This option is not recommended to be set to true, because it may lead to data loss during failover.** |
| `node_max_idle_time` | String | `24hours` | Max allowed idle time before removing node info from metasrv memory. |
| `heartbeat_interval` | String | `3s` | Base heartbeat interval for calculating distributed time constants.<br/>The frontend heartbeat interval is 6 times of the base heartbeat interval.<br/>The flownode/datanode heartbeat interval is 1 times of the base heartbeat interval.<br/>e.g., If the base heartbeat interval is 3s, the frontend heartbeat interval is 18s, the flownode/datanode heartbeat interval is 3s.<br/>If you change this value, you need to change the heartbeat interval of the flownode/frontend/datanode accordingly. |
| `enable_telemetry` | Bool | `true` | Whether to enable greptimedb telemetry. Enabled by default. |
| `runtime` | -- | -- | The runtime options. |
| `runtime.global_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. |
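A worked example of the interval arithmetic described in the `heartbeat_interval` row above. This is an illustration using only the documented multipliers (frontend = 6x the base, flownode/datanode = 1x), not code from the repository:

```rust
use std::time::Duration;

fn main() {
    // Base heartbeat interval, matching the documented default of `3s`.
    let base = Duration::from_secs(3);
    // Frontend heartbeats at 6x the base; flownode/datanode at 1x.
    assert_eq!(base * 6, Duration::from_secs(18)); // frontend: 18s
    assert_eq!(base * 1, Duration::from_secs(3)); // flownode/datanode: 3s
}
```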
@@ -344,12 +346,18 @@
| `backend_tls.key_path` | String | `""` | Path to client private key file (for client authentication)<br/>Like "/path/to/client.key" |
| `backend_tls.ca_cert_path` | String | `""` | Path to CA certificate file (for server certificate verification)<br/>Required when using custom CAs or self-signed certificates<br/>Leave empty to use system root certificates only<br/>Like "/path/to/ca.crt" |
| `backend_tls.watch` | Bool | `false` | Watch for certificate file changes and auto reload |
| `backend_client` | -- | -- | The backend client options.<br/>Currently, only applicable when using etcd as the metadata store. |
| `backend_client.keep_alive_timeout` | String | `3s` | The keep alive timeout for backend client. |
| `backend_client.keep_alive_interval` | String | `10s` | The keep alive interval for backend client. |
| `backend_client.connect_timeout` | String | `3s` | The connect timeout for backend client. |
| `grpc` | -- | -- | The gRPC server options. |
| `grpc.bind_addr` | String | `127.0.0.1:3002` | The address to bind the gRPC server. |
| `grpc.server_addr` | String | `127.0.0.1:3002` | The communication server address for the frontend and datanode to connect to metasrv.<br/>If left empty or unset, the server will automatically use the IP address of the first network interface<br/>on the host, with the same port number as the one specified in `bind_addr`. |
| `grpc.runtime_size` | Integer | `8` | The number of server worker threads. |
| `grpc.max_recv_message_size` | String | `512MB` | The maximum receive message size for gRPC server. |
| `grpc.max_send_message_size` | String | `512MB` | The maximum send message size for gRPC server. |
| `grpc.http2_keep_alive_interval` | String | `10s` | The server side HTTP/2 keep-alive interval |
| `grpc.http2_keep_alive_timeout` | String | `3s` | The server side HTTP/2 keep-alive timeout. |
| `http` | -- | -- | The HTTP server options. |
| `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. |
| `http.timeout` | String | `0s` | HTTP request timeout. Set to 0 to disable timeout. |

@@ -441,7 +449,6 @@
| `meta_client` | -- | -- | The metasrv client options. |
| `meta_client.metasrv_addrs` | Array | -- | The addresses of the metasrv. |
| `meta_client.timeout` | String | `3s` | Operation timeout. |
| `meta_client.heartbeat_timeout` | String | `500ms` | Heartbeat timeout. |
| `meta_client.ddl_timeout` | String | `10s` | DDL timeout. |
| `meta_client.connect_timeout` | String | `1s` | Connect server timeout. |
| `meta_client.tcp_nodelay` | Bool | `true` | `TCP_NODELAY` option for accepted connections. |

@@ -461,6 +468,8 @@
| `wal.sync_period` | String | `10s` | Duration for fsyncing log files.<br/>**It's only used when the provider is `raft_engine`**. |
| `wal.recovery_parallelism` | Integer | `2` | Parallelism during WAL recovery. |
| `wal.broker_endpoints` | Array | -- | The Kafka broker endpoints.<br/>**It's only used when the provider is `kafka`**. |
| `wal.connect_timeout` | String | `3s` | The connect timeout for kafka client.<br/>**It's only used when the provider is `kafka`**. |
| `wal.timeout` | String | `3s` | The timeout for kafka client.<br/>**It's only used when the provider is `kafka`**. |
| `wal.max_batch_bytes` | String | `1MB` | The max size of a single producer batch.<br/>Warning: Kafka has a default limit of 1MB per message in a topic.<br/>**It's only used when the provider is `kafka`**. |
| `wal.consumer_wait_timeout` | String | `100ms` | The consumer wait timeout.<br/>**It's only used when the provider is `kafka`**. |
| `wal.create_index` | Bool | `true` | Whether to enable WAL index creation.<br/>**It's only used when the provider is `kafka`**. |

@@ -604,7 +613,6 @@
| `meta_client` | -- | -- | The metasrv client options. |
| `meta_client.metasrv_addrs` | Array | -- | The addresses of the metasrv. |
| `meta_client.timeout` | String | `3s` | Operation timeout. |
| `meta_client.heartbeat_timeout` | String | `500ms` | Heartbeat timeout. |
| `meta_client.ddl_timeout` | String | `10s` | DDL timeout. |
| `meta_client.connect_timeout` | String | `1s` | Connect server timeout. |
| `meta_client.tcp_nodelay` | Bool | `true` | `TCP_NODELAY` option for accepted connections. |
@@ -92,9 +92,6 @@ metasrv_addrs = ["127.0.0.1:3002"]
## Operation timeout.
timeout = "3s"

## Heartbeat timeout.
heartbeat_timeout = "500ms"

## DDL timeout.
ddl_timeout = "10s"

@@ -164,6 +161,14 @@ recovery_parallelism = 2
## **It's only used when the provider is `kafka`**.
broker_endpoints = ["127.0.0.1:9092"]

## The connect timeout for kafka client.
## **It's only used when the provider is `kafka`**.
#+ connect_timeout = "3s"

## The timeout for kafka client.
## **It's only used when the provider is `kafka`**.
#+ timeout = "3s"

## The max size of a single producer batch.
## Warning: Kafka has a default limit of 1MB per message in a topic.
## **It's only used when the provider is `kafka`**.
@@ -64,9 +64,6 @@ metasrv_addrs = ["127.0.0.1:3002"]
## Operation timeout.
timeout = "3s"

## Heartbeat timeout.
heartbeat_timeout = "500ms"

## DDL timeout.
ddl_timeout = "10s"

@@ -171,9 +171,6 @@ metasrv_addrs = ["127.0.0.1:3002"]
## Operation timeout.
timeout = "3s"

## Heartbeat timeout.
heartbeat_timeout = "500ms"

## DDL timeout.
ddl_timeout = "10s"
@@ -55,6 +55,13 @@ allow_region_failover_on_local_wal = false
## Max allowed idle time before removing node info from metasrv memory.
node_max_idle_time = "24hours"

## Base heartbeat interval for calculating distributed time constants.
## The frontend heartbeat interval is 6 times of the base heartbeat interval.
## The flownode/datanode heartbeat interval is 1 times of the base heartbeat interval.
## e.g., If the base heartbeat interval is 3s, the frontend heartbeat interval is 18s, the flownode/datanode heartbeat interval is 3s.
## If you change this value, you need to change the heartbeat interval of the flownode/frontend/datanode accordingly.
#+ heartbeat_interval = "3s"

## Whether to enable greptimedb telemetry. Enabled by default.
#+ enable_telemetry = true

@@ -93,6 +100,16 @@ ca_cert_path = ""
## Watch for certificate file changes and auto reload
watch = false

## The backend client options.
## Currently, only applicable when using etcd as the metadata store.
#+ [backend_client]
## The keep alive timeout for backend client.
#+ keep_alive_timeout = "3s"
## The keep alive interval for backend client.
#+ keep_alive_interval = "10s"
## The connect timeout for backend client.
#+ connect_timeout = "3s"

## The gRPC server options.
[grpc]
## The address to bind the gRPC server.
@@ -107,6 +124,10 @@ runtime_size = 8
max_recv_message_size = "512MB"
## The maximum send message size for gRPC server.
max_send_message_size = "512MB"
## The server side HTTP/2 keep-alive interval
#+ http2_keep_alive_interval = "10s"
## The server side HTTP/2 keep-alive timeout.
#+ http2_keep_alive_timeout = "3s"

## The HTTP server options.
[http]
@@ -209,6 +209,14 @@ recovery_parallelism = 2
## **It's only used when the provider is `kafka`**.
broker_endpoints = ["127.0.0.1:9092"]

## The connect timeout for kafka client.
## **It's only used when the provider is `kafka`**.
#+ connect_timeout = "3s"

## The timeout for kafka client.
## **It's only used when the provider is `kafka`**.
#+ timeout = "3s"

## Automatically create topics for WAL.
## Set to `true` to automatically create topics for WAL.
## Otherwise, use topics named `topic_name_prefix_[0..num_topics)`
@@ -29,6 +29,7 @@ use crate::information_schema::{InformationExtensionRef, InformationSchemaProvid
use crate::kvbackend::manager::{SystemCatalog, CATALOG_CACHE_MAX_CAPACITY};
use crate::kvbackend::KvBackendCatalogManager;
use crate::process_manager::ProcessManagerRef;
use crate::system_schema::numbers_table_provider::NumbersTableProvider;
use crate::system_schema::pg_catalog::PGCatalogProvider;

pub struct KvBackendCatalogManagerBuilder {
@@ -119,6 +120,7 @@ impl KvBackendCatalogManagerBuilder {
DEFAULT_CATALOG_NAME.to_string(),
me.clone(),
)),
numbers_table_provider: NumbersTableProvider,
backend,
process_manager,
#[cfg(feature = "enterprise")]
@@ -18,8 +18,7 @@ use std::sync::{Arc, Weak};

use async_stream::try_stream;
use common_catalog::consts::{
DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, NUMBERS_TABLE_ID,
PG_CATALOG_NAME,
DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, PG_CATALOG_NAME,
};
use common_error::ext::BoxedError;
use common_meta::cache::{
@@ -43,7 +42,6 @@ use snafu::prelude::*;
use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
use table::dist_table::DistTable;
use table::metadata::{TableId, TableInfoRef};
use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME};
use table::table_name::TableName;
use table::TableRef;
use tokio::sync::Semaphore;
@@ -58,6 +56,7 @@ use crate::information_schema::InformationSchemaTableFactoryRef;
use crate::information_schema::{InformationExtensionRef, InformationSchemaProvider};
use crate::kvbackend::TableCacheRef;
use crate::process_manager::ProcessManagerRef;
use crate::system_schema::numbers_table_provider::NumbersTableProvider;
use crate::system_schema::pg_catalog::PGCatalogProvider;
use crate::system_schema::SystemSchemaProvider;
use crate::CatalogManager;
@@ -537,6 +536,7 @@ pub(super) struct SystemCatalog {
// system_schema_provider for default catalog
pub(super) information_schema_provider: Arc<InformationSchemaProvider>,
pub(super) pg_catalog_provider: Arc<PGCatalogProvider>,
pub(super) numbers_table_provider: NumbersTableProvider,
pub(super) backend: KvBackendRef,
pub(super) process_manager: Option<ProcessManagerRef>,
#[cfg(feature = "enterprise")]
@@ -566,9 +566,7 @@ impl SystemCatalog {
PG_CATALOG_NAME if channel == Channel::Postgres => {
self.pg_catalog_provider.table_names()
}
DEFAULT_SCHEMA_NAME => {
vec![NUMBERS_TABLE_NAME.to_string()]
}
DEFAULT_SCHEMA_NAME => self.numbers_table_provider.table_names(),
_ => vec![],
}
}
@@ -586,7 +584,7 @@ impl SystemCatalog {
if schema == INFORMATION_SCHEMA_NAME {
self.information_schema_provider.table(table).is_some()
} else if schema == DEFAULT_SCHEMA_NAME {
table == NUMBERS_TABLE_NAME
self.numbers_table_provider.table_exists(table)
} else if schema == PG_CATALOG_NAME && channel == Channel::Postgres {
self.pg_catalog_provider.table(table).is_some()
} else {
@@ -631,8 +629,8 @@ impl SystemCatalog {
});
pg_catalog_provider.table(table_name)
}
} else if schema == DEFAULT_SCHEMA_NAME && table_name == NUMBERS_TABLE_NAME {
Some(NumbersTable::table(NUMBERS_TABLE_ID))
} else if schema == DEFAULT_SCHEMA_NAME {
self.numbers_table_provider.table(table_name)
} else {
None
}

@@ -14,6 +14,7 @@

pub mod information_schema;
mod memory_table;
pub mod numbers_table_provider;
pub mod pg_catalog;
pub mod predicate;
mod utils;
src/catalog/src/system_schema/numbers_table_provider.rs (new file): 59 lines
@@ -0,0 +1,59 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#[cfg(any(test, feature = "testing", debug_assertions))]
use common_catalog::consts::NUMBERS_TABLE_ID;
#[cfg(any(test, feature = "testing", debug_assertions))]
use table::table::numbers::NumbersTable;
#[cfg(any(test, feature = "testing", debug_assertions))]
use table::table::numbers::NUMBERS_TABLE_NAME;
use table::TableRef;

// NumbersTableProvider is a dedicated provider for feature-gating the numbers table.
#[derive(Clone)]
pub struct NumbersTableProvider;

#[cfg(any(test, feature = "testing", debug_assertions))]
impl NumbersTableProvider {
pub(crate) fn table_exists(&self, name: &str) -> bool {
name == NUMBERS_TABLE_NAME
}

pub(crate) fn table_names(&self) -> Vec<String> {
vec![NUMBERS_TABLE_NAME.to_string()]
}

pub(crate) fn table(&self, name: &str) -> Option<TableRef> {
if name == NUMBERS_TABLE_NAME {
Some(NumbersTable::table(NUMBERS_TABLE_ID))
} else {
None
}
}
}

#[cfg(not(any(test, feature = "testing", debug_assertions)))]
impl NumbersTableProvider {
pub(crate) fn table_exists(&self, _name: &str) -> bool {
false
}

pub(crate) fn table_names(&self) -> Vec<String> {
vec![]
}

pub(crate) fn table(&self, _name: &str) -> Option<TableRef> {
None
}
}
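A hypothetical unit-test sketch for the provider above; the test module and its names are illustrative, not from the repository. In builds matching `any(test, feature = "testing", debug_assertions)` the numbers table resolves; in other builds every lookup returns false/empty/None:

```rust
// Hypothetical test sketch: exercises the cfg-gated test-build implementation.
#[cfg(test)]
mod provider_tests {
    use super::*;
    use table::table::numbers::NUMBERS_TABLE_NAME;

    #[test]
    fn numbers_table_is_visible_in_test_builds() {
        let provider = NumbersTableProvider;
        assert!(provider.table_exists(NUMBERS_TABLE_NAME));
        assert_eq!(provider.table_names(), vec![NUMBERS_TABLE_NAME.to_string()]);
        assert!(provider.table(NUMBERS_TABLE_NAME).is_some());
    }
}
```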
@@ -20,7 +20,7 @@ use common_meta::kv_backend::chroot::ChrootKvBackend;
use common_meta::kv_backend::etcd::EtcdStore;
use common_meta::kv_backend::KvBackendRef;
use meta_srv::bootstrap::create_etcd_client;
use meta_srv::metasrv::BackendImpl;
use meta_srv::metasrv::{BackendClientOptions, BackendImpl};

use crate::error::{EmptyStoreAddrsSnafu, UnsupportedMemoryBackendSnafu};

@@ -67,9 +67,10 @@ impl StoreConfig {
} else {
let kvbackend = match self.backend {
BackendImpl::EtcdStore => {
let etcd_client = create_etcd_client(store_addrs)
.await
.map_err(BoxedError::new)?;
let etcd_client =
create_etcd_client(store_addrs, &BackendClientOptions::default())
.await
.map_err(BoxedError::new)?;
Ok(EtcdStore::with_etcd_client(etcd_client, max_txn_ops))
}
#[cfg(feature = "pg_kvbackend")]
@@ -29,7 +29,7 @@ use futures::TryStreamExt;

use crate::error::InvalidArgumentsSnafu;
use crate::metadata::common::StoreConfig;
use crate::metadata::control::utils::{decode_key_value, get_table_id_by_name, json_fromatter};
use crate::metadata::control::utils::{decode_key_value, get_table_id_by_name, json_formatter};
use crate::Tool;

/// Getting metadata from metadata store.
@@ -206,7 +206,7 @@ impl Tool for GetTableTool {
println!(
"{}\n{}",
TableInfoKey::new(table_id),
json_fromatter(self.pretty, &*table_info)
json_formatter(self.pretty, &*table_info)
);
} else {
println!("Table info not found");
@@ -221,7 +221,7 @@ impl Tool for GetTableTool {
println!(
"{}\n{}",
TableRouteKey::new(table_id),
json_fromatter(self.pretty, &table_route)
json_formatter(self.pretty, &table_route)
);
} else {
println!("Table route not found");

@@ -27,7 +27,7 @@ pub fn decode_key_value(kv: KeyValue) -> CommonMetaResult<(String, String)> {
}

/// Formats a value as a JSON string.
pub fn json_fromatter<T>(pretty: bool, value: &T) -> String
pub fn json_formatter<T>(pretty: bool, value: &T) -> String
where
T: Serialize,
{
@@ -20,6 +20,7 @@ use async_trait::async_trait;
use clap::Parser;
use common_base::Plugins;
use common_config::Configurable;
use common_meta::distributed_time_constants::init_distributed_time_constants;
use common_telemetry::info;
use common_telemetry::logging::{TracingOptions, DEFAULT_LOGGING_DIR};
use common_version::{short_version, verbose_version};
@@ -327,6 +328,7 @@ impl StartCommand {
log_versions(verbose_version(), short_version(), APP_NAME);
maybe_activate_heap_profile(&opts.component.memory);
create_resource_limit_metrics(APP_NAME);
init_distributed_time_constants(opts.component.heartbeat_interval);

info!("Metasrv start command: {:#?}", self);
@@ -51,7 +51,6 @@ fn test_load_datanode_example_config() {
meta_client: Some(MetaClientOptions {
metasrv_addrs: vec!["127.0.0.1:3002".to_string()],
timeout: Duration::from_secs(3),
heartbeat_timeout: Duration::from_millis(500),
ddl_timeout: Duration::from_secs(10),
connect_timeout: Duration::from_secs(1),
tcp_nodelay: true,
@@ -116,7 +115,6 @@ fn test_load_frontend_example_config() {
meta_client: Some(MetaClientOptions {
metasrv_addrs: vec!["127.0.0.1:3002".to_string()],
timeout: Duration::from_secs(3),
heartbeat_timeout: Duration::from_millis(500),
ddl_timeout: Duration::from_secs(10),
connect_timeout: Duration::from_secs(1),
tcp_nodelay: true,
@@ -240,7 +238,6 @@ fn test_load_flownode_example_config() {
meta_client: Some(MetaClientOptions {
metasrv_addrs: vec!["127.0.0.1:3002".to_string()],
timeout: Duration::from_secs(3),
heartbeat_timeout: Duration::from_millis(500),
ddl_timeout: Duration::from_secs(10),
connect_timeout: Duration::from_secs(1),
tcp_nodelay: true,
@@ -332,7 +332,7 @@ impl AggregateUDFImpl for StateWrapper {
self.inner.signature()
}

/// Coerce types also do nothing, as optimzer should be able to already make struct types
/// Coerce types also do nothing, as optimizer should be able to already make struct types
fn coerce_types(&self, arg_types: &[DataType]) -> datafusion_common::Result<Vec<DataType>> {
self.inner.coerce_types(arg_types)
}
@@ -486,7 +486,7 @@ impl AggregateUDFImpl for MergeWrapper {
&self.merge_signature
}

/// Coerce types also do nothing, as optimzer should be able to already make struct types
/// Coerce types also do nothing, as optimizer should be able to already make struct types
fn coerce_types(&self, arg_types: &[DataType]) -> datafusion_common::Result<Vec<DataType>> {
// just check if the arg_types are only one and is struct array
if arg_types.len() != 1 || !matches!(arg_types.first(), Some(DataType::Struct(_))) {
@@ -12,25 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::OnceLock;
use std::time::Duration;

/// Heartbeat interval time (is the basic unit of various time).
pub const HEARTBEAT_INTERVAL_MILLIS: u64 = 3000;

/// The frontend will also send heartbeats to Metasrv, sending an empty
/// heartbeat every HEARTBEAT_INTERVAL_MILLIS * 6 seconds.
pub const FRONTEND_HEARTBEAT_INTERVAL_MILLIS: u64 = HEARTBEAT_INTERVAL_MILLIS * 6;

/// The lease seconds of a region. It's set by 3 heartbeat intervals
/// (HEARTBEAT_INTERVAL_MILLIS × 3), plus some extra buffer (1 second).
pub const REGION_LEASE_SECS: u64 =
Duration::from_millis(HEARTBEAT_INTERVAL_MILLIS * 3).as_secs() + 1;

/// When creating table or region failover, a target node needs to be selected.
/// If the node's lease has expired, the `Selector` will not select it.
pub const DATANODE_LEASE_SECS: u64 = REGION_LEASE_SECS;

pub const FLOWNODE_LEASE_SECS: u64 = DATANODE_LEASE_SECS;
pub const BASE_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(3);

/// The lease seconds of metasrv leader.
pub const META_LEASE_SECS: u64 = 5;
@@ -41,5 +26,73 @@ pub const POSTGRES_KEEP_ALIVE_SECS: u64 = 30;
/// In a lease, there are two opportunities for renewal.
pub const META_KEEP_ALIVE_INTERVAL_SECS: u64 = META_LEASE_SECS / 2;

/// The timeout of the heartbeat request.
pub const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS + 1);

/// The keep-alive interval of the heartbeat channel.
pub const HEARTBEAT_CHANNEL_KEEP_ALIVE_INTERVAL_SECS: Duration =
Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS + 1);

/// The keep-alive timeout of the heartbeat channel.
pub const HEARTBEAT_CHANNEL_KEEP_ALIVE_TIMEOUT_SECS: Duration =
Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS + 1);

/// The default mailbox round-trip timeout.
pub const MAILBOX_RTT_SECS: u64 = 1;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
/// The distributed time constants.
pub struct DistributedTimeConstants {
pub heartbeat_interval: Duration,
pub frontend_heartbeat_interval: Duration,
pub region_lease: Duration,
pub datanode_lease: Duration,
pub flownode_lease: Duration,
}

/// The frontend heartbeat interval is 6 times of the base heartbeat interval.
pub fn frontend_heartbeat_interval(base_heartbeat_interval: Duration) -> Duration {
base_heartbeat_interval * 6
}

impl DistributedTimeConstants {
/// Create a new DistributedTimeConstants from the heartbeat interval.
pub fn from_heartbeat_interval(heartbeat_interval: Duration) -> Self {
let region_lease = heartbeat_interval * 3 + Duration::from_secs(1);
let datanode_lease = region_lease;
let flownode_lease = datanode_lease;
Self {
heartbeat_interval,
frontend_heartbeat_interval: frontend_heartbeat_interval(heartbeat_interval),
region_lease,
datanode_lease,
flownode_lease,
}
}
}

impl Default for DistributedTimeConstants {
fn default() -> Self {
Self::from_heartbeat_interval(BASE_HEARTBEAT_INTERVAL)
}
}

static DEFAULT_DISTRIBUTED_TIME_CONSTANTS: OnceLock<DistributedTimeConstants> = OnceLock::new();

/// Get the default distributed time constants.
pub fn default_distributed_time_constants() -> &'static DistributedTimeConstants {
DEFAULT_DISTRIBUTED_TIME_CONSTANTS.get_or_init(Default::default)
}

/// Initialize the default distributed time constants.
pub fn init_distributed_time_constants(base_heartbeat_interval: Duration) {
let distributed_time_constants =
DistributedTimeConstants::from_heartbeat_interval(base_heartbeat_interval);
DEFAULT_DISTRIBUTED_TIME_CONSTANTS
.set(distributed_time_constants)
.expect("Failed to set default distributed time constants");
common_telemetry::info!(
"Initialized default distributed time constants: {:#?}",
distributed_time_constants
);
}
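A sketch of the call pattern implied by the API above, under the assumption that callers initialize once at process startup (as the Metasrv start command now does) and read the derived constants afterwards; the `startup` function is illustrative:

```rust
use std::time::Duration;

// Sketch only: initialize once with the configured base interval, then read
// the derived constants anywhere. With a 3s base, region_lease = 3s * 3 + 1s = 10s.
fn startup(heartbeat_interval: Duration) {
    init_distributed_time_constants(heartbeat_interval);
    let constants = default_distributed_time_constants();
    assert_eq!(constants.frontend_heartbeat_interval, heartbeat_interval * 6);
    assert_eq!(
        constants.region_lease,
        heartbeat_interval * 3 + Duration::from_secs(1)
    );
}
```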
@@ -528,9 +528,6 @@ pub enum Error {
source: common_wal::error::Error,
},

#[snafu(display("Failed to resolve Kafka broker endpoint."))]
ResolveKafkaEndpoint { source: common_wal::error::Error },

#[snafu(display("Failed to build a Kafka controller client"))]
BuildKafkaCtrlClient {
#[snafu(implicit)]
@@ -1040,7 +1037,6 @@ impl ErrorExt for Error {
| BuildKafkaClient { .. }
| BuildKafkaCtrlClient { .. }
| KafkaPartitionClient { .. }
| ResolveKafkaEndpoint { .. }
| ProduceRecord { .. }
| CreateKafkaWalTopic { .. }
| EmptyTopicPool { .. }
@@ -25,8 +25,7 @@ use snafu::ResultExt;

use crate::error::{
BuildKafkaClientSnafu, BuildKafkaCtrlClientSnafu, CreateKafkaWalTopicSnafu,
KafkaGetOffsetSnafu, KafkaPartitionClientSnafu, ProduceRecordSnafu, ResolveKafkaEndpointSnafu,
Result, TlsConfigSnafu,
KafkaGetOffsetSnafu, KafkaPartitionClientSnafu, ProduceRecordSnafu, Result, TlsConfigSnafu,
};

// Each topic only has one partition for now.
@@ -209,10 +208,10 @@ impl KafkaTopicCreator {
/// Builds a kafka [Client](rskafka::client::Client).
pub async fn build_kafka_client(connection: &KafkaConnectionConfig) -> Result<Client> {
// Builds an kafka controller client for creating topics.
let broker_endpoints = common_wal::resolve_to_ipv4(&connection.broker_endpoints)
.await
.context(ResolveKafkaEndpointSnafu)?;
let mut builder = ClientBuilder::new(broker_endpoints).backoff_config(DEFAULT_BACKOFF_CONFIG);
let mut builder = ClientBuilder::new(connection.broker_endpoints.clone())
.backoff_config(DEFAULT_BACKOFF_CONFIG)
.connect_timeout(Some(connection.connect_timeout))
.timeout(Some(connection.timeout));
if let Some(sasl) = &connection.sasl {
builder = builder.sasl_config(sasl.config.clone().into_sasl_config());
};
@@ -19,7 +19,7 @@ use opentelemetry::propagation::TextMapPropagator;
use opentelemetry_sdk::propagation::TraceContextPropagator;
use tracing_opentelemetry::OpenTelemetrySpanExt;

// An wapper for `Futures` that provides tracing instrument adapters.
// An wrapper for `Futures` that provides tracing instrument adapters.
pub trait FutureExt: std::future::Future + Sized {
fn trace(self, span: tracing::span::Span) -> tracing::instrument::Instrumented<Self>;
}
@@ -189,6 +189,8 @@ mod tests {
client_cert_path: None,
client_key_path: None,
}),
connect_timeout: Duration::from_secs(3),
timeout: Duration::from_secs(3),
},
kafka_topic: KafkaTopicConfig {
num_topics: 32,
@@ -221,6 +223,8 @@ mod tests {
client_cert_path: None,
client_key_path: None,
}),
connect_timeout: Duration::from_secs(3),
timeout: Duration::from_secs(3),
},
max_batch_bytes: ReadableSize::mb(1),
consumer_wait_timeout: Duration::from_millis(100),
@@ -161,6 +161,12 @@ pub struct KafkaConnectionConfig {
pub sasl: Option<KafkaClientSasl>,
/// Client TLS config
pub tls: Option<KafkaClientTls>,
/// The connect timeout for kafka client.
#[serde(with = "humantime_serde")]
pub connect_timeout: Duration,
/// The timeout for kafka client.
#[serde(with = "humantime_serde")]
pub timeout: Duration,
}

impl Default for KafkaConnectionConfig {
@@ -169,6 +175,8 @@ impl Default for KafkaConnectionConfig {
broker_endpoints: vec![BROKER_ENDPOINT.to_string()],
sasl: None,
tls: None,
connect_timeout: Duration::from_secs(3),
timeout: Duration::from_secs(3),
}
}
}
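A minimal sketch of overriding the two new timeout fields while keeping the remaining defaults from the `Default` impl above; the surrounding function is illustrative, not from the repository:

```rust
use std::time::Duration;

// Sketch: struct-update syntax keeps broker_endpoints/sasl/tls at their defaults.
fn custom_connection() -> KafkaConnectionConfig {
    KafkaConnectionConfig {
        connect_timeout: Duration::from_secs(5),
        timeout: Duration::from_secs(10),
        ..Default::default()
    }
}
```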
@@ -218,6 +218,7 @@ impl HeartbeatTask {
if let Some(message) = message {
Self::new_heartbeat_request(&heartbeat_request, Some(message), &latest_report)
} else {
warn!("Sender has been dropped, exiting the heartbeat loop");
// Receives None that means Sender was dropped, we need to break the current loop
break
}
@@ -259,7 +260,11 @@ impl HeartbeatTask {
error!(e; "Error while handling heartbeat response");
}
}
Ok(None) => break,
Ok(None) => {
warn!("Heartbeat response stream closed");
capture_self.start_with_retry(retry_interval).await;
break;
}
Err(e) => {
error!(e; "Occur error while reading heartbeat response");
capture_self.start_with_retry(retry_interval).await;
@@ -71,6 +71,6 @@ pub struct LinearStagePlan {
/// The key expressions to use for the lookup relation.
pub lookup_key: Vec<ScalarExpr>,
/// The closure to apply to the concatenation of the key columns,
/// the stream value columns, and the lookup value colunms.
/// the stream value columns, and the lookup value columns.
pub closure: JoinFilter,
}
@@ -23,7 +23,7 @@ use common_meta::heartbeat::handler::{
};
use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef, OutgoingMessage};
use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
use common_telemetry::{debug, error, info};
use common_telemetry::{debug, error, info, warn};
use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient};
use servers::addrs;
use servers::heartbeat_options::HeartbeatOptions;
@@ -42,8 +42,8 @@ use crate::metrics::{HEARTBEAT_RECV_COUNT, HEARTBEAT_SENT_COUNT};
pub struct HeartbeatTask {
peer_addr: String,
meta_client: Arc<MetaClient>,
report_interval: u64,
retry_interval: u64,
report_interval: Duration,
retry_interval: Duration,
resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
start_time_ms: u64,
}
@@ -58,8 +58,8 @@ impl HeartbeatTask {
HeartbeatTask {
peer_addr: addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)),
meta_client,
report_interval: heartbeat_opts.interval.as_millis() as u64,
retry_interval: heartbeat_opts.retry_interval.as_millis() as u64,
report_interval: heartbeat_opts.interval,
retry_interval: heartbeat_opts.retry_interval,
resp_handler_executor,
start_time_ms: common_time::util::current_time_millis() as u64,
}
@@ -103,13 +103,15 @@ impl HeartbeatTask {
HEARTBEAT_RECV_COUNT.with_label_values(&["success"]).inc();
}
}
Ok(None) => break,
Ok(None) => {
warn!("Heartbeat response stream closed");
capture_self.start_with_retry(retry_interval).await;
break;
}
Err(e) => {
HEARTBEAT_RECV_COUNT.with_label_values(&["error"]).inc();
error!(e; "Occur error while reading heartbeat response");
capture_self
.start_with_retry(Duration::from_millis(retry_interval))
.await;
capture_self.start_with_retry(retry_interval).await;

break;
}
@@ -177,12 +179,13 @@ impl HeartbeatTask {
if let Some(message) = message {
Self::new_heartbeat_request(&heartbeat_request, Some(message))
} else {
warn!("Sender has been dropped, exiting the heartbeat loop");
// Receives None that means Sender was dropped, we need to break the current loop
break
}
}
_ = &mut sleep => {
sleep.as_mut().reset(Instant::now() + Duration::from_millis(report_interval));
sleep.as_mut().reset(Instant::now() + report_interval);
Self::new_heartbeat_request(&heartbeat_request, None)
}
};
@@ -139,9 +139,6 @@ pub enum Error {
error: rskafka::client::error::Error,
},

#[snafu(display("Failed to resolve Kafka broker endpoint."))]
ResolveKafkaEndpoint { source: common_wal::error::Error },

#[snafu(display(
"Failed to build a Kafka partition client, topic: {}, partition: {}",
topic,
@@ -343,7 +340,6 @@ impl ErrorExt for Error {
StartWalTask { .. }
| StopWalTask { .. }
| IllegalState { .. }
| ResolveKafkaEndpoint { .. }
| NoMaxValue { .. }
| Cast { .. }
| EncodeJson { .. }
@@ -24,9 +24,7 @@ use snafu::ResultExt;
use store_api::logstore::provider::KafkaProvider;
use tokio::sync::{Mutex, RwLock};

use crate::error::{
BuildClientSnafu, BuildPartitionClientSnafu, ResolveKafkaEndpointSnafu, Result, TlsConfigSnafu,
};
use crate::error::{BuildClientSnafu, BuildPartitionClientSnafu, Result, TlsConfigSnafu};
use crate::kafka::index::{GlobalIndexCollector, NoopCollector};
use crate::kafka::producer::{OrderedBatchProducer, OrderedBatchProducerRef};

@@ -78,11 +76,10 @@ impl ClientManager {
high_watermark: Arc<DashMap<Arc<KafkaProvider>, u64>>,
) -> Result<Self> {
// Sets backoff config for the top-level kafka client and all clients constructed by it.
let broker_endpoints = common_wal::resolve_to_ipv4(&config.connection.broker_endpoints)
.await
.context(ResolveKafkaEndpointSnafu)?;
let mut builder =
ClientBuilder::new(broker_endpoints).backoff_config(DEFAULT_BACKOFF_CONFIG);
let mut builder = ClientBuilder::new(config.connection.broker_endpoints.clone())
.backoff_config(DEFAULT_BACKOFF_CONFIG)
.connect_timeout(Some(config.connection.connect_timeout))
.timeout(Some(config.connection.timeout));
if let Some(sasl) = &config.connection.sasl {
builder = builder.sasl_config(sasl.config.clone().into_sasl_config());
};
@@ -186,6 +186,9 @@ impl MetaClientBuilder {
let mgr = client.channel_manager.clone();

if self.enable_heartbeat {
if self.heartbeat_channel_manager.is_some() {
info!("Enable heartbeat channel using the heartbeat channel manager.");
}
let mgr = self.heartbeat_channel_manager.unwrap_or(mgr.clone());
client.heartbeat = Some(HeartbeatClient::new(
self.id,
@@ -525,7 +528,7 @@ impl MetaClient {
self.heartbeat_client()?.ask_leader().await
}

/// Returns a heartbeat bidirectional streaming: (sender, recever), the
/// Returns a heartbeat bidirectional streaming: (sender, receiver), the
/// other end is the leader of `metasrv`.
///
/// The `datanode` needs to use the sender to continuously send heartbeat
@@ -24,7 +24,7 @@ use common_meta::distributed_time_constants::META_KEEP_ALIVE_INTERVAL_SECS;
use common_telemetry::tracing_context::TracingContext;
use common_telemetry::warn;
use rand::seq::SliceRandom;
use snafu::{OptionExt, ResultExt};
use snafu::ResultExt;
use tokio::time::timeout;
use tonic::transport::Channel;

@@ -101,12 +101,14 @@ impl AskLeader {
};

let (tx, mut rx) = tokio::sync::mpsc::channel(peers.len());
let channel_manager = self.channel_manager.clone();

for addr in &peers {
let mut client = self.create_asker(addr)?;
let tx_clone = tx.clone();
let req = req.clone();
let addr = addr.to_string();
let addr = addr.clone();
let channel_manager = channel_manager.clone();
tokio::spawn(async move {
match client.ask_leader(req).await {
Ok(res) => {
@@ -117,13 +119,19 @@ impl AskLeader {
};
}
Err(status) => {
// Reset cached channel even on generic errors: the VIP may keep us on a dead
// backend, so forcing a reconnect gives us a chance to hit a healthy peer.
Self::reset_channels_with_manager(
&channel_manager,
std::slice::from_ref(&addr),
);
warn!("Failed to ask leader from: {addr}, {status}");
}
}
});
}

let leader = timeout(
let leader = match timeout(
self.channel_manager
.config()
.timeout
@@ -131,8 +139,16 @@ impl AskLeader {
rx.recv(),
)
.await
.context(error::AskLeaderTimeoutSnafu)?
.context(error::NoLeaderSnafu)?;
{
Ok(Some(leader)) => leader,
Ok(None) => return error::NoLeaderSnafu.fail(),
Err(e) => {
// All peers timed out. Reset channels to force reconnection,
// which may help escape dead backends in VIP/LB scenarios.
Self::reset_channels_with_manager(&self.channel_manager, &peers);
return Err(e).context(error::AskLeaderTimeoutSnafu);
}
};

let mut leadership_group = self.leadership_group.write().unwrap();
leadership_group.leader = Some(leader.clone());
@@ -169,6 +185,15 @@ impl AskLeader {
.context(error::CreateChannelSnafu)?,
))
}

/// Drop cached channels for the given peers so a fresh connection is used next time.
fn reset_channels_with_manager(channel_manager: &ChannelManager, peers: &[String]) {
if peers.is_empty() {
return;
}

channel_manager.retain_channel(|addr, _| !peers.iter().any(|peer| peer == addr));
}
}

#[async_trait]
@@ -18,6 +18,10 @@ use std::time::Duration;
use client::RegionFollowerClientRef;
use common_base::Plugins;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_meta::distributed_time_constants::{
HEARTBEAT_CHANNEL_KEEP_ALIVE_INTERVAL_SECS, HEARTBEAT_CHANNEL_KEEP_ALIVE_TIMEOUT_SECS,
HEARTBEAT_TIMEOUT,
};
use common_telemetry::{debug, info};
use serde::{Deserialize, Serialize};

@@ -34,8 +38,6 @@ pub struct MetaClientOptions {
#[serde(with = "humantime_serde")]
pub timeout: Duration,
#[serde(with = "humantime_serde")]
pub heartbeat_timeout: Duration,
#[serde(with = "humantime_serde")]
pub ddl_timeout: Duration,
#[serde(with = "humantime_serde")]
pub connect_timeout: Duration,
@@ -52,7 +54,6 @@ impl Default for MetaClientOptions {
Self {
metasrv_addrs: vec!["127.0.0.1:3002".to_string()],
timeout: Duration::from_millis(3_000u64),
heartbeat_timeout: Duration::from_millis(500u64),
ddl_timeout: Duration::from_millis(10_000u64),
connect_timeout: Duration::from_millis(1_000u64),
tcp_nodelay: true,
@@ -97,7 +98,11 @@ pub async fn create_meta_client(
.timeout(meta_client_options.timeout)
.connect_timeout(meta_client_options.connect_timeout)
.tcp_nodelay(meta_client_options.tcp_nodelay);
let heartbeat_config = base_config.clone();
let heartbeat_config = base_config
.clone()
.timeout(HEARTBEAT_TIMEOUT)
.http2_keep_alive_interval(HEARTBEAT_CHANNEL_KEEP_ALIVE_INTERVAL_SECS)
.http2_keep_alive_timeout(HEARTBEAT_CHANNEL_KEEP_ALIVE_TIMEOUT_SECS);

if let MetaClientType::Frontend = client_type {
let ddl_config = base_config.clone().timeout(meta_client_options.ddl_timeout);
@@ -40,7 +40,7 @@ use common_telemetry::info;
#[cfg(feature = "pg_kvbackend")]
use deadpool_postgres::{Config, Runtime};
use either::Either;
use etcd_client::Client;
use etcd_client::{Client, ConnectOptions};
use servers::configurator::ConfiguratorRef;
use servers::export_metrics::ExportMetricsTask;
use servers::http::{HttpServer, HttpServerBuilder};
@@ -70,7 +70,9 @@ use crate::election::rds::postgres::PgElection;
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
use crate::election::CANDIDATE_LEASE_SECS;
use crate::metasrv::builder::MetasrvBuilder;
use crate::metasrv::{BackendImpl, Metasrv, MetasrvOptions, SelectTarget, SelectorRef};
use crate::metasrv::{
BackendClientOptions, BackendImpl, Metasrv, MetasrvOptions, SelectTarget, SelectorRef,
};
use crate::node_excluder::NodeExcluderRef;
use crate::selector::lease_based::LeaseBasedSelector;
use crate::selector::load_based::LoadBasedSelector;
@@ -270,7 +272,12 @@ macro_rules! add_compressed_service {
}

pub fn router(metasrv: Arc<Metasrv>) -> Router {
let mut router = tonic::transport::Server::builder().accept_http1(true); // for admin services
let mut router = tonic::transport::Server::builder()
// for admin services
.accept_http1(true)
// For quick network failures detection.
.http2_keepalive_interval(Some(metasrv.options().grpc.http2_keep_alive_interval))
.http2_keepalive_timeout(Some(metasrv.options().grpc.http2_keep_alive_timeout));
let router = add_compressed_service!(router, HeartbeatServer::from_arc(metasrv.clone()));
let router = add_compressed_service!(router, StoreServer::from_arc(metasrv.clone()));
let router = add_compressed_service!(router, ClusterServer::from_arc(metasrv.clone()));
@@ -287,7 +294,7 @@ pub async fn metasrv_builder(
(Some(kv_backend), _) => (kv_backend, None),
(None, BackendImpl::MemoryStore) => (Arc::new(MemoryKvBackend::new()) as _, None),
(None, BackendImpl::EtcdStore) => {
let etcd_client = create_etcd_client(&opts.store_addrs).await?;
let etcd_client = create_etcd_client(&opts.store_addrs, &opts.backend_client).await?;
let kv_backend = EtcdStore::with_etcd_client(etcd_client.clone(), opts.max_txn_ops);
let election = EtcdElection::with_etcd_client(
&opts.grpc.server_addr,
@@ -435,13 +442,20 @@ pub async fn metasrv_builder(
.plugins(plugins))
}

pub async fn create_etcd_client(store_addrs: &[String]) -> Result<Client> {
pub async fn create_etcd_client(
store_addrs: &[String],
options: &BackendClientOptions,
) -> Result<Client> {
let etcd_endpoints = store_addrs
.iter()
.map(|x| x.trim())
.filter(|x| !x.is_empty())
.collect::<Vec<_>>();
Client::connect(&etcd_endpoints, None)
let options = ConnectOptions::new()
.with_keep_alive_while_idle(true)
.with_keep_alive(options.keep_alive_interval, options.keep_alive_timeout)
.with_connect_timeout(options.connect_timeout);
Client::connect(&etcd_endpoints, Some(options))
.await
.context(error::ConnectEtcdSnafu)
}
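A hypothetical call site for the new signature; the endpoint value and the use of `BackendClientOptions::default()` (whose defaults the earlier `backend_client` table documents as keep_alive_interval = 10s, keep_alive_timeout = 3s, connect_timeout = 3s) are assumptions for illustration:

```rust
// Hypothetical call-site sketch for create_etcd_client's new signature.
async fn connect_backend() -> Result<Client> {
    let store_addrs = vec!["127.0.0.1:2379".to_string()];
    // Default keep-alive/connect timeouts from BackendClientOptions.
    let options = BackendClientOptions::default();
    create_etcd_client(&store_addrs, &options).await
}
```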
@@ -63,22 +63,6 @@ pub struct EtcdElection {
}

impl EtcdElection {
pub async fn with_endpoints<E, S>(
leader_value: E,
endpoints: S,
store_key_prefix: String,
) -> Result<ElectionRef>
where
E: AsRef<str>,
S: AsRef<[E]>,
{
let client = Client::connect(endpoints, None)
.await
.context(error::ConnectEtcdSnafu)?;

Self::with_etcd_client(leader_value, client, store_key_prefix).await
}

pub async fn with_etcd_client<E>(
leader_value: E,
client: Client,
@@ -1190,7 +1190,7 @@ mod tests {
));
handles.push(handle);
}
// Wait for candidates to registrate themselves and renew their leases at least once.
// Wait for candidates to register themselves and renew their leases at least once.
tokio::time::sleep(candidate_lease_ttl / 2 + Duration::from_secs(1)).await;

let (tx, _) = broadcast::channel(100);

@@ -1012,7 +1012,7 @@ mod tests {
));
handles.push(handle);
}
// Wait for candidates to registrate themselves and renew their leases at least once.
// Wait for candidates to register themselves and renew their leases at least once.
tokio::time::sleep(Duration::from_secs(3)).await;

let (tx, _) = broadcast::channel(100);
@@ -15,7 +15,6 @@
|
||||
use std::collections::VecDeque;
|
||||
use std::time::Duration;
|
||||
|
||||
use common_meta::distributed_time_constants;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
/// This is our port of Akka's "[PhiAccrualFailureDetector](https://github.com/akka/akka/blob/v2.6.21/akka-remote/src/main/scala/akka/remote/PhiAccrualFailureDetector.scala)"
|
||||
@@ -83,9 +82,7 @@ impl Default for PhiAccrualFailureDetectorOptions {
|
||||
Self {
|
||||
threshold: 8_f32,
|
||||
min_std_deviation: Duration::from_millis(100),
|
||||
acceptable_heartbeat_pause: Duration::from_secs(
|
||||
distributed_time_constants::DATANODE_LEASE_SECS,
|
||||
),
|
||||
acceptable_heartbeat_pause: Duration::from_secs(10),
|
||||
first_heartbeat_estimate: Duration::from_millis(1000),
|
||||
}
|
||||
}
|
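
For context on the knob being changed here: a phi-accrual detector scores each silence as φ = -log10(probability that a heartbeat this late would still arrive), given the observed inter-arrival history, and `acceptable_heartbeat_pause` pads the expected arrival time; the new fixed 10 s default simply decouples that padding from the datanode lease constant. A sketch of the φ computation following the Akka reference linked above (the logistic approximation of the normal CDF; names and structure are illustrative, not this crate's exact internals):

    // phi = -log10(1 - CDF(time_diff)) via Akka's logistic approximation.
    fn phi(time_diff_ms: f64, mean_ms: f64, std_dev_ms: f64) -> f64 {
        let y = (time_diff_ms - mean_ms) / std_dev_ms;
        let e = (-y * (1.5976 + 0.070566 * y * y)).exp();
        if time_diff_ms > mean_ms {
            -(e / (1.0 + e)).log10()
        } else {
            -(1.0 - 1.0 / (1.0 + e)).log10()
        }
    }

A node is suspected once φ exceeds `threshold` (8 here), so a larger pause or deviation pushes detection later.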

@@ -268,6 +268,15 @@ impl Pushers {
     async fn remove(&self, pusher_id: &str) -> Option<Pusher> {
         self.0.write().await.remove(pusher_id)
     }
+
+    pub(crate) async fn clear(&self) -> Vec<String> {
+        let mut pushers = self.0.write().await;
+        let keys = pushers.keys().cloned().collect::<Vec<_>>();
+        if !keys.is_empty() {
+            pushers.clear();
+        }
+        keys
+    }
 }

 #[derive(Clone)]

@@ -304,10 +313,11 @@ impl HeartbeatHandlerGroup {
     /// Deregisters the heartbeat response [`Pusher`] with the given key from the group.
     ///
-    /// Returns the [`Pusher`] if it exists.
-    pub async fn deregister_push(&self, pusher_id: PusherId) -> Option<Pusher> {
-        METRIC_META_HEARTBEAT_CONNECTION_NUM.dec();
+    pub async fn deregister_push(&self, pusher_id: PusherId) {
         info!("Pusher unregister: {}", pusher_id);
-        self.pushers.remove(&pusher_id.string_key()).await
+        if self.pushers.remove(&pusher_id.string_key()).await.is_some() {
+            METRIC_META_HEARTBEAT_CONNECTION_NUM.dec();
+        }
     }

     /// Returns the [`Pushers`] of the group.

@@ -516,6 +526,14 @@ impl Mailbox for HeartbeatMailbox {

         Ok(())
     }
+
+    async fn reset(&self) {
+        let keys = self.pushers.clear().await;
+        if !keys.is_empty() {
+            info!("Reset mailbox, deregister pushers: {:?}", keys);
+            METRIC_META_HEARTBEAT_CONNECTION_NUM.sub(keys.len() as i64);
+        }
+    }
 }

 /// The builder to build the group of heartbeat handlers.
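
The common thread in these pusher changes is gauge hygiene: `METRIC_META_HEARTBEAT_CONNECTION_NUM` should move only when a pusher actually enters or leaves the map, otherwise a double deregistration or a leader step-down could drive the connection count negative. A toy model of the invariant in plain Rust, independent of the metrics types used here:

    use std::collections::HashMap;
    use std::sync::atomic::{AtomicI64, Ordering};

    static CONNECTIONS: AtomicI64 = AtomicI64::new(0);

    // Decrement only when the key was really present.
    fn deregister(pushers: &mut HashMap<String, ()>, key: &str) {
        if pushers.remove(key).is_some() {
            CONNECTIONS.fetch_sub(1, Ordering::Relaxed);
        }
    }

    // Bulk-subtract exactly the number of entries that were cleared.
    fn reset(pushers: &mut HashMap<String, ()>) {
        let n = pushers.len() as i64;
        pushers.clear();
        CONNECTIONS.fetch_sub(n, Ordering::Relaxed);
    }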

@@ -124,7 +124,7 @@ mod test {
     use std::sync::Arc;

     use common_meta::datanode::{RegionManifestInfo, RegionStat, Stat};
-    use common_meta::distributed_time_constants;
+    use common_meta::distributed_time_constants::default_distributed_time_constants;
     use common_meta::key::table_route::TableRouteValue;
     use common_meta::key::test_utils::new_test_table_info;
     use common_meta::key::TableMetadataManager;

@@ -223,7 +223,7 @@ mod test {
        let opening_region_keeper = Arc::new(MemoryRegionKeeper::default());

        let handler = RegionLeaseHandler::new(
-            distributed_time_constants::REGION_LEASE_SECS,
+            default_distributed_time_constants().region_lease.as_secs() as u64,
            table_metadata_manager.clone(),
            opening_region_keeper.clone(),
            None,

@@ -253,7 +253,7 @@ mod test {

        assert_eq!(
            acc.region_lease.as_ref().unwrap().lease_seconds,
-            distributed_time_constants::REGION_LEASE_SECS
+            default_distributed_time_constants().region_lease.as_secs()
        );

        assert_region_lease(

@@ -287,7 +287,7 @@ mod test {

        assert_eq!(
            acc.region_lease.as_ref().unwrap().lease_seconds,
-            distributed_time_constants::REGION_LEASE_SECS
+            default_distributed_time_constants().region_lease.as_secs()
        );

        assert_region_lease(

@@ -366,7 +366,7 @@ mod test {
        });

        let handler = RegionLeaseHandler::new(
-            distributed_time_constants::REGION_LEASE_SECS,
+            default_distributed_time_constants().region_lease.as_secs(),
            table_metadata_manager.clone(),
            Default::default(),
            None,

@@ -21,7 +21,7 @@ use std::task::{Context, Poll};
 use api::v1::meta::heartbeat_request::NodeWorkloads;
 use common_error::ext::BoxedError;
 use common_meta::cluster::{NodeInfo, NodeInfoKey, Role as ClusterRole};
-use common_meta::distributed_time_constants::FRONTEND_HEARTBEAT_INTERVAL_MILLIS;
+use common_meta::distributed_time_constants::default_distributed_time_constants;
 use common_meta::kv_backend::{KvBackend, ResettableKvBackendRef};
 use common_meta::peer::{Peer, PeerLookupService};
 use common_meta::rpc::store::RangeRequest;

@@ -312,7 +312,9 @@ impl PeerLookupService for MetaPeerLookupService {
         lookup_frontends(
             &self.meta_peer_client,
             // TODO(zyy17): How to get the heartbeat interval of the frontend if it uses a custom heartbeat interval?
-            FRONTEND_HEARTBEAT_INTERVAL_MILLIS,
+            default_distributed_time_constants()
+                .frontend_heartbeat_interval
+                .as_secs(),
         )
         .await
         .map_err(BoxedError::new)

@@ -27,7 +27,7 @@ use common_event_recorder::EventRecorderOptions;
 use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
 use common_meta::cache_invalidator::CacheInvalidatorRef;
 use common_meta::ddl_manager::DdlManagerRef;
-use common_meta::distributed_time_constants;
+use common_meta::distributed_time_constants::{self, default_distributed_time_constants};
 use common_meta::key::runtime_switch::RuntimeSwitchManagerRef;
 use common_meta::key::TableMetadataManagerRef;
 use common_meta::kv_backend::{KvBackendRef, ResettableKvBackend, ResettableKvBackendRef};

@@ -98,6 +98,27 @@ pub enum BackendImpl {
     MysqlStore,
 }

+#[derive(Clone, PartialEq, Serialize, Deserialize, Debug)]
+#[serde(default)]
+pub struct BackendClientOptions {
+    #[serde(with = "humantime_serde")]
+    pub keep_alive_timeout: Duration,
+    #[serde(with = "humantime_serde")]
+    pub keep_alive_interval: Duration,
+    #[serde(with = "humantime_serde")]
+    pub connect_timeout: Duration,
+}
+
+impl Default for BackendClientOptions {
+    fn default() -> Self {
+        Self {
+            keep_alive_interval: Duration::from_secs(10),
+            keep_alive_timeout: Duration::from_secs(3),
+            connect_timeout: Duration::from_secs(3),
+        }
+    }
+}
+
 #[derive(Clone, PartialEq, Serialize, Deserialize)]
 #[serde(default)]
 pub struct MetasrvOptions {

@@ -113,12 +134,22 @@ pub struct MetasrvOptions {
     /// Only applicable when using PostgreSQL or MySQL as the metadata store
     #[serde(default)]
     pub backend_tls: Option<TlsOption>,
+    /// The backend client options.
+    /// Currently, only applicable when using etcd as the metadata store.
+    #[serde(default)]
+    pub backend_client: BackendClientOptions,
     /// The type of selector.
     pub selector: SelectorType,
     /// Whether to use the memory store.
     pub use_memory_store: bool,
     /// Whether to enable region failover.
     pub enable_region_failover: bool,
+    /// The base heartbeat interval.
+    ///
+    /// This value is used to calculate the distributed time constants for components.
+    /// e.g., the region lease time is `heartbeat_interval * 3 + Duration::from_secs(1)`.
+    #[serde(with = "humantime_serde")]
+    pub heartbeat_interval: Duration,
     /// The delay before starting region failure detection.
     /// This delay helps prevent Metasrv from triggering unnecessary region failovers before all Datanodes are fully started.
     /// Especially useful when the cluster is not deployed with GreptimeDB Operator and maintenance mode is not enabled.
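
The doc comment above states the rule from which the lease constants used throughout this diff are now derived. Assuming the base heartbeat interval keeps a 3-second default (an assumption; the value of `BASE_HEARTBEAT_INTERVAL` is not shown in this diff), the arithmetic reproduces the previous 10-second region lease:

    use std::time::Duration;

    // Documented rule: region_lease = heartbeat_interval * 3 + 1s.
    let heartbeat_interval = Duration::from_secs(3); // assumed default
    let region_lease = heartbeat_interval * 3 + Duration::from_secs(1);
    assert_eq!(region_lease, Duration::from_secs(10));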

@@ -211,7 +242,9 @@ impl fmt::Debug for MetasrvOptions {
             .field("max_txn_ops", &self.max_txn_ops)
             .field("flush_stats_factor", &self.flush_stats_factor)
             .field("tracing", &self.tracing)
-            .field("backend", &self.backend);
+            .field("backend", &self.backend)
+            .field("heartbeat_interval", &self.heartbeat_interval)
+            .field("backend_client", &self.backend_client);

         #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
         debug_struct.field("meta_table_name", &self.meta_table_name);

@@ -239,6 +272,7 @@ impl Default for MetasrvOptions {
             selector: SelectorType::default(),
             use_memory_store: false,
             enable_region_failover: false,
+            heartbeat_interval: distributed_time_constants::BASE_HEARTBEAT_INTERVAL,
             region_failure_detector_initialization_delay: Duration::from_secs(10 * 60),
             allow_region_failover_on_local_wal: false,
             grpc: GrpcOptions {

@@ -273,6 +307,7 @@ impl Default for MetasrvOptions {
             meta_election_lock_id: common_meta::kv_backend::DEFAULT_META_ELECTION_LOCK_ID,
             node_max_idle_time: Duration::from_secs(24 * 60 * 60),
             event_recorder: EventRecorderOptions::default(),
+            backend_client: BackendClientOptions::default(),
         }
     }
 }

@@ -388,6 +423,7 @@ pub struct MetaStateHandler {
     greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
     leader_cached_kv_backend: Arc<LeaderCachedKvBackend>,
     leadership_change_notifier: LeadershipChangeNotifier,
+    mailbox: MailboxRef,
     state: StateRef,
 }

@@ -411,6 +447,9 @@ impl MetaStateHandler {
     pub async fn on_leader_stop(&self) {
         self.state.write().unwrap().next_state(become_follower());

+        // Enforces the mailbox to clear all pushers.
+        // The remaining heartbeat connections will be closed by the remote peer or keep-alive detection.
+        self.mailbox.reset().await;
         self.leadership_change_notifier
             .notify_on_leader_stop()
             .await;

@@ -528,6 +567,7 @@ impl Metasrv {
             state: self.state.clone(),
             leader_cached_kv_backend: leader_cached_kv_backend.clone(),
             leadership_change_notifier,
+            mailbox: self.mailbox.clone(),
         };
         let _handle = common_runtime::spawn_global(async move {
             loop {

@@ -655,7 +695,9 @@ impl Metasrv {
         lookup_datanode_peer(
             peer_id,
             &self.meta_peer_client,
-            distributed_time_constants::DATANODE_LEASE_SECS,
+            default_distributed_time_constants()
+                .datanode_lease
+                .as_secs(),
         )
         .await
     }

@@ -27,7 +27,7 @@ use common_meta::ddl::{
     DdlContext, NoopRegionFailureDetectorControl, RegionFailureDetectorControllerRef,
 };
 use common_meta::ddl_manager::DdlManager;
-use common_meta::distributed_time_constants;
+use common_meta::distributed_time_constants::default_distributed_time_constants;
 use common_meta::key::flow::flow_state::FlowStateManager;
 use common_meta::key::flow::FlowMetadataManager;
 use common_meta::key::runtime_switch::{RuntimeSwitchManager, RuntimeSwitchManagerRef};

@@ -220,8 +220,12 @@ impl MetasrvBuilder {

         let selector_ctx = SelectorContext {
             server_addr: options.grpc.server_addr.clone(),
-            datanode_lease_secs: distributed_time_constants::DATANODE_LEASE_SECS,
-            flownode_lease_secs: distributed_time_constants::FLOWNODE_LEASE_SECS,
+            datanode_lease_secs: default_distributed_time_constants()
+                .datanode_lease
+                .as_secs(),
+            flownode_lease_secs: default_distributed_time_constants()
+                .flownode_lease
+                .as_secs(),
             kv_backend: kv_backend.clone(),
             meta_peer_client: meta_peer_client.clone(),
             table_id: None,

@@ -438,7 +442,7 @@ impl MetasrvBuilder {
             Some(handler_group_builder) => handler_group_builder,
             None => {
                 let region_lease_handler = RegionLeaseHandler::new(
-                    distributed_time_constants::REGION_LEASE_SECS,
+                    default_distributed_time_constants().region_lease.as_secs(),
                     table_metadata_manager.clone(),
                     memory_region_keeper.clone(),
                     customized_region_lease_renewer,

@@ -770,7 +770,7 @@ mod tests {
     use std::assert_matches::assert_matches;
     use std::sync::Arc;

-    use common_meta::distributed_time_constants::REGION_LEASE_SECS;
+    use common_meta::distributed_time_constants::default_distributed_time_constants;
     use common_meta::instruction::Instruction;
     use common_meta::key::test_utils::new_test_table_info;
     use common_meta::rpc::router::{Region, RegionRoute};

@@ -1004,8 +1004,10 @@ mod tests {
            .run_once()
            .await;

+        let region_lease = default_distributed_time_constants().region_lease.as_secs();
+
        // Ensure it didn't run into the slow path.
-        assert!(timer.elapsed().as_secs() < REGION_LEASE_SECS / 2);
+        assert!(timer.elapsed().as_secs() < region_lease / 2);

        runner.suite.verify_table_metadata().await;
    }

@@ -1059,8 +1061,9 @@ mod tests {
            .run_once()
            .await;

+        let region_lease = default_distributed_time_constants().region_lease.as_secs();
        // Ensure it didn't run into the slow path.
-        assert!(timer.elapsed().as_secs() < REGION_LEASE_SECS / 2);
+        assert!(timer.elapsed().as_secs() < region_lease / 2);

        runner.suite.verify_table_metadata().await;
    }

@@ -1380,8 +1383,9 @@ mod tests {
            .run_once()
            .await;

+        let region_lease = default_distributed_time_constants().region_lease.as_secs();
        // Ensure it didn't run into the slow path.
-        assert!(timer.elapsed().as_secs() < REGION_LEASE_SECS);
+        assert!(timer.elapsed().as_secs() < region_lease);
        runner.suite.verify_table_metadata().await;
    }
 }

@@ -13,10 +13,9 @@
 // limitations under the License.

 use std::any::Any;
-use std::time::Duration;

 use api::v1::meta::MailboxMessage;
-use common_meta::distributed_time_constants::REGION_LEASE_SECS;
+use common_meta::distributed_time_constants::default_distributed_time_constants;
 use common_meta::instruction::{Instruction, InstructionReply, SimpleReply};
 use common_meta::key::datanode_table::RegionInfo;
 use common_meta::RegionIdent;

@@ -31,9 +30,6 @@ use crate::procedure::region_migration::migration_end::RegionMigrationEnd;
 use crate::procedure::region_migration::{Context, State};
 use crate::service::mailbox::Channel;

-/// Uses lease time of a region as the timeout of closing a downgraded region.
-const CLOSE_DOWNGRADED_REGION_TIMEOUT: Duration = Duration::from_secs(REGION_LEASE_SECS);
-
 #[derive(Debug, Serialize, Deserialize)]
 pub struct CloseDowngradedRegion;

@@ -111,7 +107,7 @@ impl CloseDowngradedRegion {
         let ch = Channel::Datanode(downgrade_leader_datanode.id);
         let receiver = ctx
             .mailbox
-            .send(&ch, msg, CLOSE_DOWNGRADED_REGION_TIMEOUT)
+            .send(&ch, msg, default_distributed_time_constants().region_lease)
             .await?;

         match receiver.await {

@@ -17,7 +17,7 @@ use std::time::Duration;

 use api::v1::meta::MailboxMessage;
 use common_error::ext::BoxedError;
-use common_meta::distributed_time_constants::REGION_LEASE_SECS;
+use common_meta::distributed_time_constants::default_distributed_time_constants;
 use common_meta::instruction::{
     DowngradeRegion, DowngradeRegionReply, Instruction, InstructionReply,
 };

@@ -64,7 +64,7 @@ impl State for DowngradeLeaderRegion {
         let now = Instant::now();
         // Ensures the `leader_region_lease_deadline` must exist after recovering.
         ctx.volatile_ctx
-            .set_leader_region_lease_deadline(Duration::from_secs(REGION_LEASE_SECS));
+            .set_leader_region_lease_deadline(default_distributed_time_constants().region_lease);

         match self.downgrade_region_with_retry(ctx).await {
             Ok(_) => {

@@ -250,14 +250,14 @@ impl DowngradeLeaderRegion {
         if let Some(last_connection_at) = last_connection_at {
             let now = current_time_millis();
             let elapsed = now - last_connection_at;
-            let region_lease = Duration::from_secs(REGION_LEASE_SECS);
+            let region_lease = default_distributed_time_constants().region_lease;

             // It's safe to update the region leader lease deadline here because:
             // 1. The old region leader has already been marked as downgraded in metadata,
             //    which means any attempts to renew its lease will be rejected.
             // 2. The pusher disconnect time record only gets removed when the datanode (from_peer)
             //    establishes a new heartbeat connection stream.
-            if elapsed >= (REGION_LEASE_SECS * 1000) as i64 {
+            if elapsed >= (region_lease.as_secs() * 1000) as i64 {
                 ctx.volatile_ctx.reset_leader_region_lease_deadline();
                 info!(
                     "Datanode {}({}) has been disconnected for longer than the region lease period ({:?}), reset leader region lease deadline to None, region: {}",

@@ -663,7 +663,8 @@ mod tests {
        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
        let elapsed = timer.elapsed().as_secs();
-        assert!(elapsed < REGION_LEASE_SECS / 2);
+        let region_lease = default_distributed_time_constants().region_lease.as_secs();
+        assert!(elapsed < region_lease / 2);
        assert_eq!(ctx.volatile_ctx.leader_region_last_entry_id, Some(1));
        assert!(ctx.volatile_ctx.leader_region_lease_deadline.is_none());

@@ -13,10 +13,9 @@
 // limitations under the License.

 use std::any::Any;
-use std::time::Duration;

 use api::v1::meta::MailboxMessage;
-use common_meta::distributed_time_constants::REGION_LEASE_SECS;
+use common_meta::distributed_time_constants::default_distributed_time_constants;
 use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, SimpleReply};
 use common_meta::key::datanode_table::RegionInfo;
 use common_meta::RegionIdent;

@@ -32,9 +31,6 @@ use crate::procedure::region_migration::flush_leader_region::PreFlushRegion;
 use crate::procedure::region_migration::{Context, State};
 use crate::service::mailbox::Channel;

-/// Uses lease time of a region as the timeout of opening a candidate region.
-const OPEN_CANDIDATE_REGION_TIMEOUT: Duration = Duration::from_secs(REGION_LEASE_SECS);
-
 #[derive(Debug, Serialize, Deserialize)]
 pub struct OpenCandidateRegion;

@@ -143,7 +139,7 @@ impl OpenCandidateRegion {
         let now = Instant::now();
         let receiver = ctx
             .mailbox
-            .send(&ch, msg, OPEN_CANDIDATE_REGION_TIMEOUT)
+            .send(&ch, msg, default_distributed_time_constants().region_lease)
             .await?;

         match receiver.await {

@@ -14,7 +14,7 @@

 use std::sync::Arc;

-use common_meta::distributed_time_constants::{FLOWNODE_LEASE_SECS, REGION_LEASE_SECS};
+use common_meta::distributed_time_constants::default_distributed_time_constants;
 use common_meta::kv_backend::memory::MemoryKvBackend;
 use common_meta::peer::Peer;
 use rand::prelude::SliceRandom;

@@ -36,8 +36,10 @@ pub fn new_test_selector_context() -> SelectorContext {

     SelectorContext {
         server_addr: "127.0.0.1:3002".to_string(),
-        datanode_lease_secs: REGION_LEASE_SECS,
-        flownode_lease_secs: FLOWNODE_LEASE_SECS,
+        datanode_lease_secs: default_distributed_time_constants().region_lease.as_secs(),
+        flownode_lease_secs: default_distributed_time_constants()
+            .flownode_lease
+            .as_secs(),
         kv_backend,
         meta_peer_client,
         table_id: None,

@@ -27,10 +27,9 @@ use snafu::OptionExt;
 use tokio::sync::mpsc;
 use tokio::sync::mpsc::Sender;
 use tokio_stream::wrappers::ReceiverStream;
-use tonic::{Request, Response, Streaming};
+use tonic::{Request, Response, Status, Streaming};

-use crate::error;
-use crate::error::Result;
+use crate::error::{self, Result};
 use crate::handler::{HeartbeatHandlerGroup, Pusher, PusherId};
 use crate::metasrv::{Context, Metasrv};
 use crate::metrics::METRIC_META_HEARTBEAT_RECV;

@@ -99,6 +98,7 @@ impl heartbeat_server::Heartbeat for Metasrv {
                             break;
                         }
                     }
+                    error!(err; "Sending heartbeat response error");

                     if tx.send(Err(err)).await.is_err() {
                         info!("ReceiverStream was dropped; shutting down");

@@ -109,6 +109,12 @@ impl heartbeat_server::Heartbeat for Metasrv {

                 if is_not_leader {
                     warn!("Quit because it is no longer the leader");
+                    let _ = tx
+                        .send(Err(Status::aborted(format!(
+                            "The requested metasrv node is not leader, node addr: {}",
+                            ctx.server_addr
+                        ))))
+                        .await;
                     break;
                 }
             }
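
With the stream now terminated by an explicit `Status::aborted` instead of a silent break, heartbeat clients can tell a leadership change apart from an ordinary disconnect. A hypothetical client-side check (the helper and the message matching are illustrative; this repository's client code is not shown in this diff):

    use tonic::{Code, Status};

    // An aborted heartbeat stream naming a non-leader node is a cue to
    // re-resolve the metasrv leader before reconnecting.
    fn should_rediscover_leader(status: &Status) -> bool {
        status.code() == Code::Aborted && status.message().contains("not leader")
    }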

@@ -207,6 +207,9 @@ pub trait Mailbox: Send + Sync {
    async fn broadcast(&self, ch: &BroadcastChannel, msg: &MailboxMessage) -> Result<()>;

    async fn on_recv(&self, id: MessageId, maybe_msg: Result<MailboxMessage>) -> Result<()>;
+
+    /// Reset all pushers of the mailbox.
+    async fn reset(&self);
 }

 #[cfg(test)]

@@ -90,7 +90,8 @@ pub struct CompactionRegion {
     pub(crate) engine_config: Arc<MitoConfig>,
     pub(crate) region_metadata: RegionMetadataRef,
     pub(crate) cache_manager: CacheManagerRef,
-    pub(crate) access_layer: AccessLayerRef,
+    /// Access layer to get the table path and path type.
+    pub access_layer: AccessLayerRef,
     pub(crate) manifest_ctx: Arc<ManifestContext>,
     pub(crate) current_version: CompactionVersion,
     pub(crate) file_purger: Option<Arc<LocalFilePurger>>,

@@ -76,7 +76,7 @@ pub struct RegionManifestOptions {
///     -RegionMetadataRef metadata
/// }
/// class RegionEdit {
-///     -VersionNumber regoin_version
+///     -VersionNumber region_version
///     -Vec~FileMeta~ files_to_add
///     -Vec~FileMeta~ files_to_remove
///     -SequenceNumber flushed_sequence

@@ -371,7 +371,7 @@ impl VersionBuilder {
        self
    }

-    /// Sets truncated entty id.
+    /// Sets truncated entry id.
    pub(crate) fn truncated_entry_id(mut self, entry_id: Option<EntryId>) -> Self {
        self.truncated_entry_id = entry_id;
        self

@@ -344,7 +344,7 @@ mod tests {

    #[test]
    fn test_collect_and_basic() {
-        let (_d, facotry) = PuffinManagerFactory::new_for_test_block("test_collect_and_basic_");
+        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_collect_and_basic_");

        let metadata = test_region_metadata();
        let mut builder = InvertedIndexApplierBuilder::new(

@@ -353,7 +353,7 @@ mod tests {
            test_object_store(),
            &metadata,
            HashSet::from_iter([1, 2, 3]),
-            facotry,
+            factory,
        );

        let expr = Expr::BinaryExpr(BinaryExpr {

@@ -72,7 +72,7 @@ mod tests {

    #[test]
    fn test_collect_between_basic() {
-        let (_d, facotry) = PuffinManagerFactory::new_for_test_block("test_collect_between_basic_");
+        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_collect_between_basic_");
        let metadata = test_region_metadata();
        let mut builder = InvertedIndexApplierBuilder::new(
            "test".to_string(),

@@ -80,7 +80,7 @@ mod tests {
            test_object_store(),
            &metadata,
            HashSet::from_iter([1, 2, 3]),
-            facotry,
+            factory,
        );

        let between = Between {

@@ -113,7 +113,7 @@ mod tests {

    #[test]
    fn test_collect_between_negated() {
-        let (_d, facotry) =
+        let (_d, factory) =
            PuffinManagerFactory::new_for_test_block("test_collect_between_negated_");
        let metadata = test_region_metadata();
        let mut builder = InvertedIndexApplierBuilder::new(

@@ -122,7 +122,7 @@ mod tests {
            test_object_store(),
            &metadata,
            HashSet::from_iter([1, 2, 3]),
-            facotry,
+            factory,
        );

        let between = Between {

@@ -138,7 +138,7 @@ mod tests {

    #[test]
    fn test_collect_between_field_column() {
-        let (_d, facotry) =
+        let (_d, factory) =
            PuffinManagerFactory::new_for_test_block("test_collect_between_field_column_");
        let metadata = test_region_metadata();
        let mut builder = InvertedIndexApplierBuilder::new(

@@ -147,7 +147,7 @@ mod tests {
            test_object_store(),
            &metadata,
            HashSet::from_iter([1, 2, 3]),
-            facotry,
+            factory,
        );

        let between = Between {

@@ -180,7 +180,7 @@ mod tests {

    #[test]
    fn test_collect_between_type_mismatch() {
-        let (_d, facotry) =
+        let (_d, factory) =
            PuffinManagerFactory::new_for_test_block("test_collect_between_type_mismatch_");
        let metadata = test_region_metadata();
        let mut builder = InvertedIndexApplierBuilder::new(

@@ -189,7 +189,7 @@ mod tests {
            test_object_store(),
            &metadata,
            HashSet::from_iter([1, 2, 3]),
-            facotry,
+            factory,
        );

        let between = Between {

@@ -206,7 +206,7 @@ mod tests {

    #[test]
    fn test_collect_between_nonexistent_column() {
-        let (_d, facotry) =
+        let (_d, factory) =
            PuffinManagerFactory::new_for_test_block("test_collect_between_nonexistent_column_");
        let metadata = test_region_metadata();
        let mut builder = InvertedIndexApplierBuilder::new(

@@ -215,7 +215,7 @@ mod tests {
            test_object_store(),
            &metadata,
            HashSet::from_iter([1, 2, 3]),
-            facotry,
+            factory,
        );

        let between = Between {

@@ -227,7 +227,7 @@ mod tests {
            ),
        ];

-        let (_d, facotry) =
+        let (_d, factory) =
            PuffinManagerFactory::new_for_test_block("test_collect_comparison_basic_");
        let metadata = test_region_metadata();
        let mut builder = InvertedIndexApplierBuilder::new(

@@ -236,7 +236,7 @@ mod tests {
            test_object_store(),
            &metadata,
            HashSet::from_iter([1, 2, 3]),
-            facotry,
+            factory,
        );

        for ((left, op, right), _) in &cases {

@@ -255,7 +255,7 @@ mod tests {

    #[test]
    fn test_collect_comparison_type_mismatch() {
-        let (_d, facotry) =
+        let (_d, factory) =
            PuffinManagerFactory::new_for_test_block("test_collect_comparison_type_mismatch_");
        let metadata = test_region_metadata();
        let mut builder = InvertedIndexApplierBuilder::new(

@@ -264,7 +264,7 @@ mod tests {
            test_object_store(),
            &metadata,
            HashSet::from_iter([1, 2, 3]),
-            facotry,
+            factory,
        );

        let res = builder.collect_comparison_expr(&tag_column(), &Operator::Lt, &int64_lit(10));

@@ -274,7 +274,7 @@ mod tests {

    #[test]
    fn test_collect_comparison_field_column() {
-        let (_d, facotry) =
+        let (_d, factory) =
            PuffinManagerFactory::new_for_test_block("test_collect_comparison_field_column_");
        let metadata = test_region_metadata();
        let mut builder = InvertedIndexApplierBuilder::new(

@@ -283,7 +283,7 @@ mod tests {
            test_object_store(),
            &metadata,
            HashSet::from_iter([1, 2, 3]),
-            facotry,
+            factory,
        );

        builder

@@ -308,7 +308,7 @@ mod tests {

    #[test]
    fn test_collect_comparison_nonexistent_column() {
-        let (_d, facotry) =
+        let (_d, factory) =
            PuffinManagerFactory::new_for_test_block("test_collect_comparison_nonexistent_column_");
        let metadata = test_region_metadata();
        let mut builder = InvertedIndexApplierBuilder::new(

@@ -317,7 +317,7 @@ mod tests {
            test_object_store(),
            &metadata,
            HashSet::from_iter([1, 2, 3]),
-            facotry,
+            factory,
        );

        let res = builder.collect_comparison_expr(

@@ -136,7 +136,7 @@ mod tests {

    #[test]
    fn test_collect_eq_basic() {
-        let (_d, facotry) = PuffinManagerFactory::new_for_test_block("test_collect_eq_basic_")
+        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_collect_eq_basic_");
        let metadata = test_region_metadata();
        let mut builder = InvertedIndexApplierBuilder::new(
            "test".to_string(),

@@ -144,7 +144,7 @@ mod tests {
            test_object_store(),
            &metadata,
            HashSet::from_iter([1, 2, 3]),
-            facotry,
+            factory,
        );

        builder

@@ -172,7 +172,7 @@ mod tests {

    #[test]
    fn test_collect_eq_field_column() {
-        let (_d, facotry) =
+        let (_d, factory) =
            PuffinManagerFactory::new_for_test_block("test_collect_eq_field_column_");
        let metadata = test_region_metadata();
        let mut builder = InvertedIndexApplierBuilder::new(

@@ -181,7 +181,7 @@ mod tests {
            test_object_store(),
            &metadata,
            HashSet::from_iter([1, 2, 3]),
-            facotry,
+            factory,
        );

        builder

@@ -200,7 +200,7 @@ mod tests {

    #[test]
    fn test_collect_eq_nonexistent_column() {
-        let (_d, facotry) =
+        let (_d, factory) =
            PuffinManagerFactory::new_for_test_block("test_collect_eq_nonexistent_column_");
        let metadata = test_region_metadata();
        let mut builder = InvertedIndexApplierBuilder::new(

@@ -209,7 +209,7 @@ mod tests {
            test_object_store(),
            &metadata,
            HashSet::from_iter([1, 2, 3]),
-            facotry,
+            factory,
        );

        let res = builder.collect_eq(&nonexistent_column(), &string_lit("abc"));

@@ -219,7 +219,7 @@ mod tests {

    #[test]
    fn test_collect_eq_type_mismatch() {
-        let (_d, facotry) =
+        let (_d, factory) =
            PuffinManagerFactory::new_for_test_block("test_collect_eq_type_mismatch_");
        let metadata = test_region_metadata();
        let mut builder = InvertedIndexApplierBuilder::new(

@@ -228,7 +228,7 @@ mod tests {
            test_object_store(),
            &metadata,
            HashSet::from_iter([1, 2, 3]),
-            facotry,
+            factory,
        );

        let res = builder.collect_eq(&tag_column(), &int64_lit(1));

@@ -238,7 +238,7 @@ mod tests {

    #[test]
    fn test_collect_or_eq_list_basic() {
-        let (_d, facotry) =
+        let (_d, factory) =
            PuffinManagerFactory::new_for_test_block("test_collect_or_eq_list_basic_");
        let metadata = test_region_metadata();
        let mut builder = InvertedIndexApplierBuilder::new(

@@ -247,7 +247,7 @@ mod tests {
            test_object_store(),
            &metadata,
            HashSet::from_iter([1, 2, 3]),
-            facotry,
+            factory,
        );

        let eq_expr = DfExpr::BinaryExpr(BinaryExpr {

@@ -296,7 +296,7 @@ mod tests {

    #[test]
    fn test_collect_or_eq_list_invalid_op() {
-        let (_d, facotry) =
+        let (_d, factory) =
            PuffinManagerFactory::new_for_test_block("test_collect_or_eq_list_invalid_op_");
        let metadata = test_region_metadata();
        let mut builder = InvertedIndexApplierBuilder::new(

@@ -305,7 +305,7 @@ mod tests {
            test_object_store(),
            &metadata,
            HashSet::from_iter([1, 2, 3]),
-            facotry,
+            factory,
        );

        let eq_expr = DfExpr::BinaryExpr(BinaryExpr {

@@ -333,7 +333,7 @@ mod tests {

    #[test]
    fn test_collect_or_eq_list_multiple_columns() {
-        let (_d, facotry) =
+        let (_d, factory) =
            PuffinManagerFactory::new_for_test_block("test_collect_or_eq_list_multiple_columns_");
        let metadata = test_region_metadata();
        let mut builder = InvertedIndexApplierBuilder::new(

@@ -342,7 +342,7 @@ mod tests {
            test_object_store(),
            &metadata,
            HashSet::from_iter([1, 2, 3]),
-            facotry,
+            factory,
        );

        let eq_expr = DfExpr::BinaryExpr(BinaryExpr {

@@ -67,7 +67,7 @@ mod tests {

    #[test]
    fn test_collect_in_list_basic() {
-        let (_d, facotry) = PuffinManagerFactory::new_for_test_block("test_collect_in_list_basic_");
+        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_collect_in_list_basic_");
        let metadata = test_region_metadata();
        let mut builder = InvertedIndexApplierBuilder::new(
            "test".to_string(),

@@ -75,7 +75,7 @@ mod tests {
            test_object_store(),
            &metadata,
            HashSet::from_iter([1, 2, 3]),
-            facotry,
+            factory,
        );

        let in_list = InList {

@@ -98,7 +98,7 @@ mod tests {

    #[test]
    fn test_collect_in_list_negated() {
-        let (_d, facotry) =
+        let (_d, factory) =
            PuffinManagerFactory::new_for_test_block("test_collect_in_list_negated_");
        let metadata = test_region_metadata();
        let mut builder = InvertedIndexApplierBuilder::new(

@@ -107,7 +107,7 @@ mod tests {
            test_object_store(),
            &metadata,
            HashSet::from_iter([1, 2, 3]),
-            facotry,
+            factory,
        );

        let in_list = InList {

@@ -122,7 +122,7 @@ mod tests {

    #[test]
    fn test_collect_in_list_field_column() {
-        let (_d, facotry) =
+        let (_d, factory) =
            PuffinManagerFactory::new_for_test_block("test_collect_in_list_field_column_");
        let metadata = test_region_metadata();
        let mut builder = InvertedIndexApplierBuilder::new(

@@ -131,7 +131,7 @@ mod tests {
            test_object_store(),
            &metadata,
            HashSet::from_iter([1, 2, 3]),
-            facotry,
+            factory,
        );

        let in_list = InList {

@@ -154,7 +154,7 @@ mod tests {

    #[test]
    fn test_collect_in_list_type_mismatch() {
-        let (_d, facotry) =
+        let (_d, factory) =
            PuffinManagerFactory::new_for_test_block("test_collect_in_list_type_mismatch_");
        let metadata = test_region_metadata();
        let mut builder = InvertedIndexApplierBuilder::new(

@@ -163,7 +163,7 @@ mod tests {
            test_object_store(),
            &metadata,
            HashSet::from_iter([1, 2, 3]),
-            facotry,
+            factory,
        );

        let in_list = InList {

@@ -179,7 +179,7 @@ mod tests {

    #[test]
    fn test_collect_in_list_nonexistent_column() {
-        let (_d, facotry) =
+        let (_d, factory) =
            PuffinManagerFactory::new_for_test_block("test_collect_in_list_nonexistent_column_");

        let metadata = test_region_metadata();

@@ -189,7 +189,7 @@ mod tests {
            test_object_store(),
            &metadata,
            HashSet::from_iter([1, 2, 3]),
-            facotry,
+            factory,
        );

        let in_list = InList {

@@ -59,7 +59,7 @@ mod tests {

    #[test]
    fn test_regex_match_basic() {
-        let (_d, facotry) = PuffinManagerFactory::new_for_test_block("test_regex_match_basic_");
+        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_regex_match_basic_");
        let metadata = test_region_metadata();
        let mut builder = InvertedIndexApplierBuilder::new(
            "test".to_string(),

@@ -67,7 +67,7 @@ mod tests {
            test_object_store(),
            &metadata,
            HashSet::from_iter([1, 2, 3]),
-            facotry,
+            factory,
        );

        builder

@@ -86,7 +86,7 @@ mod tests {

    #[test]
    fn test_regex_match_field_column() {
-        let (_d, facotry) =
+        let (_d, factory) =
            PuffinManagerFactory::new_for_test_block("test_regex_match_field_column_");
        let metadata = test_region_metadata();
        let mut builder = InvertedIndexApplierBuilder::new(

@@ -95,7 +95,7 @@ mod tests {
            test_object_store(),
            &metadata,
            HashSet::from_iter([1, 2, 3]),
-            facotry,
+            factory,
        );

        builder

@@ -114,7 +114,7 @@ mod tests {

    #[test]
    fn test_regex_match_type_mismatch() {
-        let (_d, facotry) =
+        let (_d, factory) =
            PuffinManagerFactory::new_for_test_block("test_regex_match_type_mismatch_");
        let metadata = test_region_metadata();
        let mut builder = InvertedIndexApplierBuilder::new(

@@ -123,7 +123,7 @@ mod tests {
            test_object_store(),
            &metadata,
            HashSet::from_iter([1, 2, 3]),
-            facotry,
+            factory,
        );

        builder

@@ -135,7 +135,7 @@ mod tests {

    #[test]
    fn test_regex_match_type_nonexist_column() {
-        let (_d, facotry) =
+        let (_d, factory) =
            PuffinManagerFactory::new_for_test_block("test_regex_match_type_nonexist_column_");
        let metadata = test_region_metadata();
        let mut builder = InvertedIndexApplierBuilder::new(

@@ -144,7 +144,7 @@ mod tests {
            test_object_store(),
            &metadata,
            HashSet::from_iter([1, 2, 3]),
-            facotry,
+            factory,
        );

        let res = builder.collect_regex_match(&nonexistent_column(), &string_lit("abc"));

@@ -70,7 +70,7 @@ impl ScalarCalculate {
        interval: Millisecond,
        input: LogicalPlan,
        time_index: &str,
-        tag_colunms: &[String],
+        tag_columns: &[String],
        field_column: &str,
        table_name: Option<&str>,
    ) -> Result<Self> {

@@ -97,7 +97,7 @@ impl ScalarCalculate {
            end,
            interval,
            time_index: time_index.to_string(),
-            tag_columns: tag_colunms.to_vec(),
+            tag_columns: tag_columns.to_vec(),
            field_column: field_column.to_string(),
            input,
            output_schema: Arc::new(schema),

@@ -82,7 +82,7 @@ impl ExtensionPlanner for MergeSortExtensionPlanner {
        // and we only need to do a merge sort, otherwise fallback to quick sort
        let can_merge_sort = partition_cnt >= region_cnt;
        if can_merge_sort {
-            // TODO(discord9): use `SortPreversingMergeExec here`
+            // TODO(discord9): use `SortPreservingMergeExec here`
        }
        // for now merge sort only exist in logical plan, and have the same effect as `Sort`
        // doesn't change the execution plan, this will change in the future

@@ -23,6 +23,7 @@ pub mod prom_query_gateway;
 pub mod region_server;

 use std::net::SocketAddr;
+use std::time::Duration;

 use api::v1::health_check_server::{HealthCheck, HealthCheckServer};
 use api::v1::{HealthCheckRequest, HealthCheckResponse};

@@ -72,6 +73,12 @@ pub struct GrpcOptions {
    pub runtime_size: usize,
    #[serde(default = "Default::default")]
    pub tls: TlsOption,
+    /// The HTTP/2 keep-alive interval.
+    #[serde(with = "humantime_serde")]
+    pub http2_keep_alive_interval: Duration,
+    /// The HTTP/2 keep-alive timeout.
+    #[serde(with = "humantime_serde")]
+    pub http2_keep_alive_timeout: Duration,
 }

 impl GrpcOptions {

@@ -129,6 +136,8 @@ impl Default for GrpcOptions {
            flight_compression: FlightCompression::ArrowIpc,
            runtime_size: 8,
            tls: TlsOption::default(),
+            http2_keep_alive_interval: Duration::from_secs(10),
+            http2_keep_alive_timeout: Duration::from_secs(3),
        }
    }
 }

@@ -34,12 +34,10 @@ impl HeartbeatOptions {
    pub fn frontend_default() -> Self {
        Self {
            // Frontend can send heartbeat with a longer interval.
-            interval: Duration::from_millis(
-                distributed_time_constants::FRONTEND_HEARTBEAT_INTERVAL_MILLIS,
-            ),
-            retry_interval: Duration::from_millis(
-                distributed_time_constants::HEARTBEAT_INTERVAL_MILLIS,
+            interval: distributed_time_constants::frontend_heartbeat_interval(
+                distributed_time_constants::BASE_HEARTBEAT_INTERVAL,
            ),
+            retry_interval: distributed_time_constants::BASE_HEARTBEAT_INTERVAL,
        }
    }
 }

@@ -47,10 +45,8 @@ impl HeartbeatOptions {
 impl Default for HeartbeatOptions {
    fn default() -> Self {
        Self {
-            interval: Duration::from_millis(distributed_time_constants::HEARTBEAT_INTERVAL_MILLIS),
-            retry_interval: Duration::from_millis(
-                distributed_time_constants::HEARTBEAT_INTERVAL_MILLIS,
-            ),
+            interval: distributed_time_constants::BASE_HEARTBEAT_INTERVAL,
+            retry_interval: distributed_time_constants::BASE_HEARTBEAT_INTERVAL,
        }
    }
 }

@@ -1066,7 +1066,7 @@ impl HttpServer {
    /// Route Prometheus [HTTP API].
    ///
    /// [HTTP API]: https://prometheus.io/docs/prometheus/latest/querying/api/
-    fn route_prometheus<S>(prometheus_handler: PrometheusHandlerRef) -> Router<S> {
+    pub fn route_prometheus<S>(prometheus_handler: PrometheusHandlerRef) -> Router<S> {
        Router::new()
            .route(
                "/format_query",

@@ -352,7 +352,7 @@ async fn dryrun_pipeline_inner(
    )
    .await?;

-    let colume_type_key = "colume_type";
+    let column_type_key = "column_type";
    let data_type_key = "data_type";
    let name_key = "name";

@@ -376,7 +376,7 @@ async fn dryrun_pipeline_inner(
                JsonValue::String(cs.datatype().as_str_name().to_string()),
            );
            map.insert(
-                colume_type_key.to_string(),
+                column_type_key.to_string(),
                JsonValue::String(cs.semantic_type().as_str_name().to_string()),
            );
            map.insert(

@@ -409,7 +409,7 @@ async fn dryrun_pipeline_inner(
            );
            map.insert(
                "semantic_type".to_string(),
-                schema[idx][colume_type_key].clone(),
+                schema[idx][column_type_key].clone(),
            );
            map.insert(
                "data_type".to_string(),

@@ -105,7 +105,7 @@ mod tests {
    use crate::statements::statement::Statement;

    #[test]
-    fn test_display_for_tuncate_table() {
+    fn test_display_for_truncate_table() {
        let sql = r"truncate table t1;";
        let stmts: Vec<Statement> =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())

@@ -19,7 +19,7 @@ use std::sync::Arc;
 use std::time::Duration;

 use arbitrary::{Arbitrary, Unstructured};
-use common_meta::distributed_time_constants;
+use common_meta::distributed_time_constants::default_distributed_time_constants;
 use common_telemetry::info;
 use libfuzzer_sys::fuzz_target;
 use rand::{Rng, SeedableRng};

@@ -252,10 +252,7 @@ async fn execute_failover(ctx: FuzzContext, input: FuzzInput) -> Result<()> {
    recover_pod_failure(ctx.kube.clone(), &ctx.namespace, &chaos_name).await?;
    wait_for_all_datanode_online(ctx.greptime.clone(), Duration::from_secs(60)).await;

-    tokio::time::sleep(Duration::from_secs(
-        distributed_time_constants::REGION_LEASE_SECS,
-    ))
-    .await;
+    tokio::time::sleep(default_distributed_time_constants().region_lease).await;
    // Validates value rows
    info!("Validates num of rows");

@@ -19,7 +19,7 @@ use std::sync::Arc;
 use std::time::Duration;

 use arbitrary::{Arbitrary, Unstructured};
-use common_meta::distributed_time_constants;
+use common_meta::distributed_time_constants::default_distributed_time_constants;
 use common_telemetry::info;
 use common_time::util::current_time_millis;
 use futures::future::try_join_all;

@@ -319,10 +319,7 @@ async fn execute_failover(ctx: FuzzContext, input: FuzzInput) -> Result<()> {
    recover_pod_failure(ctx.kube.clone(), &ctx.namespace, &chaos_name).await?;
    wait_for_all_datanode_online(ctx.greptime.clone(), Duration::from_secs(60)).await;

-    tokio::time::sleep(Duration::from_secs(
-        distributed_time_constants::REGION_LEASE_SECS,
-    ))
-    .await;
+    tokio::time::sleep(default_distributed_time_constants().region_lease).await;
    // Validates value rows
    info!("Validates num of rows");
    for (table_ctx, expected_rows) in table_ctxs.iter().zip(affected_rows) {

@@ -19,7 +19,7 @@ use std::sync::Arc;
 use std::time::Duration;

 use arbitrary::{Arbitrary, Unstructured};
-use common_meta::distributed_time_constants;
+use common_meta::distributed_time_constants::default_distributed_time_constants;
 use common_telemetry::info;
 use libfuzzer_sys::fuzz_target;
 use rand::{Rng, SeedableRng};

@@ -271,10 +271,7 @@ async fn migrate_regions(ctx: &FuzzContext, migrations: &[Migration]) -> Result<
        wait_for_migration(ctx, migration, &procedure_id).await;
    }

-    tokio::time::sleep(Duration::from_secs(
-        distributed_time_constants::REGION_LEASE_SECS,
-    ))
-    .await;
+    tokio::time::sleep(default_distributed_time_constants().region_lease).await;

    Ok(())
 }

@@ -19,7 +19,7 @@ use std::sync::Arc;
 use std::time::Duration;

 use arbitrary::{Arbitrary, Unstructured};
-use common_meta::distributed_time_constants;
+use common_meta::distributed_time_constants::default_distributed_time_constants;
 use common_telemetry::info;
 use libfuzzer_sys::fuzz_target;
 use rand::{Rng, SeedableRng};

@@ -262,10 +262,7 @@ async fn migrate_regions(ctx: &FuzzContext, migrations: &[Migration]) -> Result<
        .await;
    }

-    tokio::time::sleep(Duration::from_secs(
-        distributed_time_constants::REGION_LEASE_SECS,
-    ))
-    .await;
+    tokio::time::sleep(default_distributed_time_constants().region_lease).await;

    Ok(())
 }

@@ -1,14 +1,14 @@
 services:

   zookeeper:
-    image: docker.io/bitnami/zookeeper:3.7
+    image: greptime/zookeeper:3.7
     ports:
       - '2181:2181'
     environment:
       - ALLOW_ANONYMOUS_LOGIN=yes

   kafka:
-    image: docker.io/bitnami/kafka:3.9.0
+    image: greptime/kafka:3.9.0-debian-12-r1
     container_name: kafka
     ports:
       - 9092:9092

@@ -32,7 +32,7 @@ services:
       condition: service_started

   etcd:
-    image: docker.io/bitnami/etcd:3.5
+    image: greptime/etcd:3.6.1-debian-12-r3
     ports:
       - "2379:2379"
       - "2380:2380"

@@ -44,7 +44,7 @@ services:
       ETCD_MAX_REQUEST_BYTES: 10485760

   minio:
-    image: docker.io/bitnami/minio:2024
+    image: greptime/minio:2024
     ports:
       - '9000:9000'
       - '9001:9001'

@@ -68,7 +68,7 @@ services:
       - POSTGRES_PASSWORD=admin

   mysql:
-    image: bitnami/mysql:5.7
+    image: greptime/mysql:5.7
     ports:
       - 3306:3306
     volumes:

@@ -1188,6 +1188,8 @@ max_recv_message_size = "512MiB"
 max_send_message_size = "512MiB"
 flight_compression = "arrow_ipc"
 runtime_size = 8
+http2_keep_alive_interval = "10s"
+http2_keep_alive_timeout = "3s"

 [grpc.tls]
 mode = "disable"

@@ -3212,37 +3214,37 @@ transform:

        let dryrun_schema = json!([
            {
-                "colume_type": "FIELD",
+                "column_type": "FIELD",
                "data_type": "INT32",
                "fulltext": false,
                "name": "id1"
            },
            {
-                "colume_type": "FIELD",
+                "column_type": "FIELD",
                "data_type": "INT32",
                "fulltext": false,
                "name": "id2"
            },
            {
-                "colume_type": "FIELD",
+                "column_type": "FIELD",
                "data_type": "STRING",
                "fulltext": false,
                "name": "type"
            },
            {
-                "colume_type": "FIELD",
+                "column_type": "FIELD",
                "data_type": "STRING",
                "fulltext": false,
                "name": "log"
            },
            {
-                "colume_type": "FIELD",
+                "column_type": "FIELD",
                "data_type": "STRING",
                "fulltext": false,
                "name": "logger"
            },
            {
-                "colume_type": "TIMESTAMP",
+                "column_type": "TIMESTAMP",
                "data_type": "TIMESTAMP_NANOSECOND",
                "fulltext": false,
                "name": "time"

@@ -2,7 +2,7 @@
 version: '3.8'
 services:
   kafka:
-    image: bitnami/kafka:3.6.0
+    image: greptime/kafka:3.9.0-debian-12-r1
     container_name: kafka
     ports:
       - 9092:9092

@@ -363,9 +363,9 @@ pub fn setup_mysql(mysql_port: u16, mysql_version: Option<&str>) {
    }

    let mysql_image = if let Some(mysql_version) = mysql_version {
-        format!("bitnami/mysql:{mysql_version}")
+        format!("greptime/mysql:{mysql_version}")
    } else {
-        "bitnami/mysql:5.7".to_string()
+        "greptime/mysql:5.7".to_string()
    };
    let mysql_password = "admin";
    let mysql_user = "greptimedb";
|
||||
Reference in New Issue
Block a user