This commit is contained in:
albertlockett
2024-04-02 12:07:23 -04:00
parent 5b490d08dd
commit f6bd785398

View File

@@ -51,7 +51,7 @@ impl MirroringObjectStore {
MirroringSecondaryCopy::SkipIfNotFound,
Err(object_store::Error::NotFound { path: _, source: _ }),
) => Ok(()),
(_, Err(e)) => return Err(e),
(_, Err(e)) => Err(e),
}
}
}
@@ -462,6 +462,12 @@ impl CheckedCopyObjectStoreWrapper {
}
}
impl Default for CheckedCopyObjectStoreWrapper {
fn default() -> Self {
Self::new()
}
}
impl WrappingObjectStore for CheckedCopyObjectStoreWrapper {
fn wrap(&self, inner: Arc<dyn ObjectStore>) -> Arc<dyn ObjectStore> {
Arc::new(CheckedCopyObjectStore { inner })
@@ -474,11 +480,18 @@ mod test {
use super::*;
use futures::TryStreamExt;
use lance::{dataset::WriteParams, io::ObjectStoreParams};
use lance::{
dataset::WriteParams,
io::{ObjectStore, ObjectStoreParams},
};
use lance_testing::datagen::{BatchGenerator, IncrementingInt32, RandomVector};
use object_store::local::LocalFileSystem;
use std::{error::Error, time::Duration};
use object_store::ObjectStore as _;
use std::fs::File;
use std::io::Write;
use std::time::Duration;
use tempfile;
use url::Url;
use crate::{
connect,
@@ -568,6 +581,72 @@ mod test {
}
}
#[tokio::test]
async fn test_secondary_copy_skip_if_not_found() {
let dir1 = tempfile::tempdir().unwrap().into_path();
let dir2 = tempfile::tempdir().unwrap().into_path();
// create a file that only exists in partition 1
let file_path = format!("{}/hello.txt", dir1.to_str().unwrap());
let mut file = File::create(file_path).unwrap();
file.write_all(b"hello").unwrap();
// check we can copy a file that exists on the primary while skipping the secondary
let secondary_store =
Arc::new(LocalFileSystem::new_with_prefix(dir2.to_str().unwrap()).unwrap());
let mirroring_wrapper = MirroringObjectStoreWrapper::new(secondary_store.clone())
.with_secondary_copy_behavior(MirroringSecondaryCopy::SkipIfNotFound)
.with_secondary_wrapper(Arc::new(CheckedCopyObjectStoreWrapper::new()));
let primary_store = LocalFileSystem::new_with_prefix(dir1.to_str().unwrap()).unwrap();
let store = ObjectStore::new(
Arc::new(primary_store) as _,
Url::from_directory_path(dir1.clone()).unwrap(),
None,
Some(Arc::new(mirroring_wrapper)),
);
let result = store
.copy(&Path::from("hello.txt"), &Path::from("hello2.txt"))
.await;
assert!(result.is_ok());
assert!(store.exists(&Path::from("hello2.txt")).await.unwrap());
// check that we will return an error if using MirroedSecondarryCopy::Copy and also that the primary copy does not succeed
let mirroring_wrapper = MirroringObjectStoreWrapper::new(secondary_store.clone())
.with_secondary_copy_behavior(MirroringSecondaryCopy::Copy)
.with_secondary_wrapper(Arc::new(CheckedCopyObjectStoreWrapper::new()));
let primary_store = LocalFileSystem::new_with_prefix(dir1.to_str().unwrap()).unwrap();
let store = ObjectStore::new(
Arc::new(primary_store) as _,
Url::from_directory_path(dir1).unwrap(),
None,
Some(Arc::new(mirroring_wrapper)),
);
let result = store
.copy(&Path::from("hello.txt"), &Path::from("hello3.txt"))
.await;
assert!(result.is_err());
assert!(!store.exists(&Path::from("hello3.txt")).await.unwrap());
// check that if the file exists in the secondary store, we can successfully copy it
let file_path = format!("{}/hello.txt", dir2.to_str().unwrap());
let mut file = File::create(file_path).unwrap();
file.write_all(b"hello").unwrap();
let result = store
.copy(&Path::from("hello.txt"), &Path::from("hello3.txt"))
.await;
assert!(result.is_ok());
assert!(store.exists(&Path::from("hello3.txt")).await.unwrap());
assert!(secondary_store
.as_ref()
.head(&Path::from("hello3.txt"))
.await
.is_ok());
}
#[tokio::test]
async fn test_copy_loop_avoidance() {
let dir1 = tempfile::tempdir().unwrap().into_path();
@@ -578,10 +657,9 @@ mod test {
object_store_wrapper: Some(Arc::new(object_store_wrapper)),
..Default::default()
};
let (store, _) =
lance::io::ObjectStore::from_uri_and_params(dir1.to_str().unwrap(), &store_params)
.await
.unwrap();
let (store, _) = ObjectStore::from_uri_and_params(dir1.to_str().unwrap(), &store_params)
.await
.unwrap();
// wrap in timeout to ensure we don't go into the infinite loop
// https://github.com/apache/arrow-rs/issues/5503
@@ -592,9 +670,6 @@ mod test {
if result.is_ok() {
return Err("copy should have errored".to_string());
}
if let Err(result) = result {
let source = result.source();
}
Ok(())
})
.await