Compare commits

...

13 Commits

Author SHA1 Message Date
Lance Release
51437bc228 Bump version: 0.21.0-beta.0 → 0.21.0-beta.1 2025-03-06 19:23:06 +00:00
Bert
fa53cfcfd2 feat: support modifying field metadata in lancedb python (#2178) 2025-03-04 16:58:46 -05:00
vinoyang
374fe0ad95 feat(rust): introduce Catalog trait and implement ListingCatalog (#2148)
Co-authored-by: Weston Pace <weston.pace@gmail.com>
2025-03-03 20:22:24 -08:00
BubbleCal
35e5b84ba9 chore: upgrade lance to 0.24.0-beta.1 (#2171)
Signed-off-by: BubbleCal <bubble-cal@outlook.com>
2025-03-03 12:32:12 +08:00
Lei Xu
7c12d497b0 ci: bump python to 3.12 in GHA (#2169) 2025-03-01 17:24:02 -08:00
ayao227
dfe4ba8dad chore: add reo integration (#2149)
This PR adds reo integration to the lancedb documentation website.
2025-02-28 07:51:34 -08:00
Weston Pace
fa1b9ad5bd fix: don't use with_schema to remove schema metadata (#2162)
It seems that `RecordBatch::with_schema` is unable to remove schema
metadata from a batch. It fails with the error `target schema is not
superset of current schema`.

I'm not sure how the `test_metadata_erased` test is passing. Strangely,
the metadata was not present by the time the batch arrived at the
metadata eraser. I think maybe the schema metadata is only present in
the batch if there is a filter.

I've created a new unit test that makes sure the metadata is erased when a
filter is present as well.
2025-02-27 10:24:00 -08:00
BubbleCal
8877eb020d feat: record the server version for remote table (#2147)
Signed-off-by: BubbleCal <bubble-cal@outlook.com>
2025-02-27 15:55:59 +08:00
Will Jones
01e4291d21 feat(python): drop hard dependency on pylance (#2156)
Closes #1793
2025-02-26 15:53:45 -08:00
Lance Release
ab3ea76ad1 Updating package-lock.json 2025-02-26 21:23:39 +00:00
Lance Release
728ef8657d Updating package-lock.json 2025-02-26 20:11:37 +00:00
Lance Release
0b13901a16 Updating package-lock.json 2025-02-26 20:11:22 +00:00
Lance Release
84b110e0ef Bump version: 0.17.0 → 0.18.0-beta.0 2025-02-26 20:11:07 +00:00
41 changed files with 1254 additions and 206 deletions

View File

@@ -1,5 +1,5 @@
[tool.bumpversion]
current_version = "0.17.0"
current_version = "0.18.0-beta.0"
parse = """(?x)
(?P<major>0|[1-9]\\d*)\\.
(?P<minor>0|[1-9]\\d*)\\.

View File

@@ -41,7 +41,7 @@ jobs:
doctest:
name: "Doctest"
timeout-minutes: 30
runs-on: "ubuntu-22.04"
runs-on: "ubuntu-24.04"
defaults:
run:
shell: bash
@@ -54,7 +54,7 @@ jobs:
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: "3.11"
python-version: "3.12"
cache: "pip"
- name: Install protobuf
run: |
@@ -75,8 +75,8 @@ jobs:
timeout-minutes: 30
strategy:
matrix:
python-minor-version: ["9", "11"]
runs-on: "ubuntu-22.04"
python-minor-version: ["9", "12"]
runs-on: "ubuntu-24.04"
defaults:
run:
shell: bash
@@ -127,7 +127,7 @@ jobs:
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: "3.11"
python-version: "3.12"
- uses: Swatinem/rust-cache@v2
with:
workspaces: python
@@ -157,7 +157,7 @@ jobs:
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: "3.11"
python-version: "3.12"
- uses: Swatinem/rust-cache@v2
with:
workspaces: python
@@ -168,7 +168,7 @@ jobs:
run: rm -rf target/wheels
pydantic1x:
timeout-minutes: 30
runs-on: "ubuntu-22.04"
runs-on: "ubuntu-24.04"
defaults:
run:
shell: bash

View File

@@ -184,15 +184,17 @@ jobs:
steps:
- name: Checkout
uses: actions/checkout@v4
- name: Install dependencies
- name: Install dependencies (part 1)
run: |
set -e
apk add protobuf-dev curl clang lld llvm19 grep npm bash msitools sed
curl --proto '=https' --tlsv1.3 -sSf https://raw.githubusercontent.com/rust-lang/rustup/refs/heads/master/rustup-init.sh | sh -s -- -y
source $HOME/.cargo/env
rustup target add aarch64-pc-windows-msvc
- name: Install rust
uses: actions-rust-lang/setup-rust-toolchain@v1
with:
target: aarch64-pc-windows-msvc
- name: Install dependencies (part 2)
run: |
set -e
mkdir -p sysroot
cd sysroot
sh ../ci/sysroot-aarch64-pc-windows-msvc.sh
@@ -264,7 +266,7 @@ jobs:
- name: Install Rust
run: |
Invoke-WebRequest https://win.rustup.rs/x86_64 -OutFile rustup-init.exe
.\rustup-init.exe -y --default-host aarch64-pc-windows-msvc
.\rustup-init.exe -y --default-host aarch64-pc-windows-msvc --default-toolchain 1.83.0
shell: powershell
- name: Add Rust to PATH
run: |

Cargo.lock (generated, 94 lines changed)
View File

@@ -2570,8 +2570,8 @@ dependencies = [
[[package]]
name = "fsst"
version = "0.23.3"
source = "git+https://github.com/lancedb/lance.git?tag=v0.23.3-beta.1#f69480e34dc1be6f1689cc8c36d43f6d8866c678"
version = "0.24.0"
source = "git+https://github.com/lancedb/lance.git?tag=v0.24.0-beta.1#33ae43b2944c12e0dbd139e8aa098cffa74edef5"
dependencies = [
"rand",
]
@@ -3532,8 +3532,8 @@ dependencies = [
[[package]]
name = "lance"
version = "0.23.3"
source = "git+https://github.com/lancedb/lance.git?tag=v0.23.3-beta.1#f69480e34dc1be6f1689cc8c36d43f6d8866c678"
version = "0.24.0"
source = "git+https://github.com/lancedb/lance.git?tag=v0.24.0-beta.1#33ae43b2944c12e0dbd139e8aa098cffa74edef5"
dependencies = [
"arrow",
"arrow-arith",
@@ -3592,8 +3592,8 @@ dependencies = [
[[package]]
name = "lance-arrow"
version = "0.23.3"
source = "git+https://github.com/lancedb/lance.git?tag=v0.23.3-beta.1#f69480e34dc1be6f1689cc8c36d43f6d8866c678"
version = "0.24.0"
source = "git+https://github.com/lancedb/lance.git?tag=v0.24.0-beta.1#33ae43b2944c12e0dbd139e8aa098cffa74edef5"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -3610,8 +3610,8 @@ dependencies = [
[[package]]
name = "lance-core"
version = "0.23.3"
source = "git+https://github.com/lancedb/lance.git?tag=v0.23.3-beta.1#f69480e34dc1be6f1689cc8c36d43f6d8866c678"
version = "0.24.0"
source = "git+https://github.com/lancedb/lance.git?tag=v0.24.0-beta.1#33ae43b2944c12e0dbd139e8aa098cffa74edef5"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -3647,8 +3647,8 @@ dependencies = [
[[package]]
name = "lance-datafusion"
version = "0.23.3"
source = "git+https://github.com/lancedb/lance.git?tag=v0.23.3-beta.1#f69480e34dc1be6f1689cc8c36d43f6d8866c678"
version = "0.24.0"
source = "git+https://github.com/lancedb/lance.git?tag=v0.24.0-beta.1#33ae43b2944c12e0dbd139e8aa098cffa74edef5"
dependencies = [
"arrow",
"arrow-array",
@@ -3673,8 +3673,8 @@ dependencies = [
[[package]]
name = "lance-encoding"
version = "0.23.3"
source = "git+https://github.com/lancedb/lance.git?tag=v0.23.3-beta.1#f69480e34dc1be6f1689cc8c36d43f6d8866c678"
version = "0.24.0"
source = "git+https://github.com/lancedb/lance.git?tag=v0.24.0-beta.1#33ae43b2944c12e0dbd139e8aa098cffa74edef5"
dependencies = [
"arrayref",
"arrow",
@@ -3712,8 +3712,8 @@ dependencies = [
[[package]]
name = "lance-file"
version = "0.23.3"
source = "git+https://github.com/lancedb/lance.git?tag=v0.23.3-beta.1#f69480e34dc1be6f1689cc8c36d43f6d8866c678"
version = "0.24.0"
source = "git+https://github.com/lancedb/lance.git?tag=v0.24.0-beta.1#33ae43b2944c12e0dbd139e8aa098cffa74edef5"
dependencies = [
"arrow-arith",
"arrow-array",
@@ -3747,8 +3747,8 @@ dependencies = [
[[package]]
name = "lance-index"
version = "0.23.3"
source = "git+https://github.com/lancedb/lance.git?tag=v0.23.3-beta.1#f69480e34dc1be6f1689cc8c36d43f6d8866c678"
version = "0.24.0"
source = "git+https://github.com/lancedb/lance.git?tag=v0.24.0-beta.1#33ae43b2944c12e0dbd139e8aa098cffa74edef5"
dependencies = [
"arrow",
"arrow-array",
@@ -3800,8 +3800,8 @@ dependencies = [
[[package]]
name = "lance-io"
version = "0.23.3"
source = "git+https://github.com/lancedb/lance.git?tag=v0.23.3-beta.1#f69480e34dc1be6f1689cc8c36d43f6d8866c678"
version = "0.24.0"
source = "git+https://github.com/lancedb/lance.git?tag=v0.24.0-beta.1#33ae43b2944c12e0dbd139e8aa098cffa74edef5"
dependencies = [
"arrow",
"arrow-arith",
@@ -3839,8 +3839,8 @@ dependencies = [
[[package]]
name = "lance-linalg"
version = "0.23.3"
source = "git+https://github.com/lancedb/lance.git?tag=v0.23.3-beta.1#f69480e34dc1be6f1689cc8c36d43f6d8866c678"
version = "0.24.0"
source = "git+https://github.com/lancedb/lance.git?tag=v0.24.0-beta.1#33ae43b2944c12e0dbd139e8aa098cffa74edef5"
dependencies = [
"arrow-array",
"arrow-ord",
@@ -3863,8 +3863,8 @@ dependencies = [
[[package]]
name = "lance-table"
version = "0.23.3"
source = "git+https://github.com/lancedb/lance.git?tag=v0.23.3-beta.1#f69480e34dc1be6f1689cc8c36d43f6d8866c678"
version = "0.24.0"
source = "git+https://github.com/lancedb/lance.git?tag=v0.24.0-beta.1#33ae43b2944c12e0dbd139e8aa098cffa74edef5"
dependencies = [
"arrow",
"arrow-array",
@@ -3903,8 +3903,8 @@ dependencies = [
[[package]]
name = "lance-testing"
version = "0.23.3"
source = "git+https://github.com/lancedb/lance.git?tag=v0.23.3-beta.1#f69480e34dc1be6f1689cc8c36d43f6d8866c678"
version = "0.24.0"
source = "git+https://github.com/lancedb/lance.git?tag=v0.24.0-beta.1#33ae43b2944c12e0dbd139e8aa098cffa74edef5"
dependencies = [
"arrow-array",
"arrow-schema",
@@ -3915,7 +3915,7 @@ dependencies = [
[[package]]
name = "lancedb"
version = "0.17.0"
version = "0.18.0-beta.0"
dependencies = [
"arrow",
"arrow-array",
@@ -3969,6 +3969,8 @@ dependencies = [
"random_word",
"regex",
"reqwest",
"rstest",
"semver 1.0.25",
"serde",
"serde_json",
"serde_with",
@@ -3999,7 +4001,7 @@ dependencies = [
[[package]]
name = "lancedb-node"
version = "0.17.0"
version = "0.18.0-beta.0"
dependencies = [
"arrow-array",
"arrow-ipc",
@@ -4024,7 +4026,7 @@ dependencies = [
[[package]]
name = "lancedb-nodejs"
version = "0.17.0"
version = "0.18.0-beta.0"
dependencies = [
"arrow-array",
"arrow-ipc",
@@ -4042,7 +4044,7 @@ dependencies = [
[[package]]
name = "lancedb-python"
version = "0.20.0"
version = "0.21.0-beta.0"
dependencies = [
"arrow",
"env_logger",
@@ -5938,6 +5940,12 @@ version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c"
[[package]]
name = "relative-path"
version = "1.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba39f3699c378cd8970968dcbff9c43159ea4cfbd88d43c00b22f2ef10a435d2"
[[package]]
name = "reqwest"
version = "0.12.12"
@@ -6043,6 +6051,36 @@ dependencies = [
"byteorder",
]
[[package]]
name = "rstest"
version = "0.23.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0a2c585be59b6b5dd66a9d2084aa1d8bd52fbdb806eafdeffb52791147862035"
dependencies = [
"futures",
"futures-timer",
"rstest_macros",
"rustc_version",
]
[[package]]
name = "rstest_macros"
version = "0.23.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "825ea780781b15345a146be27eaefb05085e337e869bff01b4306a4fd4a9ad5a"
dependencies = [
"cfg-if",
"glob",
"proc-macro-crate",
"proc-macro2",
"quote",
"regex",
"relative-path",
"rustc_version",
"syn 2.0.98",
"unicode-ident",
]
[[package]]
name = "rust-stemmers"
version = "1.2.0"

View File

@@ -21,16 +21,16 @@ categories = ["database-implementations"]
rust-version = "1.78.0"
[workspace.dependencies]
lance = { "version" = "=0.23.3", "features" = [
lance = { "version" = "=0.24.0", "features" = [
"dynamodb",
], git = "https://github.com/lancedb/lance.git", tag = "v0.23.3-beta.1" }
lance-io = { version = "=0.23.3", tag = "v0.23.3-beta.1", git = "https://github.com/lancedb/lance.git" }
lance-index = { version = "=0.23.3", tag = "v0.23.3-beta.1", git = "https://github.com/lancedb/lance.git" }
lance-linalg = { version = "=0.23.3", tag = "v0.23.3-beta.1", git = "https://github.com/lancedb/lance.git" }
lance-table = { version = "=0.23.3", tag = "v0.23.3-beta.1", git = "https://github.com/lancedb/lance.git" }
lance-testing = { version = "=0.23.3", tag = "v0.23.3-beta.1", git = "https://github.com/lancedb/lance.git" }
lance-datafusion = { version = "=0.23.3", tag = "v0.23.3-beta.1", git = "https://github.com/lancedb/lance.git" }
lance-encoding = { version = "=0.23.3", tag = "v0.23.3-beta.1", git = "https://github.com/lancedb/lance.git" }
], git = "https://github.com/lancedb/lance.git", tag = "v0.24.0-beta.1" }
lance-io = { version = "=0.24.0", tag = "v0.24.0-beta.1", git = "https://github.com/lancedb/lance.git" }
lance-index = { version = "=0.24.0", tag = "v0.24.0-beta.1", git = "https://github.com/lancedb/lance.git" }
lance-linalg = { version = "=0.24.0", tag = "v0.24.0-beta.1", git = "https://github.com/lancedb/lance.git" }
lance-table = { version = "=0.24.0", tag = "v0.24.0-beta.1", git = "https://github.com/lancedb/lance.git" }
lance-testing = { version = "=0.24.0", tag = "v0.24.0-beta.1", git = "https://github.com/lancedb/lance.git" }
lance-datafusion = { version = "=0.24.0", tag = "v0.24.0-beta.1", git = "https://github.com/lancedb/lance.git" }
lance-encoding = { version = "=0.24.0", tag = "v0.24.0-beta.1", git = "https://github.com/lancedb/lance.git" }
# Note that this one does not include pyarrow
arrow = { version = "53.2", optional = false }
arrow-array = "53.2"
@@ -62,6 +62,7 @@ num-traits = "0.2"
rand = "0.8"
regex = "1.10"
lazy_static = "1"
semver = "1.0.25"
# Temporary pins to work around downstream issues
# https://github.com/apache/arrow-rs/commit/2fddf85afcd20110ce783ed5b4cdeb82293da30b

View File

@@ -377,6 +377,7 @@ extra_css:
extra_javascript:
- "extra_js/init_ask_ai_widget.js"
- "extra_js/reo.js"
extra:
analytics:

docs/src/extra_js/reo.js (new file, 1 line changed)
View File

@@ -0,0 +1 @@
!function(){var e,t,n;e="9627b71b382d201",t=function(){Reo.init({clientID:"9627b71b382d201"})},(n=document.createElement("script")).src="https://static.reo.dev/"+e+"/reo.js",n.defer=!0,n.onload=t,document.head.appendChild(n)}();

View File

@@ -8,7 +8,7 @@
<parent>
<groupId>com.lancedb</groupId>
<artifactId>lancedb-parent</artifactId>
<version>0.17.0-final.0</version>
<version>0.18.0-beta.0</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@@ -6,7 +6,7 @@
<groupId>com.lancedb</groupId>
<artifactId>lancedb-parent</artifactId>
<version>0.17.0-final.0</version>
<version>0.18.0-beta.0</version>
<packaging>pom</packaging>
<name>LanceDB Parent</name>

node/package-lock.json (generated, 68 lines changed)
View File

@@ -1,12 +1,12 @@
{
"name": "vectordb",
"version": "0.17.0",
"version": "0.18.0-beta.0",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "vectordb",
"version": "0.17.0",
"version": "0.18.0-beta.0",
"cpu": [
"x64",
"arm64"
@@ -52,14 +52,14 @@
"uuid": "^9.0.0"
},
"optionalDependencies": {
"@lancedb/vectordb-darwin-arm64": "0.17.0",
"@lancedb/vectordb-darwin-x64": "0.17.0",
"@lancedb/vectordb-linux-arm64-gnu": "0.17.0",
"@lancedb/vectordb-linux-arm64-musl": "0.17.0",
"@lancedb/vectordb-linux-x64-gnu": "0.17.0",
"@lancedb/vectordb-linux-x64-musl": "0.17.0",
"@lancedb/vectordb-win32-arm64-msvc": "0.17.0",
"@lancedb/vectordb-win32-x64-msvc": "0.17.0"
"@lancedb/vectordb-darwin-arm64": "0.18.0-beta.0",
"@lancedb/vectordb-darwin-x64": "0.18.0-beta.0",
"@lancedb/vectordb-linux-arm64-gnu": "0.18.0-beta.0",
"@lancedb/vectordb-linux-arm64-musl": "0.18.0-beta.0",
"@lancedb/vectordb-linux-x64-gnu": "0.18.0-beta.0",
"@lancedb/vectordb-linux-x64-musl": "0.18.0-beta.0",
"@lancedb/vectordb-win32-arm64-msvc": "0.18.0-beta.0",
"@lancedb/vectordb-win32-x64-msvc": "0.18.0-beta.0"
},
"peerDependencies": {
"@apache-arrow/ts": "^14.0.2",
@@ -330,9 +330,9 @@
}
},
"node_modules/@lancedb/vectordb-darwin-arm64": {
"version": "0.17.0",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-darwin-arm64/-/vectordb-darwin-arm64-0.17.0.tgz",
"integrity": "sha512-QlTuaCGaqHLlMrDTqcMFTg2Y07pAQjBy10oIKqDv7qUk5Lo7OoPeTsBUJrTjOnxy+SqlU7UE1ZmuQBRSsScFvQ==",
"version": "0.18.0-beta.0",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-darwin-arm64/-/vectordb-darwin-arm64-0.18.0-beta.0.tgz",
"integrity": "sha512-dLLgMPllYJOiRfPqkqkmoQu48RIa7K4dOF/qFP8Aex3zqeHE/0sFm3DYjtSFc6SR/6yT8u6Y9iFo2cQp5rCFJA==",
"cpu": [
"arm64"
],
@@ -343,9 +343,9 @@
]
},
"node_modules/@lancedb/vectordb-darwin-x64": {
"version": "0.17.0",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-darwin-x64/-/vectordb-darwin-x64-0.17.0.tgz",
"integrity": "sha512-iAtZNgU9Rf+pmcNRW878uAw47+rLRRUip1Po+WKwdC/9pYpK+GEMqTjwftFhdGn5NAw4WWPWOzvBC3AGnmk0dA==",
"version": "0.18.0-beta.0",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-darwin-x64/-/vectordb-darwin-x64-0.18.0-beta.0.tgz",
"integrity": "sha512-la0eauU0rzHO5eeVjBt8o/5UW4VzRYAuRA7nqUFLX5T6SWP5+UWjqusVVbWGz3ski+8uEX6VhlaFZP5uIJKGIg==",
"cpu": [
"x64"
],
@@ -356,9 +356,9 @@
]
},
"node_modules/@lancedb/vectordb-linux-arm64-gnu": {
"version": "0.17.0",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-linux-arm64-gnu/-/vectordb-linux-arm64-gnu-0.17.0.tgz",
"integrity": "sha512-F0el7nOhH9LNnbGpS3jR81Ge8BxLuSe5WGcQaneDIS3EAdEbVlMnNMFvcRye2OCIlWAMjXoXn+m0b5R736TaZA==",
"version": "0.18.0-beta.0",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-linux-arm64-gnu/-/vectordb-linux-arm64-gnu-0.18.0-beta.0.tgz",
"integrity": "sha512-AkXI/lB3yu1Di2G1lhilf89V6qPTppb13aAt+/6gU5/PSfA94y9VXD67D4WyvRbuQghJjDvAavMlWMrJc2NuMw==",
"cpu": [
"arm64"
],
@@ -369,9 +369,9 @@
]
},
"node_modules/@lancedb/vectordb-linux-arm64-musl": {
"version": "0.17.0",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-linux-arm64-musl/-/vectordb-linux-arm64-musl-0.17.0.tgz",
"integrity": "sha512-V1aCbIjvDIhVRzErpgMHZ0vLphHFHZFmxBD3tCfor9+ymTbgp6X5CWkocsRNKAXFZdfXGAXXkxdERvdcVCYAhw==",
"version": "0.18.0-beta.0",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-linux-arm64-musl/-/vectordb-linux-arm64-musl-0.18.0-beta.0.tgz",
"integrity": "sha512-kTVcJ4LA8w/7egY4m0EXOt8c1DeFUquVtyvexO+VzIFeeHfBkkrMI0DkE0CpHmk+gctkG7EY39jzjgLnPvppnw==",
"cpu": [
"arm64"
],
@@ -382,9 +382,9 @@
]
},
"node_modules/@lancedb/vectordb-linux-x64-gnu": {
"version": "0.17.0",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-linux-x64-gnu/-/vectordb-linux-x64-gnu-0.17.0.tgz",
"integrity": "sha512-t+3SiocM0RKUH9iwiJ2fXxk8dsWZai0ZmgbmbCpKGoyFcj8/9M36ecBO8SZlc65JmLXEnDcy8U8vwVnwJrWLvg==",
"version": "0.18.0-beta.0",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-linux-x64-gnu/-/vectordb-linux-x64-gnu-0.18.0-beta.0.tgz",
"integrity": "sha512-KbtIy5DkaWTsKENm5Q27hjovrR7FRuoHhl0wDJtO/2CUZYlrskjEIfcfkfA2CrEQesBug4s5jgsvNM4Wcp6zoA==",
"cpu": [
"x64"
],
@@ -395,9 +395,9 @@
]
},
"node_modules/@lancedb/vectordb-linux-x64-musl": {
"version": "0.17.0",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-linux-x64-musl/-/vectordb-linux-x64-musl-0.17.0.tgz",
"integrity": "sha512-beSMUj2n45XNeR7Jv+lsXJx6RuZaemy4aPif5+y0iRiD8SbrRLZ6WBaC4zydhYLXdo6XpnBwleZoemrWp5P3Dw==",
"version": "0.18.0-beta.0",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-linux-x64-musl/-/vectordb-linux-x64-musl-0.18.0-beta.0.tgz",
"integrity": "sha512-SF07gmoGVExcF5v+IE6kBbCbXJSDyTgC7QCt+MDS1NsgoQ9OH7IyH7r6HJu16tKflUOUKlUHnP0hQOPpv1fWpg==",
"cpu": [
"x64"
],
@@ -408,9 +408,9 @@
]
},
"node_modules/@lancedb/vectordb-win32-arm64-msvc": {
"version": "0.17.0",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-win32-arm64-msvc/-/vectordb-win32-arm64-msvc-0.17.0.tgz",
"integrity": "sha512-yX4PLLVckWurBNbYvq/tKreBwEm09Rua28Y4yvFJRiV3X/RbOd7PDQcZBolsXUoDnuqBfNwmQDjGNvnWhgFk+Q==",
"version": "0.18.0-beta.0",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-win32-arm64-msvc/-/vectordb-win32-arm64-msvc-0.18.0-beta.0.tgz",
"integrity": "sha512-YYBuSBGDlxJgSI5gHjDmQo9sl05lAXfzil6QiKfgmUMsBtb2sT+GoUCgG6qzsfe99sWiTf+pMeWDsQgfrj9vNw==",
"cpu": [
"arm64"
],
@@ -421,9 +421,9 @@
]
},
"node_modules/@lancedb/vectordb-win32-x64-msvc": {
"version": "0.17.0",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-win32-x64-msvc/-/vectordb-win32-x64-msvc-0.17.0.tgz",
"integrity": "sha512-f5ZEZj5yvfL2wPeJKOtbvzmYUn/jg2ab0fukZQX2LTzMQIUQ0kAF6UwxinmMB4R33giNwhHEfrAMykbG/G0c4g==",
"version": "0.18.0-beta.0",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-win32-x64-msvc/-/vectordb-win32-x64-msvc-0.18.0-beta.0.tgz",
"integrity": "sha512-t9TXeUnMU7YbP+/nUJpStm75aWwUydZj2AK+G2XwDtQrQo4Xg7/NETEbBeogmIOHuidNQYia8jEeQCUon5/+Dw==",
"cpu": [
"x64"
],

View File

@@ -1,6 +1,6 @@
{
"name": "vectordb",
"version": "0.17.0",
"version": "0.18.0-beta.0",
"description": " Serverless, low-latency vector database for AI applications",
"private": false,
"main": "dist/index.js",
@@ -92,13 +92,13 @@
}
},
"optionalDependencies": {
"@lancedb/vectordb-darwin-x64": "0.17.0",
"@lancedb/vectordb-darwin-arm64": "0.17.0",
"@lancedb/vectordb-linux-x64-gnu": "0.17.0",
"@lancedb/vectordb-linux-arm64-gnu": "0.17.0",
"@lancedb/vectordb-linux-x64-musl": "0.17.0",
"@lancedb/vectordb-linux-arm64-musl": "0.17.0",
"@lancedb/vectordb-win32-x64-msvc": "0.17.0",
"@lancedb/vectordb-win32-arm64-msvc": "0.17.0"
"@lancedb/vectordb-darwin-x64": "0.18.0-beta.0",
"@lancedb/vectordb-darwin-arm64": "0.18.0-beta.0",
"@lancedb/vectordb-linux-x64-gnu": "0.18.0-beta.0",
"@lancedb/vectordb-linux-arm64-gnu": "0.18.0-beta.0",
"@lancedb/vectordb-linux-x64-musl": "0.18.0-beta.0",
"@lancedb/vectordb-linux-arm64-musl": "0.18.0-beta.0",
"@lancedb/vectordb-win32-x64-msvc": "0.18.0-beta.0",
"@lancedb/vectordb-win32-arm64-msvc": "0.18.0-beta.0"
}
}

View File

@@ -1,7 +1,7 @@
[package]
name = "lancedb-nodejs"
edition.workspace = true
version = "0.17.0"
version = "0.18.0-beta.0"
license.workspace = true
description.workspace = true
repository.workspace = true

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-darwin-arm64",
"version": "0.17.0",
"version": "0.18.0-beta.0",
"os": ["darwin"],
"cpu": ["arm64"],
"main": "lancedb.darwin-arm64.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-darwin-x64",
"version": "0.17.0",
"version": "0.18.0-beta.0",
"os": ["darwin"],
"cpu": ["x64"],
"main": "lancedb.darwin-x64.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-arm64-gnu",
"version": "0.17.0",
"version": "0.18.0-beta.0",
"os": ["linux"],
"cpu": ["arm64"],
"main": "lancedb.linux-arm64-gnu.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-arm64-musl",
"version": "0.17.0",
"version": "0.18.0-beta.0",
"os": ["linux"],
"cpu": ["arm64"],
"main": "lancedb.linux-arm64-musl.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-x64-gnu",
"version": "0.17.0",
"version": "0.18.0-beta.0",
"os": ["linux"],
"cpu": ["x64"],
"main": "lancedb.linux-x64-gnu.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-x64-musl",
"version": "0.17.0",
"version": "0.18.0-beta.0",
"os": ["linux"],
"cpu": ["x64"],
"main": "lancedb.linux-x64-musl.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-win32-arm64-msvc",
"version": "0.17.0",
"version": "0.18.0-beta.0",
"os": [
"win32"
],

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-win32-x64-msvc",
"version": "0.17.0",
"version": "0.18.0-beta.0",
"os": ["win32"],
"cpu": ["x64"],
"main": "lancedb.win32-x64-msvc.node",

View File

@@ -1,12 +1,12 @@
{
"name": "@lancedb/lancedb",
"version": "0.17.0",
"version": "0.18.0-beta.0",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "@lancedb/lancedb",
"version": "0.17.0",
"version": "0.18.0-beta.0",
"cpu": [
"x64",
"arm64"

View File

@@ -11,7 +11,7 @@
"ann"
],
"private": false,
"version": "0.17.0",
"version": "0.18.0-beta.0",
"main": "dist/index.js",
"exports": {
".": "./dist/index.js",

View File

@@ -1,5 +1,5 @@
[tool.bumpversion]
current_version = "0.21.0-beta.0"
current_version = "0.21.0-beta.1"
parse = """(?x)
(?P<major>0|[1-9]\\d*)\\.
(?P<minor>0|[1-9]\\d*)\\.

View File

@@ -1,6 +1,6 @@
[package]
name = "lancedb-python"
version = "0.21.0-beta.0"
version = "0.21.0-beta.1"
edition.workspace = true
description = "Python bindings for LanceDB"
license.workspace = true

View File

@@ -4,8 +4,8 @@ name = "lancedb"
dynamic = ["version"]
dependencies = [
"deprecation",
"pylance~=0.23.2",
"tqdm>=4.27.0",
"pyarrow>=14",
"pydantic>=1.10",
"packaging",
"overrides>=0.7",
@@ -54,6 +54,7 @@ tests = [
"polars>=0.19, <=1.3.0",
"tantivy",
"pyarrow-stubs",
"pylance~=0.23.2",
]
dev = [
"ruff",

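The hunk above moves `pylance` out of the required dependencies and into the `tests` extra (#2156 in the commit list). A hedged sketch of what the softer dependency means for callers, assuming a local table; the error text comes from the table.py change further down:

import lancedb

db = lancedb.connect("/tmp/lancedb")
table = db.create_table("demo", data=[{"x": 0}])

try:
    dataset = table.to_lance()  # only this path needs the optional pylance package
except ImportError as err:
    print(err)  # points the user at `pip install pylance`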
View File

@@ -142,6 +142,10 @@ class CompactionStats:
files_removed: int
files_added: int
class CleanupStats:
bytes_removed: int
old_versions: int
class RemovalStats:
bytes_removed: int
old_versions_removed: int

View File

@@ -5,6 +5,7 @@ from __future__ import annotations
import asyncio
import inspect
import deprecation
import warnings
from abc import ABC, abstractmethod
from dataclasses import dataclass
@@ -24,16 +25,15 @@ from typing import (
)
from urllib.parse import urlparse
import lance
from . import __version__
from lancedb.arrow import peek_reader
from lancedb.background_loop import LOOP
from .dependencies import _check_for_pandas
from .dependencies import _check_for_hugging_face, _check_for_pandas
import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.fs as pa_fs
import numpy as np
from lance import LanceDataset
from lance.dependencies import _check_for_hugging_face
from .common import DATA, VEC, VECTOR_COLUMN_NAME
from .embeddings import EmbeddingFunctionConfig, EmbeddingFunctionRegistry
@@ -66,10 +66,14 @@ from .index import lang_mapping
if TYPE_CHECKING:
from ._lancedb import Table as LanceDBTable, OptimizeStats, CompactionStats
from ._lancedb import (
Table as LanceDBTable,
OptimizeStats,
CleanupStats,
CompactionStats,
)
from .db import LanceDBConnection
from .index import IndexConfig
from lance.dataset import CleanupStats, ReaderLike
import pandas
import PIL
@@ -80,10 +84,9 @@ QueryType = Literal["vector", "fts", "hybrid", "auto"]
def _into_pyarrow_reader(data) -> pa.RecordBatchReader:
if _check_for_hugging_face(data):
# Huggingface datasets
from lance.dependencies import datasets
from lancedb.dependencies import datasets
if _check_for_hugging_face(data):
if isinstance(data, datasets.Dataset):
schema = data.features.arrow_schema
return pa.RecordBatchReader.from_batches(schema, data.data.to_batches())
@@ -1074,7 +1077,7 @@ class Table(ABC):
older_than: Optional[timedelta] = None,
*,
delete_unverified: bool = False,
) -> CleanupStats:
) -> "CleanupStats":
"""
Clean up old versions of the table, freeing disk space.
@@ -1385,6 +1388,14 @@ class LanceTable(Table):
def to_lance(self, **kwargs) -> LanceDataset:
"""Return the LanceDataset backing this table."""
try:
import lance
except ImportError:
raise ImportError(
"The lance library is required to use this function. "
"Please install with `pip install pylance`."
)
return lance.dataset(
self._dataset_path,
version=self.version,
@@ -1844,7 +1855,7 @@ class LanceTable(Table):
def merge(
self,
other_table: Union[LanceTable, ReaderLike],
other_table: Union[LanceTable, DATA],
left_on: str,
right_on: Optional[str] = None,
schema: Optional[Union[pa.Schema, LanceModel]] = None,
@@ -1894,12 +1905,13 @@ class LanceTable(Table):
1 2 b e
2 3 c f
"""
if isinstance(schema, LanceModel):
schema = schema.to_arrow_schema()
if isinstance(other_table, LanceTable):
other_table = other_table.to_lance()
if isinstance(other_table, LanceDataset):
other_table = other_table.to_table()
else:
other_table = _sanitize_data(
other_table,
schema,
)
self.to_lance().merge(
other_table, left_on=left_on, right_on=right_on, schema=schema
)
@@ -2222,12 +2234,17 @@ class LanceTable(Table):
):
LOOP.run(self._table._do_merge(merge, new_data, on_bad_vectors, fill_value))
@deprecation.deprecated(
deprecated_in="0.21.0",
current_version=__version__,
details="Use `Table.optimize` instead.",
)
def cleanup_old_versions(
self,
older_than: Optional[timedelta] = None,
*,
delete_unverified: bool = False,
) -> CleanupStats:
) -> "CleanupStats":
"""
Clean up old versions of the table, freeing disk space.
@@ -2252,6 +2269,11 @@ class LanceTable(Table):
older_than, delete_unverified=delete_unverified
)
@deprecation.deprecated(
deprecated_in="0.21.0",
current_version=__version__,
details="Use `Table.optimize` instead.",
)
def compact_files(self, *args, **kwargs) -> CompactionStats:
"""
Run the compaction process on the table.
@@ -2383,6 +2405,19 @@ class LanceTable(Table):
"""
LOOP.run(self._table.migrate_v2_manifest_paths())
def replace_field_metadata(self, field_name: str, new_metadata: Dict[str, str]):
"""
Replace the metadata of a field in the schema
Parameters
----------
field_name: str
The name of the field to replace the metadata for
new_metadata: dict
The new metadata to set
"""
LOOP.run(self._table.replace_field_metadata(field_name, new_metadata))
def _handle_bad_vectors(
reader: pa.RecordBatchReader,
@@ -3613,6 +3648,21 @@ class AsyncTable:
"""
await self._inner.migrate_manifest_paths_v2()
async def replace_field_metadata(
self, field_name: str, new_metadata: dict[str, str]
):
"""
Replace the metadata of a field in the schema
Parameters
----------
field_name: str
The name of the field to replace the metadata for
new_metadata: dict
The new metadata to set
"""
await self._inner.replace_field_metadata(field_name, new_metadata)
@dataclass
class IndexStatistics:

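The table.py changes above deprecate `cleanup_old_versions()` and `compact_files()` in favor of `Table.optimize`. A hedged sketch of the replacement call; the `cleanup_older_than` keyword is taken from the async test near the end of this diff and is assumed to work the same way on the sync table:

from datetime import timedelta

import lancedb

db = lancedb.connect("/tmp/lancedb")
table = db.create_table("demo", data=[{"x": i} for i in range(10)])
table.delete("x < 5")  # leaves an old table version behind

# One call now covers compaction and old-version cleanup.
stats = table.optimize(cleanup_older_than=timedelta(days=7))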
View File

@@ -9,6 +9,7 @@ import json
import threading
from unittest.mock import MagicMock
import uuid
from packaging.version import Version
import lancedb
from lancedb.conftest import MockTextEmbeddingFunction
@@ -277,11 +278,12 @@ def test_table_create_indices():
@contextlib.contextmanager
def query_test_table(query_handler):
def query_test_table(query_handler, *, server_version=Version("0.1.0")):
def handler(request):
if request.path == "/v1/table/test/describe/":
request.send_response(200)
request.send_header("Content-Type", "application/json")
request.send_header("phalanx-version", str(server_version))
request.end_headers()
request.wfile.write(b"{}")
elif request.path == "/v1/table/test/query/":
@@ -388,17 +390,25 @@ def test_query_sync_maximal():
)
def test_query_sync_multiple_vectors():
@pytest.mark.parametrize("server_version", [Version("0.1.0"), Version("0.2.0")])
def test_query_sync_batch_queries(server_version):
def handler(body):
# TODO: we will add the ability to get the server version,
# so that we can decide how to perform batch queries.
vectors = body["vector"]
res = []
for i, vector in enumerate(vectors):
res.append({"id": 1, "query_index": i})
return pa.Table.from_pylist(res)
if server_version >= Version(
"0.2.0"
): # we can handle batch queries in single request since 0.2.0
assert len(vectors) == 2
res = []
for i, vector in enumerate(vectors):
res.append({"id": 1, "query_index": i})
return pa.Table.from_pylist(res)
else:
assert len(vectors) == 3 # matching dim
return pa.table({"id": [1]})
with query_test_table(handler) as table:
with query_test_table(handler, server_version=server_version) as table:
results = table.search([[1, 2, 3], [4, 5, 6]]).limit(1).to_list()
assert len(results) == 2
results.sort(key=lambda x: x["query_index"])

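The test above pins down the client-facing contract of the server-version work (#2147): whether the remote server can batch multiple query vectors (0.2.0 and later) or needs one request per vector, a multi-vector search returns one hit set per vector. A hedged sketch against a remote table, with placeholder URI, API key, and region:

import lancedb

# Placeholder connection parameters for a LanceDB remote deployment.
db = lancedb.connect("db://my-project", api_key="sk-...", region="us-east-1")
table = db.open_table("test")

results = table.search([[1, 2, 3], [4, 5, 6]]).limit(1).to_list()
assert len(results) == 2  # one result per query vector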
View File

@@ -1481,3 +1481,12 @@ async def test_optimize_delete_unverified(tmp_db_async: AsyncConnection, tmp_pat
cleanup_older_than=timedelta(seconds=0), delete_unverified=True
)
assert stats.prune.old_versions_removed == 2
def test_replace_field_metadata(tmp_path):
db = lancedb.connect(tmp_path)
table = db.create_table("my_table", data=[{"x": 0}])
table.replace_field_metadata("x", {"foo": "bar"})
schema = table.schema
field = schema[0].metadata
assert field == {b"foo": b"bar"}

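The test above exercises the sync path of the field-metadata API from #2178. The async table exposes the same method; a hedged sketch using `connect_async` and a throwaway local path:

import asyncio

import lancedb

async def main():
    db = await lancedb.connect_async("/tmp/lancedb")
    table = await db.create_table("my_table", data=[{"x": 0}])

    # Attach new metadata to the "x" field; keys and values are strings.
    await table.replace_field_metadata("x", {"foo": "bar"})

    schema = await table.schema()
    print(schema.field("x").metadata)  # PyArrow reports it as {b'foo': b'bar'}

asyncio.run(main())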
View File

@@ -10,12 +10,13 @@ use lancedb::table::{
Table as LanceDbTable,
};
use pyo3::{
exceptions::{PyRuntimeError, PyValueError},
exceptions::{PyKeyError, PyRuntimeError, PyValueError},
pyclass, pymethods,
types::{IntoPyDict, PyAnyMethods, PyDict, PyDictMethods},
Bound, FromPyObject, PyAny, PyRef, PyResult, Python, ToPyObject,
};
use pyo3_async_runtimes::tokio::future_into_py;
use std::collections::HashMap;
use crate::{
error::PythonErrorExt,
@@ -486,6 +487,37 @@ impl Table {
Ok(())
})
}
pub fn replace_field_metadata<'a>(
self_: PyRef<'a, Self>,
field_name: String,
metadata: &Bound<'_, PyDict>,
) -> PyResult<Bound<'a, PyAny>> {
let mut new_metadata = HashMap::<String, String>::new();
for (column_name, value) in metadata.into_iter() {
let key: String = column_name.extract()?;
let value: String = value.extract()?;
new_metadata.insert(key, value);
}
let inner = self_.inner_ref()?.clone();
future_into_py(self_.py(), async move {
let native_tbl = inner
.as_native()
.ok_or_else(|| PyValueError::new_err("This cannot be run on a remote table"))?;
let schema = native_tbl.manifest().await.infer_error()?.schema;
let field = schema
.field(&field_name)
.ok_or_else(|| PyKeyError::new_err(format!("Field {} not found", field_name)))?;
native_tbl
.replace_field_metadata(vec![(field.id as u32, new_metadata)])
.await
.infer_error()?;
Ok(())
})
}
}
#[derive(FromPyObject)]

View File

@@ -1,6 +1,6 @@
[package]
name = "lancedb-node"
version = "0.17.0"
version = "0.18.0-beta.0"
description = "Serverless, low-latency vector database for AI applications"
license.workspace = true
edition.workspace = true

View File

@@ -1,6 +1,6 @@
[package]
name = "lancedb"
version = "0.17.0"
version = "0.18.0-beta.0"
edition.workspace = true
description = "LanceDB: A serverless, low-latency vector database for AI applications"
license.workspace = true
@@ -70,6 +70,7 @@ candle-core = { version = "0.6.0", optional = true }
candle-transformers = { version = "0.6.0", optional = true }
candle-nn = { version = "0.6.0", optional = true }
tokenizers = { version = "0.19.1", optional = true }
semver = { workspace = true }
# For a workaround, see workspace Cargo.toml
crunchy.workspace = true
@@ -87,6 +88,7 @@ aws-config = { version = "1.0" }
aws-smithy-runtime = { version = "1.3" }
datafusion.workspace = true
http-body = "1" # Matching reqwest
rstest = "0.23.0"
[features]

View File

@@ -0,0 +1,82 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
//! Catalog implementation for managing databases
pub mod listing;
use std::collections::HashMap;
use std::sync::Arc;
use crate::database::Database;
use crate::error::Result;
use async_trait::async_trait;
/// Request parameters for listing databases
#[derive(Clone, Debug, Default)]
pub struct DatabaseNamesRequest {
/// Start listing after this name (exclusive)
pub start_after: Option<String>,
/// Maximum number of names to return
pub limit: Option<u32>,
}
/// Request to open an existing database
#[derive(Clone, Debug)]
pub struct OpenDatabaseRequest {
/// The name of the database to open
pub name: String,
/// A map of database-specific options
///
/// Consult the catalog / database implementation to determine which options are available
pub database_options: HashMap<String, String>,
}
/// Database creation mode
///
/// The default behavior is Create
pub enum CreateDatabaseMode {
/// Create new database, error if exists
Create,
/// Open existing database if present
ExistOk,
/// Overwrite existing database
Overwrite,
}
impl Default for CreateDatabaseMode {
fn default() -> Self {
Self::Create
}
}
/// Request to create a new database
pub struct CreateDatabaseRequest {
/// The name of the database to create
pub name: String,
/// The creation mode
pub mode: CreateDatabaseMode,
/// A map of catalog-specific options, consult your catalog implementation to determine what's available
pub options: HashMap<String, String>,
}
#[async_trait]
pub trait Catalog: Send + Sync + std::fmt::Debug + 'static {
/// List database names with pagination
async fn database_names(&self, request: DatabaseNamesRequest) -> Result<Vec<String>>;
/// Create a new database
async fn create_database(&self, request: CreateDatabaseRequest) -> Result<Arc<dyn Database>>;
/// Open existing database
async fn open_database(&self, request: OpenDatabaseRequest) -> Result<Arc<dyn Database>>;
/// Rename database
async fn rename_database(&self, old_name: &str, new_name: &str) -> Result<()>;
/// Delete database
async fn drop_database(&self, name: &str) -> Result<()>;
/// Delete all databases
async fn drop_all_databases(&self) -> Result<()>;
}

View File

@@ -0,0 +1,569 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
//! Catalog implementation based on a local file system.
use std::collections::HashMap;
use std::fs::create_dir_all;
use std::path::Path;
use std::sync::Arc;
use super::{
Catalog, CreateDatabaseMode, CreateDatabaseRequest, DatabaseNamesRequest, OpenDatabaseRequest,
};
use crate::connection::ConnectRequest;
use crate::database::listing::ListingDatabase;
use crate::database::Database;
use crate::error::{CreateDirSnafu, Error, Result};
use async_trait::async_trait;
use lance::io::{ObjectStore, ObjectStoreParams, ObjectStoreRegistry};
use lance_io::local::to_local_path;
use object_store::path::Path as ObjectStorePath;
use snafu::ResultExt;
/// A catalog implementation that works by listing subfolders in a directory
///
/// The listing catalog will be created with a base folder specified by the URI. Every subfolder
/// in this base folder will be considered a database. These will be opened as a
/// [`crate::database::listing::ListingDatabase`]
#[derive(Debug)]
pub struct ListingCatalog {
object_store: ObjectStore,
uri: String,
base_path: ObjectStorePath,
storage_options: HashMap<String, String>,
}
impl ListingCatalog {
/// Try to create a local directory to store the lancedb dataset
pub fn try_create_dir(path: &str) -> core::result::Result<(), std::io::Error> {
let path = Path::new(path);
if !path.try_exists()? {
create_dir_all(path)?;
}
Ok(())
}
pub fn uri(&self) -> &str {
&self.uri
}
async fn open_path(path: &str) -> Result<Self> {
let (object_store, base_path) = ObjectStore::from_path(path).unwrap();
if object_store.is_local() {
Self::try_create_dir(path).context(CreateDirSnafu { path })?;
}
Ok(Self {
uri: path.to_string(),
base_path,
object_store,
storage_options: HashMap::new(),
})
}
pub async fn connect(request: &ConnectRequest) -> Result<Self> {
let uri = &request.uri;
let parse_res = url::Url::parse(uri);
match parse_res {
Ok(url) if url.scheme().len() == 1 && cfg!(windows) => Self::open_path(uri).await,
Ok(url) => {
let plain_uri = url.to_string();
let registry = Arc::new(ObjectStoreRegistry::default());
let storage_options = request.storage_options.clone();
let os_params = ObjectStoreParams {
storage_options: Some(storage_options.clone()),
..Default::default()
};
let (object_store, base_path) =
ObjectStore::from_uri_and_params(registry, &plain_uri, &os_params).await?;
if object_store.is_local() {
Self::try_create_dir(&plain_uri).context(CreateDirSnafu { path: plain_uri })?;
}
Ok(Self {
uri: String::from(url.clone()),
base_path,
object_store,
storage_options,
})
}
Err(_) => Self::open_path(uri).await,
}
}
fn database_path(&self, name: &str) -> ObjectStorePath {
self.base_path.child(name.replace('\\', "/"))
}
}
#[async_trait]
impl Catalog for ListingCatalog {
async fn database_names(&self, request: DatabaseNamesRequest) -> Result<Vec<String>> {
let mut f = self
.object_store
.read_dir(self.base_path.clone())
.await?
.iter()
.map(Path::new)
.filter_map(|p| p.file_name().and_then(|s| s.to_str().map(String::from)))
.collect::<Vec<String>>();
f.sort();
if let Some(start_after) = request.start_after {
let index = f
.iter()
.position(|name| name.as_str() > start_after.as_str())
.unwrap_or(f.len());
f.drain(0..index);
}
if let Some(limit) = request.limit {
f.truncate(limit as usize);
}
Ok(f)
}
async fn create_database(&self, request: CreateDatabaseRequest) -> Result<Arc<dyn Database>> {
let db_path = self.database_path(&request.name);
let db_path_str = to_local_path(&db_path);
let exists = Path::new(&db_path_str).exists();
match request.mode {
CreateDatabaseMode::Create if exists => {
return Err(Error::DatabaseAlreadyExists { name: request.name })
}
CreateDatabaseMode::Create => {
create_dir_all(db_path.to_string()).unwrap();
}
CreateDatabaseMode::ExistOk => {
if !exists {
create_dir_all(db_path.to_string()).unwrap();
}
}
CreateDatabaseMode::Overwrite => {
if exists {
self.drop_database(&request.name).await?;
}
create_dir_all(db_path.to_string()).unwrap();
}
}
let db_uri = format!("/{}/{}", self.base_path, request.name);
let connect_request = ConnectRequest {
uri: db_uri,
api_key: None,
region: None,
host_override: None,
#[cfg(feature = "remote")]
client_config: Default::default(),
read_consistency_interval: None,
storage_options: self.storage_options.clone(),
};
Ok(Arc::new(
ListingDatabase::connect_with_options(&connect_request).await?,
))
}
async fn open_database(&self, request: OpenDatabaseRequest) -> Result<Arc<dyn Database>> {
let db_path = self.database_path(&request.name);
let db_path_str = to_local_path(&db_path);
let exists = Path::new(&db_path_str).exists();
if !exists {
return Err(Error::DatabaseNotFound { name: request.name });
}
let connect_request = ConnectRequest {
uri: db_path.to_string(),
api_key: None,
region: None,
host_override: None,
#[cfg(feature = "remote")]
client_config: Default::default(),
read_consistency_interval: None,
storage_options: self.storage_options.clone(),
};
Ok(Arc::new(
ListingDatabase::connect_with_options(&connect_request).await?,
))
}
async fn rename_database(&self, _old_name: &str, _new_name: &str) -> Result<()> {
Err(Error::NotSupported {
message: "rename_database is not supported in LanceDB OSS yet".to_string(),
})
}
async fn drop_database(&self, name: &str) -> Result<()> {
let db_path = self.database_path(name);
self.object_store
.remove_dir_all(db_path.clone())
.await
.map_err(|err| match err {
lance::Error::NotFound { .. } => Error::DatabaseNotFound {
name: name.to_owned(),
},
_ => Error::from(err),
})?;
Ok(())
}
async fn drop_all_databases(&self) -> Result<()> {
self.object_store
.remove_dir_all(self.base_path.clone())
.await?;
Ok(())
}
}
#[cfg(all(test, not(windows)))]
mod tests {
use super::*;
/// file:/// URIs with drive letters do not work correctly on Windows
#[cfg(windows)]
fn path_to_uri(path: PathBuf) -> String {
path.to_str().unwrap().to_string()
}
#[cfg(not(windows))]
fn path_to_uri(path: PathBuf) -> String {
Url::from_file_path(path).unwrap().to_string()
}
async fn setup_catalog() -> (TempDir, ListingCatalog) {
let tempdir = tempfile::tempdir().unwrap();
let catalog_path = tempdir.path().join("catalog");
std::fs::create_dir_all(&catalog_path).unwrap();
let uri = path_to_uri(catalog_path);
let request = ConnectRequest {
uri: uri.clone(),
api_key: None,
region: None,
host_override: None,
#[cfg(feature = "remote")]
client_config: Default::default(),
storage_options: HashMap::new(),
read_consistency_interval: None,
};
let catalog = ListingCatalog::connect(&request).await.unwrap();
(tempdir, catalog)
}
use crate::database::{CreateTableData, CreateTableRequest, TableNamesRequest};
use crate::table::TableDefinition;
use arrow_schema::Field;
use std::path::PathBuf;
use std::sync::Arc;
use tempfile::{tempdir, TempDir};
use url::Url;
#[tokio::test]
async fn test_database_names() {
let (_tempdir, catalog) = setup_catalog().await;
let names = catalog
.database_names(DatabaseNamesRequest::default())
.await
.unwrap();
assert!(names.is_empty());
}
#[tokio::test]
async fn test_create_database() {
let (_tempdir, catalog) = setup_catalog().await;
catalog
.create_database(CreateDatabaseRequest {
name: "db1".into(),
mode: CreateDatabaseMode::Create,
options: HashMap::new(),
})
.await
.unwrap();
let names = catalog
.database_names(DatabaseNamesRequest::default())
.await
.unwrap();
assert_eq!(names, vec!["db1"]);
}
#[tokio::test]
async fn test_create_database_exist_ok() {
let (_tempdir, catalog) = setup_catalog().await;
let db1 = catalog
.create_database(CreateDatabaseRequest {
name: "db_exist_ok".into(),
mode: CreateDatabaseMode::ExistOk,
options: HashMap::new(),
})
.await
.unwrap();
let dummy_schema = Arc::new(arrow_schema::Schema::new(Vec::<Field>::default()));
db1.create_table(CreateTableRequest {
name: "test_table".parse().unwrap(),
data: CreateTableData::Empty(TableDefinition::new_from_schema(dummy_schema)),
mode: Default::default(),
write_options: Default::default(),
})
.await
.unwrap();
let db2 = catalog
.create_database(CreateDatabaseRequest {
name: "db_exist_ok".into(),
mode: CreateDatabaseMode::ExistOk,
options: HashMap::new(),
})
.await
.unwrap();
let tables = db2.table_names(TableNamesRequest::default()).await.unwrap();
assert_eq!(tables, vec!["test_table".to_string()]);
}
#[tokio::test]
async fn test_create_database_overwrite() {
let (_tempdir, catalog) = setup_catalog().await;
let db = catalog
.create_database(CreateDatabaseRequest {
name: "db_overwrite".into(),
mode: CreateDatabaseMode::Create,
options: HashMap::new(),
})
.await
.unwrap();
let dummy_schema = Arc::new(arrow_schema::Schema::new(Vec::<Field>::default()));
db.create_table(CreateTableRequest {
name: "old_table".parse().unwrap(),
data: CreateTableData::Empty(TableDefinition::new_from_schema(dummy_schema)),
mode: Default::default(),
write_options: Default::default(),
})
.await
.unwrap();
let tables = db.table_names(TableNamesRequest::default()).await.unwrap();
assert!(!tables.is_empty());
let new_db = catalog
.create_database(CreateDatabaseRequest {
name: "db_overwrite".into(),
mode: CreateDatabaseMode::Overwrite,
options: HashMap::new(),
})
.await
.unwrap();
let tables = new_db
.table_names(TableNamesRequest::default())
.await
.unwrap();
assert!(tables.is_empty());
}
#[tokio::test]
async fn test_create_database_overwrite_non_existing() {
let (_tempdir, catalog) = setup_catalog().await;
catalog
.create_database(CreateDatabaseRequest {
name: "new_db".into(),
mode: CreateDatabaseMode::Overwrite,
options: HashMap::new(),
})
.await
.unwrap();
let names = catalog
.database_names(DatabaseNamesRequest::default())
.await
.unwrap();
assert!(names.contains(&"new_db".to_string()));
}
#[tokio::test]
async fn test_open_database() {
let (_tempdir, catalog) = setup_catalog().await;
// Test open non-existent
let result = catalog
.open_database(OpenDatabaseRequest {
name: "missing".into(),
database_options: HashMap::new(),
})
.await;
assert!(matches!(
result.unwrap_err(),
Error::DatabaseNotFound { name } if name == "missing"
));
// Create and open
catalog
.create_database(CreateDatabaseRequest {
name: "valid_db".into(),
mode: CreateDatabaseMode::Create,
options: HashMap::new(),
})
.await
.unwrap();
let db = catalog
.open_database(OpenDatabaseRequest {
name: "valid_db".into(),
database_options: HashMap::new(),
})
.await
.unwrap();
assert_eq!(
db.table_names(TableNamesRequest::default()).await.unwrap(),
Vec::<String>::new()
);
}
#[tokio::test]
async fn test_drop_database() {
let (_tempdir, catalog) = setup_catalog().await;
// Create test database
catalog
.create_database(CreateDatabaseRequest {
name: "to_drop".into(),
mode: CreateDatabaseMode::Create,
options: HashMap::new(),
})
.await
.unwrap();
let names = catalog
.database_names(DatabaseNamesRequest::default())
.await
.unwrap();
assert!(!names.is_empty());
// Drop database
catalog.drop_database("to_drop").await.unwrap();
let names = catalog
.database_names(DatabaseNamesRequest::default())
.await
.unwrap();
assert!(names.is_empty());
}
#[tokio::test]
async fn test_drop_all_databases() {
let (_tempdir, catalog) = setup_catalog().await;
catalog
.create_database(CreateDatabaseRequest {
name: "db1".into(),
mode: CreateDatabaseMode::Create,
options: HashMap::new(),
})
.await
.unwrap();
catalog
.create_database(CreateDatabaseRequest {
name: "db2".into(),
mode: CreateDatabaseMode::Create,
options: HashMap::new(),
})
.await
.unwrap();
catalog.drop_all_databases().await.unwrap();
let names = catalog
.database_names(DatabaseNamesRequest::default())
.await
.unwrap();
assert!(names.is_empty());
}
#[tokio::test]
async fn test_rename_database_unsupported() {
let (_tempdir, catalog) = setup_catalog().await;
let result = catalog.rename_database("old", "new").await;
assert!(matches!(
result.unwrap_err(),
Error::NotSupported { message } if message.contains("rename_database")
));
}
#[tokio::test]
async fn test_connect_local_path() {
let tmp_dir = tempdir().unwrap();
let path = tmp_dir.path().to_str().unwrap();
let request = ConnectRequest {
uri: path.to_string(),
api_key: None,
region: None,
host_override: None,
#[cfg(feature = "remote")]
client_config: Default::default(),
storage_options: HashMap::new(),
read_consistency_interval: None,
};
let catalog = ListingCatalog::connect(&request).await.unwrap();
assert!(catalog.object_store.is_local());
assert_eq!(catalog.uri, path);
}
#[tokio::test]
async fn test_connect_file_scheme() {
let tmp_dir = tempdir().unwrap();
let path = tmp_dir.path();
let uri = path_to_uri(path.to_path_buf());
let request = ConnectRequest {
uri: uri.clone(),
api_key: None,
region: None,
host_override: None,
#[cfg(feature = "remote")]
client_config: Default::default(),
storage_options: HashMap::new(),
read_consistency_interval: None,
};
let catalog = ListingCatalog::connect(&request).await.unwrap();
assert!(catalog.object_store.is_local());
assert_eq!(catalog.uri, uri);
}
#[tokio::test]
async fn test_connect_invalid_uri_fallback() {
let invalid_uri = "invalid:///path";
let request = ConnectRequest {
uri: invalid_uri.to_string(),
api_key: None,
region: None,
host_override: None,
#[cfg(feature = "remote")]
client_config: Default::default(),
storage_options: HashMap::new(),
read_consistency_interval: None,
};
let result = ListingCatalog::connect(&request).await;
assert!(result.is_err());
}
}

View File

@@ -15,6 +15,10 @@ pub enum Error {
InvalidInput { message: String },
#[snafu(display("Table '{name}' was not found"))]
TableNotFound { name: String },
#[snafu(display("Database '{name}' was not found"))]
DatabaseNotFound { name: String },
#[snafu(display("Database '{name}' already exists."))]
DatabaseAlreadyExists { name: String },
#[snafu(display("Index '{name}' was not found"))]
IndexNotFound { name: String },
#[snafu(display("Embedding function '{name}' was not found. : {reason}"))]

View File

@@ -191,6 +191,7 @@
//! ```
pub mod arrow;
pub mod catalog;
pub mod connection;
pub mod data;
pub mod database;

View File

@@ -19,12 +19,41 @@ use crate::database::{
};
use crate::error::Result;
use crate::table::BaseTable;
use crate::Error;
use super::client::{ClientConfig, HttpSend, RequestResultExt, RestfulLanceDbClient, Sender};
use super::table::RemoteTable;
use super::util::batches_to_ipc_bytes;
use super::util::{batches_to_ipc_bytes, parse_server_version};
use super::ARROW_STREAM_CONTENT_TYPE;
// The server versions that we support.
// For any new feature that requires changing SDK behavior, bump the server version
// and add a feature flag as a method of `ServerVersion` here.
pub const DEFAULT_SERVER_VERSION: semver::Version = semver::Version::new(0, 1, 0);
#[derive(Debug, Clone)]
pub struct ServerVersion(pub semver::Version);
impl Default for ServerVersion {
fn default() -> Self {
Self(DEFAULT_SERVER_VERSION.clone())
}
}
impl ServerVersion {
pub fn parse(version: &str) -> Result<Self> {
let version = Self(
semver::Version::parse(version).map_err(|e| Error::InvalidInput {
message: e.to_string(),
})?,
);
Ok(version)
}
pub fn support_multivector(&self) -> bool {
self.0 >= semver::Version::new(0, 2, 0)
}
}
#[derive(Deserialize)]
struct ListTablesResponse {
tables: Vec<String>,
@@ -33,7 +62,7 @@ struct ListTablesResponse {
#[derive(Debug)]
pub struct RemoteDatabase<S: HttpSend = Sender> {
client: RestfulLanceDbClient<S>,
table_cache: Cache<String, ()>,
table_cache: Cache<String, Arc<RemoteTable<S>>>,
}
impl RemoteDatabase {
@@ -115,13 +144,19 @@ impl<S: HttpSend> Database for RemoteDatabase<S> {
}
let (request_id, rsp) = self.client.send(req, true).await?;
let rsp = self.client.check_response(&request_id, rsp).await?;
let version = parse_server_version(&request_id, &rsp)?;
let tables = rsp
.json::<ListTablesResponse>()
.await
.err_to_http(request_id)?
.tables;
for table in &tables {
self.table_cache.insert(table.clone(), ()).await;
let remote_table = Arc::new(RemoteTable::new(
self.client.clone(),
table.clone(),
version.clone(),
));
self.table_cache.insert(table.clone(), remote_table).await;
}
Ok(tables)
}
@@ -187,34 +222,42 @@ impl<S: HttpSend> Database for RemoteDatabase<S> {
return Err(crate::Error::InvalidInput { message: body });
}
}
self.client.check_response(&request_id, rsp).await?;
self.table_cache.insert(request.name.clone(), ()).await;
Ok(Arc::new(RemoteTable::new(
let rsp = self.client.check_response(&request_id, rsp).await?;
let version = parse_server_version(&request_id, &rsp)?;
let table = Arc::new(RemoteTable::new(
self.client.clone(),
request.name,
)))
request.name.clone(),
version,
));
self.table_cache
.insert(request.name.clone(), table.clone())
.await;
Ok(table)
}
async fn open_table(&self, request: OpenTableRequest) -> Result<Arc<dyn BaseTable>> {
// We describe the table to confirm it exists before moving on.
if self.table_cache.get(&request.name).await.is_none() {
if let Some(table) = self.table_cache.get(&request.name).await {
Ok(table.clone())
} else {
let req = self
.client
.post(&format!("/v1/table/{}/describe/", request.name));
let (request_id, resp) = self.client.send(req, true).await?;
if resp.status() == StatusCode::NOT_FOUND {
let (request_id, rsp) = self.client.send(req, true).await?;
if rsp.status() == StatusCode::NOT_FOUND {
return Err(crate::Error::TableNotFound { name: request.name });
}
self.client.check_response(&request_id, resp).await?;
let rsp = self.client.check_response(&request_id, rsp).await?;
let version = parse_server_version(&request_id, &rsp)?;
let table = Arc::new(RemoteTable::new(
self.client.clone(),
request.name.clone(),
version,
));
self.table_cache.insert(request.name, table.clone()).await;
Ok(table)
}
Ok(Arc::new(RemoteTable::new(
self.client.clone(),
request.name,
)))
}
async fn rename_table(&self, current_name: &str, new_name: &str) -> Result<()> {
@@ -224,8 +267,10 @@ impl<S: HttpSend> Database for RemoteDatabase<S> {
let req = req.json(&serde_json::json!({ "new_table_name": new_name }));
let (request_id, resp) = self.client.send(req, false).await?;
self.client.check_response(&request_id, resp).await?;
self.table_cache.remove(current_name).await;
self.table_cache.insert(new_name.into(), ()).await;
let table = self.table_cache.remove(current_name).await;
if let Some(table) = table {
self.table_cache.insert(new_name.into(), table).await;
}
Ok(())
}

View File

@@ -10,7 +10,7 @@ use crate::index::IndexStatistics;
use crate::query::{QueryFilter, QueryRequest, Select, VectorQueryRequest};
use crate::table::{AddDataMode, AnyQuery, Filter};
use crate::utils::{supported_btree_data_type, supported_vector_data_type};
use crate::{DistanceType, Error};
use crate::{DistanceType, Error, Table};
use arrow_array::RecordBatchReader;
use arrow_ipc::reader::FileReader;
use arrow_schema::{DataType, SchemaRef};
@@ -24,7 +24,7 @@ use http::StatusCode;
use lance::arrow::json::{JsonDataType, JsonSchema};
use lance::dataset::scanner::DatasetRecordBatchStream;
use lance::dataset::{ColumnAlteration, NewColumnTransform, Version};
use lance_datafusion::exec::OneShotExec;
use lance_datafusion::exec::{execute_plan, OneShotExec};
use serde::{Deserialize, Serialize};
use tokio::sync::RwLock;
@@ -41,6 +41,7 @@ use crate::{
use super::client::RequestResultExt;
use super::client::{HttpSend, RestfulLanceDbClient, Sender};
use super::db::ServerVersion;
use super::ARROW_STREAM_CONTENT_TYPE;
#[derive(Debug)]
@@ -48,15 +49,21 @@ pub struct RemoteTable<S: HttpSend = Sender> {
#[allow(dead_code)]
client: RestfulLanceDbClient<S>,
name: String,
server_version: ServerVersion,
version: RwLock<Option<u64>>,
}
impl<S: HttpSend> RemoteTable<S> {
pub fn new(client: RestfulLanceDbClient<S>, name: String) -> Self {
pub fn new(
client: RestfulLanceDbClient<S>,
name: String,
server_version: ServerVersion,
) -> Self {
Self {
client,
name,
server_version,
version: RwLock::new(None),
}
}
@@ -212,10 +219,11 @@ impl<S: HttpSend> RemoteTable<S> {
}
fn apply_vector_query_params(
body: &mut serde_json::Value,
&self,
mut body: serde_json::Value,
query: &VectorQueryRequest,
) -> Result<()> {
Self::apply_query_params(body, &query.base)?;
) -> Result<Vec<serde_json::Value>> {
Self::apply_query_params(&mut body, &query.base)?;
// Apply general parameters, before we dispatch based on number of query vectors.
body["distance_type"] = serde_json::json!(query.distance_type.unwrap_or_default());
@@ -256,25 +264,40 @@ impl<S: HttpSend> RemoteTable<S> {
}
}
match query.query_vector.len() {
let bodies = match query.query_vector.len() {
0 => {
// Server takes empty vector, not null or undefined.
body["vector"] = serde_json::Value::Array(Vec::new());
vec![body]
}
1 => {
body["vector"] = vector_to_json(&query.query_vector[0])?;
vec![body]
}
_ => {
let vectors = query
.query_vector
.iter()
.map(vector_to_json)
.collect::<Result<Vec<_>>>()?;
body["vector"] = serde_json::Value::Array(vectors);
if self.server_version.support_multivector() {
let vectors = query
.query_vector
.iter()
.map(vector_to_json)
.collect::<Result<Vec<_>>>()?;
body["vector"] = serde_json::Value::Array(vectors);
vec![body]
} else {
// Server does not support multiple vectors in a single query.
// We need to send multiple requests.
let mut bodies = Vec::with_capacity(query.query_vector.len());
for vector in &query.query_vector {
let mut body = body.clone();
body["vector"] = vector_to_json(vector)?;
bodies.push(body);
}
bodies
}
}
}
};
Ok(())
Ok(bodies)
}
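The fallback above fans a multi-vector query out into one request body per vector when the server version predates multi-vector support; newer servers still receive a single body carrying an array of vectors. A hedged sketch of the fan-out using only serde_json (the "vector" field mirrors the diff; the function name and the assumption that the base body is a JSON object are illustrative):

// Clone the shared base body once per query vector and set its "vector" field.
fn fan_out(base: &serde_json::Value, vectors: &[Vec<f32>]) -> Vec<serde_json::Value> {
    vectors
        .iter()
        .map(|v| {
            let mut body = base.clone();
            body["vector"] = serde_json::json!(v);
            body
        })
        .collect()
}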
async fn check_mutable(&self) -> Result<()> {
@@ -299,27 +322,34 @@ impl<S: HttpSend> RemoteTable<S> {
&self,
query: &AnyQuery,
_options: QueryExecutionOptions,
) -> Result<Pin<Box<dyn RecordBatchStream + Send>>> {
) -> Result<Vec<Pin<Box<dyn RecordBatchStream + Send>>>> {
let request = self.client.post(&format!("/v1/table/{}/query/", self.name));
let version = self.current_version().await;
let mut body = serde_json::json!({ "version": version });
match query {
let requests = match query {
AnyQuery::Query(query) => {
Self::apply_query_params(&mut body, query)?;
// Empty vector can be passed if no vector search is performed.
body["vector"] = serde_json::Value::Array(Vec::new());
vec![request.json(&body)]
}
AnyQuery::VectorQuery(query) => {
Self::apply_vector_query_params(&mut body, query)?;
let bodies = self.apply_vector_query_params(body, query)?;
bodies
.into_iter()
.map(|body| request.try_clone().unwrap().json(&body))
.collect()
}
}
};
let request = request.json(&body);
let (request_id, response) = self.client.send(request, true).await?;
let stream = self.read_arrow_stream(&request_id, response).await?;
Ok(stream)
let futures = requests.into_iter().map(|req| async move {
let (request_id, response) = self.client.send(req, true).await?;
self.read_arrow_stream(&request_id, response).await
});
let streams = futures::future::try_join_all(futures).await?;
Ok(streams)
}
}
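With the change above, execute_query builds one request per body and awaits them together, so the first failing request aborts the whole batch. The concurrency pattern, reduced to its essentials with the futures crate (the async block below is a stand-in for the client's send-and-read path, not the real implementation):

// Turn each body into a future and await them all; try_join_all fails fast on error.
async fn send_all(bodies: Vec<String>) -> std::io::Result<Vec<String>> {
    let requests = bodies.into_iter().map(|body| async move {
        // Placeholder for an HTTP round trip; it simply echoes the body back.
        Ok::<String, std::io::Error>(body)
    });
    futures::future::try_join_all(requests).await
}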
@@ -342,7 +372,7 @@ mod test_utils {
use crate::remote::client::test_utils::MockSender;
impl RemoteTable<MockSender> {
pub fn new_mock<F, T>(name: String, handler: F) -> Self
pub fn new_mock<F, T>(name: String, handler: F, version: Option<semver::Version>) -> Self
where
F: Fn(reqwest::Request) -> http::Response<T> + Send + Sync + 'static,
T: Into<reqwest::Body>,
@@ -351,6 +381,7 @@ mod test_utils {
Self {
client,
name,
server_version: version.map(ServerVersion).unwrap_or_default(),
version: RwLock::new(None),
}
}
@@ -491,8 +522,17 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
query: &AnyQuery,
options: QueryExecutionOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
let stream = self.execute_query(query, options).await?;
Ok(Arc::new(OneShotExec::new(stream)))
let streams = self.execute_query(query, options).await?;
if streams.len() == 1 {
let stream = streams.into_iter().next().unwrap();
Ok(Arc::new(OneShotExec::new(stream)))
} else {
let stream_execs = streams
.into_iter()
.map(|stream| Arc::new(OneShotExec::new(stream)) as Arc<dyn ExecutionPlan>)
.collect();
Table::multi_vector_plan(stream_execs)
}
}
async fn query(
@@ -500,8 +540,24 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
query: &AnyQuery,
_options: QueryExecutionOptions,
) -> Result<DatasetRecordBatchStream> {
let stream = self.execute_query(query, _options).await?;
Ok(DatasetRecordBatchStream::new(stream))
let streams = self.execute_query(query, _options).await?;
if streams.len() == 1 {
Ok(DatasetRecordBatchStream::new(
streams.into_iter().next().unwrap(),
))
} else {
let stream_execs = streams
.into_iter()
.map(|stream| Arc::new(OneShotExec::new(stream)) as Arc<dyn ExecutionPlan>)
.collect();
let plan = Table::multi_vector_plan(stream_execs)?;
Ok(DatasetRecordBatchStream::new(execute_plan(
plan,
Default::default(),
)?))
}
}
async fn update(&self, update: UpdateBuilder) -> Result<u64> {
self.check_mutable().await?;
@@ -884,8 +940,10 @@ mod tests {
use futures::{future::BoxFuture, StreamExt, TryFutureExt};
use lance_index::scalar::FullTextSearchQuery;
use reqwest::Body;
use rstest::rstest;
use crate::index::vector::IvfFlatIndexBuilder;
use crate::remote::db::DEFAULT_SERVER_VERSION;
use crate::remote::JSON_CONTENT_TYPE;
use crate::{
index::{vector::IvfPqIndexBuilder, Index, IndexStatistics, IndexType},
@@ -1554,9 +1612,12 @@ mod tests {
.unwrap();
}
#[rstest]
#[case(DEFAULT_SERVER_VERSION.clone())]
#[case(semver::Version::new(0, 2, 0))]
#[tokio::test]
async fn test_query_multiple_vectors() {
let table = Table::new_with_handler("my_table", |request| {
async fn test_batch_queries(#[case] version: semver::Version) {
let table = Table::new_with_handler_version("my_table", version.clone(), move |request| {
assert_eq!(request.method(), "POST");
assert_eq!(request.url().path(), "/v1/table/my_table/query/");
assert_eq!(
@@ -1566,20 +1627,32 @@ mod tests {
let body: serde_json::Value =
serde_json::from_slice(request.body().unwrap().as_bytes().unwrap()).unwrap();
let query_vectors = body["vector"].as_array().unwrap();
assert_eq!(query_vectors.len(), 2);
assert_eq!(query_vectors[0].as_array().unwrap().len(), 3);
assert_eq!(query_vectors[1].as_array().unwrap().len(), 3);
let data = RecordBatch::try_new(
Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("query_index", DataType::Int32, false),
])),
vec![
Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6])),
Arc::new(Int32Array::from(vec![0, 0, 0, 1, 1, 1])),
],
)
.unwrap();
let version = ServerVersion(version.clone());
let data = if version.support_multivector() {
assert_eq!(query_vectors.len(), 2);
assert_eq!(query_vectors[0].as_array().unwrap().len(), 3);
assert_eq!(query_vectors[1].as_array().unwrap().len(), 3);
RecordBatch::try_new(
Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("query_index", DataType::Int32, false),
])),
vec![
Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6])),
Arc::new(Int32Array::from(vec![0, 0, 0, 1, 1, 1])),
],
)
.unwrap()
} else {
// it's a single flat vector, so the length here is the dimension
assert_eq!(query_vectors.len(), 3);
RecordBatch::try_new(
Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])),
vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
)
.unwrap()
};
let response_body = write_ipc_file(&data);
http::Response::builder()
.status(200)

View File

@@ -4,9 +4,12 @@
use std::io::Cursor;
use arrow_array::RecordBatchReader;
use reqwest::Response;
use crate::Result;
use super::db::ServerVersion;
pub fn batches_to_ipc_bytes(batches: impl RecordBatchReader) -> Result<Vec<u8>> {
const WRITE_BUF_SIZE: usize = 4096;
let buf = Vec::with_capacity(WRITE_BUF_SIZE);
@@ -22,3 +25,24 @@ pub fn batches_to_ipc_bytes(batches: impl RecordBatchReader) -> Result<Vec<u8>>
}
Ok(buf.into_inner())
}
pub fn parse_server_version(req_id: &str, rsp: &Response) -> Result<ServerVersion> {
let version = rsp
.headers()
.get("phalanx-version")
.map(|v| {
let v = v.to_str().map_err(|e| crate::Error::Http {
source: e.into(),
request_id: req_id.to_string(),
status_code: Some(rsp.status()),
})?;
ServerVersion::parse(v).map_err(|e| crate::Error::Http {
source: e.into(),
request_id: req_id.to_string(),
status_code: Some(rsp.status()),
})
})
.transpose()?
.unwrap_or_default();
Ok(version)
}
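parse_server_version above reads the optional phalanx-version response header and falls back to a default when the header is missing, turning parse failures into HTTP errors. A simplified sketch with the error handling collapsed into an Option (the 0.1.0 fallback is hypothetical, not the crate's DEFAULT_SERVER_VERSION):

// Parse an optional version header; a missing or malformed value yields the fallback.
fn version_from_header(header: Option<&str>) -> semver::Version {
    header
        .and_then(|raw| semver::Version::parse(raw).ok())
        .unwrap_or_else(|| semver::Version::new(0, 1, 0))
}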

View File

@@ -509,6 +509,27 @@ mod test_utils {
let inner = Arc::new(crate::remote::table::RemoteTable::new_mock(
name.into(),
handler,
None,
));
Self {
inner,
// Registry is unused.
embedding_registry: Arc::new(MemoryRegistry::new()),
}
}
pub fn new_with_handler_version<T>(
name: impl Into<String>,
version: semver::Version,
handler: impl Fn(reqwest::Request) -> http::Response<T> + Clone + Send + Sync + 'static,
) -> Self
where
T: Into<reqwest::Body>,
{
let inner = Arc::new(crate::remote::table::RemoteTable::new_mock(
name.into(),
handler,
Some(version),
));
Self {
inner,

View File

@@ -4,6 +4,7 @@
//! This module contains adapters to allow LanceDB tables to be used as DataFusion table providers.
use std::{collections::HashMap, sync::Arc};
use arrow_array::RecordBatch;
use arrow_schema::Schema as ArrowSchema;
use async_trait::async_trait;
use datafusion_catalog::{Session, TableProvider};
@@ -104,7 +105,9 @@ impl ExecutionPlan for MetadataEraserExec {
) -> DataFusionResult<SendableRecordBatchStream> {
let stream = self.input.execute(partition, context)?;
let schema = self.schema.clone();
let stream = stream.map_ok(move |batch| batch.with_schema(schema.clone()).unwrap());
let stream = stream.map_ok(move |batch| {
RecordBatch::try_new(schema.clone(), batch.columns().to_vec()).unwrap()
});
Ok(
Box::pin(RecordBatchStreamAdapter::new(self.schema.clone(), stream))
as SendableRecordBatchStream,
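The fix above rebuilds each batch with RecordBatch::try_new rather than with_schema, since with_schema rejects a target schema that is not a superset of the current one and therefore cannot drop schema metadata. A standalone sketch of the same erasure using only arrow-array and arrow-schema (field-level metadata is left untouched, matching the intent of stripping only schema-level metadata):

use std::sync::Arc;
use arrow_array::RecordBatch;
use arrow_schema::Schema;

// Rebuild the batch against a copy of its schema with the schema-level metadata dropped.
fn erase_metadata(batch: &RecordBatch) -> RecordBatch {
    let bare_schema = Arc::new(Schema::new(batch.schema().fields().clone()));
    RecordBatch::try_new(bare_schema, batch.columns().to_vec())
        .expect("columns already match the schema")
}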
@@ -201,7 +204,8 @@ pub mod tests {
use arrow::array::AsArray;
use arrow_array::{
Int32Array, RecordBatch, RecordBatchIterator, RecordBatchReader, UInt32Array,
BinaryArray, Float64Array, Int32Array, Int64Array, RecordBatch, RecordBatchIterator,
RecordBatchReader, StringArray, UInt32Array,
};
use arrow_schema::{DataType, Field, Schema};
use datafusion::{datasource::provider_as_source, prelude::SessionContext};
@@ -238,9 +242,49 @@ pub mod tests {
)
}
fn make_tbl_two_test_batches() -> impl RecordBatchReader + Send + Sync + 'static {
let metadata = HashMap::from_iter(vec![("foo".to_string(), "bar".to_string())]);
let schema = Arc::new(
Schema::new(vec![
Field::new("ints", DataType::Int64, true),
Field::new("strings", DataType::Utf8, true),
Field::new("floats", DataType::Float64, true),
Field::new("jsons", DataType::Utf8, true),
Field::new("bins", DataType::Binary, true),
Field::new("nodates", DataType::Utf8, true),
])
.with_metadata(metadata),
);
RecordBatchIterator::new(
vec![RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int64Array::from_iter_values(0..1000)),
Arc::new(StringArray::from_iter_values(
(0..1000).map(|i| i.to_string()),
)),
Arc::new(Float64Array::from_iter_values((0..1000).map(|i| i as f64))),
Arc::new(StringArray::from_iter_values(
(0..1000).map(|i| format!("{{\"i\":{}}}", i)),
)),
Arc::new(BinaryArray::from_iter_values(
(0..1000).map(|i| (i as u32).to_be_bytes().to_vec()),
)),
Arc::new(StringArray::from_iter_values(
(0..1000).map(|i| i.to_string()),
)),
],
)],
schema,
)
}
struct TestFixture {
_tmp_dir: tempfile::TempDir,
// An adapter for a table with make_test_batches batches
adapter: Arc<BaseTableAdapter>,
// an adapter for a table with make_tbl_two_test_batches batches
adapter2: Arc<BaseTableAdapter>,
}
impl TestFixture {
@@ -262,15 +306,28 @@ pub mod tests {
.await
.unwrap();
let tbl2 = db
.create_table("tbl2", make_tbl_two_test_batches())
.execute()
.await
.unwrap();
let adapter = Arc::new(
BaseTableAdapter::try_new(tbl.base_table().clone())
.await
.unwrap(),
);
let adapter2 = Arc::new(
BaseTableAdapter::try_new(tbl2.base_table().clone())
.await
.unwrap(),
);
Self {
_tmp_dir: tmp_dir,
adapter,
adapter2,
}
}
@@ -309,7 +366,7 @@ pub mod tests {
}
async fn check_plan(plan: LogicalPlan, expected: &str) {
let physical_plan = dbg!(Self::plan_to_explain(plan).await);
let physical_plan = Self::plan_to_explain(plan).await;
let mut lines_checked = 0;
for (actual_line, expected_line) in physical_plan.lines().zip(expected.lines()) {
lines_checked += 1;
@@ -343,6 +400,27 @@ pub mod tests {
}
}
#[tokio::test]
async fn test_metadata_erased_with_filter() {
// This is a regression test where the metadata eraser was not properly erasing metadata
let fixture = TestFixture::new().await;
assert!(fixture.adapter.schema().metadata().is_empty());
let plan = LogicalPlanBuilder::scan("foo", provider_as_source(fixture.adapter2), None)
.unwrap()
.filter(col("ints").lt(lit(10)))
.unwrap()
.build()
.unwrap();
let mut stream = TestFixture::plan_to_stream(plan).await;
while let Some(batch) = stream.try_next().await.unwrap() {
assert!(batch.schema().metadata().is_empty());
}
}
#[tokio::test]
async fn test_filter_pushdown() {
let fixture = TestFixture::new().await;