test(remote_wal): add unit tests for kafka remote wal (#2993)

* test: add unit tests

* feat: introduce kafka runtime backed by testcontainers

* test: add test for kafka runtime

* fix: format

* chore: make kafka image ready to be used

* feat: add entry builder

* tmp

* test: add unit tests for client manager

* test: add some unit tests for kafka log store

* chore: resolve some todos

* chore: resolve some todos

* test: add unit tests for kafka log store

* chore: add deprecate develop branch warning

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* tmp: ready to move unit tests to an indie dir

* test: update unit tests for client manager

* test: add unit tests for meta srv remote wal

* fix: license

* fix: test

* refactor: kafka image

* doc: add doc example for kafka image

* chore: migrate kafka image to an indie PR

* fix: CR

* fix: CR

* fix: test

* fix: CR

* fix: update Cargo.toml

* fix: CR

* feat: skip test if no endpoints env

* fix: format

* test: rewrite parallel test with barrier

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
Author: niebayes
Date: 2024-01-08 18:48:11 +08:00
Committed by: GitHub
Parent: fcacb100a2
Commit: 8c58d3f85b
19 changed files with 605 additions and 59 deletions

Cargo.lock (generated)
View File

@@ -1833,6 +1833,7 @@ dependencies = [
"derive_builder 0.12.0",
"etcd-client",
"futures",
"futures-util",
"humantime-serde",
"hyper",
"lazy_static",
@@ -1851,6 +1852,7 @@ dependencies = [
"tokio",
"toml 0.8.8",
"tonic 0.10.2",
"uuid",
]
[[package]]
@@ -4507,7 +4509,6 @@ dependencies = [
"common-runtime",
"common-telemetry",
"common-test-util",
"dashmap",
"futures",
"futures-util",
"itertools 0.10.5",
@@ -4515,6 +4516,7 @@ dependencies = [
"protobuf-build",
"raft-engine",
"rand",
"rand_distr",
"rskafka",
"serde",
"serde_json",

View File

@@ -40,7 +40,6 @@ pub struct KafkaConfig {
pub broker_endpoints: Vec<String>,
/// The compression algorithm used to compress log entries.
#[serde(skip)]
#[serde(default)]
pub compression: RsKafkaCompression,
/// The max size of a single producer batch.
pub max_batch_size: ReadableSize,

View File

@@ -14,6 +14,7 @@ async-stream.workspace = true
async-trait.workspace = true
base64.workspace = true
bytes.workspace = true
chrono.workspace = true
common-catalog.workspace = true
common-config.workspace = true
common-error.workspace = true
@@ -27,6 +28,7 @@ common-time.workspace = true
datatypes.workspace = true
derive_builder.workspace = true
etcd-client.workspace = true
futures-util.workspace = true
futures.workspace = true
humantime-serde.workspace = true
lazy_static.workspace = true
@@ -51,3 +53,4 @@ chrono.workspace = true
common-procedure = { workspace = true, features = ["testing"] }
datatypes.workspace = true
hyper = { version = "0.14", features = ["full"] }
uuid.workspace = true

View File

@@ -36,7 +36,6 @@ pub mod sequence;
pub mod state_store;
pub mod table_name;
pub mod util;
#[allow(unused)]
pub mod wal;
pub type ClusterId = u64;

View File

@@ -19,12 +19,9 @@ use std::collections::HashMap;
use common_config::wal::StandaloneWalConfig;
use common_config::WAL_OPTIONS_KEY;
use common_telemetry::warn;
use serde::{Deserialize, Serialize};
use serde_with::with_prefix;
use store_api::storage::{RegionId, RegionNumber};
use crate::error::Result;
use crate::wal::kafka::KafkaConfig;
pub use crate::wal::kafka::Topic as KafkaWalTopic;
pub use crate::wal::options_allocator::{
@@ -43,7 +40,7 @@ pub enum WalConfig {
impl From<StandaloneWalConfig> for WalConfig {
fn from(value: StandaloneWalConfig) -> Self {
match value {
StandaloneWalConfig::RaftEngine(config) => WalConfig::RaftEngine,
StandaloneWalConfig::RaftEngine(_) => WalConfig::RaftEngine,
StandaloneWalConfig::Kafka(config) => WalConfig::Kafka(KafkaConfig {
broker_endpoints: config.base.broker_endpoints,
num_topics: config.num_topics,

View File

@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#[cfg(any(test, feature = "testing"))]
pub mod test_util;
pub mod topic;
pub mod topic_manager;
pub mod topic_selector;
@@ -19,7 +21,6 @@ pub mod topic_selector;
use std::time::Duration;
use common_config::wal::kafka::{kafka_backoff, KafkaBackoffConfig, TopicSelectorType};
use common_config::wal::StandaloneWalConfig;
use serde::{Deserialize, Serialize};
pub use crate::wal::kafka::topic::Topic;

View File

@@ -0,0 +1,33 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_telemetry::warn;
use futures_util::future::BoxFuture;
pub async fn run_test_with_kafka_wal<F>(test: F)
where
F: FnOnce(Vec<String>) -> BoxFuture<'static, ()>,
{
let Ok(endpoints) = std::env::var("GT_KAFKA_ENDPOINTS") else {
warn!("The endpoints is empty, skipping the test");
return;
};
let endpoints = endpoints
.split(',')
.map(|s| s.trim().to_string())
.collect::<Vec<_>>();
test(endpoints).await
}
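
For reference, a minimal sketch of how a unit test consumes this helper (the test name, assertion, and example endpoint below are illustrative, not part of this change): the closure receives the endpoints parsed from GT_KAFKA_ENDPOINTS and runs only when that variable is set, e.g. via `GT_KAFKA_ENDPOINTS=localhost:9092 cargo test`; otherwise the helper logs a warning and the test is skipped.

use common_meta::wal::kafka::test_util::run_test_with_kafka_wal;

// Illustrative only: the body just asserts that the parsed endpoints are non-empty.
#[tokio::test]
async fn test_requires_kafka_endpoints() {
    run_test_with_kafka_wal(|broker_endpoints| {
        Box::pin(async move {
            // `broker_endpoints` holds the trimmed, comma-separated endpoints
            // parsed from the environment variable.
            assert!(!broker_endpoints.is_empty());
        })
    })
    .await;
}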

View File

@@ -15,4 +15,5 @@
/// Kafka wal topic.
/// Publishers publish log entries to the topic while subscribers pull log entries from the topic.
/// A topic is simply a string right now. But it may be more complex in the future.
// TODO(niebayes): remove the Topic alias.
pub type Topic = String;

View File

@@ -14,10 +14,9 @@
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;
use common_config::wal::kafka::TopicSelectorType;
use common_telemetry::{debug, error, info};
use common_telemetry::{error, info};
use rskafka::client::controller::ControllerClient;
use rskafka::client::error::Error as RsKafkaError;
use rskafka::client::error::ProtocolError::TopicAlreadyExists;
@@ -25,7 +24,7 @@ use rskafka::client::partition::{Compression, UnknownTopicHandling};
use rskafka::client::{Client, ClientBuilder};
use rskafka::record::Record;
use rskafka::BackoffConfig;
use snafu::{ensure, AsErrorSource, ResultExt};
use snafu::{ensure, ResultExt};
use crate::error::{
BuildKafkaClientSnafu, BuildKafkaCtrlClientSnafu, BuildKafkaPartitionClientSnafu,
@@ -47,9 +46,8 @@ const DEFAULT_PARTITION: i32 = 0;
/// Manages topic initialization and selection.
pub struct TopicManager {
config: KafkaConfig,
// TODO(niebayes): maybe add a guard to ensure all topics in the topic pool are created.
topic_pool: Vec<Topic>,
topic_selector: TopicSelectorRef,
pub(crate) topic_pool: Vec<Topic>,
pub(crate) topic_selector: TopicSelectorRef,
kv_backend: KvBackendRef,
}
@@ -168,7 +166,7 @@ impl TopicManager {
vec![Record {
key: None,
value: None,
timestamp: rskafka::chrono::Utc::now(),
timestamp: chrono::Utc::now(),
headers: Default::default(),
}],
Compression::NoCompression,
@@ -240,13 +238,9 @@ impl TopicManager {
#[cfg(test)]
mod tests {
use std::env;
use common_telemetry::info;
use super::*;
use crate::kv_backend::memory::MemoryKvBackend;
use crate::kv_backend::{self};
use crate::wal::kafka::test_util::run_test_with_kafka_wal;
// Tests that topics can be successfully persisted into the kv backend and can be successfully restored from the kv backend.
#[tokio::test]
@@ -273,26 +267,60 @@ mod tests {
assert_eq!(topics, restored_topics);
}
/// Tests that the topic manager could allocate topics correctly.
#[tokio::test]
async fn test_topic_manager() {
let endpoints = env::var("GT_KAFKA_ENDPOINTS").unwrap_or_default();
common_telemetry::init_default_ut_logging();
async fn test_alloc_topics() {
run_test_with_kafka_wal(|broker_endpoints| {
Box::pin(async {
// Constructs topics that should be created.
let topics = (0..256)
.map(|i| format!("test_alloc_topics_{}_{}", i, uuid::Uuid::new_v4()))
.collect::<Vec<_>>();
if endpoints.is_empty() {
info!("The endpoints is empty, skipping the test.");
return;
}
// TODO: supports topic prefix
let kv_backend = Arc::new(MemoryKvBackend::new());
let config = KafkaConfig {
replication_factor: 1,
broker_endpoints: endpoints
.split(',')
.map(|s| s.to_string())
.collect::<Vec<_>>(),
..Default::default()
};
let manager = TopicManager::new(config, kv_backend);
manager.start().await.unwrap();
// Creates a topic manager.
let config = KafkaConfig {
replication_factor: broker_endpoints.len() as i16,
broker_endpoints,
..Default::default()
};
let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
let mut manager = TopicManager::new(config.clone(), kv_backend);
// Replaces the default topic pool with the constructed topics.
manager.topic_pool = topics.clone();
// Replaces the default selector with a round-robin selector without shuffling.
manager.topic_selector = Arc::new(RoundRobinTopicSelector::default());
manager.start().await.unwrap();
// Selects exactly the number of `num_topics` topics one by one.
let got = (0..topics.len())
.map(|_| manager.select().unwrap())
.cloned()
.collect::<Vec<_>>();
assert_eq!(got, topics);
// Selects exactly the number of `num_topics` topics in a batching manner.
let got = manager
.select_batch(topics.len())
.unwrap()
.into_iter()
.map(ToString::to_string)
.collect::<Vec<_>>();
assert_eq!(got, topics);
// Selects more than the number of `num_topics` topics.
let got = manager
.select_batch(2 * topics.len())
.unwrap()
.into_iter()
.map(ToString::to_string)
.collect::<Vec<_>>();
let expected = vec![topics.clone(); 2]
.into_iter()
.flatten()
.collect::<Vec<_>>();
assert_eq!(got, expected);
})
})
.await;
}
}

View File

@@ -16,7 +16,6 @@ use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use rand::Rng;
use serde::{Deserialize, Serialize};
use snafu::ensure;
use crate::error::{EmptyTopicPoolSnafu, Result};
@@ -60,6 +59,14 @@ impl TopicSelector for RoundRobinTopicSelector {
mod tests {
use super::*;
/// Tests that a selector behaves as expected when the given topic pool is empty.
#[test]
fn test_empty_topic_pool() {
let topic_pool = vec![];
let selector = RoundRobinTopicSelector::default();
assert!(selector.select(&topic_pool).is_err());
}
#[test]
fn test_round_robin_topic_selector() {
let topic_pool: Vec<_> = [0, 1, 2].into_iter().map(|v| v.to_string()).collect();

View File

@@ -107,14 +107,16 @@ pub fn allocate_region_wal_options(
mod tests {
use super::*;
use crate::kv_backend::memory::MemoryKvBackend;
use crate::wal::kafka::test_util::run_test_with_kafka_wal;
use crate::wal::kafka::topic_selector::RoundRobinTopicSelector;
use crate::wal::kafka::KafkaConfig;
// Tests the wal options allocator could successfully allocate raft-engine wal options.
// Note: tests for allocator with kafka are integration tests.
#[tokio::test]
async fn test_allocator_with_raft_engine() {
let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
let wal_config = WalConfig::RaftEngine;
let mut allocator = WalOptionsAllocator::new(wal_config, kv_backend);
let allocator = WalOptionsAllocator::new(wal_config, kv_backend);
allocator.start().await.unwrap();
let num_regions = 32;
@@ -128,4 +130,49 @@ mod tests {
.collect();
assert_eq!(got, expected);
}
// Tests that the wal options allocator could successfully allocate Kafka wal options.
#[tokio::test]
async fn test_allocator_with_kafka() {
run_test_with_kafka_wal(|broker_endpoints| {
Box::pin(async {
let topics = (0..256)
.map(|i| format!("test_allocator_with_kafka_{}_{}", i, uuid::Uuid::new_v4()))
.collect::<Vec<_>>();
// Creates a topic manager.
let config = KafkaConfig {
replication_factor: broker_endpoints.len() as i16,
broker_endpoints,
..Default::default()
};
let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
let mut topic_manager = KafkaTopicManager::new(config.clone(), kv_backend);
// Replaces the default topic pool with the constructed topics.
topic_manager.topic_pool = topics.clone();
// Replaces the default selector with a round-robin selector without shuffling.
topic_manager.topic_selector = Arc::new(RoundRobinTopicSelector::default());
// Creates an options allocator.
let allocator = WalOptionsAllocator::Kafka(topic_manager);
allocator.start().await.unwrap();
let num_regions = 32;
let regions = (0..num_regions).collect::<Vec<_>>();
let got = allocate_region_wal_options(regions.clone(), &allocator).unwrap();
// Check the allocated wal options contain the expected topics.
let expected = (0..num_regions)
.map(|i| {
let options = WalOptions::Kafka(KafkaWalOptions {
topic: topics[i as usize].clone(),
});
(i, serde_json::to_string(&options).unwrap())
})
.collect::<HashMap<_, _>>();
assert_eq!(got, expected);
})
})
.await;
}
}

View File

@@ -22,10 +22,8 @@ common-macro.workspace = true
common-meta.workspace = true
common-runtime.workspace = true
common-telemetry.workspace = true
dashmap.workspace = true
futures-util.workspace = true
futures.workspace = true
itertools.workspace = true
protobuf = { version = "2", features = ["bytes"] }
raft-engine.workspace = true
rskafka.workspace = true
@@ -39,5 +37,7 @@ tokio.workspace = true
[dev-dependencies]
common-meta = { workspace = true, features = ["testing"] }
common-test-util.workspace = true
itertools.workspace = true
rand.workspace = true
rand_distr = "0.4"
uuid.workspace = true

View File

@@ -40,7 +40,7 @@ impl Namespace for NamespaceImpl {
impl Display for NamespaceImpl {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}/{}", self.topic, self.region_id)
write!(f, "[topic: {}, region: {}]", self.topic, self.region_id)
}
}
@@ -48,11 +48,11 @@ impl Display for NamespaceImpl {
#[derive(Debug, PartialEq, Clone)]
pub struct EntryImpl {
/// Entry payload.
data: Vec<u8>,
pub data: Vec<u8>,
/// The logical entry id.
id: EntryId,
pub id: EntryId,
/// The namespace used to identify and isolate log entries from different regions.
ns: NamespaceImpl,
pub ns: NamespaceImpl,
}
impl Entry for EntryImpl {
@@ -76,7 +76,7 @@ impl Display for EntryImpl {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"Entry (ns: {}, id: {}, data_len: {})",
"Entry [ns: {}, id: {}, data_len: {}]",
self.ns,
self.id,
self.data.len()

View File

@@ -98,14 +98,13 @@ impl ClientManager {
/// Gets the client associated with the topic. If the client does not exist, a new one will
/// be created and returned.
pub(crate) async fn get_or_insert(&self, topic: &Topic) -> Result<Client> {
let client_pool = self.client_pool.read().await;
if let Some(client) = client_pool.get(topic) {
return Ok(client.clone());
{
let client_pool = self.client_pool.read().await;
if let Some(client) = client_pool.get(topic) {
return Ok(client.clone());
}
}
// Manullay releases the read lock.
drop(client_pool);
// Acquires the write lock.
let mut client_pool = self.client_pool.write().await;
match client_pool.get(topic) {
Some(client) => Ok(client.clone()),
@@ -134,3 +133,95 @@ impl ClientManager {
Ok(Client::new(raw_client, &self.config))
}
}
#[cfg(test)]
mod tests {
use common_meta::wal::kafka::test_util::run_test_with_kafka_wal;
use tokio::sync::Barrier;
use super::*;
use crate::test_util::kafka::create_topics;
/// Prepares for a test by creating a collection of topics and a client manager.
async fn prepare(
test_name: &str,
num_topics: usize,
broker_endpoints: Vec<String>,
) -> (ClientManager, Vec<Topic>) {
let topics = create_topics(
num_topics,
|i| format!("{test_name}_{}_{}", i, uuid::Uuid::new_v4()),
&broker_endpoints,
)
.await;
let config = KafkaConfig {
broker_endpoints,
..Default::default()
};
let manager = ClientManager::try_new(&config).await.unwrap();
(manager, topics)
}
/// Sends `get_or_insert` requests sequentially to the client manager, and checks if it could handle them correctly.
#[tokio::test]
async fn test_sequential() {
run_test_with_kafka_wal(|broker_endpoints| {
Box::pin(async {
let (manager, topics) = prepare("test_sequential", 128, broker_endpoints).await;
// Assigns multiple regions to a topic.
let region_topic = (0..512)
.map(|region_id| (region_id, &topics[region_id % topics.len()]))
.collect::<HashMap<_, _>>();
// Gets all clients sequentially.
for (_, topic) in region_topic {
manager.get_or_insert(topic).await.unwrap();
}
// Ensures all clients exist.
let client_pool = manager.client_pool.read().await;
let all_exist = topics.iter().all(|topic| client_pool.contains_key(topic));
assert!(all_exist);
})
})
.await;
}
/// Sends `get_or_insert` requests in parallel to the client manager, and checks if it could handle them correctly.
#[tokio::test(flavor = "multi_thread")]
async fn test_parallel() {
run_test_with_kafka_wal(|broker_endpoints| {
Box::pin(async {
let (manager, topics) = prepare("test_parallel", 128, broker_endpoints).await;
// Assigns multiple regions to a topic.
let region_topic = (0..512)
.map(|region_id| (region_id, topics[region_id % topics.len()].clone()))
.collect::<HashMap<_, _>>();
// Gets all clients in parallel.
let manager = Arc::new(manager);
let barrier = Arc::new(Barrier::new(region_topic.len()));
let tasks = region_topic
.into_values()
.map(|topic| {
let manager = manager.clone();
let barrier = barrier.clone();
tokio::spawn(async move {
barrier.wait().await;
assert!(manager.get_or_insert(&topic).await.is_ok());
})
})
.collect::<Vec<_>>();
futures::future::try_join_all(tasks).await.unwrap();
// Ensures all clients exist.
let client_pool = manager.client_pool.read().await;
let all_exist = topics.iter().all(|topic| client_pool.contains_key(topic));
assert!(all_exist);
})
})
.await;
}
}

View File

@@ -140,8 +140,13 @@ impl LogStore for KafkaLogStore {
.await
.context(GetOffsetSnafu { ns: ns.clone() })?
- 1;
// Reads entries with offsets in the range [start_offset, end_offset).
let start_offset: i64 = Offset::try_from(entry_id)?.0;
// Reads entries with offsets in the range [start_offset, end_offset].
let start_offset = Offset::try_from(entry_id)?.0;
debug!(
"Start reading entries in range [{}, {}] for ns {}",
start_offset, end_offset, ns
);
// Abort if there're no new entries.
// FIXME(niebayes): how come this case happens?
@@ -274,3 +279,205 @@ fn check_termination(
Ok(false)
}
}
#[cfg(test)]
mod tests {
use common_base::readable_size::ReadableSize;
use common_config::wal::KafkaWalTopic as Topic;
use rand::seq::IteratorRandom;
use super::*;
use crate::test_util::kafka::{
create_topics, entries_with_random_data, new_namespace, EntryBuilder,
};
// Stores test context for a region.
struct RegionContext {
ns: NamespaceImpl,
entry_builder: EntryBuilder,
expected: Vec<EntryImpl>,
flushed_entry_id: EntryId,
}
/// Prepares for a test by constructing a log store and creating a collection of topics.
async fn prepare(
test_name: &str,
num_topics: usize,
broker_endpoints: Vec<String>,
) -> (KafkaLogStore, Vec<Topic>) {
let topics = create_topics(
num_topics,
|i| format!("{test_name}_{}_{}", i, uuid::Uuid::new_v4()),
&broker_endpoints,
)
.await;
let config = KafkaConfig {
broker_endpoints,
max_batch_size: ReadableSize::kb(32),
..Default::default()
};
let logstore = KafkaLogStore::try_new(&config).await.unwrap();
// Appends a no-op record to each topic.
for topic in topics.iter() {
let last_entry_id = logstore
.append(EntryImpl {
data: vec![],
id: 0,
ns: new_namespace(topic, 0),
})
.await
.unwrap()
.last_entry_id;
assert_eq!(last_entry_id, 0);
}
(logstore, topics)
}
/// Creates a vector containing the indexes of all regions if `all` is true.
/// Otherwise, creates a subset of the indexes. The cardinality of the subset
/// is nearly a quarter of that of the universe set.
fn all_or_subset(all: bool, num_regions: usize) -> Vec<u64> {
assert!(num_regions > 0);
let amount = if all {
num_regions
} else {
(num_regions / 4).max(1)
};
(0..num_regions as u64).choose_multiple(&mut rand::thread_rng(), amount)
}
/// Builds entries for regions specified by `which`. Builds large entries if `large` is true.
/// Returns the aggregated entries.
fn build_entries(
region_contexts: &mut HashMap<u64, RegionContext>,
which: &[u64],
large: bool,
) -> Vec<EntryImpl> {
let mut aggregated = Vec::with_capacity(which.len());
for region_id in which {
let ctx = region_contexts.get_mut(region_id).unwrap();
// Builds entries for the region.
ctx.expected = if !large {
entries_with_random_data(3, &ctx.entry_builder)
} else {
// Builds a large entry of size 256KB, which is much larger than the configured `max_batch_size` of 32KB.
let large_entry = ctx.entry_builder.with_data([b'1'; 256 * 1024]);
vec![large_entry]
};
// Aggregates entries of all regions.
aggregated.push(ctx.expected.clone());
}
aggregated.into_iter().flatten().collect()
}
/// Starts a test with:
/// * `test_name` - The name of the test.
/// * `num_topics` - Number of topics to be created in the preparation phase.
/// * `num_regions` - Number of regions involved in the test.
/// * `num_appends` - Number of append operations to be performed.
/// * `all` - All regions will be involved in an append operation if `all` is true. Otherwise,
/// an append operation will only randomly choose a subset of regions.
/// * `large` - Builds large entries for each region if `large` is true.
async fn test_with(
test_name: &str,
num_topics: usize,
num_regions: usize,
num_appends: usize,
all: bool,
large: bool,
) {
let Ok(broker_endpoints) = std::env::var("GT_KAFKA_ENDPOINTS") else {
warn!("The endpoints is empty, skipping the test {test_name}");
return;
};
let broker_endpoints = broker_endpoints
.split(',')
.map(|s| s.trim().to_string())
.collect::<Vec<_>>();
let (logstore, topics) = prepare(test_name, num_topics, broker_endpoints).await;
let mut region_contexts = (0..num_regions)
.map(|i| {
let topic = &topics[i % topics.len()];
let ns = new_namespace(topic, i as u64);
let entry_builder = EntryBuilder::new(ns.clone());
(
i as u64,
RegionContext {
ns,
entry_builder,
expected: Vec::new(),
flushed_entry_id: 0,
},
)
})
.collect();
for _ in 0..num_appends {
// Appends entries for a subset of regions.
let which = all_or_subset(all, num_regions);
let entries = build_entries(&mut region_contexts, &which, large);
let last_entry_ids = logstore.append_batch(entries).await.unwrap().last_entry_ids;
// Reads entries for regions and checks, for each region, that the entries read back are identical to the expected ones.
for region_id in which {
let ctx = &region_contexts[&region_id];
let stream = logstore
.read(&ctx.ns, ctx.flushed_entry_id + 1)
.await
.unwrap();
let got = stream
.collect::<Vec<_>>()
.await
.into_iter()
.flat_map(|x| x.unwrap())
.collect::<Vec<_>>();
assert_eq!(ctx.expected, got);
}
// Simulates a flush for regions.
for (region_id, last_entry_id) in last_entry_ids {
let ctx = region_contexts.get_mut(&region_id).unwrap();
ctx.flushed_entry_id = last_entry_id;
}
}
}
/// Appends entries for one region and checks all entries can be read successfully.
#[tokio::test]
async fn test_one_region() {
test_with("test_one_region", 1, 1, 1, true, false).await;
}
/// Appends entries for multiple regions and checks entries for each region can be read successfully.
/// A topic is assigned only a single region.
#[tokio::test]
async fn test_multi_regions_disjoint() {
test_with("test_multi_regions_disjoint", 5, 5, 1, true, false).await;
}
/// Appends entries for multiple regions and checks entries for each region can be read successfully.
/// A topic is assigned multiple regions.
#[tokio::test]
async fn test_multi_regions_overlapped() {
test_with("test_multi_regions_overlapped", 5, 20, 1, true, false).await;
}
/// Appends entries for multiple regions and checks entries for each region can be read successfully.
/// A topic may be assigned multiple regions. The append operation repeats for several iterations.
/// Each append operation will only append entries for a subset of randomly chosen regions.
#[tokio::test]
async fn test_multi_appends() {
test_with("test_multi_appends", 5, 20, 3, false, false).await;
}
/// Appends large entries for multiple regions and checks entries for each region can be read successfully.
/// A topic may be assigned multiple regions.
#[tokio::test]
async fn test_append_large_entries() {
test_with("test_append_large_entries", 5, 20, 3, true, true).await;
}
}

View File

@@ -149,6 +149,7 @@ impl RecordProducer {
}
/// Produces the buffered entries to the Kafka server. Those entries may span several Kafka records.
/// Returns the offset of the last successfully produced record.
pub(crate) async fn produce(self, client_manager: &ClientManagerRef) -> Result<Offset> {
ensure!(!self.entries.is_empty(), EmptyEntriesSnafu);

View File

@@ -12,4 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#[cfg(test)]
pub mod kafka;
pub mod log_store_util;

View File

@@ -0,0 +1,126 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::atomic::{AtomicU64 as AtomicEntryId, Ordering};
use std::sync::Mutex;
use common_meta::wal::KafkaWalTopic as Topic;
use rand::distributions::Alphanumeric;
use rand::rngs::ThreadRng;
use rand::{thread_rng, Rng};
use rskafka::client::ClientBuilder;
use store_api::logstore::EntryId;
use crate::kafka::{EntryImpl, NamespaceImpl};
/// Creates `num_topics` topics, each of which is named by the given decorator.
pub async fn create_topics<F>(
num_topics: usize,
decorator: F,
broker_endpoints: &[String],
) -> Vec<Topic>
where
F: Fn(usize) -> String,
{
assert!(!broker_endpoints.is_empty());
let client = ClientBuilder::new(broker_endpoints.to_vec())
.build()
.await
.unwrap();
let ctrl_client = client.controller_client().unwrap();
let (topics, tasks): (Vec<_>, Vec<_>) = (0..num_topics)
.map(|i| {
let topic = decorator(i);
let task = ctrl_client.create_topic(topic.clone(), 1, 1, 500);
(topic, task)
})
.unzip();
futures::future::try_join_all(tasks).await.unwrap();
topics
}
/// Creates a new Kafka namespace with the given topic and region id.
pub fn new_namespace(topic: &str, region_id: u64) -> NamespaceImpl {
NamespaceImpl {
topic: topic.to_string(),
region_id,
}
}
/// A builder for building entries for a namespace.
pub struct EntryBuilder {
/// The namespace of the entries.
ns: NamespaceImpl,
/// The next entry id to allocate. It starts from 0 by default.
next_entry_id: AtomicEntryId,
/// A generator for supporting random data generation.
/// Wrapped with Mutex<Option<_>> to provide interior mutability.
rng: Mutex<Option<ThreadRng>>,
}
impl EntryBuilder {
/// Creates an EntryBuilder for the given namespace.
pub fn new(ns: NamespaceImpl) -> Self {
Self {
ns,
next_entry_id: AtomicEntryId::new(0),
rng: Mutex::new(Some(thread_rng())),
}
}
/// Sets the next entry id to the given entry id.
pub fn next_entry_id(self, entry_id: EntryId) -> Self {
Self {
next_entry_id: AtomicEntryId::new(entry_id),
..self
}
}
/// Skips the next `step` entry ids and returns the next entry id after the stepping.
pub fn skip(&mut self, step: EntryId) -> EntryId {
let old = self.next_entry_id.fetch_add(step, Ordering::Relaxed);
old + step
}
/// Builds an entry with the given data.
pub fn with_data<D: AsRef<[u8]>>(&self, data: D) -> EntryImpl {
EntryImpl {
data: data.as_ref().to_vec(),
id: self.alloc_entry_id(),
ns: self.ns.clone(),
}
}
/// Builds an entry with random data.
pub fn with_random_data(&self) -> EntryImpl {
self.with_data(self.make_random_data())
}
fn alloc_entry_id(&self) -> EntryId {
self.next_entry_id.fetch_add(1, Ordering::Relaxed)
}
fn make_random_data(&self) -> Vec<u8> {
let mut guard = self.rng.lock().unwrap();
let rng = guard.as_mut().unwrap();
(0..42).map(|_| rng.sample(Alphanumeric)).collect()
}
}
/// Builds a batch of entries each with random data.
pub fn entries_with_random_data(batch_size: usize, builder: &EntryBuilder) -> Vec<EntryImpl> {
(0..batch_size)
.map(|_| builder.with_random_data())
.collect()
}
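
To show how these helpers fit together, here is an illustrative sketch mirroring their use in the Kafka log store tests in this change; the topic name, region id, and assertions are placeholders, not part of the commit.

// Illustrative sketch only: drives EntryBuilder the way the log store tests do.
#[test]
fn entry_builder_sketch() {
    // The topic name and region id are arbitrary placeholders.
    let ns = new_namespace("example_topic", 42);
    let builder = EntryBuilder::new(ns);
    // Builds three entries, each carrying a 42-byte random alphanumeric payload.
    let entries = entries_with_random_data(3, &builder);
    assert_eq!(entries.len(), 3);
    // Entry ids are allocated monotonically, starting from 0 by default.
    assert_eq!(entries.last().unwrap().id, 2);
}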

View File

@@ -19,9 +19,11 @@ use std::collections::HashMap;
use common_config::wal::WalOptions;
use common_error::ext::ErrorExt;
use crate::logstore::entry::{Entry, Id as EntryId};
use crate::logstore::entry::Entry;
pub use crate::logstore::entry::Id as EntryId;
use crate::logstore::entry_stream::SendableEntryStream;
use crate::logstore::namespace::{Id as NamespaceId, Namespace};
pub use crate::logstore::namespace::Id as NamespaceId;
use crate::logstore::namespace::Namespace;
pub mod entry;
pub mod entry_stream;