test: s3 manifest (#1755)

* feat: change default manifest options

* test: s3 manifest

* feat: revert checkpoint_margin to 10

* Update src/object-store/src/test_util.rs

Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com>

---------

Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com>
This commit is contained in:
dennis zhuang
2023-06-09 18:28:41 +08:00
committed by GitHub
parent 7437820bdc
commit f08f726bec
9 changed files with 141 additions and 40 deletions

View File

@@ -51,7 +51,7 @@ max_purge_tasks = 32
# Create a checkpoint every <checkpoint_margin> actions.
checkpoint_margin = 10
# Region manifest logs and checkpoints gc execution duration
gc_duration = '30s'
gc_duration = '10m'
# Whether to try creating a manifest checkpoint on region opening
checkpoint_on_startup = false

View File

@@ -115,7 +115,7 @@ max_purge_tasks = 32
# Create a checkpoint every <checkpoint_margin> actions.
checkpoint_margin = 10
# Region manifest logs and checkpoints gc execution duration
gc_duration = '30s'
gc_duration = '10m'
# Whether to try creating a manifest checkpoint on region opening
checkpoint_on_startup = false

View File

@@ -221,7 +221,7 @@ impl Default for RegionManifestConfig {
fn default() -> Self {
Self {
checkpoint_margin: Some(10u16),
gc_duration: Some(Duration::from_secs(30)),
gc_duration: Some(Duration::from_secs(600)),
checkpoint_on_startup: false,
compress: false,
}

View File

@@ -295,7 +295,7 @@ fn build_row_key_desc(
let column_schemas = &table_schema.column_schemas();
//TODO(boyan): enable version column by table option?
//TODO(dennis): enable version column by table option?
let mut builder = RowKeyDescriptorBuilder::new(ts_column);
for index in primary_key_indices {

View File

@@ -14,9 +14,9 @@ metrics.workspace = true
opendal = { version = "0.36", features = ["layers-tracing", "layers-metrics"] }
pin-project = "1.0"
tokio.workspace = true
uuid.workspace = true
[dev-dependencies]
anyhow = "1.0"
common-telemetry = { path = "../common/telemetry" }
common-test-util = { path = "../common/test-util" }
uuid.workspace = true

View File

@@ -12,8 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::env;
use crate::{ObjectStore, Result};
/// Temp folder for object store test
pub struct TempFolder {
store: ObjectStore,
// The path under root.
@@ -28,7 +31,34 @@ impl TempFolder {
}
}
pub async fn remove_all(&mut self) -> Result<()> {
pub async fn remove_all(&self) -> Result<()> {
self.store.remove_all(&self.path).await
}
}
/// Test s3 config from environment variables
#[derive(Debug)]
pub struct TestS3Config {
pub root: String,
pub access_key_id: String,
pub secret_access_key: String,
pub bucket: String,
pub region: Option<String>,
}
/// Returns s3 test config, return None if not found.
pub fn s3_test_config() -> Option<TestS3Config> {
if let Ok(b) = env::var("GT_S3_BUCKET") {
if !b.is_empty() {
return Some(TestS3Config {
root: uuid::Uuid::new_v4().to_string(),
access_key_id: env::var("GT_S3_ACCESS_KEY_ID").ok()?,
secret_access_key: env::var("GT_S3_ACCESS_KEY").ok()?,
bucket: env::var("GT_S3_BUCKET").ok()?,
region: Some(env::var("GT_S3_REGION").ok()?),
});
}
}
None
}

View File

@@ -120,7 +120,7 @@ async fn test_s3_backend() -> Result<()> {
let store = ObjectStore::new(builder).unwrap().finish();
let mut guard = TempFolder::new(&store, "/");
let guard = TempFolder::new(&store, "/");
test_object_crud(&store).await?;
test_object_list(&store).await?;
guard.remove_all().await?;
@@ -148,7 +148,7 @@ async fn test_oss_backend() -> Result<()> {
let store = ObjectStore::new(builder).unwrap().finish();
let mut guard = TempFolder::new(&store, "/");
let guard = TempFolder::new(&store, "/");
test_object_crud(&store).await?;
test_object_list(&store).await?;
guard.remove_all().await?;
@@ -176,7 +176,7 @@ async fn test_azblob_backend() -> Result<()> {
let store = ObjectStore::new(builder).unwrap().finish();
let mut guard = TempFolder::new(&store, "/");
let guard = TempFolder::new(&store, "/");
test_object_crud(&store).await?;
test_object_list(&store).await?;
guard.remove_all().await?;

View File

@@ -36,7 +36,7 @@ use crate::manifest::checkpoint::Checkpointer;
use crate::manifest::storage::{ManifestObjectStore, ObjectStoreLogIterator};
const CHECKPOINT_ACTIONS_MARGIN: u16 = 10;
const GC_DURATION_SECS: u64 = 30;
const GC_DURATION_SECS: u64 = 600;
#[derive(Clone, Debug)]
pub struct ManifestImpl<S: Checkpoint<Error = Error>, M: MetaAction<Error = Error>> {

View File

@@ -183,7 +183,8 @@ mod tests {
use std::sync::Arc;
use common_test_util::temp_dir::create_temp_dir;
use object_store::services::Fs;
use object_store::services::{Fs, S3};
use object_store::test_util::{s3_test_config, TempFolder};
use object_store::ObjectStore;
use store_api::manifest::action::ProtocolAction;
use store_api::manifest::{Manifest, MetaActionIterator, MAX_VERSION};
@@ -195,17 +196,36 @@ mod tests {
use crate::sst::FileId;
#[tokio::test]
async fn test_region_manifest_compress() {
test_region_manifest(true).await
async fn test_fs_region_manifest_compress() {
let manifest = new_fs_manifest(true, None).await;
test_region_manifest(&manifest).await
}
#[tokio::test]
async fn test_region_manifest_uncompress() {
test_region_manifest(false).await
async fn test_fs_region_manifest_uncompress() {
let manifest = new_fs_manifest(false, None).await;
test_region_manifest(&manifest).await
}
async fn test_region_manifest(compress: bool) {
common_telemetry::init_default_ut_logging();
#[tokio::test]
async fn test_s3_region_manifest_compress() {
if s3_test_config().is_some() {
let (manifest, temp_dir) = new_s3_manifest(true, None).await;
test_region_manifest(&manifest).await;
temp_dir.remove_all().await.unwrap();
}
}
#[tokio::test]
async fn test_s3_region_manifest_uncompress() {
if s3_test_config().is_some() {
let (manifest, temp_dir) = new_s3_manifest(false, None).await;
test_region_manifest(&manifest).await;
temp_dir.remove_all().await.unwrap();
}
}
async fn new_fs_manifest(compress: bool, gc_duration: Option<Duration>) -> RegionManifest {
let tmp_dir = create_temp_dir("test_region_manifest");
let mut builder = Fs::default();
builder.root(&tmp_dir.path().to_string_lossy());
@@ -216,9 +236,43 @@ mod tests {
object_store,
manifest_compress_type(compress),
None,
None,
gc_duration,
);
manifest.start().await.unwrap();
manifest
}
async fn new_s3_manifest(
compress: bool,
gc_duration: Option<Duration>,
) -> (RegionManifest, TempFolder) {
let s3_config = s3_test_config().unwrap();
let mut builder = S3::default();
builder
.root(&s3_config.root)
.access_key_id(&s3_config.access_key_id)
.secret_access_key(&s3_config.secret_access_key)
.bucket(&s3_config.bucket);
if s3_config.region.is_some() {
builder.region(s3_config.region.as_ref().unwrap());
}
let store = ObjectStore::new(builder).unwrap().finish();
let temp_folder = TempFolder::new(&store, "/");
let manifest = RegionManifest::with_checkpointer(
"/manifest/",
store,
manifest_compress_type(compress),
None,
gc_duration,
);
manifest.start().await.unwrap();
(manifest, temp_folder)
}
async fn test_region_manifest(manifest: &RegionManifest) {
common_telemetry::init_default_ut_logging();
let region_meta = Arc::new(build_region_meta());
@@ -325,31 +379,48 @@ mod tests {
}
#[tokio::test]
async fn test_region_manifest_checkpoint_compress() {
test_region_manifest_checkpoint(true).await
async fn test_fs_region_manifest_checkpoint_compress() {
let duration = Duration::from_millis(50);
let manifest = new_fs_manifest(true, Some(duration)).await;
test_region_manifest_checkpoint(&manifest, duration).await
}
#[tokio::test]
async fn test_region_manifest_checkpoint_uncompress() {
test_region_manifest_checkpoint(false).await
async fn test_fs_region_manifest_checkpoint_uncompress() {
let duration = Duration::from_millis(50);
let manifest = new_fs_manifest(false, Some(duration)).await;
test_region_manifest_checkpoint(&manifest, duration).await
}
async fn test_region_manifest_checkpoint(compress: bool) {
common_telemetry::init_default_ut_logging();
let tmp_dir = create_temp_dir("test_region_manifest_checkpoint");
let mut builder = Fs::default();
builder.root(&tmp_dir.path().to_string_lossy());
let object_store = ObjectStore::new(builder).unwrap().finish();
#[tokio::test]
async fn test_s3_region_manifest_checkpoint_compress() {
if s3_test_config().is_some() {
let duration = Duration::from_millis(50);
let (manifest, temp_dir) = new_s3_manifest(true, Some(duration)).await;
let test_gc_duration = Duration::from_millis(50);
let manifest = RegionManifest::with_checkpointer(
"/manifest/",
object_store,
manifest_compress_type(compress),
None,
Some(test_gc_duration),
);
manifest.start().await.unwrap();
test_region_manifest_checkpoint(&manifest, duration).await;
temp_dir.remove_all().await.unwrap();
}
}
#[tokio::test]
async fn test_s3_region_manifest_checkpoint_uncompress() {
if s3_test_config().is_some() {
let duration = Duration::from_millis(50);
let (manifest, temp_dir) = new_s3_manifest(false, Some(duration)).await;
test_region_manifest_checkpoint(&manifest, duration).await;
temp_dir.remove_all().await.unwrap();
}
}
async fn test_region_manifest_checkpoint(
manifest: &RegionManifest,
test_gc_duration: Duration,
) {
common_telemetry::init_default_ut_logging();
let region_meta = Arc::new(build_region_meta());
let new_region_meta = Arc::new(build_altered_region_meta());
@@ -376,7 +447,7 @@ mod tests {
manifest.update(action).await.unwrap();
}
assert!(manifest.last_checkpoint().await.unwrap().is_none());
assert_scan(&manifest, 0, 3).await;
assert_scan(manifest, 0, 3).await;
// update flushed manifest version for doing checkpoint
manifest.set_flushed_manifest_version(2);
@@ -435,7 +506,7 @@ mod tests {
manifest.update(action).await.unwrap();
}
assert_scan(&manifest, 3, 2).await;
assert_scan(manifest, 3, 2).await;
// do another checkpoints
// compacted RegionChange
@@ -459,7 +530,7 @@ mod tests {
files.contains_key(&file_ids[1]) &&
*metadata == RawRegionMetadata::from(region_meta.as_ref())));
assert_scan(&manifest, 4, 1).await;
assert_scan(manifest, 4, 1).await;
// compacted RegionEdit
manifest.set_flushed_manifest_version(4);
let checkpoint = manifest.do_checkpoint().await.unwrap().unwrap();