Compare commits

...

4 Commits

Author SHA1 Message Date
albertlockett
a40170c680 linter 2024-04-02 15:40:39 -04:00
albertlockett
ada28d3f81 implement display for copy behavior 2024-04-02 15:16:22 -04:00
albertlockett
f6bd785398 tests 2024-04-02 12:07:23 -04:00
albertlockett
5b490d08dd add checked copy and new secondary copy behaviour 2024-04-02 10:30:09 -04:00

View File

@@ -39,11 +39,29 @@ use tokio::{
/// Pairs a `primary` store with a `secondary` store and carries the policy that
/// governs copies performed on the secondary store.
struct MirroringObjectStore {
/// The store described in this file's docs as durable but slow.
primary: Arc<dyn ObjectStore>,
/// The store described in this file's docs as fast but less durable.
secondary: Arc<dyn ObjectStore>,
/// How a failed copy on `secondary` is treated (see `secondary_copy`).
secondary_copy_behavior: MirroringSecondaryCopy,
}
impl MirroringObjectStore {
async fn secondary_copy(&self, from: &Path, to: &Path) -> Result<()> {
let secondary_cp_result = self.secondary.copy(from, to).await;
match (&self.secondary_copy_behavior, secondary_cp_result) {
(_, Ok(_)) => Ok(()),
(
MirroringSecondaryCopy::SkipIfNotFound,
Err(object_store::Error::NotFound { path: _, source: _ }),
) => Ok(()),
(_, Err(e)) => Err(e),
}
}
}
impl std::fmt::Display for MirroringObjectStore {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
writeln!(f, "MirrowingObjectStore")?;
write!(f, "MirroringObjectStore(secondary_copy_behavior=")?;
self.secondary_copy_behavior.fmt(f)?;
writeln!(f, ")")?;
writeln!(f, "primary:")?;
self.primary.fmt(f)?;
writeln!(f, "secondary:")?;
@@ -62,12 +80,40 @@ impl PrimaryOnly for Path {
}
}
/// An object store that mirrors write to secondsry object store first
/// Controls the behavior of copying objects in the secondary store.
#[derive(Debug, Clone)]
pub enum MirroringSecondaryCopy {
    /// Always attempt the copy and surface any error. This is the default.
    Copy,
    /// Skip the copy when the source is not found on the secondary store.
    ///
    /// The secondary store may not be as durable as the primary, so the copy
    /// source can exist on the primary while being absent from the secondary.
    SkipIfNotFound,
}

impl Default for MirroringSecondaryCopy {
    /// Returns [`MirroringSecondaryCopy::Copy`].
    fn default() -> Self {
        MirroringSecondaryCopy::Copy
    }
}

impl std::fmt::Display for MirroringSecondaryCopy {
    /// Writes the bare variant name (`Copy` or `SkipIfNotFound`).
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let variant_name = match self {
            Self::Copy => "Copy",
            Self::SkipIfNotFound => "SkipIfNotFound",
        };
        f.write_str(variant_name)
    }
}
/// An object store that mirrors write to secondary object store first
/// and then commits to the primary object store.
///
/// This is meant to mirrow writes to a less-durable but lower-latency
/// This is meant to mirror writes to a less-durable but lower-latency
/// store. We have primary store that is durable but slow, and a secondary
/// store that is fast but not asdurable
/// store that is fast but not as durable
///
/// Note: this object store does not mirror writes to *.manifest files
#[async_trait]
@@ -156,7 +202,7 @@ impl ObjectStore for MirroringObjectStore {
if to.primary_only() {
self.primary.copy(from, to).await
} else {
self.secondary.copy(from, to).await?;
self.secondary_copy(from, to).await?;
self.primary.copy(from, to).await?;
Ok(())
}
@@ -164,7 +210,7 @@ impl ObjectStore for MirroringObjectStore {
async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
if !to.primary_only() {
self.secondary.copy(from, to).await?;
self.secondary_copy(from, to).await?;
}
self.primary.copy_if_not_exists(from, to).await
}
@@ -314,33 +360,152 @@ impl AsyncWrite for MirroringUpload {
/// Configuration for wrapping a primary store in a `MirroringObjectStore`.
///
/// Holds the secondary store, the copy policy handed to each wrapped store, and
/// an optional wrapper that is applied to the secondary store first.
#[derive(Debug)]
pub struct MirroringObjectStoreWrapper {
/// Store that becomes the `secondary` of every wrapped store.
secondary: Arc<dyn ObjectStore>,
/// Copy policy cloned into every wrapped store.
secondary_copy_behavior: MirroringSecondaryCopy,
/// Optional wrapper applied to `secondary` when wrapping; `None` leaves it as is.
secondary_wrapper: Option<Arc<dyn WrappingObjectStore>>,
}
impl MirroringObjectStoreWrapper {
/// Creates a wrapper around `secondary` with the default copy policy
/// (`MirroringSecondaryCopy::default()`) and no secondary wrapper.
pub fn new(secondary: Arc<dyn ObjectStore>) -> Self {
// NOTE(review): the next line looks like the pre-change body shown by the diff
// view; the struct literal that follows appears to supersede it. Confirm against
// the real file.
Self { secondary }
Self {
secondary,
secondary_copy_behavior: MirroringSecondaryCopy::default(),
secondary_wrapper: None,
}
}
/// Builder-style setter: replaces the secondary copy policy and returns `self`.
pub fn with_secondary_copy_behavior(
mut self,
secondary_copy_behavior: MirroringSecondaryCopy,
) -> Self {
self.secondary_copy_behavior = secondary_copy_behavior;
self
}
/// Builder-style setter: stores a wrapper to apply to the secondary store and
/// returns `self`.
pub fn with_secondary_wrapper(mut self, wrapper: Arc<dyn WrappingObjectStore>) -> Self {
self.secondary_wrapper = Some(wrapper);
self
}
}
impl WrappingObjectStore for MirroringObjectStoreWrapper {
/// Builds a `MirroringObjectStore` that uses `primary` as its primary store.
///
/// The secondary store is a clone of this wrapper's `secondary`, passed through
/// `secondary_wrapper` first when one is configured.
fn wrap(&self, primary: Arc<dyn ObjectStore>) -> Arc<dyn ObjectStore> {
let mut secondary = self.secondary.clone();
// Apply the optional wrapper to the secondary store only.
if let Some(wrapper) = &self.secondary_wrapper {
secondary = wrapper.wrap(secondary);
}
Arc::new(MirroringObjectStore {
primary,
// NOTE(review): the next line looks like the pre-change field initializer shown
// by the diff view; the shorthand `secondary,` after it appears to be the
// replacement. Confirm against the real file.
secondary: self.secondary.clone(),
secondary,
secondary_copy_behavior: self.secondary_copy_behavior.clone(),
})
}
}
/// An object store that checks that the source of a copy exists before
/// attempting to copy the object.
///
/// The primary use case is to work around a bug in version 0.9 of `object_store`,
/// where copying from a non-existent source causes the thread to hang forever:
/// <https://github.com/apache/arrow-rs/issues/5503>
#[derive(Debug)]
struct CheckedCopyObjectStore {
/// The store every operation is forwarded to.
inner: Arc<dyn ObjectStore>,
}
impl std::fmt::Display for CheckedCopyObjectStore {
    /// Writes a `CheckedCopyObjectStore` header line, an `inner:` line, and then
    /// the formatted inner store.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_str("CheckedCopyObjectStore\n")?;
        f.write_str("inner:\n")?;
        // The inner store's formatting result is the result of the whole call.
        self.inner.fmt(f)
    }
}
// `ObjectStore` implementation that forwards every call to `inner`. The only
// added behavior is in `copy` and `copy_if_not_exists`, which call `head` on the
// source before copying.
#[async_trait]
impl ObjectStore for CheckedCopyObjectStore {
// Forwarded unchanged to the inner store.
async fn put_opts(&self, location: &Path, bytes: Bytes, opts: PutOptions) -> Result<PutResult> {
self.inner.put_opts(location, bytes, opts).await
}
// Forwarded unchanged to the inner store.
async fn put_multipart(
&self,
location: &Path,
) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
self.inner.put_multipart(location).await
}
// Forwarded unchanged to the inner store.
async fn abort_multipart(&self, location: &Path, multipart_id: &MultipartId) -> Result<()> {
self.inner.abort_multipart(location, multipart_id).await
}
// Forwarded unchanged to the inner store.
async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
self.inner.get_opts(location, options).await
}
// Forwarded unchanged to the inner store.
async fn delete(&self, location: &Path) -> Result<()> {
self.inner.delete(location).await
}
// Forwarded unchanged to the inner store.
fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
self.inner.list(prefix)
}
// Forwarded unchanged to the inner store.
async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
self.inner.list_with_delimiter(prefix).await
}
// Copies `from` to `to`, but only after `head(from)` succeeds.
async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
// Probe the source first; any error from `head` is returned and the copy is
// never attempted.
self.inner.head(from).await?;
self.inner.copy(from, to).await
}
// Same source check as `copy`, then delegates to the inner `copy_if_not_exists`.
async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
// Probe the source first; any error from `head` is returned and the copy is
// never attempted.
self.inner.head(from).await?;
self.inner.copy_if_not_exists(from, to).await
}
}
/// A configuration-free wrapper whose `WrappingObjectStore::wrap` puts a store
/// behind a `CheckedCopyObjectStore`.
#[derive(Debug)]
pub struct CheckedCopyObjectStoreWrapper {}

