Compare commits


8 Commits

Author · SHA1 · Date

Lance Release · dcc216a244 · 2024-01-25 19:52:54 +00:00
Bump version: 0.4.3 → 0.4.4

Lei Xu · a7aa168c7f · 2024-01-25 10:44:31 -08:00
feat: improve the rust table query API and documents (#860)
* Easy to type
* Handle `String, &str, [String] and [&str]` well without manual conversion
* Fix function name to be verb
* Improve docstring of Rust.
* Promote `query` and `search()` to public `Table` trait

Lei Xu · 7a89b5ec68 · 2024-01-24 20:26:30 -08:00
doc: update rust readme to include crate and docs.rs links (#859)

Lei Xu · ee862abd29 · 2024-01-24 17:27:46 -08:00
feat(napi): Provide a new createIndex API in the napi SDK. (#857)

Will Jones · 4e1ed2b139 · 2024-01-24 15:27:22 -08:00
docs: document basics of configuring object storage (#832)
Created based on upstream PR https://github.com/lancedb/lance/pull/1849
Closes #681
Co-authored-by: Prashanth Rao <35005448+prrao87@users.noreply.github.com>

Lei Xu · 008e0b1a93 · 2024-01-24 10:05:12 -08:00
feat(rust): create index API improvement (#853)
* Extract a minimal Table interface in Rust SDK
* Make create_index composable in Rust.
* Fix compiling issues from ffi

Bert · 82cbcf6d07 · 2024-01-24 08:41:28 -05:00
Bump lance 0.9.9 (#851)

Lei Xu · 1cd5426aea · 2024-01-23 15:14:45 -08:00
feat: rework NodeJS SDK using napi (#847)
Use Napi to write a Node.js SDK that follows Polars for better maintainability, while keeping most of the logic in Rust.
59 changed files with 9469 additions and 610 deletions


@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.4.3
current_version = 0.4.4
commit = True
message = Bump version: {current_version} → {new_version}
tag = True

114  .github/workflows/nodejs.yml (vendored, new file)

@@ -0,0 +1,114 @@
name: NodeJS (NAPI)
on:
push:
branches:
- main
pull_request:
paths:
- nodejs/**
- .github/workflows/nodejs.yml
- docker-compose.yml
concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true
env:
# Disable full debug symbol generation to speed up CI build and keep memory down
# "1" means line tables only, which is useful for panic tracebacks.
RUSTFLAGS: "-C debuginfo=1"
RUST_BACKTRACE: "1"
jobs:
lint:
name: Lint
runs-on: ubuntu-22.04
defaults:
run:
shell: bash
working-directory: nodejs
steps:
- uses: actions/checkout@v3
with:
fetch-depth: 0
lfs: true
- uses: actions/setup-node@v3
with:
node-version: 20
cache: 'npm'
cache-dependency-path: nodejs/package-lock.json
- uses: Swatinem/rust-cache@v2
- name: Install dependencies
run: |
sudo apt update
sudo apt install -y protobuf-compiler libssl-dev
- name: Lint
run: |
cargo fmt --all -- --check
cargo clippy --all --all-features -- -D warnings
npm ci
npm run lint
linux:
name: Linux (NodeJS ${{ matrix.node-version }})
timeout-minutes: 30
strategy:
matrix:
node-version: [ "18", "20" ]
runs-on: "ubuntu-22.04"
defaults:
run:
shell: bash
working-directory: nodejs
steps:
- uses: actions/checkout@v3
with:
fetch-depth: 0
lfs: true
- uses: actions/setup-node@v3
with:
node-version: ${{ matrix.node-version }}
cache: 'npm'
cache-dependency-path: nodejs/package-lock.json
- uses: Swatinem/rust-cache@v2
- name: Install dependencies
run: |
sudo apt update
sudo apt install -y protobuf-compiler libssl-dev
npm install -g @napi-rs/cli
- name: Build
run: |
npm ci
npm run build
- name: Test
run: npm run test
macos:
timeout-minutes: 30
runs-on: "macos-13"
defaults:
run:
shell: bash
working-directory: nodejs
steps:
- uses: actions/checkout@v3
with:
fetch-depth: 0
lfs: true
- uses: actions/setup-node@v3
with:
node-version: 20
cache: 'npm'
cache-dependency-path: nodejs/package-lock.json
- uses: Swatinem/rust-cache@v2
- name: Install dependencies
run: |
brew install protobuf
npm install -g @napi-rs/cli
- name: Build
run: |
npm ci
npm run build
- name: Test
run: |
npm run test

3  .gitignore (vendored)

@@ -29,8 +29,9 @@ python/dist
node/dist
node/examples/**/package-lock.json
node/examples/**/dist
dist
## Rust
target
Cargo.lock
Cargo.lock


@@ -1,14 +1,20 @@
[workspace]
members = ["rust/ffi/node", "rust/vectordb"]
members = ["rust/ffi/node", "rust/vectordb", "nodejs"]
# Python package needs to be built by maturin.
exclude = ["python"]
resolver = "2"
[workspace.package]
edition = "2021"
authors = ["Lance Devs <dev@lancedb.com>"]
license = "Apache-2.0"
repository = "https://github.com/lancedb/lancedb"
[workspace.dependencies]
lance = { "version" = "=0.9.7", "features" = ["dynamodb"] }
lance-index = { "version" = "=0.9.7" }
lance-linalg = { "version" = "=0.9.7" }
lance-testing = { "version" = "=0.9.7" }
lance = { "version" = "=0.9.9", "features" = ["dynamodb"] }
lance-index = { "version" = "=0.9.9" }
lance-linalg = { "version" = "=0.9.9" }
lance-testing = { "version" = "=0.9.9" }
# Note that this one does not include pyarrow
arrow = { version = "49.0.0", optional = false }
arrow-array = "49.0"
@@ -18,11 +24,14 @@ arrow-ord = "49.0"
arrow-schema = "49.0"
arrow-arith = "49.0"
arrow-cast = "49.0"
async-trait = "0"
chrono = "0.4.23"
half = { "version" = "=2.3.1", default-features = false, features = [
"num-traits",
] }
futures = "0"
log = "0.4"
object_store = "0.9.0"
snafu = "0.7.4"
url = "2"
num-traits = "0.2"


@@ -90,6 +90,7 @@ nav:
- Full-text search: fts.md
- Filtering: sql.md
- Versioning & Reproducibility: notebooks/reproducibility.ipynb
- Configuring Storage: guides/storage.md
- 🧬 Managing embeddings:
- Overview: embeddings/index.md
- Explicit management: embeddings/embedding_explicit.md
@@ -149,6 +150,7 @@ nav:
- Full-text search: fts.md
- Filtering: sql.md
- Versioning & Reproducibility: notebooks/reproducibility.ipynb
- Configuring Storage: guides/storage.md
- Managing Embeddings:
- Overview: embeddings/index.md
- Explicit management: embeddings/embedding_explicit.md


@@ -0,0 +1,91 @@
# Configuring cloud storage
<!-- TODO: When we add documentation for how to configure other storage types
we can change the name to a more general "Configuring storage" -->
When using LanceDB OSS, you can choose where to store your data. The tradeoffs between different storage options are discussed in the [storage concepts guide](../concepts/storage.md). This guide shows how to configure LanceDB to use different storage options.
## Object Stores
LanceDB OSS supports object stores such as AWS S3 (and compatible stores), Azure Blob Store, and Google Cloud Storage. Which object store to use is determined by the URI scheme of the dataset path. `s3://` is used for AWS S3, `az://` is used for Azure Blob Storage, and `gs://` is used for Google Cloud Storage. These URIs are passed to the `connect` function:
=== "Python"
AWS S3:
```python
import lancedb
db = lancedb.connect("s3://bucket/path")
```
Google Cloud Storage:
```python
import lancedb
db = lancedb.connect("gs://bucket/path")
```
Azure Blob Storage:
```python
import lancedb
db = lancedb.connect("az://bucket/path")
```
=== "JavaScript"
AWS S3:
```javascript
const lancedb = require("lancedb");
const db = await lancedb.connect("s3://bucket/path");
```
Google Cloud Storage:
```javascript
const lancedb = require("lancedb");
const db = await lancedb.connect("gs://bucket/path");
```
Azure Blob Storage:
```javascript
const lancedb = require("lancedb");
const db = await lancedb.connect("az://bucket/path");
```
In most cases, when running in the respective cloud with permissions set up correctly, no additional configuration is required. When running outside of that cloud, authentication credentials must be provided using environment variables. In general, these are the same environment variables used by the respective cloud SDKs. The sections below describe the environment variables that can be used to configure each object store.
LanceDB OSS uses the [object-store](https://docs.rs/object_store/latest/object_store/) Rust crate for object store access. There are general environment variables that can be used to configure the object store, such as the request timeout and proxy configuration. See the [object_store ClientConfigKey](https://docs.rs/object_store/latest/object_store/enum.ClientConfigKey.html) doc for available configuration options. The environment variables that can be set are the snake-cased versions of these variable names. For example, to set `ProxyUrl` use the environment variable `PROXY_URL`. (Don't let the Rust docs intimidate you! We link to them so you can see an up-to-date list of the available options.)
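For illustration, a minimal sketch of setting one of these client options before connecting (the proxy URL is a hypothetical placeholder):

```typescript
import * as lancedb from "lancedb";

// PROXY_URL is the snake-cased form of the ClientConfigKey `ProxyUrl`.
// The URL below is a hypothetical placeholder.
process.env.PROXY_URL = "http://proxy.internal:8080";

const db = await lancedb.connect("s3://bucket/path");
```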
### AWS S3
To configure credentials for AWS S3, you can use the `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, and `AWS_SESSION_TOKEN` environment variables.
Alternatively, if you are using AWS SSO, you can use the `AWS_PROFILE` and `AWS_DEFAULT_REGION` environment variables.
You can see a full list of environment variables [here](https://docs.rs/object_store/latest/object_store/aws/struct.AmazonS3Builder.html#method.from_env).
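For example, a sketch of supplying static credentials via these variables (all values are placeholders; in practice they come from your environment or a secrets manager, not source code):

```typescript
import * as lancedb from "lancedb";

// Placeholder credentials for illustration only.
process.env.AWS_ACCESS_KEY_ID = "AKIA...";
process.env.AWS_SECRET_ACCESS_KEY = "...";
process.env.AWS_SESSION_TOKEN = "..."; // only needed for temporary credentials

const db = await lancedb.connect("s3://bucket/path");
```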
#### S3-compatible stores
LanceDB can also connect to S3-compatible stores, such as MinIO. To do so, you must specify two environment variables: `AWS_ENDPOINT` and `AWS_DEFAULT_REGION`. `AWS_ENDPOINT` should be the URL of the S3-compatible store, and `AWS_DEFAULT_REGION` should be the region to use.
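For example, a sketch of connecting to a local MinIO instance (endpoint, region, and credentials are illustrative defaults):

```typescript
import * as lancedb from "lancedb";

// Illustrative local MinIO settings; adjust to your deployment.
process.env.AWS_ENDPOINT = "http://localhost:9000";
process.env.AWS_DEFAULT_REGION = "us-east-1";
process.env.AWS_ACCESS_KEY_ID = "minioadmin";
process.env.AWS_SECRET_ACCESS_KEY = "minioadmin";

const db = await lancedb.connect("s3://bucket/path");
```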
<!-- TODO: we should also document the use of S3 Express once we fully support it -->
### Google Cloud Storage
GCS credentials are configured by setting the `GOOGLE_SERVICE_ACCOUNT` environment variable to the path of a JSON file containing the service account credentials. There are several aliases for this environment variable, documented [here](https://docs.rs/object_store/latest/object_store/gcp/struct.GoogleCloudStorageBuilder.html#method.from_env).
!!! info "HTTP/2 support"
By default, the GCS client uses HTTP/1 rather than HTTP/2, which significantly improves maximum throughput. If you nevertheless wish to use HTTP/2, set the environment variable `HTTP1_ONLY` to `false`.
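A sketch of the GCS setup (the key-file path is a placeholder; the `HTTP1_ONLY` line is only needed if you want HTTP/2):

```typescript
import * as lancedb from "lancedb";

// Placeholder path to a service-account JSON key file.
process.env.GOOGLE_SERVICE_ACCOUNT = "/path/to/service-account.json";
// Optional: opt back into HTTP/2 (HTTP/1 is the default).
process.env.HTTP1_ONLY = "false";

const db = await lancedb.connect("gs://bucket/path");
```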
### Azure Blob Storage
Azure Blob Storage credentials can be configured by setting the `AZURE_STORAGE_ACCOUNT_NAME` and `AZURE_STORAGE_ACCOUNT_KEY` environment variables. The full list of environment variables that can be set is documented [here](https://docs.rs/object_store/latest/object_store/azure/struct.MicrosoftAzureBuilder.html#method.from_env).
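And a sketch for Azure (account name and key are placeholders):

```typescript
import * as lancedb from "lancedb";

// Placeholder storage-account credentials.
process.env.AZURE_STORAGE_ACCOUNT_NAME = "myaccount";
process.env.AZURE_STORAGE_ACCOUNT_KEY = "...";

const db = await lancedb.connect("az://bucket/path");
```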
<!-- TODO: demonstrate how to configure networked file systems for optimal performance -->


@@ -1,6 +1,6 @@
{
"name": "vectordb",
"version": "0.4.3",
"version": "0.4.4",
"description": " Serverless, low-latency vector database for AI applications",
"main": "dist/index.js",
"types": "dist/index.d.ts",
@@ -81,10 +81,10 @@
}
},
"optionalDependencies": {
"@lancedb/vectordb-darwin-arm64": "0.4.3",
"@lancedb/vectordb-darwin-x64": "0.4.3",
"@lancedb/vectordb-linux-arm64-gnu": "0.4.3",
"@lancedb/vectordb-linux-x64-gnu": "0.4.3",
"@lancedb/vectordb-win32-x64-msvc": "0.4.3"
"@lancedb/vectordb-darwin-arm64": "0.4.4",
"@lancedb/vectordb-darwin-x64": "0.4.4",
"@lancedb/vectordb-linux-arm64-gnu": "0.4.4",
"@lancedb/vectordb-linux-x64-gnu": "0.4.4",
"@lancedb/vectordb-win32-x64-msvc": "0.4.4"
}
}

22  nodejs/.eslintrc.js (new file)

@@ -0,0 +1,22 @@
module.exports = {
env: {
browser: true,
es2021: true,
},
extends: [
"eslint:recommended",
"plugin:@typescript-eslint/recommended-type-checked",
"plugin:@typescript-eslint/stylistic-type-checked",
],
overrides: [],
parserOptions: {
project: "./tsconfig.json",
ecmaVersion: "latest",
sourceType: "module",
},
rules: {
"@typescript-eslint/method-signature-style": "off",
"@typescript-eslint/no-explicit-any": "off",
},
ignorePatterns: ["node_modules/", "dist/", "build/", "vectordb/native.*"],
};

15  nodejs/.npmignore (new file)

@@ -0,0 +1,15 @@
target
Cargo.lock
.cargo
.github
npm
.eslintrc
.prettierignore
rustfmt.toml
yarn.lock
*.node
.yarn
__test__
renovate.json
.idea
src

27  nodejs/Cargo.toml (new file)

@@ -0,0 +1,27 @@
[package]
name = "vectordb-nodejs"
edition = "2021"
version = "0.0.0"
license.workspace = true
repository.workspace = true
[lib]
crate-type = ["cdylib"]
[dependencies]
arrow-ipc.workspace = true
napi = { version = "2.14", default-features = false, features = [
"napi7",
"async"
] }
napi-derive = "2.14"
vectordb = { path = "../rust/vectordb" }
lance.workspace = true
lance-linalg.workspace = true
[build-dependencies]
napi-build = "2.1"
[profile.release]
lto = true
strip = "symbols"

24  nodejs/README.md (new file)

@@ -0,0 +1,24 @@
# (New) LanceDB NodeJS SDK
This package will replace the current NodeJS SDK once it is ready.
## Development
```sh
npm run build
npm t
```
Generating docs
```sh
npm run docs
cd ../docs
# Assume the virtual environment was created
# python3 -m venv venv
# pip install -r requirements.txt
. ./venv/bin/activate
mkdocs build
```


@@ -0,0 +1,106 @@
// Copyright 2024 Lance Developers.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import { makeArrowTable, toBuffer } from "../vectordb/arrow";
import {
Field,
FixedSizeList,
Float16,
Float32,
Int32,
tableFromIPC,
Schema,
Float64,
} from "apache-arrow";
test("customized schema", function () {
const schema = new Schema([
new Field("a", new Int32(), true),
new Field("b", new Float32(), true),
new Field(
"c",
new FixedSizeList(3, new Field("item", new Float16())),
true
),
]);
const table = makeArrowTable(
[
{ a: 1, b: 2, c: [1, 2, 3] },
{ a: 4, b: 5, c: [4, 5, 6] },
{ a: 7, b: 8, c: [7, 8, 9] },
],
{ schema }
);
expect(table.schema.toString()).toEqual(schema.toString());
const buf = toBuffer(table);
expect(buf.byteLength).toBeGreaterThan(0);
const actual = tableFromIPC(buf);
expect(actual.numRows).toBe(3);
const actualSchema = actual.schema;
expect(actualSchema.toString()).toStrictEqual(schema.toString());
});
test("default vector column", function () {
const schema = new Schema([
new Field("a", new Float64(), true),
new Field("b", new Float64(), true),
new Field("vector", new FixedSizeList(3, new Field("item", new Float32()))),
]);
const table = makeArrowTable([
{ a: 1, b: 2, vector: [1, 2, 3] },
{ a: 4, b: 5, vector: [4, 5, 6] },
{ a: 7, b: 8, vector: [7, 8, 9] },
]);
const buf = toBuffer(table);
expect(buf.byteLength).toBeGreaterThan(0);
const actual = tableFromIPC(buf);
expect(actual.numRows).toBe(3);
const actualSchema = actual.schema;
expect(actualSchema.toString()).toEqual(schema.toString());
});
test("2 vector columns", function () {
const schema = new Schema([
new Field("a", new Float64()),
new Field("b", new Float64()),
new Field("vec1", new FixedSizeList(3, new Field("item", new Float16()))),
new Field("vec2", new FixedSizeList(3, new Field("item", new Float16()))),
]);
const table = makeArrowTable(
[
{ a: 1, b: 2, vec1: [1, 2, 3], vec2: [2, 4, 6] },
{ a: 4, b: 5, vec1: [4, 5, 6], vec2: [8, 10, 12] },
{ a: 7, b: 8, vec1: [7, 8, 9], vec2: [14, 16, 18] },
],
{
vectorColumns: {
vec1: { type: new Float16() },
vec2: { type: new Float16() },
},
}
);
const buf = toBuffer(table);
expect(buf.byteLength).toBeGreaterThan(0);
const actual = tableFromIPC(buf);
expect(actual.numRows).toBe(3);
const actualSchema = actual.schema;
expect(actualSchema.toString()).toEqual(schema.toString());
});


@@ -0,0 +1,34 @@
// Copyright 2024 Lance Developers.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import * as os from "os";
import * as path from "path";
import * as fs from "fs";
import { Schema, Field, Float64 } from "apache-arrow";
import { connect } from "../dist/index.js";
test("open database", async () => {
const tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "test-open"));
const db = await connect(tmpDir);
let tableNames = await db.tableNames();
expect(tableNames).toStrictEqual([]);
const tbl = await db.createTable("test", [{ id: 1 }, { id: 2 }]);
expect(await db.tableNames()).toStrictEqual(["test"]);
const schema = tbl.schema;
expect(schema).toEqual(new Schema([new Field("id", new Float64(), true)]));
});


@@ -0,0 +1,99 @@
// Copyright 2024 Lance Developers.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import * as os from "os";
import * as path from "path";
import * as fs from "fs";
import { connect } from "../dist";
import { Schema, Field, Float32, Int32, FixedSizeList } from "apache-arrow";
import { makeArrowTable } from "../dist/arrow";
describe("Test creating index", () => {
let tmpDir: string;
const schema = new Schema([
new Field("id", new Int32(), true),
new Field("vec", new FixedSizeList(32, new Field("item", new Float32()))),
]);
beforeEach(() => {
tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "index-"));
});
test("create vector index with no column", async () => {
const db = await connect(tmpDir);
const data = makeArrowTable(
Array(300)
.fill(1)
.map((_, i) => ({
id: i,
vec: Array(32)
.fill(1)
.map(() => Math.random()),
})),
{
schema,
}
);
const tbl = await db.createTable("test", data);
await tbl.createIndex().build();
// check index directory
const indexDir = path.join(tmpDir, "test.lance", "_indices");
expect(fs.readdirSync(indexDir)).toHaveLength(1);
// TODO: check index type.
});
test("no vector column available", async () => {
const db = await connect(tmpDir);
const tbl = await db.createTable(
"no_vec",
makeArrowTable([
{ id: 1, val: 2 },
{ id: 2, val: 3 },
])
);
await expect(tbl.createIndex().build()).rejects.toThrow(
"No vector column found"
);
await tbl.createIndex("val").build();
const indexDir = path.join(tmpDir, "no_vec.lance", "_indices");
expect(fs.readdirSync(indexDir)).toHaveLength(1);
});
test("create scalar index", async () => {
const db = await connect(tmpDir);
const data = makeArrowTable(
Array(300)
.fill(1)
.map((_, i) => ({
id: i,
vec: Array(32)
.fill(1)
.map(() => Math.random()),
})),
{
schema,
}
);
const tbl = await db.createTable("test", data);
await tbl.createIndex("id").build();
// check index directory
const indexDir = path.join(tmpDir, "test.lance", "_indices");
expect(fs.readdirSync(indexDir)).toHaveLength(1);
// TODO: check index type.
});
});

5  nodejs/build.rs (new file)

@@ -0,0 +1,5 @@
extern crate napi_build;
fn main() {
napi_build::setup();
}

5  nodejs/jest.config.js (new file)

@@ -0,0 +1,5 @@
/** @type {import('ts-jest').JestConfigWithTsJest} */
module.exports = {
preset: 'ts-jest',
testEnvironment: 'node',
};


@@ -0,0 +1,3 @@
# `vectordb-darwin-arm64`
This is the **aarch64-apple-darwin** binary for `vectordb`


@@ -0,0 +1,18 @@
{
"name": "vectordb-darwin-arm64",
"version": "0.4.3",
"os": [
"darwin"
],
"cpu": [
"arm64"
],
"main": "vectordb.darwin-arm64.node",
"files": [
"vectordb.darwin-arm64.node"
],
"license": "MIT",
"engines": {
"node": ">= 18"
}
}


@@ -0,0 +1,3 @@
# `vectordb-darwin-x64`
This is the **x86_64-apple-darwin** binary for `vectordb`


@@ -0,0 +1,18 @@
{
"name": "vectordb-darwin-x64",
"version": "0.4.3",
"os": [
"darwin"
],
"cpu": [
"x64"
],
"main": "vectordb.darwin-x64.node",
"files": [
"vectordb.darwin-x64.node"
],
"license": "MIT",
"engines": {
"node": ">= 18"
}
}


@@ -0,0 +1,3 @@
# `vectordb-linux-arm64-gnu`
This is the **aarch64-unknown-linux-gnu** binary for `vectordb`


@@ -0,0 +1,21 @@
{
"name": "vectordb-linux-arm64-gnu",
"version": "0.4.3",
"os": [
"linux"
],
"cpu": [
"arm64"
],
"main": "vectordb.linux-arm64-gnu.node",
"files": [
"vectordb.linux-arm64-gnu.node"
],
"license": "MIT",
"engines": {
"node": ">= 10"
},
"libc": [
"glibc"
]
}


@@ -0,0 +1,3 @@
# `vectordb-linux-x64-gnu`
This is the **x86_64-unknown-linux-gnu** binary for `vectordb`


@@ -0,0 +1,21 @@
{
"name": "vectordb-linux-x64-gnu",
"version": "0.4.3",
"os": [
"linux"
],
"cpu": [
"x64"
],
"main": "vectordb.linux-x64-gnu.node",
"files": [
"vectordb.linux-x64-gnu.node"
],
"license": "MIT",
"engines": {
"node": ">= 10"
},
"libc": [
"glibc"
]
}

6300  nodejs/package-lock.json (generated, new file)
File diff suppressed because it is too large.

67  nodejs/package.json (new file)

@@ -0,0 +1,67 @@
{
"name": "vectordb",
"version": "0.4.3",
"main": "./dist/index.js",
"types": "./dist/index.d.ts",
"napi": {
"name": "vectordb-nodejs",
"triples": {
"defaults": false,
"additional": [
"aarch64-apple-darwin",
"aarch64-unknown-linux-gnu",
"x86_64-apple-darwin",
"x86_64-unknown-linux-gnu"
]
}
},
"license": "Apache 2.0",
"devDependencies": {
"@napi-rs/cli": "^2.18.0",
"@types/jest": "^29.5.11",
"@typescript-eslint/eslint-plugin": "^6.19.0",
"@typescript-eslint/parser": "^6.19.0",
"eslint": "^8.56.0",
"jest": "^29.7.0",
"ts-jest": "^29.1.2",
"typedoc": "^0.25.7",
"typedoc-plugin-markdown": "^3.17.1",
"typescript": "^5.3.3"
},
"ava": {
"timeout": "3m"
},
"engines": {
"node": ">= 18"
},
"cpu": [
"x64",
"arm64"
],
"os": [
"darwin",
"linux",
"windows"
],
"scripts": {
"artifacts": "napi artifacts",
"build:native": "napi build --platform --release --js vectordb/native.js --dts vectordb/native.d.ts dist/",
"build:debug": "napi build --platform --dts ../vectordb/native.d.ts --js ../vectordb/native.js dist/",
"build": "npm run build:debug && tsc -b",
"docs": "typedoc --plugin typedoc-plugin-markdown vectordb/index.ts",
"lint": "eslint vectordb --ext .js,.ts",
"prepublishOnly": "napi prepublish -t npm",
"test": "npm run build && jest",
"universal": "napi universal",
"version": "napi version"
},
"optionalDependencies": {
"vectordb-darwin-arm64": "0.4.3",
"vectordb-darwin-x64": "0.4.3",
"vectordb-linux-arm64-gnu": "0.4.3",
"vectordb-linux-x64-gnu": "0.4.3"
},
"dependencies": {
"apache-arrow": "^15.0.0"
}
}

86  nodejs/src/connection.rs (new file)

@@ -0,0 +1,86 @@
// Copyright 2024 Lance Developers.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use napi::bindgen_prelude::*;
use napi_derive::*;
use crate::table::Table;
use vectordb::connection::{Connection as LanceDBConnection, Database};
use vectordb::ipc::ipc_file_to_batches;
#[napi]
pub struct Connection {
conn: Arc<dyn LanceDBConnection>,
}
#[napi]
impl Connection {
/// Create a new Connection instance from the given URI.
#[napi(factory)]
pub async fn new(uri: String) -> napi::Result<Self> {
Ok(Self {
conn: Arc::new(Database::connect(&uri).await.map_err(|e| {
napi::Error::from_reason(format!("Failed to connect to database: {}", e))
})?),
})
}
/// List all tables in the dataset.
#[napi]
pub async fn table_names(&self) -> napi::Result<Vec<String>> {
self.conn
.table_names()
.await
.map_err(|e| napi::Error::from_reason(format!("{}", e)))
}
/// Create table from an Apache Arrow IPC (file) buffer.
///
/// Parameters:
/// - name: The name of the table.
/// - buf: The buffer containing the IPC file.
///
#[napi]
pub async fn create_table(&self, name: String, buf: Buffer) -> napi::Result<Table> {
let batches = ipc_file_to_batches(buf.to_vec())
.map_err(|e| napi::Error::from_reason(format!("Failed to read IPC file: {}", e)))?;
let tbl = self
.conn
.create_table(&name, Box::new(batches), None)
.await
.map_err(|e| napi::Error::from_reason(format!("{}", e)))?;
Ok(Table::new(tbl))
}
#[napi]
pub async fn open_table(&self, name: String) -> napi::Result<Table> {
let tbl = self
.conn
.open_table(&name)
.await
.map_err(|e| napi::Error::from_reason(format!("{}", e)))?;
Ok(Table::new(tbl))
}
/// Drop the table with the given name, or raise an error if the table does not exist.
#[napi]
pub async fn drop_table(&self, name: String) -> napi::Result<()> {
self.conn
.drop_table(&name)
.await
.map_err(|e| napi::Error::from_reason(format!("{}", e)))
}
}

101  nodejs/src/index.rs (new file)

@@ -0,0 +1,101 @@
// Copyright 2024 Lance Developers.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use lance_linalg::distance::MetricType as LanceMetricType;
use napi_derive::napi;
#[napi]
pub enum IndexType {
Scalar,
IvfPq,
}
#[napi]
pub enum MetricType {
L2,
Cosine,
Dot,
}
impl From<MetricType> for LanceMetricType {
fn from(metric: MetricType) -> Self {
match metric {
MetricType::L2 => Self::L2,
MetricType::Cosine => Self::Cosine,
MetricType::Dot => Self::Dot,
}
}
}
#[napi]
pub struct IndexBuilder {
inner: vectordb::index::IndexBuilder,
}
#[napi]
impl IndexBuilder {
pub fn new(tbl: &dyn vectordb::Table) -> Self {
let inner = tbl.create_index(&[]);
Self { inner }
}
#[napi]
pub unsafe fn replace(&mut self, v: bool) {
self.inner.replace(v);
}
#[napi]
pub unsafe fn column(&mut self, c: String) {
self.inner.columns(&[c.as_str()]);
}
#[napi]
pub unsafe fn name(&mut self, name: String) {
self.inner.name(name.as_str());
}
#[napi]
pub unsafe fn ivf_pq(
&mut self,
metric_type: Option<MetricType>,
num_partitions: Option<u32>,
num_sub_vectors: Option<u32>,
num_bits: Option<u32>,
max_iterations: Option<u32>,
sample_rate: Option<u32>,
) {
self.inner.ivf_pq();
metric_type.map(|m| self.inner.metric_type(m.into()));
num_partitions.map(|p| self.inner.num_partitions(p));
num_sub_vectors.map(|s| self.inner.num_sub_vectors(s));
num_bits.map(|b| self.inner.num_bits(b));
max_iterations.map(|i| self.inner.max_iterations(i));
sample_rate.map(|s| self.inner.sample_rate(s));
}
#[napi]
pub unsafe fn scalar(&mut self) {
self.inner.scalar();
}
#[napi]
pub async fn build(&self) -> napi::Result<()> {
self.inner
.build()
.await
.map_err(|e| napi::Error::from_reason(format!("Failed to build index: {}", e)))?;
Ok(())
}
}

47  nodejs/src/lib.rs (new file)

@@ -0,0 +1,47 @@
// Copyright 2024 Lance Developers.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use connection::Connection;
use napi_derive::*;
mod connection;
mod index;
mod query;
mod table;
#[napi(object)]
pub struct ConnectionOptions {
pub uri: String,
pub api_key: Option<String>,
pub host_override: Option<String>,
}
/// Write mode for writing a table.
#[napi(string_enum)]
pub enum WriteMode {
Create,
Append,
Overwrite,
}
/// Write options when creating a Table.
#[napi(object)]
pub struct WriteOptions {
pub mode: Option<WriteMode>,
}
#[napi]
pub async fn connect(options: ConnectionOptions) -> napi::Result<Connection> {
Connection::new(options.uri.clone()).await
}

48  nodejs/src/query.rs (new file)

@@ -0,0 +1,48 @@
// Copyright 2024 Lance Developers.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use napi::bindgen_prelude::*;
use napi_derive::napi;
use vectordb::query::Query as LanceDBQuery;
use crate::table::Table;
#[napi]
pub struct Query {
inner: LanceDBQuery,
}
#[napi]
impl Query {
pub fn new(table: &Table) -> Self {
Self {
inner: table.table.query(),
}
}
#[napi]
pub fn vector(&mut self, vector: Float32Array) {
let inn = self.inner.clone().nearest_to(&vector);
self.inner = inn;
}
#[napi]
pub fn to_arrow(&self) -> napi::Result<()> {
// let buf = self.inner.to_arrow().map_err(|e| {
// napi::Error::from_reason(format!("Failed to convert query to arrow: {}", e))
// })?;
// Ok(buf)
todo!()
}
}

88  nodejs/src/table.rs (new file)

@@ -0,0 +1,88 @@
// Copyright 2024 Lance Developers.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use arrow_ipc::writer::FileWriter;
use napi::bindgen_prelude::*;
use napi_derive::napi;
use vectordb::{ipc::ipc_file_to_batches, table::TableRef};
use crate::index::IndexBuilder;
use crate::query::Query;
#[napi]
pub struct Table {
pub(crate) table: TableRef,
}
#[napi]
impl Table {
pub(crate) fn new(table: TableRef) -> Self {
Self { table }
}
/// Return Schema as empty Arrow IPC file.
#[napi]
pub fn schema(&self) -> napi::Result<Buffer> {
let mut writer = FileWriter::try_new(vec![], &self.table.schema())
.map_err(|e| napi::Error::from_reason(format!("Failed to create IPC file: {}", e)))?;
writer
.finish()
.map_err(|e| napi::Error::from_reason(format!("Failed to finish IPC file: {}", e)))?;
Ok(Buffer::from(writer.into_inner().map_err(|e| {
napi::Error::from_reason(format!("Failed to get IPC file: {}", e))
})?))
}
#[napi]
pub async fn add(&self, buf: Buffer) -> napi::Result<()> {
let batches = ipc_file_to_batches(buf.to_vec())
.map_err(|e| napi::Error::from_reason(format!("Failed to read IPC file: {}", e)))?;
self.table.add(Box::new(batches), None).await.map_err(|e| {
napi::Error::from_reason(format!(
"Failed to add batches to table {}: {}",
self.table, e
))
})
}
#[napi]
pub async fn count_rows(&self) -> napi::Result<usize> {
self.table.count_rows().await.map_err(|e| {
napi::Error::from_reason(format!(
"Failed to count rows in table {}: {}",
self.table, e
))
})
}
#[napi]
pub async fn delete(&self, predicate: String) -> napi::Result<()> {
self.table.delete(&predicate).await.map_err(|e| {
napi::Error::from_reason(format!(
"Failed to delete rows in table {}: predicate={}",
self.table, e
))
})
}
#[napi]
pub fn create_index(&self) -> IndexBuilder {
IndexBuilder::new(self.table.as_ref())
}
#[napi]
pub fn query(&self) -> Query {
Query::new(self)
}
}

31  nodejs/tsconfig.json (new file)

@@ -0,0 +1,31 @@
{
"include": [
"vectordb/*.ts",
"vectordb/**/*.ts",
"vectordb/*.js",
],
"compilerOptions": {
"target": "es2022",
"module": "commonjs",
"declaration": true,
"outDir": "./dist",
"strict": true,
"allowJs": true,
"resolveJsonModule": true,
},
"exclude": [
"./dist/*",
],
"typedocOptions": {
"entryPoints": [
"vectordb/index.ts"
],
"out": "../docs/src/javascript/",
"visibilityFilters": {
"protected": false,
"private": false,
"inherited": true,
"external": false,
}
}
}

183  nodejs/vectordb/arrow.ts (new file)

@@ -0,0 +1,183 @@
// Copyright 2024 Lance Developers.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import {
Field,
FixedSizeList,
Float,
Float32,
Schema,
Table as ArrowTable,
Table,
Vector,
vectorFromArray,
tableToIPC,
} from "apache-arrow";
/** Data type accepted by NodeJS SDK */
export type Data = Record<string, unknown>[] | ArrowTable;
export class VectorColumnOptions {
/** Vector column type. */
type: Float = new Float32();
constructor(values?: Partial<VectorColumnOptions>) {
Object.assign(this, values);
}
}
/** Options to control the makeArrowTable call. */
export class MakeArrowTableOptions {
/** Provided schema. */
schema?: Schema;
/** Vector columns */
vectorColumns: Record<string, VectorColumnOptions> = {
vector: new VectorColumnOptions(),
};
constructor(values?: Partial<MakeArrowTableOptions>) {
Object.assign(this, values);
}
}
/**
* An enhanced version of the {@link makeTable} function from Apache Arrow
* that supports nested fields and embeddings columns.
*
* Note that it currently does not support nulls.
*
* @param data input data
* @param options options to control the makeArrowTable call.
*
* @example
*
* ```ts
*
* import { fromTableToBuffer, makeArrowTable } from "../arrow";
* import { Field, FixedSizeList, Float16, Float32, Int32, Schema } from "apache-arrow";
*
* const schema = new Schema([
* new Field("a", new Int32()),
* new Field("b", new Float32()),
* new Field("c", new FixedSizeList(3, new Field("item", new Float16()))),
* ]);
* const table = makeArrowTable([
* { a: 1, b: 2, c: [1, 2, 3] },
* { a: 4, b: 5, c: [4, 5, 6] },
* { a: 7, b: 8, c: [7, 8, 9] },
* ], { schema });
* ```
*
* It guesses the vector columns if the schema is not provided. For example,
* by default it assumes that the column named `vector` is a vector column.
*
* ```ts
*
* const schema = new Schema([
new Field("a", new Float64()),
new Field("b", new Float64()),
new Field(
"vector",
new FixedSizeList(3, new Field("item", new Float32()))
),
]);
const table = makeArrowTable([
{ a: 1, b: 2, vector: [1, 2, 3] },
{ a: 4, b: 5, vector: [4, 5, 6] },
{ a: 7, b: 8, vector: [7, 8, 9] },
]);
assert.deepEqual(table.schema, schema);
* ```
*
* You can specify the vector column types and names using the options as well
*
* ```typescript
*
* const schema = new Schema([
new Field('a', new Float64()),
new Field('b', new Float64()),
new Field('vec1', new FixedSizeList(3, new Field('item', new Float16()))),
new Field('vec2', new FixedSizeList(3, new Field('item', new Float16())))
]);
* const table = makeArrowTable([
{ a: 1, b: 2, vec1: [1, 2, 3], vec2: [2, 4, 6] },
{ a: 4, b: 5, vec1: [4, 5, 6], vec2: [8, 10, 12] },
{ a: 7, b: 8, vec1: [7, 8, 9], vec2: [14, 16, 18] }
], {
vectorColumns: {
vec1: { type: new Float16() },
vec2: { type: new Float16() }
}
})
* assert.deepEqual(table.schema, schema)
* ```
*/
export function makeArrowTable(
data: Record<string, any>[],
options?: Partial<MakeArrowTableOptions>
): Table {
if (data.length === 0) {
throw new Error("At least one record needs to be provided");
}
const opt = new MakeArrowTableOptions(options ?? {});
const columns: Record<string, Vector> = {};
// TODO: sample dataset to find missing columns
const columnNames = Object.keys(data[0]);
for (const colName of columnNames) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-return
const values = data.map((datum) => datum[colName]);
let vector: Vector;
if (opt.schema !== undefined) {
// Explicit schema is provided, highest priority
vector = vectorFromArray(
values,
opt.schema?.fields.filter((f) => f.name === colName)[0]?.type
);
} else {
const vectorColumnOptions = opt.vectorColumns[colName];
if (vectorColumnOptions !== undefined) {
const fslType = new FixedSizeList(
(values[0] as any[]).length,
new Field("item", vectorColumnOptions.type, false)
);
vector = vectorFromArray(values, fslType);
} else {
// Normal case
vector = vectorFromArray(values);
}
}
columns[colName] = vector;
}
return new Table(columns);
}
/**
* Convert an Arrow Table to a Buffer.
*
* @param data Arrow Table
* @param schema Arrow Schema, optional
* @returns Buffer node
*/
export function toBuffer(data: Data, schema?: Schema): Buffer {
let tbl: Table;
if (data instanceof Table) {
tbl = data;
} else {
tbl = makeArrowTable(data, { schema });
}
return Buffer.from(tableToIPC(tbl));
}


@@ -0,0 +1,70 @@
// Copyright 2024 Lance Developers.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import { toBuffer } from "./arrow";
import { Connection as _NativeConnection } from "./native";
import { Table } from "./table";
import { Table as ArrowTable } from "apache-arrow";
/**
* A LanceDB Connection that allows you to open tables and create new ones.
*
* Connection could be local against filesystem or remote against a server.
*/
export class Connection {
readonly inner: _NativeConnection;
constructor(inner: _NativeConnection) {
this.inner = inner;
}
/** List all the table names in this database. */
async tableNames(): Promise<string[]> {
return this.inner.tableNames();
}
/**
* Open a table in the database.
*
* @param name The name of the table.
* @param embeddings An embedding function to use on this table
*/
async openTable(name: string): Promise<Table> {
const innerTable = await this.inner.openTable(name);
return new Table(innerTable);
}
/**
* Creates a new Table and initialize it with new data.
*
* @param {string} name - The name of the table.
* @param data - Non-empty Array of Records to be inserted into the table
*/
async createTable(
name: string,
data: Record<string, unknown>[] | ArrowTable
): Promise<Table> {
const buf = toBuffer(data);
const innerTable = await this.inner.createTable(name, buf);
return new Table(innerTable);
}
/**
* Drop an existing table.
* @param name The name of the table to drop.
*/
async dropTable(name: string): Promise<void> {
return this.inner.dropTable(name);
}
}
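A minimal usage sketch of this wrapper, assuming the native binding has been built (the import path, database path, and table name are illustrative placeholders):

```typescript
import { connect } from "./index"; // package entry point; path is illustrative

// Open (or create) a database, create a table from plain records,
// then reopen it by name. All names here are placeholders.
const db = await connect("/tmp/sample-lancedb");
const tbl = await db.createTable("my_vectors", [
  { id: 1, vector: [0.1, 0.2, 0.3] },
  { id: 2, vector: [0.4, 0.5, 0.6] },
]);
console.log(await db.tableNames()); // ["my_vectors"]
const reopened = await db.openTable("my_vectors");
```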

64  nodejs/vectordb/index.ts (new file)

@@ -0,0 +1,64 @@
// Copyright 2024 Lance Developers.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import { Connection } from "./connection";
import { Connection as NativeConnection, ConnectionOptions } from "./native.js";
export {
ConnectionOptions,
WriteOptions,
Query,
MetricType,
} from "./native.js";
export { Connection } from "./connection";
export { Table } from "./table";
export { Data } from "./arrow";
export { IvfPQOptions, IndexBuilder } from "./indexer";
/**
* Connect to a LanceDB instance at the given URI.
*
* Accepted formats:
*
* - `/path/to/database` - local database
* - `s3://bucket/path/to/database` or `gs://bucket/path/to/database` - database on cloud storage
* - `db://host:port` - remote database (LanceDB cloud)
*
* @param uri The uri of the database. If the database uri starts with `db://` then it connects to a remote database.
*
* @see {@link ConnectionOptions} for more details on the URI format.
*/
export async function connect(uri: string): Promise<Connection>;
export async function connect(
opts: Partial<ConnectionOptions>
): Promise<Connection>;
export async function connect(
args: string | Partial<ConnectionOptions>
): Promise<Connection> {
let opts: ConnectionOptions;
if (typeof args === "string") {
opts = { uri: args };
} else {
opts = Object.assign(
{
uri: "",
apiKey: "",
hostOverride: "",
},
args
);
}
const nativeConn = await NativeConnection.new(opts.uri);
return new Connection(nativeConn);
}
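For instance (the URIs are placeholders), both overloads resolve to the same native connection; per `lib.rs` above, only `uri` is consumed today and the other option fields are currently unused:

```typescript
// A bare URI...
const local = await connect("/tmp/sample-lancedb");
// ...or a partial options object.
const remote = await connect({ uri: "db://host:5005" });
```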

102  nodejs/vectordb/indexer.ts (new file)

@@ -0,0 +1,102 @@
// Copyright 2024 Lance Developers.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import {
MetricType,
IndexBuilder as NativeBuilder,
Table as NativeTable,
} from "./native";
/** Options to create `IVF_PQ` index */
export interface IvfPQOptions {
/** Number of IVF partitions. */
num_partitions?: number;
/** Number of sub-vectors in PQ coding. */
num_sub_vectors?: number;
/** Number of bits used for each PQ code.
*/
num_bits?: number;
/** Metric type to calculate the distance between vectors.
*
* Supported metrics: `L2`, `Cosine` and `Dot`.
*/
metric_type?: MetricType;
/** Number of iterations to train K-means.
*
* Default is 50. More iterations usually yield better results,
* but take longer to train.
*/
max_iterations?: number;
sample_rate?: number;
}
/**
* Building an index on LanceDB {@link Table}
*
* @see {@link Table.createIndex} for detailed usage.
*/
export class IndexBuilder {
private inner: NativeBuilder;
constructor(tbl: NativeTable) {
this.inner = tbl.createIndex();
}
/** Instruct the builder to build an `IVF_PQ` index */
ivf_pq(options?: IvfPQOptions): IndexBuilder {
this.inner.ivfPq(
options?.metric_type,
options?.num_partitions,
options?.num_sub_vectors,
options?.num_bits,
options?.max_iterations,
options?.sample_rate
);
return this;
}
/** Instruct the builder to build a Scalar index. */
scalar(): IndexBuilder {
this.inner.scalar();
return this;
}
/** Set the column(s) to create index on top of. */
column(col: string): IndexBuilder {
this.inner.column(col);
return this;
}
/** Set to true to replace existing index. */
replace(val: boolean): IndexBuilder {
this.inner.replace(val);
return this;
}
/** Specify the name of the index. Optional */
name(n: string): IndexBuilder {
this.inner.name(n);
return this;
}
/** Building the index. */
async build() {
await this.inner.build();
}
}

69  nodejs/vectordb/native.d.ts (vendored, new file)

@@ -0,0 +1,69 @@
/* tslint:disable */
/* eslint-disable */
/* auto-generated by NAPI-RS */
export const enum IndexType {
Scalar = 0,
IvfPq = 1
}
export const enum MetricType {
L2 = 0,
Cosine = 1,
Dot = 2
}
export interface ConnectionOptions {
uri: string
apiKey?: string
hostOverride?: string
}
/** Write mode for writing a table. */
export const enum WriteMode {
Create = 'Create',
Append = 'Append',
Overwrite = 'Overwrite'
}
/** Write options when creating a Table. */
export interface WriteOptions {
mode?: WriteMode
}
export function connect(options: ConnectionOptions): Promise<Connection>
export class Connection {
/** Create a new Connection instance from the given URI. */
static new(uri: string): Promise<Connection>
/** List all tables in the dataset. */
tableNames(): Promise<Array<string>>
/**
* Create table from an Apache Arrow IPC (file) buffer.
*
* Parameters:
* - name: The name of the table.
* - buf: The buffer containing the IPC file.
*
*/
createTable(name: string, buf: Buffer): Promise<Table>
openTable(name: string): Promise<Table>
/** Drop the table with the given name, or raise an error if the table does not exist. */
dropTable(name: string): Promise<void>
}
export class IndexBuilder {
replace(v: boolean): void
column(c: string): void
name(name: string): void
ivfPq(metricType?: MetricType | undefined | null, numPartitions?: number | undefined | null, numSubVectors?: number | undefined | null, numBits?: number | undefined | null, maxIterations?: number | undefined | null, sampleRate?: number | undefined | null): void
scalar(): void
build(): Promise<void>
}
export class Query {
vector(vector: Float32Array): void
toArrow(): void
}
export class Table {
/** Return Schema as empty Arrow IPC file. */
schema(): Buffer
add(buf: Buffer): Promise<void>
countRows(): Promise<bigint>
delete(predicate: string): Promise<void>
createIndex(): IndexBuilder
query(): Query
}

307  nodejs/vectordb/native.js (new file)

@@ -0,0 +1,307 @@
/* tslint:disable */
/* eslint-disable */
/* prettier-ignore */
/* auto-generated by NAPI-RS */
const { existsSync, readFileSync } = require('fs')
const { join } = require('path')
const { platform, arch } = process
let nativeBinding = null
let localFileExisted = false
let loadError = null
function isMusl() {
// For Node 10
if (!process.report || typeof process.report.getReport !== 'function') {
try {
const lddPath = require('child_process').execSync('which ldd').toString().trim()
return readFileSync(lddPath, 'utf8').includes('musl')
} catch (e) {
return true
}
} else {
const { glibcVersionRuntime } = process.report.getReport().header
return !glibcVersionRuntime
}
}
switch (platform) {
case 'android':
switch (arch) {
case 'arm64':
localFileExisted = existsSync(join(__dirname, 'vectordb-nodejs.android-arm64.node'))
try {
if (localFileExisted) {
nativeBinding = require('./vectordb-nodejs.android-arm64.node')
} else {
nativeBinding = require('vectordb-android-arm64')
}
} catch (e) {
loadError = e
}
break
case 'arm':
localFileExisted = existsSync(join(__dirname, 'vectordb-nodejs.android-arm-eabi.node'))
try {
if (localFileExisted) {
nativeBinding = require('./vectordb-nodejs.android-arm-eabi.node')
} else {
nativeBinding = require('vectordb-android-arm-eabi')
}
} catch (e) {
loadError = e
}
break
default:
throw new Error(`Unsupported architecture on Android ${arch}`)
}
break
case 'win32':
switch (arch) {
case 'x64':
localFileExisted = existsSync(
join(__dirname, 'vectordb-nodejs.win32-x64-msvc.node')
)
try {
if (localFileExisted) {
nativeBinding = require('./vectordb-nodejs.win32-x64-msvc.node')
} else {
nativeBinding = require('vectordb-win32-x64-msvc')
}
} catch (e) {
loadError = e
}
break
case 'ia32':
localFileExisted = existsSync(
join(__dirname, 'vectordb-nodejs.win32-ia32-msvc.node')
)
try {
if (localFileExisted) {
nativeBinding = require('./vectordb-nodejs.win32-ia32-msvc.node')
} else {
nativeBinding = require('vectordb-win32-ia32-msvc')
}
} catch (e) {
loadError = e
}
break
case 'arm64':
localFileExisted = existsSync(
join(__dirname, 'vectordb-nodejs.win32-arm64-msvc.node')
)
try {
if (localFileExisted) {
nativeBinding = require('./vectordb-nodejs.win32-arm64-msvc.node')
} else {
nativeBinding = require('vectordb-win32-arm64-msvc')
}
} catch (e) {
loadError = e
}
break
default:
throw new Error(`Unsupported architecture on Windows: ${arch}`)
}
break
case 'darwin':
localFileExisted = existsSync(join(__dirname, 'vectordb-nodejs.darwin-universal.node'))
try {
if (localFileExisted) {
nativeBinding = require('./vectordb-nodejs.darwin-universal.node')
} else {
nativeBinding = require('vectordb-darwin-universal')
}
break
} catch {}
switch (arch) {
case 'x64':
localFileExisted = existsSync(join(__dirname, 'vectordb-nodejs.darwin-x64.node'))
try {
if (localFileExisted) {
nativeBinding = require('./vectordb-nodejs.darwin-x64.node')
} else {
nativeBinding = require('vectordb-darwin-x64')
}
} catch (e) {
loadError = e
}
break
case 'arm64':
localFileExisted = existsSync(
join(__dirname, 'vectordb-nodejs.darwin-arm64.node')
)
try {
if (localFileExisted) {
nativeBinding = require('./vectordb-nodejs.darwin-arm64.node')
} else {
nativeBinding = require('vectordb-darwin-arm64')
}
} catch (e) {
loadError = e
}
break
default:
throw new Error(`Unsupported architecture on macOS: ${arch}`)
}
break
case 'freebsd':
if (arch !== 'x64') {
throw new Error(`Unsupported architecture on FreeBSD: ${arch}`)
}
localFileExisted = existsSync(join(__dirname, 'vectordb-nodejs.freebsd-x64.node'))
try {
if (localFileExisted) {
nativeBinding = require('./vectordb-nodejs.freebsd-x64.node')
} else {
nativeBinding = require('vectordb-freebsd-x64')
}
} catch (e) {
loadError = e
}
break
case 'linux':
switch (arch) {
case 'x64':
if (isMusl()) {
localFileExisted = existsSync(
join(__dirname, 'vectordb-nodejs.linux-x64-musl.node')
)
try {
if (localFileExisted) {
nativeBinding = require('./vectordb-nodejs.linux-x64-musl.node')
} else {
nativeBinding = require('vectordb-linux-x64-musl')
}
} catch (e) {
loadError = e
}
} else {
localFileExisted = existsSync(
join(__dirname, 'vectordb-nodejs.linux-x64-gnu.node')
)
try {
if (localFileExisted) {
nativeBinding = require('./vectordb-nodejs.linux-x64-gnu.node')
} else {
nativeBinding = require('vectordb-linux-x64-gnu')
}
} catch (e) {
loadError = e
}
}
break
case 'arm64':
if (isMusl()) {
localFileExisted = existsSync(
join(__dirname, 'vectordb-nodejs.linux-arm64-musl.node')
)
try {
if (localFileExisted) {
nativeBinding = require('./vectordb-nodejs.linux-arm64-musl.node')
} else {
nativeBinding = require('vectordb-linux-arm64-musl')
}
} catch (e) {
loadError = e
}
} else {
localFileExisted = existsSync(
join(__dirname, 'vectordb-nodejs.linux-arm64-gnu.node')
)
try {
if (localFileExisted) {
nativeBinding = require('./vectordb-nodejs.linux-arm64-gnu.node')
} else {
nativeBinding = require('vectordb-linux-arm64-gnu')
}
} catch (e) {
loadError = e
}
}
break
case 'arm':
localFileExisted = existsSync(
join(__dirname, 'vectordb-nodejs.linux-arm-gnueabihf.node')
)
try {
if (localFileExisted) {
nativeBinding = require('./vectordb-nodejs.linux-arm-gnueabihf.node')
} else {
nativeBinding = require('vectordb-linux-arm-gnueabihf')
}
} catch (e) {
loadError = e
}
break
case 'riscv64':
if (isMusl()) {
localFileExisted = existsSync(
join(__dirname, 'vectordb-nodejs.linux-riscv64-musl.node')
)
try {
if (localFileExisted) {
nativeBinding = require('./vectordb-nodejs.linux-riscv64-musl.node')
} else {
nativeBinding = require('vectordb-linux-riscv64-musl')
}
} catch (e) {
loadError = e
}
} else {
localFileExisted = existsSync(
join(__dirname, 'vectordb-nodejs.linux-riscv64-gnu.node')
)
try {
if (localFileExisted) {
nativeBinding = require('./vectordb-nodejs.linux-riscv64-gnu.node')
} else {
nativeBinding = require('vectordb-linux-riscv64-gnu')
}
} catch (e) {
loadError = e
}
}
break
case 's390x':
localFileExisted = existsSync(
join(__dirname, 'vectordb-nodejs.linux-s390x-gnu.node')
)
try {
if (localFileExisted) {
nativeBinding = require('./vectordb-nodejs.linux-s390x-gnu.node')
} else {
nativeBinding = require('vectordb-linux-s390x-gnu')
}
} catch (e) {
loadError = e
}
break
default:
throw new Error(`Unsupported architecture on Linux: ${arch}`)
}
break
default:
throw new Error(`Unsupported OS: ${platform}, architecture: ${arch}`)
}
if (!nativeBinding) {
if (loadError) {
throw loadError
}
throw new Error(`Failed to load native binding`)
}
const { Connection, IndexType, MetricType, IndexBuilder, Query, Table, WriteMode, connect } = nativeBinding
module.exports.Connection = Connection
module.exports.IndexType = IndexType
module.exports.MetricType = MetricType
module.exports.IndexBuilder = IndexBuilder
module.exports.Query = Query
module.exports.Table = Table
module.exports.WriteMode = WriteMode
module.exports.connect = connect

93  nodejs/vectordb/query.ts (new file)

@@ -0,0 +1,93 @@
// Copyright 2024 Lance Developers.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import { RecordBatch } from "apache-arrow";
import { Table } from "./table";
// TODO: re-enable eslint once we have a real implementation
/* eslint-disable */
class RecordBatchIterator implements AsyncIterator<RecordBatch> {
next(
...args: [] | [undefined]
): Promise<IteratorResult<RecordBatch<any>, any>> {
throw new Error("Method not implemented.");
}
return?(value?: any): Promise<IteratorResult<RecordBatch<any>, any>> {
throw new Error("Method not implemented.");
}
throw?(e?: any): Promise<IteratorResult<RecordBatch<any>, any>> {
throw new Error("Method not implemented.");
}
}
/* eslint-enable */
/** Query executor */
export class Query implements AsyncIterable<RecordBatch> {
private readonly tbl: Table;
private _filter?: string;
private _limit?: number;
// Vector search
private _vector?: Float32Array;
private _nprobes?: number;
private _refine_factor?: number = 1;
constructor(tbl: Table) {
this.tbl = tbl;
}
/** Set the filter predicate, only returns the results that satisfy the filter.
*
*/
filter(predicate: string): Query {
this._filter = predicate;
return this;
}
/**
* Set the limit of rows to return.
*/
limit(limit: number): Query {
this._limit = limit;
return this;
}
/**
* Set the query vector.
*/
vector(vector: number[]): Query {
this._vector = Float32Array.from(vector);
return this;
}
/**
* Set the number of probes to use for the query.
*/
nprobes(nprobes: number): Query {
this._nprobes = nprobes;
return this;
}
/**
* Set the refine factor for the query.
*/
refine_factor(refine_factor: number): Query {
this._refine_factor = refine_factor;
return this;
}
[Symbol.asyncIterator](): AsyncIterator<RecordBatch<any>, any, undefined> {
return new RecordBatchIterator();
}
}
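Because the iterator above is still a stub, the following is only a sketch of the intended fluent usage (it assumes `tbl` is an opened `Table` from `./table`):

```typescript
// Build up a vector query by chaining the setters above; execution
// (async iteration) is not implemented yet.
const query = tbl.search([0.1, 0.2, 0.3])
  .filter("id > 0")
  .limit(10)
  .nprobes(20)
  .refine_factor(2);
```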

105  nodejs/vectordb/table.ts (new file)

@@ -0,0 +1,105 @@
// Copyright 2024 Lance Developers.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import { Schema, tableFromIPC } from "apache-arrow";
import { Table as _NativeTable } from "./native";
import { toBuffer, Data } from "./arrow";
import { Query } from "./query";
import { IndexBuilder } from "./indexer";
/**
* A LanceDB Table is the collection of Records.
*
* Each Record has one or more vector fields.
*/
export class Table {
private readonly inner: _NativeTable;
/** Construct a Table. Internal use only. */
constructor(inner: _NativeTable) {
this.inner = inner;
}
/** Get the schema of the table. */
get schema(): Schema {
const schemaBuf = this.inner.schema();
const tbl = tableFromIPC(schemaBuf);
return tbl.schema;
}
/**
* Insert records into this Table.
*
* @param {Data} data Records to be inserted into the Table
*/
async add(data: Data): Promise<void> {
const buffer = toBuffer(data);
await this.inner.add(buffer);
}
/** Count the total number of rows in the dataset. */
async countRows(): Promise<bigint> {
return await this.inner.countRows();
}
/** Delete the rows that satisfy the predicate. */
async delete(predicate: string): Promise<void> {
await this.inner.delete(predicate);
}
/** Create an index over the columns.
*
* @param {string} column The column to create the index on. If not specified,
   * it will create an index on the vector field.
*
* @example
*
   * By default, it creates a vector index on the vector column.
*
* ```typescript
* const table = await conn.openTable("my_table");
* await table.createIndex().build();
* ```
*
* You can specify `IVF_PQ` parameters via `ivf_pq({})` call.
* ```typescript
* const table = await conn.openTable("my_table");
* await table.createIndex("my_vec_col")
* .ivf_pq({ num_partitions: 128, num_sub_vectors: 16 })
* .build();
* ```
*
   * Or create a scalar index:
*
* ```typescript
* await table.createIndex("my_float_col").build();
* ```
*/
createIndex(column?: string): IndexBuilder {
let builder = new IndexBuilder(this.inner);
if (column !== undefined) {
builder = builder.column(column);
}
return builder;
}
search(vector?: number[]): Query {
const q = new Query(this);
if (vector !== undefined) {
q.vector(vector);
}
return q;
}
}

View File

@@ -3,7 +3,7 @@ name = "lancedb"
version = "0.5.1"
dependencies = [
"deprecation",
"pylance==0.9.7",
"pylance==0.9.9",
"ratelimiter~=1.0",
"retry>=0.9.2",
"tqdm>=4.27.0",

View File

@@ -1,6 +1,6 @@
[package]
name = "vectordb-node"
version = "0.4.3"
version = "0.4.4"
description = "Serverless, low-latency vector database for AI applications"
license = "Apache-2.0"
edition = "2018"

View File

@@ -29,10 +29,14 @@ pub(crate) fn table_create_scalar_index(mut cx: FunctionContext) -> JsResult<JsP
let (deferred, promise) = cx.promise();
let channel = cx.channel();
let mut table = js_table.table.clone();
let table = js_table.table.clone();
rt.spawn(async move {
let idx_result = table.create_scalar_index(&column, replace).await;
let idx_result = table
.as_native()
.unwrap()
.create_scalar_index(&column, replace)
.await;
deferred.settle_with(&channel, move |mut cx| {
idx_result.or_throw(&mut cx)?;

View File

@@ -12,13 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use lance_index::vector::{ivf::IvfBuildParams, pq::PQBuildParams};
use lance_linalg::distance::MetricType;
use neon::context::FunctionContext;
use neon::prelude::*;
use std::convert::TryFrom;
use vectordb::index::vector::{IvfPQIndexBuilder, VectorIndexBuilder};
use vectordb::index::IndexBuilder;
use crate::error::Error::InvalidIndexType;
use crate::error::ResultExt;
@@ -29,17 +27,24 @@ use crate::table::JsTable;
pub(crate) fn table_create_vector_index(mut cx: FunctionContext) -> JsResult<JsPromise> {
let js_table = cx.this().downcast_or_throw::<JsBox<JsTable>, _>(&mut cx)?;
let index_params = cx.argument::<JsObject>(0)?;
let index_params_builder = get_index_params_builder(&mut cx, index_params).or_throw(&mut cx)?;
let rt = runtime(&mut cx)?;
let (deferred, promise) = cx.promise();
let channel = cx.channel();
let mut table = js_table.table.clone();
let table = js_table.table.clone();
let column_name = index_params
.get_opt::<JsString, _, _>(&mut cx, "column")?
.map(|s| s.value(&mut cx))
.unwrap_or("vector".to_string()); // Backward compatibility
let tbl = table.clone();
let mut index_builder = tbl.create_index(&[&column_name]);
get_index_params_builder(&mut cx, index_params, &mut index_builder).or_throw(&mut cx)?;
rt.spawn(async move {
let idx_result = table.create_index(&index_params_builder).await;
let idx_result = index_builder.build().await;
deferred.settle_with(&channel, move |mut cx| {
idx_result.or_throw(&mut cx)?;
Ok(cx.boxed(JsTable::from(table)))
@@ -51,66 +56,39 @@ pub(crate) fn table_create_vector_index(mut cx: FunctionContext) -> JsResult<JsP
fn get_index_params_builder(
cx: &mut FunctionContext,
obj: Handle<JsObject>,
) -> crate::error::Result<impl VectorIndexBuilder> {
let idx_type = obj.get::<JsString, _, _>(cx, "type")?.value(cx);
match idx_type.as_str() {
"ivf_pq" => {
let mut index_builder: IvfPQIndexBuilder = IvfPQIndexBuilder::new();
let mut pq_params = PQBuildParams::default();
obj.get_opt::<JsString, _, _>(cx, "column")?
.map(|s| index_builder.column(s.value(cx)));
obj.get_opt::<JsString, _, _>(cx, "index_name")?
.map(|s| index_builder.index_name(s.value(cx)));
if let Some(metric_type) = obj.get_opt::<JsString, _, _>(cx, "metric_type")? {
let metric_type = MetricType::try_from(metric_type.value(cx).as_str()).unwrap();
index_builder.metric_type(metric_type);
}
let num_partitions = obj.get_opt_usize(cx, "num_partitions")?;
let max_iters = obj.get_opt_usize(cx, "max_iters")?;
num_partitions.map(|np| {
let max_iters = max_iters.unwrap_or(50);
let ivf_params = IvfBuildParams {
num_partitions: np,
max_iters,
..Default::default()
};
index_builder.ivf_params(ivf_params)
});
if let Some(use_opq) = obj.get_opt::<JsBoolean, _, _>(cx, "use_opq")? {
pq_params.use_opq = use_opq.value(cx);
}
if let Some(num_sub_vectors) = obj.get_opt_usize(cx, "num_sub_vectors")? {
pq_params.num_sub_vectors = num_sub_vectors;
}
if let Some(num_bits) = obj.get_opt_usize(cx, "num_bits")? {
pq_params.num_bits = num_bits;
}
if let Some(max_iters) = obj.get_opt_usize(cx, "max_iters")? {
pq_params.max_iters = max_iters;
}
if let Some(max_opq_iters) = obj.get_opt_usize(cx, "max_opq_iters")? {
pq_params.max_opq_iters = max_opq_iters;
}
if let Some(replace) = obj.get_opt::<JsBoolean, _, _>(cx, "replace")? {
index_builder.replace(replace.value(cx));
}
Ok(index_builder)
builder: &mut IndexBuilder,
) -> crate::error::Result<()> {
match obj.get::<JsString, _, _>(cx, "type")?.value(cx).as_str() {
"ivf_pq" => builder.ivf_pq(),
_ => {
return Err(InvalidIndexType {
index_type: "".into(),
})
}
index_type => Err(InvalidIndexType {
index_type: index_type.into(),
}),
};
obj.get_opt::<JsString, _, _>(cx, "index_name")?
.map(|s| builder.name(s.value(cx).as_str()));
if let Some(metric_type) = obj.get_opt::<JsString, _, _>(cx, "metric_type")? {
let metric_type = MetricType::try_from(metric_type.value(cx).as_str())?;
builder.metric_type(metric_type);
}
if let Some(np) = obj.get_opt_u32(cx, "num_partitions")? {
builder.num_partitions(np);
}
if let Some(ns) = obj.get_opt_u32(cx, "num_sub_vectors")? {
builder.num_sub_vectors(ns);
}
if let Some(max_iters) = obj.get_opt_u32(cx, "max_iters")? {
builder.max_iterations(max_iters);
}
if let Some(num_bits) = obj.get_opt_u32(cx, "num_bits")? {
builder.num_bits(num_bits);
}
if let Some(replace) = obj.get_opt::<JsBoolean, _, _>(cx, "replace")? {
builder.replace(replace.value(cx));
}
Ok(())
}
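For reference, an index-options object coming from JS such as `{ type: "ivf_pq", metric_type: "cosine", num_partitions: 128 }` now maps onto plain builder calls; a sketch of the resulting sequence, assuming `builder` is the `IndexBuilder` created by the caller above (illustration only, not part of this diff):

```rust
// Equivalent builder calls for { type: "ivf_pq", metric_type: "cosine",
// num_partitions: 128 }; error handling elided.
builder.ivf_pq();
builder.metric_type(MetricType::try_from("cosine")?);
builder.num_partitions(128);
```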

View File

@@ -15,7 +15,7 @@
use std::sync::Arc;
use async_trait::async_trait;
use lance::io::object_store::ObjectStoreParams;
use lance::io::ObjectStoreParams;
use neon::prelude::*;
use object_store::aws::{AwsCredential, AwsCredentialProvider};
use object_store::CredentialProvider;

View File

@@ -40,17 +40,6 @@ impl JsQuery {
}
projection_vec
});
let filter = query_obj
.get_opt::<JsString, _, _>(&mut cx, "_filter")?
.map(|s| s.value(&mut cx));
let refine_factor = query_obj
.get_opt_u32(&mut cx, "_refineFactor")
.or_throw(&mut cx)?;
let nprobes = query_obj.get_usize(&mut cx, "_nprobes").or_throw(&mut cx)?;
let metric_type = query_obj
.get_opt::<JsString, _, _>(&mut cx, "_metricType")?
.map(|s| s.value(&mut cx))
.map(|s| MetricType::try_from(s.as_str()).unwrap());
let prefilter = query_obj
.get::<JsBoolean, _, _>(&mut cx, "_prefilter")?
@@ -65,24 +54,41 @@ impl JsQuery {
let (deferred, promise) = cx.promise();
let channel = cx.channel();
let query_vector = query_obj.get_opt::<JsArray, _, _>(&mut cx, "_queryVector")?;
let table = js_table.table.clone();
let query = query_vector.map(|q| convert::js_array_to_vec(q.deref(), &mut cx));
let query_vector = query_obj.get_opt::<JsArray, _, _>(&mut cx, "_queryVector")?;
let mut builder = table.query();
if let Some(query) = query_vector.map(|q| convert::js_array_to_vec(q.deref(), &mut cx)) {
builder = builder.nearest_to(&query);
if let Some(metric_type) = query_obj
.get_opt::<JsString, _, _>(&mut cx, "_metricType")?
.map(|s| s.value(&mut cx))
.map(|s| MetricType::try_from(s.as_str()).unwrap())
{
builder = builder.metric_type(metric_type);
}
let nprobes = query_obj.get_usize(&mut cx, "_nprobes").or_throw(&mut cx)?;
builder = builder.nprobes(nprobes);
};
if let Some(filter) = query_obj
.get_opt::<JsString, _, _>(&mut cx, "_filter")?
.map(|s| s.value(&mut cx))
{
builder = builder.filter(filter);
}
if let Some(select) = select {
builder = builder.select(select.as_slice());
}
if let Some(limit) = limit {
builder = builder.limit(limit as usize);
};
builder = builder.prefilter(prefilter);
rt.spawn(async move {
let mut builder = table
.search(query)
.refine_factor(refine_factor)
.nprobes(nprobes)
.filter(filter)
.metric_type(metric_type)
.select(select)
.prefilter(prefilter);
if let Some(limit) = limit {
builder = builder.limit(limit as usize);
};
let record_batch_stream = builder.execute();
let record_batch_stream = builder.execute_stream();
let results = record_batch_stream
.and_then(|stream| {
stream

View File

@@ -15,24 +15,24 @@
use arrow_array::{RecordBatch, RecordBatchIterator};
use lance::dataset::optimize::CompactionOptions;
use lance::dataset::{WriteMode, WriteParams};
use lance::io::object_store::ObjectStoreParams;
use lance::io::ObjectStoreParams;
use crate::arrow::{arrow_buffer_to_record_batch, record_batch_to_buffer};
use neon::prelude::*;
use neon::types::buffer::TypedArray;
use vectordb::Table;
use vectordb::TableRef;
use crate::error::ResultExt;
use crate::{convert, get_aws_creds, get_aws_region, runtime, JsDatabase};
pub(crate) struct JsTable {
pub table: Table,
pub table: TableRef,
}
impl Finalize for JsTable {}
impl From<Table> for JsTable {
fn from(table: Table) -> Self {
impl From<TableRef> for JsTable {
fn from(table: TableRef) -> Self {
JsTable { table }
}
}
@@ -96,7 +96,7 @@ impl JsTable {
arrow_buffer_to_record_batch(buffer.as_slice(&cx)).or_throw(&mut cx)?;
let rt = runtime(&mut cx)?;
let channel = cx.channel();
let mut table = js_table.table.clone();
let table = js_table.table.clone();
let (deferred, promise) = cx.promise();
let write_mode = match write_mode.as_str() {
@@ -118,7 +118,7 @@ impl JsTable {
rt.spawn(async move {
let batch_reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema);
let add_result = table.add(batch_reader, Some(params)).await;
let add_result = table.add(Box::new(batch_reader), Some(params)).await;
deferred.settle_with(&channel, move |mut cx| {
add_result.or_throw(&mut cx)?;
@@ -152,7 +152,7 @@ impl JsTable {
let (deferred, promise) = cx.promise();
let predicate = cx.argument::<JsString>(0)?.value(&mut cx);
let channel = cx.channel();
let mut table = js_table.table.clone();
let table = js_table.table.clone();
rt.spawn(async move {
let delete_result = table.delete(&predicate).await;
@@ -167,7 +167,7 @@ impl JsTable {
pub(crate) fn js_update(mut cx: FunctionContext) -> JsResult<JsPromise> {
let js_table = cx.this().downcast_or_throw::<JsBox<JsTable>, _>(&mut cx)?;
let mut table = js_table.table.clone();
let table = js_table.table.clone();
let rt = runtime(&mut cx)?;
let (deferred, promise) = cx.promise();
@@ -218,7 +218,11 @@ impl JsTable {
let predicate = predicate.as_deref();
let update_result = table.update(predicate, updates_arg).await;
let update_result = table
.as_native()
.unwrap()
.update(predicate, updates_arg)
.await;
deferred.settle_with(&channel, move |mut cx| {
update_result.or_throw(&mut cx)?;
Ok(cx.boxed(JsTable::from(table)))
@@ -249,6 +253,8 @@ impl JsTable {
rt.spawn(async move {
let stats = table
.as_native()
.unwrap()
.cleanup_old_versions(older_than, Some(delete_unverified))
.await;
@@ -278,7 +284,7 @@ impl JsTable {
let js_table = cx.this().downcast_or_throw::<JsBox<JsTable>, _>(&mut cx)?;
let rt = runtime(&mut cx)?;
let (deferred, promise) = cx.promise();
let mut table = js_table.table.clone();
let table = js_table.table.clone();
let channel = cx.channel();
let js_options = cx.argument::<JsObject>(0)?;
@@ -310,7 +316,11 @@ impl JsTable {
}
rt.spawn(async move {
let stats = table.compact_files(options, None).await;
let stats = table
.as_native()
.unwrap()
.compact_files(options, None)
.await;
deferred.settle_with(&channel, move |mut cx| {
let stats = stats.or_throw(&mut cx)?;
@@ -349,7 +359,7 @@ impl JsTable {
let table = js_table.table.clone();
rt.spawn(async move {
let indices = table.load_indices().await;
let indices = table.as_native().unwrap().load_indices().await;
deferred.settle_with(&channel, move |mut cx| {
let indices = indices.or_throw(&mut cx)?;
@@ -389,8 +399,8 @@ impl JsTable {
rt.spawn(async move {
let load_stats = futures::try_join!(
table.count_indexed_rows(&index_uuid),
table.count_unindexed_rows(&index_uuid)
table.as_native().unwrap().count_indexed_rows(&index_uuid),
table.as_native().unwrap().count_unindexed_rows(&index_uuid)
);
deferred.settle_with(&channel, move |mut cx| {

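The `as_native().unwrap()` calls threaded through this file downcast the `TableRef` trait object back to `NativeTable` for operations that are not (yet) on the `Table` trait. A fallible variant is easy to sketch against the trait's `as_native()` accessor and the new `Error::Runtime` variant (hypothetical helper, not part of this diff):

```rust
use vectordb::error::{Error, Result};
use vectordb::table::{NativeTable, TableRef};

// Downcast a TableRef, surfacing a Runtime error instead of panicking
// when the underlying table is not a NativeTable.
fn native(table: &TableRef) -> Result<&NativeTable> {
    table.as_native().ok_or_else(|| Error::Runtime {
        message: "expected a native table".to_string(),
    })
}
```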
View File

@@ -1,6 +1,6 @@
[package]
name = "vectordb"
version = "0.4.3"
version = "0.4.4"
edition = "2021"
description = "LanceDB: A serverless, low-latency vector database for AI applications"
license = "Apache-2.0"
@@ -16,6 +16,7 @@ arrow-data = { workspace = true }
arrow-schema = { workspace = true }
arrow-ord = { workspace = true }
arrow-cast = { workspace = true }
arrow-ipc.workspace = true
chrono = { workspace = true }
object_store = { workspace = true }
snafu = { workspace = true }
@@ -25,11 +26,11 @@ lance-index = { workspace = true }
lance-linalg = { workspace = true }
lance-testing = { workspace = true }
tokio = { version = "1.23", features = ["rt-multi-thread"] }
log = { workspace = true }
log.workspace = true
async-trait = "0"
bytes = "1"
futures = "0"
num-traits = "0"
futures.workspace = true
num-traits.workspace = true
url = { workspace = true }
serde = { version = "^1" }
serde_json = { version = "1" }
@@ -37,4 +38,4 @@ serde_json = { version = "1" }
[dev-dependencies]
tempfile = "3.5.0"
rand = { version = "0.8.3", features = ["small_rng"] }
walkdir = "2"
walkdir = "2"

View File

@@ -1,3 +1,8 @@
# LanceDB Rust
Rust client for LanceDB, a serverless vector database. Read more at: https://lancedb.com/
<a href="https://crates.io/crates/vectordb">![img](https://img.shields.io/crates/v/vectordb)</a>
<a href="https://docs.rs/vectordb/latest/vectordb/">![Docs.rs](https://img.shields.io/docsrs/vectordb)</a>
LanceDB Rust SDK, a serverless vector database.
Read more at: https://lancedb.com/

View File

@@ -21,13 +21,13 @@ use std::sync::Arc;
use arrow_array::RecordBatchReader;
use lance::dataset::WriteParams;
use lance::io::object_store::{ObjectStore, WrappingObjectStore};
use lance::io::{ObjectStore, WrappingObjectStore};
use object_store::local::LocalFileSystem;
use snafu::prelude::*;
use crate::error::{CreateDirSnafu, Error, InvalidTableNameSnafu, Result};
use crate::io::object_store::MirroringObjectStoreWrapper;
use crate::table::{ReadParams, Table};
use crate::table::{NativeTable, ReadParams, TableRef};
pub const LANCE_FILE_EXTENSION: &str = "lance";
@@ -46,17 +46,20 @@ pub trait Connection: Send + Sync {
/// * `params` - Optional [`WriteParams`] to create the table.
///
/// # Returns
/// Created [`Table`], or [`Err(Error::TableAlreadyExists)`] if the table already exists.
/// Created [`TableRef`], or [`Err(Error::TableAlreadyExists)`] if the table already exists.
async fn create_table(
&self,
name: &str,
batches: Box<dyn RecordBatchReader + Send>,
params: Option<WriteParams>,
) -> Result<Table>;
) -> Result<TableRef>;
async fn open_table(&self, name: &str) -> Result<Table>;
async fn open_table(&self, name: &str) -> Result<TableRef> {
self.open_table_with_params(name, ReadParams::default())
.await
}
async fn open_table_with_params(&self, name: &str, params: ReadParams) -> Result<Table>;
async fn open_table_with_params(&self, name: &str, params: ReadParams) -> Result<TableRef>;
/// Drop a table in the database.
///
@@ -240,30 +243,19 @@ impl Connection for Database {
name: &str,
batches: Box<dyn RecordBatchReader + Send>,
params: Option<WriteParams>,
) -> Result<Table> {
) -> Result<TableRef> {
let table_uri = self.table_uri(name)?;
Table::create(
&table_uri,
name,
batches,
self.store_wrapper.clone(),
params,
)
.await
}
/// Open a table in the database.
///
/// # Arguments
/// * `name` - The name of the table.
///
/// # Returns
///
/// * A [Table] object.
async fn open_table(&self, name: &str) -> Result<Table> {
self.open_table_with_params(name, ReadParams::default())
.await
Ok(Arc::new(
NativeTable::create(
&table_uri,
name,
batches,
self.store_wrapper.clone(),
params,
)
.await?,
))
}
/// Open a table in the database.
@@ -274,10 +266,13 @@ impl Connection for Database {
///
/// # Returns
///
/// * A [Table] object.
async fn open_table_with_params(&self, name: &str, params: ReadParams) -> Result<Table> {
/// * A [TableRef] object.
async fn open_table_with_params(&self, name: &str, params: ReadParams) -> Result<TableRef> {
let table_uri = self.table_uri(name)?;
Table::open_with_params(&table_uri, name, self.store_wrapper.clone(), params).await
Ok(Arc::new(
NativeTable::open_with_params(&table_uri, name, self.store_wrapper.clone(), params)
.await?,
))
}
async fn drop_table(&self, name: &str) -> Result<()> {

View File

@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::PoisonError;
use arrow_schema::ArrowError;
use snafu::Snafu;
@@ -35,6 +37,8 @@ pub enum Error {
Lance { message: String },
#[snafu(display("LanceDB Schema Error: {message}"))]
Schema { message: String },
#[snafu(display("Runtime error: {message}"))]
Runtime { message: String },
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -70,3 +74,11 @@ impl From<object_store::path::Error> for Error {
}
}
}
impl<T> From<PoisonError<T>> for Error {
fn from(e: PoisonError<T>) -> Self {
Self::Runtime {
message: e.to_string(),
}
}
}
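This conversion lets lock acquisition use `?` instead of `expect`, so a poisoned `Mutex` surfaces as `Error::Runtime` rather than a panic. A minimal sketch of the intended call pattern (hypothetical function):

```rust
use std::sync::Mutex;
use vectordb::error::Result;

// With From<PoisonError<T>> in place, `?` converts a poisoned lock
// into Error::Runtime instead of unwinding.
fn read_counter(counter: &Mutex<u64>) -> Result<u64> {
    let guard = counter.lock()?;
    Ok(*guard)
}
```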

View File

@@ -1,4 +1,4 @@
// Copyright 2023 Lance Developers.
// Copyright 2024 Lance Developers.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -12,4 +12,281 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::{cmp::max, sync::Arc};
use lance_index::{DatasetIndexExt, IndexType};
pub use lance_linalg::distance::MetricType;
pub mod vector;
use crate::{utils::default_vector_column, Error, Result, Table};
/// Index Parameters.
pub enum IndexParams {
Scalar {
replace: bool,
},
IvfPq {
replace: bool,
metric_type: MetricType,
num_partitions: u64,
num_sub_vectors: u32,
num_bits: u32,
sample_rate: u32,
max_iterations: u32,
},
}
/// Builder for Index Parameters.
pub struct IndexBuilder {
table: Arc<dyn Table>,
columns: Vec<String>,
// General parameters
/// Index name.
name: Option<String>,
/// Replace the existing index.
replace: bool,
index_type: IndexType,
// Scalar index parameters
// Nothing to set here.
// IVF_PQ parameters
metric_type: MetricType,
num_partitions: Option<u32>,
// PQ related
num_sub_vectors: Option<u32>,
num_bits: u32,
    /// Sampling rate used to select training data for kmeans.
sample_rate: u32,
    /// Maximum number of iterations to train kmeans.
max_iterations: u32,
}
impl IndexBuilder {
pub(crate) fn new(table: Arc<dyn Table>, columns: &[&str]) -> Self {
IndexBuilder {
table,
columns: columns.iter().map(|c| c.to_string()).collect(),
name: None,
replace: true,
index_type: IndexType::Scalar,
metric_type: MetricType::L2,
num_partitions: None,
num_sub_vectors: None,
num_bits: 8,
sample_rate: 256,
max_iterations: 50,
}
}
/// Build a Scalar Index.
///
/// Accepted parameters:
/// - `replace`: Replace the existing index.
/// - `name`: Index name. Default: `None`
pub fn scalar(&mut self) -> &mut Self {
self.index_type = IndexType::Scalar;
self
}
/// Build an IVF PQ index.
///
/// Accepted parameters:
/// - `replace`: Replace the existing index.
/// - `name`: Index name. Default: `None`
/// - `metric_type`: [MetricType] to use to build Vector Index.
/// - `num_partitions`: Number of IVF partitions.
/// - `num_sub_vectors`: Number of sub-vectors of PQ.
/// - `num_bits`: Number of bits used for PQ centroids.
    /// - `sample_rate`: Sampling rate used to select training data for kmeans.
    /// - `max_iterations`: Maximum number of iterations to train kmeans.
pub fn ivf_pq(&mut self) -> &mut Self {
self.index_type = IndexType::Vector;
self
}
/// The columns to build index on.
pub fn columns(&mut self, cols: &[&str]) -> &mut Self {
self.columns = cols.iter().map(|s| s.to_string()).collect();
self
}
/// Whether to replace the existing index, default is `true`.
pub fn replace(&mut self, v: bool) -> &mut Self {
self.replace = v;
self
}
/// Set the index name.
pub fn name(&mut self, name: &str) -> &mut Self {
self.name = Some(name.to_string());
self
}
/// [MetricType] to use to build Vector Index.
///
/// Default value is [MetricType::L2].
pub fn metric_type(&mut self, metric_type: MetricType) -> &mut Self {
self.metric_type = metric_type;
self
}
/// Number of IVF partitions.
pub fn num_partitions(&mut self, num_partitions: u32) -> &mut Self {
self.num_partitions = Some(num_partitions);
self
}
/// Number of sub-vectors of PQ.
pub fn num_sub_vectors(&mut self, num_sub_vectors: u32) -> &mut Self {
self.num_sub_vectors = Some(num_sub_vectors);
self
}
/// Number of bits used for PQ centroids.
pub fn num_bits(&mut self, num_bits: u32) -> &mut Self {
self.num_bits = num_bits;
self
}
    /// Sampling rate used to select training data for kmeans.
pub fn sample_rate(&mut self, sample_rate: u32) -> &mut Self {
self.sample_rate = sample_rate;
self
}
    /// Maximum number of iterations to train kmeans.
pub fn max_iterations(&mut self, max_iterations: u32) -> &mut Self {
self.max_iterations = max_iterations;
self
}
    /// Build the index from the collected parameters.
pub async fn build(&self) -> Result<()> {
let schema = self.table.schema();
// TODO: simplify this after GH lance#1864.
let mut index_type = &self.index_type;
let columns = if self.columns.is_empty() {
            // By default we create a vector index.
index_type = &IndexType::Vector;
vec![default_vector_column(&schema, None)?]
} else {
self.columns.clone()
};
if columns.len() != 1 {
return Err(Error::Schema {
message: "Only one column is supported for index".to_string(),
});
}
let column = &columns[0];
let field = schema.field_with_name(column)?;
let params = match index_type {
IndexType::Scalar => IndexParams::Scalar {
replace: self.replace,
},
IndexType::Vector => {
let num_partitions = if let Some(n) = self.num_partitions {
n
} else {
suggested_num_partitions(self.table.count_rows().await?)
};
let num_sub_vectors: u32 = if let Some(n) = self.num_sub_vectors {
n
} else {
match field.data_type() {
arrow_schema::DataType::FixedSizeList(_, n) => {
Ok::<u32, Error>(suggested_num_sub_vectors(*n as u32))
}
_ => Err(Error::Schema {
message: format!(
"Column '{}' is not a FixedSizeList",
&self.columns[0]
),
}),
}?
};
IndexParams::IvfPq {
replace: self.replace,
metric_type: self.metric_type,
num_partitions: num_partitions as u64,
num_sub_vectors,
num_bits: self.num_bits,
sample_rate: self.sample_rate,
max_iterations: self.max_iterations,
}
}
};
let tbl = self
.table
.as_native()
.expect("Only native table is supported here");
let mut dataset = tbl.clone_inner_dataset();
match params {
IndexParams::Scalar { replace } => {
self.table
.as_native()
.unwrap()
.create_scalar_index(column, replace)
.await?
}
IndexParams::IvfPq {
replace,
metric_type,
num_partitions,
num_sub_vectors,
num_bits,
max_iterations,
..
} => {
let lance_idx_params = lance::index::vector::VectorIndexParams::ivf_pq(
num_partitions as usize,
num_bits as u8,
num_sub_vectors as usize,
false,
metric_type,
max_iterations as usize,
);
dataset
.create_index(
&[column],
IndexType::Vector,
None,
&lance_idx_params,
replace,
)
.await?;
}
}
tbl.reset_dataset(dataset);
Ok(())
}
}
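The IVF_PQ path is shown in the lib.rs doc example elsewhere in this diff; a scalar index goes through the same builder. A minimal sketch, assuming an open `TableRef` named `tbl` with a scalar `id` column:

```rust
// Build a scalar index on the "id" column, replacing any existing one.
tbl.create_index(&["id"])
    .scalar()
    .replace(true)
    .build()
    .await
    .unwrap();
```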
fn suggested_num_partitions(rows: usize) -> u32 {
let num_partitions = (rows as f64).sqrt() as u32;
max(1, num_partitions)
}
fn suggested_num_sub_vectors(dim: u32) -> u32 {
if dim % 16 == 0 {
// Should be more aggressive than this default.
dim / 16
} else if dim % 8 == 0 {
dim / 8
} else {
log::warn!(
"The dimension of the vector is not divisible by 8 or 16, \
which may cause performance degradation in PQ"
);
1
}
}
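To make the defaults concrete, the arithmetic of the two helpers works out as follows (a worked illustration; the values follow directly from the code above):

```rust
// sqrt(1_000_000) = 1000, so a 1M-row table defaults to 1000 IVF partitions.
assert_eq!(suggested_num_partitions(1_000_000), 1000);
// 768 is divisible by 16, so PQ defaults to 768 / 16 = 48 sub-vectors.
assert_eq!(suggested_num_sub_vectors(768), 48);
// 100 is divisible by neither 16 nor 8, so PQ falls back to 1 sub-vector.
assert_eq!(suggested_num_sub_vectors(100), 1);
```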

View File

@@ -14,104 +14,7 @@
use serde::Deserialize;
use lance::format::{Index, Manifest};
use lance::index::vector::pq::PQBuildParams;
use lance::index::vector::VectorIndexParams;
use lance_index::vector::ivf::IvfBuildParams;
use lance_linalg::distance::MetricType;
pub trait VectorIndexBuilder {
fn get_column(&self) -> Option<String>;
fn get_index_name(&self) -> Option<String>;
fn build(&self) -> VectorIndexParams;
fn get_replace(&self) -> bool;
}
pub struct IvfPQIndexBuilder {
column: Option<String>,
index_name: Option<String>,
metric_type: Option<MetricType>,
ivf_params: Option<IvfBuildParams>,
pq_params: Option<PQBuildParams>,
replace: bool,
}
impl IvfPQIndexBuilder {
pub fn new() -> IvfPQIndexBuilder {
Default::default()
}
}
impl Default for IvfPQIndexBuilder {
fn default() -> Self {
IvfPQIndexBuilder {
column: None,
index_name: None,
metric_type: None,
ivf_params: None,
pq_params: None,
replace: true,
}
}
}
impl IvfPQIndexBuilder {
pub fn column(&mut self, column: String) -> &mut IvfPQIndexBuilder {
self.column = Some(column);
self
}
pub fn index_name(&mut self, index_name: String) -> &mut IvfPQIndexBuilder {
self.index_name = Some(index_name);
self
}
pub fn metric_type(&mut self, metric_type: MetricType) -> &mut IvfPQIndexBuilder {
self.metric_type = Some(metric_type);
self
}
pub fn ivf_params(&mut self, ivf_params: IvfBuildParams) -> &mut IvfPQIndexBuilder {
self.ivf_params = Some(ivf_params);
self
}
pub fn pq_params(&mut self, pq_params: PQBuildParams) -> &mut IvfPQIndexBuilder {
self.pq_params = Some(pq_params);
self
}
pub fn replace(&mut self, replace: bool) -> &mut IvfPQIndexBuilder {
self.replace = replace;
self
}
}
impl VectorIndexBuilder for IvfPQIndexBuilder {
fn get_column(&self) -> Option<String> {
self.column.clone()
}
fn get_index_name(&self) -> Option<String> {
self.index_name.clone()
}
fn build(&self) -> VectorIndexParams {
let ivf_params = self.ivf_params.clone().unwrap_or_default();
let pq_params = self.pq_params.clone().unwrap_or_default();
VectorIndexParams::with_ivf_pq_params(
self.metric_type.unwrap_or(MetricType::L2),
ivf_params,
pq_params,
)
}
fn get_replace(&self) -> bool {
self.replace
}
}
use lance::table::format::{Index, Manifest};
pub struct VectorIndex {
pub columns: Vec<String>,
@@ -139,79 +42,3 @@ pub struct VectorIndexStatistics {
pub num_indexed_rows: usize,
pub num_unindexed_rows: usize,
}
#[cfg(test)]
mod tests {
use super::*;
use lance::index::vector::StageParams;
use lance_index::vector::ivf::IvfBuildParams;
use lance_index::vector::pq::PQBuildParams;
use crate::index::vector::{IvfPQIndexBuilder, VectorIndexBuilder};
#[test]
fn test_builder_no_params() {
let index_builder = IvfPQIndexBuilder::new();
assert!(index_builder.get_column().is_none());
assert!(index_builder.get_index_name().is_none());
let index_params = index_builder.build();
assert_eq!(index_params.stages.len(), 2);
if let StageParams::Ivf(ivf_params) = index_params.stages.get(0).unwrap() {
let default = IvfBuildParams::default();
assert_eq!(ivf_params.num_partitions, default.num_partitions);
assert_eq!(ivf_params.max_iters, default.max_iters);
} else {
panic!("Expected first stage to be ivf")
}
if let StageParams::PQ(pq_params) = index_params.stages.get(1).unwrap() {
assert_eq!(pq_params.use_opq, false);
} else {
panic!("Expected second stage to be pq")
}
}
#[test]
fn test_builder_all_params() {
let mut index_builder = IvfPQIndexBuilder::new();
index_builder
.column("c".to_owned())
.metric_type(MetricType::Cosine)
.index_name("index".to_owned());
assert_eq!(index_builder.column.clone().unwrap(), "c");
assert_eq!(index_builder.metric_type.unwrap(), MetricType::Cosine);
assert_eq!(index_builder.index_name.clone().unwrap(), "index");
let ivf_params = IvfBuildParams::new(500);
let mut pq_params = PQBuildParams::default();
pq_params.use_opq = true;
pq_params.max_iters = 1;
pq_params.num_bits = 8;
pq_params.num_sub_vectors = 50;
pq_params.max_opq_iters = 2;
index_builder.ivf_params(ivf_params);
index_builder.pq_params(pq_params);
let index_params = index_builder.build();
assert_eq!(index_params.stages.len(), 2);
if let StageParams::Ivf(ivf_params) = index_params.stages.get(0).unwrap() {
assert_eq!(ivf_params.num_partitions, 500);
} else {
assert!(false, "Expected first stage to be ivf")
}
if let StageParams::PQ(pq_params) = index_params.stages.get(1).unwrap() {
assert_eq!(pq_params.use_opq, true);
assert_eq!(pq_params.max_iters, 1);
assert_eq!(pq_params.num_bits, 8);
assert_eq!(pq_params.num_sub_vectors, 50);
assert_eq!(pq_params.max_opq_iters, 2);
} else {
assert!(false, "Expected second stage to be pq")
}
}
}

View File

@@ -23,7 +23,7 @@ use std::{
use bytes::Bytes;
use futures::{stream::BoxStream, FutureExt, StreamExt};
use lance::io::object_store::WrappingObjectStore;
use lance::io::WrappingObjectStore;
use object_store::{
path::Path, Error, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore,
PutOptions, PutResult, Result,
@@ -335,14 +335,15 @@ impl WrappingObjectStore for MirroringObjectStoreWrapper {
#[cfg(all(test, not(windows)))]
mod test {
use super::*;
use crate::connection::{Connection, Database};
use arrow_array::PrimitiveArray;
use futures::TryStreamExt;
use lance::{dataset::WriteParams, io::object_store::ObjectStoreParams};
use lance::{dataset::WriteParams, io::ObjectStoreParams};
use lance_testing::datagen::{BatchGenerator, IncrementingInt32, RandomVector};
use object_store::local::LocalFileSystem;
use tempfile;
use crate::connection::{Connection, Database};
#[tokio::test]
async fn test_e2e() {
let dir1 = tempfile::tempdir().unwrap().into_path();
@@ -374,11 +375,9 @@ mod test {
assert_eq!(t.count_rows().await.unwrap(), 100);
let q = t
.search(Some(PrimitiveArray::from_iter_values(vec![
0.1, 0.1, 0.1, 0.1,
])))
.search(&[0.1, 0.1, 0.1, 0.1])
.limit(10)
.execute()
.execute_stream()
.await
.unwrap();

rust/vectordb/src/ipc.rs (new file, 79 lines)
View File

@@ -0,0 +1,79 @@
// Copyright 2024 Lance Developers.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! IPC support
use std::io::Cursor;
use arrow_array::RecordBatchReader;
use arrow_ipc::reader::StreamReader;
use crate::Result;
/// Convert a Arrow IPC file to a batch reader
pub fn ipc_file_to_batches(buf: Vec<u8>) -> Result<impl RecordBatchReader> {
let buf_reader = Cursor::new(buf);
let reader = StreamReader::try_new(buf_reader, None)?;
Ok(reader)
}
#[cfg(test)]
mod tests {
use super::*;
use arrow_array::{Float32Array, Int64Array, RecordBatch};
use arrow_ipc::writer::StreamWriter;
use arrow_schema::{DataType, Field, Schema};
use std::sync::Arc;
fn create_record_batch() -> Result<RecordBatch> {
let schema = Schema::new(vec![
Field::new("a", DataType::Int64, false),
Field::new("b", DataType::Float32, false),
]);
let a = Int64Array::from(vec![1, 2, 3, 4, 5]);
let b = Float32Array::from(vec![1.1, 2.2, 3.3, 4.4, 5.5]);
let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)])?;
Ok(batch)
}
#[test]
fn test_ipc_file_to_batches() -> Result<()> {
let batch = create_record_batch()?;
let mut writer = StreamWriter::try_new(vec![], &batch.schema())?;
writer.write(&batch)?;
writer.finish()?;
let buf = writer.into_inner().unwrap();
let mut reader = ipc_file_to_batches(buf).unwrap();
let read_batch = reader.next().unwrap()?;
assert_eq!(batch.num_columns(), read_batch.num_columns());
assert_eq!(batch.num_rows(), read_batch.num_rows());
for i in 0..batch.num_columns() {
let batch_column = batch.column(i);
let read_batch_column = read_batch.column(i);
assert_eq!(batch_column.data_type(), read_batch_column.data_type());
assert_eq!(batch_column.len(), read_batch_column.len());
}
Ok(())
}
}

View File

@@ -46,8 +46,8 @@
//! #### Connect to a database.
//!
//! ```rust
//! use vectordb::{connection::{Database, Connection}, Table, WriteMode};
//! use arrow_schema::{Field, Schema};
//! use vectordb::connection::Database;
//! # use arrow_schema::{Field, Schema};
//! # tokio::runtime::Runtime::new().unwrap().block_on(async {
//! let db = Database::connect("data/sample-lancedb").await.unwrap();
//! # });
@@ -55,7 +55,7 @@
//!
//! LanceDB uses [arrow-rs](https://github.com/apache/arrow-rs) to define schema, data types and array itself.
//! It treats [`FixedSizeList<Float16/Float32>`](https://docs.rs/arrow/latest/arrow/array/struct.FixedSizeListArray.html)
//! columns as vectors.
//! columns as vector columns.
//!
//! #### Create a table
//!
@@ -90,6 +90,27 @@
//! # });
//! ```
//!
//! #### Create vector index (IVF_PQ)
//!
//! ```no_run
//! # use std::sync::Arc;
//! # use vectordb::connection::{Database, Connection};
//! # use arrow_array::{FixedSizeListArray, types::Float32Type, RecordBatch,
//! # RecordBatchIterator, Int32Array};
//! # use arrow_schema::{Schema, Field, DataType};
//! # tokio::runtime::Runtime::new().unwrap().block_on(async {
//! # let tmpdir = tempfile::tempdir().unwrap();
//! # let db = Database::connect(tmpdir.path().to_str().unwrap()).await.unwrap();
//! # let tbl = db.open_table("idx_test").await.unwrap();
//! tbl.create_index(&["vector"])
//! .ivf_pq()
//! .num_partitions(256)
//! .build()
//! .await
//! .unwrap();
//! # });
//! ```
//!
//! #### Open table and run search
//!
//! ```rust
@@ -119,8 +140,8 @@
//! # db.create_table("my_table", Box::new(batches), None).await.unwrap();
//! let table = db.open_table("my_table").await.unwrap();
//! let results = table
//! .search(Some(vec![1.0; 128]))
//! .execute()
//! .search(&[1.0; 128])
//! .execute_stream()
//! .await
//! .unwrap()
//! .try_collect::<Vec<_>>()
@@ -136,11 +157,13 @@ pub mod data;
pub mod error;
pub mod index;
pub mod io;
pub mod ipc;
pub mod query;
pub mod table;
pub mod utils;
pub use connection::Connection;
pub use table::Table;
pub use connection::{Connection, Database};
pub use error::{Error, Result};
pub use table::{Table, TableRef};
pub use lance::dataset::WriteMode;

View File

@@ -1,4 +1,4 @@
// Copyright 2023 Lance Developers.
// Copyright 2024 Lance Developers.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -15,25 +15,42 @@
use std::sync::Arc;
use arrow_array::Float32Array;
use arrow_schema::Schema;
use lance::dataset::scanner::{DatasetRecordBatchStream, Scanner};
use lance::dataset::Dataset;
use lance_linalg::distance::MetricType;
use crate::error::Result;
use crate::utils::default_vector_column;
const DEFAULT_TOP_K: usize = 10;
/// A builder for nearest neighbor queries for LanceDB.
#[derive(Clone)]
pub struct Query {
pub dataset: Arc<Dataset>,
pub query_vector: Option<Float32Array>,
pub column: String,
pub limit: Option<usize>,
pub filter: Option<String>,
pub select: Option<Vec<String>>,
pub nprobes: usize,
pub refine_factor: Option<u32>,
pub metric_type: Option<MetricType>,
pub use_index: bool,
pub prefilter: bool,
dataset: Arc<Dataset>,
// The column to run the query on. If not specified, we will attempt to guess
// the column based on the dataset's schema.
column: Option<String>,
// IVF PQ - ANN search.
query_vector: Option<Float32Array>,
nprobes: usize,
refine_factor: Option<u32>,
metric_type: Option<MetricType>,
    /// Limit the number of rows to return.
limit: Option<usize>,
/// Apply filter to the returned rows.
filter: Option<String>,
/// Select column projection.
select: Option<Vec<String>>,
/// Default is true. Set to false to enforce a brute force search.
use_index: bool,
    /// Apply filter before ANN search.
prefilter: bool,
}
impl Query {
@@ -41,17 +58,13 @@ impl Query {
///
/// # Arguments
///
/// * `dataset` - The table / dataset the query will be run against.
/// * `vector` The vector used for this query.
/// * `dataset` - Lance dataset.
///
/// # Returns
///
/// * A [Query] object.
pub(crate) fn new(dataset: Arc<Dataset>, vector: Option<Float32Array>) -> Self {
pub(crate) fn new(dataset: Arc<Dataset>) -> Self {
Query {
dataset,
query_vector: vector,
column: crate::table::VECTOR_COLUMN_NAME.to_string(),
query_vector: None,
column: None,
limit: None,
nprobes: 20,
refine_factor: None,
@@ -63,17 +76,24 @@ impl Query {
}
}
/// Execute the queries and return its results.
/// Convert the query plan to a [`DatasetRecordBatchStream`]
///
/// # Returns
///
/// * A [DatasetRecordBatchStream] with the query's results.
pub async fn execute(&self) -> Result<DatasetRecordBatchStream> {
pub async fn execute_stream(&self) -> Result<DatasetRecordBatchStream> {
let mut scanner: Scanner = self.dataset.scan();
if let Some(query) = self.query_vector.as_ref() {
// If there is a vector query, default to limit=10 if unspecified
scanner.nearest(&self.column, query, self.limit.unwrap_or(10))?;
let column = if let Some(col) = self.column.as_ref() {
col.clone()
} else {
                // Infer a vector column with the same dimension as the query vector.
let arrow_schema = Schema::from(self.dataset.schema());
default_vector_column(&arrow_schema, Some(query.len() as i32))?
};
scanner.nearest(&column, query, self.limit.unwrap_or(DEFAULT_TOP_K))?;
} else {
// If there is no vector query, it's ok to not have a limit
scanner.limit(self.limit.map(|limit| limit as i64), None)?;
@@ -94,8 +114,8 @@ impl Query {
/// # Arguments
///
/// * `column` - The column name
pub fn column(mut self, column: &str) -> Query {
self.column = column.into();
pub fn column(mut self, column: &str) -> Self {
self.column = Some(column.to_string());
self
}
@@ -104,18 +124,18 @@ impl Query {
/// # Arguments
///
/// * `limit` - The maximum number of results to return.
pub fn limit(mut self, limit: usize) -> Query {
pub fn limit(mut self, limit: usize) -> Self {
self.limit = Some(limit);
self
}
/// Set the vector used for this query.
/// Find the nearest vectors to the given query vector.
///
/// # Arguments
///
/// * `vector` - The vector that will be used for search.
pub fn query_vector(mut self, query_vector: Float32Array) -> Query {
self.query_vector = Some(query_vector);
pub fn nearest_to(mut self, vector: &[f32]) -> Self {
self.query_vector = Some(Float32Array::from(vector.to_vec()));
self
}
@@ -124,7 +144,7 @@ impl Query {
/// # Arguments
///
/// * `nprobes` - The number of probes to use.
pub fn nprobes(mut self, nprobes: usize) -> Query {
pub fn nprobes(mut self, nprobes: usize) -> Self {
self.nprobes = nprobes;
self
}
@@ -134,8 +154,8 @@ impl Query {
/// # Arguments
///
/// * `refine_factor` - The refine factor to use.
pub fn refine_factor(mut self, refine_factor: Option<u32>) -> Query {
self.refine_factor = refine_factor;
pub fn refine_factor(mut self, refine_factor: u32) -> Self {
self.refine_factor = Some(refine_factor);
self
}
@@ -144,8 +164,8 @@ impl Query {
/// # Arguments
///
/// * `metric_type` - The distance metric to use. By default [MetricType::L2] is used.
pub fn metric_type(mut self, metric_type: Option<MetricType>) -> Query {
self.metric_type = metric_type;
pub fn metric_type(mut self, metric_type: MetricType) -> Self {
self.metric_type = Some(metric_type);
self
}
@@ -154,7 +174,7 @@ impl Query {
/// # Arguments
///
    /// * `use_index` - Sets whether to use an ANN index if available.
pub fn use_index(mut self, use_index: bool) -> Query {
pub fn use_index(mut self, use_index: bool) -> Self {
self.use_index = use_index;
self
}
@@ -163,21 +183,21 @@ impl Query {
///
/// # Arguments
///
/// * `filter` - value A filter in the same format used by a sql WHERE clause.
pub fn filter(mut self, filter: Option<String>) -> Query {
self.filter = filter;
/// * `filter` - SQL filter
pub fn filter(mut self, filter: impl AsRef<str>) -> Self {
self.filter = Some(filter.as_ref().to_string());
self
}
/// Return only the specified columns.
///
/// Only select the specified columns. If not specified, all columns will be returned.
pub fn select(mut self, columns: Option<Vec<String>>) -> Query {
self.select = columns;
pub fn select(mut self, columns: &[impl AsRef<str>]) -> Self {
self.select = Some(columns.iter().map(|c| c.as_ref().to_string()).collect());
self
}
pub fn prefilter(mut self, prefilter: bool) -> Query {
pub fn prefilter(mut self, prefilter: bool) -> Self {
self.prefilter = prefilter;
self
}
@@ -196,8 +216,10 @@ mod tests {
use futures::StreamExt;
use lance::dataset::Dataset;
use lance_testing::datagen::{BatchGenerator, IncrementingInt32, RandomVector};
use tempfile::tempdir;
use crate::query::Query;
use crate::table::{NativeTable, Table};
#[tokio::test]
async fn test_setters_getters() {
@@ -205,18 +227,18 @@ mod tests {
let ds = Dataset::write(batches, "memory://foo", None).await.unwrap();
let vector = Some(Float32Array::from_iter_values([0.1, 0.2]));
let query = Query::new(Arc::new(ds), vector.clone());
let query = Query::new(Arc::new(ds)).nearest_to(&[0.1, 0.2]);
assert_eq!(query.query_vector, vector);
let new_vector = Float32Array::from_iter_values([9.8, 8.7]);
let query = query
.query_vector(new_vector.clone())
.nearest_to(&[9.8, 8.7])
.limit(100)
.nprobes(1000)
.use_index(true)
.metric_type(Some(MetricType::Cosine))
.refine_factor(Some(999));
.metric_type(MetricType::Cosine)
.refine_factor(999);
assert_eq!(query.query_vector.unwrap(), new_vector);
assert_eq!(query.limit.unwrap(), 100);
@@ -231,14 +253,8 @@ mod tests {
let batches = make_non_empty_batches();
let ds = Arc::new(Dataset::write(batches, "memory://foo", None).await.unwrap());
let vector = Some(Float32Array::from_iter_values([0.1; 4]));
let query = Query::new(ds.clone(), vector.clone());
let result = query
.limit(10)
.filter(Some("id % 2 == 0".to_string()))
.execute()
.await;
let query = Query::new(ds.clone()).nearest_to(&[0.1; 4]);
let result = query.limit(10).filter("id % 2 == 0").execute_stream().await;
let mut stream = result.expect("should have result");
// should only have one batch
while let Some(batch) = stream.next().await {
@@ -246,12 +262,12 @@ mod tests {
assert!(batch.expect("should be Ok").num_rows() < 10);
}
let query = Query::new(ds, vector.clone());
let query = Query::new(ds).nearest_to(&[0.1; 4]);
let result = query
.limit(10)
.filter(Some("id % 2 == 0".to_string()))
.filter(String::from("id % 2 == 0")) // Work with String too
.prefilter(true)
.execute()
.execute_stream()
.await;
let mut stream = result.expect("should have result");
// should only have one batch
@@ -267,11 +283,8 @@ mod tests {
let batches = make_non_empty_batches();
let ds = Arc::new(Dataset::write(batches, "memory://foo", None).await.unwrap());
let query = Query::new(ds.clone(), None);
let result = query
.filter(Some("id % 2 == 0".to_string()))
.execute()
.await;
let query = Query::new(ds.clone());
let result = query.filter("id % 2 == 0").execute_stream().await;
let mut stream = result.expect("should have result");
// should only have one batch
while let Some(batch) = stream.next().await {
@@ -309,4 +322,21 @@ mod tests {
schema,
)
}
#[tokio::test]
async fn test_search() {
let tmp_dir = tempdir().unwrap();
let dataset_path = tmp_dir.path().join("test.lance");
let uri = dataset_path.to_str().unwrap();
let batches = make_test_batches();
Dataset::write(batches, dataset_path.to_str().unwrap(), None)
.await
.unwrap();
let table = NativeTable::open(uri).await.unwrap();
let query = table.search(&[0.1, 0.2]);
assert_eq!(&[0.1, 0.2], query.query_vector.unwrap().values());
}
}

View File

@@ -1,4 +1,4 @@
// Copyright 2023 LanceDB Developers.
// Copyright 2024 LanceDB Developers.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -12,72 +12,226 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//! LanceDB Table APIs
use std::path::Path;
use std::sync::{Arc, Mutex};
use arrow_array::RecordBatchReader;
use arrow_schema::{Schema, SchemaRef};
use chrono::Duration;
use lance::dataset::builder::DatasetBuilder;
use lance::index::scalar::ScalarIndexParams;
use lance_index::optimize::OptimizeOptions;
use lance_index::IndexType;
use std::sync::Arc;
use arrow_array::{Float32Array, RecordBatchReader};
use arrow_schema::SchemaRef;
use lance::dataset::cleanup::RemovalStats;
use lance::dataset::optimize::{
compact_files, CompactionMetrics, CompactionOptions, IndexRemapperOptions,
};
pub use lance::dataset::ReadParams;
use lance::dataset::{Dataset, UpdateBuilder, WriteParams};
use lance::io::object_store::WrappingObjectStore;
use lance_index::DatasetIndexExt;
use std::path::Path;
use lance::index::scalar::ScalarIndexParams;
use lance::io::WrappingObjectStore;
use lance_index::{optimize::OptimizeOptions, DatasetIndexExt, IndexType};
use crate::error::{Error, Result};
use crate::index::vector::{VectorIndex, VectorIndexBuilder, VectorIndexStatistics};
use crate::index::vector::{VectorIndex, VectorIndexStatistics};
use crate::index::IndexBuilder;
use crate::query::Query;
use crate::utils::{PatchReadParam, PatchWriteParam};
use crate::WriteMode;
pub use lance::dataset::ReadParams;
pub const VECTOR_COLUMN_NAME: &str = "vector";
/// A Table is a collection of strongly typed rows.
///
/// The type of each row is defined in Apache Arrow [Schema].
#[async_trait::async_trait]
pub trait Table: std::fmt::Display + Send + Sync {
fn as_any(&self) -> &dyn std::any::Any;
    /// Cast as [`NativeTable`], or return None if it is not a [`NativeTable`].
fn as_native(&self) -> Option<&NativeTable>;
/// Get the name of the table.
fn name(&self) -> &str;
/// Get the arrow [Schema] of the table.
fn schema(&self) -> SchemaRef;
/// Count the number of rows in this dataset.
async fn count_rows(&self) -> Result<usize>;
/// Insert new records into this Table
///
/// # Arguments
///
/// * `batches` RecordBatch to be saved in the Table
/// * `params` Append / Overwrite existing records. Default: Append
async fn add(
&self,
batches: Box<dyn RecordBatchReader + Send>,
params: Option<WriteParams>,
) -> Result<()>;
/// Delete the rows from table that match the predicate.
///
/// # Arguments
/// - `predicate` - The SQL predicate string to filter the rows to be deleted.
///
/// # Example
///
/// ```no_run
/// # use std::sync::Arc;
/// # use vectordb::connection::{Database, Connection};
/// # use arrow_array::{FixedSizeListArray, types::Float32Type, RecordBatch,
/// # RecordBatchIterator, Int32Array};
/// # use arrow_schema::{Schema, Field, DataType};
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
/// let tmpdir = tempfile::tempdir().unwrap();
/// let db = Database::connect(tmpdir.path().to_str().unwrap()).await.unwrap();
/// # let schema = Arc::new(Schema::new(vec![
/// # Field::new("id", DataType::Int32, false),
/// # Field::new("vector", DataType::FixedSizeList(
/// # Arc::new(Field::new("item", DataType::Float32, true)), 128), true),
/// # ]));
/// let batches = RecordBatchIterator::new(vec![
/// RecordBatch::try_new(schema.clone(),
/// vec![
/// Arc::new(Int32Array::from_iter_values(0..10)),
/// Arc::new(FixedSizeListArray::from_iter_primitive::<Float32Type, _, _>(
/// (0..10).map(|_| Some(vec![Some(1.0); 128])), 128)),
/// ]).unwrap()
/// ].into_iter().map(Ok),
/// schema.clone());
/// let tbl = db.create_table("delete_test", Box::new(batches), None).await.unwrap();
/// tbl.delete("id > 5").await.unwrap();
/// # });
/// ```
async fn delete(&self, predicate: &str) -> Result<()>;
/// Create an index on the column name.
///
/// # Examples
///
/// ```no_run
/// # use std::sync::Arc;
/// # use vectordb::connection::{Database, Connection};
/// # use arrow_array::{FixedSizeListArray, types::Float32Type, RecordBatch,
/// # RecordBatchIterator, Int32Array};
/// # use arrow_schema::{Schema, Field, DataType};
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
/// let tmpdir = tempfile::tempdir().unwrap();
/// let db = Database::connect(tmpdir.path().to_str().unwrap()).await.unwrap();
/// # let tbl = db.open_table("idx_test").await.unwrap();
/// tbl.create_index(&["vector"])
/// .ivf_pq()
/// .num_partitions(256)
/// .build()
/// .await
/// .unwrap();
/// # });
/// ```
fn create_index(&self, column: &[&str]) -> IndexBuilder;
/// Search the table with a given query vector.
///
/// This is a convenience method for preparing an ANN query.
fn search(&self, query: &[f32]) -> Query {
self.query().nearest_to(query)
}
/// Create a generic [`Query`] Builder.
///
    /// When appropriate, various indices and statistics-based pruning will be used to
/// accelerate the query.
///
/// # Examples
///
/// ## Run a vector search (ANN) query.
///
/// ```no_run
/// # use arrow_array::RecordBatch;
/// # use futures::TryStreamExt;
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
/// # let tbl = vectordb::table::NativeTable::open("/tmp/tbl").await.unwrap();
/// let stream = tbl.query().nearest_to(&[1.0, 2.0, 3.0])
/// .refine_factor(5)
/// .nprobes(10)
/// .execute_stream()
/// .await
/// .unwrap();
/// let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
/// # });
/// ```
///
/// ## Run a SQL-style filter
/// ```no_run
/// # use arrow_array::RecordBatch;
/// # use futures::TryStreamExt;
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
/// # let tbl = vectordb::table::NativeTable::open("/tmp/tbl").await.unwrap();
/// let stream = tbl
/// .query()
/// .filter("id > 5")
/// .limit(1000)
/// .execute_stream()
/// .await
/// .unwrap();
/// let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
/// # });
/// ```
///
/// ## Run a full scan query.
/// ```no_run
/// # use arrow_array::RecordBatch;
/// # use futures::TryStreamExt;
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
/// # let tbl = vectordb::table::NativeTable::open("/tmp/tbl").await.unwrap();
/// let stream = tbl
/// .query()
/// .execute_stream()
/// .await
/// .unwrap();
/// let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
/// # });
/// ```
fn query(&self) -> Query;
}
/// A reference-counted pointer to a [Table].
pub type TableRef = Arc<dyn Table>;
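Since the trait is `Send + Sync` and `TableRef` is just an `Arc`, handles clone cheaply and can be shared across tasks; a short sketch, assuming an open connection `db`:

```rust
let table: TableRef = db.open_table("my_table").await.unwrap();
let handle = table.clone(); // bumps the Arc refcount; no table data is copied
tokio::spawn(async move {
    let rows = handle.count_rows().await.unwrap();
    println!("rows: {rows}");
});
```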
/// A table in a LanceDB database.
#[derive(Debug, Clone)]
pub struct Table {
pub struct NativeTable {
name: String,
uri: String,
dataset: Arc<Dataset>,
dataset: Arc<Mutex<Dataset>>,
// the object store wrapper to use on write path
store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
}
impl std::fmt::Display for Table {
impl std::fmt::Display for NativeTable {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Table({})", self.name)
}
}
impl Table {
impl NativeTable {
/// Opens an existing Table
///
/// # Arguments
///
/// * `uri` - The uri to a [Table]
/// * `uri` - The uri to a [NativeTable]
/// * `name` - The table name
///
/// # Returns
///
/// * A [Table] object.
/// * A [NativeTable] object.
pub async fn open(uri: &str) -> Result<Self> {
let name = Self::get_table_name(uri)?;
Self::open_with_params(uri, &name, None, ReadParams::default()).await
}
/// Open an Table with a given name.
pub async fn open_with_name(uri: &str, name: &str) -> Result<Self> {
Self::open_with_params(uri, name, None, ReadParams::default()).await
}
/// Opens an existing Table
///
/// # Arguments
@@ -88,7 +242,7 @@ impl Table {
///
/// # Returns
///
/// * A [Table] object.
/// * A [NativeTable] object.
pub async fn open_with_params(
uri: &str,
name: &str,
@@ -113,25 +267,26 @@ impl Table {
message: e.to_string(),
},
})?;
Ok(Table {
Ok(NativeTable {
name: name.to_string(),
uri: uri.to_string(),
dataset: Arc::new(dataset),
dataset: Arc::new(Mutex::new(dataset)),
store_wrapper: write_store_wrapper,
})
}
/// Checkout a specific version of this [`Table`]
/// Make a new clone of the internal lance dataset.
pub(crate) fn clone_inner_dataset(&self) -> Dataset {
self.dataset.lock().expect("Lock poison").clone()
}
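Mutating operations build on this helper with a clone-then-swap pattern: take a cheap clone of the dataset under the lock, mutate the clone without holding the lock across an `.await`, then publish the result (as `update` and `compact_files` below do via `reset_dataset`). A sketch of the shape, not the literal implementation:

```rust
pub async fn delete(&self, predicate: &str) -> Result<()> {
    // Cheap clone; the Mutex is released as soon as the clone is taken.
    let mut dataset = self.clone_inner_dataset();
    // Mutate the clone without holding the lock across the await point.
    dataset.delete(predicate).await?;
    // Publish the updated dataset (reset_dataset as used by compact_files).
    self.reset_dataset(dataset);
    Ok(())
}
```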
/// Checkout a specific version of this [NativeTable]
///
pub async fn checkout(uri: &str, version: u64) -> Result<Self> {
let name = Self::get_table_name(uri)?;
Self::checkout_with_params(uri, &name, version, None, ReadParams::default()).await
}
pub async fn checkout_with_name(uri: &str, name: &str, version: u64) -> Result<Self> {
Self::checkout_with_params(uri, name, version, None, ReadParams::default()).await
}
pub async fn checkout_with_params(
uri: &str,
name: &str,
@@ -154,26 +309,27 @@ impl Table {
message: e.to_string(),
},
})?;
Ok(Table {
Ok(NativeTable {
name: name.to_string(),
uri: uri.to_string(),
dataset: Arc::new(dataset),
dataset: Arc::new(Mutex::new(dataset)),
store_wrapper: write_store_wrapper,
})
}
pub async fn checkout_latest(&self) -> Result<Self> {
let latest_version_id = self.dataset.latest_version_id().await?;
let dataset = if latest_version_id == self.dataset.version().version {
self.dataset.clone()
let dataset = self.clone_inner_dataset();
let latest_version_id = dataset.latest_version_id().await?;
let dataset = if latest_version_id == dataset.version().version {
dataset
} else {
Arc::new(self.dataset.checkout_version(latest_version_id).await?)
dataset.checkout_version(latest_version_id).await?
};
Ok(Table {
Ok(Self {
name: self.name.clone(),
uri: self.uri.clone(),
dataset,
dataset: Arc::new(Mutex::new(dataset)),
store_wrapper: self.store_wrapper.clone(),
})
}
@@ -203,8 +359,8 @@ impl Table {
///
/// # Returns
///
/// * A [Table] object.
pub async fn create(
    /// * A [NativeTable] object.
pub(crate) async fn create(
uri: &str,
name: &str,
batches: impl RecordBatchReader + Send + 'static,
@@ -227,46 +383,22 @@ impl Table {
message: e.to_string(),
},
})?;
Ok(Table {
Ok(NativeTable {
name: name.to_string(),
uri: uri.to_string(),
dataset: Arc::new(dataset),
dataset: Arc::new(Mutex::new(dataset)),
store_wrapper: write_store_wrapper,
})
}
/// Schema of this Table.
pub fn schema(&self) -> SchemaRef {
Arc::new(self.dataset.schema().into())
}
/// Version of this Table
pub fn version(&self) -> u64 {
self.dataset.version().version
}
/// Create index on the table.
pub async fn create_index(&mut self, index_builder: &impl VectorIndexBuilder) -> Result<()> {
let mut dataset = self.dataset.as_ref().clone();
dataset
.create_index(
&[index_builder
.get_column()
.unwrap_or(VECTOR_COLUMN_NAME.to_string())
.as_str()],
IndexType::Vector,
index_builder.get_index_name(),
&index_builder.build(),
index_builder.get_replace(),
)
.await?;
self.dataset = Arc::new(dataset);
Ok(())
self.dataset.lock().expect("lock poison").version().version
}
/// Create a scalar index on the table
pub async fn create_scalar_index(&mut self, column: &str, replace: bool) -> Result<()> {
let mut dataset = self.dataset.as_ref().clone();
pub async fn create_scalar_index(&self, column: &str, replace: bool) -> Result<()> {
let mut dataset = self.clone_inner_dataset();
let params = ScalarIndexParams::default();
dataset
.create_index(&[column], IndexType::Scalar, None, &params, replace)
@@ -275,61 +407,21 @@ impl Table {
}
pub async fn optimize_indices(&mut self, options: &OptimizeOptions) -> Result<()> {
let mut dataset = self.dataset.as_ref().clone();
let mut dataset = self.clone_inner_dataset();
dataset.optimize_indices(options).await?;
Ok(())
}
/// Insert records into this Table
///
/// # Arguments
///
/// * `batches` RecordBatch to be saved in the Table
/// * `write_mode` Append / Overwrite existing records. Default: Append
/// # Returns
///
/// * The number of rows added
pub async fn add(
&mut self,
batches: impl RecordBatchReader + Send + 'static,
params: Option<WriteParams>,
) -> Result<()> {
let params = Some(params.unwrap_or(WriteParams {
mode: WriteMode::Append,
..WriteParams::default()
}));
// patch the params if we have a write store wrapper
let params = match self.store_wrapper.clone() {
Some(wrapper) => params.patch_with_store_wrapper(wrapper)?,
None => params,
};
self.dataset = Arc::new(Dataset::write(batches, &self.uri, params).await?);
Ok(())
}
-    /// Creates a new Query object that can be executed.
-    ///
-    /// # Arguments
-    ///
-    /// * `query_vector` The vector used for this query.
-    ///
-    /// # Returns
-    /// * A [Query] object.
-    pub fn search<T: Into<Float32Array>>(&self, query_vector: Option<T>) -> Query {
-        Query::new(self.dataset.clone(), query_vector.map(|q| q.into()))
+    pub fn query(&self) -> Query {
+        Query::new(self.clone_inner_dataset().into())
    }
    pub fn filter(&self, expr: String) -> Query {
-        Query::new(self.dataset.clone(), None).filter(Some(expr))
+        Query::new(self.clone_inner_dataset().into()).filter(expr)
}
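    // Usage sketch (illustrative): the query vector now lives on the `Query`
    // builder instead of a `search(Some(vector))` method, so scans and vector
    // searches share one entry point; builder setters are assumed from query.rs.
    //
    // let scan = table.filter("id > 10".to_string()); // predicate-only query
    // let q = table.query(); // configure vector / limit on `q` before executing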
-    /// Returns the number of rows in this Table
-    pub async fn count_rows(&self) -> Result<usize> {
-        Ok(self.dataset.count_rows().await?)
-    }
/// Merge new data into this table.
pub async fn merge(
@@ -338,26 +430,14 @@ impl Table {
left_on: &str,
right_on: &str,
) -> Result<()> {
-        let mut dataset = self.dataset.as_ref().clone();
+        let mut dataset = self.clone_inner_dataset();
        dataset.merge(batches, left_on, right_on).await?;
-        self.dataset = Arc::new(dataset);
+        self.dataset = Arc::new(Mutex::new(dataset));
Ok(())
}
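    // Usage sketch (illustrative; the reader type is assumed, as the signature
    // sits outside this hunk): merge joins new columns onto existing rows by key.
    //
    // table.merge(new_columns_reader, "id", "id").await?;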
-    /// Delete rows from the table
-    pub async fn delete(&mut self, predicate: &str) -> Result<()> {
-        let mut dataset = self.dataset.as_ref().clone();
-        dataset.delete(predicate).await?;
-        self.dataset = Arc::new(dataset);
-        Ok(())
-    }
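    // Usage sketch (illustrative): `delete` takes a SQL-style predicate; the
    // implementation now lives on the `Table` trait impl below.
    //
    // table.delete("id > 5").await?;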
-    pub async fn update(
-        &mut self,
-        predicate: Option<&str>,
-        updates: Vec<(&str, &str)>,
-    ) -> Result<()> {
-        let mut builder = UpdateBuilder::new(self.dataset.clone());
+    pub async fn update(&self, predicate: Option<&str>, updates: Vec<(&str, &str)>) -> Result<()> {
+        let mut builder = UpdateBuilder::new(self.clone_inner_dataset().into());
if let Some(predicate) = predicate {
builder = builder.update_where(predicate)?;
}
@@ -367,9 +447,8 @@ impl Table {
}
let operation = builder.build()?;
-        let new_ds = operation.execute().await?;
-        self.dataset = new_ds;
+        let ds = operation.execute().await?;
+        self.reset_dataset(ds.as_ref().clone());
Ok(())
}
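    // Usage sketch (illustrative), mirroring the update test below: values are
    // SQL expressions and a `None` predicate touches every row.
    //
    // table.update(Some("id > 5"), vec![("name", "'foo'")]).await?;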
@@ -389,8 +468,8 @@ impl Table {
older_than: Duration,
delete_unverified: Option<bool>,
) -> Result<RemovalStats> {
-        Ok(self
-            .dataset
+        let dataset = self.clone_inner_dataset();
+        Ok(dataset
.cleanup_old_versions(older_than, delete_unverified)
.await?)
}
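    // Usage sketch (illustrative; `Duration` is assumed to be the chrono type
    // lance's cleanup API uses):
    //
    // let stats = table.cleanup_old_versions(Duration::days(14), Some(true)).await?;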
@@ -402,26 +481,28 @@ impl Table {
///
/// This calls into [lance::dataset::optimize::compact_files].
pub async fn compact_files(
-        &mut self,
+        &self,
options: CompactionOptions,
remap_options: Option<Arc<dyn IndexRemapperOptions>>,
) -> Result<CompactionMetrics> {
-        let mut dataset = self.dataset.as_ref().clone();
+        let mut dataset = self.clone_inner_dataset();
        let metrics = compact_files(&mut dataset, options, remap_options).await?;
-        self.dataset = Arc::new(dataset);
+        self.reset_dataset(dataset);
Ok(metrics)
}
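    // Usage sketch (illustrative; `CompactionOptions::default()` is assumed
    // from lance's optimize module):
    //
    // let metrics = table.compact_files(CompactionOptions::default(), None).await?;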
pub fn count_fragments(&self) -> usize {
-        self.dataset.count_fragments()
+        self.dataset.lock().expect("lock poison").count_fragments()
}
pub async fn count_deleted_rows(&self) -> Result<usize> {
-        Ok(self.dataset.count_deleted_rows().await?)
+        let dataset = self.clone_inner_dataset();
+        Ok(dataset.count_deleted_rows().await?)
}
pub async fn num_small_files(&self, max_rows_per_group: usize) -> usize {
-        self.dataset.num_small_files(max_rows_per_group).await
+        let dataset = self.clone_inner_dataset();
+        dataset.num_small_files(max_rows_per_group).await
}
pub async fn count_indexed_rows(&self, index_uuid: &str) -> Result<Option<usize>> {
@@ -439,8 +520,8 @@ impl Table {
}
pub async fn load_indices(&self) -> Result<Vec<VectorIndex>> {
-        let (indices, mf) =
-            futures::try_join!(self.dataset.load_indices(), self.dataset.latest_manifest())?;
+        let dataset = self.clone_inner_dataset();
+        let (indices, mf) = futures::try_join!(dataset.load_indices(), dataset.latest_manifest())?;
Ok(indices
.iter()
.map(|i| VectorIndex::new_from_format(&mf, i))
@@ -456,10 +537,8 @@ impl Table {
if index.is_none() {
return Ok(None);
}
-        let index_stats = self
-            .dataset
-            .index_statistics(&index.unwrap().index_name)
-            .await?;
+        let dataset = self.clone_inner_dataset();
+        let index_stats = dataset.index_statistics(&index.unwrap().index_name).await?;
let index_stats: VectorIndexStatistics =
serde_json::from_str(&index_stats).map_err(|e| Error::Lance {
message: format!(
@@ -470,6 +549,71 @@ impl Table {
Ok(Some(index_stats))
}
pub(crate) fn reset_dataset(&self, dataset: Dataset) {
*self.dataset.lock().expect("lock poison") = dataset;
}
}
#[async_trait::async_trait]
impl Table for NativeTable {
fn as_any(&self) -> &dyn std::any::Any {
self
}
fn as_native(&self) -> Option<&NativeTable> {
Some(self)
}
fn name(&self) -> &str {
self.name.as_str()
}
fn schema(&self) -> SchemaRef {
let lance_schema = { self.dataset.lock().expect("lock poison").schema().clone() };
Arc::new(Schema::from(&lance_schema))
}
async fn count_rows(&self) -> Result<usize> {
let dataset = { self.dataset.lock().expect("lock poison").clone() };
Ok(dataset.count_rows().await?)
}
async fn add(
&self,
batches: Box<dyn RecordBatchReader + Send>,
params: Option<WriteParams>,
) -> Result<()> {
let params = Some(params.unwrap_or(WriteParams {
mode: WriteMode::Append,
..WriteParams::default()
}));
// patch the params if we have a write store wrapper
let params = match self.store_wrapper.clone() {
Some(wrapper) => params.patch_with_store_wrapper(wrapper)?,
None => params,
};
self.reset_dataset(Dataset::write(batches, &self.uri, params).await?);
Ok(())
}
fn create_index(&self, columns: &[&str]) -> IndexBuilder {
IndexBuilder::new(Arc::new(self.clone()), columns)
}
fn query(&self) -> Query {
Query::new(Arc::new(self.dataset.lock().expect("lock poison").clone()))
}
/// Delete rows from the table
async fn delete(&self, predicate: &str) -> Result<()> {
let mut dataset = self.clone_inner_dataset();
dataset.delete(predicate).await?;
self.reset_dataset(dataset);
Ok(())
}
}
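// Usage sketch (illustrative): callers hold the new trait object, downcast when
// they need native-only APIs, and build indices fluently as the test below does.
//
// let table: Arc<dyn Table> = Arc::new(native_table);
// if let Some(native) = table.as_native() {
//     let fragments = native.count_fragments(); // native-only API
// }
// table.create_index(&["embeddings"]).ivf_pq().num_partitions(256).build().await?;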
#[cfg(test)]
@@ -487,14 +631,11 @@ mod tests {
use arrow_schema::{DataType, Field, Schema, TimeUnit};
use futures::TryStreamExt;
use lance::dataset::{Dataset, WriteMode};
-    use lance::index::vector::pq::PQBuildParams;
-    use lance::io::object_store::{ObjectStoreParams, WrappingObjectStore};
-    use lance_index::vector::ivf::IvfBuildParams;
+    use lance::io::{ObjectStoreParams, WrappingObjectStore};
use rand::Rng;
use tempfile::tempdir;
use super::*;
-    use crate::index::vector::IvfPQIndexBuilder;
#[tokio::test]
async fn test_open() {
@@ -506,7 +647,9 @@ mod tests {
.await
.unwrap();
-        let table = Table::open(dataset_path.to_str().unwrap()).await.unwrap();
+        let table = NativeTable::open(dataset_path.to_str().unwrap())
+            .await
+            .unwrap();
assert_eq!(table.name, "test")
}
@@ -515,7 +658,7 @@ mod tests {
async fn test_open_not_found() {
let tmp_dir = tempdir().unwrap();
let uri = tmp_dir.path().to_str().unwrap();
-        let table = Table::open(uri).await;
+        let table = NativeTable::open(uri).await;
assert!(matches!(table.unwrap_err(), Error::TableNotFound { .. }));
}
@@ -535,12 +678,12 @@ mod tests {
let batches = make_test_batches();
let _ = batches.schema().clone();
-        Table::create(&uri, "test", batches, None, None)
+        NativeTable::create(&uri, "test", batches, None, None)
.await
.unwrap();
let batches = make_test_batches();
-        let result = Table::create(&uri, "test", batches, None, None).await;
+        let result = NativeTable::create(&uri, "test", batches, None, None).await;
assert!(matches!(
result.unwrap_err(),
Error::TableAlreadyExists { .. }
@@ -554,7 +697,7 @@ mod tests {
let batches = make_test_batches();
let schema = batches.schema().clone();
-        let mut table = Table::create(&uri, "test", batches, None, None)
+        let table = NativeTable::create(&uri, "test", batches, None, None)
.await
.unwrap();
assert_eq!(table.count_rows().await.unwrap(), 10);
@@ -570,7 +713,7 @@ mod tests {
schema.clone(),
);
-        table.add(new_batches, None).await.unwrap();
+        table.add(Box::new(new_batches), None).await.unwrap();
assert_eq!(table.count_rows().await.unwrap(), 20);
assert_eq!(table.name, "test");
}
@@ -582,7 +725,7 @@ mod tests {
let batches = make_test_batches();
let schema = batches.schema().clone();
-        let mut table = Table::create(uri, "test", batches, None, None)
+        let table = NativeTable::create(uri, "test", batches, None, None)
.await
.unwrap();
assert_eq!(table.count_rows().await.unwrap(), 10);
@@ -603,7 +746,7 @@ mod tests {
..Default::default()
};
-        table.add(new_batches, Some(param)).await.unwrap();
+        table.add(Box::new(new_batches), Some(param)).await.unwrap();
assert_eq!(table.count_rows().await.unwrap(), 10);
assert_eq!(table.name, "test");
}
@@ -636,7 +779,7 @@ mod tests {
);
Dataset::write(record_batch_iter, uri, None).await.unwrap();
-        let mut table = Table::open(uri).await.unwrap();
+        let table = NativeTable::open(uri).await.unwrap();
table
.update(Some("id > 5"), vec![("name", "'foo'")])
@@ -768,7 +911,7 @@ mod tests {
);
Dataset::write(record_batch_iter, uri, None).await.unwrap();
-        let mut table = Table::open(uri).await.unwrap();
+        let table = NativeTable::open(uri).await.unwrap();
        // Check that update works for each column type
let updates: Vec<(&str, &str)> = vec![
@@ -874,24 +1017,6 @@ mod tests {
}
}
-    #[tokio::test]
-    async fn test_search() {
-        let tmp_dir = tempdir().unwrap();
-        let dataset_path = tmp_dir.path().join("test.lance");
-        let uri = dataset_path.to_str().unwrap();
-        let batches = make_test_batches();
-        Dataset::write(batches, dataset_path.to_str().unwrap(), None)
-            .await
-            .unwrap();
-        let table = Table::open(uri).await.unwrap();
-        let vector = Float32Array::from_iter_values([0.1, 0.2]);
-        let query = table.search(Some(vector.clone()));
-        assert_eq!(vector, query.query_vector.unwrap());
-    }
#[derive(Default, Debug)]
struct NoOpCacheWrapper {
called: AtomicBool,
@@ -933,7 +1058,7 @@ mod tests {
..Default::default()
};
assert!(!wrapper.called());
-        let _ = Table::open_with_params(uri, "test", None, param)
+        let _ = NativeTable::open_with_params(uri, "test", None, param)
.await
.unwrap();
assert!(wrapper.called());
@@ -987,23 +1112,23 @@ mod tests {
schema,
);
-        let mut table = Table::create(uri, "test", batches, None, None)
+        let table = NativeTable::create(uri, "test", batches, None, None)
.await
.unwrap();
-        let mut i = IvfPQIndexBuilder::new();
-        let index_builder = i
-            .column("embeddings".to_string())
-            .index_name("my_index".to_string())
-            .ivf_params(IvfBuildParams::new(256))
-            .pq_params(PQBuildParams::default());
-        table.create_index(index_builder).await.unwrap();
+        assert_eq!(table.count_indexed_rows("my_index").await.unwrap(), None);
+        assert_eq!(table.count_unindexed_rows("my_index").await.unwrap(), None);
+        table
+            .create_index(&["embeddings"])
+            .ivf_pq()
+            .name("my_index")
+            .num_partitions(256)
+            .build()
+            .await
+            .unwrap();
-        assert_eq!(table.dataset.load_indices().await.unwrap().len(), 1);
+        assert_eq!(table.load_indices().await.unwrap().len(), 1);
assert_eq!(table.count_rows().await.unwrap(), 512);
assert_eq!(table.name, "test");

View File

@@ -1,9 +1,9 @@
use std::sync::Arc;
-use lance::{
-    dataset::{ReadParams, WriteParams},
-    io::object_store::{ObjectStoreParams, WrappingObjectStore},
-};
+use arrow_schema::Schema;
+use lance::dataset::{ReadParams, WriteParams};
+use lance::io::{ObjectStoreParams, WrappingObjectStore};
use crate::error::{Error, Result};
@@ -65,3 +65,86 @@ impl PatchReadParam for ReadParams {
Ok(self)
}
}
/// Find the default vector column on which to create an index.
pub(crate) fn default_vector_column(schema: &Schema, dim: Option<i32>) -> Result<String> {
// Try to find one fixed size list array column.
let candidates = schema
.fields()
.iter()
.filter_map(|field| match field.data_type() {
arrow_schema::DataType::FixedSizeList(f, d)
if f.data_type().is_floating()
&& dim.map(|expect| *d == expect).unwrap_or(true) =>
{
Some(field.name())
}
_ => None,
})
.collect::<Vec<_>>();
if candidates.is_empty() {
Err(Error::Store {
message: "No vector column found to create index".to_string(),
})
} else if candidates.len() != 1 {
Err(Error::Store {
message: format!(
"More than one vector columns found, \
please specify which column to create index: {:?}",
candidates
),
})
} else {
Ok(candidates[0].to_string())
}
}
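// Usage sketch (illustrative; `Field` and `DataType` come from `arrow_schema`):
// with exactly one float fixed-size-list column, the helper resolves it.
//
// let schema = Schema::new(vec![Field::new(
//     "vec",
//     DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, false)), 128),
//     false,
// )]);
// assert_eq!(default_vector_column(&schema, None).unwrap(), "vec");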
#[cfg(test)]
mod tests {
use super::*;
use arrow_schema::{DataType, Field};
#[test]
fn test_guess_default_column() {
let schema_no_vector = Schema::new(vec![
Field::new("id", DataType::Int16, true),
Field::new("tag", DataType::Utf8, false),
]);
assert!(default_vector_column(&schema_no_vector, None)
.unwrap_err()
.to_string()
.contains("No vector column"));
let schema_with_vec_col = Schema::new(vec![
Field::new("id", DataType::Int16, true),
Field::new(
"vec",
DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float64, false)), 10),
false,
),
]);
assert_eq!(
default_vector_column(&schema_with_vec_col, None).unwrap(),
"vec"
);
let multi_vec_col = Schema::new(vec![
Field::new("id", DataType::Int16, true),
Field::new(
"vec",
DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float64, false)), 10),
false,
),
Field::new(
"vec2",
DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float64, false)), 50),
false,
),
]);
assert!(default_vector_column(&multi_vec_col, None)
.unwrap_err()
.to_string()
.contains("More than one"));
}
}