diff --git a/Cargo.lock b/Cargo.lock index 47a6591850..5170d5d340 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4522,6 +4522,7 @@ dependencies = [ "store-api", "tokio", "tokio-util", + "uuid", ] [[package]] diff --git a/src/log-store/Cargo.toml b/src/log-store/Cargo.toml index dd85e2933c..f5923c7720 100644 --- a/src/log-store/Cargo.toml +++ b/src/log-store/Cargo.toml @@ -40,3 +40,4 @@ tokio.workspace = true common-meta = { workspace = true, features = ["testing"] } common-test-util.workspace = true rand.workspace = true +uuid.workspace = true diff --git a/src/log-store/src/kafka/util.rs b/src/log-store/src/kafka/util.rs index 61059b1645..86243e38d6 100644 --- a/src/log-store/src/kafka/util.rs +++ b/src/log-store/src/kafka/util.rs @@ -14,3 +14,5 @@ pub mod offset; pub mod record; +#[cfg(test)] +mod test_util; diff --git a/src/log-store/src/kafka/util/record.rs b/src/log-store/src/kafka/util/record.rs index 7d45165514..71a2cd1db6 100644 --- a/src/log-store/src/kafka/util/record.rs +++ b/src/log-store/src/kafka/util/record.rs @@ -295,10 +295,11 @@ mod tests { use common_base::readable_size::ReadableSize; use common_config::wal::KafkaConfig; - use rand::Rng; + use uuid::Uuid; use super::*; use crate::kafka::client_manager::ClientManager; + use crate::kafka::util::test_util::run_test_with_kafka_wal; // Implements some utility methods for testing. impl Default for Record { @@ -544,21 +545,24 @@ mod tests { #[tokio::test] async fn test_produce_large_entry() { - let topic = format!("greptimedb_wal_topic_{}", rand::thread_rng().gen::()); - let ns = NamespaceImpl { - region_id: 1, - topic, - }; - let entry = new_test_entry([b'1'; 2000000], 0, ns.clone()); - let producer = RecordProducer::new(ns.clone()).with_entries(vec![entry]); - - // TODO(niebayes): get broker endpoints from env vars. - let config = KafkaConfig { - broker_endpoints: vec!["localhost:9092".to_string()], - max_batch_size: ReadableSize::mb(1), - ..Default::default() - }; - let manager = Arc::new(ClientManager::try_new(&config).await.unwrap()); - producer.produce(&manager).await.unwrap(); + run_test_with_kafka_wal(|broker_endpoints| { + Box::pin(async { + let topic = format!("greptimedb_wal_topic_{}", Uuid::new_v4()); + let ns = NamespaceImpl { + region_id: 1, + topic, + }; + let entry = new_test_entry([b'1'; 2000000], 0, ns.clone()); + let producer = RecordProducer::new(ns.clone()).with_entries(vec![entry]); + let config = KafkaConfig { + broker_endpoints, + max_batch_size: ReadableSize::mb(1), + ..Default::default() + }; + let manager = Arc::new(ClientManager::try_new(&config).await.unwrap()); + producer.produce(&manager).await.unwrap(); + }) + }) + .await } } diff --git a/src/log-store/src/kafka/util/test_util.rs b/src/log-store/src/kafka/util/test_util.rs new file mode 100644 index 0000000000..b7dc3f7512 --- /dev/null +++ b/src/log-store/src/kafka/util/test_util.rs @@ -0,0 +1,35 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::env; + +use common_telemetry::warn; +use futures_util::future::BoxFuture; + +pub async fn run_test_with_kafka_wal(test: F) +where + F: FnOnce(Vec) -> BoxFuture<'static, ()>, +{ + let Ok(endpoints) = env::var("GT_KAFKA_ENDPOINTS") else { + warn!("The endpoints is empty, skipping the test"); + return; + }; + + let endpoints = endpoints + .split(',') + .map(|s| s.trim().to_string()) + .collect::>(); + + test(endpoints).await +}