Compare commits

..

9 Commits

Author SHA1 Message Date
Bert
2d49a26790 ensure table names are uri encoded for tables (#1189)
This prevents an issue where users can do something like:
```js
db.createTable('my-table#123123')
```
The server has logic to determine that the '#' character is not allowed in
the table name, but currently the request fails with a 404 error because it
routes to `/v1/my-table#123123/create`, and `#123123/create` is not parsed
as part of the path.
2024-04-04 12:01:35 -07:00
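A minimal sketch of the fix described above, using a hypothetical `tableRoute` helper for illustration; the actual change applies `encodeURIComponent` directly at each call site, as the `RemoteConnection`/`RemoteTable` diff further down shows:
```ts
// Hypothetical helper illustrating the fix; the real change applies
// encodeURIComponent at each call site rather than through a helper.
function tableRoute (tableName: string, action: string): string {
  // 'my-table#123123' becomes 'my-table%23123123', so the '#' can no
  // longer be mistaken for a URL fragment delimiter by the router.
  return `/v1/table/${encodeURIComponent(tableName)}/${action}/`
}

console.log(tableRoute('my-table#123123', 'create'))
// => /v1/table/my-table%23123123/create/
```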
qzhu
7f49f4cc35 add a default value for search.limit to be consistent with python sdk 2024-04-04 11:50:54 -07:00
Will Jones
8364d589ab feat: ship fp16kernels in Python wheels (#1148)
Same deal as https://github.com/lancedb/lance/pull/2098
2024-04-04 09:33:34 -07:00
Lei Xu
8687735bea chore: bump to 0.10.8 (#1187) 2024-04-03 16:52:32 -07:00
QianZhu
f0cd43da69 bug: fix the return value of countRows (#1186) 2024-04-03 16:31:49 -07:00
Lei Xu
7b954c7e3e chore: bump lance version (#1185)
Bump lance version to `0.10.7`
2024-04-03 14:46:05 -07:00
Bert
2579f29a92 fix error decoding in nodejs client (#1184)
fixes: #1183
2024-04-03 10:24:51 -04:00
QianZhu
7562b0fad1 remote count_rows needs to return the number (#1181) 2024-04-02 13:12:22 -07:00
eduardjbotha
83b6b0d28a SQL Documentation includes DataFusion functions (#1179)
Show that it is possible to use the DataFusion functions in the `WHERE`
clause.

Co-authored-by: Eduard Botha <eduard.botha@inovex.de>
2024-04-02 07:49:48 -07:00
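A hedged illustration of what this change enables from the Node client (assuming the `vectordb` package and a table with a string `name` column; `lower` is one of the documented DataFusion scalar functions):
```ts
import * as lancedb from 'vectordb'

async function main (): Promise<void> {
  const db = await lancedb.connect('data/sample-lancedb')
  const table = await db.openTable('my_table')
  // Any documented DataFusion scalar function may now appear in the
  // filter string, alongside the operators already listed in the docs.
  const results = await table
    .search([0.1, 0.3])
    .filter("lower(name) = 'albert'")
    .execute()
  console.log(results.length)
}

main().catch(console.error)
```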
14 changed files with 92 additions and 315 deletions

View File

@@ -14,6 +14,10 @@ inputs:
# Note: this does *not* mean the host is arm64, since we might be cross-compiling.
required: false
default: "false"
manylinux:
description: "The manylinux version to build for"
required: false
default: "2_17"
runs:
using: "composite"
steps:
@@ -28,7 +32,7 @@ runs:
command: build
working-directory: python
target: x86_64-unknown-linux-gnu
manylinux: "2_17"
manylinux: ${{ inputs.manylinux }}
args: ${{ inputs.args }}
before-script-linux: |
set -e
@@ -43,7 +47,7 @@ runs:
command: build
working-directory: python
target: aarch64-unknown-linux-gnu
manylinux: "2_24"
manylinux: ${{ inputs.manylinux }}
args: ${{ inputs.args }}
before-script-linux: |
set -e

View File

@@ -6,13 +6,23 @@ on:
jobs:
linux:
name: Python ${{ matrix.config.platform }} manylinux${{ matrix.config.manylinux }}
timeout-minutes: 60
strategy:
matrix:
python-minor-version: ["8"]
platform:
- x86_64
- aarch64
config:
- platform: x86_64
manylinux: "2_17"
extra_args: ""
- platform: x86_64
manylinux: "2_28"
extra_args: "--features fp16kernels"
- platform: aarch64
manylinux: "2_24"
extra_args: ""
# We don't build fp16 kernels for aarch64, because it uses
# cross compilation image, which doesn't have a new enough compiler.
runs-on: "ubuntu-22.04"
steps:
- uses: actions/checkout@v4
@@ -26,8 +36,9 @@ jobs:
- uses: ./.github/workflows/build_linux_wheel
with:
python-minor-version: ${{ matrix.python-minor-version }}
args: "--release --strip"
arm-build: ${{ matrix.platform == 'aarch64' }}
args: "--release --strip ${{ matrix.config.extra_args }}"
arm-build: ${{ matrix.config.platform == 'aarch64' }}
manylinux: ${{ matrix.config.manylinux }}
- uses: ./.github/workflows/upload_wheel
with:
token: ${{ secrets.LANCEDB_PYPI_API_TOKEN }}
@@ -58,7 +69,7 @@ jobs:
- uses: ./.github/workflows/build_mac_wheel
with:
python-minor-version: ${{ matrix.python-minor-version }}
args: "--release --strip --target ${{ matrix.config.target }}"
args: "--release --strip --target ${{ matrix.config.target }} --features fp16kernels"
- uses: ./.github/workflows/upload_wheel
with:
python-minor-version: ${{ matrix.python-minor-version }}

View File

@@ -31,6 +31,10 @@ jobs:
run:
shell: bash
working-directory: rust
env:
# Need up-to-date compilers for kernels
CC: gcc-12
CXX: g++-12
steps:
- uses: actions/checkout@v4
with:
@@ -54,6 +58,10 @@ jobs:
run:
shell: bash
working-directory: rust
env:
# Need up-to-date compilers for kernels
CC: gcc-12
CXX: g++-12
steps:
- uses: actions/checkout@v4
with:

View File

@@ -14,10 +14,10 @@ keywords = ["lancedb", "lance", "database", "vector", "search"]
categories = ["database-implementations"]
[workspace.dependencies]
lance = { "version" = "=0.10.6", "features" = ["dynamodb"] }
lance-index = { "version" = "=0.10.6" }
lance-linalg = { "version" = "=0.10.6" }
lance-testing = { "version" = "=0.10.6" }
lance = { "version" = "=0.10.8", "features" = ["dynamodb"] }
lance-index = { "version" = "=0.10.8" }
lance-linalg = { "version" = "=0.10.8" }
lance-testing = { "version" = "=0.10.8" }
# Note that this one does not include pyarrow
arrow = { version = "50.0", optional = false }
arrow-array = "50.0"

View File

@@ -66,6 +66,7 @@ Currently, Lance supports a growing list of SQL expressions.
- `LIKE`, `NOT LIKE`
- `CAST`
- `regexp_match(column, pattern)`
- [DataFusion Functions](https://arrow.apache.org/datafusion/user-guide/sql/scalar_functions.html)
For example, the following filter string is acceptable:

View File

@@ -38,7 +38,7 @@ export class Query<T = number[]> {
constructor (query?: T, tbl?: any, embeddings?: EmbeddingFunction<T>) {
this._tbl = tbl
this._query = query
this._limit = undefined
this._limit = 10
this._nprobes = 20
this._refineFactor = undefined
this._select = undefined
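The practical effect, sketched below against the same `vectordb` API: a query built without an explicit `.limit()` now requests at most 10 rows, matching the Python SDK default, rather than sending an undefined limit to the server.
```ts
import * as lancedb from 'vectordb'

const db = await lancedb.connect('data/sample-lancedb')
const table = await db.openTable('my_table')

// With no explicit .limit(), _limit now defaults to 10 (as in the
// Python SDK) instead of remaining undefined.
const topTen = await table.search([0.1, 0.3]).execute()

// An explicit .limit() still overrides the default.
const topTwentyFive = await table.search([0.1, 0.3]).limit(25).execute()
```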

View File

@@ -103,6 +103,18 @@ function toLanceRes (res: AxiosResponse): RemoteResponse {
}
}
async function decodeErrorData(
res: RemoteResponse,
responseType?: ResponseType
): Promise<string> {
const errorData = await res.body()
if (responseType === 'arraybuffer') {
return new TextDecoder().decode(errorData)
} else {
return errorData
}
}
export class HttpLancedbClient {
private readonly _url: string
private readonly _apiKey: () => string
@@ -180,7 +192,7 @@ export class HttpLancedbClient {
}
if (response.status !== 200) {
const errorData = new TextDecoder().decode(await response.body())
const errorData = await decodeErrorData(response)
throw new Error(
`Server Error, status: ${response.status}, ` +
`message: ${response.statusText}: ${errorData}`
@@ -226,7 +238,7 @@ export class HttpLancedbClient {
}
if (response.status !== 200) {
const errorData = new TextDecoder().decode(await response.body())
const errorData = await decodeErrorData(response, responseType)
throw new Error(
`Server Error, status: ${response.status}, ` +
`message: ${response.statusText}: ${errorData}`
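The reasoning behind the helper: requests issued with `responseType: 'arraybuffer'` (e.g. Arrow IPC query results) hand back the error body as raw bytes, which need `TextDecoder`, while ordinary requests hand it back already deserialized, where a second decode pass fails. A minimal sketch of the distinction, not the library code itself:
```ts
// Raw bytes, as returned when responseType is 'arraybuffer':
const rawBody = new TextEncoder().encode('table not found')
console.log(new TextDecoder().decode(rawBody)) // 'table not found'

// A body axios has already deserialized must not be decoded again;
// a string is not a BufferSource, so this would throw a TypeError:
// new TextDecoder().decode('table not found')
```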

View File

@@ -156,7 +156,7 @@ export class RemoteConnection implements Connection {
}
const res = await this._client.post(
`/v1/table/${tableName}/create/`,
`/v1/table/${encodeURIComponent(tableName)}/create/`,
buffer,
undefined,
'application/vnd.apache.arrow.stream'
@@ -177,7 +177,7 @@ export class RemoteConnection implements Connection {
}
async dropTable (name: string): Promise<void> {
await this._client.post(`/v1/table/${name}/drop/`)
await this._client.post(`/v1/table/${encodeURIComponent(name)}/drop/`)
}
withMiddleware (middleware: HttpMiddleware): Connection {
@@ -268,7 +268,7 @@ export class RemoteTable<T = number[]> implements Table<T> {
get schema (): Promise<any> {
return this._client
.post(`/v1/table/${this._name}/describe/`)
.post(`/v1/table/${encodeURIComponent(this._name)}/describe/`)
.then(async (res) => {
if (res.status !== 200) {
throw new Error(
@@ -282,7 +282,7 @@ export class RemoteTable<T = number[]> implements Table<T> {
}
search (query: T): Query<T> {
return new RemoteQuery(query, this._client, this._name) //, this._embeddings_new)
return new RemoteQuery(query, this._client, encodeURIComponent(this._name)) //, this._embeddings_new)
}
filter (where: string): Query<T> {
@@ -324,7 +324,7 @@ export class RemoteTable<T = number[]> implements Table<T> {
const buffer = await fromTableToStreamBuffer(tbl, this._embeddings)
const res = await this._client.post(
`/v1/table/${this._name}/merge_insert/`,
`/v1/table/${encodeURIComponent(this._name)}/merge_insert/`,
buffer,
queryParams,
'application/vnd.apache.arrow.stream'
@@ -348,7 +348,7 @@ export class RemoteTable<T = number[]> implements Table<T> {
const buffer = await fromTableToStreamBuffer(tbl, this._embeddings)
const res = await this._client.post(
`/v1/table/${this._name}/insert/`,
`/v1/table/${encodeURIComponent(this._name)}/insert/`,
buffer,
{
mode: 'append'
@@ -374,7 +374,7 @@ export class RemoteTable<T = number[]> implements Table<T> {
}
const buffer = await fromTableToStreamBuffer(tbl, this._embeddings)
const res = await this._client.post(
`/v1/table/${this._name}/insert/`,
`/v1/table/${encodeURIComponent(this._name)}/insert/`,
buffer,
{
mode: 'overwrite'
@@ -421,7 +421,7 @@ export class RemoteTable<T = number[]> implements Table<T> {
index_cache_size: indexCacheSize
}
const res = await this._client.post(
`/v1/table/${this._name}/create_index/`,
`/v1/table/${encodeURIComponent(this._name)}/create_index/`,
data
)
if (res.status !== 200) {
@@ -442,7 +442,7 @@ export class RemoteTable<T = number[]> implements Table<T> {
replace: true
}
const res = await this._client.post(
`/v1/table/${this._name}/create_scalar_index/`,
`/v1/table/${encodeURIComponent(this._name)}/create_scalar_index/`,
data
)
if (res.status !== 200) {
@@ -455,14 +455,14 @@ export class RemoteTable<T = number[]> implements Table<T> {
}
async countRows (filter?: string): Promise<number> {
const result = await this._client.post(`/v1/table/${this._name}/count_rows/`, {
const result = await this._client.post(`/v1/table/${encodeURIComponent(this._name)}/count_rows/`, {
predicate: filter
})
return (await result.body())?.stats?.num_rows
return (await result.body())
}
async delete (filter: string): Promise<void> {
await this._client.post(`/v1/table/${this._name}/delete/`, {
await this._client.post(`/v1/table/${encodeURIComponent(this._name)}/delete/`, {
predicate: filter
})
}
@@ -481,7 +481,7 @@ export class RemoteTable<T = number[]> implements Table<T> {
updates[key] = toSQL(value)
}
}
await this._client.post(`/v1/table/${this._name}/update/`, {
await this._client.post(`/v1/table/${encodeURIComponent(this._name)}/update/`, {
predicate: filter,
updates: Object.entries(updates).map(([key, value]) => [key, value])
})
@@ -489,7 +489,7 @@ export class RemoteTable<T = number[]> implements Table<T> {
async listIndices (): Promise<VectorIndex[]> {
const results = await this._client.post(
`/v1/table/${this._name}/index/list/`
`/v1/table/${encodeURIComponent(this._name)}/index/list/`
)
return (await results.body()).indexes?.map((index: any) => ({
columns: index.columns,
@@ -500,7 +500,7 @@ export class RemoteTable<T = number[]> implements Table<T> {
async indexStats (indexUuid: string): Promise<IndexStats> {
const results = await this._client.post(
`/v1/table/${this._name}/index/${indexUuid}/stats/`
`/v1/table/${encodeURIComponent(this._name)}/index/${indexUuid}/stats/`
)
const body = await results.body()
return {
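Together with the `count_rows` fixes above (#1181, #1186), the remote client now resolves to the bare number the server returns instead of reaching for a `stats.num_rows` field the response does not carry. Illustrative usage, assuming `table` is a `RemoteTable` obtained from a LanceDB Cloud connection:
```ts
// countRows now resolves to the number itself; previously the remote
// client returned body?.stats?.num_rows, which was undefined here.
const total = await table.countRows()
const filtered = await table.countRows('id > 100')
console.log(total, filtered)
```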

View File

@@ -31,3 +31,6 @@ pyo3-build-config = { version = "0.20.3", features = [
"extension-module",
"abi3-py38",
] }
[features]
fp16kernels = ["lancedb/fp16kernels"]

View File

@@ -3,7 +3,7 @@ name = "lancedb"
version = "0.6.6"
dependencies = [
"deprecation",
"pylance==0.10.6",
"pylance==0.10.8",
"ratelimiter~=1.0",
"retry>=0.9.2",
"tqdm>=4.27.0",
@@ -41,6 +41,7 @@ classifiers = [
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Topic :: Scientific/Engineering",
]

View File

@@ -500,7 +500,10 @@ class RemoteTable(Table):
def count_rows(self, filter: Optional[str] = None) -> int:
payload = {"predicate": filter}
self._conn._client.post(f"/v1/table/{self._name}/count_rows/", data=payload)
resp = self._conn._client.post(
f"/v1/table/{self._name}/count_rows/", data=payload
)
return resp
def add_columns(self, transforms: Dict[str, str]):
raise NotImplementedError(

View File

@@ -50,3 +50,4 @@ walkdir = "2"
[features]
default = ["remote"]
remote = ["dep:reqwest"]
fp16kernels = ["lance-linalg/fp16kernels"]

View File

@@ -39,29 +39,11 @@ use tokio::{
struct MirroringObjectStore {
primary: Arc<dyn ObjectStore>,
secondary: Arc<dyn ObjectStore>,
secondary_copy_behavior: MirroringSecondaryCopy,
}
impl MirroringObjectStore {
async fn secondary_copy(&self, from: &Path, to: &Path) -> Result<()> {
let secondary_cp_result = self.secondary.copy(from, to).await;
match (&self.secondary_copy_behavior, secondary_cp_result) {
(_, Ok(_)) => Ok(()),
(
MirroringSecondaryCopy::SkipIfNotFound,
Err(object_store::Error::NotFound { path: _, source: _ }),
) => Ok(()),
(_, Err(e)) => Err(e),
}
}
}
impl std::fmt::Display for MirroringObjectStore {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "MirroringObjectStore(secondary_copy_behavior=")?;
self.secondary_copy_behavior.fmt(f)?;
writeln!(f, ")")?;
writeln!(f, "MirrowingObjectStore")?;
writeln!(f, "primary:")?;
self.primary.fmt(f)?;
writeln!(f, "secondary:")?;
@@ -80,40 +62,12 @@ impl PrimaryOnly for Path {
}
}
/// Controls the behavior of copying objects in the secondary store.
#[derive(Debug, Clone)]
pub enum MirroringSecondaryCopy {
// Default behaviour is to copy
Copy,
// Since the secondary store may not be as durable as the primary, the copy source
// may exist on the primary but not on the secondary. If the source is not found,
// this skips making the copy
SkipIfNotFound,
}
impl Default for MirroringSecondaryCopy {
fn default() -> Self {
Self::Copy
}
}
impl std::fmt::Display for MirroringSecondaryCopy {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Self::Copy => write!(f, "Copy"),
Self::SkipIfNotFound => write!(f, "SkipIfNotFound"),
}?;
Ok(())
}
}
/// An object store that mirrors write to secondary object store first
/// An object store that mirrors write to secondsry object store first
/// and than commit to primary object store.
///
/// This is meant to mirror writes to a less-durable but lower-latency
/// This is meant to mirrow writes to a less-durable but lower-latency
/// store. We have primary store that is durable but slow, and a secondary
/// store that is fast but not as durable
/// store that is fast but not asdurable
///
/// Note: this object store does not mirror writes to *.manifest files
#[async_trait]
@@ -202,7 +156,7 @@ impl ObjectStore for MirroringObjectStore {
if to.primary_only() {
self.primary.copy(from, to).await
} else {
self.secondary_copy(from, to).await?;
self.secondary.copy(from, to).await?;
self.primary.copy(from, to).await?;
Ok(())
}
@@ -210,7 +164,7 @@ impl ObjectStore for MirroringObjectStore {
async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
if !to.primary_only() {
self.secondary_copy(from, to).await?;
self.secondary.copy(from, to).await?;
}
self.primary.copy_if_not_exists(from, to).await
}
@@ -360,152 +314,33 @@ impl AsyncWrite for MirroringUpload {
#[derive(Debug)]
pub struct MirroringObjectStoreWrapper {
secondary: Arc<dyn ObjectStore>,
secondary_copy_behavior: MirroringSecondaryCopy,
secondary_wrapper: Option<Arc<dyn WrappingObjectStore>>,
}
impl MirroringObjectStoreWrapper {
pub fn new(secondary: Arc<dyn ObjectStore>) -> Self {
Self {
secondary,
secondary_copy_behavior: MirroringSecondaryCopy::default(),
secondary_wrapper: None,
}
}
pub fn with_secondary_copy_behavior(
mut self,
secondary_copy_behavior: MirroringSecondaryCopy,
) -> Self {
self.secondary_copy_behavior = secondary_copy_behavior;
self
}
pub fn with_secondary_wrapper(mut self, wrapper: Arc<dyn WrappingObjectStore>) -> Self {
self.secondary_wrapper = Some(wrapper);
self
Self { secondary }
}
}
impl WrappingObjectStore for MirroringObjectStoreWrapper {
fn wrap(&self, primary: Arc<dyn ObjectStore>) -> Arc<dyn ObjectStore> {
let mut secondary = self.secondary.clone();
if let Some(wrapper) = &self.secondary_wrapper {
secondary = wrapper.wrap(secondary);
}
Arc::new(MirroringObjectStore {
primary,
secondary,
secondary_copy_behavior: self.secondary_copy_behavior.clone(),
secondary: self.secondary.clone(),
})
}
}
/// An object store that will check if the source of the copy exists before attempting
/// to copy the object.
///
/// The primary use case is to workaround a bug in version 0.9 of object_store where
/// copying from a non-existent source causes the thread to hang forever
/// https://github.com/apache/arrow-rs/issues/5503
#[derive(Debug)]
struct CheckedCopyObjectStore {
inner: Arc<dyn ObjectStore>,
}
impl std::fmt::Display for CheckedCopyObjectStore {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
writeln!(f, "CheckedCopyObjectStore")?;
writeln!(f, "inner:")?;
self.inner.fmt(f)?;
Ok(())
}
}
#[async_trait]
impl ObjectStore for CheckedCopyObjectStore {
async fn put_opts(&self, location: &Path, bytes: Bytes, opts: PutOptions) -> Result<PutResult> {
self.inner.put_opts(location, bytes, opts).await
}
async fn put_multipart(
&self,
location: &Path,
) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
self.inner.put_multipart(location).await
}
async fn abort_multipart(&self, location: &Path, multipart_id: &MultipartId) -> Result<()> {
self.inner.abort_multipart(location, multipart_id).await
}
async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
self.inner.get_opts(location, options).await
}
async fn delete(&self, location: &Path) -> Result<()> {
self.inner.delete(location).await
}
fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
self.inner.list(prefix)
}
async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
self.inner.list_with_delimiter(prefix).await
}
async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
// check that the from object exists
self.inner.head(from).await?;
self.inner.copy(from, to).await
}
async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
// check that the from object exists
self.inner.head(from).await?;
self.inner.copy_if_not_exists(from, to).await
}
}
#[derive(Debug)]
pub struct CheckedCopyObjectStoreWrapper {}
impl CheckedCopyObjectStoreWrapper {
pub fn new() -> Self {
Self {}
}
}
impl Default for CheckedCopyObjectStoreWrapper {
fn default() -> Self {
Self::new()
}
}
impl WrappingObjectStore for CheckedCopyObjectStoreWrapper {
fn wrap(&self, inner: Arc<dyn ObjectStore>) -> Arc<dyn ObjectStore> {
Arc::new(CheckedCopyObjectStore { inner })
}
}
// windows pathing can't be simply concatenated
#[cfg(all(test, not(windows)))]
mod test {
use super::*;
use futures::TryStreamExt;
use lance::{
dataset::WriteParams,
io::{ObjectStore, ObjectStoreParams},
};
use lance::{dataset::WriteParams, io::ObjectStoreParams};
use lance_testing::datagen::{BatchGenerator, IncrementingInt32, RandomVector};
use object_store::local::LocalFileSystem;
use object_store::ObjectStore as _;
use std::fs::File;
use std::io::Write;
use std::time::Duration;
use tempfile;
use url::Url;
use crate::{
connect,
@@ -519,8 +354,9 @@ mod test {
let dir2 = tempfile::tempdir().unwrap().into_path();
let secondary_store = LocalFileSystem::new_with_prefix(dir2.to_str().unwrap()).unwrap();
let object_store_wrapper =
Arc::new(MirroringObjectStoreWrapper::new(Arc::new(secondary_store)));
let object_store_wrapper = Arc::new(MirroringObjectStoreWrapper {
secondary: Arc::new(secondary_store),
});
let db = connect(dir1.to_str().unwrap()).execute().await.unwrap();
@@ -557,9 +393,9 @@ mod test {
.await
.unwrap();
let batches = q.try_collect::<Vec<_>>().await.unwrap();
assert_eq!(batches.len(), 1);
assert_eq!(batches[0].num_rows(), 10);
let bateches = q.try_collect::<Vec<_>>().await.unwrap();
assert_eq!(bateches.len(), 1);
assert_eq!(bateches[0].num_rows(), 10);
use walkdir::WalkDir;
@@ -594,100 +430,4 @@ mod test {
secondary_elem = secondary_iter.next();
}
}
#[tokio::test]
async fn test_secondary_copy_skip_if_not_found() {
let dir1 = tempfile::tempdir().unwrap().into_path();
let dir2 = tempfile::tempdir().unwrap().into_path();
// create a file that only exists in partition 1
let file_path = format!("{}/hello.txt", dir1.to_str().unwrap());
let mut file = File::create(file_path).unwrap();
file.write_all(b"hello").unwrap();
// check we can copy a file that exists on the primary while skipping the secondary
let secondary_store =
Arc::new(LocalFileSystem::new_with_prefix(dir2.to_str().unwrap()).unwrap());
let mirroring_wrapper = MirroringObjectStoreWrapper::new(secondary_store.clone())
.with_secondary_copy_behavior(MirroringSecondaryCopy::SkipIfNotFound)
.with_secondary_wrapper(Arc::new(CheckedCopyObjectStoreWrapper::new()));
let primary_store = LocalFileSystem::new_with_prefix(dir1.to_str().unwrap()).unwrap();
let store = ObjectStore::new(
Arc::new(primary_store) as _,
Url::from_directory_path(dir1.clone()).unwrap(),
None,
Some(Arc::new(mirroring_wrapper)),
);
let result = store
.copy(&Path::from("hello.txt"), &Path::from("hello2.txt"))
.await;
assert!(result.is_ok());
assert!(store.exists(&Path::from("hello2.txt")).await.unwrap());
// check that we will return an error if using MirroedSecondarryCopy::Copy and also that the primary copy does not succeed
let mirroring_wrapper = MirroringObjectStoreWrapper::new(secondary_store.clone())
.with_secondary_copy_behavior(MirroringSecondaryCopy::Copy)
.with_secondary_wrapper(Arc::new(CheckedCopyObjectStoreWrapper::new()));
let primary_store = LocalFileSystem::new_with_prefix(dir1.to_str().unwrap()).unwrap();
let store = ObjectStore::new(
Arc::new(primary_store) as _,
Url::from_directory_path(dir1).unwrap(),
None,
Some(Arc::new(mirroring_wrapper)),
);
let result = store
.copy(&Path::from("hello.txt"), &Path::from("hello3.txt"))
.await;
assert!(result.is_err());
assert!(!store.exists(&Path::from("hello3.txt")).await.unwrap());
// check that if the file exists in the secondary store, we can successfully copy it
let file_path = format!("{}/hello.txt", dir2.to_str().unwrap());
let mut file = File::create(file_path).unwrap();
file.write_all(b"hello").unwrap();
let result = store
.copy(&Path::from("hello.txt"), &Path::from("hello3.txt"))
.await;
assert!(result.is_ok());
assert!(store.exists(&Path::from("hello3.txt")).await.unwrap());
assert!(secondary_store
.as_ref()
.head(&Path::from("hello3.txt"))
.await
.is_ok());
}
#[tokio::test]
async fn test_copy_loop_avoidance() {
let dir1 = tempfile::tempdir().unwrap().into_path();
let object_store_wrapper = CheckedCopyObjectStoreWrapper::new();
let store_params = ObjectStoreParams {
object_store_wrapper: Some(Arc::new(object_store_wrapper)),
..Default::default()
};
let (store, _) = ObjectStore::from_uri_and_params(dir1.to_str().unwrap(), &store_params)
.await
.unwrap();
// wrap in timeout to ensure we don't go into the infinite loop
// https://github.com/apache/arrow-rs/issues/5503
tokio::time::timeout(Duration::from_secs(10), async move {
let result = store
.copy(&Path::from("hello1.txt"), &Path::from("hello2.txt"))
.await;
if result.is_ok() {
return Err("copy should have errored".to_string());
}
Ok(())
})
.await
.unwrap()
.unwrap();
}
}

View File

@@ -1304,14 +1304,7 @@ impl TableInternal for NativeTable {
}
async fn count_rows(&self, filter: Option<String>) -> Result<usize> {
let dataset = self.dataset.get().await?;
if let Some(filter) = filter {
let mut scanner = dataset.scan();
scanner.filter(&filter)?;
Ok(scanner.count_rows().await? as usize)
} else {
Ok(dataset.count_rows().await?)
}
Ok(self.dataset.get().await?.count_rows(filter).await?)
}
async fn add(