diff --git a/config/datanode.example.toml b/config/datanode.example.toml index 247b81a5d5..a3d8fa2494 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -51,7 +51,7 @@ max_purge_tasks = 32 # Create a checkpoint every actions. checkpoint_margin = 10 # Region manifest logs and checkpoints gc execution duration -gc_duration = '30s' +gc_duration = '10m' # Whether to try creating a manifest checkpoint on region opening checkpoint_on_startup = false diff --git a/config/standalone.example.toml b/config/standalone.example.toml index c4b6800630..2aa0240b69 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -115,7 +115,7 @@ max_purge_tasks = 32 # Create a checkpoint every actions. checkpoint_margin = 10 # Region manifest logs and checkpoints gc execution duration -gc_duration = '30s' +gc_duration = '10m' # Whether to try creating a manifest checkpoint on region opening checkpoint_on_startup = false diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index f7458be5a1..3ccc1b6470 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -221,7 +221,7 @@ impl Default for RegionManifestConfig { fn default() -> Self { Self { checkpoint_margin: Some(10u16), - gc_duration: Some(Duration::from_secs(30)), + gc_duration: Some(Duration::from_secs(600)), checkpoint_on_startup: false, compress: false, } diff --git a/src/mito/src/engine.rs b/src/mito/src/engine.rs index 7012e4e3f8..2ce1ceeac9 100644 --- a/src/mito/src/engine.rs +++ b/src/mito/src/engine.rs @@ -295,7 +295,7 @@ fn build_row_key_desc( let column_schemas = &table_schema.column_schemas(); - //TODO(boyan): enable version column by table option? + //TODO(dennis): enable version column by table option? let mut builder = RowKeyDescriptorBuilder::new(ts_column); for index in primary_key_indices { diff --git a/src/object-store/Cargo.toml b/src/object-store/Cargo.toml index 4fae116499..69e2ff3cdd 100644 --- a/src/object-store/Cargo.toml +++ b/src/object-store/Cargo.toml @@ -14,9 +14,9 @@ metrics.workspace = true opendal = { version = "0.36", features = ["layers-tracing", "layers-metrics"] } pin-project = "1.0" tokio.workspace = true +uuid.workspace = true [dev-dependencies] anyhow = "1.0" common-telemetry = { path = "../common/telemetry" } common-test-util = { path = "../common/test-util" } -uuid.workspace = true diff --git a/src/object-store/src/test_util.rs b/src/object-store/src/test_util.rs index 2f38b4ccbc..945244bb87 100644 --- a/src/object-store/src/test_util.rs +++ b/src/object-store/src/test_util.rs @@ -12,8 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::env; + use crate::{ObjectStore, Result}; +/// Temp folder for object store test pub struct TempFolder { store: ObjectStore, // The path under root. @@ -28,7 +31,34 @@ impl TempFolder { } } - pub async fn remove_all(&mut self) -> Result<()> { + pub async fn remove_all(&self) -> Result<()> { self.store.remove_all(&self.path).await } } + +/// Test s3 config from environment variables +#[derive(Debug)] +pub struct TestS3Config { + pub root: String, + pub access_key_id: String, + pub secret_access_key: String, + pub bucket: String, + pub region: Option, +} + +/// Returns s3 test config, return None if not found. +pub fn s3_test_config() -> Option { + if let Ok(b) = env::var("GT_S3_BUCKET") { + if !b.is_empty() { + return Some(TestS3Config { + root: uuid::Uuid::new_v4().to_string(), + access_key_id: env::var("GT_S3_ACCESS_KEY_ID").ok()?, + secret_access_key: env::var("GT_S3_ACCESS_KEY").ok()?, + bucket: env::var("GT_S3_BUCKET").ok()?, + region: Some(env::var("GT_S3_REGION").ok()?), + }); + } + } + + None +} diff --git a/src/object-store/tests/object_store_test.rs b/src/object-store/tests/object_store_test.rs index c70587bed7..3c06d7fe82 100644 --- a/src/object-store/tests/object_store_test.rs +++ b/src/object-store/tests/object_store_test.rs @@ -120,7 +120,7 @@ async fn test_s3_backend() -> Result<()> { let store = ObjectStore::new(builder).unwrap().finish(); - let mut guard = TempFolder::new(&store, "/"); + let guard = TempFolder::new(&store, "/"); test_object_crud(&store).await?; test_object_list(&store).await?; guard.remove_all().await?; @@ -148,7 +148,7 @@ async fn test_oss_backend() -> Result<()> { let store = ObjectStore::new(builder).unwrap().finish(); - let mut guard = TempFolder::new(&store, "/"); + let guard = TempFolder::new(&store, "/"); test_object_crud(&store).await?; test_object_list(&store).await?; guard.remove_all().await?; @@ -176,7 +176,7 @@ async fn test_azblob_backend() -> Result<()> { let store = ObjectStore::new(builder).unwrap().finish(); - let mut guard = TempFolder::new(&store, "/"); + let guard = TempFolder::new(&store, "/"); test_object_crud(&store).await?; test_object_list(&store).await?; guard.remove_all().await?; diff --git a/src/storage/src/manifest/impl_.rs b/src/storage/src/manifest/impl_.rs index 699775fb71..835b03feed 100644 --- a/src/storage/src/manifest/impl_.rs +++ b/src/storage/src/manifest/impl_.rs @@ -36,7 +36,7 @@ use crate::manifest::checkpoint::Checkpointer; use crate::manifest::storage::{ManifestObjectStore, ObjectStoreLogIterator}; const CHECKPOINT_ACTIONS_MARGIN: u16 = 10; -const GC_DURATION_SECS: u64 = 30; +const GC_DURATION_SECS: u64 = 600; #[derive(Clone, Debug)] pub struct ManifestImpl, M: MetaAction> { diff --git a/src/storage/src/manifest/region.rs b/src/storage/src/manifest/region.rs index 5938134870..4c80f47f1a 100644 --- a/src/storage/src/manifest/region.rs +++ b/src/storage/src/manifest/region.rs @@ -183,7 +183,8 @@ mod tests { use std::sync::Arc; use common_test_util::temp_dir::create_temp_dir; - use object_store::services::Fs; + use object_store::services::{Fs, S3}; + use object_store::test_util::{s3_test_config, TempFolder}; use object_store::ObjectStore; use store_api::manifest::action::ProtocolAction; use store_api::manifest::{Manifest, MetaActionIterator, MAX_VERSION}; @@ -195,17 +196,36 @@ mod tests { use crate::sst::FileId; #[tokio::test] - async fn test_region_manifest_compress() { - test_region_manifest(true).await + async fn test_fs_region_manifest_compress() { + let manifest = new_fs_manifest(true, None).await; + test_region_manifest(&manifest).await } #[tokio::test] - async fn test_region_manifest_uncompress() { - test_region_manifest(false).await + async fn test_fs_region_manifest_uncompress() { + let manifest = new_fs_manifest(false, None).await; + test_region_manifest(&manifest).await } - async fn test_region_manifest(compress: bool) { - common_telemetry::init_default_ut_logging(); + #[tokio::test] + async fn test_s3_region_manifest_compress() { + if s3_test_config().is_some() { + let (manifest, temp_dir) = new_s3_manifest(true, None).await; + test_region_manifest(&manifest).await; + temp_dir.remove_all().await.unwrap(); + } + } + + #[tokio::test] + async fn test_s3_region_manifest_uncompress() { + if s3_test_config().is_some() { + let (manifest, temp_dir) = new_s3_manifest(false, None).await; + test_region_manifest(&manifest).await; + temp_dir.remove_all().await.unwrap(); + } + } + + async fn new_fs_manifest(compress: bool, gc_duration: Option) -> RegionManifest { let tmp_dir = create_temp_dir("test_region_manifest"); let mut builder = Fs::default(); builder.root(&tmp_dir.path().to_string_lossy()); @@ -216,9 +236,43 @@ mod tests { object_store, manifest_compress_type(compress), None, - None, + gc_duration, ); manifest.start().await.unwrap(); + manifest + } + + async fn new_s3_manifest( + compress: bool, + gc_duration: Option, + ) -> (RegionManifest, TempFolder) { + let s3_config = s3_test_config().unwrap(); + let mut builder = S3::default(); + builder + .root(&s3_config.root) + .access_key_id(&s3_config.access_key_id) + .secret_access_key(&s3_config.secret_access_key) + .bucket(&s3_config.bucket); + + if s3_config.region.is_some() { + builder.region(s3_config.region.as_ref().unwrap()); + } + let store = ObjectStore::new(builder).unwrap().finish(); + let temp_folder = TempFolder::new(&store, "/"); + let manifest = RegionManifest::with_checkpointer( + "/manifest/", + store, + manifest_compress_type(compress), + None, + gc_duration, + ); + manifest.start().await.unwrap(); + + (manifest, temp_folder) + } + + async fn test_region_manifest(manifest: &RegionManifest) { + common_telemetry::init_default_ut_logging(); let region_meta = Arc::new(build_region_meta()); @@ -325,31 +379,48 @@ mod tests { } #[tokio::test] - async fn test_region_manifest_checkpoint_compress() { - test_region_manifest_checkpoint(true).await + async fn test_fs_region_manifest_checkpoint_compress() { + let duration = Duration::from_millis(50); + let manifest = new_fs_manifest(true, Some(duration)).await; + + test_region_manifest_checkpoint(&manifest, duration).await } #[tokio::test] - async fn test_region_manifest_checkpoint_uncompress() { - test_region_manifest_checkpoint(false).await + async fn test_fs_region_manifest_checkpoint_uncompress() { + let duration = Duration::from_millis(50); + let manifest = new_fs_manifest(false, Some(duration)).await; + + test_region_manifest_checkpoint(&manifest, duration).await } - async fn test_region_manifest_checkpoint(compress: bool) { - common_telemetry::init_default_ut_logging(); - let tmp_dir = create_temp_dir("test_region_manifest_checkpoint"); - let mut builder = Fs::default(); - builder.root(&tmp_dir.path().to_string_lossy()); - let object_store = ObjectStore::new(builder).unwrap().finish(); + #[tokio::test] + async fn test_s3_region_manifest_checkpoint_compress() { + if s3_test_config().is_some() { + let duration = Duration::from_millis(50); + let (manifest, temp_dir) = new_s3_manifest(true, Some(duration)).await; - let test_gc_duration = Duration::from_millis(50); - let manifest = RegionManifest::with_checkpointer( - "/manifest/", - object_store, - manifest_compress_type(compress), - None, - Some(test_gc_duration), - ); - manifest.start().await.unwrap(); + test_region_manifest_checkpoint(&manifest, duration).await; + temp_dir.remove_all().await.unwrap(); + } + } + + #[tokio::test] + async fn test_s3_region_manifest_checkpoint_uncompress() { + if s3_test_config().is_some() { + let duration = Duration::from_millis(50); + let (manifest, temp_dir) = new_s3_manifest(false, Some(duration)).await; + + test_region_manifest_checkpoint(&manifest, duration).await; + temp_dir.remove_all().await.unwrap(); + } + } + + async fn test_region_manifest_checkpoint( + manifest: &RegionManifest, + test_gc_duration: Duration, + ) { + common_telemetry::init_default_ut_logging(); let region_meta = Arc::new(build_region_meta()); let new_region_meta = Arc::new(build_altered_region_meta()); @@ -376,7 +447,7 @@ mod tests { manifest.update(action).await.unwrap(); } assert!(manifest.last_checkpoint().await.unwrap().is_none()); - assert_scan(&manifest, 0, 3).await; + assert_scan(manifest, 0, 3).await; // update flushed manifest version for doing checkpoint manifest.set_flushed_manifest_version(2); @@ -435,7 +506,7 @@ mod tests { manifest.update(action).await.unwrap(); } - assert_scan(&manifest, 3, 2).await; + assert_scan(manifest, 3, 2).await; // do another checkpoints // compacted RegionChange @@ -459,7 +530,7 @@ mod tests { files.contains_key(&file_ids[1]) && *metadata == RawRegionMetadata::from(region_meta.as_ref()))); - assert_scan(&manifest, 4, 1).await; + assert_scan(manifest, 4, 1).await; // compacted RegionEdit manifest.set_flushed_manifest_version(4); let checkpoint = manifest.do_checkpoint().await.unwrap().unwrap();