fix: check env before running kafka test (#3110)

* fix: check env before running kafka test

* Apply suggestions from code review

Co-authored-by: niebayes <niebayes@gmail.com>

---------

Co-authored-by: niebayes <niebayes@gmail.com>
This commit is contained in:
Weny Xu
2024-01-08 15:30:43 +09:00
committed by GitHub
parent f78c467a86
commit 58ada1dfef
5 changed files with 60 additions and 17 deletions

1
Cargo.lock generated
View File

@@ -4522,6 +4522,7 @@ dependencies = [
"store-api",
"tokio",
"tokio-util",
"uuid",
]
[[package]]

View File

@@ -40,3 +40,4 @@ tokio.workspace = true
common-meta = { workspace = true, features = ["testing"] }
common-test-util.workspace = true
rand.workspace = true
uuid.workspace = true

View File

@@ -14,3 +14,5 @@
pub mod offset;
pub mod record;
#[cfg(test)]
mod test_util;

View File

@@ -295,10 +295,11 @@ mod tests {
use common_base::readable_size::ReadableSize;
use common_config::wal::KafkaConfig;
use rand::Rng;
use uuid::Uuid;
use super::*;
use crate::kafka::client_manager::ClientManager;
use crate::kafka::util::test_util::run_test_with_kafka_wal;
// Implements some utility methods for testing.
impl Default for Record {
@@ -544,21 +545,24 @@ mod tests {
#[tokio::test]
async fn test_produce_large_entry() {
let topic = format!("greptimedb_wal_topic_{}", rand::thread_rng().gen::<usize>());
let ns = NamespaceImpl {
region_id: 1,
topic,
};
let entry = new_test_entry([b'1'; 2000000], 0, ns.clone());
let producer = RecordProducer::new(ns.clone()).with_entries(vec![entry]);
// TODO(niebayes): get broker endpoints from env vars.
let config = KafkaConfig {
broker_endpoints: vec!["localhost:9092".to_string()],
max_batch_size: ReadableSize::mb(1),
..Default::default()
};
let manager = Arc::new(ClientManager::try_new(&config).await.unwrap());
producer.produce(&manager).await.unwrap();
run_test_with_kafka_wal(|broker_endpoints| {
Box::pin(async {
let topic = format!("greptimedb_wal_topic_{}", Uuid::new_v4());
let ns = NamespaceImpl {
region_id: 1,
topic,
};
let entry = new_test_entry([b'1'; 2000000], 0, ns.clone());
let producer = RecordProducer::new(ns.clone()).with_entries(vec![entry]);
let config = KafkaConfig {
broker_endpoints,
max_batch_size: ReadableSize::mb(1),
..Default::default()
};
let manager = Arc::new(ClientManager::try_new(&config).await.unwrap());
producer.produce(&manager).await.unwrap();
})
})
.await
}
}

View File

@@ -0,0 +1,35 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::env;
use common_telemetry::warn;
use futures_util::future::BoxFuture;
/// Runs the given asynchronous test only when the `GT_KAFKA_ENDPOINTS`
/// environment variable is set; otherwise logs a warning and skips it.
///
/// The variable is expected to hold a comma-separated list of Kafka broker
/// endpoints, e.g. `localhost:9092,localhost:9093`. Entries are trimmed and
/// empty entries are discarded before being passed to `test`.
pub async fn run_test_with_kafka_wal<F>(test: F)
where
    F: FnOnce(Vec<String>) -> BoxFuture<'static, ()>,
{
    // Skip (rather than fail) when the env var is absent so the suite stays
    // green in environments without a running Kafka broker.
    let Ok(endpoints) = env::var("GT_KAFKA_ENDPOINTS") else {
        warn!("The GT_KAFKA_ENDPOINTS env var is not set, skipping the test");
        return;
    };

    // Trim whitespace and drop empty entries (an empty variable or trailing
    // commas) so the test is never handed a bogus broker address.
    let endpoints = endpoints
        .split(',')
        .map(|s| s.trim().to_string())
        .filter(|s| !s.is_empty())
        .collect::<Vec<_>>();
    if endpoints.is_empty() {
        warn!("The GT_KAFKA_ENDPOINTS env var is empty, skipping the test");
        return;
    }

    test(endpoints).await
}