impl CheckedCopyObjectStoreWrapper {
    /// Creates the wrapper. Equivalent to [`Default::default`].
    pub fn new() -> Self {
        Self::default()
    }
}

impl Default for CheckedCopyObjectStoreWrapper {
    /// Builds the (field-less) wrapper value.
    fn default() -> Self {
        Self {}
    }
}
impl WrappingObjectStore for CheckedCopyObjectStoreWrapper {
    /// Returns `inner` wrapped in a `CheckedCopyObjectStore`.
    fn wrap(&self, inner: Arc<dyn ObjectStore>) -> Arc<dyn ObjectStore> {
        let checked = CheckedCopyObjectStore { inner };
        Arc::new(checked)
    }
}
// windows pathing can't be simply concatenated
#[cfg(all(test, not(windows)))]
mod test {
use super::*;
use futures::TryStreamExt;
use lance::{dataset::WriteParams, io::ObjectStoreParams};
use lance::{
dataset::WriteParams,
io::{ObjectStore, ObjectStoreParams},
};
use lance_testing::datagen::{BatchGenerator, IncrementingInt32, RandomVector};
use object_store::local::LocalFileSystem;
use object_store::ObjectStore as _;
use std::fs::File;
use std::io::Write;
use std::time::Duration;
use tempfile;
use url::Url;
use crate::{
connect,
@@ -354,9 +519,8 @@ mod test {
let dir2 = tempfile::tempdir().unwrap().into_path();
let secondary_store = LocalFileSystem::new_with_prefix(dir2.to_str().unwrap()).unwrap();
let object_store_wrapper = Arc::new(MirroringObjectStoreWrapper {
secondary: Arc::new(secondary_store),
});
let object_store_wrapper =
Arc::new(MirroringObjectStoreWrapper::new(Arc::new(secondary_store)));
let db = connect(dir1.to_str().unwrap()).execute().await.unwrap();
@@ -393,9 +557,9 @@ mod test {
.await
.unwrap();
let bateches = q.try_collect::<Vec<_>>().await.unwrap();
assert_eq!(bateches.len(), 1);
assert_eq!(bateches[0].num_rows(), 10);
let batches = q.try_collect::<Vec<_>>().await.unwrap();
assert_eq!(batches.len(), 1);
assert_eq!(batches[0].num_rows(), 10);
use walkdir::WalkDir;
@@ -430,4 +594,100 @@ mod test {
secondary_elem = secondary_iter.next();
}
}
// Exercises both `MirroringSecondaryCopy` policies when the copy source exists
// only in the primary store's directory, then once more after the source has been
// created in the secondary store's directory as well.
#[tokio::test]
async fn test_secondary_copy_skip_if_not_found() {
let dir1 = tempfile::tempdir().unwrap().into_path();
let dir2 = tempfile::tempdir().unwrap().into_path();
// create a file that only exists in the primary store's directory (dir1)
let file_path = format!("{}/hello.txt", dir1.to_str().unwrap());
let mut file = File::create(file_path).unwrap();
file.write_all(b"hello").unwrap();
// with SkipIfNotFound, check that a file which exists only on the primary can be
// copied even though the secondary has no such source
let secondary_store =
Arc::new(LocalFileSystem::new_with_prefix(dir2.to_str().unwrap()).unwrap());
let mirroring_wrapper = MirroringObjectStoreWrapper::new(secondary_store.clone())
.with_secondary_copy_behavior(MirroringSecondaryCopy::SkipIfNotFound)
.with_secondary_wrapper(Arc::new(CheckedCopyObjectStoreWrapper::new()));
let primary_store = LocalFileSystem::new_with_prefix(dir1.to_str().unwrap()).unwrap();
let store = ObjectStore::new(
Arc::new(primary_store) as _,
Url::from_directory_path(dir1.clone()).unwrap(),
None,
Some(Arc::new(mirroring_wrapper)),
);
let result = store
.copy(&Path::from("hello.txt"), &Path::from("hello2.txt"))
.await;
assert!(result.is_ok());
assert!(store.exists(&Path::from("hello2.txt")).await.unwrap());
// with MirroringSecondaryCopy::Copy, check that the copy returns an error and
// that the primary copy is not made either
let mirroring_wrapper = MirroringObjectStoreWrapper::new(secondary_store.clone())
.with_secondary_copy_behavior(MirroringSecondaryCopy::Copy)
.with_secondary_wrapper(Arc::new(CheckedCopyObjectStoreWrapper::new()));
let primary_store = LocalFileSystem::new_with_prefix(dir1.to_str().unwrap()).unwrap();
let store = ObjectStore::new(
Arc::new(primary_store) as _,
Url::from_directory_path(dir1).unwrap(),
None,
Some(Arc::new(mirroring_wrapper)),
);
let result = store
.copy(&Path::from("hello.txt"), &Path::from("hello3.txt"))
.await;
assert!(result.is_err());
assert!(!store.exists(&Path::from("hello3.txt")).await.unwrap());
// once the source file also exists in the secondary store's directory (dir2),
// check that the same copy succeeds and lands in both stores
let file_path = format!("{}/hello.txt", dir2.to_str().unwrap());
let mut file = File::create(file_path).unwrap();
file.write_all(b"hello").unwrap();
let result = store
.copy(&Path::from("hello.txt"), &Path::from("hello3.txt"))
.await;
assert!(result.is_ok());
assert!(store.exists(&Path::from("hello3.txt")).await.unwrap());
assert!(secondary_store
.as_ref()
.head(&Path::from("hello3.txt"))
.await
.is_ok());
}
// Checks that copying from a source that was never created returns an error
// within the timeout when the store is wrapped by `CheckedCopyObjectStoreWrapper`.
#[tokio::test]
async fn test_copy_loop_avoidance() {
let dir1 = tempfile::tempdir().unwrap().into_path();
let object_store_wrapper = CheckedCopyObjectStoreWrapper::new();
let store_params = ObjectStoreParams {
object_store_wrapper: Some(Arc::new(object_store_wrapper)),
..Default::default()
};
let (store, _) = ObjectStore::from_uri_and_params(dir1.to_str().unwrap(), &store_params)
.await
.unwrap();
// wrap in a timeout so the test fails instead of hanging if the copy never returns
// https://github.com/apache/arrow-rs/issues/5503
tokio::time::timeout(Duration::from_secs(10), async move {
// "hello1.txt" is never created in dir1, so the copy source does not exist
let result = store
.copy(&Path::from("hello1.txt"), &Path::from("hello2.txt"))
.await;
if result.is_ok() {
return Err("copy should have errored".to_string());
}
Ok(())
})
.await
// first unwrap: the timeout did not elapse; second unwrap: the copy errored
.unwrap()
.unwrap();
}
}