From 5b490d08ddabebc61ef89fba8ac9df4a3e0e059b Mon Sep 17 00:00:00 2001 From: albertlockett Date: Tue, 2 Apr 2024 09:38:06 -0400 Subject: [PATCH] add checked copy and new secondary copy behaviour --- rust/lancedb/src/io/object_store.rs | 199 ++++++++++++++++++++++++++-- 1 file changed, 185 insertions(+), 14 deletions(-) diff --git a/rust/lancedb/src/io/object_store.rs b/rust/lancedb/src/io/object_store.rs index 4e052d65..db24c9fa 100644 --- a/rust/lancedb/src/io/object_store.rs +++ b/rust/lancedb/src/io/object_store.rs @@ -39,11 +39,26 @@ use tokio::{ struct MirroringObjectStore { primary: Arc, secondary: Arc, + secondary_copy_behavior: MirroringSecondaryCopy, +} + +impl MirroringObjectStore { + async fn secondary_copy(&self, from: &Path, to: &Path) -> Result<()> { + let secondary_cp_result = self.secondary.copy(from, to).await; + match (&self.secondary_copy_behavior, secondary_cp_result) { + (_, Ok(_)) => Ok(()), + ( + MirroringSecondaryCopy::SkipIfNotFound, + Err(object_store::Error::NotFound { path: _, source: _ }), + ) => Ok(()), + (_, Err(e)) => return Err(e), + } + } } impl std::fmt::Display for MirroringObjectStore { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - writeln!(f, "MirrowingObjectStore")?; + writeln!(f, "MirroringObjectStore")?; writeln!(f, "primary:")?; self.primary.fmt(f)?; writeln!(f, "secondary:")?; @@ -62,12 +77,29 @@ impl PrimaryOnly for Path { } } -/// An object store that mirrors write to secondsry object store first +/// Controls the behavior of copying objects in the secondary store. +#[derive(Debug, Clone)] +pub enum MirroringSecondaryCopy { + // Default behaviour is to copy + Copy, + // Since the secondary store may not be as durable as the primary, the copy source + // may exist on the primary but not on the secondary. If the source is not found, + // this skips making the copy + SkipIfNotFound, +} + +impl Default for MirroringSecondaryCopy { + fn default() -> Self { + Self::Copy + } +} + +/// An object store that mirrors write to secondary object store first /// and than commit to primary object store. /// -/// This is meant to mirrow writes to a less-durable but lower-latency +/// This is meant to mirror writes to a less-durable but lower-latency /// store. We have primary store that is durable but slow, and a secondary -/// store that is fast but not asdurable +/// store that is fast but not as durable /// /// Note: this object store does not mirror writes to *.manifest files #[async_trait] @@ -156,7 +188,7 @@ impl ObjectStore for MirroringObjectStore { if to.primary_only() { self.primary.copy(from, to).await } else { - self.secondary.copy(from, to).await?; + self.secondary_copy(from, to).await?; self.primary.copy(from, to).await?; Ok(()) } @@ -164,7 +196,7 @@ impl ObjectStore for MirroringObjectStore { async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> { if !to.primary_only() { - self.secondary.copy(from, to).await?; + self.secondary_copy(from, to).await?; } self.primary.copy_if_not_exists(from, to).await } @@ -314,23 +346,128 @@ impl AsyncWrite for MirroringUpload { #[derive(Debug)] pub struct MirroringObjectStoreWrapper { secondary: Arc, + secondary_copy_behavior: MirroringSecondaryCopy, + secondary_wrapper: Option>, } impl MirroringObjectStoreWrapper { pub fn new(secondary: Arc) -> Self { - Self { secondary } + Self { + secondary, + secondary_copy_behavior: MirroringSecondaryCopy::default(), + secondary_wrapper: None, + } + } + + pub fn with_secondary_copy_behavior( + mut self, + secondary_copy_behavior: MirroringSecondaryCopy, + ) -> Self { + self.secondary_copy_behavior = secondary_copy_behavior; + self + } + + pub fn with_secondary_wrapper(mut self, wrapper: Arc) -> Self { + self.secondary_wrapper = Some(wrapper); + self } } impl WrappingObjectStore for MirroringObjectStoreWrapper { fn wrap(&self, primary: Arc) -> Arc { + let mut secondary = self.secondary.clone(); + if let Some(wrapper) = &self.secondary_wrapper { + secondary = wrapper.wrap(secondary); + } Arc::new(MirroringObjectStore { primary, - secondary: self.secondary.clone(), + secondary, + secondary_copy_behavior: self.secondary_copy_behavior.clone(), }) } } +/// An object store that will check if the source of the copy exists before attempting +/// to copy the object. +/// +/// The primary use case is to workaround a bug in version 0.9 of object_store where +/// copying from a non-existent source causes the thread to hang forever +/// https://github.com/apache/arrow-rs/issues/5503 +#[derive(Debug)] +struct CheckedCopyObjectStore { + inner: Arc, +} + +impl std::fmt::Display for CheckedCopyObjectStore { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + writeln!(f, "CheckedCopyObjectStore")?; + writeln!(f, "inner:")?; + self.inner.fmt(f)?; + Ok(()) + } +} + +#[async_trait] +impl ObjectStore for CheckedCopyObjectStore { + async fn put_opts(&self, location: &Path, bytes: Bytes, opts: PutOptions) -> Result { + self.inner.put_opts(location, bytes, opts).await + } + + async fn put_multipart( + &self, + location: &Path, + ) -> Result<(MultipartId, Box)> { + self.inner.put_multipart(location).await + } + + async fn abort_multipart(&self, location: &Path, multipart_id: &MultipartId) -> Result<()> { + self.inner.abort_multipart(location, multipart_id).await + } + + async fn get_opts(&self, location: &Path, options: GetOptions) -> Result { + self.inner.get_opts(location, options).await + } + + async fn delete(&self, location: &Path) -> Result<()> { + self.inner.delete(location).await + } + + fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result> { + self.inner.list(prefix) + } + + async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result { + self.inner.list_with_delimiter(prefix).await + } + + async fn copy(&self, from: &Path, to: &Path) -> Result<()> { + // check that the from object exists + self.inner.head(from).await?; + self.inner.copy(from, to).await + } + + async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> { + // check that the from object exists + self.inner.head(from).await?; + self.inner.copy_if_not_exists(from, to).await + } +} + +#[derive(Debug)] +pub struct CheckedCopyObjectStoreWrapper {} + +impl CheckedCopyObjectStoreWrapper { + pub fn new() -> Self { + Self {} + } +} + +impl WrappingObjectStore for CheckedCopyObjectStoreWrapper { + fn wrap(&self, inner: Arc) -> Arc { + Arc::new(CheckedCopyObjectStore { inner }) + } +} + // windows pathing can't be simply concatenated #[cfg(all(test, not(windows)))] mod test { @@ -340,6 +477,7 @@ mod test { use lance::{dataset::WriteParams, io::ObjectStoreParams}; use lance_testing::datagen::{BatchGenerator, IncrementingInt32, RandomVector}; use object_store::local::LocalFileSystem; + use std::{error::Error, time::Duration}; use tempfile; use crate::{ @@ -354,9 +492,8 @@ mod test { let dir2 = tempfile::tempdir().unwrap().into_path(); let secondary_store = LocalFileSystem::new_with_prefix(dir2.to_str().unwrap()).unwrap(); - let object_store_wrapper = Arc::new(MirroringObjectStoreWrapper { - secondary: Arc::new(secondary_store), - }); + let object_store_wrapper = + Arc::new(MirroringObjectStoreWrapper::new(Arc::new(secondary_store))); let db = connect(dir1.to_str().unwrap()).execute().await.unwrap(); @@ -393,9 +530,9 @@ mod test { .await .unwrap(); - let bateches = q.try_collect::>().await.unwrap(); - assert_eq!(bateches.len(), 1); - assert_eq!(bateches[0].num_rows(), 10); + let batches = q.try_collect::>().await.unwrap(); + assert_eq!(batches.len(), 1); + assert_eq!(batches[0].num_rows(), 10); use walkdir::WalkDir; @@ -430,4 +567,38 @@ mod test { secondary_elem = secondary_iter.next(); } } + + #[tokio::test] + async fn test_copy_loop_avoidance() { + let dir1 = tempfile::tempdir().unwrap().into_path(); + + let object_store_wrapper = CheckedCopyObjectStoreWrapper::new(); + + let store_params = ObjectStoreParams { + object_store_wrapper: Some(Arc::new(object_store_wrapper)), + ..Default::default() + }; + let (store, _) = + lance::io::ObjectStore::from_uri_and_params(dir1.to_str().unwrap(), &store_params) + .await + .unwrap(); + + // wrap in timeout to ensure we don't go into the infinite loop + // https://github.com/apache/arrow-rs/issues/5503 + tokio::time::timeout(Duration::from_secs(10), async move { + let result = store + .copy(&Path::from("hello1.txt"), &Path::from("hello2.txt")) + .await; + if result.is_ok() { + return Err("copy should have errored".to_string()); + } + if let Err(result) = result { + let source = result.source(); + } + Ok(()) + }) + .await + .unwrap() + .unwrap(); + } }