From fbdc35a109f8999059aece5eb69dd1bcbd52532b Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Thu, 18 Jun 2026 16:24:47 +0800 Subject: [PATCH] ci: add remote WAL logical pruning fuzz (#8307) * feat: configure remote wal checkpoint intervals Signed-off-by: WenyXu * feat: add logical pruning fuzz target Signed-off-by: WenyXu * ci: add remote WAL logical pruning fuzz Signed-off-by: WenyXu * chore: apply suggestions Signed-off-by: WenyXu --------- Signed-off-by: WenyXu --- .../kafka-wal-helper.yaml | 53 ++ .../with-remote-wal-logical-prune.yaml | 63 ++ .github/workflows/develop.yml | 47 +- Cargo.lock | 3 + docker/ci/ubuntu/Dockerfile.kafka-wal-helper | 18 + src/common/meta/src/region_registry.rs | 7 +- src/common/wal/src/config.rs | 7 + src/common/wal/src/config/kafka/common.rs | 6 + src/common/wal/src/config/kafka/datanode.rs | 8 +- src/common/wal/src/config/kafka/metasrv.rs | 13 +- src/log-store/src/kafka/log_store.rs | 8 +- src/meta-srv/src/metasrv/builder.rs | 2 + .../src/procedure/region_migration.rs | 6 +- .../src/procedure/wal_prune/manager.rs | 4 + src/meta-srv/src/region/flush_trigger.rs | 40 +- tests-fuzz/Cargo.toml | 17 + tests-fuzz/src/error.rs | 23 + tests-fuzz/src/utils.rs | 1 + tests-fuzz/src/utils/kafka_wal_http.rs | 165 +++++ .../wal/fuzz_remote_wal_logical_prune.rs | 647 ++++++++++++++++++ tests-fuzz/utils/README.md | 141 ++++ tests-fuzz/utils/kafka_wal_helper.rs | 144 ++++ 22 files changed, 1404 insertions(+), 19 deletions(-) create mode 100644 .github/actions/setup-greptimedb-cluster/kafka-wal-helper.yaml create mode 100644 .github/actions/setup-greptimedb-cluster/with-remote-wal-logical-prune.yaml create mode 100644 docker/ci/ubuntu/Dockerfile.kafka-wal-helper create mode 100644 tests-fuzz/src/utils/kafka_wal_http.rs create mode 100644 tests-fuzz/targets/wal/fuzz_remote_wal_logical_prune.rs create mode 100644 tests-fuzz/utils/README.md create mode 100644 tests-fuzz/utils/kafka_wal_helper.rs diff --git a/.github/actions/setup-greptimedb-cluster/kafka-wal-helper.yaml b/.github/actions/setup-greptimedb-cluster/kafka-wal-helper.yaml new file mode 100644 index 0000000000..268195ae6f --- /dev/null +++ b/.github/actions/setup-greptimedb-cluster/kafka-wal-helper.yaml @@ -0,0 +1,53 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: kafka-wal-helper + labels: + app: kafka-wal-helper +spec: + replicas: 1 + selector: + matchLabels: + app: kafka-wal-helper + template: + metadata: + labels: + app: kafka-wal-helper + spec: + containers: + - name: kafka-wal-helper + image: localhost:5001/greptime/kafka-wal-helper:latest + imagePullPolicy: IfNotPresent + args: + - "--addr" + - "0.0.0.0:8080" + ports: + - name: http + containerPort: 8080 + readinessProbe: + httpGet: + path: /health + port: http + initialDelaySeconds: 2 + periodSeconds: 5 + livenessProbe: + httpGet: + path: /health + port: http + initialDelaySeconds: 5 + periodSeconds: 10 +--- +apiVersion: v1 +kind: Service +metadata: + name: kafka-wal-helper + labels: + app: kafka-wal-helper +spec: + type: ClusterIP + selector: + app: kafka-wal-helper + ports: + - name: http + port: 8080 + targetPort: http diff --git a/.github/actions/setup-greptimedb-cluster/with-remote-wal-logical-prune.yaml b/.github/actions/setup-greptimedb-cluster/with-remote-wal-logical-prune.yaml new file mode 100644 index 0000000000..60c23def6e --- /dev/null +++ b/.github/actions/setup-greptimedb-cluster/with-remote-wal-logical-prune.yaml @@ -0,0 +1,63 @@ +logging: + level: "info" + format: "json" + filters: + - log_store=debug +meta: + configData: |- + [runtime] + global_rt_size = 4 + + [wal] + provider = "kafka" + broker_endpoints = ["kafka.kafka-cluster.svc.cluster.local:9092"] + num_topics = 3 + auto_prune_interval = "30s" + auto_prune_parallelism = 3 + auto_prune_logical_delete = true + trigger_flush_threshold = 100 + region_flush_trigger_interval = "30s" + periodic_checkpoint_persist_interval = "60s" + flush_trigger_size = "1MB" + checkpoint_trigger_size = "1MB" + + [datanode] + [datanode.client] + timeout = "120s" +datanode: + configData: |- + [runtime] + global_rt_size = 4 + compact_rt_size = 2 + + [wal] + provider = "kafka" + broker_endpoints = ["kafka.kafka-cluster.svc.cluster.local:9092"] + num_topics = 3 + topic_latest_offset_fetch_interval = "30s" + overwrite_entry_start_id = false + + [[region_engine]] + [region_engine.mito] + auto_flush_interval = "30s" +frontend: + configData: |- + [runtime] + global_rt_size = 4 + + [meta_client] + ddl_timeout = "120s" +objectStorage: + s3: + bucket: default + region: us-west-2 + root: test-root + endpoint: http://minio.minio.svc.cluster.local + credentials: + accessKeyId: rootuser + secretAccessKey: rootpass123 +remoteWal: + enabled: true + kafka: + brokerEndpoints: + - "kafka.kafka-cluster.svc.cluster.local:9092" diff --git a/.github/workflows/develop.yml b/.github/workflows/develop.yml index 65546dcc25..3aa226398c 100644 --- a/.github/workflows/develop.yml +++ b/.github/workflows/develop.yml @@ -329,6 +329,13 @@ jobs: minio: true kafka: false values: "with-minio-repartition-gc.yaml" + - target: "fuzz_remote_wal_logical_prune" + mode: + name: "Remote WAL logical pruning" + minio: true + kafka: true + values: "with-remote-wal-logical-prune.yaml" + kafkaWalHelper: true steps: - name: Remove unused software run: | @@ -378,6 +385,24 @@ jobs: rm ./bin.tar.gz - name: Build and push GreptimeDB image uses: ./.github/actions/build-and-push-ci-image + - if: matrix.mode.kafkaWalHelper + name: Build Kafka WAL helper binary + shell: bash + run: | + cargo build --bin kafka_wal_helper + mkdir -p amd64 + cp target/debug/kafka_wal_helper amd64/kafka_wal_helper + - if: matrix.mode.kafkaWalHelper + name: Build and push Kafka WAL helper image + uses: docker/build-push-action@v5 + with: + context: . + file: ./docker/ci/ubuntu/Dockerfile.kafka-wal-helper + push: true + tags: localhost:5001/greptime/kafka-wal-helper:latest + build-args: | + TARGETARCH=amd64 + TARGET_BIN=kafka_wal_helper - name: Wait for etcd run: | kubectl wait \ @@ -401,6 +426,17 @@ jobs: pod -l app.kubernetes.io/instance=kafka \ --timeout=120s \ -n kafka-cluster + - if: matrix.mode.kafkaWalHelper + name: Setup Kafka WAL helper + shell: bash + run: | + kubectl -n kafka-cluster apply \ + -f .github/actions/setup-greptimedb-cluster/kafka-wal-helper.yaml + + kubectl -n kafka-cluster wait \ + --for=condition=Ready \ + pod -l app=kafka-wal-helper \ + --timeout=120s - name: Print etcd info shell: bash run: kubectl get all --show-labels -n etcd-cluster @@ -413,14 +449,23 @@ jobs: - name: Port forward (mysql) run: | kubectl port-forward service/my-greptimedb-frontend 4002:4002 -n my-greptimedb& + - if: matrix.mode.kafkaWalHelper + name: Port forward Kafka WAL helper + run: | + kubectl port-forward service/kafka-wal-helper 8080:8080 -n kafka-cluster& - name: Fuzz Test uses: ./.github/actions/fuzz-test env: CUSTOM_LIBFUZZER_PATH: /usr/lib/llvm-14/lib/libFuzzer.a GT_MYSQL_ADDR: 127.0.0.1:4002 + GT_KAFKA_WAL_HELPER_URL: http://127.0.0.1:8080 + GT_KAFKA_ADDR: kafka.kafka-cluster.svc.cluster.local:9092 + GT_WAL_TOPIC_PREFIX: greptimedb_wal_topic + GT_WAL_NUM_TOPICS: 3 + GT_FUZZ_OVERRIDE_DELETE_AFTER_SECS: 300 with: target: ${{ matrix.target }} - max-total-time: 120 + max-total-time: ${{ matrix.mode.kafkaWalHelper && 600 || 120 }} - name: Describe Nodes if: failure() shell: bash diff --git a/Cargo.lock b/Cargo.lock index 73687696f4..c49b03a9b3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -14546,7 +14546,9 @@ version = "1.1.0" dependencies = [ "arbitrary", "async-trait", + "axum 0.8.4", "chrono", + "clap", "common-base", "common-error", "common-macro", @@ -14570,6 +14572,7 @@ dependencies = [ "rand 0.9.4", "rand_chacha 0.9.0", "reqwest 0.13.4", + "rskafka", "rustls", "schemars", "serde", diff --git a/docker/ci/ubuntu/Dockerfile.kafka-wal-helper b/docker/ci/ubuntu/Dockerfile.kafka-wal-helper new file mode 100644 index 0000000000..4b94eff55d --- /dev/null +++ b/docker/ci/ubuntu/Dockerfile.kafka-wal-helper @@ -0,0 +1,18 @@ +FROM ubuntu:24.04 + +ARG TARGETARCH +ARG TARGET_BIN=kafka_wal_helper + +RUN apt-get update && \ + apt-get install -y --no-install-recommends ca-certificates && \ + apt-get clean && \ + rm -rf /var/lib/apt/lists/* + +ADD ${TARGETARCH}/${TARGET_BIN} /greptime/bin/${TARGET_BIN} + +ENV PATH=/greptime/bin/:$PATH + +EXPOSE 8080 + +ENTRYPOINT ["/greptime/bin/kafka_wal_helper"] +CMD ["--addr", "0.0.0.0:8080"] diff --git a/src/common/meta/src/region_registry.rs b/src/common/meta/src/region_registry.rs index f1741b281b..8bef3f2d38 100644 --- a/src/common/meta/src/region_registry.rs +++ b/src/common/meta/src/region_registry.rs @@ -16,7 +16,7 @@ use std::collections::HashMap; use std::collections::hash_map::Entry; use std::sync::{Arc, RwLock}; -use common_telemetry::warn; +use common_telemetry::{debug, warn}; use store_api::storage::RegionId; use crate::datanode::{RegionManifestInfo, RegionStat}; @@ -241,6 +241,11 @@ impl LeaderRegionRegistry { leader_region.manifest.manifest_version() ); } else { + debug!( + "Updating leader region for region {}, pruned entry id: {}", + region_id, + leader_region.manifest.prunable_entry_id(), + ); entry.insert(leader_region); } } diff --git a/src/common/wal/src/config.rs b/src/common/wal/src/config.rs index d31040e6e1..017eb56e33 100644 --- a/src/common/wal/src/config.rs +++ b/src/common/wal/src/config.rs @@ -22,6 +22,7 @@ use serde::{Deserialize, Serialize}; use crate::config::kafka::common::{ DEFAULT_AUTO_PRUNE_INTERVAL, DEFAULT_AUTO_PRUNE_LOGICAL_DELETE, DEFAULT_AUTO_PRUNE_PARALLELISM, DEFAULT_CHECKPOINT_TRIGGER_SIZE, DEFAULT_FLUSH_TRIGGER_SIZE, + DEFAULT_PERIODIC_CHECKPOINT_PERSIST_INTERVAL, DEFAULT_REGION_FLUSH_TRIGGER_INTERVAL, }; use crate::config::kafka::{DatanodeKafkaConfig, MetasrvKafkaConfig}; use crate::config::raft_engine::RaftEngineConfig; @@ -73,6 +74,10 @@ impl TryFrom for MetasrvWalConfig { flush_trigger_size: DEFAULT_FLUSH_TRIGGER_SIZE, // This field won't be used in standalone mode checkpoint_trigger_size: DEFAULT_CHECKPOINT_TRIGGER_SIZE, + // This field won't be used in standalone mode + region_flush_trigger_interval: DEFAULT_REGION_FLUSH_TRIGGER_INTERVAL, + // This field won't be used in standalone mode + periodic_checkpoint_persist_interval: DEFAULT_PERIODIC_CHECKPOINT_PERSIST_INTERVAL, })), DatanodeWalConfig::Noop => UnsupportedWalProviderSnafu { provider: "noop".to_string(), @@ -225,6 +230,8 @@ mod tests { auto_prune_parallelism: 10, flush_trigger_size: ReadableSize::mb(512), checkpoint_trigger_size: ReadableSize::mb(128), + region_flush_trigger_interval: Duration::from_secs(60), + periodic_checkpoint_persist_interval: Duration::from_secs(60 * 60), }; assert_eq!(metasrv_wal_config, MetasrvWalConfig::Kafka(expected)); diff --git a/src/common/wal/src/config/kafka/common.rs b/src/common/wal/src/config/kafka/common.rs index ab78e3b5e7..8840197086 100644 --- a/src/common/wal/src/config/kafka/common.rs +++ b/src/common/wal/src/config/kafka/common.rs @@ -46,6 +46,12 @@ pub const DEFAULT_AUTO_PRUNE_PARALLELISM: usize = 10; pub const DEFAULT_FLUSH_TRIGGER_SIZE: ReadableSize = ReadableSize::mb(512); /// Default checkpoint trigger size. pub const DEFAULT_CHECKPOINT_TRIGGER_SIZE: ReadableSize = ReadableSize::mb(128); +/// Default interval for remote WAL region flush trigger. +pub const DEFAULT_REGION_FLUSH_TRIGGER_INTERVAL: Duration = Duration::from_secs(60); +/// Default interval to periodically persist remote WAL checkpoints. +pub const DEFAULT_PERIODIC_CHECKPOINT_PERSIST_INTERVAL: Duration = Duration::from_secs(60 * 60); +/// Default interval for fetching latest Kafka topic offsets. +pub const DEFAULT_TOPIC_LATEST_OFFSET_FETCH_INTERVAL: Duration = Duration::from_secs(60); use crate::error::{self, Result}; use crate::{BROKER_ENDPOINT, TOPIC_NAME_PREFIX, TopicSelectorType}; diff --git a/src/common/wal/src/config/kafka/datanode.rs b/src/common/wal/src/config/kafka/datanode.rs index 77cf05397d..374a7229d4 100644 --- a/src/common/wal/src/config/kafka/datanode.rs +++ b/src/common/wal/src/config/kafka/datanode.rs @@ -17,7 +17,9 @@ use std::time::Duration; use common_base::readable_size::ReadableSize; use serde::{Deserialize, Serialize}; -use crate::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig}; +use crate::config::kafka::common::{ + DEFAULT_TOPIC_LATEST_OFFSET_FETCH_INTERVAL, KafkaConnectionConfig, KafkaTopicConfig, +}; /// Kafka wal configurations for datanode. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] @@ -42,6 +44,9 @@ pub struct DatanodeKafkaConfig { pub create_index: bool, #[serde(with = "humantime_serde")] pub dump_index_interval: Duration, + /// Internal interval for fetching latest Kafka topic offsets. + #[serde(with = "humantime_serde")] + pub topic_latest_offset_fetch_interval: Duration, /// Ignore missing entries during read WAL. pub overwrite_entry_start_id: bool, } @@ -57,6 +62,7 @@ impl Default for DatanodeKafkaConfig { auto_create_topics: true, create_index: true, dump_index_interval: Duration::from_secs(60), + topic_latest_offset_fetch_interval: DEFAULT_TOPIC_LATEST_OFFSET_FETCH_INTERVAL, overwrite_entry_start_id: false, } } diff --git a/src/common/wal/src/config/kafka/metasrv.rs b/src/common/wal/src/config/kafka/metasrv.rs index f840ce6026..f101e8a271 100644 --- a/src/common/wal/src/config/kafka/metasrv.rs +++ b/src/common/wal/src/config/kafka/metasrv.rs @@ -19,8 +19,9 @@ use serde::{Deserialize, Serialize}; use crate::config::kafka::common::{ DEFAULT_AUTO_PRUNE_INTERVAL, DEFAULT_AUTO_PRUNE_LOGICAL_DELETE, DEFAULT_AUTO_PRUNE_PARALLELISM, - DEFAULT_CHECKPOINT_TRIGGER_SIZE, DEFAULT_FLUSH_TRIGGER_SIZE, KafkaConnectionConfig, - KafkaTopicConfig, + DEFAULT_CHECKPOINT_TRIGGER_SIZE, DEFAULT_FLUSH_TRIGGER_SIZE, + DEFAULT_PERIODIC_CHECKPOINT_PERSIST_INTERVAL, DEFAULT_REGION_FLUSH_TRIGGER_INTERVAL, + KafkaConnectionConfig, KafkaTopicConfig, }; /// Kafka wal configurations for metasrv. @@ -46,6 +47,12 @@ pub struct MetasrvKafkaConfig { pub flush_trigger_size: ReadableSize, // The checkpoint trigger size. pub checkpoint_trigger_size: ReadableSize, + /// Internal interval of remote WAL region flush trigger. + #[serde(with = "humantime_serde")] + pub region_flush_trigger_interval: Duration, + /// Internal interval to periodically persist remote WAL checkpoints. + #[serde(with = "humantime_serde")] + pub periodic_checkpoint_persist_interval: Duration, } impl Default for MetasrvKafkaConfig { @@ -59,6 +66,8 @@ impl Default for MetasrvKafkaConfig { auto_prune_parallelism: DEFAULT_AUTO_PRUNE_PARALLELISM, flush_trigger_size: DEFAULT_FLUSH_TRIGGER_SIZE, checkpoint_trigger_size: DEFAULT_CHECKPOINT_TRIGGER_SIZE, + region_flush_trigger_interval: DEFAULT_REGION_FLUSH_TRIGGER_INTERVAL, + periodic_checkpoint_persist_interval: DEFAULT_PERIODIC_CHECKPOINT_PERSIST_INTERVAL, } } } diff --git a/src/log-store/src/kafka/log_store.rs b/src/log-store/src/kafka/log_store.rs index e7fd06816d..01484a7d90 100644 --- a/src/log-store/src/kafka/log_store.rs +++ b/src/log-store/src/kafka/log_store.rs @@ -47,8 +47,6 @@ use crate::kafka::util::record::{ }; use crate::metrics; -const DEFAULT_OFFSET_FETCH_INTERVAL: Duration = Duration::from_secs(60); - /// Statistics for a topic. #[derive(Debug, Clone, Copy, Default)] pub struct TopicStat { @@ -152,8 +150,10 @@ impl KafkaLogStore { let client_manager = Arc::new( ClientManager::try_new(config, global_index_collector, topic_stats.clone()).await?, ); - let fetcher = - PeriodicOffsetFetcher::new(DEFAULT_OFFSET_FETCH_INTERVAL, client_manager.clone()); + let fetcher = PeriodicOffsetFetcher::new( + config.topic_latest_offset_fetch_interval, + client_manager.clone(), + ); fetcher.run().await; Ok(Self { diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index e842c96644..52ee3f3522 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -467,6 +467,8 @@ impl MetasrvBuilder { options.grpc.server_addr.clone(), remote_wal_options.flush_trigger_size, remote_wal_options.checkpoint_trigger_size, + remote_wal_options.region_flush_trigger_interval, + remote_wal_options.periodic_checkpoint_persist_interval, ); region_flush_trigger.try_start()?; diff --git a/src/meta-srv/src/procedure/region_migration.rs b/src/meta-srv/src/procedure/region_migration.rs index 079ca2930a..7299513bf6 100644 --- a/src/meta-srv/src/procedure/region_migration.rs +++ b/src/meta-srv/src/procedure/region_migration.rs @@ -52,7 +52,7 @@ use common_procedure::error::{ use common_procedure::{ Context as ProcedureContext, LockKey, Procedure, Status, StringKey, UserMetadata, }; -use common_telemetry::{error, info}; +use common_telemetry::{debug, error, info}; use manager::RegionMigrationProcedureGuard; pub use manager::{ RegionMigrationManagerRef, RegionMigrationProcedureTask, RegionMigrationProcedureTracker, @@ -708,6 +708,10 @@ impl Context { .batch_get(topic_name_keys) .await .context(error::TableMetadataManagerSnafu)?; + debug!( + "Fetched topic region values: {:?}, topic name values: {:?}", + topic_region_values, topic_name_values + ); let replay_checkpoints = region_topics .iter() diff --git a/src/meta-srv/src/procedure/wal_prune/manager.rs b/src/meta-srv/src/procedure/wal_prune/manager.rs index d99ac7bb22..e398be93e4 100644 --- a/src/meta-srv/src/procedure/wal_prune/manager.rs +++ b/src/meta-srv/src/procedure/wal_prune/manager.rs @@ -242,6 +242,10 @@ impl WalPruneManager { .await .context(error::TableMetadataManagerSnafu)? .map(|v| v.into_inner().pruned_entry_id); + debug!( + "Found prunable entry id {} for topic {}, current pruned entry id: {:?}", + prunable_entry_id, topic_name, current + ); if !should_trigger_prune(current, prunable_entry_id) { debug!( "No need to prune topic {}, current pruned entry id: {:?}, prunable entry id: {}", diff --git a/src/meta-srv/src/region/flush_trigger.rs b/src/meta-srv/src/region/flush_trigger.rs index 3d3f210e5e..24d2d6d19d 100644 --- a/src/meta-srv/src/region/flush_trigger.rs +++ b/src/meta-srv/src/region/flush_trigger.rs @@ -29,6 +29,7 @@ use common_telemetry::{debug, error, info, warn}; use common_time::util::current_time_millis; use common_wal::config::kafka::common::{ DEFAULT_CHECKPOINT_TRIGGER_SIZE, DEFAULT_FLUSH_TRIGGER_SIZE, + DEFAULT_PERIODIC_CHECKPOINT_PERSIST_INTERVAL, DEFAULT_REGION_FLUSH_TRIGGER_INTERVAL, }; use snafu::{OptionExt, ResultExt}; use store_api::region_request::RegionFlushReason; @@ -39,15 +40,9 @@ use crate::error::{self, Result}; use crate::service::mailbox::{Channel, MailboxRef}; use crate::{define_ticker, metrics}; -/// The interval of the region flush ticker. -const TICKER_INTERVAL: Duration = Duration::from_secs(60); - /// The duration of the recent period. const RECENT_DURATION: Duration = Duration::from_secs(300); -/// The interval to periodically persist region checkpoints regardless of replay size. -const PERIODIC_CHECKPOINT_PERSIST_INTERVAL: Duration = Duration::from_mins(60); - /// [`Event`] represents various types of events that can be processed by the region flush ticker. /// /// Variants: @@ -87,6 +82,10 @@ pub struct RegionFlushTrigger { flush_trigger_size: ReadableSize, /// The checkpoint trigger size. checkpoint_trigger_size: ReadableSize, + /// The interval of the region flush trigger. + region_flush_trigger_interval: Duration, + /// The interval to periodically persist region checkpoints regardless of replay size. + periodic_checkpoint_persist_interval: Duration, /// The last timestamp in milliseconds when a region checkpoint was persisted. last_checkpoint_persist_millis_by_region: HashMap, /// The receiver of events. @@ -95,6 +94,7 @@ pub struct RegionFlushTrigger { impl RegionFlushTrigger { /// Creates a new [`RegionFlushTrigger`]. + #[allow(clippy::too_many_arguments)] pub(crate) fn new( table_metadata_manager: TableMetadataManagerRef, leader_region_registry: LeaderRegionRegistryRef, @@ -103,6 +103,8 @@ impl RegionFlushTrigger { server_addr: String, mut flush_trigger_size: ReadableSize, mut checkpoint_trigger_size: ReadableSize, + mut region_flush_trigger_interval: Duration, + mut periodic_checkpoint_persist_interval: Duration, ) -> (Self, RegionFlushTicker) { if flush_trigger_size.as_bytes() == 0 { flush_trigger_size = DEFAULT_FLUSH_TRIGGER_SIZE; @@ -118,8 +120,22 @@ impl RegionFlushTrigger { checkpoint_trigger_size ); } + if region_flush_trigger_interval.is_zero() { + region_flush_trigger_interval = DEFAULT_REGION_FLUSH_TRIGGER_INTERVAL; + warn!( + "region_flush_trigger_interval is not set, using default value: {:?}", + region_flush_trigger_interval + ); + } + if periodic_checkpoint_persist_interval.is_zero() { + periodic_checkpoint_persist_interval = DEFAULT_PERIODIC_CHECKPOINT_PERSIST_INTERVAL; + warn!( + "periodic_checkpoint_persist_interval is not set, using default value: {:?}", + periodic_checkpoint_persist_interval + ); + } let (tx, rx) = Self::channel(); - let region_flush_ticker = RegionFlushTicker::new(TICKER_INTERVAL, tx); + let region_flush_ticker = RegionFlushTicker::new(region_flush_trigger_interval, tx); let region_flush_trigger = Self { table_metadata_manager, leader_region_registry, @@ -128,6 +144,8 @@ impl RegionFlushTrigger { server_addr, flush_trigger_size, checkpoint_trigger_size, + region_flush_trigger_interval, + periodic_checkpoint_persist_interval, last_checkpoint_persist_millis_by_region: HashMap::new(), receiver: rx, }; @@ -230,7 +248,7 @@ impl RegionFlushTrigger { topic_regions.keys().copied(), &self.last_checkpoint_persist_millis_by_region, now_millis, - PERIODIC_CHECKPOINT_PERSIST_INTERVAL, + self.periodic_checkpoint_persist_interval, ); let regions_to_persist = merge_region_ids(size_based_regions, periodic_regions); let region_manifests = self @@ -284,7 +302,7 @@ impl RegionFlushTrigger { let Some(stat) = self .topic_stats_registry - .get_calculated_topic_stat(topic, TICKER_INTERVAL) + .get_calculated_topic_stat(topic, self.region_flush_trigger_interval) else { debug!("No topic stat found for topic: {}", topic); return None; @@ -992,6 +1010,8 @@ mod tests { "127.0.0.1:3002".to_string(), ReadableSize(1), ReadableSize(1), + DEFAULT_REGION_FLUSH_TRIGGER_INTERVAL, + DEFAULT_PERIODIC_CHECKPOINT_PERSIST_INTERVAL, ); let topic = "test_topic"; @@ -1064,6 +1084,8 @@ mod tests { "127.0.0.1:3002".to_string(), ReadableSize(1), ReadableSize(1), + DEFAULT_REGION_FLUSH_TRIGGER_INTERVAL, + DEFAULT_PERIODIC_CHECKPOINT_PERSIST_INTERVAL, ); let topic = "test_topic".to_string(); diff --git a/tests-fuzz/Cargo.toml b/tests-fuzz/Cargo.toml index a38a4a13f8..2cd66b8afe 100644 --- a/tests-fuzz/Cargo.toml +++ b/tests-fuzz/Cargo.toml @@ -17,7 +17,9 @@ unstable = ["nix"] [dependencies] arbitrary = { version = "1.3.0", features = ["derive"] } async-trait = { workspace = true } +axum.workspace = true chrono = { workspace = true } +clap.workspace = true common-base = { workspace = true } common-error = { workspace = true } common-macro = { workspace = true } @@ -46,6 +48,7 @@ paste.workspace = true rand = { workspace = true } rand_chacha = "0.9" reqwest = { workspace = true } +rskafka.workspace = true rustls = { workspace = true, default-features = false, features = ["aws_lc_rs", "std", "tls12"] } schemars = "0.8" serde = { workspace = true } @@ -171,3 +174,17 @@ path = "targets/migration/fuzz_migrate_metric_regions.rs" test = false bench = false doc = false + +[[bin]] +name = "fuzz_remote_wal_logical_prune" +path = "targets/wal/fuzz_remote_wal_logical_prune.rs" +test = false +bench = false +doc = false + +[[bin]] +name = "kafka_wal_helper" +path = "utils/kafka_wal_helper.rs" +test = false +bench = false +doc = false diff --git a/tests-fuzz/src/error.rs b/tests-fuzz/src/error.rs index 10259099a3..4bde05cb53 100644 --- a/tests-fuzz/src/error.rs +++ b/tests-fuzz/src/error.rs @@ -81,6 +81,29 @@ pub enum Error { location: Location, }, + #[snafu(display("Kafka WAL helper request failed, operation: {}", operation))] + KafkaWalHelperRequest { + operation: String, + #[snafu(source)] + error: reqwest::Error, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display( + "Kafka WAL helper returned non-success status, operation: {}, status: {}, body: {}", + operation, + status, + body + ))] + KafkaWalHelperStatus { + operation: String, + status: reqwest::StatusCode, + body: String, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Failed to assert: {}", reason))] Assert { reason: String, diff --git a/tests-fuzz/src/utils.rs b/tests-fuzz/src/utils.rs index 2c5717bb71..4b057d1dd6 100644 --- a/tests-fuzz/src/utils.rs +++ b/tests-fuzz/src/utils.rs @@ -18,6 +18,7 @@ pub mod crd; /// CSV dump writer utilities for fuzz tests. pub mod csv_dump_writer; pub mod health; +pub mod kafka_wal_http; pub mod migration; pub mod network_chaos; pub mod partition; diff --git a/tests-fuzz/src/utils/kafka_wal_http.rs b/tests-fuzz/src/utils/kafka_wal_http.rs new file mode 100644 index 0000000000..2540e0f281 --- /dev/null +++ b/tests-fuzz/src/utils/kafka_wal_http.rs @@ -0,0 +1,165 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use async_trait::async_trait; +use serde::{Deserialize, Serialize}; +use snafu::ResultExt; + +use crate::error::{self, Result}; + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct TopicOffset { + pub topic: String, + pub partition: i32, + pub offset: i64, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct RecordOffsetsRequest { + pub broker_endpoints: Vec, + pub topic_prefix: String, + pub num_topics: usize, + #[serde(default)] + pub partition: i32, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct DeleteRecordsRequest { + pub broker_endpoints: Vec, + pub offsets: Vec, + pub timeout_ms: Option, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct RecordOffsetsResponse { + pub offsets: Vec, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct DeleteRecordsResponse { + pub deleted: Vec, +} + +#[async_trait] +pub trait WalAdmin: Send + Sync { + async fn record_topic_latest_offsets(&self) -> Result>; + + async fn delete_records_to_offsets(&self, offsets: &[TopicOffset]) -> Result<()>; +} + +pub struct HttpWalAdmin { + client: reqwest::Client, + base_url: String, + broker_endpoints: Vec, + topic_prefix: String, + num_topics: usize, + partition: i32, + delete_records_timeout_ms: i32, +} + +pub struct HttpWalAdminConfig { + pub base_url: String, + pub broker_endpoints: Vec, + pub topic_prefix: String, + pub num_topics: usize, + pub partition: i32, + pub delete_records_timeout_ms: i32, +} + +impl HttpWalAdmin { + pub fn new(config: HttpWalAdminConfig) -> Self { + Self { + client: reqwest::Client::new(), + base_url: config.base_url, + broker_endpoints: config.broker_endpoints, + topic_prefix: config.topic_prefix, + num_topics: config.num_topics, + partition: config.partition, + delete_records_timeout_ms: config.delete_records_timeout_ms, + } + } + + fn endpoint(&self, path: &str) -> String { + format!("{}{}", self.base_url.trim_end_matches('/'), path) + } +} + +#[async_trait] +impl WalAdmin for HttpWalAdmin { + async fn record_topic_latest_offsets(&self) -> Result> { + let request = RecordOffsetsRequest { + broker_endpoints: self.broker_endpoints.clone(), + topic_prefix: self.topic_prefix.clone(), + num_topics: self.num_topics, + partition: self.partition, + }; + let response = self + .client + .post(self.endpoint("/record-offsets")) + .json(&request) + .send() + .await + .context(error::KafkaWalHelperRequestSnafu { + operation: "record-offsets".to_string(), + })?; + ensure_success(response, "record-offsets") + .await? + .json::() + .await + .context(error::KafkaWalHelperRequestSnafu { + operation: "record-offsets".to_string(), + }) + .map(|response| response.offsets) + } + + async fn delete_records_to_offsets(&self, offsets: &[TopicOffset]) -> Result<()> { + let request = DeleteRecordsRequest { + broker_endpoints: self.broker_endpoints.clone(), + offsets: offsets.to_vec(), + timeout_ms: Some(self.delete_records_timeout_ms), + }; + let response = self + .client + .post(self.endpoint("/delete-records")) + .json(&request) + .send() + .await + .context(error::KafkaWalHelperRequestSnafu { + operation: "delete-records".to_string(), + })?; + ensure_success(response, "delete-records") + .await? + .json::() + .await + .context(error::KafkaWalHelperRequestSnafu { + operation: "delete-records".to_string(), + }) + .map(|_| ()) + } +} + +async fn ensure_success(response: reqwest::Response, operation: &str) -> Result { + let status = response.status(); + if status.is_success() { + return Ok(response); + } + + let body = response.text().await.unwrap_or_default(); + error::KafkaWalHelperStatusSnafu { + operation: operation.to_string(), + status, + body, + } + .fail() +} diff --git a/tests-fuzz/targets/wal/fuzz_remote_wal_logical_prune.rs b/tests-fuzz/targets/wal/fuzz_remote_wal_logical_prune.rs new file mode 100644 index 0000000000..7545f4a4a4 --- /dev/null +++ b/tests-fuzz/targets/wal/fuzz_remote_wal_logical_prune.rs @@ -0,0 +1,647 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#![no_main] + +use std::env; +use std::sync::Arc; +use std::time::Duration; + +use arbitrary::{Arbitrary, Unstructured}; +use common_meta::distributed_time_constants::default_distributed_time_constants; +use common_telemetry::info; +use libfuzzer_sys::fuzz_target; +use rand::seq::IndexedRandom; +use rand::{Rng, SeedableRng}; +use rand_chacha::ChaChaRng; +use snafu::{ResultExt, ensure}; +use sqlx::{Executor, MySql, Pool}; +use store_api::storage::RegionId; +use tests_fuzz::context::{TableContext, TableContextRef}; +use tests_fuzz::error::{self, Result}; +use tests_fuzz::fake::{ + MappedGenerator, WordGenerator, merge_two_word_map_fn, random_capitalize_map, + uppercase_and_keyword_backtick_map, +}; +use tests_fuzz::generator::Generator; +use tests_fuzz::generator::create_expr::CreateTableExprGeneratorBuilder; +use tests_fuzz::generator::insert_expr::InsertExprGeneratorBuilder; +use tests_fuzz::ir::{ + CreateTableExpr, Ident, InsertIntoExpr, MySQLTsColumnTypeGenerator, format_columns, + generate_random_value, generate_unique_timestamp_for_mysql, replace_default, + sort_by_primary_keys, +}; +use tests_fuzz::translator::DslTranslator; +use tests_fuzz::translator::mysql::create_expr::CreateTableExprTranslator; +use tests_fuzz::translator::mysql::insert_expr::InsertIntoExprTranslator; +use tests_fuzz::utils::cluster_info::{PEER_TYPE_DATANODE, fetch_nodes}; +use tests_fuzz::utils::kafka_wal_http::{HttpWalAdmin, HttpWalAdminConfig, WalAdmin}; +use tests_fuzz::utils::migration::migrate_region; +use tests_fuzz::utils::partition::{fetch_partition, fetch_partitions}; +use tests_fuzz::utils::procedure::procedure_state; +use tests_fuzz::utils::wait::wait_condition_fn; +use tests_fuzz::utils::{ + Connections, flush_memtable, get_fuzz_override, get_gt_fuzz_input_max_rows, + get_gt_fuzz_input_max_tables, init_greptime_connections_via_env, +}; +use tests_fuzz::validator; + +const MIN_TABLES: usize = 4; +const MAX_PARTITIONS: usize = 8; +const MIN_DELETE_AFTER_SECS: u64 = 180; +const DEFAULT_DELETE_AFTER_SECS: u64 = 360; +const MIGRATION_TIMEOUT_SECS: u64 = 240; +const ACTIVITY_TICK_SECS: u64 = 5; +const DEFAULT_KAFKA_ADDR: &str = "127.0.0.1:9092"; +const DEFAULT_KAFKA_WAL_HELPER_URL: &str = "http://127.0.0.1:8080"; +const DEFAULT_WAL_TOPIC_PREFIX: &str = "greptimedb_wal_topic"; +const DEFAULT_WAL_NUM_TOPICS: usize = 3; +const DEFAULT_WAL_PARTITION: i32 = 0; +const DEFAULT_DELETE_RECORDS_TIMEOUT_MS: i32 = 5000; + +struct FuzzContext { + greptime: Pool, +} + +impl FuzzContext { + async fn close(self) { + self.greptime.close().await; + } +} + +#[derive(Copy, Clone, Debug)] +struct FuzzInput { + seed: u64, + tables: usize, + partitions: usize, + rows_per_insert: usize, + rounds: usize, + delete_after_secs: u64, +} + +impl Arbitrary<'_> for FuzzInput { + fn arbitrary(u: &mut Unstructured<'_>) -> arbitrary::Result { + let seed = get_fuzz_override::("SEED").unwrap_or(u.int_in_range(u64::MIN..=u64::MAX)?); + let mut rng = ChaChaRng::seed_from_u64(seed); + + let max_tables = get_gt_fuzz_input_max_tables().max(MIN_TABLES); + let tables = get_fuzz_override::("TABLES") + .unwrap_or_else(|| rng.random_range(MIN_TABLES..=max_tables)); + let partitions = get_fuzz_override::("PARTITIONS") + .unwrap_or_else(|| rng.random_range(1..=MAX_PARTITIONS)); + let max_rows = get_gt_fuzz_input_max_rows().max(32); + let rows_per_insert = + get_fuzz_override::("ROWS").unwrap_or_else(|| rng.random_range(32..=max_rows)); + let rounds = + get_fuzz_override::("ROUNDS").unwrap_or_else(|| rng.random_range(2..=4)); + let delete_after_secs = get_fuzz_override::("DELETE_AFTER_SECS") + .or_else(|| get_fuzz_override::("RETENTION_WAIT_SECS")) + .unwrap_or(DEFAULT_DELETE_AFTER_SECS) + .max(MIN_DELETE_AFTER_SECS); + + Ok(FuzzInput { + seed, + tables, + partitions, + rows_per_insert, + rounds, + delete_after_secs, + }) + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum ActivityProfile { + Hot, + Warm, + Cold, + Dormant, +} + +impl ActivityProfile { + fn for_table(idx: usize) -> Self { + match idx % 4 { + 0 => Self::Hot, + 1 => Self::Warm, + 2 => Self::Cold, + _ => Self::Dormant, + } + } + + fn should_insert(&self, rng: &mut R) -> bool { + match self { + Self::Hot => true, + Self::Warm => rng.random_bool(0.7), + Self::Cold => rng.random_bool(0.2), + Self::Dormant => false, + } + } + + fn should_flush(&self, rng: &mut R) -> bool { + match self { + Self::Hot => rng.random_bool(0.05), + Self::Warm => rng.random_bool(0.05), + Self::Cold | Self::Dormant => false, + } + } +} + +struct TableState { + ctx: TableContextRef, + profile: ActivityProfile, + insert_exprs: Vec, +} + +#[derive(Debug)] +struct Migration { + from_peer: u64, + to_peer: u64, + region_id: RegionId, +} + +fn wal_admin_from_env() -> HttpWalAdmin { + let _ = dotenv::dotenv(); + HttpWalAdmin::new(HttpWalAdminConfig { + base_url: env::var("GT_KAFKA_WAL_HELPER_URL") + .unwrap_or_else(|_| DEFAULT_KAFKA_WAL_HELPER_URL.to_string()), + broker_endpoints: kafka_broker_endpoints(), + topic_prefix: env::var("GT_WAL_TOPIC_PREFIX") + .unwrap_or_else(|_| DEFAULT_WAL_TOPIC_PREFIX.to_string()), + num_topics: env::var("GT_WAL_NUM_TOPICS") + .ok() + .and_then(|value| value.parse().ok()) + .unwrap_or(DEFAULT_WAL_NUM_TOPICS), + partition: env::var("GT_WAL_PARTITION") + .ok() + .and_then(|value| value.parse().ok()) + .unwrap_or(DEFAULT_WAL_PARTITION), + delete_records_timeout_ms: env::var("GT_WAL_DELETE_RECORDS_TIMEOUT_MS") + .ok() + .and_then(|value| value.parse().ok()) + .unwrap_or(DEFAULT_DELETE_RECORDS_TIMEOUT_MS), + }) +} + +fn kafka_broker_endpoints() -> Vec { + env::var("GT_KAFKA_ADDR") + .or_else(|_| env::var("KAFKA_ADDR")) + .unwrap_or_else(|_| DEFAULT_KAFKA_ADDR.to_string()) + .split(',') + .map(str::trim) + .filter(|endpoint| !endpoint.is_empty()) + .map(ToString::to_string) + .collect() +} + +fn generate_create_expr( + input: FuzzInput, + idx: usize, + rng: &mut R, +) -> Result { + let table_name = Ident::new(format!("wal_logical_prune_{}_{}", input.seed, idx)); + let create_table_generator = CreateTableExprGeneratorBuilder::default() + .name_generator(Box::new(MappedGenerator::new( + WordGenerator, + merge_two_word_map_fn(random_capitalize_map, uppercase_and_keyword_backtick_map), + ))) + .name(table_name) + .columns(4) + .partition(input.partitions) + .engine("mito") + .ts_column_type_generator(Box::new(MySQLTsColumnTypeGenerator)) + .build() + .unwrap(); + + create_table_generator.generate(rng) +} + +fn generate_insert_expr( + rows: usize, + ts_base: i64, + rng: &mut R, + table_ctx: TableContextRef, +) -> Result { + let insert_generator = InsertExprGeneratorBuilder::default() + .table_ctx(table_ctx) + .omit_column_list(false) + .rows(rows) + .ts_value_generator(generate_unique_timestamp_for_mysql(ts_base)) + .value_generator(Box::new(generate_random_value)) + .build() + .unwrap(); + + insert_generator.generate(rng) +} + +async fn create_tables( + ctx: &FuzzContext, + input: FuzzInput, + rng: &mut R, +) -> Result> { + let mut tables = Vec::with_capacity(input.tables); + + for idx in 0..input.tables { + let create_expr = generate_create_expr(input, idx, rng)?; + let translator = CreateTableExprTranslator; + let sql = translator.translate(&create_expr)?; + let result = sqlx::query(&sql) + .execute(&ctx.greptime) + .await + .context(error::ExecuteQuerySnafu { sql: &sql })?; + info!( + "Create table: {}, profile: {:?}, result: {result:?}", + create_expr.table_name, + ActivityProfile::for_table(idx) + ); + + tables.push(TableState { + ctx: Arc::new(TableContext::from(&create_expr)), + profile: ActivityProfile::for_table(idx), + insert_exprs: Vec::new(), + }); + } + + Ok(tables) +} + +async fn insert_table( + ctx: &FuzzContext, + input: FuzzInput, + table: &mut TableState, + ts_base: i64, + rng: &mut R, +) -> Result<()> { + let insert_expr = generate_insert_expr(input.rows_per_insert, ts_base, rng, table.ctx.clone())?; + let translator = InsertIntoExprTranslator; + let sql = translator.translate(&insert_expr)?; + let result = ctx + .greptime + // unprepared query, see + .execute(sql.as_str()) + .await + .context(error::ExecuteQuerySnafu { sql: &sql })?; + ensure!( + result.rows_affected() == insert_expr.values_list.len() as u64, + error::AssertSnafu { + reason: format!( + "expected rows affected: {}, actual: {}", + insert_expr.values_list.len(), + result.rows_affected(), + ) + } + ); + + table.insert_exprs.push(insert_expr); + Ok(()) +} + +async fn flush_table(ctx: &FuzzContext, table: &TableState) -> Result<()> { + flush_memtable(&ctx.greptime, &table.ctx.name).await +} + +async fn validate_table(ctx: &FuzzContext, table: &TableState) -> Result<()> { + if table.insert_exprs.is_empty() { + return Ok(()); + } + + info!( + "Validating table: {}, profile: {:?}, batches: {}", + table.ctx.name, + table.profile, + table.insert_exprs.len() + ); + validate_insert_exprs(ctx, &table.ctx, &table.insert_exprs).await +} + +async fn validate_insert_exprs( + ctx: &FuzzContext, + table_ctx: &TableContextRef, + insert_exprs: &[InsertIntoExpr], +) -> Result<()> { + let ts_column = table_ctx.timestamp_column().unwrap(); + for (idx, insert_expr) in insert_exprs[0..insert_exprs.len() - 1].iter().enumerate() { + let ts_column_idx = insert_expr.timestamp_column_idx().unwrap(); + let ts_value = insert_expr.values_list[0][ts_column_idx].clone(); + let next_batch_ts_column_idx = insert_exprs[idx + 1].timestamp_column_idx().unwrap(); + let next_batch_ts = insert_exprs[idx + 1].values_list[0][next_batch_ts_column_idx].clone(); + + let primary_keys_idx = insert_expr.primary_key_column_idx(); + let column_list = format_columns(&insert_expr.columns); + let primary_keys_column_list = format_columns(&insert_expr.primary_key_columns()); + let select_sql = format!( + "SELECT {} FROM {} WHERE {} >= {} AND {} < {} ORDER BY {};", + column_list, + table_ctx.name, + ts_column.name, + ts_value, + ts_column.name, + next_batch_ts, + primary_keys_column_list + ); + info!("Executing sql: {select_sql}"); + let fetched_rows = ctx.greptime.fetch_all(select_sql.as_str()).await.unwrap(); + let mut expected_rows = replace_default(&insert_expr.values_list, table_ctx, insert_expr); + sort_by_primary_keys(&mut expected_rows, primary_keys_idx); + validator::row::assert_eq::(&insert_expr.columns, &fetched_rows, &expected_rows)?; + } + + let insert_expr = insert_exprs.last().unwrap(); + let ts_column_idx = insert_expr.timestamp_column_idx().unwrap(); + let ts_value = insert_expr.values_list[0][ts_column_idx].clone(); + let primary_keys_idx = insert_expr.primary_key_column_idx(); + let column_list = format_columns(&insert_expr.columns); + let primary_keys_column_list = format_columns(&insert_expr.primary_key_columns()); + let select_sql = format!( + "SELECT {} FROM {} WHERE {} >= {} ORDER BY {};", + column_list, table_ctx.name, ts_column.name, ts_value, primary_keys_column_list + ); + info!("Executing sql: {select_sql}"); + let fetched_rows = ctx.greptime.fetch_all(select_sql.as_str()).await.unwrap(); + let mut expected_rows = replace_default(&insert_expr.values_list, table_ctx, insert_expr); + sort_by_primary_keys(&mut expected_rows, primary_keys_idx); + validator::row::assert_eq::(&insert_expr.columns, &fetched_rows, &expected_rows)?; + + Ok(()) +} + +async fn migrate_all_regions( + ctx: &FuzzContext, + tables: &[TableState], + rng: &mut R, +) -> Result<()> { + let datanodes = fetch_datanodes(ctx).await?; + let mut migrations = Vec::new(); + + for table in tables { + let partitions = fetch_partitions(&ctx.greptime, table.ctx.name.clone()).await?; + for partition in partitions { + let from_peer = partition.datanode_id; + let to_peer = choose_target_peer(&datanodes, from_peer, rng, || { + format!( + "failed to choose migration target, table: {}, region: {}, from_peer: {}", + table.ctx.name, partition.region_id, from_peer + ) + })?; + migrations.push(Migration { + from_peer, + to_peer, + region_id: RegionId::from_u64(partition.region_id), + }); + } + } + + migrate_regions(ctx, &migrations).await +} + +async fn fetch_datanodes(ctx: &FuzzContext) -> Result> { + let datanodes = fetch_nodes(&ctx.greptime) + .await? + .into_iter() + .filter(|node| node.peer_type == PEER_TYPE_DATANODE) + .map(|node| node.peer_id as u64) + .collect::>(); + ensure!( + datanodes.len() >= 2, + error::AssertSnafu { + reason: format!( + "remote WAL logical prune fuzz requires at least two datanodes, got {}", + datanodes.len() + ) + } + ); + Ok(datanodes) +} + +fn choose_target_peer( + datanodes: &[u64], + from_peer: u64, + rng: &mut R, + error_message: F, +) -> Result +where + R: Rng + 'static, + F: FnOnce() -> String, +{ + datanodes + .iter() + .filter(|peer| **peer != from_peer) + .copied() + .collect::>() + .choose(rng) + .copied() + .ok_or_else(|| { + error::AssertSnafu { + reason: error_message(), + } + .build() + }) +} + +async fn migrate_regions(ctx: &FuzzContext, migrations: &[Migration]) -> Result<()> { + let mut procedure_ids = Vec::with_capacity(migrations.len()); + for migration in migrations { + procedure_ids.push(submit_migration(ctx, migration).await); + } + + for (migration, procedure_id) in migrations.iter().zip(procedure_ids) { + wait_for_migration(ctx, migration, &procedure_id).await?; + } + + if !migrations.is_empty() { + tokio::time::sleep(default_distributed_time_constants().region_lease).await; + } + + Ok(()) +} + +async fn submit_migration(ctx: &FuzzContext, migration: &Migration) -> String { + let procedure_id = migrate_region( + &ctx.greptime, + migration.region_id.as_u64(), + migration.from_peer, + migration.to_peer, + MIGRATION_TIMEOUT_SECS, + ) + .await; + info!( + "Migrating region: {:?}, procedure: {}", + migration, procedure_id + ); + procedure_id +} + +async fn wait_for_migration( + ctx: &FuzzContext, + migration: &Migration, + procedure_id: &str, +) -> Result<()> { + let region_id = migration.region_id.as_u64(); + wait_condition_fn( + Duration::from_secs(MIGRATION_TIMEOUT_SECS), + || { + let greptime = ctx.greptime.clone(); + let procedure_id = procedure_id.to_string(); + Box::pin(async move { + let output = procedure_state(&greptime, &procedure_id).await; + info!("Checking procedure: {procedure_id}, output: {output}"); + (fetch_partition(&greptime, region_id).await.ok(), output) + }) + }, + |(partition, output)| { + if let Some(partition) = partition { + partition.datanode_id == migration.to_peer && output.contains("Done") + } else { + false + } + }, + Duration::from_secs(5), + ) + .await; + + Ok(()) +} + +fn next_ts_base(input: FuzzInput, batch_idx: usize) -> i64 { + input.seed as i64 + (batch_idx as i64 * 1_000_000) +} + +async fn run_activity_round( + ctx: &FuzzContext, + input: FuzzInput, + tables: &mut [TableState], + batch_idx: &mut usize, + round: usize, + rng: &mut R, +) -> Result<()> { + info!("Start mixed activity round: {round}"); + for table in tables.iter_mut() { + if table.profile.should_insert(rng) { + insert_table(ctx, input, table, next_ts_base(input, *batch_idx), rng).await?; + *batch_idx += 1; + + if table.profile.should_flush(rng) { + flush_table(ctx, table).await?; + } + } + } + + for table in tables.iter() { + match table.profile { + ActivityProfile::Cold | ActivityProfile::Dormant => validate_table(ctx, table).await?, + ActivityProfile::Hot | ActivityProfile::Warm if rng.random_bool(0.5) => { + validate_table(ctx, table).await? + } + ActivityProfile::Hot | ActivityProfile::Warm => {} + } + } + + Ok(()) +} + +async fn execute_logical_prune_fuzz( + ctx: FuzzContext, + input: FuzzInput, + wal_admin: &W, +) -> Result<()> { + info!("input: {input:?}"); + let mut rng = ChaChaRng::seed_from_u64(input.seed); + let mut tables = create_tables(&ctx, input, &mut rng).await?; + let mut batch_idx = 0; + + for table in &mut tables { + insert_table(&ctx, input, table, next_ts_base(input, batch_idx), &mut rng).await?; + batch_idx += 1; + } + + for table in &tables { + validate_table(&ctx, table).await?; + } + + info!("Recording Kafka WAL latest offsets via WAL admin"); + let delete_boundary_offsets = wal_admin.record_topic_latest_offsets().await?; + for offset in &delete_boundary_offsets { + info!( + "Recorded Kafka WAL delete boundary, topic: {}, partition: {}, offset: {}", + offset.topic, offset.partition, offset.offset + ); + } + let delete_deadline = + tokio::time::Instant::now() + Duration::from_secs(input.delete_after_secs); + info!( + "Start foreground activity until DeleteRecords deadline, delete_after_secs: {}", + input.delete_after_secs + ); + + let mut round = 0; + while tokio::time::Instant::now() < delete_deadline { + run_activity_round(&ctx, input, &mut tables, &mut batch_idx, round, &mut rng).await?; + round += 1; + + if input.rounds > 0 && round.is_multiple_of(input.rounds) { + info!( + "Completed configured activity round batch before DeleteRecords, rounds: {round}" + ); + } + + tokio::time::sleep(Duration::from_secs(ACTIVITY_TICK_SECS)).await; + } + + for offset in &delete_boundary_offsets { + info!( + "Deleting Kafka WAL records to boundary, topic: {}, partition: {}, offset: {}", + offset.topic, offset.partition, offset.offset + ); + } + wal_admin + .delete_records_to_offsets(&delete_boundary_offsets) + .await?; + for offset in &delete_boundary_offsets { + info!( + "Deleted Kafka WAL records to boundary, topic: {}, partition: {}, offset: {}", + offset.topic, offset.partition, offset.offset + ); + } + migrate_all_regions(&ctx, &tables, &mut rng).await?; + + for table in &tables { + validate_table(&ctx, table).await?; + } + + for table in tables { + let sql = format!("DROP TABLE {}", table.ctx.name); + let result = sqlx::query(&sql) + .execute(&ctx.greptime) + .await + .context(error::ExecuteQuerySnafu { sql })?; + info!("Drop table: {}, result: {result:?}", table.ctx.name); + } + + ctx.close().await; + Ok(()) +} + +fuzz_target!(|input: FuzzInput| { + common_telemetry::init_default_ut_logging(); + common_runtime::block_on_global(async { + let Connections { mysql } = init_greptime_connections_via_env().await; + let ctx = FuzzContext { + greptime: mysql.expect("mysql connection init must be succeed"), + }; + let wal_admin = wal_admin_from_env(); + + execute_logical_prune_fuzz(ctx, input, &wal_admin) + .await + .unwrap_or_else(|err| panic!("fuzz test must be succeed: {err:?}")); + }) +}); diff --git a/tests-fuzz/utils/README.md b/tests-fuzz/utils/README.md new file mode 100644 index 0000000000..870a1088da --- /dev/null +++ b/tests-fuzz/utils/README.md @@ -0,0 +1,141 @@ +# Fuzz Utilities + +Utilities in this directory support fuzz tests and related CI/manual workflows. + +## Kafka WAL Helper + +`kafka_wal_helper` is a small HTTP service used by Remote WAL fuzz tests. It runs near Kafka, usually inside Kubernetes, and provides the minimum Kafka operations needed to simulate external WAL TTL deletion: + +- read latest offsets for configured WAL topics; +- delete Kafka records up to previously recorded offsets. + +This avoids exposing Kafka outside Kubernetes or relying on broker advertised listeners to be reachable from the fuzz runner. + +Source and related files: + +- `tests-fuzz/utils/kafka_wal_helper.rs` — HTTP helper binary. +- `docker/ci/ubuntu/Dockerfile.kafka-wal-helper` — lightweight runtime image. +- `.github/actions/setup-greptimedb-cluster/kafka-wal-helper.yaml` — Kubernetes Deployment and Service. + +### Build the binary + +Build the helper binary with the nightly profile used by fuzz targets: + +```bash +cargo build --bin kafka_wal_helper --profile nightly +``` + +The binary is expected at: + +```text +target/nightly/kafka_wal_helper +``` + +### Build the Docker image + +The Dockerfile is runtime-only. Build the Rust binary first, then copy it into an architecture-specific directory expected by the image build. + +For `amd64`: + +```bash +mkdir -p amd64 +cp target/nightly/kafka_wal_helper amd64/kafka_wal_helper + +docker build \ + -f docker/ci/ubuntu/Dockerfile.kafka-wal-helper \ + --build-arg TARGETARCH=amd64 \ + -t /tools/kafka_wal_helper: \ + . +``` + +Push the image: + +```bash +docker push /tools/kafka_wal_helper: +``` + +### Deploy to Kubernetes + +Update `.github/actions/setup-greptimedb-cluster/kafka-wal-helper.yaml` to use the image you pushed: + +```yaml +image: /tools/kafka_wal_helper: +``` + +Apply the helper manifest to the namespace where you want the helper to run: + +```bash +kubectl -n kafka-cluster apply \ + -f .github/actions/setup-greptimedb-cluster/kafka-wal-helper.yaml +``` + +Forward the helper service to a local port: + +```bash +kubectl -n kafka-cluster port-forward svc/kafka-wal-helper 8080:8080 +``` + +Then call the helper through `127.0.0.1:8080`. + +### HTTP API + +#### Health check + +```bash +curl http://127.0.0.1:8080/health +``` + +#### Record latest offsets + +```bash +curl -s http://127.0.0.1:8080/record-offsets \ + -H 'content-type: application/json' \ + -d '{ + "broker_endpoints":["kafka-broker-0.kafka-broker-headless.test-env-kafka.svc.cluster.local:9092"], + "topic_prefix":"greptimedb_wal_topic", + "num_topics":3, + "partition":0 + }' +``` + +Example response: + +```json +{ + "offsets": [ + {"topic":"greptimedb_wal_topic_0","partition":0,"offset":123}, + {"topic":"greptimedb_wal_topic_1","partition":0,"offset":456}, + {"topic":"greptimedb_wal_topic_2","partition":0,"offset":789} + ] +} +``` + +#### Delete records + +Use offsets returned by `/record-offsets` as the deletion boundary. Do not delete to the current latest offsets after additional writes, otherwise the test may remove data that should remain replayable. + +```bash +curl -s http://127.0.0.1:8080/delete-records \ + -H 'content-type: application/json' \ + -d '{ + "broker_endpoints":["kafka-broker-0.kafka-broker-headless.test-env-kafka.svc.cluster.local:9092"], + "timeout_ms":5000, + "offsets":[ + {"topic":"greptimedb_wal_topic_0","partition":0,"offset":123}, + {"topic":"greptimedb_wal_topic_1","partition":0,"offset":456}, + {"topic":"greptimedb_wal_topic_2","partition":0,"offset":789} + ] + }' +``` + +Example response: + +```json +{ + "deleted": [ + {"topic":"greptimedb_wal_topic_0","partition":0,"offset":123}, + {"topic":"greptimedb_wal_topic_1","partition":0,"offset":456}, + {"topic":"greptimedb_wal_topic_2","partition":0,"offset":789} + ] +} +``` diff --git a/tests-fuzz/utils/kafka_wal_helper.rs b/tests-fuzz/utils/kafka_wal_helper.rs new file mode 100644 index 0000000000..22909ce3bf --- /dev/null +++ b/tests-fuzz/utils/kafka_wal_helper.rs @@ -0,0 +1,144 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::net::SocketAddr; + +use axum::http::StatusCode; +use axum::routing::{get, post}; +use axum::{Json, Router}; +use clap::Parser; +use common_telemetry::info; +use rskafka::client::ClientBuilder; +use rskafka::client::partition::{OffsetAt, UnknownTopicHandling}; +use serde::Serialize; +use tests_fuzz::utils::kafka_wal_http::{ + DeleteRecordsRequest, DeleteRecordsResponse, RecordOffsetsRequest, RecordOffsetsResponse, + TopicOffset, +}; + +type Error = Box; +type Result = std::result::Result; + +const DEFAULT_DELETE_RECORDS_TIMEOUT_MS: i32 = 5000; + +#[derive(Debug, Parser)] +#[command(about = "Kafka WAL helper for Remote WAL fuzz tests")] +struct Args { + /// HTTP listen address. + #[arg(long, default_value = "0.0.0.0:8080")] + addr: SocketAddr, +} + +#[derive(Debug, Serialize)] +struct HealthResponse { + status: &'static str, +} + +#[tokio::main] +async fn main() -> Result<()> { + common_telemetry::init_default_ut_logging(); + let args = Args::parse(); + serve(args.addr).await +} + +async fn serve(addr: SocketAddr) -> Result<()> { + let app = Router::new() + .route("/health", get(health)) + .route("/record-offsets", post(record_offsets_handler)) + .route("/delete-records", post(delete_records_handler)); + let listener = tokio::net::TcpListener::bind(addr).await?; + info!("Kafka WAL helper listening on {}", listener.local_addr()?); + axum::serve(listener, app).await?; + Ok(()) +} + +async fn health() -> Json { + Json(HealthResponse { status: "ok" }) +} + +async fn record_offsets_handler( + Json(request): Json, +) -> HttpResult { + record_offsets(request) + .await + .map(Json) + .map_err(to_http_error) +} + +async fn delete_records_handler( + Json(request): Json, +) -> HttpResult { + delete_records(request) + .await + .map(Json) + .map_err(to_http_error) +} + +type HttpResult = std::result::Result, (StatusCode, String)>; + +fn to_http_error(error: Error) -> (StatusCode, String) { + (StatusCode::INTERNAL_SERVER_ERROR, error.to_string()) +} + +async fn record_offsets(request: RecordOffsetsRequest) -> Result { + let client = ClientBuilder::new(request.broker_endpoints).build().await?; + let mut offsets = Vec::with_capacity(request.num_topics); + + for idx in 0..request.num_topics { + let topic = format!("{}_{}", request.topic_prefix, idx); + let partition_client = client + .partition_client(&topic, request.partition, UnknownTopicHandling::Retry) + .await?; + let offset = partition_client.get_offset(OffsetAt::Latest).await?; + info!( + "Recorded Kafka WAL offset, topic: {}, partition: {}, offset: {}", + topic, request.partition, offset + ); + offsets.push(TopicOffset { + topic, + partition: request.partition, + offset, + }); + } + + Ok(RecordOffsetsResponse { offsets }) +} + +async fn delete_records(request: DeleteRecordsRequest) -> Result { + let client = ClientBuilder::new(request.broker_endpoints).build().await?; + let timeout_ms = request + .timeout_ms + .unwrap_or(DEFAULT_DELETE_RECORDS_TIMEOUT_MS); + + for offset in &request.offsets { + let partition_client = client + .partition_client(&offset.topic, offset.partition, UnknownTopicHandling::Retry) + .await?; + info!( + "Deleting Kafka WAL records, topic: {}, partition: {}, offset: {}", + offset.topic, offset.partition, offset.offset + ); + partition_client + .delete_records(offset.offset, timeout_ms) + .await?; + info!( + "Deleted Kafka WAL records, topic: {}, partition: {}, offset: {}", + offset.topic, offset.partition, offset.offset + ); + } + + Ok(DeleteRecordsResponse { + deleted: request.offsets, + }) +}