mirror of
https://github.com/lancedb/lancedb.git
synced 2025-12-25 06:19:57 +00:00
Compare commits
9 Commits
python-v0.
...
1176/alber
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a40170c680 | ||
|
|
ada28d3f81 | ||
|
|
f6bd785398 | ||
|
|
5b490d08dd | ||
|
|
46e95f2c4c | ||
|
|
73810b4410 | ||
|
|
09280bc54a | ||
|
|
5603f1e57f | ||
|
|
1d67615cff |
@@ -226,3 +226,10 @@ extra:
|
||||
analytics:
|
||||
provider: google
|
||||
property: G-B7NFM40W74
|
||||
social:
|
||||
- icon: fontawesome/brands/github
|
||||
link: https://github.com/lancedb/lancedb
|
||||
- icon: fontawesome/brands/x-twitter
|
||||
link: https://twitter.com/lancedb
|
||||
- icon: fontawesome/brands/linkedin
|
||||
link: https://www.linkedin.com/company/lancedb
|
||||
|
||||
60
node/package-lock.json
generated
60
node/package-lock.json
generated
@@ -333,6 +333,66 @@
|
||||
"@jridgewell/sourcemap-codec": "^1.4.10"
|
||||
}
|
||||
},
|
||||
"node_modules/@lancedb/vectordb-darwin-arm64": {
|
||||
"version": "0.4.15",
|
||||
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-darwin-arm64/-/vectordb-darwin-arm64-0.4.15.tgz",
|
||||
"integrity": "sha512-asNVd0ojbExbj/iRCFu/+vpnnGtAHbrn7AjrL1PFeSl1JvsvzeZBBxq+WlM4UfGKfiJhkyBvQwh609OiCP3Snw==",
|
||||
"cpu": [
|
||||
"arm64"
|
||||
],
|
||||
"optional": true,
|
||||
"os": [
|
||||
"darwin"
|
||||
]
|
||||
},
|
||||
"node_modules/@lancedb/vectordb-darwin-x64": {
|
||||
"version": "0.4.15",
|
||||
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-darwin-x64/-/vectordb-darwin-x64-0.4.15.tgz",
|
||||
"integrity": "sha512-tbMGb1P9KXdnoP6dqFnjhUUjGIVzzo2V/Ewc8iktMU1scCVQ7/rEPPvTh9jHuM1r1i+wVTNtKYijIVaHZZPJLA==",
|
||||
"cpu": [
|
||||
"x64"
|
||||
],
|
||||
"optional": true,
|
||||
"os": [
|
||||
"darwin"
|
||||
]
|
||||
},
|
||||
"node_modules/@lancedb/vectordb-linux-arm64-gnu": {
|
||||
"version": "0.4.15",
|
||||
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-linux-arm64-gnu/-/vectordb-linux-arm64-gnu-0.4.15.tgz",
|
||||
"integrity": "sha512-5oiWvS9Y5mRc2PhaVzu9zoM1UFf77gDgX8IM95U87CZZdPAoREDvnbVhyIZa2SEZccEjLG7tShe+PJsOFWOT/w==",
|
||||
"cpu": [
|
||||
"arm64"
|
||||
],
|
||||
"optional": true,
|
||||
"os": [
|
||||
"linux"
|
||||
]
|
||||
},
|
||||
"node_modules/@lancedb/vectordb-linux-x64-gnu": {
|
||||
"version": "0.4.15",
|
||||
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-linux-x64-gnu/-/vectordb-linux-x64-gnu-0.4.15.tgz",
|
||||
"integrity": "sha512-FjlrV2h97t5aoammRpPSBjws2cWSKYQpzALHm8Af8QjTuvo9J/MD63Fr/D6Sb9Ie2ER3pCiWD1o8UCFndCFtRg==",
|
||||
"cpu": [
|
||||
"x64"
|
||||
],
|
||||
"optional": true,
|
||||
"os": [
|
||||
"linux"
|
||||
]
|
||||
},
|
||||
"node_modules/@lancedb/vectordb-win32-x64-msvc": {
|
||||
"version": "0.4.15",
|
||||
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-win32-x64-msvc/-/vectordb-win32-x64-msvc-0.4.15.tgz",
|
||||
"integrity": "sha512-o3zlY/FnY426kp2Y7xn4UbD6/Oeg5xqtezYNhhmt44lCmhSlpydx+2m9Fq1OGSDNUCWtrhMhgdHVnGTu2VTa5A==",
|
||||
"cpu": [
|
||||
"x64"
|
||||
],
|
||||
"optional": true,
|
||||
"os": [
|
||||
"win32"
|
||||
]
|
||||
},
|
||||
"node_modules/@neon-rs/cli": {
|
||||
"version": "0.0.160",
|
||||
"resolved": "https://registry.npmjs.org/@neon-rs/cli/-/cli-0.0.160.tgz",
|
||||
|
||||
@@ -454,8 +454,10 @@ export class RemoteTable<T = number[]> implements Table<T> {
|
||||
}
|
||||
}
|
||||
|
||||
async countRows (): Promise<number> {
|
||||
const result = await this._client.post(`/v1/table/${this._name}/describe/`)
|
||||
async countRows (filter?: string): Promise<number> {
|
||||
const result = await this._client.post(`/v1/table/${this._name}/count_rows/`, {
|
||||
predicate: filter
|
||||
})
|
||||
return (await result.body())?.stats?.num_rows
|
||||
}
|
||||
|
||||
|
||||
53
nodejs/package-lock.json
generated
53
nodejs/package-lock.json
generated
@@ -1,12 +1,12 @@
|
||||
{
|
||||
"name": "@lancedb/lancedb",
|
||||
"version": "0.4.3",
|
||||
"version": "0.4.15",
|
||||
"lockfileVersion": 3,
|
||||
"requires": true,
|
||||
"packages": {
|
||||
"": {
|
||||
"name": "@lancedb/lancedb",
|
||||
"version": "0.4.3",
|
||||
"version": "0.4.15",
|
||||
"cpu": [
|
||||
"x64",
|
||||
"arm64"
|
||||
@@ -45,11 +45,11 @@
|
||||
"node": ">= 18"
|
||||
},
|
||||
"optionalDependencies": {
|
||||
"@lancedb/lancedb-darwin-arm64": "0.4.3",
|
||||
"@lancedb/lancedb-darwin-x64": "0.4.3",
|
||||
"@lancedb/lancedb-linux-arm64-gnu": "0.4.3",
|
||||
"@lancedb/lancedb-linux-x64-gnu": "0.4.3",
|
||||
"@lancedb/lancedb-win32-x64-msvc": "0.4.3"
|
||||
"@lancedb/lancedb-darwin-arm64": "0.4.15",
|
||||
"@lancedb/lancedb-darwin-x64": "0.4.15",
|
||||
"@lancedb/lancedb-linux-arm64-gnu": "0.4.15",
|
||||
"@lancedb/lancedb-linux-x64-gnu": "0.4.15",
|
||||
"@lancedb/lancedb-win32-x64-msvc": "0.4.15"
|
||||
}
|
||||
},
|
||||
"node_modules/@75lb/deep-merge": {
|
||||
@@ -1320,9 +1320,9 @@
|
||||
}
|
||||
},
|
||||
"node_modules/@lancedb/lancedb-darwin-arm64": {
|
||||
"version": "0.4.3",
|
||||
"resolved": "https://registry.npmjs.org/@lancedb/lancedb-darwin-arm64/-/lancedb-darwin-arm64-0.4.3.tgz",
|
||||
"integrity": "sha512-+kxuWUK9vtLBbjFMkIKeQ32kxK2tgvZRCQaU1I3RJ3+dLmDIVeIj+KJSlMelkKa2QC4JoyHQi9Ty1PdS2DojmQ==",
|
||||
"version": "0.4.15",
|
||||
"resolved": "https://registry.npmjs.org/@lancedb/lancedb-darwin-arm64/-/lancedb-darwin-arm64-0.4.15.tgz",
|
||||
"integrity": "sha512-bBImUd2mMUrYzQtyvGSPA3AKxXF5Q4hAbWtv1PD4R8LvOmR6KGlWPiVp8ywkkHeue7DN+C/lFACUVw6iW06dTQ==",
|
||||
"cpu": [
|
||||
"arm64"
|
||||
],
|
||||
@@ -1335,9 +1335,9 @@
|
||||
}
|
||||
},
|
||||
"node_modules/@lancedb/lancedb-darwin-x64": {
|
||||
"version": "0.4.3",
|
||||
"resolved": "https://registry.npmjs.org/@lancedb/lancedb-darwin-x64/-/lancedb-darwin-x64-0.4.3.tgz",
|
||||
"integrity": "sha512-JYvsSYxTOa/7OMojulz9h0gN2FwvypG/6l6dpLkViZ5LDvRcfVyDTzOLcOJkFn+db4TKeBOVyMWnnpDKaB+jLA==",
|
||||
"version": "0.4.15",
|
||||
"resolved": "https://registry.npmjs.org/@lancedb/lancedb-darwin-x64/-/lancedb-darwin-x64-0.4.15.tgz",
|
||||
"integrity": "sha512-V1af4K+U21oL9zgbUCDfwPU9n0eOfdeb3bBCuxNRPz1GCVu8BOhKD07v9AiFolC4zoSkR8mXYvV2w3cxVN/Tlw==",
|
||||
"cpu": [
|
||||
"x64"
|
||||
],
|
||||
@@ -1349,10 +1349,25 @@
|
||||
"node": ">= 18"
|
||||
}
|
||||
},
|
||||
"node_modules/@lancedb/lancedb-linux-arm64-gnu": {
|
||||
"version": "0.4.15",
|
||||
"resolved": "https://registry.npmjs.org/@lancedb/lancedb-linux-arm64-gnu/-/lancedb-linux-arm64-gnu-0.4.15.tgz",
|
||||
"integrity": "sha512-rwo3xC0h8udlRtrlqk44n7h4Jc7wu5YuVB/pvcRU0UZGp0xKKwOdfO4mLflGmVlboKzqcjZFObOB2gcv7dRwLg==",
|
||||
"cpu": [
|
||||
"arm64"
|
||||
],
|
||||
"optional": true,
|
||||
"os": [
|
||||
"linux"
|
||||
],
|
||||
"engines": {
|
||||
"node": ">= 18"
|
||||
}
|
||||
},
|
||||
"node_modules/@lancedb/lancedb-linux-x64-gnu": {
|
||||
"version": "0.4.3",
|
||||
"resolved": "https://registry.npmjs.org/@lancedb/lancedb-linux-x64-gnu/-/lancedb-linux-x64-gnu-0.4.3.tgz",
|
||||
"integrity": "sha512-jDANHchWNGmu1wfAyBk0apoFlLxtJ7FRc31pAQ3tKE4fwlgG7bUcaTX6s5C3vMNWXnyQLQtVuWZNXi2nVj879g==",
|
||||
"version": "0.4.15",
|
||||
"resolved": "https://registry.npmjs.org/@lancedb/lancedb-linux-x64-gnu/-/lancedb-linux-x64-gnu-0.4.15.tgz",
|
||||
"integrity": "sha512-uOhhX0gfx8SSzekH43Od4RsR3/1T8BRq3+aijUKaDd9tllecwxv3B1ucPH9nNMaYzMwD/Y1+tJETOddgrjsD5g==",
|
||||
"cpu": [
|
||||
"x64"
|
||||
],
|
||||
@@ -1365,9 +1380,9 @@
|
||||
}
|
||||
},
|
||||
"node_modules/@lancedb/lancedb-win32-x64-msvc": {
|
||||
"version": "0.4.3",
|
||||
"resolved": "https://registry.npmjs.org/@lancedb/lancedb-win32-x64-msvc/-/lancedb-win32-x64-msvc-0.4.3.tgz",
|
||||
"integrity": "sha512-qADveXyv4YzllIbOOq8soqFfL7p7I35uhrD3PcTvj4Qxuo6q7pgQWQz2Mt3kGBpyPkH2yE4wWAGJhayShLRbiQ==",
|
||||
"version": "0.4.15",
|
||||
"resolved": "https://registry.npmjs.org/@lancedb/lancedb-win32-x64-msvc/-/lancedb-win32-x64-msvc-0.4.15.tgz",
|
||||
"integrity": "sha512-u+vaAWZMLrA9i99Xrf0P5bTRIc/1PhUcxP4Q7E8rlKhzodRQLYeUlFflCDBXZOiUcNMMkvnR3YN+YTpHWhXlgA==",
|
||||
"cpu": [
|
||||
"x64"
|
||||
],
|
||||
|
||||
@@ -499,11 +499,8 @@ class RemoteTable(Table):
|
||||
)
|
||||
|
||||
def count_rows(self, filter: Optional[str] = None) -> int:
|
||||
# payload = {"filter": filter}
|
||||
# self._conn._client.post(f"/v1/table/{self._name}/count_rows/", data=payload)
|
||||
return NotImplementedError(
|
||||
"count_rows() is not yet supported on the LanceDB cloud"
|
||||
)
|
||||
payload = {"predicate": filter}
|
||||
self._conn._client.post(f"/v1/table/{self._name}/count_rows/", data=payload)
|
||||
|
||||
def add_columns(self, transforms: Dict[str, str]):
|
||||
raise NotImplementedError(
|
||||
|
||||
@@ -101,8 +101,8 @@ impl TableNamesBuilder {
|
||||
///
|
||||
/// This can be combined with limit to implement pagination by setting this to
|
||||
/// the last table name from the previous page.
|
||||
pub fn start_after(mut self, start_after: String) -> Self {
|
||||
self.start_after = Some(start_after);
|
||||
pub fn start_after(mut self, start_after: impl Into<String>) -> Self {
|
||||
self.start_after = Some(start_after.into());
|
||||
self
|
||||
}
|
||||
|
||||
@@ -862,7 +862,7 @@ mod tests {
|
||||
|
||||
let tables = db
|
||||
.table_names()
|
||||
.start_after(names[30].clone())
|
||||
.start_after(&names[30])
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -871,7 +871,7 @@ mod tests {
|
||||
|
||||
let tables = db
|
||||
.table_names()
|
||||
.start_after(names[30].clone())
|
||||
.start_after(&names[30])
|
||||
.limit(7)
|
||||
.execute()
|
||||
.await
|
||||
|
||||
@@ -39,11 +39,29 @@ use tokio::{
|
||||
struct MirroringObjectStore {
|
||||
primary: Arc<dyn ObjectStore>,
|
||||
secondary: Arc<dyn ObjectStore>,
|
||||
secondary_copy_behavior: MirroringSecondaryCopy,
|
||||
}
|
||||
|
||||
impl MirroringObjectStore {
|
||||
async fn secondary_copy(&self, from: &Path, to: &Path) -> Result<()> {
|
||||
let secondary_cp_result = self.secondary.copy(from, to).await;
|
||||
match (&self.secondary_copy_behavior, secondary_cp_result) {
|
||||
(_, Ok(_)) => Ok(()),
|
||||
(
|
||||
MirroringSecondaryCopy::SkipIfNotFound,
|
||||
Err(object_store::Error::NotFound { path: _, source: _ }),
|
||||
) => Ok(()),
|
||||
(_, Err(e)) => Err(e),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for MirroringObjectStore {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||
writeln!(f, "MirrowingObjectStore")?;
|
||||
write!(f, "MirroringObjectStore(secondary_copy_behavior=")?;
|
||||
self.secondary_copy_behavior.fmt(f)?;
|
||||
writeln!(f, ")")?;
|
||||
|
||||
writeln!(f, "primary:")?;
|
||||
self.primary.fmt(f)?;
|
||||
writeln!(f, "secondary:")?;
|
||||
@@ -62,12 +80,40 @@ impl PrimaryOnly for Path {
|
||||
}
|
||||
}
|
||||
|
||||
/// An object store that mirrors write to secondsry object store first
|
||||
/// Controls the behavior of copying objects in the secondary store.
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum MirroringSecondaryCopy {
|
||||
// Default behaviour is to copy
|
||||
Copy,
|
||||
// Since the secondary store may not be as durable as the primary, the copy source
|
||||
// may exist on the primary but not on the secondary. If the source is not found,
|
||||
// this skips making the copy
|
||||
SkipIfNotFound,
|
||||
}
|
||||
|
||||
impl Default for MirroringSecondaryCopy {
|
||||
fn default() -> Self {
|
||||
Self::Copy
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for MirroringSecondaryCopy {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
Self::Copy => write!(f, "Copy"),
|
||||
Self::SkipIfNotFound => write!(f, "SkipIfNotFound"),
|
||||
}?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// An object store that mirrors write to secondary object store first
|
||||
/// and than commit to primary object store.
|
||||
///
|
||||
/// This is meant to mirrow writes to a less-durable but lower-latency
|
||||
/// This is meant to mirror writes to a less-durable but lower-latency
|
||||
/// store. We have primary store that is durable but slow, and a secondary
|
||||
/// store that is fast but not asdurable
|
||||
/// store that is fast but not as durable
|
||||
///
|
||||
/// Note: this object store does not mirror writes to *.manifest files
|
||||
#[async_trait]
|
||||
@@ -156,7 +202,7 @@ impl ObjectStore for MirroringObjectStore {
|
||||
if to.primary_only() {
|
||||
self.primary.copy(from, to).await
|
||||
} else {
|
||||
self.secondary.copy(from, to).await?;
|
||||
self.secondary_copy(from, to).await?;
|
||||
self.primary.copy(from, to).await?;
|
||||
Ok(())
|
||||
}
|
||||
@@ -164,7 +210,7 @@ impl ObjectStore for MirroringObjectStore {
|
||||
|
||||
async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
|
||||
if !to.primary_only() {
|
||||
self.secondary.copy(from, to).await?;
|
||||
self.secondary_copy(from, to).await?;
|
||||
}
|
||||
self.primary.copy_if_not_exists(from, to).await
|
||||
}
|
||||
@@ -314,33 +360,152 @@ impl AsyncWrite for MirroringUpload {
|
||||
#[derive(Debug)]
|
||||
pub struct MirroringObjectStoreWrapper {
|
||||
secondary: Arc<dyn ObjectStore>,
|
||||
secondary_copy_behavior: MirroringSecondaryCopy,
|
||||
secondary_wrapper: Option<Arc<dyn WrappingObjectStore>>,
|
||||
}
|
||||
|
||||
impl MirroringObjectStoreWrapper {
|
||||
pub fn new(secondary: Arc<dyn ObjectStore>) -> Self {
|
||||
Self { secondary }
|
||||
Self {
|
||||
secondary,
|
||||
secondary_copy_behavior: MirroringSecondaryCopy::default(),
|
||||
secondary_wrapper: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn with_secondary_copy_behavior(
|
||||
mut self,
|
||||
secondary_copy_behavior: MirroringSecondaryCopy,
|
||||
) -> Self {
|
||||
self.secondary_copy_behavior = secondary_copy_behavior;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_secondary_wrapper(mut self, wrapper: Arc<dyn WrappingObjectStore>) -> Self {
|
||||
self.secondary_wrapper = Some(wrapper);
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
impl WrappingObjectStore for MirroringObjectStoreWrapper {
|
||||
fn wrap(&self, primary: Arc<dyn ObjectStore>) -> Arc<dyn ObjectStore> {
|
||||
let mut secondary = self.secondary.clone();
|
||||
if let Some(wrapper) = &self.secondary_wrapper {
|
||||
secondary = wrapper.wrap(secondary);
|
||||
}
|
||||
Arc::new(MirroringObjectStore {
|
||||
primary,
|
||||
secondary: self.secondary.clone(),
|
||||
secondary,
|
||||
secondary_copy_behavior: self.secondary_copy_behavior.clone(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// An object store that will check if the source of the copy exists before attempting
|
||||
/// to copy the object.
|
||||
///
|
||||
/// The primary use case is to workaround a bug in version 0.9 of object_store where
|
||||
/// copying from a non-existent source causes the thread to hang forever
|
||||
/// https://github.com/apache/arrow-rs/issues/5503
|
||||
#[derive(Debug)]
|
||||
struct CheckedCopyObjectStore {
|
||||
inner: Arc<dyn ObjectStore>,
|
||||
}
|
||||
|
||||
impl std::fmt::Display for CheckedCopyObjectStore {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||
writeln!(f, "CheckedCopyObjectStore")?;
|
||||
writeln!(f, "inner:")?;
|
||||
self.inner.fmt(f)?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ObjectStore for CheckedCopyObjectStore {
|
||||
async fn put_opts(&self, location: &Path, bytes: Bytes, opts: PutOptions) -> Result<PutResult> {
|
||||
self.inner.put_opts(location, bytes, opts).await
|
||||
}
|
||||
|
||||
async fn put_multipart(
|
||||
&self,
|
||||
location: &Path,
|
||||
) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
|
||||
self.inner.put_multipart(location).await
|
||||
}
|
||||
|
||||
async fn abort_multipart(&self, location: &Path, multipart_id: &MultipartId) -> Result<()> {
|
||||
self.inner.abort_multipart(location, multipart_id).await
|
||||
}
|
||||
|
||||
async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
|
||||
self.inner.get_opts(location, options).await
|
||||
}
|
||||
|
||||
async fn delete(&self, location: &Path) -> Result<()> {
|
||||
self.inner.delete(location).await
|
||||
}
|
||||
|
||||
fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
|
||||
self.inner.list(prefix)
|
||||
}
|
||||
|
||||
async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
|
||||
self.inner.list_with_delimiter(prefix).await
|
||||
}
|
||||
|
||||
async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
|
||||
// check that the from object exists
|
||||
self.inner.head(from).await?;
|
||||
self.inner.copy(from, to).await
|
||||
}
|
||||
|
||||
async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
|
||||
// check that the from object exists
|
||||
self.inner.head(from).await?;
|
||||
self.inner.copy_if_not_exists(from, to).await
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct CheckedCopyObjectStoreWrapper {}
|
||||
|
||||
impl CheckedCopyObjectStoreWrapper {
|
||||
pub fn new() -> Self {
|
||||
Self {}
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for CheckedCopyObjectStoreWrapper {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
impl WrappingObjectStore for CheckedCopyObjectStoreWrapper {
|
||||
fn wrap(&self, inner: Arc<dyn ObjectStore>) -> Arc<dyn ObjectStore> {
|
||||
Arc::new(CheckedCopyObjectStore { inner })
|
||||
}
|
||||
}
|
||||
|
||||
// windows pathing can't be simply concatenated
|
||||
#[cfg(all(test, not(windows)))]
|
||||
mod test {
|
||||
use super::*;
|
||||
|
||||
use futures::TryStreamExt;
|
||||
use lance::{dataset::WriteParams, io::ObjectStoreParams};
|
||||
use lance::{
|
||||
dataset::WriteParams,
|
||||
io::{ObjectStore, ObjectStoreParams},
|
||||
};
|
||||
use lance_testing::datagen::{BatchGenerator, IncrementingInt32, RandomVector};
|
||||
use object_store::local::LocalFileSystem;
|
||||
use object_store::ObjectStore as _;
|
||||
use std::fs::File;
|
||||
use std::io::Write;
|
||||
use std::time::Duration;
|
||||
use tempfile;
|
||||
use url::Url;
|
||||
|
||||
use crate::{
|
||||
connect,
|
||||
@@ -354,9 +519,8 @@ mod test {
|
||||
let dir2 = tempfile::tempdir().unwrap().into_path();
|
||||
|
||||
let secondary_store = LocalFileSystem::new_with_prefix(dir2.to_str().unwrap()).unwrap();
|
||||
let object_store_wrapper = Arc::new(MirroringObjectStoreWrapper {
|
||||
secondary: Arc::new(secondary_store),
|
||||
});
|
||||
let object_store_wrapper =
|
||||
Arc::new(MirroringObjectStoreWrapper::new(Arc::new(secondary_store)));
|
||||
|
||||
let db = connect(dir1.to_str().unwrap()).execute().await.unwrap();
|
||||
|
||||
@@ -393,9 +557,9 @@ mod test {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let bateches = q.try_collect::<Vec<_>>().await.unwrap();
|
||||
assert_eq!(bateches.len(), 1);
|
||||
assert_eq!(bateches[0].num_rows(), 10);
|
||||
let batches = q.try_collect::<Vec<_>>().await.unwrap();
|
||||
assert_eq!(batches.len(), 1);
|
||||
assert_eq!(batches[0].num_rows(), 10);
|
||||
|
||||
use walkdir::WalkDir;
|
||||
|
||||
@@ -430,4 +594,100 @@ mod test {
|
||||
secondary_elem = secondary_iter.next();
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_secondary_copy_skip_if_not_found() {
|
||||
let dir1 = tempfile::tempdir().unwrap().into_path();
|
||||
let dir2 = tempfile::tempdir().unwrap().into_path();
|
||||
|
||||
// create a file that only exists in partition 1
|
||||
let file_path = format!("{}/hello.txt", dir1.to_str().unwrap());
|
||||
let mut file = File::create(file_path).unwrap();
|
||||
file.write_all(b"hello").unwrap();
|
||||
|
||||
// check we can copy a file that exists on the primary while skipping the secondary
|
||||
let secondary_store =
|
||||
Arc::new(LocalFileSystem::new_with_prefix(dir2.to_str().unwrap()).unwrap());
|
||||
let mirroring_wrapper = MirroringObjectStoreWrapper::new(secondary_store.clone())
|
||||
.with_secondary_copy_behavior(MirroringSecondaryCopy::SkipIfNotFound)
|
||||
.with_secondary_wrapper(Arc::new(CheckedCopyObjectStoreWrapper::new()));
|
||||
|
||||
let primary_store = LocalFileSystem::new_with_prefix(dir1.to_str().unwrap()).unwrap();
|
||||
let store = ObjectStore::new(
|
||||
Arc::new(primary_store) as _,
|
||||
Url::from_directory_path(dir1.clone()).unwrap(),
|
||||
None,
|
||||
Some(Arc::new(mirroring_wrapper)),
|
||||
);
|
||||
|
||||
let result = store
|
||||
.copy(&Path::from("hello.txt"), &Path::from("hello2.txt"))
|
||||
.await;
|
||||
assert!(result.is_ok());
|
||||
assert!(store.exists(&Path::from("hello2.txt")).await.unwrap());
|
||||
|
||||
// check that we will return an error if using MirroedSecondarryCopy::Copy and also that the primary copy does not succeed
|
||||
let mirroring_wrapper = MirroringObjectStoreWrapper::new(secondary_store.clone())
|
||||
.with_secondary_copy_behavior(MirroringSecondaryCopy::Copy)
|
||||
.with_secondary_wrapper(Arc::new(CheckedCopyObjectStoreWrapper::new()));
|
||||
let primary_store = LocalFileSystem::new_with_prefix(dir1.to_str().unwrap()).unwrap();
|
||||
let store = ObjectStore::new(
|
||||
Arc::new(primary_store) as _,
|
||||
Url::from_directory_path(dir1).unwrap(),
|
||||
None,
|
||||
Some(Arc::new(mirroring_wrapper)),
|
||||
);
|
||||
|
||||
let result = store
|
||||
.copy(&Path::from("hello.txt"), &Path::from("hello3.txt"))
|
||||
.await;
|
||||
assert!(result.is_err());
|
||||
assert!(!store.exists(&Path::from("hello3.txt")).await.unwrap());
|
||||
|
||||
// check that if the file exists in the secondary store, we can successfully copy it
|
||||
let file_path = format!("{}/hello.txt", dir2.to_str().unwrap());
|
||||
let mut file = File::create(file_path).unwrap();
|
||||
file.write_all(b"hello").unwrap();
|
||||
|
||||
let result = store
|
||||
.copy(&Path::from("hello.txt"), &Path::from("hello3.txt"))
|
||||
.await;
|
||||
assert!(result.is_ok());
|
||||
assert!(store.exists(&Path::from("hello3.txt")).await.unwrap());
|
||||
assert!(secondary_store
|
||||
.as_ref()
|
||||
.head(&Path::from("hello3.txt"))
|
||||
.await
|
||||
.is_ok());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_copy_loop_avoidance() {
|
||||
let dir1 = tempfile::tempdir().unwrap().into_path();
|
||||
|
||||
let object_store_wrapper = CheckedCopyObjectStoreWrapper::new();
|
||||
|
||||
let store_params = ObjectStoreParams {
|
||||
object_store_wrapper: Some(Arc::new(object_store_wrapper)),
|
||||
..Default::default()
|
||||
};
|
||||
let (store, _) = ObjectStore::from_uri_and_params(dir1.to_str().unwrap(), &store_params)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// wrap in timeout to ensure we don't go into the infinite loop
|
||||
// https://github.com/apache/arrow-rs/issues/5503
|
||||
tokio::time::timeout(Duration::from_secs(10), async move {
|
||||
let result = store
|
||||
.copy(&Path::from("hello1.txt"), &Path::from("hello2.txt"))
|
||||
.await;
|
||||
if result.is_ok() {
|
||||
return Err("copy should have errored".to_string());
|
||||
}
|
||||
Ok(())
|
||||
})
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user