mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-15 20:40:39 +00:00
chore: fix compaction caused race condition (#1759)
* fix: set max_files_in_l0 in unit tests to avoid compaction * refactor: pass while EngineConfig * fix: comment out unstable sqlness test * revert commented sqlness
This commit is contained in:
@@ -306,7 +306,8 @@ async fn test_new_region() {
|
||||
let dir = create_temp_dir("test_new_region");
|
||||
let store_dir = dir.path().to_str().unwrap();
|
||||
|
||||
let store_config = config_util::new_store_config(region_name, store_dir).await;
|
||||
let store_config =
|
||||
config_util::new_store_config(region_name, store_dir, EngineConfig::default()).await;
|
||||
let placeholder_memtable = store_config
|
||||
.memtable_builder
|
||||
.build(metadata.schema().clone());
|
||||
|
||||
@@ -26,6 +26,7 @@ use store_api::storage::{
|
||||
SchemaRef, Snapshot, WriteRequest, WriteResponse,
|
||||
};
|
||||
|
||||
use crate::config::EngineConfig;
|
||||
use crate::region::tests::{self, FileTesterBase};
|
||||
use crate::region::{OpenOptions, RawRegionMetadata, RegionImpl, RegionMetadata};
|
||||
use crate::test_util;
|
||||
@@ -38,7 +39,8 @@ async fn create_region_for_alter(store_dir: &str) -> RegionImpl<RaftEngineLogSto
|
||||
// Always disable version column in this test.
|
||||
let metadata = tests::new_metadata(REGION_NAME);
|
||||
|
||||
let store_config = config_util::new_store_config(REGION_NAME, store_dir).await;
|
||||
let store_config =
|
||||
config_util::new_store_config(REGION_NAME, store_dir, EngineConfig::default()).await;
|
||||
|
||||
RegionImpl::create(metadata, store_config).await.unwrap()
|
||||
}
|
||||
@@ -112,7 +114,9 @@ impl AlterTester {
|
||||
}
|
||||
self.base = None;
|
||||
// Reopen the region.
|
||||
let store_config = config_util::new_store_config(REGION_NAME, &self.store_dir).await;
|
||||
let store_config =
|
||||
config_util::new_store_config(REGION_NAME, &self.store_dir, EngineConfig::default())
|
||||
.await;
|
||||
let opts = OpenOptions::default();
|
||||
let region = RegionImpl::open(REGION_NAME.to_string(), store_config, &opts)
|
||||
.await
|
||||
|
||||
@@ -19,6 +19,7 @@ use common_test_util::temp_dir::create_temp_dir;
|
||||
use log_store::raft_engine::log_store::RaftEngineLogStore;
|
||||
use store_api::storage::{OpenOptions, SequenceNumber, WriteResponse};
|
||||
|
||||
use crate::config::EngineConfig;
|
||||
use crate::error::Result;
|
||||
use crate::region::tests::{self, FileTesterBase};
|
||||
use crate::region::RegionImpl;
|
||||
@@ -32,7 +33,8 @@ async fn create_region_for_basic(
|
||||
store_dir: &str,
|
||||
) -> RegionImpl<RaftEngineLogStore> {
|
||||
let metadata = tests::new_metadata(region_name);
|
||||
let store_config = config_util::new_store_config(region_name, store_dir).await;
|
||||
let store_config =
|
||||
config_util::new_store_config(region_name, store_dir, EngineConfig::default()).await;
|
||||
RegionImpl::create(metadata, store_config).await.unwrap()
|
||||
}
|
||||
|
||||
@@ -75,7 +77,12 @@ impl Tester {
|
||||
|
||||
self.base = None;
|
||||
// Reopen the region.
|
||||
let store_config = config_util::new_store_config(&self.region_name, &self.store_dir).await;
|
||||
let store_config = config_util::new_store_config(
|
||||
&self.region_name,
|
||||
&self.store_dir,
|
||||
EngineConfig::default(),
|
||||
)
|
||||
.await;
|
||||
let opts = OpenOptions::default();
|
||||
let region = RegionImpl::open(self.region_name.clone(), store_config, &opts).await?;
|
||||
match region {
|
||||
|
||||
@@ -22,6 +22,7 @@ use store_api::storage::{
|
||||
AlterOperation, AlterRequest, CloseContext, Region, RegionMeta, WriteResponse,
|
||||
};
|
||||
|
||||
use crate::config::EngineConfig;
|
||||
use crate::engine;
|
||||
use crate::error::Error;
|
||||
use crate::flush::FlushStrategyRef;
|
||||
@@ -44,7 +45,8 @@ async fn create_region_for_close(
|
||||
) -> RegionImpl<RaftEngineLogStore> {
|
||||
let metadata = tests::new_metadata(REGION_NAME);
|
||||
|
||||
let mut store_config = config_util::new_store_config(REGION_NAME, store_dir).await;
|
||||
let mut store_config =
|
||||
config_util::new_store_config(REGION_NAME, store_dir, EngineConfig::default()).await;
|
||||
store_config.flush_strategy = flush_strategy;
|
||||
|
||||
RegionImpl::create(metadata, store_config).await.unwrap()
|
||||
|
||||
@@ -83,6 +83,7 @@ async fn create_region_for_compaction<
|
||||
REGION_NAME,
|
||||
store_dir,
|
||||
object_store.clone(),
|
||||
EngineConfig::default(),
|
||||
)
|
||||
.await;
|
||||
store_config.engine_config = Arc::new(engine_config);
|
||||
|
||||
@@ -27,6 +27,7 @@ use store_api::storage::{
|
||||
FlushContext, FlushReason, OpenOptions, Region, ScanRequest, WriteResponse,
|
||||
};
|
||||
|
||||
use crate::config::EngineConfig;
|
||||
use crate::engine::{self, RegionMap};
|
||||
use crate::flush::{FlushStrategyRef, FlushType};
|
||||
use crate::region::tests::{self, FileTesterBase};
|
||||
@@ -46,8 +47,15 @@ async fn create_region_for_flush(
|
||||
) {
|
||||
let metadata = tests::new_metadata(REGION_NAME);
|
||||
|
||||
let (mut store_config, regions) =
|
||||
config_util::new_store_config_and_region_map(REGION_NAME, store_dir).await;
|
||||
let (mut store_config, regions) = config_util::new_store_config_and_region_map(
|
||||
REGION_NAME,
|
||||
store_dir,
|
||||
EngineConfig {
|
||||
max_files_in_l0: usize::MAX,
|
||||
..Default::default()
|
||||
},
|
||||
)
|
||||
.await;
|
||||
store_config.flush_strategy = flush_strategy;
|
||||
|
||||
(
|
||||
@@ -84,7 +92,15 @@ impl FlushTester {
|
||||
}
|
||||
self.base = None;
|
||||
// Reopen the region.
|
||||
let mut store_config = config_util::new_store_config(REGION_NAME, &self.store_dir).await;
|
||||
let mut store_config = config_util::new_store_config(
|
||||
REGION_NAME,
|
||||
&self.store_dir,
|
||||
EngineConfig {
|
||||
max_files_in_l0: usize::MAX,
|
||||
..Default::default()
|
||||
},
|
||||
)
|
||||
.await;
|
||||
store_config.flush_strategy = self.flush_strategy.clone();
|
||||
let opts = OpenOptions::default();
|
||||
let region = RegionImpl::open(REGION_NAME.to_string(), store_config, &opts)
|
||||
|
||||
@@ -26,6 +26,7 @@ use store_api::storage::{
|
||||
Chunk, ChunkReader, ReadContext, Region, ScanRequest, Snapshot, WriteContext, WriteRequest,
|
||||
};
|
||||
|
||||
use crate::config::EngineConfig;
|
||||
use crate::region::{RegionImpl, RegionMetadata};
|
||||
use crate::test_util::{self, config_util, descriptor_util, write_batch_util};
|
||||
use crate::write_batch::WriteBatch;
|
||||
@@ -171,7 +172,8 @@ const REGION_NAME: &str = "region-projection-0";
|
||||
async fn new_tester(store_dir: &str) -> ProjectionTester<RaftEngineLogStore> {
|
||||
let metadata = new_metadata(REGION_NAME);
|
||||
|
||||
let store_config = config_util::new_store_config(REGION_NAME, store_dir).await;
|
||||
let store_config =
|
||||
config_util::new_store_config(REGION_NAME, store_dir, EngineConfig::default()).await;
|
||||
let region = RegionImpl::create(metadata, store_config).await.unwrap();
|
||||
|
||||
ProjectionTester::with_region(region)
|
||||
|
||||
@@ -22,7 +22,7 @@ use object_store::ObjectStore;
|
||||
use store_api::manifest::Manifest;
|
||||
|
||||
use crate::compaction::noop::NoopCompactionScheduler;
|
||||
use crate::config::DEFAULT_REGION_WRITE_BUFFER_SIZE;
|
||||
use crate::config::{EngineConfig, DEFAULT_REGION_WRITE_BUFFER_SIZE};
|
||||
use crate::engine::{self, RegionMap};
|
||||
use crate::file_purger::noop::NoopFilePurgeHandler;
|
||||
use crate::flush::{FlushScheduler, PickerConfig, SizeBasedStrategy};
|
||||
@@ -40,12 +40,13 @@ fn log_store_dir(store_dir: &str) -> String {
|
||||
pub async fn new_store_config(
|
||||
region_name: &str,
|
||||
store_dir: &str,
|
||||
engine_config: EngineConfig,
|
||||
) -> StoreConfig<RaftEngineLogStore> {
|
||||
let mut builder = Fs::default();
|
||||
builder.root(store_dir);
|
||||
let object_store = ObjectStore::new(builder).unwrap().finish();
|
||||
|
||||
new_store_config_with_object_store(region_name, store_dir, object_store)
|
||||
new_store_config_with_object_store(region_name, store_dir, object_store, engine_config)
|
||||
.await
|
||||
.0
|
||||
}
|
||||
@@ -54,6 +55,7 @@ pub async fn new_store_config(
|
||||
pub async fn new_store_config_and_region_map(
|
||||
region_name: &str,
|
||||
store_dir: &str,
|
||||
engine_config: EngineConfig,
|
||||
) -> (
|
||||
StoreConfig<RaftEngineLogStore>,
|
||||
Arc<RegionMap<RaftEngineLogStore>>,
|
||||
@@ -62,7 +64,7 @@ pub async fn new_store_config_and_region_map(
|
||||
builder.root(store_dir);
|
||||
let object_store = ObjectStore::new(builder).unwrap().finish();
|
||||
|
||||
new_store_config_with_object_store(region_name, store_dir, object_store).await
|
||||
new_store_config_with_object_store(region_name, store_dir, object_store, engine_config).await
|
||||
}
|
||||
|
||||
/// Create a new StoreConfig with given object store.
|
||||
@@ -70,6 +72,7 @@ pub async fn new_store_config_with_object_store(
|
||||
region_name: &str,
|
||||
store_dir: &str,
|
||||
object_store: ObjectStore,
|
||||
engine_config: EngineConfig,
|
||||
) -> (
|
||||
StoreConfig<RaftEngineLogStore>,
|
||||
Arc<RegionMap<RaftEngineLogStore>>,
|
||||
@@ -117,7 +120,7 @@ pub async fn new_store_config_with_object_store(
|
||||
flush_scheduler,
|
||||
flush_strategy: Arc::new(SizeBasedStrategy::default()),
|
||||
compaction_scheduler,
|
||||
engine_config: Default::default(),
|
||||
engine_config: Arc::new(engine_config),
|
||||
file_purger,
|
||||
ttl: None,
|
||||
compaction_time_window: None,
|
||||
|
||||
Reference in New Issue
Block a user