mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-03 20:02:54 +00:00
test: add more integration test for kafka wal (#3190)
* test: add integration tests for kafka wal * chore: rebase main * chore: unify naming convention for wal config * chore: add register loaders switch * chore: alter tables by adding a new column * chore: move rand to dev-dependencies * chore: update Cargo.lock
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -10114,6 +10114,7 @@ dependencies = [
|
||||
"paste",
|
||||
"prost 0.12.3",
|
||||
"query",
|
||||
"rand",
|
||||
"rstest",
|
||||
"rstest_reuse",
|
||||
"script",
|
||||
|
||||
@@ -480,6 +480,7 @@ impl StartCommand {
|
||||
table_metadata_manager,
|
||||
table_meta_allocator,
|
||||
Arc::new(MemoryRegionKeeper::default()),
|
||||
true,
|
||||
)
|
||||
.context(InitDdlManagerSnafu)?,
|
||||
);
|
||||
|
||||
@@ -79,6 +79,7 @@ impl DdlManager {
|
||||
table_metadata_manager: TableMetadataManagerRef,
|
||||
table_metadata_allocator: TableMetadataAllocatorRef,
|
||||
memory_region_keeper: MemoryRegionKeeperRef,
|
||||
register_loaders: bool,
|
||||
) -> Result<Self> {
|
||||
let manager = Self {
|
||||
procedure_manager,
|
||||
@@ -88,7 +89,9 @@ impl DdlManager {
|
||||
table_metadata_allocator,
|
||||
memory_region_keeper,
|
||||
};
|
||||
manager.register_loaders()?;
|
||||
if register_loaders {
|
||||
manager.register_loaders()?;
|
||||
}
|
||||
Ok(manager)
|
||||
}
|
||||
|
||||
@@ -767,6 +770,7 @@ mod tests {
|
||||
Arc::new(WalOptionsAllocator::default()),
|
||||
)),
|
||||
Arc::new(MemoryRegionKeeper::default()),
|
||||
true,
|
||||
);
|
||||
|
||||
let expected_loaders = vec![
|
||||
|
||||
@@ -415,6 +415,7 @@ fn build_ddl_manager(
|
||||
table_metadata_manager.clone(),
|
||||
table_metadata_allocator.clone(),
|
||||
memory_region_keeper.clone(),
|
||||
true,
|
||||
)
|
||||
.context(error::InitDdlManagerSnafu)?,
|
||||
))
|
||||
|
||||
@@ -77,6 +77,7 @@ opentelemetry-proto.workspace = true
|
||||
partition.workspace = true
|
||||
paste.workspace = true
|
||||
prost.workspace = true
|
||||
rand.workspace = true
|
||||
script.workspace = true
|
||||
session = { workspace = true, features = ["testing"] }
|
||||
store-api.workspace = true
|
||||
|
||||
@@ -77,8 +77,8 @@ pub struct GreptimeDbClusterBuilder {
|
||||
store_config: Option<ObjectStoreConfig>,
|
||||
store_providers: Option<Vec<StorageType>>,
|
||||
datanodes: Option<u32>,
|
||||
wal_config: DatanodeWalConfig,
|
||||
meta_wal_config: MetaSrvWalConfig,
|
||||
datanode_wal_config: DatanodeWalConfig,
|
||||
metasrv_wal_config: MetaSrvWalConfig,
|
||||
shared_home_dir: Option<Arc<TempDir>>,
|
||||
meta_selector: Option<SelectorRef>,
|
||||
}
|
||||
@@ -108,8 +108,8 @@ impl GreptimeDbClusterBuilder {
|
||||
store_config: None,
|
||||
store_providers: None,
|
||||
datanodes: None,
|
||||
wal_config: DatanodeWalConfig::default(),
|
||||
meta_wal_config: MetaSrvWalConfig::default(),
|
||||
datanode_wal_config: DatanodeWalConfig::default(),
|
||||
metasrv_wal_config: MetaSrvWalConfig::default(),
|
||||
shared_home_dir: None,
|
||||
meta_selector: None,
|
||||
}
|
||||
@@ -134,14 +134,14 @@ impl GreptimeDbClusterBuilder {
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn with_wal_config(mut self, wal_config: DatanodeWalConfig) -> Self {
|
||||
self.wal_config = wal_config;
|
||||
pub fn with_datanode_wal_config(mut self, datanode_wal_config: DatanodeWalConfig) -> Self {
|
||||
self.datanode_wal_config = datanode_wal_config;
|
||||
self
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn with_meta_wal_config(mut self, wal_meta: MetaSrvWalConfig) -> Self {
|
||||
self.meta_wal_config = wal_meta;
|
||||
pub fn with_metasrv_wal_config(mut self, metasrv_wal_config: MetaSrvWalConfig) -> Self {
|
||||
self.metasrv_wal_config = metasrv_wal_config;
|
||||
self
|
||||
}
|
||||
|
||||
@@ -174,7 +174,7 @@ impl GreptimeDbClusterBuilder {
|
||||
max_retry_times: 5,
|
||||
retry_delay: Duration::from_secs(1),
|
||||
},
|
||||
wal: self.meta_wal_config.clone(),
|
||||
wal: self.metasrv_wal_config.clone(),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
@@ -249,7 +249,7 @@ impl GreptimeDbClusterBuilder {
|
||||
store_config.clone(),
|
||||
vec![],
|
||||
home_dir,
|
||||
self.wal_config.clone(),
|
||||
self.datanode_wal_config.clone(),
|
||||
)
|
||||
} else {
|
||||
let (opts, guard) = create_tmp_dir_and_datanode_opts(
|
||||
@@ -257,7 +257,7 @@ impl GreptimeDbClusterBuilder {
|
||||
StorageType::File,
|
||||
self.store_providers.clone().unwrap_or_default(),
|
||||
&format!("{}-dn-{}", self.cluster_name, datanode_id),
|
||||
self.wal_config.clone(),
|
||||
self.datanode_wal_config.clone(),
|
||||
);
|
||||
|
||||
storage_guards.push(guard.storage_guards);
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
// limitations under the License.
|
||||
|
||||
#![feature(assert_matches)]
|
||||
|
||||
pub mod cluster;
|
||||
mod grpc;
|
||||
mod influxdb;
|
||||
|
||||
@@ -50,8 +50,8 @@ pub struct GreptimeDbStandalone {
|
||||
|
||||
pub struct GreptimeDbStandaloneBuilder {
|
||||
instance_name: String,
|
||||
wal_config: DatanodeWalConfig,
|
||||
meta_wal_config: MetaSrvWalConfig,
|
||||
datanode_wal_config: DatanodeWalConfig,
|
||||
metasrv_wal_config: MetaSrvWalConfig,
|
||||
store_providers: Option<Vec<StorageType>>,
|
||||
default_store: Option<StorageType>,
|
||||
plugin: Option<Plugins>,
|
||||
@@ -64,8 +64,8 @@ impl GreptimeDbStandaloneBuilder {
|
||||
store_providers: None,
|
||||
plugin: None,
|
||||
default_store: None,
|
||||
wal_config: DatanodeWalConfig::default(),
|
||||
meta_wal_config: MetaSrvWalConfig::default(),
|
||||
datanode_wal_config: DatanodeWalConfig::default(),
|
||||
metasrv_wal_config: MetaSrvWalConfig::default(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -96,23 +96,24 @@ impl GreptimeDbStandaloneBuilder {
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn with_wal_config(mut self, wal_config: DatanodeWalConfig) -> Self {
|
||||
self.wal_config = wal_config;
|
||||
pub fn with_datanode_wal_config(mut self, datanode_wal_config: DatanodeWalConfig) -> Self {
|
||||
self.datanode_wal_config = datanode_wal_config;
|
||||
self
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn with_meta_wal_config(mut self, wal_meta: MetaSrvWalConfig) -> Self {
|
||||
self.meta_wal_config = wal_meta;
|
||||
pub fn with_metasrv_wal_config(mut self, metasrv_wal_config: MetaSrvWalConfig) -> Self {
|
||||
self.metasrv_wal_config = metasrv_wal_config;
|
||||
self
|
||||
}
|
||||
|
||||
pub async fn build_with(
|
||||
&self,
|
||||
kv_backend: KvBackendRef,
|
||||
procedure_manager: ProcedureManagerRef,
|
||||
guard: TestGuard,
|
||||
mix_options: MixOptions,
|
||||
procedure_manager: ProcedureManagerRef,
|
||||
register_procedure_loaders: bool,
|
||||
) -> GreptimeDbStandalone {
|
||||
let plugins = self.plugin.clone().unwrap_or_default();
|
||||
|
||||
@@ -153,6 +154,7 @@ impl GreptimeDbStandaloneBuilder {
|
||||
table_metadata_manager,
|
||||
table_meta_allocator,
|
||||
Arc::new(MemoryRegionKeeper::default()),
|
||||
register_procedure_loaders,
|
||||
)
|
||||
.unwrap(),
|
||||
);
|
||||
@@ -194,7 +196,7 @@ impl GreptimeDbStandaloneBuilder {
|
||||
default_store_type,
|
||||
store_types,
|
||||
&self.instance_name,
|
||||
self.wal_config.clone(),
|
||||
self.datanode_wal_config.clone(),
|
||||
);
|
||||
|
||||
let kv_backend_config = KvBackendConfig::default();
|
||||
@@ -207,7 +209,7 @@ impl GreptimeDbStandaloneBuilder {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let wal_meta = self.meta_wal_config.clone();
|
||||
let wal_meta = self.metasrv_wal_config.clone();
|
||||
let mix_options = MixOptions {
|
||||
data_home: opts.storage.data_home.to_string(),
|
||||
procedure: procedure_config,
|
||||
@@ -218,7 +220,7 @@ impl GreptimeDbStandaloneBuilder {
|
||||
wal_meta,
|
||||
};
|
||||
|
||||
self.build_with(kv_backend, procedure_manager, guard, mix_options)
|
||||
self.build_with(kv_backend, guard, mix_options, procedure_manager, true)
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,31 +12,43 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::assert_matches::assert_matches;
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use std::sync::Arc;
|
||||
|
||||
use client::OutputData;
|
||||
use common_query::Output;
|
||||
use common_recordbatch::util;
|
||||
use client::DEFAULT_CATALOG_NAME;
|
||||
use common_query::{Output, OutputData};
|
||||
use datatypes::vectors::{TimestampMillisecondVector, VectorRef};
|
||||
use frontend::error::Result;
|
||||
use frontend::instance::Instance;
|
||||
use itertools::Itertools;
|
||||
use rand::rngs::ThreadRng;
|
||||
use rand::Rng;
|
||||
use rstest::rstest;
|
||||
use rstest_reuse::apply;
|
||||
use servers::query_handler::sql::SqlQueryHandler;
|
||||
use session::context::{QueryContext, QueryContextRef};
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
use crate::tests::test_util::*;
|
||||
|
||||
#[apply(both_instances_cases_with_kafka_wal)]
|
||||
async fn test_create_database_and_insert_query(instance: Option<Box<dyn RebuildableMockInstance>>) {
|
||||
let Some(instance) = instance else { return };
|
||||
|
||||
async fn test_create_database_and_insert_query(
|
||||
rebuildable_instance: Option<Box<dyn RebuildableMockInstance>>,
|
||||
) {
|
||||
let Some(instance) = rebuildable_instance else {
|
||||
return;
|
||||
};
|
||||
let instance = instance.frontend();
|
||||
|
||||
let output = execute_sql(&instance, "create database test").await;
|
||||
assert!(matches!(output.data, OutputData::AffectedRows(1)));
|
||||
let output = execute_sql_with(
|
||||
&instance,
|
||||
"create database test",
|
||||
QueryContext::with(DEFAULT_CATALOG_NAME, "test"),
|
||||
)
|
||||
.await;
|
||||
assert_matches!(output.data, OutputData::AffectedRows(1));
|
||||
|
||||
let output = execute_sql(
|
||||
let output = execute_sql_with(
|
||||
&instance,
|
||||
r#"create table greptime.test.demo(
|
||||
host STRING,
|
||||
@@ -44,25 +56,32 @@ async fn test_create_database_and_insert_query(instance: Option<Box<dyn Rebuilda
|
||||
memory DOUBLE,
|
||||
ts timestamp,
|
||||
TIME INDEX(ts)
|
||||
)"#,
|
||||
)"#,
|
||||
QueryContext::with(DEFAULT_CATALOG_NAME, "test"),
|
||||
)
|
||||
.await;
|
||||
assert!(matches!(output.data, OutputData::AffectedRows(0)));
|
||||
|
||||
let output = execute_sql(
|
||||
let output = execute_sql_with(
|
||||
&instance,
|
||||
r#"insert into test.demo(host, cpu, memory, ts) values
|
||||
('host1', 66.6, 1024, 1655276557000),
|
||||
('host2', 88.8, 333.3, 1655276558000)
|
||||
"#,
|
||||
('host2', 88.8, 333.3, 1655276558000)
|
||||
"#,
|
||||
QueryContext::with(DEFAULT_CATALOG_NAME, "test"),
|
||||
)
|
||||
.await;
|
||||
assert!(matches!(output.data, OutputData::AffectedRows(2)));
|
||||
|
||||
let query_output = execute_sql(&instance, "select ts from test.demo order by ts limit 1").await;
|
||||
let query_output = execute_sql_with(
|
||||
&instance,
|
||||
"select ts from test.demo order by ts limit 1",
|
||||
QueryContext::with(DEFAULT_CATALOG_NAME, "test"),
|
||||
)
|
||||
.await;
|
||||
match query_output.data {
|
||||
OutputData::Stream(s) => {
|
||||
let batches = util::collect(s).await.unwrap();
|
||||
let batches = common_recordbatch::util::collect(s).await.unwrap();
|
||||
assert_eq!(1, batches[0].num_columns());
|
||||
assert_eq!(
|
||||
Arc::new(TimestampMillisecondVector::from_vec(vec![
|
||||
@@ -75,24 +94,216 @@ async fn test_create_database_and_insert_query(instance: Option<Box<dyn Rebuilda
|
||||
}
|
||||
}
|
||||
|
||||
async fn execute_sql(instance: &Arc<Instance>, sql: &str) -> Output {
|
||||
execute_sql_with(instance, sql, QueryContext::arc()).await
|
||||
/// Maintains metadata of a table.
|
||||
struct Table {
|
||||
name: String,
|
||||
logical_timer: AtomicU64,
|
||||
inserted: Mutex<Vec<u64>>,
|
||||
}
|
||||
|
||||
async fn try_execute_sql_with(
|
||||
instance: &Arc<Instance>,
|
||||
sql: &str,
|
||||
query_ctx: QueryContextRef,
|
||||
) -> Result<Output> {
|
||||
instance.do_query(sql, query_ctx).await.remove(0)
|
||||
/// Inserts some data to a collection of tables and checks if these data exist after restart.
|
||||
#[apply(both_instances_cases_with_kafka_wal)]
|
||||
async fn test_replay(rebuildable_instance: Option<Box<dyn RebuildableMockInstance>>) {
|
||||
let Some(mut rebuildable_instance) = rebuildable_instance else {
|
||||
return;
|
||||
};
|
||||
let instance = rebuildable_instance.frontend();
|
||||
|
||||
let output = execute_sql_with(
|
||||
&instance,
|
||||
"create database test",
|
||||
QueryContext::with(DEFAULT_CATALOG_NAME, "test"),
|
||||
)
|
||||
.await;
|
||||
assert_matches!(output.data, OutputData::AffectedRows(1));
|
||||
|
||||
let tables = create_tables("test_replay", &instance, 10).await;
|
||||
insert_data(&tables, &instance, 15).await;
|
||||
ensure_data_exists(&tables, &instance).await;
|
||||
|
||||
// Rebuilds to emulate restart which triggers a replay.
|
||||
rebuildable_instance.rebuild().await;
|
||||
ensure_data_exists(&tables, &rebuildable_instance.frontend()).await;
|
||||
}
|
||||
|
||||
/// Inserts some data to a collection of tables and sends alter table requests to force flushing each table.
|
||||
/// Then checks if these data exist after restart.
|
||||
#[apply(both_instances_cases_with_kafka_wal)]
|
||||
async fn test_flush_then_replay(rebuildable_instance: Option<Box<dyn RebuildableMockInstance>>) {
|
||||
let Some(mut rebuildable_instance) = rebuildable_instance else {
|
||||
return;
|
||||
};
|
||||
let instance = rebuildable_instance.frontend();
|
||||
|
||||
let output = execute_sql_with(
|
||||
&instance,
|
||||
"create database test",
|
||||
QueryContext::with(DEFAULT_CATALOG_NAME, "test"),
|
||||
)
|
||||
.await;
|
||||
assert_matches!(output.data, OutputData::AffectedRows(1));
|
||||
|
||||
let tables = create_tables("test_flush_then_replay", &instance, 10).await;
|
||||
insert_data(&tables, &instance, 15).await;
|
||||
ensure_data_exists(&tables, &instance).await;
|
||||
|
||||
// Alters tables to force flushing.
|
||||
futures::future::join_all(tables.iter().map(|table| {
|
||||
let instance = instance.clone();
|
||||
async move {
|
||||
assert_matches!(
|
||||
do_alter(&instance, &table.name).await.data,
|
||||
OutputData::AffectedRows(0)
|
||||
);
|
||||
}
|
||||
}))
|
||||
.await;
|
||||
|
||||
// Inserts more data and check all data exists after flushing.
|
||||
insert_data(&tables, &instance, 15).await;
|
||||
ensure_data_exists(&tables, &instance).await;
|
||||
|
||||
// Rebuilds to emulate restart which triggers a replay.
|
||||
rebuildable_instance.rebuild().await;
|
||||
ensure_data_exists(&tables, &rebuildable_instance.frontend()).await;
|
||||
}
|
||||
|
||||
/// Creates a given number of tables.
|
||||
async fn create_tables(test_name: &str, instance: &Arc<Instance>, num_tables: usize) -> Vec<Table> {
|
||||
futures::future::join_all((0..num_tables).map(|i| {
|
||||
let instance = instance.clone();
|
||||
async move {
|
||||
let table_name = format!("{}_{}", test_name, i);
|
||||
assert_matches!(
|
||||
do_create(&instance, &table_name).await.data,
|
||||
OutputData::AffectedRows(0)
|
||||
);
|
||||
Table {
|
||||
name: table_name,
|
||||
logical_timer: AtomicU64::new(1685508715000),
|
||||
inserted: Mutex::new(Vec::new()),
|
||||
}
|
||||
}
|
||||
}))
|
||||
.await
|
||||
}
|
||||
|
||||
/// Inserts data to the tables in parallel.
|
||||
/// The reason why the insertion is parallel is that we want to ensure the kafka wal works as expected under parallel write workloads.
|
||||
async fn insert_data(tables: &[Table], instance: &Arc<Instance>, num_writers: usize) {
|
||||
// Each writer randomly chooses a table and inserts a sequence of rows into the table.
|
||||
futures::future::join_all((0..num_writers).map(|_| async {
|
||||
let mut rng = rand::thread_rng();
|
||||
let table = &tables[rng.gen_range(0..tables.len())];
|
||||
for _ in 0..10 {
|
||||
let ts = table.logical_timer.fetch_add(1000, Ordering::Relaxed);
|
||||
let row = make_row(ts, &mut rng);
|
||||
assert_matches!(
|
||||
do_insert(instance, &table.name, row).await.data,
|
||||
OutputData::AffectedRows(1)
|
||||
);
|
||||
{
|
||||
// Inserting into the `inserted` vector and inserting into the database are not atomic
|
||||
// which requires us to do a sorting upon checking data integrity.
|
||||
let mut inserted = table.inserted.lock().await;
|
||||
inserted.push(ts);
|
||||
}
|
||||
}
|
||||
}))
|
||||
.await;
|
||||
}
|
||||
|
||||
/// Sends queries to ensure the data exists for each table.
|
||||
async fn ensure_data_exists(tables: &[Table], instance: &Arc<Instance>) {
|
||||
futures::future::join_all(tables.iter().map(|table| async {
|
||||
let output = do_query(instance, &table.name).await;
|
||||
let OutputData::Stream(stream) = output.data else {
|
||||
unreachable!()
|
||||
};
|
||||
let record_batches = common_recordbatch::util::collect(stream).await.unwrap();
|
||||
let queried = record_batches
|
||||
.into_iter()
|
||||
.flat_map(|rb| {
|
||||
rb.rows()
|
||||
.map(|row| row[0].as_timestamp().unwrap().value() as u64)
|
||||
.collect::<Vec<_>>()
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
let inserted = table
|
||||
.inserted
|
||||
.lock()
|
||||
.await
|
||||
.iter()
|
||||
.sorted()
|
||||
.cloned()
|
||||
.collect::<Vec<_>>();
|
||||
assert_eq!(queried, inserted);
|
||||
}))
|
||||
.await;
|
||||
}
|
||||
|
||||
/// Sends a create table SQL.
|
||||
async fn do_create(instance: &Arc<Instance>, table_name: &str) -> Output {
|
||||
execute_sql_with(
|
||||
instance,
|
||||
&format!(
|
||||
r#"create table greptime.test.{} (
|
||||
host STRING,
|
||||
cpu DOUBLE,
|
||||
memory DOUBLE,
|
||||
ts timestamp,
|
||||
TIME INDEX(ts)
|
||||
)"#,
|
||||
table_name
|
||||
),
|
||||
QueryContext::with(DEFAULT_CATALOG_NAME, "test"),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Sends an alter table SQL.
|
||||
async fn do_alter(instance: &Arc<Instance>, table_name: &str) -> Output {
|
||||
execute_sql_with(
|
||||
instance,
|
||||
&format!("alter table {} add column new_col STRING", table_name),
|
||||
QueryContext::with(DEFAULT_CATALOG_NAME, "test"),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Sends a insert SQL.
|
||||
async fn do_insert(instance: &Arc<Instance>, table_name: &str, row: String) -> Output {
|
||||
execute_sql_with(
|
||||
instance,
|
||||
&format!("insert into test.{table_name}(host, cpu, memory, ts) values {row}"),
|
||||
QueryContext::with(DEFAULT_CATALOG_NAME, "test"),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Sends a query SQL.
|
||||
async fn do_query(instance: &Arc<Instance>, table_name: &str) -> Output {
|
||||
execute_sql_with(
|
||||
instance,
|
||||
&format!("select ts from test.{table_name} order by ts"),
|
||||
QueryContext::with(DEFAULT_CATALOG_NAME, "test"),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Sends a SQL with the given context which specifies the catalog name and schema name, aka. database name.
|
||||
/// The query context is required since the tables are created in the `test` schema rather than the default `public` schema.
|
||||
async fn execute_sql_with(
|
||||
instance: &Arc<Instance>,
|
||||
sql: &str,
|
||||
query_ctx: QueryContextRef,
|
||||
) -> Output {
|
||||
try_execute_sql_with(instance, sql, query_ctx)
|
||||
.await
|
||||
.unwrap()
|
||||
instance.do_query(sql, query_ctx).await.remove(0).unwrap()
|
||||
}
|
||||
|
||||
fn make_row(ts: u64, rng: &mut ThreadRng) -> String {
|
||||
let host = format!("host{}", rng.gen_range(0..5));
|
||||
let cpu: f64 = rng.gen_range(0.0..99.9);
|
||||
let memory: f64 = rng.gen_range(0.0..999.9);
|
||||
format!("('{host}', {cpu}, {memory}, {ts})")
|
||||
}
|
||||
|
||||
@@ -114,7 +114,7 @@ impl MockInstanceBuilder {
|
||||
} = instance;
|
||||
MockInstanceImpl::Standalone(
|
||||
builder
|
||||
.build_with(kv_backend, procedure_manager, guard, mix_options)
|
||||
.build_with(kv_backend, guard, mix_options, procedure_manager, false)
|
||||
.await,
|
||||
)
|
||||
}
|
||||
@@ -223,11 +223,11 @@ pub(crate) async fn standalone_with_kafka_wal() -> Option<Box<dyn RebuildableMoc
|
||||
.collect::<Vec<_>>();
|
||||
let test_name = uuid::Uuid::new_v4().to_string();
|
||||
let builder = GreptimeDbStandaloneBuilder::new(&test_name)
|
||||
.with_wal_config(DatanodeWalConfig::Kafka(DatanodeKafkaConfig {
|
||||
.with_datanode_wal_config(DatanodeWalConfig::Kafka(DatanodeKafkaConfig {
|
||||
broker_endpoints: endpoints.clone(),
|
||||
..Default::default()
|
||||
}))
|
||||
.with_meta_wal_config(MetaSrvWalConfig::Kafka(MetaSrvKafkaConfig {
|
||||
.with_metasrv_wal_config(MetaSrvWalConfig::Kafka(MetaSrvKafkaConfig {
|
||||
broker_endpoints: endpoints,
|
||||
topic_name_prefix: test_name.to_string(),
|
||||
num_topics: 3,
|
||||
@@ -253,11 +253,11 @@ pub(crate) async fn distributed_with_kafka_wal() -> Option<Box<dyn RebuildableMo
|
||||
let test_name = uuid::Uuid::new_v4().to_string();
|
||||
let builder = GreptimeDbClusterBuilder::new(&test_name)
|
||||
.await
|
||||
.with_wal_config(DatanodeWalConfig::Kafka(DatanodeKafkaConfig {
|
||||
.with_datanode_wal_config(DatanodeWalConfig::Kafka(DatanodeKafkaConfig {
|
||||
broker_endpoints: endpoints.clone(),
|
||||
..Default::default()
|
||||
}))
|
||||
.with_meta_wal_config(MetaSrvWalConfig::Kafka(MetaSrvKafkaConfig {
|
||||
.with_metasrv_wal_config(MetaSrvWalConfig::Kafka(MetaSrvKafkaConfig {
|
||||
broker_endpoints: endpoints,
|
||||
topic_name_prefix: test_name.to_string(),
|
||||
num_topics: 3,
|
||||
@@ -276,7 +276,7 @@ pub(crate) async fn distributed_with_kafka_wal() -> Option<Box<dyn RebuildableMo
|
||||
pub(crate) fn both_instances_cases_with_kafka_wal(
|
||||
#[future]
|
||||
#[case]
|
||||
instance: Option<Box<dyn RebuildableMockInstance>>,
|
||||
rebuildable_instance: Option<Box<dyn RebuildableMockInstance>>,
|
||||
) {
|
||||
}
|
||||
|
||||
|
||||
@@ -111,12 +111,12 @@ pub async fn test_region_migration(store_type: StorageType, endpoints: Vec<Strin
|
||||
let cluster = builder
|
||||
.with_datanodes(datanodes as u32)
|
||||
.with_store_config(store_config)
|
||||
.with_wal_config(DatanodeWalConfig::Kafka(DatanodeKafkaConfig {
|
||||
.with_datanode_wal_config(DatanodeWalConfig::Kafka(DatanodeKafkaConfig {
|
||||
broker_endpoints: endpoints.clone(),
|
||||
linger: Duration::from_millis(25),
|
||||
..Default::default()
|
||||
}))
|
||||
.with_meta_wal_config(MetaSrvWalConfig::Kafka(MetaSrvKafkaConfig {
|
||||
.with_metasrv_wal_config(MetaSrvWalConfig::Kafka(MetaSrvKafkaConfig {
|
||||
broker_endpoints: endpoints,
|
||||
num_topics: 3,
|
||||
topic_name_prefix: Uuid::new_v4().to_string(),
|
||||
@@ -238,12 +238,12 @@ pub async fn test_region_migration_by_sql(store_type: StorageType, endpoints: Ve
|
||||
let cluster = builder
|
||||
.with_datanodes(datanodes as u32)
|
||||
.with_store_config(store_config)
|
||||
.with_wal_config(DatanodeWalConfig::Kafka(DatanodeKafkaConfig {
|
||||
.with_datanode_wal_config(DatanodeWalConfig::Kafka(DatanodeKafkaConfig {
|
||||
broker_endpoints: endpoints.clone(),
|
||||
linger: Duration::from_millis(25),
|
||||
..Default::default()
|
||||
}))
|
||||
.with_meta_wal_config(MetaSrvWalConfig::Kafka(MetaSrvKafkaConfig {
|
||||
.with_metasrv_wal_config(MetaSrvWalConfig::Kafka(MetaSrvKafkaConfig {
|
||||
broker_endpoints: endpoints,
|
||||
num_topics: 3,
|
||||
topic_name_prefix: Uuid::new_v4().to_string(),
|
||||
@@ -360,12 +360,12 @@ pub async fn test_region_migration_multiple_regions(
|
||||
let cluster = builder
|
||||
.with_datanodes(datanodes as u32)
|
||||
.with_store_config(store_config)
|
||||
.with_wal_config(DatanodeWalConfig::Kafka(DatanodeKafkaConfig {
|
||||
.with_datanode_wal_config(DatanodeWalConfig::Kafka(DatanodeKafkaConfig {
|
||||
broker_endpoints: endpoints.clone(),
|
||||
linger: Duration::from_millis(25),
|
||||
..Default::default()
|
||||
}))
|
||||
.with_meta_wal_config(MetaSrvWalConfig::Kafka(MetaSrvKafkaConfig {
|
||||
.with_metasrv_wal_config(MetaSrvWalConfig::Kafka(MetaSrvKafkaConfig {
|
||||
broker_endpoints: endpoints,
|
||||
num_topics: 3,
|
||||
topic_name_prefix: Uuid::new_v4().to_string(),
|
||||
@@ -497,12 +497,12 @@ pub async fn test_region_migration_all_regions(store_type: StorageType, endpoint
|
||||
let cluster = builder
|
||||
.with_datanodes(datanodes as u32)
|
||||
.with_store_config(store_config)
|
||||
.with_wal_config(DatanodeWalConfig::Kafka(DatanodeKafkaConfig {
|
||||
.with_datanode_wal_config(DatanodeWalConfig::Kafka(DatanodeKafkaConfig {
|
||||
broker_endpoints: endpoints.clone(),
|
||||
linger: Duration::from_millis(25),
|
||||
..Default::default()
|
||||
}))
|
||||
.with_meta_wal_config(MetaSrvWalConfig::Kafka(MetaSrvKafkaConfig {
|
||||
.with_metasrv_wal_config(MetaSrvWalConfig::Kafka(MetaSrvKafkaConfig {
|
||||
broker_endpoints: endpoints,
|
||||
num_topics: 3,
|
||||
topic_name_prefix: Uuid::new_v4().to_string(),
|
||||
@@ -629,12 +629,12 @@ pub async fn test_region_migration_incorrect_from_peer(
|
||||
let cluster = builder
|
||||
.with_datanodes(datanodes as u32)
|
||||
.with_store_config(store_config)
|
||||
.with_wal_config(DatanodeWalConfig::Kafka(DatanodeKafkaConfig {
|
||||
.with_datanode_wal_config(DatanodeWalConfig::Kafka(DatanodeKafkaConfig {
|
||||
broker_endpoints: endpoints.clone(),
|
||||
linger: Duration::from_millis(25),
|
||||
..Default::default()
|
||||
}))
|
||||
.with_meta_wal_config(MetaSrvWalConfig::Kafka(MetaSrvKafkaConfig {
|
||||
.with_metasrv_wal_config(MetaSrvWalConfig::Kafka(MetaSrvKafkaConfig {
|
||||
broker_endpoints: endpoints,
|
||||
num_topics: 3,
|
||||
topic_name_prefix: Uuid::new_v4().to_string(),
|
||||
@@ -704,12 +704,12 @@ pub async fn test_region_migration_incorrect_region_id(
|
||||
let cluster = builder
|
||||
.with_datanodes(datanodes as u32)
|
||||
.with_store_config(store_config)
|
||||
.with_wal_config(DatanodeWalConfig::Kafka(DatanodeKafkaConfig {
|
||||
.with_datanode_wal_config(DatanodeWalConfig::Kafka(DatanodeKafkaConfig {
|
||||
broker_endpoints: endpoints.clone(),
|
||||
linger: Duration::from_millis(25),
|
||||
..Default::default()
|
||||
}))
|
||||
.with_meta_wal_config(MetaSrvWalConfig::Kafka(MetaSrvKafkaConfig {
|
||||
.with_metasrv_wal_config(MetaSrvWalConfig::Kafka(MetaSrvKafkaConfig {
|
||||
broker_endpoints: endpoints,
|
||||
num_topics: 3,
|
||||
topic_name_prefix: Uuid::new_v4().to_string(),
|
||||
|
||||
Reference in New Issue
Block a user