Mirror of https://github.com/GreptimeTeam/greptimedb.git (synced 2026-01-10 07:12:54 +00:00)
Compare commits: 4 commits, release/v0 ... fix/v0.16-
| Author | SHA1 | Date |
|---|---|---|
|  | c69c40a9ee |  |
|  | c9bc3de9aa |  |
|  | 71d54431c6 |  |
|  | 524c0ced44 |  |
@@ -12,7 +12,7 @@ runs:
   steps:
     - name: Install Etcd cluster
       shell: bash
-      run: |
+      run: |
        helm upgrade \
          --install etcd oci://registry-1.docker.io/bitnamicharts/etcd \
          --set replicaCount=${{ inputs.etcd-replicas }} \
@@ -24,9 +24,4 @@ runs:
          --set auth.rbac.token.enabled=false \
          --set persistence.size=2Gi \
          --create-namespace \
-         --set global.security.allowInsecureImages=true \
-         --set image.registry=docker.io \
-         --set image.repository=greptime/etcd \
-         --set image.tag=3.6.1-debian-12-r3 \
-         --version 12.0.8 \
          -n ${{ inputs.namespace }}
@@ -51,7 +51,7 @@ runs:
       run: |
        helm upgrade \
          --install my-greptimedb \
-         --set 'meta.backendStorage.etcd.endpoints[0]=${{ inputs.etcd-endpoints }}' \
+         --set meta.backendStorage.etcd.endpoints=${{ inputs.etcd-endpoints }} \
          --set meta.enableRegionFailover=${{ inputs.enable-region-failover }} \
          --set image.registry=${{ inputs.image-registry }} \
          --set image.repository=${{ inputs.image-repository }} \
@@ -12,7 +12,7 @@ runs:
   steps:
     - name: Install Kafka cluster
       shell: bash
-      run: |
+      run: |
        helm upgrade \
          --install kafka oci://registry-1.docker.io/bitnamicharts/kafka \
          --set controller.replicaCount=${{ inputs.controller-replicas }} \
@@ -23,8 +23,4 @@ runs:
          --set listeners.controller.protocol=PLAINTEXT \
          --set listeners.client.protocol=PLAINTEXT \
          --create-namespace \
-         --set image.registry=docker.io \
-         --set image.repository=greptime/kafka \
-         --set image.tag=3.9.0-debian-12-r1 \
-         --version 31.0.0 \
          -n ${{ inputs.namespace }}
@@ -6,7 +6,9 @@ inputs:
     description: "Number of PostgreSQL replicas"
   namespace:
     default: "postgres-namespace"
     description: "The PostgreSQL namespace"
+  postgres-version:
+    default: "14.2"
+    description: "PostgreSQL version"
   storage-size:
     default: "1Gi"
     description: "Storage size for PostgreSQL"
@@ -20,11 +22,7 @@ runs:
        helm upgrade \
          --install postgresql oci://registry-1.docker.io/bitnamicharts/postgresql \
          --set replicaCount=${{ inputs.postgres-replicas }} \
-         --set global.security.allowInsecureImages=true \
-         --set image.registry=docker.io \
-         --set image.repository=greptime/postgresql \
-         --set image.tag=17.5.0-debian-12-r3 \
-         --version 16.7.4 \
+         --set image.tag=${{ inputs.postgres-version }} \
          --set persistence.size=${{ inputs.storage-size }} \
          --set postgresql.username=greptimedb \
          --set postgresql.password=admin \
.github/scripts/deploy-greptimedb.sh (vendored, 5 changed lines)
@@ -68,7 +68,7 @@ function deploy_greptimedb_cluster() {

  helm install "$cluster_name" greptime/greptimedb-cluster \
    --set image.tag="$GREPTIMEDB_IMAGE_TAG" \
-    --set meta.backendStorage.etcd.endpoints[0]="etcd.$install_namespace:2379" \
+    --set meta.backendStorage.etcd.endpoints="etcd.$install_namespace:2379" \
    -n "$install_namespace"

  # Wait for greptimedb cluster to be ready.
@@ -103,13 +103,14 @@ function deploy_greptimedb_cluster_with_s3_storage() {

  helm install "$cluster_name" greptime/greptimedb-cluster -n "$install_namespace" \
    --set image.tag="$GREPTIMEDB_IMAGE_TAG" \
-    --set meta.backendStorage.etcd.endpoints[0]="etcd.$install_namespace:2379" \
+    --set meta.backendStorage.etcd.endpoints="etcd.$install_namespace:2379" \
    --set storage.s3.bucket="$AWS_CI_TEST_BUCKET" \
    --set storage.s3.region="$AWS_REGION" \
    --set storage.s3.root="$DATA_ROOT" \
    --set storage.credentials.secretName=s3-credentials \
    --set storage.credentials.accessKeyId="$AWS_ACCESS_KEY_ID" \
    --set storage.credentials.secretAccessKey="$AWS_SECRET_ACCESS_KEY"

  # Wait for greptimedb cluster to be ready.
  while true; do
    PHASE=$(kubectl -n "$install_namespace" get gtc "$cluster_name" -o jsonpath='{.status.clusterPhase}')
.github/scripts/pull-test-deps-images.sh (vendored, 34 changed lines)
@@ -1,34 +0,0 @@
-#!/bin/bash
-
-# This script is used to pull the test dependency images that are stored in public ECR one by one to avoid rate limiting.
-
-set -e
-
-MAX_RETRIES=3
-
-IMAGES=(
-  "greptime/zookeeper:3.7"
-  "greptime/kafka:3.9.0-debian-12-r1"
-  "greptime/etcd:3.6.1-debian-12-r3"
-  "greptime/minio:2024"
-  "greptime/mysql:5.7"
-)
-
-for image in "${IMAGES[@]}"; do
-  for ((attempt=1; attempt<=MAX_RETRIES; attempt++)); do
-    if docker pull "$image"; then
-      # Successfully pulled the image.
-      break
-    else
-      # Use some simple exponential backoff to avoid rate limiting.
-      if [ $attempt -lt $MAX_RETRIES ]; then
-        sleep_seconds=$((attempt * 5))
-        echo "Attempt $attempt failed for $image, waiting $sleep_seconds seconds"
-        sleep $sleep_seconds # 5s, 10s delays
-      else
-        echo "Failed to pull $image after $MAX_RETRIES attempts"
-        exit 1
-      fi
-    fi
-  done
-done
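The deleted script retries each pull up to `MAX_RETRIES` times, sleeping `attempt * 5` seconds between failures; the inline comment calls this "exponential backoff", but the delays (5 s, then 10 s) actually grow linearly. A minimal Rust sketch of the same retry loop, for readers following the logic (`pull_image` is a hypothetical stand-in for `docker pull`, not repository code):

```rust
use std::{thread, time::Duration};

const MAX_RETRIES: u32 = 3;

// Hypothetical stand-in for `docker pull <image>`; always fails here so the
// backoff path is exercised.
fn pull_image(image: &str) -> Result<(), String> {
    Err(format!("rate limited pulling {image}"))
}

fn pull_with_backoff(image: &str) -> Result<(), String> {
    for attempt in 1..=MAX_RETRIES {
        match pull_image(image) {
            Ok(()) => return Ok(()),
            Err(e) if attempt < MAX_RETRIES => {
                // Linear backoff, matching the script's `attempt * 5`: 5s, then 10s.
                let sleep_seconds = u64::from(attempt) * 5;
                eprintln!("Attempt {attempt} failed for {image} ({e}), waiting {sleep_seconds}s");
                thread::sleep(Duration::from_secs(sleep_seconds));
            }
            Err(e) => return Err(e),
        }
    }
    unreachable!("every iteration either returns or sleeps; the last attempt returns")
}

fn main() {
    let result = pull_with_backoff("greptime/etcd:3.6.1-debian-12-r3");
    eprintln!("final result: {result:?}");
}
```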
.github/workflows/develop.yml (vendored, 4 changed lines)
@@ -719,10 +719,6 @@ jobs:
          save-if: ${{ github.ref == 'refs/heads/main' }}
      - name: Install latest nextest release
        uses: taiki-e/install-action@nextest

-      - name: Pull test dependencies images
-        run: ./.github/scripts/pull-test-deps-images.sh
-
      - name: Setup external services
        working-directory: tests-integration/fixtures
        run: docker compose up -d --wait
Cargo.lock (generated, 18 changed lines)
@@ -6870,9 +6870,9 @@ dependencies = [

[[package]]
name = "libc"
-version = "0.2.178"
+version = "0.2.171"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "37c93d8daa9d8a012fd8ab92f088405fb202ea0b6ab73ee2482ae66af4f42091"
+checksum = "c19937216e9d3aa9956d9bb8dfc0b0c8beb6058fc4f7a4dc4d850edf86a237d6"

[[package]]
name = "libflate"

@@ -6928,7 +6928,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4979f22fdb869068da03c9f7528f8297c6fd2606bc3a4affe42e6a823fdb8da4"
dependencies = [
 "cfg-if",
- "windows-targets 0.48.5",
+ "windows-targets 0.52.6",
]

@@ -7005,13 +7005,13 @@ checksum = "b4ce301924b7887e9d637144fdade93f9dfff9b60981d4ac161db09720d39aa5"

[[package]]
name = "local-ip-address"
-version = "0.6.5"
+version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "656b3b27f8893f7bbf9485148ff9a65f019e3f33bd5cdc87c83cab16b3fd9ec8"
+checksum = "3669cf5561f8d27e8fc84cc15e58350e70f557d4d65f70e3154e54cd2f8e1782"
dependencies = [
 "libc",
 "neli",
- "thiserror 2.0.12",
+ "thiserror 1.0.64",
 "windows-sys 0.59.0",
]

@@ -10849,7 +10849,7 @@ dependencies = [

[[package]]
name = "rskafka"
version = "0.6.0"
-source = "git+https://github.com/WenyXu/rskafka.git?rev=9494304ae3947b07e660b5d08549ad4a39c84a26#9494304ae3947b07e660b5d08549ad4a39c84a26"
+source = "git+https://github.com/influxdata/rskafka.git?rev=8dbd01ed809f5a791833a594e85b144e36e45820#8dbd01ed809f5a791833a594e85b144e36e45820"
dependencies = [
 "bytes",
 "chrono",

@@ -12331,7 +12331,7 @@ dependencies = [
 "cfg-if",
 "libc",
 "psm",
- "windows-sys 0.52.0",
+ "windows-sys 0.59.0",
]

@@ -14598,7 +14598,7 @@ version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb"
dependencies = [
- "windows-sys 0.48.0",
+ "windows-sys 0.59.0",
]

[[package]]
@@ -188,8 +188,7 @@ reqwest = { version = "0.12", default-features = false, features = [
    "stream",
    "multipart",
] }
-# Branch: feat/request-timeout-port
-rskafka = { git = "https://github.com/WenyXu/rskafka.git", rev = "9494304ae3947b07e660b5d08549ad4a39c84a26", features = [
+rskafka = { git = "https://github.com/influxdata/rskafka.git", rev = "8dbd01ed809f5a791833a594e85b144e36e45820", features = [
    "transport-tls",
] }
rstest = "0.25"
@@ -78,8 +78,6 @@
| `wal.sync_period` | String | `10s` | Duration for fsyncing log files.<br/>**It's only used when the provider is `raft_engine`**. |
| `wal.recovery_parallelism` | Integer | `2` | Parallelism during WAL recovery. |
| `wal.broker_endpoints` | Array | -- | The Kafka broker endpoints.<br/>**It's only used when the provider is `kafka`**. |
-| `wal.connect_timeout` | String | `3s` | The connect timeout for kafka client.<br/>**It's only used when the provider is `kafka`**. |
-| `wal.timeout` | String | `3s` | The timeout for kafka client.<br/>**It's only used when the provider is `kafka`**. |
| `wal.auto_create_topics` | Bool | `true` | Automatically create topics for WAL.<br/>Set to `true` to automatically create topics for WAL.<br/>Otherwise, use topics named `topic_name_prefix_[0..num_topics)` |
| `wal.num_topics` | Integer | `64` | Number of topics.<br/>**It's only used when the provider is `kafka`**. |
| `wal.selector_type` | String | `round_robin` | Topic selector type.<br/>Available selector types:<br/>- `round_robin` (default)<br/>**It's only used when the provider is `kafka`**. |

@@ -277,6 +275,7 @@
| `meta_client` | -- | -- | The metasrv client options. |
| `meta_client.metasrv_addrs` | Array | -- | The addresses of the metasrv. |
| `meta_client.timeout` | String | `3s` | Operation timeout. |
+| `meta_client.heartbeat_timeout` | String | `500ms` | Heartbeat timeout. |
| `meta_client.ddl_timeout` | String | `10s` | DDL timeout. |
| `meta_client.connect_timeout` | String | `1s` | Connect server timeout. |
| `meta_client.tcp_nodelay` | Bool | `true` | `TCP_NODELAY` option for accepted connections. |

@@ -335,7 +334,6 @@
| `region_failure_detector_initialization_delay` | String | `10m` | The delay before starting region failure detection.<br/>This delay helps prevent Metasrv from triggering unnecessary region failovers before all Datanodes are fully started.<br/>Especially useful when the cluster is not deployed with GreptimeDB Operator and maintenance mode is not enabled. |
| `allow_region_failover_on_local_wal` | Bool | `false` | Whether to allow region failover on local WAL.<br/>**This option is not recommended to be set to true, because it may lead to data loss during failover.** |
| `node_max_idle_time` | String | `24hours` | Max allowed idle time before removing node info from metasrv memory. |
-| `heartbeat_interval` | String | `3s` | Base heartbeat interval for calculating distributed time constants.<br/>The frontend heartbeat interval is 6 times of the base heartbeat interval.<br/>The flownode/datanode heartbeat interval is 1 times of the base heartbeat interval.<br/>e.g., If the base heartbeat interval is 3s, the frontend heartbeat interval is 18s, the flownode/datanode heartbeat interval is 3s.<br/>If you change this value, you need to change the heartbeat interval of the flownode/frontend/datanode accordingly. |
| `enable_telemetry` | Bool | `true` | Whether to enable greptimedb telemetry. Enabled by default. |
| `runtime` | -- | -- | The runtime options. |
| `runtime.global_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. |

@@ -346,18 +344,12 @@
| `backend_tls.key_path` | String | `""` | Path to client private key file (for client authentication)<br/>Like "/path/to/client.key" |
| `backend_tls.ca_cert_path` | String | `""` | Path to CA certificate file (for server certificate verification)<br/>Required when using custom CAs or self-signed certificates<br/>Leave empty to use system root certificates only<br/>Like "/path/to/ca.crt" |
| `backend_tls.watch` | Bool | `false` | Watch for certificate file changes and auto reload |
-| `backend_client` | -- | -- | The backend client options.<br/>Currently, only applicable when using etcd as the metadata store. |
-| `backend_client.keep_alive_timeout` | String | `3s` | The keep alive timeout for backend client. |
-| `backend_client.keep_alive_interval` | String | `10s` | The keep alive interval for backend client. |
-| `backend_client.connect_timeout` | String | `3s` | The connect timeout for backend client. |
| `grpc` | -- | -- | The gRPC server options. |
| `grpc.bind_addr` | String | `127.0.0.1:3002` | The address to bind the gRPC server. |
| `grpc.server_addr` | String | `127.0.0.1:3002` | The communication server address for the frontend and datanode to connect to metasrv.<br/>If left empty or unset, the server will automatically use the IP address of the first network interface<br/>on the host, with the same port number as the one specified in `bind_addr`. |
| `grpc.runtime_size` | Integer | `8` | The number of server worker threads. |
| `grpc.max_recv_message_size` | String | `512MB` | The maximum receive message size for gRPC server. |
| `grpc.max_send_message_size` | String | `512MB` | The maximum send message size for gRPC server. |
-| `grpc.http2_keep_alive_interval` | String | `10s` | The server side HTTP/2 keep-alive interval |
-| `grpc.http2_keep_alive_timeout` | String | `3s` | The server side HTTP/2 keep-alive timeout. |
| `http` | -- | -- | The HTTP server options. |
| `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. |
| `http.timeout` | String | `0s` | HTTP request timeout. Set to 0 to disable timeout. |

@@ -449,6 +441,7 @@
| `meta_client` | -- | -- | The metasrv client options. |
| `meta_client.metasrv_addrs` | Array | -- | The addresses of the metasrv. |
| `meta_client.timeout` | String | `3s` | Operation timeout. |
+| `meta_client.heartbeat_timeout` | String | `500ms` | Heartbeat timeout. |
| `meta_client.ddl_timeout` | String | `10s` | DDL timeout. |
| `meta_client.connect_timeout` | String | `1s` | Connect server timeout. |
| `meta_client.tcp_nodelay` | Bool | `true` | `TCP_NODELAY` option for accepted connections. |

@@ -468,8 +461,6 @@
| `wal.sync_period` | String | `10s` | Duration for fsyncing log files.<br/>**It's only used when the provider is `raft_engine`**. |
| `wal.recovery_parallelism` | Integer | `2` | Parallelism during WAL recovery. |
| `wal.broker_endpoints` | Array | -- | The Kafka broker endpoints.<br/>**It's only used when the provider is `kafka`**. |
-| `wal.connect_timeout` | String | `3s` | The connect timeout for kafka client.<br/>**It's only used when the provider is `kafka`**. |
-| `wal.timeout` | String | `3s` | The timeout for kafka client.<br/>**It's only used when the provider is `kafka`**. |
| `wal.max_batch_bytes` | String | `1MB` | The max size of a single producer batch.<br/>Warning: Kafka has a default limit of 1MB per message in a topic.<br/>**It's only used when the provider is `kafka`**. |
| `wal.consumer_wait_timeout` | String | `100ms` | The consumer wait timeout.<br/>**It's only used when the provider is `kafka`**. |
| `wal.create_index` | Bool | `true` | Whether to enable WAL index creation.<br/>**It's only used when the provider is `kafka`**. |

@@ -613,6 +604,7 @@
| `meta_client` | -- | -- | The metasrv client options. |
| `meta_client.metasrv_addrs` | Array | -- | The addresses of the metasrv. |
| `meta_client.timeout` | String | `3s` | Operation timeout. |
+| `meta_client.heartbeat_timeout` | String | `500ms` | Heartbeat timeout. |
| `meta_client.ddl_timeout` | String | `10s` | DDL timeout. |
| `meta_client.connect_timeout` | String | `1s` | Connect server timeout. |
| `meta_client.tcp_nodelay` | Bool | `true` | `TCP_NODELAY` option for accepted connections. |
@@ -92,6 +92,9 @@ metasrv_addrs = ["127.0.0.1:3002"]
## Operation timeout.
timeout = "3s"

+## Heartbeat timeout.
+heartbeat_timeout = "500ms"
+
## DDL timeout.
ddl_timeout = "10s"

@@ -161,14 +164,6 @@ recovery_parallelism = 2
## **It's only used when the provider is `kafka`**.
broker_endpoints = ["127.0.0.1:9092"]

-## The connect timeout for kafka client.
-## **It's only used when the provider is `kafka`**.
-#+ connect_timeout = "3s"
-
-## The timeout for kafka client.
-## **It's only used when the provider is `kafka`**.
-#+ timeout = "3s"
-
## The max size of a single producer batch.
## Warning: Kafka has a default limit of 1MB per message in a topic.
## **It's only used when the provider is `kafka`**.

@@ -64,6 +64,9 @@ metasrv_addrs = ["127.0.0.1:3002"]
## Operation timeout.
timeout = "3s"

+## Heartbeat timeout.
+heartbeat_timeout = "500ms"
+
## DDL timeout.
ddl_timeout = "10s"

@@ -171,6 +171,9 @@ metasrv_addrs = ["127.0.0.1:3002"]
## Operation timeout.
timeout = "3s"

+## Heartbeat timeout.
+heartbeat_timeout = "500ms"
+
## DDL timeout.
ddl_timeout = "10s"

@@ -55,13 +55,6 @@ allow_region_failover_on_local_wal = false
## Max allowed idle time before removing node info from metasrv memory.
node_max_idle_time = "24hours"

-## Base heartbeat interval for calculating distributed time constants.
-## The frontend heartbeat interval is 6 times of the base heartbeat interval.
-## The flownode/datanode heartbeat interval is 1 times of the base heartbeat interval.
-## e.g., If the base heartbeat interval is 3s, the frontend heartbeat interval is 18s, the flownode/datanode heartbeat interval is 3s.
-## If you change this value, you need to change the heartbeat interval of the flownode/frontend/datanode accordingly.
-#+ heartbeat_interval = "3s"
-
## Whether to enable greptimedb telemetry. Enabled by default.
#+ enable_telemetry = true

@@ -100,16 +93,6 @@ ca_cert_path = ""
## Watch for certificate file changes and auto reload
watch = false

-## The backend client options.
-## Currently, only applicable when using etcd as the metadata store.
-#+ [backend_client]
-## The keep alive timeout for backend client.
-#+ keep_alive_timeout = "3s"
-## The keep alive interval for backend client.
-#+ keep_alive_interval = "10s"
-## The connect timeout for backend client.
-#+ connect_timeout = "3s"
-
## The gRPC server options.
[grpc]
## The address to bind the gRPC server.

@@ -124,10 +107,6 @@ runtime_size = 8
max_recv_message_size = "512MB"
## The maximum send message size for gRPC server.
max_send_message_size = "512MB"
-## The server side HTTP/2 keep-alive interval
-#+ http2_keep_alive_interval = "10s"
-## The server side HTTP/2 keep-alive timeout.
-#+ http2_keep_alive_timeout = "3s"

## The HTTP server options.
[http]

@@ -209,14 +209,6 @@ recovery_parallelism = 2
## **It's only used when the provider is `kafka`**.
broker_endpoints = ["127.0.0.1:9092"]

-## The connect timeout for kafka client.
-## **It's only used when the provider is `kafka`**.
-#+ connect_timeout = "3s"
-
-## The timeout for kafka client.
-## **It's only used when the provider is `kafka`**.
-#+ timeout = "3s"
-
## Automatically create topics for WAL.
## Set to `true` to automatically create topics for WAL.
## Otherwise, use topics named `topic_name_prefix_[0..num_topics)`
@@ -29,7 +29,6 @@ use crate::information_schema::{InformationExtensionRef, InformationSchemaProvid
use crate::kvbackend::manager::{SystemCatalog, CATALOG_CACHE_MAX_CAPACITY};
use crate::kvbackend::KvBackendCatalogManager;
use crate::process_manager::ProcessManagerRef;
-use crate::system_schema::numbers_table_provider::NumbersTableProvider;
use crate::system_schema::pg_catalog::PGCatalogProvider;

pub struct KvBackendCatalogManagerBuilder {

@@ -120,7 +119,6 @@ impl KvBackendCatalogManagerBuilder {
                DEFAULT_CATALOG_NAME.to_string(),
                me.clone(),
            )),
-           numbers_table_provider: NumbersTableProvider,
            backend,
            process_manager,
            #[cfg(feature = "enterprise")]
@@ -18,7 +18,8 @@ use std::sync::{Arc, Weak};

use async_stream::try_stream;
use common_catalog::consts::{
-    DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, PG_CATALOG_NAME,
+    DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, NUMBERS_TABLE_ID,
+    PG_CATALOG_NAME,
};
use common_error::ext::BoxedError;
use common_meta::cache::{

@@ -42,6 +43,7 @@ use snafu::prelude::*;
use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
use table::dist_table::DistTable;
use table::metadata::{TableId, TableInfoRef};
+use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME};
use table::table_name::TableName;
use table::TableRef;
use tokio::sync::Semaphore;

@@ -56,7 +58,6 @@ use crate::information_schema::InformationSchemaTableFactoryRef;
use crate::information_schema::{InformationExtensionRef, InformationSchemaProvider};
use crate::kvbackend::TableCacheRef;
use crate::process_manager::ProcessManagerRef;
-use crate::system_schema::numbers_table_provider::NumbersTableProvider;
use crate::system_schema::pg_catalog::PGCatalogProvider;
use crate::system_schema::SystemSchemaProvider;
use crate::CatalogManager;

@@ -536,7 +537,6 @@ pub(super) struct SystemCatalog {
    // system_schema_provider for default catalog
    pub(super) information_schema_provider: Arc<InformationSchemaProvider>,
    pub(super) pg_catalog_provider: Arc<PGCatalogProvider>,
-   pub(super) numbers_table_provider: NumbersTableProvider,
    pub(super) backend: KvBackendRef,
    pub(super) process_manager: Option<ProcessManagerRef>,
    #[cfg(feature = "enterprise")]

@@ -566,7 +566,9 @@ impl SystemCatalog {
            PG_CATALOG_NAME if channel == Channel::Postgres => {
                self.pg_catalog_provider.table_names()
            }
-           DEFAULT_SCHEMA_NAME => self.numbers_table_provider.table_names(),
+           DEFAULT_SCHEMA_NAME => {
+               vec![NUMBERS_TABLE_NAME.to_string()]
+           }
            _ => vec![],
        }
    }

@@ -584,7 +586,7 @@ impl SystemCatalog {
        if schema == INFORMATION_SCHEMA_NAME {
            self.information_schema_provider.table(table).is_some()
        } else if schema == DEFAULT_SCHEMA_NAME {
-           self.numbers_table_provider.table_exists(table)
+           table == NUMBERS_TABLE_NAME
        } else if schema == PG_CATALOG_NAME && channel == Channel::Postgres {
            self.pg_catalog_provider.table(table).is_some()
        } else {

@@ -629,8 +631,8 @@ impl SystemCatalog {
            });
            pg_catalog_provider.table(table_name)
        }
-       } else if schema == DEFAULT_SCHEMA_NAME {
-           self.numbers_table_provider.table(table_name)
+       } else if schema == DEFAULT_SCHEMA_NAME && table_name == NUMBERS_TABLE_NAME {
+           Some(NumbersTable::table(NUMBERS_TABLE_ID))
        } else {
            None
        }
@@ -14,7 +14,6 @@

pub mod information_schema;
mod memory_table;
-pub mod numbers_table_provider;
pub mod pg_catalog;
pub mod predicate;
mod utils;
@@ -1,59 +0,0 @@
-// Copyright 2023 Greptime Team
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-#[cfg(any(test, feature = "testing", debug_assertions))]
-use common_catalog::consts::NUMBERS_TABLE_ID;
-#[cfg(any(test, feature = "testing", debug_assertions))]
-use table::table::numbers::NumbersTable;
-#[cfg(any(test, feature = "testing", debug_assertions))]
-use table::table::numbers::NUMBERS_TABLE_NAME;
-use table::TableRef;
-
-// NumbersTableProvider is a dedicated provider for feature-gating the numbers table.
-#[derive(Clone)]
-pub struct NumbersTableProvider;
-
-#[cfg(any(test, feature = "testing", debug_assertions))]
-impl NumbersTableProvider {
-    pub(crate) fn table_exists(&self, name: &str) -> bool {
-        name == NUMBERS_TABLE_NAME
-    }
-
-    pub(crate) fn table_names(&self) -> Vec<String> {
-        vec![NUMBERS_TABLE_NAME.to_string()]
-    }
-
-    pub(crate) fn table(&self, name: &str) -> Option<TableRef> {
-        if name == NUMBERS_TABLE_NAME {
-            Some(NumbersTable::table(NUMBERS_TABLE_ID))
-        } else {
-            None
-        }
-    }
-}
-
-#[cfg(not(any(test, feature = "testing", debug_assertions)))]
-impl NumbersTableProvider {
-    pub(crate) fn table_exists(&self, _name: &str) -> bool {
-        false
-    }
-
-    pub(crate) fn table_names(&self) -> Vec<String> {
-        vec![]
-    }
-
-    pub(crate) fn table(&self, _name: &str) -> Option<TableRef> {
-        None
-    }
-}
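The deleted file used `cfg` attributes to feature-gate the `numbers` test table: test, debug, and `testing`-feature builds got the real provider, while release builds got inert stubs that hide the table. The head branch drops the provider and goes back to unconditional `NUMBERS_TABLE_NAME` checks in `SystemCatalog`. A minimal sketch of the gating pattern itself (generic names, not the GreptimeDB types):

```rust
// Real lookup only in tests, debug builds, or with the "testing" feature enabled...
#[cfg(any(test, feature = "testing", debug_assertions))]
fn numbers_table_names() -> Vec<String> {
    vec!["numbers".to_string()]
}

// ...and an inert stub in release builds, so the table is invisible there.
#[cfg(not(any(test, feature = "testing", debug_assertions)))]
fn numbers_table_names() -> Vec<String> {
    Vec::new()
}

fn main() {
    println!("visible test tables: {:?}", numbers_table_names());
}
```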
@@ -20,7 +20,7 @@ use common_meta::kv_backend::chroot::ChrootKvBackend;
use common_meta::kv_backend::etcd::EtcdStore;
use common_meta::kv_backend::KvBackendRef;
use meta_srv::bootstrap::create_etcd_client;
-use meta_srv::metasrv::{BackendClientOptions, BackendImpl};
+use meta_srv::metasrv::BackendImpl;

use crate::error::{EmptyStoreAddrsSnafu, UnsupportedMemoryBackendSnafu};

@@ -67,10 +67,9 @@ impl StoreConfig {
        } else {
            let kvbackend = match self.backend {
                BackendImpl::EtcdStore => {
-                   let etcd_client =
-                       create_etcd_client(store_addrs, &BackendClientOptions::default())
-                           .await
-                           .map_err(BoxedError::new)?;
+                   let etcd_client = create_etcd_client(store_addrs)
+                       .await
+                       .map_err(BoxedError::new)?;
                    Ok(EtcdStore::with_etcd_client(etcd_client, max_txn_ops))
                }
                #[cfg(feature = "pg_kvbackend")]
@@ -29,7 +29,7 @@ use futures::TryStreamExt;

use crate::error::InvalidArgumentsSnafu;
use crate::metadata::common::StoreConfig;
-use crate::metadata::control::utils::{decode_key_value, get_table_id_by_name, json_formatter};
+use crate::metadata::control::utils::{decode_key_value, get_table_id_by_name, json_fromatter};
use crate::Tool;

/// Getting metadata from metadata store.

@@ -206,7 +206,7 @@ impl Tool for GetTableTool {
                println!(
                    "{}\n{}",
                    TableInfoKey::new(table_id),
-                   json_formatter(self.pretty, &*table_info)
+                   json_fromatter(self.pretty, &*table_info)
                );
            } else {
                println!("Table info not found");

@@ -221,7 +221,7 @@ impl Tool for GetTableTool {
                println!(
                    "{}\n{}",
                    TableRouteKey::new(table_id),
-                   json_formatter(self.pretty, &table_route)
+                   json_fromatter(self.pretty, &table_route)
                );
            } else {
                println!("Table route not found");

@@ -27,7 +27,7 @@ pub fn decode_key_value(kv: KeyValue) -> CommonMetaResult<(String, String)> {
}

/// Formats a value as a JSON string.
-pub fn json_formatter<T>(pretty: bool, value: &T) -> String
+pub fn json_fromatter<T>(pretty: bool, value: &T) -> String
where
    T: Serialize,
{
@@ -41,6 +41,7 @@ use frontend::server::Services;
use meta_client::{MetaClientOptions, MetaClientType};
use servers::addrs;
use servers::export_metrics::ExportMetricsTask;
+use servers::grpc::GrpcOptions;
use servers::tls::{TlsMode, TlsOption};
use snafu::{OptionExt, ResultExt};
use tracing_appender::non_blocking::WorkerGuard;

@@ -144,6 +145,14 @@ pub struct StartCommand {
    /// on the host, with the same port number as the one specified in `rpc_bind_addr`.
    #[clap(long, alias = "rpc-hostname")]
    rpc_server_addr: Option<String>,
+   /// The address to bind the internal gRPC server.
+   #[clap(long, alias = "internal-rpc-addr")]
+   internal_rpc_bind_addr: Option<String>,
+   /// The address advertised to the metasrv, and used for connections from outside the host.
+   /// If left empty or unset, the server will automatically use the IP address of the first network interface
+   /// on the host, with the same port number as the one specified in `internal_rpc_bind_addr`.
+   #[clap(long, alias = "internal-rpc-hostname")]
+   internal_rpc_server_addr: Option<String>,
    #[clap(long)]
    http_addr: Option<String>,
    #[clap(long)]

@@ -241,6 +250,31 @@ impl StartCommand {
            opts.grpc.server_addr.clone_from(addr);
        }

+       if let Some(addr) = &self.internal_rpc_bind_addr {
+           if let Some(internal_grpc) = &mut opts.internal_grpc {
+               internal_grpc.bind_addr = addr.to_string();
+           } else {
+               let grpc_options = GrpcOptions {
+                   bind_addr: addr.to_string(),
+                   ..Default::default()
+               };
+
+               opts.internal_grpc = Some(grpc_options);
+           }
+       }
+
+       if let Some(addr) = &self.internal_rpc_server_addr {
+           if let Some(internal_grpc) = &mut opts.internal_grpc {
+               internal_grpc.server_addr = addr.to_string();
+           } else {
+               let grpc_options = GrpcOptions {
+                   server_addr: addr.to_string(),
+                   ..Default::default()
+               };
+               opts.internal_grpc = Some(grpc_options);
+           }
+       }
+
        if let Some(addr) = &self.mysql_addr {
            opts.mysql.enable = true;
            opts.mysql.addr.clone_from(addr);

@@ -448,6 +482,8 @@ mod tests {
            http_addr: Some("127.0.0.1:1234".to_string()),
            mysql_addr: Some("127.0.0.1:5678".to_string()),
            postgres_addr: Some("127.0.0.1:5432".to_string()),
+           internal_rpc_bind_addr: Some("127.0.0.1:4010".to_string()),
+           internal_rpc_server_addr: Some("10.0.0.24:4010".to_string()),
            influxdb_enable: Some(false),
            disable_dashboard: Some(false),
            ..Default::default()

@@ -460,6 +496,10 @@ mod tests {
        assert_eq!(opts.mysql.addr, "127.0.0.1:5678");
        assert_eq!(opts.postgres.addr, "127.0.0.1:5432");

+       let internal_grpc = opts.internal_grpc.as_ref().unwrap();
+       assert_eq!(internal_grpc.bind_addr, "127.0.0.1:4010");
+       assert_eq!(internal_grpc.server_addr, "10.0.0.24:4010");
+
        let default_opts = FrontendOptions::default().component;

        assert_eq!(opts.grpc.bind_addr, default_opts.grpc.bind_addr);
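Both new flag handlers update `opts.internal_grpc` with the same update-or-insert shape: mutate the existing `GrpcOptions` if present, otherwise insert a default one with the single field overridden. A compact sketch of that pattern using `Option::get_or_insert_with` (illustrative only; the repository code spells out the `if let`/`else`):

```rust
#[derive(Debug, Default)]
struct GrpcOptions {
    bind_addr: String,
    server_addr: String,
}

fn main() {
    let mut internal_grpc: Option<GrpcOptions> = None;

    // Same effect as the hunk's `if let Some(g) = &mut opts.internal_grpc { .. } else { .. }`:
    // reuse the existing options if set, otherwise insert defaults, then override one field.
    internal_grpc
        .get_or_insert_with(GrpcOptions::default)
        .bind_addr = "127.0.0.1:4010".to_string();

    internal_grpc
        .get_or_insert_with(GrpcOptions::default)
        .server_addr = "10.0.0.24:4010".to_string();

    println!("{internal_grpc:?}");
}
```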
@@ -20,7 +20,6 @@ use async_trait::async_trait;
use clap::Parser;
use common_base::Plugins;
use common_config::Configurable;
-use common_meta::distributed_time_constants::init_distributed_time_constants;
use common_telemetry::info;
use common_telemetry::logging::{TracingOptions, DEFAULT_LOGGING_DIR};
use common_version::{short_version, verbose_version};

@@ -328,7 +327,6 @@ impl StartCommand {
        log_versions(verbose_version(), short_version(), APP_NAME);
        maybe_activate_heap_profile(&opts.component.memory);
        create_resource_limit_metrics(APP_NAME);
-       init_distributed_time_constants(opts.component.heartbeat_interval);

        info!("Metasrv start command: {:#?}", self);
@@ -51,6 +51,7 @@ fn test_load_datanode_example_config() {
        meta_client: Some(MetaClientOptions {
            metasrv_addrs: vec!["127.0.0.1:3002".to_string()],
            timeout: Duration::from_secs(3),
+           heartbeat_timeout: Duration::from_millis(500),
            ddl_timeout: Duration::from_secs(10),
            connect_timeout: Duration::from_secs(1),
            tcp_nodelay: true,

@@ -115,6 +116,7 @@ fn test_load_frontend_example_config() {
        meta_client: Some(MetaClientOptions {
            metasrv_addrs: vec!["127.0.0.1:3002".to_string()],
            timeout: Duration::from_secs(3),
+           heartbeat_timeout: Duration::from_millis(500),
            ddl_timeout: Duration::from_secs(10),
            connect_timeout: Duration::from_secs(1),
            tcp_nodelay: true,

@@ -238,6 +240,7 @@ fn test_load_flownode_example_config() {
        meta_client: Some(MetaClientOptions {
            metasrv_addrs: vec!["127.0.0.1:3002".to_string()],
            timeout: Duration::from_secs(3),
+           heartbeat_timeout: Duration::from_millis(500),
            ddl_timeout: Duration::from_secs(10),
            connect_timeout: Duration::from_secs(1),
            tcp_nodelay: true,
@@ -332,7 +332,7 @@ impl AggregateUDFImpl for StateWrapper {
        self.inner.signature()
    }

-   /// Coerce types also do nothing, as optimizer should be able to already make struct types
+   /// Coerce types also do nothing, as optimzer should be able to already make struct types
    fn coerce_types(&self, arg_types: &[DataType]) -> datafusion_common::Result<Vec<DataType>> {
        self.inner.coerce_types(arg_types)
    }

@@ -486,7 +486,7 @@ impl AggregateUDFImpl for MergeWrapper {
        &self.merge_signature
    }

-   /// Coerce types also do nothing, as optimizer should be able to already make struct types
+   /// Coerce types also do nothing, as optimzer should be able to already make struct types
    fn coerce_types(&self, arg_types: &[DataType]) -> datafusion_common::Result<Vec<DataType>> {
        // just check if the arg_types are only one and is struct array
        if arg_types.len() != 1 || !matches!(arg_types.first(), Some(DataType::Struct(_))) {
@@ -12,10 +12,25 @@
// See the License for the specific language governing permissions and
// limitations under the License.

-use std::sync::OnceLock;
use std::time::Duration;

-pub const BASE_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(3);
+/// Heartbeat interval time (is the basic unit of various time).
+pub const HEARTBEAT_INTERVAL_MILLIS: u64 = 3000;
+
+/// The frontend will also send heartbeats to Metasrv, sending an empty
+/// heartbeat every HEARTBEAT_INTERVAL_MILLIS * 6 seconds.
+pub const FRONTEND_HEARTBEAT_INTERVAL_MILLIS: u64 = HEARTBEAT_INTERVAL_MILLIS * 6;
+
+/// The lease seconds of a region. It's set by 3 heartbeat intervals
+/// (HEARTBEAT_INTERVAL_MILLIS × 3), plus some extra buffer (1 second).
+pub const REGION_LEASE_SECS: u64 =
+    Duration::from_millis(HEARTBEAT_INTERVAL_MILLIS * 3).as_secs() + 1;
+
+/// When creating table or region failover, a target node needs to be selected.
+/// If the node's lease has expired, the `Selector` will not select it.
+pub const DATANODE_LEASE_SECS: u64 = REGION_LEASE_SECS;
+
+pub const FLOWNODE_LEASE_SECS: u64 = DATANODE_LEASE_SECS;

/// The lease seconds of metasrv leader.
pub const META_LEASE_SECS: u64 = 5;

@@ -26,73 +41,5 @@ pub const POSTGRES_KEEP_ALIVE_SECS: u64 = 30;
/// In a lease, there are two opportunities for renewal.
pub const META_KEEP_ALIVE_INTERVAL_SECS: u64 = META_LEASE_SECS / 2;

-/// The timeout of the heartbeat request.
-pub const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS + 1);
-
-/// The keep-alive interval of the heartbeat channel.
-pub const HEARTBEAT_CHANNEL_KEEP_ALIVE_INTERVAL_SECS: Duration =
-    Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS + 1);
-
-/// The keep-alive timeout of the heartbeat channel.
-pub const HEARTBEAT_CHANNEL_KEEP_ALIVE_TIMEOUT_SECS: Duration =
-    Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS + 1);
-
/// The default mailbox round-trip timeout.
pub const MAILBOX_RTT_SECS: u64 = 1;
-
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
-/// The distributed time constants.
-pub struct DistributedTimeConstants {
-    pub heartbeat_interval: Duration,
-    pub frontend_heartbeat_interval: Duration,
-    pub region_lease: Duration,
-    pub datanode_lease: Duration,
-    pub flownode_lease: Duration,
-}
-
-/// The frontend heartbeat interval is 6 times of the base heartbeat interval.
-pub fn frontend_heartbeat_interval(base_heartbeat_interval: Duration) -> Duration {
-    base_heartbeat_interval * 6
-}
-
-impl DistributedTimeConstants {
-    /// Create a new DistributedTimeConstants from the heartbeat interval.
-    pub fn from_heartbeat_interval(heartbeat_interval: Duration) -> Self {
-        let region_lease = heartbeat_interval * 3 + Duration::from_secs(1);
-        let datanode_lease = region_lease;
-        let flownode_lease = datanode_lease;
-        Self {
-            heartbeat_interval,
-            frontend_heartbeat_interval: frontend_heartbeat_interval(heartbeat_interval),
-            region_lease,
-            datanode_lease,
-            flownode_lease,
-        }
-    }
-}
-
-impl Default for DistributedTimeConstants {
-    fn default() -> Self {
-        Self::from_heartbeat_interval(BASE_HEARTBEAT_INTERVAL)
-    }
-}
-
-static DEFAULT_DISTRIBUTED_TIME_CONSTANTS: OnceLock<DistributedTimeConstants> = OnceLock::new();
-
-/// Get the default distributed time constants.
-pub fn default_distributed_time_constants() -> &'static DistributedTimeConstants {
-    DEFAULT_DISTRIBUTED_TIME_CONSTANTS.get_or_init(Default::default)
-}
-
-/// Initialize the default distributed time constants.
-pub fn init_distributed_time_constants(base_heartbeat_interval: Duration) {
-    let distributed_time_constants =
-        DistributedTimeConstants::from_heartbeat_interval(base_heartbeat_interval);
-    DEFAULT_DISTRIBUTED_TIME_CONSTANTS
-        .set(distributed_time_constants)
-        .expect("Failed to set default distributed time constants");
-    common_telemetry::info!(
-        "Initialized default distributed time constants: {:#?}",
-        distributed_time_constants
-    );
-}
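The constants on the head side encode a fixed timing hierarchy: with a 3 s base heartbeat, the frontend heartbeats every 18 s, and a region lease lasts 3 × 3 s + 1 s = 10 s, so a datanode can miss two heartbeats before its lease expires. A self-contained sketch of that arithmetic (the names mirror the constants above; the block itself is illustrative, not part of the diff):

```rust
use std::time::Duration;

// Base heartbeat interval, as in `HEARTBEAT_INTERVAL_MILLIS = 3000` above.
const HEARTBEAT_INTERVAL_MILLIS: u64 = 3000;

fn main() {
    // Frontends heartbeat 6x less often than datanodes/flownodes.
    let frontend_millis = HEARTBEAT_INTERVAL_MILLIS * 6;

    // A region lease spans 3 heartbeats plus a 1-second buffer, so a datanode
    // can miss two heartbeats before its lease expires.
    let region_lease_secs = Duration::from_millis(HEARTBEAT_INTERVAL_MILLIS * 3).as_secs() + 1;

    assert_eq!(frontend_millis, 18_000); // 18s
    assert_eq!(region_lease_secs, 10); // 9s + 1s buffer
}
```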
@@ -528,6 +528,9 @@ pub enum Error {
        source: common_wal::error::Error,
    },

+   #[snafu(display("Failed to resolve Kafka broker endpoint."))]
+   ResolveKafkaEndpoint { source: common_wal::error::Error },
+
    #[snafu(display("Failed to build a Kafka controller client"))]
    BuildKafkaCtrlClient {
        #[snafu(implicit)]

@@ -1037,6 +1040,7 @@ impl ErrorExt for Error {
            | BuildKafkaClient { .. }
            | BuildKafkaCtrlClient { .. }
            | KafkaPartitionClient { .. }
+           | ResolveKafkaEndpoint { .. }
            | ProduceRecord { .. }
            | CreateKafkaWalTopic { .. }
            | EmptyTopicPool { .. }

@@ -25,7 +25,8 @@ use snafu::ResultExt;

use crate::error::{
    BuildKafkaClientSnafu, BuildKafkaCtrlClientSnafu, CreateKafkaWalTopicSnafu,
-   KafkaGetOffsetSnafu, KafkaPartitionClientSnafu, ProduceRecordSnafu, Result, TlsConfigSnafu,
+   KafkaGetOffsetSnafu, KafkaPartitionClientSnafu, ProduceRecordSnafu, ResolveKafkaEndpointSnafu,
+   Result, TlsConfigSnafu,
};

// Each topic only has one partition for now.

@@ -208,10 +209,10 @@ impl KafkaTopicCreator {
/// Builds a kafka [Client](rskafka::client::Client).
pub async fn build_kafka_client(connection: &KafkaConnectionConfig) -> Result<Client> {
    // Builds an kafka controller client for creating topics.
-   let mut builder = ClientBuilder::new(connection.broker_endpoints.clone())
-       .backoff_config(DEFAULT_BACKOFF_CONFIG)
-       .connect_timeout(Some(connection.connect_timeout))
-       .timeout(Some(connection.timeout));
+   let broker_endpoints = common_wal::resolve_to_ipv4(&connection.broker_endpoints)
+       .await
+       .context(ResolveKafkaEndpointSnafu)?;
+   let mut builder = ClientBuilder::new(broker_endpoints).backoff_config(DEFAULT_BACKOFF_CONFIG);
    if let Some(sasl) = &connection.sasl {
        builder = builder.sasl_config(sasl.config.clone().into_sasl_config());
    };
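On the head side, `build_kafka_client` resolves broker hostnames to IPv4 addresses up front via `common_wal::resolve_to_ipv4` instead of configuring client-side connect/request timeouts. A rough standard-library illustration of that resolution step (the real helper is async and lives in `common_wal`; this sketch only shows the idea):

```rust
use std::net::{SocketAddr, ToSocketAddrs};

// Resolve "host:port" endpoints, keeping only IPv4 results.
fn resolve_to_ipv4(endpoints: &[&str]) -> std::io::Result<Vec<String>> {
    let mut resolved = Vec::new();
    for endpoint in endpoints {
        let addrs = endpoint.to_socket_addrs()?.filter(SocketAddr::is_ipv4);
        resolved.extend(addrs.map(|addr| addr.to_string()));
    }
    Ok(resolved)
}

fn main() -> std::io::Result<()> {
    // Prints e.g. ["127.0.0.1:9092"] on most hosts.
    println!("{:?}", resolve_to_ipv4(&["localhost:9092"])?);
    Ok(())
}
```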
@@ -19,7 +19,7 @@ use opentelemetry::propagation::TextMapPropagator;
use opentelemetry_sdk::propagation::TraceContextPropagator;
use tracing_opentelemetry::OpenTelemetrySpanExt;

-// An wrapper for `Futures` that provides tracing instrument adapters.
+// An wapper for `Futures` that provides tracing instrument adapters.
pub trait FutureExt: std::future::Future + Sized {
    fn trace(self, span: tracing::span::Span) -> tracing::instrument::Instrumented<Self>;
}
@@ -189,8 +189,6 @@ mod tests {
                client_cert_path: None,
                client_key_path: None,
            }),
-           connect_timeout: Duration::from_secs(3),
-           timeout: Duration::from_secs(3),
        },
        kafka_topic: KafkaTopicConfig {
            num_topics: 32,

@@ -223,8 +221,6 @@ mod tests {
                client_cert_path: None,
                client_key_path: None,
            }),
-           connect_timeout: Duration::from_secs(3),
-           timeout: Duration::from_secs(3),
        },
        max_batch_bytes: ReadableSize::mb(1),
        consumer_wait_timeout: Duration::from_millis(100),

@@ -161,12 +161,6 @@ pub struct KafkaConnectionConfig {
    pub sasl: Option<KafkaClientSasl>,
    /// Client TLS config
    pub tls: Option<KafkaClientTls>,
-   /// The connect timeout for kafka client.
-   #[serde(with = "humantime_serde")]
-   pub connect_timeout: Duration,
-   /// The timeout for kafka client.
-   #[serde(with = "humantime_serde")]
-   pub timeout: Duration,
}

impl Default for KafkaConnectionConfig {

@@ -175,8 +169,6 @@ impl Default for KafkaConnectionConfig {
            broker_endpoints: vec![BROKER_ENDPOINT.to_string()],
            sasl: None,
            tls: None,
-           connect_timeout: Duration::from_secs(3),
-           timeout: Duration::from_secs(3),
        }
    }
}
@@ -218,7 +218,6 @@ impl HeartbeatTask {
                if let Some(message) = message {
                    Self::new_heartbeat_request(&heartbeat_request, Some(message), &latest_report)
                } else {
-                   warn!("Sender has been dropped, exiting the heartbeat loop");
                    // Receives None that means Sender was dropped, we need to break the current loop
                    break
                }

@@ -260,11 +259,7 @@ impl HeartbeatTask {
                        error!(e; "Error while handling heartbeat response");
                    }
                }
-               Ok(None) => {
-                   warn!("Heartbeat response stream closed");
-                   capture_self.start_with_retry(retry_interval).await;
-                   break;
-               }
+               Ok(None) => break,
                Err(e) => {
                    error!(e; "Occur error while reading heartbeat response");
                    capture_self.start_with_retry(retry_interval).await;
@@ -71,6 +71,6 @@ pub struct LinearStagePlan {
    /// The key expressions to use for the lookup relation.
    pub lookup_key: Vec<ScalarExpr>,
    /// The closure to apply to the concatenation of the key columns,
-   /// the stream value columns, and the lookup value columns.
+   /// the stream value columns, and the lookup value colunms.
    pub closure: JoinFilter,
}
@@ -47,6 +47,9 @@ pub struct FrontendOptions {
    pub heartbeat: HeartbeatOptions,
    pub http: HttpOptions,
    pub grpc: GrpcOptions,
+   /// The internal gRPC options for the frontend service.
+   /// it provide the same service as the public gRPC service, just only for internal use.
+   pub internal_grpc: Option<GrpcOptions>,
    pub mysql: MysqlOptions,
    pub postgres: PostgresOptions,
    pub opentsdb: OpentsdbOptions,

@@ -74,6 +77,7 @@ impl Default for FrontendOptions {
            heartbeat: HeartbeatOptions::frontend_default(),
            http: HttpOptions::default(),
            grpc: GrpcOptions::default(),
+           internal_grpc: None,
            mysql: MysqlOptions::default(),
            postgres: PostgresOptions::default(),
            opentsdb: OpentsdbOptions::default(),
@@ -23,7 +23,7 @@ use common_meta::heartbeat::handler::{
};
use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef, OutgoingMessage};
use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
-use common_telemetry::{debug, error, info, warn};
+use common_telemetry::{debug, error, info};
use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient};
use servers::addrs;
use servers::heartbeat_options::HeartbeatOptions;

@@ -42,8 +42,8 @@ use crate::metrics::{HEARTBEAT_RECV_COUNT, HEARTBEAT_SENT_COUNT};
pub struct HeartbeatTask {
    peer_addr: String,
    meta_client: Arc<MetaClient>,
-   report_interval: Duration,
-   retry_interval: Duration,
+   report_interval: u64,
+   retry_interval: u64,
    resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
    start_time_ms: u64,
}

@@ -56,10 +56,17 @@ impl HeartbeatTask {
        resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
    ) -> Self {
        HeartbeatTask {
-           peer_addr: addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)),
+           // if internal grpc is configured, use its address as the peer address
+           // otherwise use the public grpc address, because peer address only promises to be reachable
+           // by other components, it doesn't matter whether it's internal or external
+           peer_addr: if let Some(internal) = &opts.internal_grpc {
+               addrs::resolve_addr(&internal.bind_addr, Some(&internal.server_addr))
+           } else {
+               addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr))
+           },
            meta_client,
-           report_interval: heartbeat_opts.interval,
-           retry_interval: heartbeat_opts.retry_interval,
+           report_interval: heartbeat_opts.interval.as_millis() as u64,
+           retry_interval: heartbeat_opts.retry_interval.as_millis() as u64,
            resp_handler_executor,
            start_time_ms: common_time::util::current_time_millis() as u64,
        }

@@ -103,15 +110,13 @@ impl HeartbeatTask {
                        HEARTBEAT_RECV_COUNT.with_label_values(&["success"]).inc();
                    }
                }
-               Ok(None) => {
-                   warn!("Heartbeat response stream closed");
-                   capture_self.start_with_retry(retry_interval).await;
-                   break;
-               }
+               Ok(None) => break,
                Err(e) => {
                    HEARTBEAT_RECV_COUNT.with_label_values(&["error"]).inc();
                    error!(e; "Occur error while reading heartbeat response");
-                   capture_self.start_with_retry(retry_interval).await;
+                   capture_self
+                       .start_with_retry(Duration::from_millis(retry_interval))
+                       .await;

                    break;
                }

@@ -179,13 +184,12 @@ impl HeartbeatTask {
                    if let Some(message) = message {
                        Self::new_heartbeat_request(&heartbeat_request, Some(message))
                    } else {
-                       warn!("Sender has been dropped, exiting the heartbeat loop");
                        // Receives None that means Sender was dropped, we need to break the current loop
                        break
                    }
                }
                _ = &mut sleep => {
-                   sleep.as_mut().reset(Instant::now() + report_interval);
+                   sleep.as_mut().reset(Instant::now() + Duration::from_millis(report_interval));
                    Self::new_heartbeat_request(&heartbeat_request, None)
                }
            };
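The head branch stores the heartbeat intervals as raw milliseconds (`u64`) and rebuilds a `Duration` at each use, whereas the base branch carried `Duration` values end to end. A small sketch of the round-trip the hunks above rely on (illustrative, not repository code):

```rust
use std::time::Duration;

fn main() {
    let interval = Duration::from_secs(3);

    // HeartbeatTask::new on the head branch: Duration -> u64 milliseconds.
    let report_interval: u64 = interval.as_millis() as u64;

    // Each tick converts back before arming the sleep timer.
    let sleep_for = Duration::from_millis(report_interval);

    assert_eq!(sleep_for, interval); // lossless at millisecond granularity
}
```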
@@ -18,7 +18,8 @@ use std::sync::Arc;

use auth::UserProviderRef;
use common_base::Plugins;
use common_config::Configurable;
+use meta_client::MetaClientOptions;
use servers::error::Error as ServerError;
use servers::grpc::builder::GrpcServerBuilder;
use servers::grpc::frontend_grpc_handler::FrontendGrpcHandler;

@@ -131,17 +132,28 @@ where
    }
}

-   fn build_grpc_server(&mut self, opts: &FrontendOptions) -> Result<GrpcServer> {
+   fn build_grpc_server(
+       &mut self,
+       grpc: &GrpcOptions,
+       meta_client: &Option<MetaClientOptions>,
+       name: Option<String>,
+       external: bool,
+   ) -> Result<GrpcServer> {
        let builder = if let Some(builder) = self.grpc_server_builder.take() {
            builder
        } else {
-           self.grpc_server_builder(&opts.grpc)?
+           self.grpc_server_builder(grpc)?
        };

-       let user_provider = self.plugins.get::<UserProviderRef>();
+       let user_provider = if external {
+           self.plugins.get::<UserProviderRef>()
+       } else {
+           // skip authentication for internal grpc port
+           None
+       };

        // Determine whether it is Standalone or Distributed mode based on whether the meta client is configured.
-       let runtime = if opts.meta_client.is_none() {
+       let runtime = if meta_client.is_none() {
            Some(builder.runtime().clone())
        } else {
            None

@@ -151,18 +163,25 @@ where
            ServerGrpcQueryHandlerAdapter::arc(self.instance.clone()),
            user_provider.clone(),
            runtime,
-           opts.grpc.flight_compression,
+           grpc.flight_compression,
        );

-       let frontend_grpc_handler =
-           FrontendGrpcHandler::new(self.instance.process_manager().clone());
        let grpc_server = builder
+           .name(name)
            .database_handler(greptime_request_handler.clone())
            .prometheus_handler(self.instance.clone(), user_provider.clone())
            .otel_arrow_handler(OtelArrowServiceHandler::new(self.instance.clone()))
-           .flight_handler(Arc::new(greptime_request_handler))
-           .frontend_grpc_handler(frontend_grpc_handler)
-           .build();
+           .flight_handler(Arc::new(greptime_request_handler));
+
+       let grpc_server = if external {
+           let frontend_grpc_handler =
+               FrontendGrpcHandler::new(self.instance.process_manager().clone());
+           grpc_server.frontend_grpc_handler(frontend_grpc_handler)
+       } else {
+           grpc_server
+       }
+       .build();

        Ok(grpc_server)
    }

@@ -195,7 +214,19 @@ where
    {
        // Always init GRPC server
        let grpc_addr = parse_addr(&opts.grpc.bind_addr)?;
-       let grpc_server = self.build_grpc_server(&opts)?;
+       let grpc_server = self.build_grpc_server(&opts.grpc, &opts.meta_client, None, true)?;
        handlers.insert((Box::new(grpc_server), grpc_addr));
    }

+   if let Some(internal_grpc) = &opts.internal_grpc {
+       // Always init Internal GRPC server
+       let grpc_addr = parse_addr(&internal_grpc.bind_addr)?;
+       let grpc_server = self.build_grpc_server(
+           internal_grpc,
+           &opts.meta_client,
+           Some("INTERNAL_GRPC_SERVER".to_string()),
+           false,
+       )?;
+       handlers.insert((Box::new(grpc_server), grpc_addr));
+   }
@@ -139,6 +139,9 @@ pub enum Error {
        error: rskafka::client::error::Error,
    },

+   #[snafu(display("Failed to resolve Kafka broker endpoint."))]
+   ResolveKafkaEndpoint { source: common_wal::error::Error },
+
    #[snafu(display(
        "Failed to build a Kafka partition client, topic: {}, partition: {}",
        topic,

@@ -340,6 +343,7 @@ impl ErrorExt for Error {
            StartWalTask { .. }
            | StopWalTask { .. }
            | IllegalState { .. }
+           | ResolveKafkaEndpoint { .. }
            | NoMaxValue { .. }
            | Cast { .. }
            | EncodeJson { .. }
@@ -24,7 +24,9 @@ use snafu::ResultExt;
use store_api::logstore::provider::KafkaProvider;
use tokio::sync::{Mutex, RwLock};

-use crate::error::{BuildClientSnafu, BuildPartitionClientSnafu, Result, TlsConfigSnafu};
+use crate::error::{
+    BuildClientSnafu, BuildPartitionClientSnafu, ResolveKafkaEndpointSnafu, Result, TlsConfigSnafu,
+};
use crate::kafka::index::{GlobalIndexCollector, NoopCollector};
use crate::kafka::producer::{OrderedBatchProducer, OrderedBatchProducerRef};

@@ -76,10 +78,11 @@ impl ClientManager {
        high_watermark: Arc<DashMap<Arc<KafkaProvider>, u64>>,
    ) -> Result<Self> {
        // Sets backoff config for the top-level kafka client and all clients constructed by it.
-       let mut builder = ClientBuilder::new(config.connection.broker_endpoints.clone())
-           .backoff_config(DEFAULT_BACKOFF_CONFIG)
-           .connect_timeout(Some(config.connection.connect_timeout))
-           .timeout(Some(config.connection.timeout));
+       let broker_endpoints = common_wal::resolve_to_ipv4(&config.connection.broker_endpoints)
+           .await
+           .context(ResolveKafkaEndpointSnafu)?;
+       let mut builder =
+           ClientBuilder::new(broker_endpoints).backoff_config(DEFAULT_BACKOFF_CONFIG);
        if let Some(sasl) = &config.connection.sasl {
            builder = builder.sasl_config(sasl.config.clone().into_sasl_config());
        };
@@ -186,9 +186,6 @@ impl MetaClientBuilder {
        let mgr = client.channel_manager.clone();

        if self.enable_heartbeat {
-           if self.heartbeat_channel_manager.is_some() {
-               info!("Enable heartbeat channel using the heartbeat channel manager.");
-           }
            let mgr = self.heartbeat_channel_manager.unwrap_or(mgr.clone());
            client.heartbeat = Some(HeartbeatClient::new(
                self.id,

@@ -528,7 +525,7 @@ impl MetaClient {
        self.heartbeat_client()?.ask_leader().await
    }

-   /// Returns a heartbeat bidirectional streaming: (sender, receiver), the
+   /// Returns a heartbeat bidirectional streaming: (sender, recever), the
    /// other end is the leader of `metasrv`.
    ///
    /// The `datanode` needs to use the sender to continuously send heartbeat
@@ -24,7 +24,7 @@ use common_meta::distributed_time_constants::META_KEEP_ALIVE_INTERVAL_SECS;
 use common_telemetry::tracing_context::TracingContext;
 use common_telemetry::warn;
 use rand::seq::SliceRandom;
-use snafu::ResultExt;
+use snafu::{OptionExt, ResultExt};
 use tokio::time::timeout;
 use tonic::transport::Channel;

@@ -101,14 +101,12 @@ impl AskLeader {
         };

         let (tx, mut rx) = tokio::sync::mpsc::channel(peers.len());
-        let channel_manager = self.channel_manager.clone();

         for addr in &peers {
             let mut client = self.create_asker(addr)?;
             let tx_clone = tx.clone();
             let req = req.clone();
-            let addr = addr.clone();
-            let channel_manager = channel_manager.clone();
+            let addr = addr.to_string();
             tokio::spawn(async move {
                 match client.ask_leader(req).await {
                     Ok(res) => {

@@ -119,19 +117,13 @@ impl AskLeader {
                         };
                     }
                     Err(status) => {
-                        // Reset cached channel even on generic errors: the VIP may keep us on a dead
-                        // backend, so forcing a reconnect gives us a chance to hit a healthy peer.
-                        Self::reset_channels_with_manager(
-                            &channel_manager,
-                            std::slice::from_ref(&addr),
-                        );
                         warn!("Failed to ask leader from: {addr}, {status}");
                     }
                 }
             });
         }

-        let leader = match timeout(
+        let leader = timeout(
             self.channel_manager
                 .config()
                 .timeout

@@ -139,16 +131,8 @@ impl AskLeader {
             rx.recv(),
         )
         .await
-        {
-            Ok(Some(leader)) => leader,
-            Ok(None) => return error::NoLeaderSnafu.fail(),
-            Err(e) => {
-                // All peers timed out. Reset channels to force reconnection,
-                // which may help escape dead backends in VIP/LB scenarios.
-                Self::reset_channels_with_manager(&self.channel_manager, &peers);
-                return Err(e).context(error::AskLeaderTimeoutSnafu);
-            }
-        };
+        .context(error::AskLeaderTimeoutSnafu)?
+        .context(error::NoLeaderSnafu)?;

         let mut leadership_group = self.leadership_group.write().unwrap();
         leadership_group.leader = Some(leader.clone());

@@ -185,15 +169,6 @@ impl AskLeader {
                 .context(error::CreateChannelSnafu)?,
         ))
     }
-
-    /// Drop cached channels for the given peers so a fresh connection is used next time.
-    fn reset_channels_with_manager(channel_manager: &ChannelManager, peers: &[String]) {
-        if peers.is_empty() {
-            return;
-        }
-
-        channel_manager.retain_channel(|addr, _| !peers.iter().any(|peer| peer == addr));
-    }
 }

 #[async_trait]
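The `match` on the `timeout(...)` result above collapses into two `context` calls because the awaited expression is a `Result<Option<T>, Elapsed>`: snafu's `ResultExt` maps the elapsed-timer error, and the newly imported `OptionExt` maps the empty channel to `NoLeader`. A standalone sketch of the same pattern, with simplified error and channel types (the error names mirror the diff; everything else is an assumption):

    use snafu::{OptionExt, ResultExt, Snafu};
    use std::time::Duration;
    use tokio::{sync::mpsc, time::timeout};

    #[derive(Debug, Snafu)]
    enum Error {
        #[snafu(display("Failed to ask leader in time"))]
        AskLeaderTimeout { source: tokio::time::error::Elapsed },
        #[snafu(display("No leader found"))]
        NoLeader,
    }

    async fn first_answer(rx: &mut mpsc::Receiver<String>) -> Result<String, Error> {
        timeout(Duration::from_secs(3), rx.recv())
            .await
            .context(AskLeaderTimeoutSnafu)? // Err(Elapsed) -> AskLeaderTimeout
            .context(NoLeaderSnafu) // None (all senders dropped) -> NoLeader
    }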
@@ -18,10 +18,6 @@ use std::time::Duration;
 use client::RegionFollowerClientRef;
 use common_base::Plugins;
 use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
-use common_meta::distributed_time_constants::{
-    HEARTBEAT_CHANNEL_KEEP_ALIVE_INTERVAL_SECS, HEARTBEAT_CHANNEL_KEEP_ALIVE_TIMEOUT_SECS,
-    HEARTBEAT_TIMEOUT,
-};
 use common_telemetry::{debug, info};
 use serde::{Deserialize, Serialize};

@@ -38,6 +34,8 @@ pub struct MetaClientOptions {
     #[serde(with = "humantime_serde")]
     pub timeout: Duration,
+    #[serde(with = "humantime_serde")]
+    pub heartbeat_timeout: Duration,
     #[serde(with = "humantime_serde")]
     pub ddl_timeout: Duration,
     #[serde(with = "humantime_serde")]
     pub connect_timeout: Duration,

@@ -54,6 +52,7 @@ impl Default for MetaClientOptions {
         Self {
             metasrv_addrs: vec!["127.0.0.1:3002".to_string()],
             timeout: Duration::from_millis(3_000u64),
+            heartbeat_timeout: Duration::from_millis(500u64),
             ddl_timeout: Duration::from_millis(10_000u64),
             connect_timeout: Duration::from_millis(1_000u64),
             tcp_nodelay: true,

@@ -98,11 +97,7 @@ pub async fn create_meta_client(
         .timeout(meta_client_options.timeout)
         .connect_timeout(meta_client_options.connect_timeout)
         .tcp_nodelay(meta_client_options.tcp_nodelay);
-    let heartbeat_config = base_config
-        .clone()
-        .timeout(HEARTBEAT_TIMEOUT)
-        .http2_keep_alive_interval(HEARTBEAT_CHANNEL_KEEP_ALIVE_INTERVAL_SECS)
-        .http2_keep_alive_timeout(HEARTBEAT_CHANNEL_KEEP_ALIVE_TIMEOUT_SECS);
+    let heartbeat_config = base_config.clone();

     if let MetaClientType::Frontend = client_type {
         let ddl_config = base_config.clone().timeout(meta_client_options.ddl_timeout);
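The new `heartbeat_timeout` field reuses the `humantime_serde` annotation of its siblings, so it can be written as a human-readable duration in config files. A self-contained sketch of what that annotation buys; the struct name and config snippet are illustrative:

    use std::time::Duration;

    use serde::Deserialize;

    #[derive(Deserialize)]
    struct Options {
        // Parses strings like "500ms" or "3s" into a Duration.
        #[serde(with = "humantime_serde")]
        heartbeat_timeout: Duration,
    }

    fn main() {
        let opts: Options = toml::from_str(r#"heartbeat_timeout = "500ms""#).unwrap();
        assert_eq!(opts.heartbeat_timeout, Duration::from_millis(500));
    }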
@@ -40,7 +40,7 @@ use common_telemetry::info;
 #[cfg(feature = "pg_kvbackend")]
 use deadpool_postgres::{Config, Runtime};
 use either::Either;
-use etcd_client::{Client, ConnectOptions};
+use etcd_client::Client;
 use servers::configurator::ConfiguratorRef;
 use servers::export_metrics::ExportMetricsTask;
 use servers::http::{HttpServer, HttpServerBuilder};

@@ -70,9 +70,7 @@ use crate::election::rds::postgres::PgElection;
 #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
 use crate::election::CANDIDATE_LEASE_SECS;
 use crate::metasrv::builder::MetasrvBuilder;
-use crate::metasrv::{
-    BackendClientOptions, BackendImpl, Metasrv, MetasrvOptions, SelectTarget, SelectorRef,
-};
+use crate::metasrv::{BackendImpl, Metasrv, MetasrvOptions, SelectTarget, SelectorRef};
 use crate::node_excluder::NodeExcluderRef;
 use crate::selector::lease_based::LeaseBasedSelector;
 use crate::selector::load_based::LoadBasedSelector;

@@ -272,12 +270,7 @@ macro_rules! add_compressed_service {
 }

 pub fn router(metasrv: Arc<Metasrv>) -> Router {
-    let mut router = tonic::transport::Server::builder()
-        // for admin services
-        .accept_http1(true)
-        // For quick network failures detection.
-        .http2_keepalive_interval(Some(metasrv.options().grpc.http2_keep_alive_interval))
-        .http2_keepalive_timeout(Some(metasrv.options().grpc.http2_keep_alive_timeout));
+    let mut router = tonic::transport::Server::builder().accept_http1(true); // for admin services
     let router = add_compressed_service!(router, HeartbeatServer::from_arc(metasrv.clone()));
     let router = add_compressed_service!(router, StoreServer::from_arc(metasrv.clone()));
     let router = add_compressed_service!(router, ClusterServer::from_arc(metasrv.clone()));

@@ -294,7 +287,7 @@ pub async fn metasrv_builder(
         (Some(kv_backend), _) => (kv_backend, None),
         (None, BackendImpl::MemoryStore) => (Arc::new(MemoryKvBackend::new()) as _, None),
         (None, BackendImpl::EtcdStore) => {
-            let etcd_client = create_etcd_client(&opts.store_addrs, &opts.backend_client).await?;
+            let etcd_client = create_etcd_client(&opts.store_addrs).await?;
             let kv_backend = EtcdStore::with_etcd_client(etcd_client.clone(), opts.max_txn_ops);
             let election = EtcdElection::with_etcd_client(
                 &opts.grpc.server_addr,

@@ -442,20 +435,13 @@ pub async fn metasrv_builder(
         .plugins(plugins))
 }

-pub async fn create_etcd_client(
-    store_addrs: &[String],
-    options: &BackendClientOptions,
-) -> Result<Client> {
+pub async fn create_etcd_client(store_addrs: &[String]) -> Result<Client> {
     let etcd_endpoints = store_addrs
         .iter()
         .map(|x| x.trim())
         .filter(|x| !x.is_empty())
         .collect::<Vec<_>>();
-    let options = ConnectOptions::new()
-        .with_keep_alive_while_idle(true)
-        .with_keep_alive(options.keep_alive_interval, options.keep_alive_timeout)
-        .with_connect_timeout(options.connect_timeout);
-    Client::connect(&etcd_endpoints, Some(options))
+    Client::connect(&etcd_endpoints, None)
         .await
         .context(error::ConnectEtcdSnafu)
 }
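For comparison, the keep-alive configuration removed above can be reconstructed from the deleted lines alone; a sketch with concrete but illustrative durations standing in for the former `BackendClientOptions` fields:

    use std::time::Duration;

    use etcd_client::{Client, ConnectOptions};

    async fn connect_with_keep_alive(endpoints: &[String]) -> Result<Client, etcd_client::Error> {
        let options = ConnectOptions::new()
            .with_keep_alive_while_idle(true)
            // (interval, timeout): probe every 10s, give up 3s after a missed ack.
            .with_keep_alive(Duration::from_secs(10), Duration::from_secs(3))
            .with_connect_timeout(Duration::from_secs(3));
        Client::connect(endpoints, Some(options)).await
    }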
@@ -63,6 +63,22 @@ pub struct EtcdElection {
 }

 impl EtcdElection {
+    pub async fn with_endpoints<E, S>(
+        leader_value: E,
+        endpoints: S,
+        store_key_prefix: String,
+    ) -> Result<ElectionRef>
+    where
+        E: AsRef<str>,
+        S: AsRef<[E]>,
+    {
+        let client = Client::connect(endpoints, None)
+            .await
+            .context(error::ConnectEtcdSnafu)?;
+
+        Self::with_etcd_client(leader_value, client, store_key_prefix).await
+    }
+
     pub async fn with_etcd_client<E>(
         leader_value: E,
         client: Client,
@@ -1190,7 +1190,7 @@ mod tests {
         ));
         handles.push(handle);
     }
-    // Wait for candidates to register themselves and renew their leases at least once.
+    // Wait for candidates to registrate themselves and renew their leases at least once.
     tokio::time::sleep(candidate_lease_ttl / 2 + Duration::from_secs(1)).await;

     let (tx, _) = broadcast::channel(100);

@@ -1012,7 +1012,7 @@ mod tests {
         ));
         handles.push(handle);
     }
-    // Wait for candidates to register themselves and renew their leases at least once.
+    // Wait for candidates to registrate themselves and renew their leases at least once.
     tokio::time::sleep(Duration::from_secs(3)).await;

     let (tx, _) = broadcast::channel(100);
@@ -15,6 +15,7 @@
 use std::collections::VecDeque;
 use std::time::Duration;

+use common_meta::distributed_time_constants;
 use serde::{Deserialize, Serialize};

 /// This is our port of Akka's "[PhiAccrualFailureDetector](https://github.com/akka/akka/blob/v2.6.21/akka-remote/src/main/scala/akka/remote/PhiAccrualFailureDetector.scala)"

@@ -82,7 +83,9 @@ impl Default for PhiAccrualFailureDetectorOptions {
         Self {
             threshold: 8_f32,
             min_std_deviation: Duration::from_millis(100),
-            acceptable_heartbeat_pause: Duration::from_secs(10),
+            acceptable_heartbeat_pause: Duration::from_secs(
+                distributed_time_constants::DATANODE_LEASE_SECS,
+            ),
             first_heartbeat_estimate: Duration::from_millis(1000),
         }
     }
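The option change above only widens `acceptable_heartbeat_pause` to the datanode lease; the detector itself is untouched. For orientation, a minimal sketch of how a phi accrual detector turns "time since last heartbeat" into a suspicion level, using the logistic approximation of the normal CDF from the Akka implementation the doc comment references (the estimation of `mean`/`std_dev` from observed intervals is elided, and this is not the crate's exact code):

    /// phi = -log10(P(heartbeat arrives later than `elapsed_ms`)); a node is
    /// suspected once phi exceeds the configured threshold (8.0 above).
    fn phi(elapsed_ms: f64, mean_ms: f64, std_dev_ms: f64) -> f64 {
        let y = (elapsed_ms - mean_ms) / std_dev_ms;
        // Logistic approximation of the normal CDF, as in Akka's detector.
        let e = (-y * (1.5976 + 0.070566 * y * y)).exp();
        let p_later = if elapsed_ms > mean_ms {
            e / (1.0 + e)
        } else {
            1.0 - 1.0 / (1.0 + e)
        };
        -p_later.log10()
    }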
@@ -268,15 +268,6 @@ impl Pushers {
     async fn remove(&self, pusher_id: &str) -> Option<Pusher> {
         self.0.write().await.remove(pusher_id)
     }
-
-    pub(crate) async fn clear(&self) -> Vec<String> {
-        let mut pushers = self.0.write().await;
-        let keys = pushers.keys().cloned().collect::<Vec<_>>();
-        if !keys.is_empty() {
-            pushers.clear();
-        }
-        keys
-    }
 }

 #[derive(Clone)]

@@ -313,11 +304,10 @@ impl HeartbeatHandlerGroup {
     /// Deregisters the heartbeat response [`Pusher`] with the given key from the group.
-    pub async fn deregister_push(&self, pusher_id: PusherId) {
+    ///
+    /// Returns the [`Pusher`] if it exists.
+    pub async fn deregister_push(&self, pusher_id: PusherId) -> Option<Pusher> {
+        METRIC_META_HEARTBEAT_CONNECTION_NUM.dec();
         info!("Pusher unregister: {}", pusher_id);
-        if self.pushers.remove(&pusher_id.string_key()).await.is_some() {
-            METRIC_META_HEARTBEAT_CONNECTION_NUM.dec();
-        }
+        self.pushers.remove(&pusher_id.string_key()).await
     }

     /// Returns the [`Pushers`] of the group.

@@ -526,14 +516,6 @@ impl Mailbox for HeartbeatMailbox {

         Ok(())
     }
-
-    async fn reset(&self) {
-        let keys = self.pushers.clear().await;
-        if !keys.is_empty() {
-            info!("Reset mailbox, deregister pushers: {:?}", keys);
-            METRIC_META_HEARTBEAT_CONNECTION_NUM.sub(keys.len() as i64);
-        }
-    }
 }

 /// The builder to build the group of heartbeat handlers.
@@ -124,7 +124,7 @@ mod test {
 use std::sync::Arc;

 use common_meta::datanode::{RegionManifestInfo, RegionStat, Stat};
-use common_meta::distributed_time_constants::default_distributed_time_constants;
+use common_meta::distributed_time_constants;
 use common_meta::key::table_route::TableRouteValue;
 use common_meta::key::test_utils::new_test_table_info;
 use common_meta::key::TableMetadataManager;

@@ -223,7 +223,7 @@ mod test {
         let opening_region_keeper = Arc::new(MemoryRegionKeeper::default());

         let handler = RegionLeaseHandler::new(
-            default_distributed_time_constants().region_lease.as_secs() as u64,
+            distributed_time_constants::REGION_LEASE_SECS,
             table_metadata_manager.clone(),
             opening_region_keeper.clone(),
             None,

@@ -253,7 +253,7 @@ mod test {
         assert_eq!(
             acc.region_lease.as_ref().unwrap().lease_seconds,
-            default_distributed_time_constants().region_lease.as_secs()
+            distributed_time_constants::REGION_LEASE_SECS
         );

         assert_region_lease(

@@ -287,7 +287,7 @@ mod test {
         assert_eq!(
             acc.region_lease.as_ref().unwrap().lease_seconds,
-            default_distributed_time_constants().region_lease.as_secs()
+            distributed_time_constants::REGION_LEASE_SECS
         );

         assert_region_lease(

@@ -366,7 +366,7 @@ mod test {
         });

         let handler = RegionLeaseHandler::new(
-            default_distributed_time_constants().region_lease.as_secs(),
+            distributed_time_constants::REGION_LEASE_SECS,
             table_metadata_manager.clone(),
             Default::default(),
             None,
@@ -21,7 +21,7 @@ use std::task::{Context, Poll};
 use api::v1::meta::heartbeat_request::NodeWorkloads;
 use common_error::ext::BoxedError;
 use common_meta::cluster::{NodeInfo, NodeInfoKey, Role as ClusterRole};
-use common_meta::distributed_time_constants::default_distributed_time_constants;
+use common_meta::distributed_time_constants::FRONTEND_HEARTBEAT_INTERVAL_MILLIS;
 use common_meta::kv_backend::{KvBackend, ResettableKvBackendRef};
 use common_meta::peer::{Peer, PeerLookupService};
 use common_meta::rpc::store::RangeRequest;

@@ -312,9 +312,7 @@ impl PeerLookupService for MetaPeerLookupService {
         lookup_frontends(
             &self.meta_peer_client,
             // TODO(zyy17): How to get the heartbeat interval of the frontend if it uses a custom heartbeat interval?
-            default_distributed_time_constants()
-                .frontend_heartbeat_interval
-                .as_secs(),
+            FRONTEND_HEARTBEAT_INTERVAL_MILLIS,
         )
         .await
         .map_err(BoxedError::new)
@@ -27,7 +27,7 @@ use common_event_recorder::EventRecorderOptions;
 use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
 use common_meta::cache_invalidator::CacheInvalidatorRef;
 use common_meta::ddl_manager::DdlManagerRef;
-use common_meta::distributed_time_constants::{self, default_distributed_time_constants};
+use common_meta::distributed_time_constants;
 use common_meta::key::runtime_switch::RuntimeSwitchManagerRef;
 use common_meta::key::TableMetadataManagerRef;
 use common_meta::kv_backend::{KvBackendRef, ResettableKvBackend, ResettableKvBackendRef};

@@ -98,27 +98,6 @@ pub enum BackendImpl {
     MysqlStore,
 }

-#[derive(Clone, PartialEq, Serialize, Deserialize, Debug)]
-#[serde(default)]
-pub struct BackendClientOptions {
-    #[serde(with = "humantime_serde")]
-    pub keep_alive_timeout: Duration,
-    #[serde(with = "humantime_serde")]
-    pub keep_alive_interval: Duration,
-    #[serde(with = "humantime_serde")]
-    pub connect_timeout: Duration,
-}
-
-impl Default for BackendClientOptions {
-    fn default() -> Self {
-        Self {
-            keep_alive_interval: Duration::from_secs(10),
-            keep_alive_timeout: Duration::from_secs(3),
-            connect_timeout: Duration::from_secs(3),
-        }
-    }
-}
-
 #[derive(Clone, PartialEq, Serialize, Deserialize)]
 #[serde(default)]
 pub struct MetasrvOptions {

@@ -134,22 +113,12 @@ pub struct MetasrvOptions {
     /// Only applicable when using PostgreSQL or MySQL as the metadata store
     #[serde(default)]
     pub backend_tls: Option<TlsOption>,
-    /// The backend client options.
-    /// Currently, only applicable when using etcd as the metadata store.
-    #[serde(default)]
-    pub backend_client: BackendClientOptions,
     /// The type of selector.
     pub selector: SelectorType,
     /// Whether to use the memory store.
     pub use_memory_store: bool,
     /// Whether to enable region failover.
     pub enable_region_failover: bool,
-    /// The base heartbeat interval.
-    ///
-    /// This value is used to calculate the distributed time constants for components.
-    /// e.g., the region lease time is `heartbeat_interval * 3 + Duration::from_secs(1)`.
-    #[serde(with = "humantime_serde")]
-    pub heartbeat_interval: Duration,
     /// The delay before starting region failure detection.
     /// This delay helps prevent Metasrv from triggering unnecessary region failovers before all Datanodes are fully started.
     /// Especially useful when the cluster is not deployed with GreptimeDB Operator and maintenance mode is not enabled.

@@ -242,9 +211,7 @@ impl fmt::Debug for MetasrvOptions {
             .field("max_txn_ops", &self.max_txn_ops)
             .field("flush_stats_factor", &self.flush_stats_factor)
             .field("tracing", &self.tracing)
-            .field("backend", &self.backend)
-            .field("heartbeat_interval", &self.heartbeat_interval)
-            .field("backend_client", &self.backend_client);
+            .field("backend", &self.backend);

         #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
         debug_struct.field("meta_table_name", &self.meta_table_name);

@@ -272,7 +239,6 @@ impl Default for MetasrvOptions {
             selector: SelectorType::default(),
             use_memory_store: false,
             enable_region_failover: false,
-            heartbeat_interval: distributed_time_constants::BASE_HEARTBEAT_INTERVAL,
             region_failure_detector_initialization_delay: Duration::from_secs(10 * 60),
             allow_region_failover_on_local_wal: false,
             grpc: GrpcOptions {

@@ -307,7 +273,6 @@ impl Default for MetasrvOptions {
             meta_election_lock_id: common_meta::kv_backend::DEFAULT_META_ELECTION_LOCK_ID,
             node_max_idle_time: Duration::from_secs(24 * 60 * 60),
             event_recorder: EventRecorderOptions::default(),
-            backend_client: BackendClientOptions::default(),
         }
     }
 }
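The removed doc comment records the arithmetic that `heartbeat_interval` used to drive. As a worked example of the stated formula (the 3s base interval is an assumption, not something this diff shows):

    use std::time::Duration;

    // region_lease = heartbeat_interval * 3 + 1s, per the removed doc comment.
    fn region_lease(heartbeat_interval: Duration) -> Duration {
        heartbeat_interval * 3 + Duration::from_secs(1)
    }

    fn main() {
        // With an assumed 3s base heartbeat interval this yields a 10s lease.
        assert_eq!(region_lease(Duration::from_secs(3)), Duration::from_secs(10));
    }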
@@ -423,7 +388,6 @@ pub struct MetaStateHandler {
     greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
     leader_cached_kv_backend: Arc<LeaderCachedKvBackend>,
     leadership_change_notifier: LeadershipChangeNotifier,
-    mailbox: MailboxRef,
     state: StateRef,
 }

@@ -447,9 +411,6 @@ impl MetaStateHandler {
     pub async fn on_leader_stop(&self) {
         self.state.write().unwrap().next_state(become_follower());

-        // Enforces the mailbox to clear all pushers.
-        // The remaining heartbeat connections will be closed by the remote peer or keep-alive detection.
-        self.mailbox.reset().await;
         self.leadership_change_notifier
             .notify_on_leader_stop()
             .await;

@@ -567,7 +528,6 @@ impl Metasrv {
             state: self.state.clone(),
             leader_cached_kv_backend: leader_cached_kv_backend.clone(),
             leadership_change_notifier,
-            mailbox: self.mailbox.clone(),
         };
         let _handle = common_runtime::spawn_global(async move {
             loop {

@@ -695,9 +655,7 @@ impl Metasrv {
         lookup_datanode_peer(
             peer_id,
             &self.meta_peer_client,
-            default_distributed_time_constants()
-                .datanode_lease
-                .as_secs(),
+            distributed_time_constants::DATANODE_LEASE_SECS,
         )
         .await
     }
@@ -27,7 +27,7 @@ use common_meta::ddl::{
     DdlContext, NoopRegionFailureDetectorControl, RegionFailureDetectorControllerRef,
 };
 use common_meta::ddl_manager::DdlManager;
-use common_meta::distributed_time_constants::default_distributed_time_constants;
+use common_meta::distributed_time_constants;
 use common_meta::key::flow::flow_state::FlowStateManager;
 use common_meta::key::flow::FlowMetadataManager;
 use common_meta::key::runtime_switch::{RuntimeSwitchManager, RuntimeSwitchManagerRef};

@@ -220,12 +220,8 @@ impl MetasrvBuilder {
         let selector_ctx = SelectorContext {
             server_addr: options.grpc.server_addr.clone(),
-            datanode_lease_secs: default_distributed_time_constants()
-                .datanode_lease
-                .as_secs(),
-            flownode_lease_secs: default_distributed_time_constants()
-                .flownode_lease
-                .as_secs(),
+            datanode_lease_secs: distributed_time_constants::DATANODE_LEASE_SECS,
+            flownode_lease_secs: distributed_time_constants::FLOWNODE_LEASE_SECS,
             kv_backend: kv_backend.clone(),
             meta_peer_client: meta_peer_client.clone(),
             table_id: None,

@@ -442,7 +438,7 @@ impl MetasrvBuilder {
             Some(handler_group_builder) => handler_group_builder,
             None => {
                 let region_lease_handler = RegionLeaseHandler::new(
-                    default_distributed_time_constants().region_lease.as_secs(),
+                    distributed_time_constants::REGION_LEASE_SECS,
                     table_metadata_manager.clone(),
                     memory_region_keeper.clone(),
                     customized_region_lease_renewer,
@@ -770,7 +770,7 @@ mod tests {
 use std::assert_matches::assert_matches;
 use std::sync::Arc;

-use common_meta::distributed_time_constants::default_distributed_time_constants;
+use common_meta::distributed_time_constants::REGION_LEASE_SECS;
 use common_meta::instruction::Instruction;
 use common_meta::key::test_utils::new_test_table_info;
 use common_meta::rpc::router::{Region, RegionRoute};

@@ -1004,10 +1004,8 @@ mod tests {
         .run_once()
         .await;

-    let region_lease = default_distributed_time_constants().region_lease.as_secs();
-
     // Ensure it didn't run into the slow path.
-    assert!(timer.elapsed().as_secs() < region_lease / 2);
+    assert!(timer.elapsed().as_secs() < REGION_LEASE_SECS / 2);

     runner.suite.verify_table_metadata().await;
 }

@@ -1061,9 +1059,8 @@ mod tests {
         .run_once()
         .await;

-    let region_lease = default_distributed_time_constants().region_lease.as_secs();
     // Ensure it didn't run into the slow path.
-    assert!(timer.elapsed().as_secs() < region_lease / 2);
+    assert!(timer.elapsed().as_secs() < REGION_LEASE_SECS / 2);

     runner.suite.verify_table_metadata().await;
 }

@@ -1383,9 +1380,8 @@ mod tests {
         .run_once()
         .await;

-    let region_lease = default_distributed_time_constants().region_lease.as_secs();
     // Ensure it didn't run into the slow path.
-    assert!(timer.elapsed().as_secs() < region_lease);
+    assert!(timer.elapsed().as_secs() < REGION_LEASE_SECS);
     runner.suite.verify_table_metadata().await;
 }
@@ -13,9 +13,10 @@
 // limitations under the License.

 use std::any::Any;
+use std::time::Duration;

 use api::v1::meta::MailboxMessage;
-use common_meta::distributed_time_constants::default_distributed_time_constants;
+use common_meta::distributed_time_constants::REGION_LEASE_SECS;
 use common_meta::instruction::{Instruction, InstructionReply, SimpleReply};
 use common_meta::key::datanode_table::RegionInfo;
 use common_meta::RegionIdent;

@@ -30,6 +31,9 @@ use crate::procedure::region_migration::migration_end::RegionMigrationEnd;
 use crate::procedure::region_migration::{Context, State};
 use crate::service::mailbox::Channel;

+/// Uses lease time of a region as the timeout of closing a downgraded region.
+const CLOSE_DOWNGRADED_REGION_TIMEOUT: Duration = Duration::from_secs(REGION_LEASE_SECS);
+
 #[derive(Debug, Serialize, Deserialize)]
 pub struct CloseDowngradedRegion;

@@ -107,7 +111,7 @@ impl CloseDowngradedRegion {
         let ch = Channel::Datanode(downgrade_leader_datanode.id);
         let receiver = ctx
             .mailbox
-            .send(&ch, msg, default_distributed_time_constants().region_lease)
+            .send(&ch, msg, CLOSE_DOWNGRADED_REGION_TIMEOUT)
             .await?;

         match receiver.await {
@@ -17,7 +17,7 @@ use std::time::Duration;

 use api::v1::meta::MailboxMessage;
 use common_error::ext::BoxedError;
-use common_meta::distributed_time_constants::default_distributed_time_constants;
+use common_meta::distributed_time_constants::REGION_LEASE_SECS;
 use common_meta::instruction::{
     DowngradeRegion, DowngradeRegionReply, Instruction, InstructionReply,
 };

@@ -64,7 +64,7 @@ impl State for DowngradeLeaderRegion {
         let now = Instant::now();
         // Ensures the `leader_region_lease_deadline` must exist after recovering.
         ctx.volatile_ctx
-            .set_leader_region_lease_deadline(default_distributed_time_constants().region_lease);
+            .set_leader_region_lease_deadline(Duration::from_secs(REGION_LEASE_SECS));

         match self.downgrade_region_with_retry(ctx).await {
             Ok(_) => {

@@ -250,14 +250,14 @@ impl DowngradeLeaderRegion {
         if let Some(last_connection_at) = last_connection_at {
             let now = current_time_millis();
             let elapsed = now - last_connection_at;
-            let region_lease = default_distributed_time_constants().region_lease;
+            let region_lease = Duration::from_secs(REGION_LEASE_SECS);

             // It's safe to update the region leader lease deadline here because:
             // 1. The old region leader has already been marked as downgraded in metadata,
             //    which means any attempts to renew its lease will be rejected.
             // 2. The pusher disconnect time record only gets removed when the datanode (from_peer)
             //    establishes a new heartbeat connection stream.
-            if elapsed >= (region_lease.as_secs() * 1000) as i64 {
+            if elapsed >= (REGION_LEASE_SECS * 1000) as i64 {
                 ctx.volatile_ctx.reset_leader_region_lease_deadline();
                 info!(
                     "Datanode {}({}) has been disconnected for longer than the region lease period ({:?}), reset leader region lease deadline to None, region: {}",

@@ -663,8 +663,7 @@ mod tests {
         let procedure_ctx = new_procedure_context();
         let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
         let elapsed = timer.elapsed().as_secs();
-        let region_lease = default_distributed_time_constants().region_lease.as_secs();
-        assert!(elapsed < region_lease / 2);
+        assert!(elapsed < REGION_LEASE_SECS / 2);
         assert_eq!(ctx.volatile_ctx.leader_region_last_entry_id, Some(1));
         assert!(ctx.volatile_ctx.leader_region_lease_deadline.is_none());
@@ -13,9 +13,10 @@
 // limitations under the License.

 use std::any::Any;
+use std::time::Duration;

 use api::v1::meta::MailboxMessage;
-use common_meta::distributed_time_constants::default_distributed_time_constants;
+use common_meta::distributed_time_constants::REGION_LEASE_SECS;
 use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, SimpleReply};
 use common_meta::key::datanode_table::RegionInfo;
 use common_meta::RegionIdent;

@@ -31,6 +32,9 @@ use crate::procedure::region_migration::flush_leader_region::PreFlushRegion;
 use crate::procedure::region_migration::{Context, State};
 use crate::service::mailbox::Channel;

+/// Uses lease time of a region as the timeout of opening a candidate region.
+const OPEN_CANDIDATE_REGION_TIMEOUT: Duration = Duration::from_secs(REGION_LEASE_SECS);
+
 #[derive(Debug, Serialize, Deserialize)]
 pub struct OpenCandidateRegion;

@@ -139,7 +143,7 @@ impl OpenCandidateRegion {
         let now = Instant::now();
         let receiver = ctx
             .mailbox
-            .send(&ch, msg, default_distributed_time_constants().region_lease)
+            .send(&ch, msg, OPEN_CANDIDATE_REGION_TIMEOUT)
             .await?;

         match receiver.await {
@@ -14,7 +14,7 @@

 use std::sync::Arc;

-use common_meta::distributed_time_constants::default_distributed_time_constants;
+use common_meta::distributed_time_constants::{FLOWNODE_LEASE_SECS, REGION_LEASE_SECS};
 use common_meta::kv_backend::memory::MemoryKvBackend;
 use common_meta::peer::Peer;
 use rand::prelude::SliceRandom;

@@ -36,10 +36,8 @@ pub fn new_test_selector_context() -> SelectorContext {

     SelectorContext {
         server_addr: "127.0.0.1:3002".to_string(),
-        datanode_lease_secs: default_distributed_time_constants().region_lease.as_secs(),
-        flownode_lease_secs: default_distributed_time_constants()
-            .flownode_lease
-            .as_secs(),
+        datanode_lease_secs: REGION_LEASE_SECS,
+        flownode_lease_secs: FLOWNODE_LEASE_SECS,
         kv_backend,
         meta_peer_client,
         table_id: None,
@@ -27,9 +27,10 @@ use snafu::OptionExt;
 use tokio::sync::mpsc;
 use tokio::sync::mpsc::Sender;
 use tokio_stream::wrappers::ReceiverStream;
-use tonic::{Request, Response, Status, Streaming};
+use tonic::{Request, Response, Streaming};

-use crate::error::{self, Result};
+use crate::error;
+use crate::error::Result;
 use crate::handler::{HeartbeatHandlerGroup, Pusher, PusherId};
 use crate::metasrv::{Context, Metasrv};
 use crate::metrics::METRIC_META_HEARTBEAT_RECV;

@@ -98,7 +99,6 @@ impl heartbeat_server::Heartbeat for Metasrv {
                     break;
                 }
             }
-            error!(err; "Sending heartbeat response error");

             if tx.send(Err(err)).await.is_err() {
                 info!("ReceiverStream was dropped; shutting down");

@@ -109,12 +109,6 @@ impl heartbeat_server::Heartbeat for Metasrv {
             if is_not_leader {
                 warn!("Quit because it is no longer the leader");
-                let _ = tx
-                    .send(Err(Status::aborted(format!(
-                        "The requested metasrv node is not leader, node addr: {}",
-                        ctx.server_addr
-                    ))))
-                    .await;
                 break;
             }
         }
@@ -207,9 +207,6 @@ pub trait Mailbox: Send + Sync {
     async fn broadcast(&self, ch: &BroadcastChannel, msg: &MailboxMessage) -> Result<()>;

     async fn on_recv(&self, id: MessageId, maybe_msg: Result<MailboxMessage>) -> Result<()>;
-
-    /// Reset all pushers of the mailbox.
-    async fn reset(&self);
 }

 #[cfg(test)]
@@ -90,8 +90,7 @@ pub struct CompactionRegion {
     pub(crate) engine_config: Arc<MitoConfig>,
     pub(crate) region_metadata: RegionMetadataRef,
     pub(crate) cache_manager: CacheManagerRef,
-    /// Access layer to get the table path and path type.
-    pub access_layer: AccessLayerRef,
+    pub(crate) access_layer: AccessLayerRef,
     pub(crate) manifest_ctx: Arc<ManifestContext>,
     pub(crate) current_version: CompactionVersion,
     pub(crate) file_purger: Option<Arc<LocalFilePurger>>,
@@ -62,15 +62,16 @@ pub struct RegionRemove {
     pub region_id: RegionId,
 }

 /// Last data truncated in the region.
 #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
 pub struct RegionTruncate {
     pub region_id: RegionId,
+    #[serde(flatten)]
+    pub kind: TruncateKind,
 }

+/// The kind of truncate operation.
+#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
+#[serde(untagged)]
+pub enum TruncateKind {
+    /// Truncate all data in the region, marked by all data before the given entry id&sequence.
+    All {

@@ -259,6 +260,8 @@ impl RegionMetaActionList {
 #[cfg(test)]
 mod tests {

+    use common_time::Timestamp;
+
     use super::*;

     // These tests are used to ensure backward compatibility of manifest files.

@@ -419,4 +422,66 @@ mod tests {
         assert_eq!(manifest, deserialized_manifest);
         assert_ne!(serialized_manifest, region_manifest_json);
     }
+
+    #[test]
+    fn test_region_truncate_compat() {
+        // Test deserializing RegionTruncate from old schema
+        let region_truncate_json = r#"{
+            "region_id": 4402341478400,
+            "truncated_entry_id": 10,
+            "truncated_sequence": 20
+        }"#;
+
+        let truncate_v1: RegionTruncate = serde_json::from_str(region_truncate_json).unwrap();
+        assert_eq!(truncate_v1.region_id, 4402341478400);
+        assert_eq!(
+            truncate_v1.kind,
+            TruncateKind::All {
+                truncated_entry_id: 10,
+                truncated_sequence: 20,
+            }
+        );
+
+        // Test deserializing RegionTruncate from new schema
+        let region_truncate_v2_json = r#"{
+            "region_id": 4402341478400,
+            "files_to_remove": [
+                {
+                    "region_id": 4402341478400,
+                    "file_id": "4b220a70-2b03-4641-9687-b65d94641208",
+                    "time_range": [
+                        {
+                            "value": 1451609210000,
+                            "unit": "Millisecond"
+                        },
+                        {
+                            "value": 1451609520000,
+                            "unit": "Millisecond"
+                        }
+                    ],
+                    "level": 1,
+                    "file_size": 100
+                }
+            ]
+        }"#;
+
+        let truncate_v2: RegionTruncate = serde_json::from_str(region_truncate_v2_json).unwrap();
+        assert_eq!(truncate_v2.region_id, 4402341478400);
+        assert_eq!(
+            truncate_v2.kind,
+            TruncateKind::Partial {
+                files_to_remove: vec![FileMeta {
+                    region_id: RegionId::from_u64(4402341478400),
+                    file_id: FileId::parse_str("4b220a70-2b03-4641-9687-b65d94641208").unwrap(),
+                    time_range: (
+                        Timestamp::new_millisecond(1451609210000),
+                        Timestamp::new_millisecond(1451609520000)
+                    ),
+                    level: 1,
+                    file_size: 100,
+                    ..Default::default()
+                }]
+            }
+        );
+    }
 }
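The compatibility test added above hinges on two serde attributes: `#[serde(flatten)]` folds the enum's fields into the struct's JSON object, and `#[serde(untagged)]` makes serde try each variant's shape in turn. A self-contained sketch with simplified stand-ins for `RegionTruncate`/`TruncateKind`:

    use serde::Deserialize;

    #[derive(Debug, Deserialize, PartialEq)]
    struct Truncate {
        region_id: u64,
        // The enum's fields live directly in the surrounding JSON object.
        #[serde(flatten)]
        kind: Kind,
    }

    #[derive(Debug, Deserialize, PartialEq)]
    #[serde(untagged)]
    enum Kind {
        All { truncated_entry_id: u64, truncated_sequence: u64 },
        Partial { files_to_remove: Vec<String> },
    }

    fn main() {
        let old = r#"{"region_id": 1, "truncated_entry_id": 10, "truncated_sequence": 20}"#;
        let new = r#"{"region_id": 1, "files_to_remove": ["a.parquet"]}"#;
        // Untagged enums try each variant in order, so both layouts parse.
        assert!(matches!(serde_json::from_str::<Truncate>(old).unwrap().kind, Kind::All { .. }));
        assert!(matches!(serde_json::from_str::<Truncate>(new).unwrap().kind, Kind::Partial { .. }));
    }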
@@ -76,7 +76,7 @@ pub struct RegionManifestOptions {
 /// -RegionMetadataRef metadata
 /// }
 /// class RegionEdit {
-///     -VersionNumber region_version
+///     -VersionNumber regoin_version
 ///     -Vec~FileMeta~ files_to_add
 ///     -Vec~FileMeta~ files_to_remove
 ///     -SequenceNumber flushed_sequence

@@ -371,7 +371,7 @@ impl VersionBuilder {
         self
     }

-    /// Sets truncated entry id.
+    /// Sets truncated entty id.
     pub(crate) fn truncated_entry_id(mut self, entry_id: Option<EntryId>) -> Self {
         self.truncated_entry_id = entry_id;
         self
@@ -344,7 +344,7 @@ mod tests {
     #[test]
     fn test_collect_and_basic() {
-        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_collect_and_basic_");
+        let (_d, facotry) = PuffinManagerFactory::new_for_test_block("test_collect_and_basic_");

         let metadata = test_region_metadata();
         let mut builder = InvertedIndexApplierBuilder::new(

@@ -353,7 +353,7 @@ mod tests {
             test_object_store(),
             &metadata,
             HashSet::from_iter([1, 2, 3]),
-            factory,
+            facotry,
         );

         let expr = Expr::BinaryExpr(BinaryExpr {

@@ -72,7 +72,7 @@ mod tests {
     #[test]
     fn test_collect_between_basic() {
-        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_collect_between_basic_");
+        let (_d, facotry) = PuffinManagerFactory::new_for_test_block("test_collect_between_basic_");
         let metadata = test_region_metadata();
         let mut builder = InvertedIndexApplierBuilder::new(
             "test".to_string(),

@@ -80,7 +80,7 @@ mod tests {
             test_object_store(),
             &metadata,
             HashSet::from_iter([1, 2, 3]),
-            factory,
+            facotry,
         );

         let between = Between {

@@ -113,7 +113,7 @@ mod tests {
     #[test]
     fn test_collect_between_negated() {
-        let (_d, factory) =
+        let (_d, facotry) =
             PuffinManagerFactory::new_for_test_block("test_collect_between_negated_");
         let metadata = test_region_metadata();
         let mut builder = InvertedIndexApplierBuilder::new(

@@ -122,7 +122,7 @@ mod tests {
             test_object_store(),
             &metadata,
             HashSet::from_iter([1, 2, 3]),
-            factory,
+            facotry,
         );

         let between = Between {

@@ -138,7 +138,7 @@ mod tests {
     #[test]
     fn test_collect_between_field_column() {
-        let (_d, factory) =
+        let (_d, facotry) =
             PuffinManagerFactory::new_for_test_block("test_collect_between_field_column_");
         let metadata = test_region_metadata();
         let mut builder = InvertedIndexApplierBuilder::new(

@@ -147,7 +147,7 @@ mod tests {
             test_object_store(),
             &metadata,
             HashSet::from_iter([1, 2, 3]),
-            factory,
+            facotry,
         );

         let between = Between {

@@ -180,7 +180,7 @@ mod tests {
     #[test]
     fn test_collect_between_type_mismatch() {
-        let (_d, factory) =
+        let (_d, facotry) =
             PuffinManagerFactory::new_for_test_block("test_collect_between_type_mismatch_");
         let metadata = test_region_metadata();
         let mut builder = InvertedIndexApplierBuilder::new(

@@ -189,7 +189,7 @@ mod tests {
             test_object_store(),
             &metadata,
             HashSet::from_iter([1, 2, 3]),
-            factory,
+            facotry,
         );

         let between = Between {

@@ -206,7 +206,7 @@ mod tests {
     #[test]
     fn test_collect_between_nonexistent_column() {
-        let (_d, factory) =
+        let (_d, facotry) =
             PuffinManagerFactory::new_for_test_block("test_collect_between_nonexistent_column_");
         let metadata = test_region_metadata();
         let mut builder = InvertedIndexApplierBuilder::new(

@@ -215,7 +215,7 @@ mod tests {
             test_object_store(),
             &metadata,
             HashSet::from_iter([1, 2, 3]),
-            factory,
+            facotry,
         );

         let between = Between {

@@ -227,7 +227,7 @@ mod tests {
                 ),
             ];

-        let (_d, factory) =
+        let (_d, facotry) =
             PuffinManagerFactory::new_for_test_block("test_collect_comparison_basic_");
         let metadata = test_region_metadata();
         let mut builder = InvertedIndexApplierBuilder::new(

@@ -236,7 +236,7 @@ mod tests {
             test_object_store(),
             &metadata,
             HashSet::from_iter([1, 2, 3]),
-            factory,
+            facotry,
         );

         for ((left, op, right), _) in &cases {

@@ -255,7 +255,7 @@ mod tests {
     #[test]
     fn test_collect_comparison_type_mismatch() {
-        let (_d, factory) =
+        let (_d, facotry) =
             PuffinManagerFactory::new_for_test_block("test_collect_comparison_type_mismatch_");
         let metadata = test_region_metadata();
         let mut builder = InvertedIndexApplierBuilder::new(

@@ -264,7 +264,7 @@ mod tests {
             test_object_store(),
             &metadata,
             HashSet::from_iter([1, 2, 3]),
-            factory,
+            facotry,
         );

         let res = builder.collect_comparison_expr(&tag_column(), &Operator::Lt, &int64_lit(10));

@@ -274,7 +274,7 @@ mod tests {
     #[test]
     fn test_collect_comparison_field_column() {
-        let (_d, factory) =
+        let (_d, facotry) =
             PuffinManagerFactory::new_for_test_block("test_collect_comparison_field_column_");
         let metadata = test_region_metadata();
         let mut builder = InvertedIndexApplierBuilder::new(

@@ -283,7 +283,7 @@ mod tests {
             test_object_store(),
             &metadata,
             HashSet::from_iter([1, 2, 3]),
-            factory,
+            facotry,
         );

         builder

@@ -308,7 +308,7 @@ mod tests {
     #[test]
     fn test_collect_comparison_nonexistent_column() {
-        let (_d, factory) =
+        let (_d, facotry) =
             PuffinManagerFactory::new_for_test_block("test_collect_comparison_nonexistent_column_");
         let metadata = test_region_metadata();
         let mut builder = InvertedIndexApplierBuilder::new(

@@ -317,7 +317,7 @@ mod tests {
             test_object_store(),
             &metadata,
             HashSet::from_iter([1, 2, 3]),
-            factory,
+            facotry,
         );

         let res = builder.collect_comparison_expr(

@@ -136,7 +136,7 @@ mod tests {
     #[test]
     fn test_collect_eq_basic() {
-        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_collect_eq_basic_");
+        let (_d, facotry) = PuffinManagerFactory::new_for_test_block("test_collect_eq_basic_");
         let metadata = test_region_metadata();
         let mut builder = InvertedIndexApplierBuilder::new(
             "test".to_string(),

@@ -144,7 +144,7 @@ mod tests {
             test_object_store(),
             &metadata,
             HashSet::from_iter([1, 2, 3]),
-            factory,
+            facotry,
         );

         builder

@@ -172,7 +172,7 @@ mod tests {
     #[test]
     fn test_collect_eq_field_column() {
-        let (_d, factory) =
+        let (_d, facotry) =
             PuffinManagerFactory::new_for_test_block("test_collect_eq_field_column_");
         let metadata = test_region_metadata();
         let mut builder = InvertedIndexApplierBuilder::new(

@@ -181,7 +181,7 @@ mod tests {
             test_object_store(),
             &metadata,
             HashSet::from_iter([1, 2, 3]),
-            factory,
+            facotry,
         );

         builder

@@ -200,7 +200,7 @@ mod tests {
     #[test]
     fn test_collect_eq_nonexistent_column() {
-        let (_d, factory) =
+        let (_d, facotry) =
             PuffinManagerFactory::new_for_test_block("test_collect_eq_nonexistent_column_");
         let metadata = test_region_metadata();
         let mut builder = InvertedIndexApplierBuilder::new(

@@ -209,7 +209,7 @@ mod tests {
             test_object_store(),
             &metadata,
             HashSet::from_iter([1, 2, 3]),
-            factory,
+            facotry,
         );

         let res = builder.collect_eq(&nonexistent_column(), &string_lit("abc"));

@@ -219,7 +219,7 @@ mod tests {
     #[test]
     fn test_collect_eq_type_mismatch() {
-        let (_d, factory) =
+        let (_d, facotry) =
             PuffinManagerFactory::new_for_test_block("test_collect_eq_type_mismatch_");
         let metadata = test_region_metadata();
         let mut builder = InvertedIndexApplierBuilder::new(

@@ -228,7 +228,7 @@ mod tests {
             test_object_store(),
             &metadata,
             HashSet::from_iter([1, 2, 3]),
-            factory,
+            facotry,
        );

         let res = builder.collect_eq(&tag_column(), &int64_lit(1));

@@ -238,7 +238,7 @@ mod tests {
     #[test]
     fn test_collect_or_eq_list_basic() {
-        let (_d, factory) =
+        let (_d, facotry) =
             PuffinManagerFactory::new_for_test_block("test_collect_or_eq_list_basic_");
         let metadata = test_region_metadata();
         let mut builder = InvertedIndexApplierBuilder::new(

@@ -247,7 +247,7 @@ mod tests {
             test_object_store(),
             &metadata,
             HashSet::from_iter([1, 2, 3]),
-            factory,
+            facotry,
         );

         let eq_expr = DfExpr::BinaryExpr(BinaryExpr {

@@ -296,7 +296,7 @@ mod tests {
     #[test]
     fn test_collect_or_eq_list_invalid_op() {
-        let (_d, factory) =
+        let (_d, facotry) =
             PuffinManagerFactory::new_for_test_block("test_collect_or_eq_list_invalid_op_");
         let metadata = test_region_metadata();
         let mut builder = InvertedIndexApplierBuilder::new(

@@ -305,7 +305,7 @@ mod tests {
             test_object_store(),
             &metadata,
             HashSet::from_iter([1, 2, 3]),
-            factory,
+            facotry,
         );

         let eq_expr = DfExpr::BinaryExpr(BinaryExpr {

@@ -333,7 +333,7 @@ mod tests {
     #[test]
     fn test_collect_or_eq_list_multiple_columns() {
-        let (_d, factory) =
+        let (_d, facotry) =
             PuffinManagerFactory::new_for_test_block("test_collect_or_eq_list_multiple_columns_");
         let metadata = test_region_metadata();
         let mut builder = InvertedIndexApplierBuilder::new(

@@ -342,7 +342,7 @@ mod tests {
             test_object_store(),
             &metadata,
             HashSet::from_iter([1, 2, 3]),
-            factory,
+            facotry,
         );

         let eq_expr = DfExpr::BinaryExpr(BinaryExpr {

@@ -67,7 +67,7 @@ mod tests {
     #[test]
     fn test_collect_in_list_basic() {
-        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_collect_in_list_basic_");
+        let (_d, facotry) = PuffinManagerFactory::new_for_test_block("test_collect_in_list_basic_");
         let metadata = test_region_metadata();
         let mut builder = InvertedIndexApplierBuilder::new(
             "test".to_string(),

@@ -75,7 +75,7 @@ mod tests {
             test_object_store(),
             &metadata,
             HashSet::from_iter([1, 2, 3]),
-            factory,
+            facotry,
         );

         let in_list = InList {

@@ -98,7 +98,7 @@ mod tests {
     #[test]
     fn test_collect_in_list_negated() {
-        let (_d, factory) =
+        let (_d, facotry) =
             PuffinManagerFactory::new_for_test_block("test_collect_in_list_negated_");
         let metadata = test_region_metadata();
         let mut builder = InvertedIndexApplierBuilder::new(

@@ -107,7 +107,7 @@ mod tests {
             test_object_store(),
             &metadata,
             HashSet::from_iter([1, 2, 3]),
-            factory,
+            facotry,
         );

         let in_list = InList {

@@ -122,7 +122,7 @@ mod tests {
     #[test]
     fn test_collect_in_list_field_column() {
-        let (_d, factory) =
+        let (_d, facotry) =
             PuffinManagerFactory::new_for_test_block("test_collect_in_list_field_column_");
         let metadata = test_region_metadata();
         let mut builder = InvertedIndexApplierBuilder::new(

@@ -131,7 +131,7 @@ mod tests {
             test_object_store(),
             &metadata,
             HashSet::from_iter([1, 2, 3]),
-            factory,
+            facotry,
         );

         let in_list = InList {

@@ -154,7 +154,7 @@ mod tests {
     #[test]
     fn test_collect_in_list_type_mismatch() {
-        let (_d, factory) =
+        let (_d, facotry) =
             PuffinManagerFactory::new_for_test_block("test_collect_in_list_type_mismatch_");
         let metadata = test_region_metadata();
         let mut builder = InvertedIndexApplierBuilder::new(

@@ -163,7 +163,7 @@ mod tests {
             test_object_store(),
             &metadata,
             HashSet::from_iter([1, 2, 3]),
-            factory,
+            facotry,
         );

         let in_list = InList {

@@ -179,7 +179,7 @@ mod tests {
     #[test]
     fn test_collect_in_list_nonexistent_column() {
-        let (_d, factory) =
+        let (_d, facotry) =
             PuffinManagerFactory::new_for_test_block("test_collect_in_list_nonexistent_column_");

         let metadata = test_region_metadata();

@@ -189,7 +189,7 @@ mod tests {
             test_object_store(),
             &metadata,
             HashSet::from_iter([1, 2, 3]),
-            factory,
+            facotry,
         );

         let in_list = InList {

@@ -59,7 +59,7 @@ mod tests {
     #[test]
     fn test_regex_match_basic() {
-        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_regex_match_basic_");
+        let (_d, facotry) = PuffinManagerFactory::new_for_test_block("test_regex_match_basic_");
         let metadata = test_region_metadata();
         let mut builder = InvertedIndexApplierBuilder::new(
             "test".to_string(),

@@ -67,7 +67,7 @@ mod tests {
             test_object_store(),
             &metadata,
             HashSet::from_iter([1, 2, 3]),
-            factory,
+            facotry,
         );

         builder

@@ -86,7 +86,7 @@ mod tests {
     #[test]
     fn test_regex_match_field_column() {
-        let (_d, factory) =
+        let (_d, facotry) =
             PuffinManagerFactory::new_for_test_block("test_regex_match_field_column_");
         let metadata = test_region_metadata();
         let mut builder = InvertedIndexApplierBuilder::new(

@@ -95,7 +95,7 @@ mod tests {
             test_object_store(),
             &metadata,
             HashSet::from_iter([1, 2, 3]),
-            factory,
+            facotry,
         );

         builder

@@ -114,7 +114,7 @@ mod tests {
     #[test]
     fn test_regex_match_type_mismatch() {
-        let (_d, factory) =
+        let (_d, facotry) =
             PuffinManagerFactory::new_for_test_block("test_regex_match_type_mismatch_");
         let metadata = test_region_metadata();
         let mut builder = InvertedIndexApplierBuilder::new(

@@ -123,7 +123,7 @@ mod tests {
             test_object_store(),
             &metadata,
             HashSet::from_iter([1, 2, 3]),
-            factory,
+            facotry,
         );

         builder

@@ -135,7 +135,7 @@ mod tests {
     #[test]
     fn test_regex_match_type_nonexist_column() {
-        let (_d, factory) =
+        let (_d, facotry) =
             PuffinManagerFactory::new_for_test_block("test_regex_match_type_nonexist_column_");
         let metadata = test_region_metadata();
         let mut builder = InvertedIndexApplierBuilder::new(

@@ -144,7 +144,7 @@ mod tests {
             test_object_store(),
             &metadata,
             HashSet::from_iter([1, 2, 3]),
-            factory,
+            facotry,
         );

         let res = builder.collect_regex_match(&nonexistent_column(), &string_lit("abc"));
@@ -70,7 +70,7 @@ impl ScalarCalculate {
         interval: Millisecond,
         input: LogicalPlan,
         time_index: &str,
-        tag_columns: &[String],
+        tag_colunms: &[String],
         field_column: &str,
         table_name: Option<&str>,
     ) -> Result<Self> {

@@ -97,7 +97,7 @@ impl ScalarCalculate {
             end,
             interval,
             time_index: time_index.to_string(),
-            tag_columns: tag_columns.to_vec(),
+            tag_columns: tag_colunms.to_vec(),
             field_column: field_column.to_string(),
             input,
             output_schema: Arc::new(schema),
@@ -82,7 +82,7 @@ impl ExtensionPlanner for MergeSortExtensionPlanner {
         // and we only need to do a merge sort, otherwise fallback to quick sort
         let can_merge_sort = partition_cnt >= region_cnt;
         if can_merge_sort {
-            // TODO(discord9): use `SortPreservingMergeExec here`
+            // TODO(discord9): use `SortPreversingMergeExec here`
         }
         // for now merge sort only exist in logical plan, and have the same effect as `Sort`
         // doesn't change the execution plan, this will change in the future
@@ -23,7 +23,6 @@ pub mod prom_query_gateway;
 pub mod region_server;

 use std::net::SocketAddr;
-use std::time::Duration;

 use api::v1::health_check_server::{HealthCheck, HealthCheckServer};
 use api::v1::{HealthCheckRequest, HealthCheckResponse};

@@ -73,12 +72,6 @@ pub struct GrpcOptions {
     pub runtime_size: usize,
     #[serde(default = "Default::default")]
     pub tls: TlsOption,
-    /// The HTTP/2 keep-alive interval.
-    #[serde(with = "humantime_serde")]
-    pub http2_keep_alive_interval: Duration,
-    /// The HTTP/2 keep-alive timeout.
-    #[serde(with = "humantime_serde")]
-    pub http2_keep_alive_timeout: Duration,
 }

 impl GrpcOptions {

@@ -125,6 +118,8 @@ impl GrpcOptions {

 const DEFAULT_GRPC_ADDR_PORT: &str = "4001";

+const DEFAULT_INTERNAL_GRPC_ADDR_PORT: &str = "4010";
+
 impl Default for GrpcOptions {
     fn default() -> Self {
         Self {

@@ -136,13 +131,27 @@ impl Default for GrpcOptions {
             flight_compression: FlightCompression::ArrowIpc,
             runtime_size: 8,
             tls: TlsOption::default(),
-            http2_keep_alive_interval: Duration::from_secs(10),
-            http2_keep_alive_timeout: Duration::from_secs(3),
         }
     }
 }

+impl GrpcOptions {
+    /// Default options for internal gRPC server.
+    /// The internal gRPC server is used for communication between different nodes in cluster.
+    /// It is not exposed to the outside world.
+    pub fn internal_default() -> Self {
+        Self {
+            bind_addr: format!("127.0.0.1:{}", DEFAULT_INTERNAL_GRPC_ADDR_PORT),
+            // If hostname is not set, the server will use the local ip address as the hostname.
+            server_addr: String::new(),
+            max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
+            max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
+            flight_compression: FlightCompression::ArrowIpc,
+            runtime_size: 8,
+            tls: TlsOption::default(),
+        }
+    }
+
     pub fn with_bind_addr(mut self, bind_addr: &str) -> Self {
         self.bind_addr = bind_addr.to_string();
         self
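A small usage sketch for the `internal_default()` constructor added above; only `with_bind_addr` and the empty `server_addr` behavior come from the hunks, while the import path and the chosen bind address are assumptions:

    use servers::grpc::GrpcOptions;

    fn main() {
        // Internal gRPC defaults, rebound to listen on all interfaces.
        let opts = GrpcOptions::internal_default().with_bind_addr("0.0.0.0:4010");
        assert_eq!(opts.bind_addr, "0.0.0.0:4010");
        // server_addr stays empty so the local IP is used as the hostname.
        assert!(opts.server_addr.is_empty());
    }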
@@ -198,6 +207,7 @@ pub struct GrpcServer {
         >,
     >,
     bind_addr: Option<SocketAddr>,
+    name: Option<String>,
 }

 /// Grpc Server configuration

@@ -309,7 +319,7 @@ impl Server for GrpcServer {
         let addr = listener.local_addr().context(TcpBindSnafu { addr })?;
         let incoming =
             TcpIncoming::from_listener(listener, true, None).context(TcpIncomingSnafu)?;
-        info!("gRPC server is bound to {}", addr);
+        info!("gRPC server(name={}) is bound to {}", self.name(), addr);

         *shutdown_tx = Some(tx);

@@ -351,7 +361,11 @@ impl Server for GrpcServer {
     }

     fn name(&self) -> &str {
-        GRPC_SERVER
+        if let Some(name) = &self.name {
+            name
+        } else {
+            GRPC_SERVER
+        }
     }

     fn bind_addr(&self) -> Option<SocketAddr> {
@@ -62,6 +62,7 @@ macro_rules! add_service {
 }

 pub struct GrpcServerBuilder {
+    name: Option<String>,
     config: GrpcServerConfig,
     runtime: Runtime,
     routes_builder: RoutesBuilder,

@@ -77,6 +78,7 @@ pub struct GrpcServerBuilder {
 impl GrpcServerBuilder {
     pub fn new(config: GrpcServerConfig, runtime: Runtime) -> Self {
         Self {
+            name: None,
             config,
             runtime,
             routes_builder: RoutesBuilder::default(),

@@ -93,6 +95,10 @@ impl GrpcServerBuilder {
         &self.runtime
     }

+    pub fn name(self, name: Option<String>) -> Self {
+        Self { name, ..self }
+    }
+
     /// Add handler for [DatabaseService] service.
     pub fn database_handler(mut self, database_handler: GreptimeRequestHandler) -> Self {
         add_service!(

@@ -190,6 +196,7 @@ impl GrpcServerBuilder {
             tls_config: self.tls_config,
             otel_arrow_service: Mutex::new(self.otel_arrow_service),
             bind_addr: None,
+            name: self.name,
         }
     }
 }
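How the threaded-through name might be wired at a call site (a sketch: the import paths and the `build()` shape are assumptions; only `.name(...)` and the `name()` fallback come from the hunks above):

    use servers::grpc::builder::GrpcServerBuilder;
    use servers::server::Server;

    fn named_server(builder: GrpcServerBuilder) -> String {
        let server = builder.name(Some("internal-grpc".to_string())).build();
        // Server::name() now prefers the explicit name over the GRPC_SERVER default.
        server.name().to_string()
    }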
@@ -34,10 +34,12 @@ impl HeartbeatOptions {
     pub fn frontend_default() -> Self {
         Self {
             // Frontend can send heartbeat with a longer interval.
-            interval: distributed_time_constants::frontend_heartbeat_interval(
-                distributed_time_constants::BASE_HEARTBEAT_INTERVAL,
-            ),
-            retry_interval: distributed_time_constants::BASE_HEARTBEAT_INTERVAL,
+            interval: Duration::from_millis(
+                distributed_time_constants::FRONTEND_HEARTBEAT_INTERVAL_MILLIS,
+            ),
+            retry_interval: Duration::from_millis(
+                distributed_time_constants::HEARTBEAT_INTERVAL_MILLIS,
+            ),
         }
     }
 }

@@ -45,8 +47,10 @@ impl HeartbeatOptions {
 impl Default for HeartbeatOptions {
     fn default() -> Self {
         Self {
-            interval: distributed_time_constants::BASE_HEARTBEAT_INTERVAL,
-            retry_interval: distributed_time_constants::BASE_HEARTBEAT_INTERVAL,
+            interval: Duration::from_millis(distributed_time_constants::HEARTBEAT_INTERVAL_MILLIS),
+            retry_interval: Duration::from_millis(
+                distributed_time_constants::HEARTBEAT_INTERVAL_MILLIS,
+            ),
         }
     }
 }
@@ -1066,7 +1066,7 @@ impl HttpServer {
     /// Route Prometheus [HTTP API].
     ///
     /// [HTTP API]: https://prometheus.io/docs/prometheus/latest/querying/api/
-    pub fn route_prometheus<S>(prometheus_handler: PrometheusHandlerRef) -> Router<S> {
+    fn route_prometheus<S>(prometheus_handler: PrometheusHandlerRef) -> Router<S> {
         Router::new()
             .route(
                 "/format_query",
@@ -352,7 +352,7 @@ async fn dryrun_pipeline_inner(
     )
     .await?;

-    let column_type_key = "column_type";
+    let colume_type_key = "colume_type";
     let data_type_key = "data_type";
     let name_key = "name";

@@ -376,7 +376,7 @@ async fn dryrun_pipeline_inner(
             JsonValue::String(cs.datatype().as_str_name().to_string()),
         );
         map.insert(
-            column_type_key.to_string(),
+            colume_type_key.to_string(),
             JsonValue::String(cs.semantic_type().as_str_name().to_string()),
         );
         map.insert(

@@ -409,7 +409,7 @@ async fn dryrun_pipeline_inner(
         );
         map.insert(
             "semantic_type".to_string(),
-            schema[idx][column_type_key].clone(),
+            schema[idx][colume_type_key].clone(),
         );
         map.insert(
             "data_type".to_string(),
@@ -105,7 +105,7 @@ mod tests {
     use crate::statements::statement::Statement;

     #[test]
-    fn test_display_for_truncate_table() {
+    fn test_display_for_tuncate_table() {
         let sql = r"truncate table t1;";
         let stmts: Vec<Statement> =
             ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
@@ -19,7 +19,7 @@ use std::sync::Arc;
use std::time::Duration;

use arbitrary::{Arbitrary, Unstructured};
use common_meta::distributed_time_constants::default_distributed_time_constants;
use common_meta::distributed_time_constants;
use common_telemetry::info;
use libfuzzer_sys::fuzz_target;
use rand::{Rng, SeedableRng};
@@ -252,7 +252,10 @@ async fn execute_failover(ctx: FuzzContext, input: FuzzInput) -> Result<()> {
recover_pod_failure(ctx.kube.clone(), &ctx.namespace, &chaos_name).await?;
wait_for_all_datanode_online(ctx.greptime.clone(), Duration::from_secs(60)).await;

tokio::time::sleep(default_distributed_time_constants().region_lease).await;
tokio::time::sleep(Duration::from_secs(
distributed_time_constants::REGION_LEASE_SECS,
))
.await;
// Validates value rows
info!("Validates num of rows");

@@ -19,7 +19,7 @@ use std::sync::Arc;
use std::time::Duration;

use arbitrary::{Arbitrary, Unstructured};
use common_meta::distributed_time_constants::default_distributed_time_constants;
use common_meta::distributed_time_constants;
use common_telemetry::info;
use common_time::util::current_time_millis;
use futures::future::try_join_all;
@@ -319,7 +319,10 @@ async fn execute_failover(ctx: FuzzContext, input: FuzzInput) -> Result<()> {
recover_pod_failure(ctx.kube.clone(), &ctx.namespace, &chaos_name).await?;
wait_for_all_datanode_online(ctx.greptime.clone(), Duration::from_secs(60)).await;

tokio::time::sleep(default_distributed_time_constants().region_lease).await;
tokio::time::sleep(Duration::from_secs(
distributed_time_constants::REGION_LEASE_SECS,
))
.await;
// Validates value rows
info!("Validates num of rows");
for (table_ctx, expected_rows) in table_ctxs.iter().zip(affected_rows) {

@@ -19,7 +19,7 @@ use std::sync::Arc;
use std::time::Duration;

use arbitrary::{Arbitrary, Unstructured};
use common_meta::distributed_time_constants::default_distributed_time_constants;
use common_meta::distributed_time_constants;
use common_telemetry::info;
use libfuzzer_sys::fuzz_target;
use rand::{Rng, SeedableRng};
@@ -271,7 +271,10 @@ async fn migrate_regions(ctx: &FuzzContext, migrations: &[Migration]) -> Result<
wait_for_migration(ctx, migration, &procedure_id).await;
}

tokio::time::sleep(default_distributed_time_constants().region_lease).await;
tokio::time::sleep(Duration::from_secs(
distributed_time_constants::REGION_LEASE_SECS,
))
.await;

Ok(())
}

@@ -19,7 +19,7 @@ use std::sync::Arc;
use std::time::Duration;

use arbitrary::{Arbitrary, Unstructured};
use common_meta::distributed_time_constants::default_distributed_time_constants;
use common_meta::distributed_time_constants;
use common_telemetry::info;
use libfuzzer_sys::fuzz_target;
use rand::{Rng, SeedableRng};
@@ -262,7 +262,10 @@ async fn migrate_regions(ctx: &FuzzContext, migrations: &[Migration]) -> Result<
.await;
}

tokio::time::sleep(default_distributed_time_constants().region_lease).await;
tokio::time::sleep(Duration::from_secs(
distributed_time_constants::REGION_LEASE_SECS,
))
.await;

Ok(())
}

@@ -1,14 +1,14 @@
services:

zookeeper:
image: greptime/zookeeper:3.7
image: docker.io/bitnami/zookeeper:3.7
ports:
- '2181:2181'
environment:
- ALLOW_ANONYMOUS_LOGIN=yes

kafka:
image: greptime/kafka:3.9.0-debian-12-r1
image: docker.io/bitnami/kafka:3.9.0
container_name: kafka
ports:
- 9092:9092
@@ -32,7 +32,7 @@ services:
condition: service_started

etcd:
image: greptime/etcd:3.6.1-debian-12-r3
image: docker.io/bitnami/etcd:3.5
ports:
- "2379:2379"
- "2380:2380"
@@ -44,7 +44,7 @@ services:
ETCD_MAX_REQUEST_BYTES: 10485760

minio:
image: greptime/minio:2024
image: docker.io/bitnami/minio:2024
ports:
- '9000:9000'
- '9001:9001'
@@ -68,7 +68,7 @@ services:
- POSTGRES_PASSWORD=admin

mysql:
image: greptime/mysql:5.7
image: bitnami/mysql:5.7
ports:
- 3306:3306
volumes:

@@ -462,6 +462,7 @@ impl GreptimeDbClusterBuilder {
let grpc_port = self.choose_random_unused_port(port_range.clone(), max_attempts, localhost);
fe_opts.grpc.bind_addr = construct_addr(grpc_port);
fe_opts.grpc.server_addr = construct_addr(grpc_port);

fe_opts.mysql.addr = construct_addr(self.choose_random_unused_port(
port_range.clone(),
max_attempts,

@@ -1188,8 +1188,6 @@ max_recv_message_size = "512MiB"
max_send_message_size = "512MiB"
flight_compression = "arrow_ipc"
runtime_size = 8
http2_keep_alive_interval = "10s"
http2_keep_alive_timeout = "3s"

[grpc.tls]
mode = "disable"
@@ -3214,37 +3212,37 @@ transform:

let dryrun_schema = json!([
{
"column_type": "FIELD",
"colume_type": "FIELD",
"data_type": "INT32",
"fulltext": false,
"name": "id1"
},
{
"column_type": "FIELD",
"colume_type": "FIELD",
"data_type": "INT32",
"fulltext": false,
"name": "id2"
},
{
"column_type": "FIELD",
"colume_type": "FIELD",
"data_type": "STRING",
"fulltext": false,
"name": "type"
},
{
"column_type": "FIELD",
"colume_type": "FIELD",
"data_type": "STRING",
"fulltext": false,
"name": "log"
},
{
"column_type": "FIELD",
"colume_type": "FIELD",
"data_type": "STRING",
"fulltext": false,
"name": "logger"
},
{
"column_type": "TIMESTAMP",
"colume_type": "TIMESTAMP",
"data_type": "TIMESTAMP_NANOSECOND",
"fulltext": false,
"name": "time"

@@ -2,7 +2,7 @@
version: '3.8'
services:
kafka:
image: greptime/kafka:3.9.0-debian-12-r1
image: bitnami/kafka:3.6.0
container_name: kafka
ports:
- 9092:9092

@@ -363,9 +363,9 @@ pub fn setup_mysql(mysql_port: u16, mysql_version: Option<&str>) {
}

let mysql_image = if let Some(mysql_version) = mysql_version {
format!("greptime/mysql:{mysql_version}")
format!("bitnami/mysql:{mysql_version}")
} else {
"greptime/mysql:5.7".to_string()
"bitnami/mysql:5.7".to_string()
};
let mysql_password = "admin";
let mysql_user = "greptimedb";