ci: add remote WAL logical pruning fuzz (#8307)

* feat: configure remote wal checkpoint intervals

Signed-off-by: WenyXu <wenymedia@gmail.com>

* feat: add logical pruning fuzz target

Signed-off-by: WenyXu <wenymedia@gmail.com>

* ci: add remote WAL logical pruning fuzz

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
Weny Xu
2026-06-18 16:24:47 +08:00
committed by GitHub
parent 3e36778b0e
commit fbdc35a109
22 changed files with 1404 additions and 19 deletions

View File

@@ -0,0 +1,53 @@
# Deployment running a single replica of the Kafka WAL helper HTTP service.
apiVersion: apps/v1
kind: Deployment
metadata:
name: kafka-wal-helper
labels:
app: kafka-wal-helper
spec:
replicas: 1
selector:
matchLabels:
app: kafka-wal-helper
template:
metadata:
labels:
app: kafka-wal-helper
spec:
containers:
- name: kafka-wal-helper
# The image is pulled from the registry at localhost:5001.
image: localhost:5001/greptime/kafka-wal-helper:latest
imagePullPolicy: IfNotPresent
# Listen on all interfaces so the probes and the Service can reach the container port.
args:
- "--addr"
- "0.0.0.0:8080"
ports:
- name: http
containerPort: 8080
# Both probes issue GET /health against the named `http` port.
readinessProbe:
httpGet:
path: /health
port: http
initialDelaySeconds: 2
periodSeconds: 5
livenessProbe:
httpGet:
path: /health
port: http
initialDelaySeconds: 5
periodSeconds: 10
---
# ClusterIP Service exposing the helper pods (selected by `app: kafka-wal-helper`) on port 8080.
apiVersion: v1
kind: Service
metadata:
name: kafka-wal-helper
labels:
app: kafka-wal-helper
spec:
type: ClusterIP
selector:
app: kafka-wal-helper
ports:
- name: http
port: 8080
# Resolves to the container port named `http` in the pod spec.
targetPort: http

View File

@@ -0,0 +1,63 @@
# Cluster values enabling Kafka remote WAL with logical pruning and short trigger intervals.
# NOTE(review): presumably the values file selected by the "Remote WAL logical pruning" fuzz mode — confirm.
logging:
level: "info"
format: "json"
filters:
- log_store=debug
# Metasrv configuration: prune every 30s with logical delete, and flush/checkpoint at small (1MB) sizes.
meta:
configData: |-
[runtime]
global_rt_size = 4
[wal]
provider = "kafka"
broker_endpoints = ["kafka.kafka-cluster.svc.cluster.local:9092"]
num_topics = 3
auto_prune_interval = "30s"
auto_prune_parallelism = 3
auto_prune_logical_delete = true
trigger_flush_threshold = 100
region_flush_trigger_interval = "30s"
periodic_checkpoint_persist_interval = "60s"
flush_trigger_size = "1MB"
checkpoint_trigger_size = "1MB"
[datanode]
[datanode.client]
timeout = "120s"
# Datanode configuration: same Kafka brokers and topic count as metasrv, with 30s offset fetch and auto flush.
datanode:
configData: |-
[runtime]
global_rt_size = 4
compact_rt_size = 2
[wal]
provider = "kafka"
broker_endpoints = ["kafka.kafka-cluster.svc.cluster.local:9092"]
num_topics = 3
topic_latest_offset_fetch_interval = "30s"
overwrite_entry_start_id = false
[[region_engine]]
[region_engine.mito]
auto_flush_interval = "30s"
# Frontend configuration.
frontend:
configData: |-
[runtime]
global_rt_size = 4
[meta_client]
ddl_timeout = "120s"
# S3-compatible object storage served by the in-cluster MinIO endpoint.
# NOTE(review): credentials are inline; presumably test-only values — confirm they are never reused elsewhere.
objectStorage:
s3:
bucket: default
region: us-west-2
root: test-root
endpoint: http://minio.minio.svc.cluster.local
credentials:
accessKeyId: rootuser
secretAccessKey: rootpass123
# Remote WAL backed by the same Kafka brokers referenced in the component configs above.
remoteWal:
enabled: true
kafka:
brokerEndpoints:
- "kafka.kafka-cluster.svc.cluster.local:9092"

View File

@@ -329,6 +329,13 @@ jobs:
minio: true
kafka: false
values: "with-minio-repartition-gc.yaml"
- target: "fuzz_remote_wal_logical_prune"
mode:
name: "Remote WAL logical pruning"
minio: true
kafka: true
values: "with-remote-wal-logical-prune.yaml"
kafkaWalHelper: true
steps:
- name: Remove unused software
run: |
@@ -378,6 +385,24 @@ jobs:
rm ./bin.tar.gz
- name: Build and push GreptimeDB image
uses: ./.github/actions/build-and-push-ci-image
- if: matrix.mode.kafkaWalHelper
name: Build Kafka WAL helper binary
shell: bash
run: |
cargo build --bin kafka_wal_helper
mkdir -p amd64
cp target/debug/kafka_wal_helper amd64/kafka_wal_helper
- if: matrix.mode.kafkaWalHelper
name: Build and push Kafka WAL helper image
uses: docker/build-push-action@v5
with:
context: .
file: ./docker/ci/ubuntu/Dockerfile.kafka-wal-helper
push: true
tags: localhost:5001/greptime/kafka-wal-helper:latest
build-args: |
TARGETARCH=amd64
TARGET_BIN=kafka_wal_helper
- name: Wait for etcd
run: |
kubectl wait \
@@ -401,6 +426,17 @@ jobs:
pod -l app.kubernetes.io/instance=kafka \
--timeout=120s \
-n kafka-cluster
- if: matrix.mode.kafkaWalHelper
name: Setup Kafka WAL helper
shell: bash
run: |
kubectl -n kafka-cluster apply \
-f .github/actions/setup-greptimedb-cluster/kafka-wal-helper.yaml
kubectl -n kafka-cluster wait \
--for=condition=Ready \
pod -l app=kafka-wal-helper \
--timeout=120s
- name: Print etcd info
shell: bash
run: kubectl get all --show-labels -n etcd-cluster
@@ -413,14 +449,23 @@ jobs:
- name: Port forward (mysql)
run: |
kubectl port-forward service/my-greptimedb-frontend 4002:4002 -n my-greptimedb&
- if: matrix.mode.kafkaWalHelper
name: Port forward Kafka WAL helper
run: |
kubectl port-forward service/kafka-wal-helper 8080:8080 -n kafka-cluster&
- name: Fuzz Test
uses: ./.github/actions/fuzz-test
env:
CUSTOM_LIBFUZZER_PATH: /usr/lib/llvm-14/lib/libFuzzer.a
GT_MYSQL_ADDR: 127.0.0.1:4002
GT_KAFKA_WAL_HELPER_URL: http://127.0.0.1:8080
GT_KAFKA_ADDR: kafka.kafka-cluster.svc.cluster.local:9092
GT_WAL_TOPIC_PREFIX: greptimedb_wal_topic
GT_WAL_NUM_TOPICS: 3
GT_FUZZ_OVERRIDE_DELETE_AFTER_SECS: 300
with:
target: ${{ matrix.target }}
max-total-time: 120
max-total-time: ${{ matrix.mode.kafkaWalHelper && 600 || 120 }}
- name: Describe Nodes
if: failure()
shell: bash

3
Cargo.lock generated
View File

@@ -14546,7 +14546,9 @@ version = "1.1.0"
dependencies = [
"arbitrary",
"async-trait",
"axum 0.8.4",
"chrono",
"clap",
"common-base",
"common-error",
"common-macro",
@@ -14570,6 +14572,7 @@ dependencies = [
"rand 0.9.4",
"rand_chacha 0.9.0",
"reqwest 0.13.4",
"rskafka",
"rustls",
"schemars",
"serde",

View File

@@ -0,0 +1,18 @@
# Runtime image for a prebuilt Kafka WAL helper binary.
FROM ubuntu:24.04

# TARGETARCH names the build-context directory holding the binary; TARGET_BIN is the binary's file name.
ARG TARGETARCH
ARG TARGET_BIN=kafka_wal_helper

# Install CA certificates only, then drop the apt caches to keep the layer small.
RUN apt-get update && \
    apt-get install -y --no-install-recommends ca-certificates && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/*

# COPY rather than ADD: this is a plain local file, so ADD's archive-extraction and
# URL-fetching behavior is neither needed nor wanted.
COPY ${TARGETARCH}/${TARGET_BIN} /greptime/bin/${TARGET_BIN}

ENV PATH=/greptime/bin/:$PATH

EXPOSE 8080

# NOTE: the exec-form ENTRYPOINT does not expand build args, so this path matches the copied
# binary only when TARGET_BIN keeps its default value.
ENTRYPOINT ["/greptime/bin/kafka_wal_helper"]
CMD ["--addr", "0.0.0.0:8080"]

View File

@@ -16,7 +16,7 @@ use std::collections::HashMap;
use std::collections::hash_map::Entry;
use std::sync::{Arc, RwLock};
use common_telemetry::warn;
use common_telemetry::{debug, warn};
use store_api::storage::RegionId;
use crate::datanode::{RegionManifestInfo, RegionStat};
@@ -241,6 +241,11 @@ impl LeaderRegionRegistry {
leader_region.manifest.manifest_version()
);
} else {
debug!(
"Updating leader region for region {}, pruned entry id: {}",
region_id,
leader_region.manifest.prunable_entry_id(),
);
entry.insert(leader_region);
}
}

View File

@@ -22,6 +22,7 @@ use serde::{Deserialize, Serialize};
use crate::config::kafka::common::{
DEFAULT_AUTO_PRUNE_INTERVAL, DEFAULT_AUTO_PRUNE_LOGICAL_DELETE, DEFAULT_AUTO_PRUNE_PARALLELISM,
DEFAULT_CHECKPOINT_TRIGGER_SIZE, DEFAULT_FLUSH_TRIGGER_SIZE,
DEFAULT_PERIODIC_CHECKPOINT_PERSIST_INTERVAL, DEFAULT_REGION_FLUSH_TRIGGER_INTERVAL,
};
use crate::config::kafka::{DatanodeKafkaConfig, MetasrvKafkaConfig};
use crate::config::raft_engine::RaftEngineConfig;
@@ -73,6 +74,10 @@ impl TryFrom<DatanodeWalConfig> for MetasrvWalConfig {
flush_trigger_size: DEFAULT_FLUSH_TRIGGER_SIZE,
// This field won't be used in standalone mode
checkpoint_trigger_size: DEFAULT_CHECKPOINT_TRIGGER_SIZE,
// This field won't be used in standalone mode
region_flush_trigger_interval: DEFAULT_REGION_FLUSH_TRIGGER_INTERVAL,
// This field won't be used in standalone mode
periodic_checkpoint_persist_interval: DEFAULT_PERIODIC_CHECKPOINT_PERSIST_INTERVAL,
})),
DatanodeWalConfig::Noop => UnsupportedWalProviderSnafu {
provider: "noop".to_string(),
@@ -225,6 +230,8 @@ mod tests {
auto_prune_parallelism: 10,
flush_trigger_size: ReadableSize::mb(512),
checkpoint_trigger_size: ReadableSize::mb(128),
region_flush_trigger_interval: Duration::from_secs(60),
periodic_checkpoint_persist_interval: Duration::from_secs(60 * 60),
};
assert_eq!(metasrv_wal_config, MetasrvWalConfig::Kafka(expected));

View File

@@ -46,6 +46,12 @@ pub const DEFAULT_AUTO_PRUNE_PARALLELISM: usize = 10;
pub const DEFAULT_FLUSH_TRIGGER_SIZE: ReadableSize = ReadableSize::mb(512);
/// Default checkpoint trigger size.
pub const DEFAULT_CHECKPOINT_TRIGGER_SIZE: ReadableSize = ReadableSize::mb(128);
/// Default interval for remote WAL region flush trigger.
pub const DEFAULT_REGION_FLUSH_TRIGGER_INTERVAL: Duration = Duration::from_secs(60);
/// Default interval to periodically persist remote WAL checkpoints.
pub const DEFAULT_PERIODIC_CHECKPOINT_PERSIST_INTERVAL: Duration = Duration::from_secs(60 * 60);
/// Default interval for fetching latest Kafka topic offsets.
pub const DEFAULT_TOPIC_LATEST_OFFSET_FETCH_INTERVAL: Duration = Duration::from_secs(60);
use crate::error::{self, Result};
use crate::{BROKER_ENDPOINT, TOPIC_NAME_PREFIX, TopicSelectorType};

View File

@@ -17,7 +17,9 @@ use std::time::Duration;
use common_base::readable_size::ReadableSize;
use serde::{Deserialize, Serialize};
use crate::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig};
use crate::config::kafka::common::{
DEFAULT_TOPIC_LATEST_OFFSET_FETCH_INTERVAL, KafkaConnectionConfig, KafkaTopicConfig,
};
/// Kafka wal configurations for datanode.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
@@ -42,6 +44,9 @@ pub struct DatanodeKafkaConfig {
pub create_index: bool,
#[serde(with = "humantime_serde")]
pub dump_index_interval: Duration,
/// Internal interval for fetching latest Kafka topic offsets.
#[serde(with = "humantime_serde")]
pub topic_latest_offset_fetch_interval: Duration,
/// Ignore missing entries during read WAL.
pub overwrite_entry_start_id: bool,
}
@@ -57,6 +62,7 @@ impl Default for DatanodeKafkaConfig {
auto_create_topics: true,
create_index: true,
dump_index_interval: Duration::from_secs(60),
topic_latest_offset_fetch_interval: DEFAULT_TOPIC_LATEST_OFFSET_FETCH_INTERVAL,
overwrite_entry_start_id: false,
}
}

View File

@@ -19,8 +19,9 @@ use serde::{Deserialize, Serialize};
use crate::config::kafka::common::{
DEFAULT_AUTO_PRUNE_INTERVAL, DEFAULT_AUTO_PRUNE_LOGICAL_DELETE, DEFAULT_AUTO_PRUNE_PARALLELISM,
DEFAULT_CHECKPOINT_TRIGGER_SIZE, DEFAULT_FLUSH_TRIGGER_SIZE, KafkaConnectionConfig,
KafkaTopicConfig,
DEFAULT_CHECKPOINT_TRIGGER_SIZE, DEFAULT_FLUSH_TRIGGER_SIZE,
DEFAULT_PERIODIC_CHECKPOINT_PERSIST_INTERVAL, DEFAULT_REGION_FLUSH_TRIGGER_INTERVAL,
KafkaConnectionConfig, KafkaTopicConfig,
};
/// Kafka wal configurations for metasrv.
@@ -46,6 +47,12 @@ pub struct MetasrvKafkaConfig {
pub flush_trigger_size: ReadableSize,
// The checkpoint trigger size.
pub checkpoint_trigger_size: ReadableSize,
/// Internal interval of remote WAL region flush trigger.
#[serde(with = "humantime_serde")]
pub region_flush_trigger_interval: Duration,
/// Internal interval to periodically persist remote WAL checkpoints.
#[serde(with = "humantime_serde")]
pub periodic_checkpoint_persist_interval: Duration,
}
impl Default for MetasrvKafkaConfig {
@@ -59,6 +66,8 @@ impl Default for MetasrvKafkaConfig {
auto_prune_parallelism: DEFAULT_AUTO_PRUNE_PARALLELISM,
flush_trigger_size: DEFAULT_FLUSH_TRIGGER_SIZE,
checkpoint_trigger_size: DEFAULT_CHECKPOINT_TRIGGER_SIZE,
region_flush_trigger_interval: DEFAULT_REGION_FLUSH_TRIGGER_INTERVAL,
periodic_checkpoint_persist_interval: DEFAULT_PERIODIC_CHECKPOINT_PERSIST_INTERVAL,
}
}
}

View File

@@ -47,8 +47,6 @@ use crate::kafka::util::record::{
};
use crate::metrics;
const DEFAULT_OFFSET_FETCH_INTERVAL: Duration = Duration::from_secs(60);
/// Statistics for a topic.
#[derive(Debug, Clone, Copy, Default)]
pub struct TopicStat {
@@ -152,8 +150,10 @@ impl KafkaLogStore {
let client_manager = Arc::new(
ClientManager::try_new(config, global_index_collector, topic_stats.clone()).await?,
);
let fetcher =
PeriodicOffsetFetcher::new(DEFAULT_OFFSET_FETCH_INTERVAL, client_manager.clone());
let fetcher = PeriodicOffsetFetcher::new(
config.topic_latest_offset_fetch_interval,
client_manager.clone(),
);
fetcher.run().await;
Ok(Self {

View File

@@ -467,6 +467,8 @@ impl MetasrvBuilder {
options.grpc.server_addr.clone(),
remote_wal_options.flush_trigger_size,
remote_wal_options.checkpoint_trigger_size,
remote_wal_options.region_flush_trigger_interval,
remote_wal_options.periodic_checkpoint_persist_interval,
);
region_flush_trigger.try_start()?;

View File

@@ -52,7 +52,7 @@ use common_procedure::error::{
use common_procedure::{
Context as ProcedureContext, LockKey, Procedure, Status, StringKey, UserMetadata,
};
use common_telemetry::{error, info};
use common_telemetry::{debug, error, info};
use manager::RegionMigrationProcedureGuard;
pub use manager::{
RegionMigrationManagerRef, RegionMigrationProcedureTask, RegionMigrationProcedureTracker,
@@ -708,6 +708,10 @@ impl Context {
.batch_get(topic_name_keys)
.await
.context(error::TableMetadataManagerSnafu)?;
debug!(
"Fetched topic region values: {:?}, topic name values: {:?}",
topic_region_values, topic_name_values
);
let replay_checkpoints = region_topics
.iter()

View File

@@ -242,6 +242,10 @@ impl WalPruneManager {
.await
.context(error::TableMetadataManagerSnafu)?
.map(|v| v.into_inner().pruned_entry_id);
debug!(
"Found prunable entry id {} for topic {}, current pruned entry id: {:?}",
prunable_entry_id, topic_name, current
);
if !should_trigger_prune(current, prunable_entry_id) {
debug!(
"No need to prune topic {}, current pruned entry id: {:?}, prunable entry id: {}",

View File

@@ -29,6 +29,7 @@ use common_telemetry::{debug, error, info, warn};
use common_time::util::current_time_millis;
use common_wal::config::kafka::common::{
DEFAULT_CHECKPOINT_TRIGGER_SIZE, DEFAULT_FLUSH_TRIGGER_SIZE,
DEFAULT_PERIODIC_CHECKPOINT_PERSIST_INTERVAL, DEFAULT_REGION_FLUSH_TRIGGER_INTERVAL,
};
use snafu::{OptionExt, ResultExt};
use store_api::region_request::RegionFlushReason;
@@ -39,15 +40,9 @@ use crate::error::{self, Result};
use crate::service::mailbox::{Channel, MailboxRef};
use crate::{define_ticker, metrics};
/// The interval of the region flush ticker.
const TICKER_INTERVAL: Duration = Duration::from_secs(60);
/// The duration of the recent period.
const RECENT_DURATION: Duration = Duration::from_secs(300);
/// The interval to periodically persist region checkpoints regardless of replay size.
const PERIODIC_CHECKPOINT_PERSIST_INTERVAL: Duration = Duration::from_mins(60);
/// [`Event`] represents various types of events that can be processed by the region flush ticker.
///
/// Variants:
@@ -87,6 +82,10 @@ pub struct RegionFlushTrigger {
flush_trigger_size: ReadableSize,
/// The checkpoint trigger size.
checkpoint_trigger_size: ReadableSize,
/// The interval of the region flush trigger.
region_flush_trigger_interval: Duration,
/// The interval to periodically persist region checkpoints regardless of replay size.
periodic_checkpoint_persist_interval: Duration,
/// The last timestamp in milliseconds when a region checkpoint was persisted.
last_checkpoint_persist_millis_by_region: HashMap<RegionId, i64>,
/// The receiver of events.
@@ -95,6 +94,7 @@ pub struct RegionFlushTrigger {
impl RegionFlushTrigger {
/// Creates a new [`RegionFlushTrigger`].
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
table_metadata_manager: TableMetadataManagerRef,
leader_region_registry: LeaderRegionRegistryRef,
@@ -103,6 +103,8 @@ impl RegionFlushTrigger {
server_addr: String,
mut flush_trigger_size: ReadableSize,
mut checkpoint_trigger_size: ReadableSize,
mut region_flush_trigger_interval: Duration,
mut periodic_checkpoint_persist_interval: Duration,
) -> (Self, RegionFlushTicker) {
if flush_trigger_size.as_bytes() == 0 {
flush_trigger_size = DEFAULT_FLUSH_TRIGGER_SIZE;
@@ -118,8 +120,22 @@ impl RegionFlushTrigger {
checkpoint_trigger_size
);
}
if region_flush_trigger_interval.is_zero() {
region_flush_trigger_interval = DEFAULT_REGION_FLUSH_TRIGGER_INTERVAL;
warn!(
"region_flush_trigger_interval is not set, using default value: {:?}",
region_flush_trigger_interval
);
}
if periodic_checkpoint_persist_interval.is_zero() {
periodic_checkpoint_persist_interval = DEFAULT_PERIODIC_CHECKPOINT_PERSIST_INTERVAL;
warn!(
"periodic_checkpoint_persist_interval is not set, using default value: {:?}",
periodic_checkpoint_persist_interval
);
}
let (tx, rx) = Self::channel();
let region_flush_ticker = RegionFlushTicker::new(TICKER_INTERVAL, tx);
let region_flush_ticker = RegionFlushTicker::new(region_flush_trigger_interval, tx);
let region_flush_trigger = Self {
table_metadata_manager,
leader_region_registry,
@@ -128,6 +144,8 @@ impl RegionFlushTrigger {
server_addr,
flush_trigger_size,
checkpoint_trigger_size,
region_flush_trigger_interval,
periodic_checkpoint_persist_interval,
last_checkpoint_persist_millis_by_region: HashMap::new(),
receiver: rx,
};
@@ -230,7 +248,7 @@ impl RegionFlushTrigger {
topic_regions.keys().copied(),
&self.last_checkpoint_persist_millis_by_region,
now_millis,
PERIODIC_CHECKPOINT_PERSIST_INTERVAL,
self.periodic_checkpoint_persist_interval,
);
let regions_to_persist = merge_region_ids(size_based_regions, periodic_regions);
let region_manifests = self
@@ -284,7 +302,7 @@ impl RegionFlushTrigger {
let Some(stat) = self
.topic_stats_registry
.get_calculated_topic_stat(topic, TICKER_INTERVAL)
.get_calculated_topic_stat(topic, self.region_flush_trigger_interval)
else {
debug!("No topic stat found for topic: {}", topic);
return None;
@@ -992,6 +1010,8 @@ mod tests {
"127.0.0.1:3002".to_string(),
ReadableSize(1),
ReadableSize(1),
DEFAULT_REGION_FLUSH_TRIGGER_INTERVAL,
DEFAULT_PERIODIC_CHECKPOINT_PERSIST_INTERVAL,
);
let topic = "test_topic";
@@ -1064,6 +1084,8 @@ mod tests {
"127.0.0.1:3002".to_string(),
ReadableSize(1),
ReadableSize(1),
DEFAULT_REGION_FLUSH_TRIGGER_INTERVAL,
DEFAULT_PERIODIC_CHECKPOINT_PERSIST_INTERVAL,
);
let topic = "test_topic".to_string();

View File

@@ -17,7 +17,9 @@ unstable = ["nix"]
[dependencies]
arbitrary = { version = "1.3.0", features = ["derive"] }
async-trait = { workspace = true }
axum.workspace = true
chrono = { workspace = true }
clap.workspace = true
common-base = { workspace = true }
common-error = { workspace = true }
common-macro = { workspace = true }
@@ -46,6 +48,7 @@ paste.workspace = true
rand = { workspace = true }
rand_chacha = "0.9"
reqwest = { workspace = true }
rskafka.workspace = true
rustls = { workspace = true, default-features = false, features = ["aws_lc_rs", "std", "tls12"] }
schemars = "0.8"
serde = { workspace = true }
@@ -171,3 +174,17 @@ path = "targets/migration/fuzz_migrate_metric_regions.rs"
test = false
bench = false
doc = false
[[bin]]
name = "fuzz_remote_wal_logical_prune"
path = "targets/wal/fuzz_remote_wal_logical_prune.rs"
test = false
bench = false
doc = false
[[bin]]
name = "kafka_wal_helper"
path = "utils/kafka_wal_helper.rs"
test = false
bench = false
doc = false

View File

@@ -81,6 +81,29 @@ pub enum Error {
location: Location,
},
#[snafu(display("Kafka WAL helper request failed, operation: {}", operation))]
KafkaWalHelperRequest {
operation: String,
#[snafu(source)]
error: reqwest::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Kafka WAL helper returned non-success status, operation: {}, status: {}, body: {}",
operation,
status,
body
))]
KafkaWalHelperStatus {
operation: String,
status: reqwest::StatusCode,
body: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to assert: {}", reason))]
Assert {
reason: String,

View File

@@ -18,6 +18,7 @@ pub mod crd;
/// CSV dump writer utilities for fuzz tests.
pub mod csv_dump_writer;
pub mod health;
pub mod kafka_wal_http;
pub mod migration;
pub mod network_chaos;
pub mod partition;

View File

@@ -0,0 +1,165 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use crate::error::{self, Result};
/// One `(topic, partition, offset)` triple exchanged with the Kafka WAL helper service.
///
/// Used both in [`RecordOffsetsResponse`] and in [`DeleteRecordsRequest`].
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct TopicOffset {
/// Topic name.
pub topic: String,
/// Partition index within the topic.
pub partition: i32,
/// Offset value for that partition.
pub offset: i64,
}
/// JSON body that [`HttpWalAdmin`] posts to the helper's `/record-offsets` endpoint.
#[derive(Debug, Serialize, Deserialize)]
pub struct RecordOffsetsRequest {
/// Kafka broker endpoints forwarded to the helper.
pub broker_endpoints: Vec<String>,
/// Prefix of the WAL topic names.
pub topic_prefix: String,
/// Number of WAL topics.
pub num_topics: usize,
/// Partition to query; deserializes to `0` (the `i32` default) when the field is absent.
#[serde(default)]
pub partition: i32,
}
/// JSON body that [`HttpWalAdmin`] posts to the helper's `/delete-records` endpoint.
#[derive(Debug, Serialize, Deserialize)]
pub struct DeleteRecordsRequest {
/// Kafka broker endpoints forwarded to the helper.
pub broker_endpoints: Vec<String>,
/// Per-topic offsets to delete records up to.
// NOTE(review): whether the offset itself is inclusive is decided by the helper — confirm there.
pub offsets: Vec<TopicOffset>,
/// Optional timeout in milliseconds; [`HttpWalAdmin`] always sends `Some`.
pub timeout_ms: Option<i32>,
}
/// JSON body decoded from a successful `/record-offsets` response.
#[derive(Debug, Serialize, Deserialize)]
pub struct RecordOffsetsResponse {
/// Offsets reported by the helper.
pub offsets: Vec<TopicOffset>,
}
/// JSON body decoded from a successful `/delete-records` response.
///
/// [`HttpWalAdmin`] only checks that the body decodes; the contents are discarded.
#[derive(Debug, Serialize, Deserialize)]
pub struct DeleteRecordsResponse {
/// Offsets the helper reports as deleted.
pub deleted: Vec<TopicOffset>,
}
/// Administrative operations on the WAL topics used by the fuzz tests.
///
/// [`HttpWalAdmin`] implements this by calling the Kafka WAL helper over HTTP.
#[async_trait]
pub trait WalAdmin: Send + Sync {
/// Returns the offsets reported for the configured WAL topics.
async fn record_topic_latest_offsets(&self) -> Result<Vec<TopicOffset>>;
/// Requests deletion of records up to each of the given `offsets`.
async fn delete_records_to_offsets(&self, offsets: &[TopicOffset]) -> Result<()>;
}
/// [`WalAdmin`] implementation that talks to the Kafka WAL helper over HTTP.
pub struct HttpWalAdmin {
// HTTP client reused for every request.
client: reqwest::Client,
// Helper base URL; a trailing `/` is tolerated when endpoints are built.
base_url: String,
// Kafka broker endpoints sent along with each request.
broker_endpoints: Vec<String>,
// WAL topic prefix sent with record-offsets requests.
topic_prefix: String,
// Number of WAL topics sent with record-offsets requests.
num_topics: usize,
// Partition sent with record-offsets requests.
partition: i32,
// Timeout (milliseconds) sent with delete-records requests.
delete_records_timeout_ms: i32,
}
/// Settings consumed by [`HttpWalAdmin::new`]; each field is copied into the admin as-is.
pub struct HttpWalAdminConfig {
/// Base URL of the Kafka WAL helper service.
pub base_url: String,
/// Kafka broker endpoints forwarded to the helper.
pub broker_endpoints: Vec<String>,
/// Prefix of the WAL topic names.
pub topic_prefix: String,
/// Number of WAL topics.
pub num_topics: usize,
/// Partition to operate on.
pub partition: i32,
/// Timeout in milliseconds for delete-records requests.
pub delete_records_timeout_ms: i32,
}
impl HttpWalAdmin {
pub fn new(config: HttpWalAdminConfig) -> Self {
Self {
client: reqwest::Client::new(),
base_url: config.base_url,
broker_endpoints: config.broker_endpoints,
topic_prefix: config.topic_prefix,
num_topics: config.num_topics,
partition: config.partition,
delete_records_timeout_ms: config.delete_records_timeout_ms,
}
}
fn endpoint(&self, path: &str) -> String {
format!("{}{}", self.base_url.trim_end_matches('/'), path)
}
}
#[async_trait]
impl WalAdmin for HttpWalAdmin {
    /// Posts a [`RecordOffsetsRequest`] to `/record-offsets` and returns the decoded offsets.
    ///
    /// Fails on a transport error, a non-success status, or a body that does not decode.
    async fn record_topic_latest_offsets(&self) -> Result<Vec<TopicOffset>> {
        const OPERATION: &str = "record-offsets";

        let payload = RecordOffsetsRequest {
            broker_endpoints: self.broker_endpoints.clone(),
            topic_prefix: self.topic_prefix.clone(),
            num_topics: self.num_topics,
            partition: self.partition,
        };

        let raw = self
            .client
            .post(self.endpoint("/record-offsets"))
            .json(&payload)
            .send()
            .await
            .context(error::KafkaWalHelperRequestSnafu {
                operation: OPERATION.to_string(),
            })?;

        let checked = ensure_success(raw, OPERATION).await?;
        let decoded = checked
            .json::<RecordOffsetsResponse>()
            .await
            .context(error::KafkaWalHelperRequestSnafu {
                operation: OPERATION.to_string(),
            })?;

        Ok(decoded.offsets)
    }

    /// Posts a [`DeleteRecordsRequest`] for `offsets` to `/delete-records`.
    ///
    /// The response body must decode as [`DeleteRecordsResponse`]; its contents are dropped.
    async fn delete_records_to_offsets(&self, offsets: &[TopicOffset]) -> Result<()> {
        const OPERATION: &str = "delete-records";

        let payload = DeleteRecordsRequest {
            broker_endpoints: self.broker_endpoints.clone(),
            offsets: offsets.to_vec(),
            timeout_ms: Some(self.delete_records_timeout_ms),
        };

        let raw = self
            .client
            .post(self.endpoint("/delete-records"))
            .json(&payload)
            .send()
            .await
            .context(error::KafkaWalHelperRequestSnafu {
                operation: OPERATION.to_string(),
            })?;

        let checked = ensure_success(raw, OPERATION).await?;
        checked
            .json::<DeleteRecordsResponse>()
            .await
            .context(error::KafkaWalHelperRequestSnafu {
                operation: OPERATION.to_string(),
            })?;

        Ok(())
    }
}
async fn ensure_success(response: reqwest::Response, operation: &str) -> Result<reqwest::Response> {
let status = response.status();
if status.is_success() {
return Ok(response);
}
let body = response.text().await.unwrap_or_default();
error::KafkaWalHelperStatusSnafu {
operation: operation.to_string(),
status,
body,
}
.fail()
}

View File

@@ -0,0 +1,647 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#![no_main]
use std::env;
use std::sync::Arc;
use std::time::Duration;
use arbitrary::{Arbitrary, Unstructured};
use common_meta::distributed_time_constants::default_distributed_time_constants;
use common_telemetry::info;
use libfuzzer_sys::fuzz_target;
use rand::seq::IndexedRandom;
use rand::{Rng, SeedableRng};
use rand_chacha::ChaChaRng;
use snafu::{ResultExt, ensure};
use sqlx::{Executor, MySql, Pool};
use store_api::storage::RegionId;
use tests_fuzz::context::{TableContext, TableContextRef};
use tests_fuzz::error::{self, Result};
use tests_fuzz::fake::{
MappedGenerator, WordGenerator, merge_two_word_map_fn, random_capitalize_map,
uppercase_and_keyword_backtick_map,
};
use tests_fuzz::generator::Generator;
use tests_fuzz::generator::create_expr::CreateTableExprGeneratorBuilder;
use tests_fuzz::generator::insert_expr::InsertExprGeneratorBuilder;
use tests_fuzz::ir::{
CreateTableExpr, Ident, InsertIntoExpr, MySQLTsColumnTypeGenerator, format_columns,
generate_random_value, generate_unique_timestamp_for_mysql, replace_default,
sort_by_primary_keys,
};
use tests_fuzz::translator::DslTranslator;
use tests_fuzz::translator::mysql::create_expr::CreateTableExprTranslator;
use tests_fuzz::translator::mysql::insert_expr::InsertIntoExprTranslator;
use tests_fuzz::utils::cluster_info::{PEER_TYPE_DATANODE, fetch_nodes};
use tests_fuzz::utils::kafka_wal_http::{HttpWalAdmin, HttpWalAdminConfig, WalAdmin};
use tests_fuzz::utils::migration::migrate_region;
use tests_fuzz::utils::partition::{fetch_partition, fetch_partitions};
use tests_fuzz::utils::procedure::procedure_state;
use tests_fuzz::utils::wait::wait_condition_fn;
use tests_fuzz::utils::{
Connections, flush_memtable, get_fuzz_override, get_gt_fuzz_input_max_rows,
get_gt_fuzz_input_max_tables, init_greptime_connections_via_env,
};
use tests_fuzz::validator;
// Lower bound on the number of tables a generated input creates (an override can go below it).
const MIN_TABLES: usize = 4;
// Upper bound on the randomly chosen partition count per table.
const MAX_PARTITIONS: usize = 8;
// Floor applied to `delete_after_secs`, including overridden values.
const MIN_DELETE_AFTER_SECS: u64 = 180;
// `delete_after_secs` used when no override is present.
const DEFAULT_DELETE_AFTER_SECS: u64 = 360;
// NOTE(review): presumably the region migration wait timeout in seconds — usage is outside this view.
const MIGRATION_TIMEOUT_SECS: u64 = 240;
// NOTE(review): presumably the pause between activity rounds in seconds — usage is outside this view.
const ACTIVITY_TICK_SECS: u64 = 5;
// Fallbacks used when the corresponding environment variables are missing or unparsable.
const DEFAULT_KAFKA_ADDR: &str = "127.0.0.1:9092";
const DEFAULT_KAFKA_WAL_HELPER_URL: &str = "http://127.0.0.1:8080";
const DEFAULT_WAL_TOPIC_PREFIX: &str = "greptimedb_wal_topic";
const DEFAULT_WAL_NUM_TOPICS: usize = 3;
const DEFAULT_WAL_PARTITION: i32 = 0;
const DEFAULT_DELETE_RECORDS_TIMEOUT_MS: i32 = 5000;
/// Connection state shared by the steps of one fuzz run.
struct FuzzContext {
    /// MySQL connection pool that every SQL statement in this target is executed on.
    greptime: Pool<MySql>,
}

impl FuzzContext {
    /// Consumes the context and closes its connection pool.
    async fn close(self) {
        let Self { greptime } = self;
        greptime.close().await;
    }
}
/// Parameters of a single fuzz run, produced by the `Arbitrary` implementation.
#[derive(Copy, Clone, Debug)]
struct FuzzInput {
// Seed for the run's `ChaChaRng`; also embedded in generated table names.
seed: u64,
// Number of tables to create.
tables: usize,
// Partition count passed to the create-table generator.
partitions: usize,
// Number of rows generated per insert statement.
rows_per_insert: usize,
// NOTE(review): presumably the number of activity rounds — usage is outside this view.
rounds: usize,
// NOTE(review): presumably the delay in seconds before Kafka records are deleted — usage is outside this view.
delete_after_secs: u64,
}
impl Arbitrary<'_> for FuzzInput {
    /// Derives a [`FuzzInput`] from the fuzzer bytes, letting `get_fuzz_override` values take
    /// precedence over generated ones.
    ///
    /// Only the seed is read from `u`; every other field is drawn from a `ChaChaRng` seeded
    /// with it, so a given seed (plus overrides) reproduces the same input.
    ///
    /// # Errors
    ///
    /// Propagates any error from reading the seed out of `u`.
    fn arbitrary(u: &mut Unstructured<'_>) -> arbitrary::Result<Self> {
        // Touch the fuzzer bytes only when no override is present. The previous
        // `unwrap_or(u.int_in_range(..)?)` evaluated its argument eagerly, consuming input and
        // possibly failing even though the result was about to be discarded.
        let seed = match get_fuzz_override::<u64>("SEED") {
            Some(seed) => seed,
            None => u.int_in_range(u64::MIN..=u64::MAX)?,
        };
        let mut rng = ChaChaRng::seed_from_u64(seed);

        // Clamp the configured maxima so the sampled ranges below are never empty.
        let max_tables = get_gt_fuzz_input_max_tables().max(MIN_TABLES);
        let tables = get_fuzz_override::<usize>("TABLES")
            .unwrap_or_else(|| rng.random_range(MIN_TABLES..=max_tables));
        let partitions = get_fuzz_override::<usize>("PARTITIONS")
            .unwrap_or_else(|| rng.random_range(1..=MAX_PARTITIONS));
        let max_rows = get_gt_fuzz_input_max_rows().max(32);
        let rows_per_insert =
            get_fuzz_override::<usize>("ROWS").unwrap_or_else(|| rng.random_range(32..=max_rows));
        let rounds =
            get_fuzz_override::<usize>("ROUNDS").unwrap_or_else(|| rng.random_range(2..=4));

        // `RETENTION_WAIT_SECS` is accepted as a fallback override name; the floor applies to
        // overridden values as well.
        let delete_after_secs = get_fuzz_override::<u64>("DELETE_AFTER_SECS")
            .or_else(|| get_fuzz_override::<u64>("RETENTION_WAIT_SECS"))
            .unwrap_or(DEFAULT_DELETE_AFTER_SECS)
            .max(MIN_DELETE_AFTER_SECS);

        Ok(FuzzInput {
            seed,
            tables,
            partitions,
            rows_per_insert,
            rounds,
            delete_after_secs,
        })
    }
}
/// How actively a table is written to and flushed during the run.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ActivityProfile {
    /// Always inserted into; occasionally flushed.
    Hot,
    /// Inserted into with probability 0.7; occasionally flushed.
    Warm,
    /// Inserted into with probability 0.2; never flushed here.
    Cold,
    /// Never inserted into or flushed here.
    Dormant,
}

impl ActivityProfile {
    /// Assigns profiles round-robin by table index: Hot, Warm, Cold, Dormant, then repeating.
    fn for_table(idx: usize) -> Self {
        let cycle = [Self::Hot, Self::Warm, Self::Cold, Self::Dormant];
        cycle[idx % cycle.len()]
    }

    /// Decides whether to insert this tick.
    ///
    /// `rng` is consulted only for `Warm` and `Cold`; the other profiles are deterministic.
    fn should_insert<R: Rng>(&self, rng: &mut R) -> bool {
        let probability = match self {
            Self::Hot => return true,
            Self::Dormant => return false,
            Self::Warm => 0.7,
            Self::Cold => 0.2,
        };
        rng.random_bool(probability)
    }

    /// Decides whether to flush this tick.
    ///
    /// `Hot` and `Warm` flush with probability 0.05; `rng` is left untouched otherwise.
    fn should_flush<R: Rng>(&self, rng: &mut R) -> bool {
        matches!(self, Self::Hot | Self::Warm) && rng.random_bool(0.05)
    }
}
/// Per-table bookkeeping for one fuzz run.
struct TableState {
// Table context built from the create-table expression.
ctx: TableContextRef,
// Activity profile assigned from the table's index.
profile: ActivityProfile,
// Every insert that succeeded against this table, in execution order; read back during validation.
insert_exprs: Vec<InsertIntoExpr>,
}
/// Description of a single region move between two peers.
// NOTE(review): construction and use are outside this view; the field meanings below are read from
// the names and should be confirmed.
#[derive(Debug)]
struct Migration {
// Presumably the id of the peer currently hosting the region.
from_peer: u64,
// Presumably the id of the destination peer.
to_peer: u64,
// Region being moved.
region_id: RegionId,
}
/// Builds the [`HttpWalAdmin`] from environment variables, after a best-effort `.env` load.
///
/// Any variable that is missing (or, for numeric settings, unparsable) falls back to the
/// matching `DEFAULT_*` constant.
fn wal_admin_from_env() -> HttpWalAdmin {
    /// Returns the variable's value, or `fallback` when it cannot be read.
    fn text_var(key: &str, fallback: &str) -> String {
        env::var(key).unwrap_or_else(|_| fallback.to_string())
    }

    /// Returns the variable parsed as `T`, or `fallback` when it is missing or malformed.
    fn parsed_var<T: std::str::FromStr>(key: &str, fallback: T) -> T {
        env::var(key)
            .ok()
            .and_then(|raw| raw.parse().ok())
            .unwrap_or(fallback)
    }

    // A missing `.env` file is fine; the process environment is used as-is.
    let _ = dotenv::dotenv();

    let config = HttpWalAdminConfig {
        base_url: text_var("GT_KAFKA_WAL_HELPER_URL", DEFAULT_KAFKA_WAL_HELPER_URL),
        broker_endpoints: kafka_broker_endpoints(),
        topic_prefix: text_var("GT_WAL_TOPIC_PREFIX", DEFAULT_WAL_TOPIC_PREFIX),
        num_topics: parsed_var("GT_WAL_NUM_TOPICS", DEFAULT_WAL_NUM_TOPICS),
        partition: parsed_var("GT_WAL_PARTITION", DEFAULT_WAL_PARTITION),
        delete_records_timeout_ms: parsed_var(
            "GT_WAL_DELETE_RECORDS_TIMEOUT_MS",
            DEFAULT_DELETE_RECORDS_TIMEOUT_MS,
        ),
    };
    HttpWalAdmin::new(config)
}
/// Resolves the Kafka broker endpoints from the environment.
///
/// `GT_KAFKA_ADDR` is consulted first, then `KAFKA_ADDR`, then [`DEFAULT_KAFKA_ADDR`]. Each
/// value is a comma-separated list; entries are trimmed and blank entries dropped.
///
/// A variable that is set but yields no endpoints (empty, or only commas and whitespace) is
/// treated as unset, so the result is never an empty list. This mirrors how the numeric WAL
/// settings fall back to their defaults when a value is unusable.
fn kafka_broker_endpoints() -> Vec<String> {
    /// Splits a comma-separated list into trimmed, non-empty endpoints.
    fn parse_endpoints(raw: &str) -> Vec<String> {
        raw.split(',')
            .map(str::trim)
            .filter(|endpoint| !endpoint.is_empty())
            .map(ToString::to_string)
            .collect()
    }

    ["GT_KAFKA_ADDR", "KAFKA_ADDR"]
        .iter()
        .filter_map(|key| env::var(key).ok())
        .map(|raw| parse_endpoints(&raw))
        .find(|endpoints| !endpoints.is_empty())
        .unwrap_or_else(|| parse_endpoints(DEFAULT_KAFKA_ADDR))
}
/// Generates the `CREATE TABLE` expression for the table at position `idx`.
///
/// The table is named `wal_logical_prune_{seed}_{idx}`, has four columns, uses the `mito`
/// engine, and gets `input.partitions` partitions.
fn generate_create_expr<R: Rng + 'static>(
    input: FuzzInput,
    idx: usize,
    rng: &mut R,
) -> Result<CreateTableExpr> {
    let generator = CreateTableExprGeneratorBuilder::default()
        .name_generator(Box::new(MappedGenerator::new(
            WordGenerator,
            merge_two_word_map_fn(random_capitalize_map, uppercase_and_keyword_backtick_map),
        )))
        .name(Ident::new(format!(
            "wal_logical_prune_{}_{}",
            input.seed, idx
        )))
        .columns(4)
        .partition(input.partitions)
        .engine("mito")
        .ts_column_type_generator(Box::new(MySQLTsColumnTypeGenerator))
        .build()
        .unwrap();

    generator.generate(rng)
}
/// Generates an `INSERT` expression of `rows` rows for the table described by `table_ctx`.
///
/// Timestamps come from `generate_unique_timestamp_for_mysql(ts_base)`, all other values from
/// `generate_random_value`; the column list is always spelled out.
fn generate_insert_expr<R: Rng + 'static>(
    rows: usize,
    ts_base: i64,
    rng: &mut R,
    table_ctx: TableContextRef,
) -> Result<InsertIntoExpr> {
    let generator = InsertExprGeneratorBuilder::default()
        .table_ctx(table_ctx)
        .omit_column_list(false)
        .rows(rows)
        .ts_value_generator(generate_unique_timestamp_for_mysql(ts_base))
        .value_generator(Box::new(generate_random_value))
        .build()
        .unwrap();

    generator.generate(rng)
}
/// Creates `input.tables` tables and returns their initial state, in creation order.
///
/// Each table gets the activity profile for its index and starts with no recorded inserts.
/// Stops at the first generation, translation, or execution error.
async fn create_tables<R: Rng + 'static>(
    ctx: &FuzzContext,
    input: FuzzInput,
    rng: &mut R,
) -> Result<Vec<TableState>> {
    let mut states = Vec::with_capacity(input.tables);

    for idx in 0..input.tables {
        let profile = ActivityProfile::for_table(idx);
        let create_expr = generate_create_expr(input, idx, rng)?;
        let sql = CreateTableExprTranslator.translate(&create_expr)?;

        let result = sqlx::query(&sql)
            .execute(&ctx.greptime)
            .await
            .context(error::ExecuteQuerySnafu { sql: &sql })?;
        info!(
            "Create table: {}, profile: {:?}, result: {result:?}",
            create_expr.table_name, profile
        );

        states.push(TableState {
            ctx: Arc::new(TableContext::from(&create_expr)),
            profile,
            insert_exprs: Vec::new(),
        });
    }

    Ok(states)
}
/// Inserts one generated batch of `input.rows_per_insert` rows into `table`.
///
/// The batch is recorded in `table.insert_exprs` only after the statement succeeded and the
/// server reported exactly as many affected rows as were sent.
async fn insert_table<R: Rng + 'static>(
    ctx: &FuzzContext,
    input: FuzzInput,
    table: &mut TableState,
    ts_base: i64,
    rng: &mut R,
) -> Result<()> {
    let insert_expr = generate_insert_expr(input.rows_per_insert, ts_base, rng, table.ctx.clone())?;
    let sql = InsertIntoExprTranslator.translate(&insert_expr)?;

    // unprepared query, see <https://github.com/GreptimeTeam/greptimedb/issues/3500>
    let outcome = ctx
        .greptime
        .execute(sql.as_str())
        .await
        .context(error::ExecuteQuerySnafu { sql: &sql })?;

    let expected_rows = insert_expr.values_list.len();
    let affected_rows = outcome.rows_affected();
    ensure!(
        affected_rows == expected_rows as u64,
        error::AssertSnafu {
            reason: format!(
                "expected rows affected: {}, actual: {}",
                expected_rows, affected_rows,
            )
        }
    );

    table.insert_exprs.push(insert_expr);
    Ok(())
}
/// Flushes the memtable of `table` by delegating to `flush_memtable`.
async fn flush_table(ctx: &FuzzContext, table: &TableState) -> Result<()> {
    let table_name = &table.ctx.name;
    flush_memtable(&ctx.greptime, table_name).await
}
/// Validates every recorded insert batch of `table` against the database.
///
/// Tables without any recorded batch are skipped and reported as valid.
async fn validate_table(ctx: &FuzzContext, table: &TableState) -> Result<()> {
    let batches = table.insert_exprs.len();
    if batches == 0 {
        return Ok(());
    }

    info!(
        "Validating table: {}, profile: {:?}, batches: {}",
        table.ctx.name, table.profile, batches
    );
    validate_insert_exprs(ctx, &table.ctx, &table.insert_exprs).await
}
async fn validate_insert_exprs(
ctx: &FuzzContext,
table_ctx: &TableContextRef,
insert_exprs: &[InsertIntoExpr],
) -> Result<()> {
let ts_column = table_ctx.timestamp_column().unwrap();
for (idx, insert_expr) in insert_exprs[0..insert_exprs.len() - 1].iter().enumerate() {
let ts_column_idx = insert_expr.timestamp_column_idx().unwrap();
let ts_value = insert_expr.values_list[0][ts_column_idx].clone();
let next_batch_ts_column_idx = insert_exprs[idx + 1].timestamp_column_idx().unwrap();
let next_batch_ts = insert_exprs[idx + 1].values_list[0][next_batch_ts_column_idx].clone();
let primary_keys_idx = insert_expr.primary_key_column_idx();
let column_list = format_columns(&insert_expr.columns);
let primary_keys_column_list = format_columns(&insert_expr.primary_key_columns());
let select_sql = format!(
"SELECT {} FROM {} WHERE {} >= {} AND {} < {} ORDER BY {};",
column_list,
table_ctx.name,
ts_column.name,
ts_value,
ts_column.name,
next_batch_ts,
primary_keys_column_list
);
info!("Executing sql: {select_sql}");
let fetched_rows = ctx.greptime.fetch_all(select_sql.as_str()).await.unwrap();
let mut expected_rows = replace_default(&insert_expr.values_list, table_ctx, insert_expr);
sort_by_primary_keys(&mut expected_rows, primary_keys_idx);
validator::row::assert_eq::<MySql>(&insert_expr.columns, &fetched_rows, &expected_rows)?;
}
let insert_expr = insert_exprs.last().unwrap();
let ts_column_idx = insert_expr.timestamp_column_idx().unwrap();
let ts_value = insert_expr.values_list[0][ts_column_idx].clone();
let primary_keys_idx = insert_expr.primary_key_column_idx();
let column_list = format_columns(&insert_expr.columns);
let primary_keys_column_list = format_columns(&insert_expr.primary_key_columns());
let select_sql = format!(
"SELECT {} FROM {} WHERE {} >= {} ORDER BY {};",
column_list, table_ctx.name, ts_column.name, ts_value, primary_keys_column_list
);
info!("Executing sql: {select_sql}");
let fetched_rows = ctx.greptime.fetch_all(select_sql.as_str()).await.unwrap();
let mut expected_rows = replace_default(&insert_expr.values_list, table_ctx, insert_expr);
sort_by_primary_keys(&mut expected_rows, primary_keys_idx);
validator::row::assert_eq::<MySql>(&insert_expr.columns, &fetched_rows, &expected_rows)?;
Ok(())
}
/// Migrates every region of every table in `tables` to a randomly chosen other datanode.
///
/// The full list of migrations is planned first (one per partition returned by
/// `fetch_partitions`) and then handed to `migrate_regions` in one call.
///
/// # Errors
///
/// Returns an error if the datanodes or partitions cannot be fetched, if no target
/// peer different from the current one exists, or if `migrate_regions` fails.
async fn migrate_all_regions<R: Rng + 'static>(
    ctx: &FuzzContext,
    tables: &[TableState],
    rng: &mut R,
) -> Result<()> {
    let datanodes = fetch_datanodes(ctx).await?;
    let mut planned = Vec::new();

    for table in tables {
        let table_name = &table.ctx.name;
        for partition in fetch_partitions(&ctx.greptime, table_name.clone()).await? {
            let source = partition.datanode_id;
            let target = choose_target_peer(&datanodes, source, rng, || {
                format!(
                    "failed to choose migration target, table: {}, region: {}, from_peer: {}",
                    table_name, partition.region_id, source
                )
            })?;

            planned.push(Migration {
                from_peer: source,
                to_peer: target,
                region_id: RegionId::from_u64(partition.region_id),
            });
        }
    }

    migrate_regions(ctx, &planned).await
}
/// Returns the peer ids of all nodes whose peer type is `PEER_TYPE_DATANODE`.
///
/// # Errors
///
/// Returns an error if the nodes cannot be fetched or if fewer than two datanodes are
/// found.
async fn fetch_datanodes(ctx: &FuzzContext) -> Result<Vec<u64>> {
    let nodes = fetch_nodes(&ctx.greptime).await?;
    let datanodes: Vec<u64> = nodes
        .into_iter()
        .filter_map(|node| (node.peer_type == PEER_TYPE_DATANODE).then_some(node.peer_id as u64))
        .collect();

    let found = datanodes.len();
    ensure!(
        found >= 2,
        error::AssertSnafu {
            reason: format!(
                "remote WAL logical prune fuzz requires at least two datanodes, got {}",
                found
            )
        }
    );

    Ok(datanodes)
}
fn choose_target_peer<R, F>(
datanodes: &[u64],
from_peer: u64,
rng: &mut R,
error_message: F,
) -> Result<u64>
where
R: Rng + 'static,
F: FnOnce() -> String,
{
datanodes
.iter()
.filter(|peer| **peer != from_peer)
.copied()
.collect::<Vec<_>>()
.choose(rng)
.copied()
.ok_or_else(|| {
error::AssertSnafu {
reason: error_message(),
}
.build()
})
}
/// Submits all `migrations`, waits for each of them, then sleeps for one region lease.
///
/// All migrations are submitted before any of them is awaited. Nothing happens, and no
/// sleep is performed, when `migrations` is empty.
async fn migrate_regions(ctx: &FuzzContext, migrations: &[Migration]) -> Result<()> {
    if migrations.is_empty() {
        return Ok(());
    }

    let mut procedure_ids = Vec::with_capacity(migrations.len());
    for migration in migrations {
        let procedure_id = submit_migration(ctx, migration).await;
        procedure_ids.push(procedure_id);
    }

    for (migration, procedure_id) in migrations.iter().zip(&procedure_ids) {
        wait_for_migration(ctx, migration, procedure_id).await?;
    }

    let region_lease = default_distributed_time_constants().region_lease;
    tokio::time::sleep(region_lease).await;
    Ok(())
}
/// Starts the migration described by `migration` and returns the id of the procedure
/// reported by `migrate_region`.
async fn submit_migration(ctx: &FuzzContext, migration: &Migration) -> String {
    let region_id = migration.region_id.as_u64();
    let procedure_id = migrate_region(
        &ctx.greptime,
        region_id,
        migration.from_peer,
        migration.to_peer,
        MIGRATION_TIMEOUT_SECS,
    )
    .await;

    info!(
        "Migrating region: {:?}, procedure: {}",
        migration, procedure_id
    );
    procedure_id
}
/// Polls until the region of `migration` is reported on the target peer and the
/// procedure state contains `"Done"`.
///
/// Polling is delegated to `wait_condition_fn` with a timeout of
/// `MIGRATION_TIMEOUT_SECS` seconds and a five second interval between checks. A
/// failed partition lookup counts as "not migrated yet". The value produced by
/// `wait_condition_fn` is not inspected, so this function returns `Ok(())` whenever
/// that call returns.
async fn wait_for_migration(
    ctx: &FuzzContext,
    migration: &Migration,
    procedure_id: &str,
) -> Result<()> {
    let region_id = migration.region_id.as_u64();
    let target_peer = migration.to_peer;
    let timeout = Duration::from_secs(MIGRATION_TIMEOUT_SECS);
    let retry_interval = Duration::from_secs(5);

    wait_condition_fn(
        timeout,
        || {
            // Each poll gets its own handles so the future owns everything it uses.
            let greptime = ctx.greptime.clone();
            let procedure_id = procedure_id.to_string();
            Box::pin(async move {
                let output = procedure_state(&greptime, &procedure_id).await;
                info!("Checking procedure: {procedure_id}, output: {output}");
                let partition = fetch_partition(&greptime, region_id).await.ok();
                (partition, output)
            })
        },
        |(partition, output)| match partition {
            Some(partition) => partition.datanode_id == target_peer && output.contains("Done"),
            None => false,
        },
        retry_interval,
    )
    .await;

    Ok(())
}
/// Returns the timestamp base for batch number `batch_idx`.
///
/// The base is `input.seed` cast to `i64` plus `1_000_000` per batch index. Plain
/// `+` and `*` are used, so overflow follows the build's overflow-check setting.
fn next_ts_base(input: FuzzInput, batch_idx: usize) -> i64 {
    let seed_base = input.seed as i64;
    let batch_offset = batch_idx as i64 * 1_000_000;
    seed_base + batch_offset
}
/// Runs one round of mixed insert, flush and validate activity over `tables`.
///
/// Write phase: for each table its profile decides (using `rng`) whether to insert a
/// batch; after an insert, `batch_idx` is advanced and the profile decides whether to
/// flush.
///
/// Validate phase: `Cold` and `Dormant` tables are always validated; `Hot` and `Warm`
/// tables are validated with probability 0.5.
async fn run_activity_round<R: Rng + 'static>(
    ctx: &FuzzContext,
    input: FuzzInput,
    tables: &mut [TableState],
    batch_idx: &mut usize,
    round: usize,
    rng: &mut R,
) -> Result<()> {
    info!("Start mixed activity round: {round}");

    for table in tables.iter_mut() {
        if !table.profile.should_insert(rng) {
            continue;
        }
        let ts_base = next_ts_base(input, *batch_idx);
        insert_table(ctx, input, table, ts_base, rng).await?;
        *batch_idx += 1;
        if table.profile.should_flush(rng) {
            flush_table(ctx, table).await?;
        }
    }

    for table in tables.iter() {
        // The random draw happens only for hot and warm tables.
        let should_validate = match table.profile {
            ActivityProfile::Cold | ActivityProfile::Dormant => true,
            ActivityProfile::Hot | ActivityProfile::Warm => rng.random_bool(0.5),
        };
        if should_validate {
            validate_table(ctx, table).await?;
        }
    }

    Ok(())
}
/// Runs one remote WAL logical pruning scenario.
///
/// Steps, in order:
/// 1. create the tables, insert one batch into each, and validate them;
/// 2. record the latest Kafka WAL offsets through `wal_admin` as the delete boundary;
/// 3. run activity rounds until `input.delete_after_secs` seconds have elapsed;
/// 4. delete Kafka records up to the recorded boundary;
/// 5. migrate every region and validate all tables again;
/// 6. drop the tables and close the connection.
///
/// # Errors
///
/// Returns the first error raised by any step. Later steps, including dropping the
/// tables and closing `ctx`, are then not executed.
async fn execute_logical_prune_fuzz<W: WalAdmin>(
    ctx: FuzzContext,
    input: FuzzInput,
    wal_admin: &W,
) -> Result<()> {
    info!("input: {input:?}");
    let mut rng = ChaChaRng::seed_from_u64(input.seed);
    let mut tables = create_tables(&ctx, input, &mut rng).await?;

    // Give every table an initial batch before the delete boundary is recorded.
    let mut batch_idx = 0;
    for table in tables.iter_mut() {
        let ts_base = next_ts_base(input, batch_idx);
        insert_table(&ctx, input, table, ts_base, &mut rng).await?;
        batch_idx += 1;
    }
    for table in tables.iter() {
        validate_table(&ctx, table).await?;
    }

    info!("Recording Kafka WAL latest offsets via WAL admin");
    let delete_boundary_offsets = wal_admin.record_topic_latest_offsets().await?;
    for boundary in &delete_boundary_offsets {
        info!(
            "Recorded Kafka WAL delete boundary, topic: {}, partition: {}, offset: {}",
            boundary.topic, boundary.partition, boundary.offset
        );
    }

    let delete_deadline =
        tokio::time::Instant::now() + Duration::from_secs(input.delete_after_secs);
    info!(
        "Start foreground activity until DeleteRecords deadline, delete_after_secs: {}",
        input.delete_after_secs
    );

    let mut round = 0;
    loop {
        if tokio::time::Instant::now() >= delete_deadline {
            break;
        }
        run_activity_round(&ctx, input, &mut tables, &mut batch_idx, round, &mut rng).await?;
        round += 1;
        if input.rounds > 0 && round.is_multiple_of(input.rounds) {
            info!(
                "Completed configured activity round batch before DeleteRecords, rounds: {round}"
            );
        }
        tokio::time::sleep(Duration::from_secs(ACTIVITY_TICK_SECS)).await;
    }

    for boundary in &delete_boundary_offsets {
        info!(
            "Deleting Kafka WAL records to boundary, topic: {}, partition: {}, offset: {}",
            boundary.topic, boundary.partition, boundary.offset
        );
    }
    wal_admin
        .delete_records_to_offsets(&delete_boundary_offsets)
        .await?;
    for boundary in &delete_boundary_offsets {
        info!(
            "Deleted Kafka WAL records to boundary, topic: {}, partition: {}, offset: {}",
            boundary.topic, boundary.partition, boundary.offset
        );
    }

    // Validate after the regions have been moved to other datanodes.
    migrate_all_regions(&ctx, &tables, &mut rng).await?;
    for table in tables.iter() {
        validate_table(&ctx, table).await?;
    }

    for table in tables {
        let sql = format!("DROP TABLE {}", table.ctx.name);
        let result = sqlx::query(&sql)
            .execute(&ctx.greptime)
            .await
            .context(error::ExecuteQuerySnafu { sql })?;
        info!("Drop table: {}, result: {result:?}", table.ctx.name);
    }

    ctx.close().await;
    Ok(())
}
// Fuzz entry point: connects to GreptimeDB through the environment-configured MySQL
// endpoint, builds the WAL admin from the environment, and runs one logical pruning
// scenario. Any scenario error is turned into a panic.
fuzz_target!(|input: FuzzInput| {
    common_telemetry::init_default_ut_logging();
    common_runtime::block_on_global(async {
        let Connections { mysql } = init_greptime_connections_via_env().await;
        let greptime = mysql.expect("mysql connection init must be succeed");
        let ctx = FuzzContext { greptime };
        let wal_admin = wal_admin_from_env();

        if let Err(err) = execute_logical_prune_fuzz(ctx, input, &wal_admin).await {
            panic!("fuzz test must be succeed: {err:?}");
        }
    })
});

141
tests-fuzz/utils/README.md Normal file
View File

@@ -0,0 +1,141 @@
# Fuzz Utilities
Utilities in this directory support fuzz tests and related CI/manual workflows.
## Kafka WAL Helper
`kafka_wal_helper` is a small HTTP service used by Remote WAL fuzz tests. It runs near Kafka, usually inside Kubernetes, and provides the minimum Kafka operations needed to simulate external WAL TTL deletion:
- read latest offsets for configured WAL topics;
- delete Kafka records up to previously recorded offsets.
This avoids exposing Kafka outside Kubernetes or relying on broker advertised listeners to be reachable from the fuzz runner.
Source and related files:
- `tests-fuzz/utils/kafka_wal_helper.rs` — HTTP helper binary.
- `docker/ci/ubuntu/Dockerfile.kafka-wal-helper` — lightweight runtime image.
- `.github/actions/setup-greptimedb-cluster/kafka-wal-helper.yaml` — Kubernetes Deployment and Service.
### Build the binary
Build the helper binary with the nightly profile used by fuzz targets:
```bash
cargo build --bin kafka_wal_helper --profile nightly
```
The binary is expected at:
```text
target/nightly/kafka_wal_helper
```
### Build the Docker image
The Dockerfile is runtime-only. Build the Rust binary first, then copy it into an architecture-specific directory expected by the image build.
For `amd64`:
```bash
mkdir -p amd64
cp target/nightly/kafka_wal_helper amd64/kafka_wal_helper
docker build \
-f docker/ci/ubuntu/Dockerfile.kafka-wal-helper \
--build-arg TARGETARCH=amd64 \
-t <registry>/tools/kafka_wal_helper:<tag> \
.
```
Push the image:
```bash
docker push <registry>/tools/kafka_wal_helper:<tag>
```
### Deploy to Kubernetes
Update `.github/actions/setup-greptimedb-cluster/kafka-wal-helper.yaml` to use the image you pushed:
```yaml
image: <registry>/tools/kafka_wal_helper:<tag>
```
Apply the helper manifest to the namespace where you want the helper to run (the commands below use `kafka-cluster`):
```bash
kubectl -n kafka-cluster apply \
-f .github/actions/setup-greptimedb-cluster/kafka-wal-helper.yaml
```
Forward the helper service to a local port:
```bash
kubectl -n kafka-cluster port-forward svc/kafka-wal-helper 8080:8080
```
Then call the helper through `127.0.0.1:8080`.
### HTTP API
#### Health check
```bash
curl http://127.0.0.1:8080/health
```
#### Record latest offsets
```bash
curl -s http://127.0.0.1:8080/record-offsets \
-H 'content-type: application/json' \
-d '{
"broker_endpoints":["kafka-broker-0.kafka-broker-headless.test-env-kafka.svc.cluster.local:9092"],
"topic_prefix":"greptimedb_wal_topic",
"num_topics":3,
"partition":0
}'
```
Example response:
```json
{
"offsets": [
{"topic":"greptimedb_wal_topic_0","partition":0,"offset":123},
{"topic":"greptimedb_wal_topic_1","partition":0,"offset":456},
{"topic":"greptimedb_wal_topic_2","partition":0,"offset":789}
]
}
```
#### Delete records
Use the offsets returned by `/record-offsets` as the deletion boundary. Do not delete up to the current latest offsets after additional writes; otherwise the test may remove data that should remain replayable. `timeout_ms` is optional and defaults to `5000` when omitted.
```bash
curl -s http://127.0.0.1:8080/delete-records \
-H 'content-type: application/json' \
-d '{
"broker_endpoints":["kafka-broker-0.kafka-broker-headless.test-env-kafka.svc.cluster.local:9092"],
"timeout_ms":5000,
"offsets":[
{"topic":"greptimedb_wal_topic_0","partition":0,"offset":123},
{"topic":"greptimedb_wal_topic_1","partition":0,"offset":456},
{"topic":"greptimedb_wal_topic_2","partition":0,"offset":789}
]
}'
```
Example response:
```json
{
"deleted": [
{"topic":"greptimedb_wal_topic_0","partition":0,"offset":123},
{"topic":"greptimedb_wal_topic_1","partition":0,"offset":456},
{"topic":"greptimedb_wal_topic_2","partition":0,"offset":789}
]
}
```

View File

@@ -0,0 +1,144 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::net::SocketAddr;
use axum::http::StatusCode;
use axum::routing::{get, post};
use axum::{Json, Router};
use clap::Parser;
use common_telemetry::info;
use rskafka::client::ClientBuilder;
use rskafka::client::partition::{OffsetAt, UnknownTopicHandling};
use serde::Serialize;
use tests_fuzz::utils::kafka_wal_http::{
DeleteRecordsRequest, DeleteRecordsResponse, RecordOffsetsRequest, RecordOffsetsResponse,
TopicOffset,
};
/// Boxed, `Send + Sync` error used as the error type of this binary's [`Result`] alias.
type Error = Box<dyn std::error::Error + Send + Sync>;
/// `Result` alias whose error type is fixed to [`Error`].
type Result<T> = std::result::Result<T, Error>;
/// Timeout in milliseconds used by `delete_records` when the request carries no
/// `timeout_ms`.
const DEFAULT_DELETE_RECORDS_TIMEOUT_MS: i32 = 5000;
// Command-line arguments of the helper binary, parsed with clap in `main`.
// Plain `//` comments are used here on purpose: `///` doc comments on a clap
// derive type are picked up as help text.
#[derive(Debug, Parser)]
#[command(about = "Kafka WAL helper for Remote WAL fuzz tests")]
struct Args {
/// HTTP listen address.
#[arg(long, default_value = "0.0.0.0:8080")]
addr: SocketAddr,
}
/// JSON body returned by the `/health` endpoint.
#[derive(Debug, Serialize)]
struct HealthResponse {
// Fixed status text; the `health` handler always sets it to "ok".
status: &'static str,
}
#[tokio::main]
async fn main() -> Result<()> {
common_telemetry::init_default_ut_logging();
let args = Args::parse();
serve(args.addr).await
}
/// Binds `addr` and serves the helper's HTTP API on it.
///
/// Routes: `GET /health`, `POST /record-offsets`, `POST /delete-records`.
///
/// # Errors
///
/// Returns an error if the address cannot be bound, the local address cannot be read,
/// or the server terminates with an error.
async fn serve(addr: SocketAddr) -> Result<()> {
    let listener = tokio::net::TcpListener::bind(addr).await?;
    let local_addr = listener.local_addr()?;
    info!("Kafka WAL helper listening on {}", local_addr);

    let app = Router::new()
        .route("/health", get(health))
        .route("/record-offsets", post(record_offsets_handler))
        .route("/delete-records", post(delete_records_handler));

    axum::serve(listener, app).await?;
    Ok(())
}
/// Health-check handler; always answers with status `"ok"`.
async fn health() -> Json<HealthResponse> {
    let body = HealthResponse { status: "ok" };
    Json(body)
}
async fn record_offsets_handler(
Json(request): Json<RecordOffsetsRequest>,
) -> HttpResult<RecordOffsetsResponse> {
record_offsets(request)
.await
.map(Json)
.map_err(to_http_error)
}
async fn delete_records_handler(
Json(request): Json<DeleteRecordsRequest>,
) -> HttpResult<DeleteRecordsResponse> {
delete_records(request)
.await
.map(Json)
.map_err(to_http_error)
}
/// Return type of the JSON handlers: a JSON body on success, or a status code with a
/// plain-text message on failure.
type HttpResult<T> = std::result::Result<Json<T>, (StatusCode, String)>;
/// Converts any helper error into a `500 Internal Server Error` response whose body is
/// the error's display text.
fn to_http_error(error: Error) -> (StatusCode, String) {
    let message = error.to_string();
    (StatusCode::INTERNAL_SERVER_ERROR, message)
}
/// Reads the latest offset of `request.partition` for each of the `request.num_topics`
/// topics named `<topic_prefix>_<index>`.
///
/// Topic lookups use `UnknownTopicHandling::Retry`.
///
/// # Errors
///
/// Returns an error if the Kafka client cannot be built, or if creating a partition
/// client or reading an offset fails. Offsets collected so far are discarded in that
/// case.
async fn record_offsets(request: RecordOffsetsRequest) -> Result<RecordOffsetsResponse> {
    let partition = request.partition;
    let client = ClientBuilder::new(request.broker_endpoints).build().await?;

    let mut offsets = Vec::with_capacity(request.num_topics);
    for idx in 0..request.num_topics {
        let topic = format!("{}_{}", request.topic_prefix, idx);
        let topic_client = client
            .partition_client(&topic, partition, UnknownTopicHandling::Retry)
            .await?;
        let offset = topic_client.get_offset(OffsetAt::Latest).await?;
        info!(
            "Recorded Kafka WAL offset, topic: {}, partition: {}, offset: {}",
            topic, partition, offset
        );
        offsets.push(TopicOffset {
            topic,
            partition,
            offset,
        });
    }

    Ok(RecordOffsetsResponse { offsets })
}
/// Deletes Kafka records up to each offset listed in `request.offsets`, one topic
/// partition at a time and in request order.
///
/// `request.timeout_ms` is passed to every delete call and falls back to
/// `DEFAULT_DELETE_RECORDS_TIMEOUT_MS` when absent. On success the response echoes
/// the requested offsets.
///
/// # Errors
///
/// Returns an error if the Kafka client cannot be built, or at the first partition
/// whose client creation or delete call fails. Remaining offsets are not processed.
async fn delete_records(request: DeleteRecordsRequest) -> Result<DeleteRecordsResponse> {
    let timeout_ms = request
        .timeout_ms
        .unwrap_or(DEFAULT_DELETE_RECORDS_TIMEOUT_MS);
    let client = ClientBuilder::new(request.broker_endpoints).build().await?;

    for boundary in &request.offsets {
        let topic_client = client
            .partition_client(
                &boundary.topic,
                boundary.partition,
                UnknownTopicHandling::Retry,
            )
            .await?;

        info!(
            "Deleting Kafka WAL records, topic: {}, partition: {}, offset: {}",
            boundary.topic, boundary.partition, boundary.offset
        );
        topic_client
            .delete_records(boundary.offset, timeout_ms)
            .await?;
        info!(
            "Deleted Kafka WAL records, topic: {}, partition: {}, offset: {}",
            boundary.topic, boundary.partition, boundary.offset
        );
    }

    Ok(DeleteRecordsResponse {
        deleted: request.offsets,
    })
}