Compare commits

...

8 Commits

Author SHA1 Message Date
Lance Release
4605359d3b Bump version: 0.4.10 → 0.4.11 2024-02-23 03:57:28 +00:00
Weston Pace
f1596122e6 refactor: rename the rust crate from vectordb to lancedb (#1012)
This also renames the new experimental node package to lancedb. The
classic node package remains named vectordb.

The goal here is to avoid introducing piecemeal breaking changes to the
vectordb crate. Instead, once the new API is stabilized, we will
officially release the lancedb crate and deprecate the vectordb crate.
The same pattern will eventually happen with the npm package vectordb.
2024-02-22 19:56:39 -08:00
Will Jones
3aa0c40168 feat(node): add read_consistency_interval to Node and Rust (#1002)
This PR adds the same consistency semantics that were added in #828. It
*does not* add the same lazy-loading of tables, since that breaks some
existing tests.

This closes #998.

---------

Co-authored-by: Weston Pace <weston.pace@gmail.com>
2024-02-22 15:04:30 -08:00
Lance Release
677b7c1fcc [python] Bump version: 0.5.6 → 0.5.7 2024-02-22 20:07:12 +00:00
Lei Xu
8303a7197b chore: bump pylance to 0.9.18 (#1011) 2024-02-22 11:47:36 -08:00
Raghav Dixit
5fa9bfc4a8 python(feat): Imagebind embedding fn support (#1003)
Added ImageBind embedding function support; installation steps are mentioned in the docstring.
Slow pytest checks were run locally.

---------

Co-authored-by: Ayush Chaurasia <ayush.chaurarsia@gmail.com>
2024-02-22 11:47:08 +05:30
Ayush Chaurasia
bf2e9d0088 Docs: add meta tags (#1006) 2024-02-21 23:22:47 +05:30
Weston Pace
f04590ddad refactor: rust vectordb API stabilization of the Connection trait (#993)
This is the start of a more comprehensive refactor and stabilization of
the Rust API. The `Connection` trait is cleaned up to not require
`lance` and to match the `Connection` trait in other APIs. In addition,
the concrete implementation `Database` is hidden.

BREAKING CHANGE: The struct `crate::connection::Database` is now gone.
Several examples opened a connection using `Database::connect` or
`Database::connect_with_params`. Users should now use
`vectordb::connect`.

BREAKING CHANGE: The `connect`, `create_table`, and `open_table` methods
now all return a builder object. This means that a call like
`conn.open_table(..., opt1, opt2)` will now become
`conn.open_table(...).opt1(opt1).opt2(opt2).execute()`. In addition, the
structure of options has changed slightly. However, no options
capability has been removed.
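For reference, a minimal sketch of the migrated call style, adapted from the updated Rust example code in this diff (the `opt1`/`opt2` names above are placeholders; error handling and data setup elided):

```rust
use lancedb::connect;

#[tokio::main]
async fn main() -> lancedb::Result<()> {
    // The old positional-argument calls (Database::connect, open_table with
    // options) become builders that are finished with .execute().
    let db = connect("data/sample-lancedb").execute().await?;
    println!("{:?}", db.table_names().await?);
    let _table = db.open_table("my_table").execute().await?;
    Ok(())
}
```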

---------

Co-authored-by: Will Jones <willjones127@gmail.com>
2024-02-20 18:35:52 -08:00
73 changed files with 2129 additions and 1909 deletions

View File

@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.4.10
current_version = 0.4.11
commit = True
message = Bump version: {current_version} → {new_version}
tag = True
@@ -9,4 +9,4 @@ tag_name = v{new_version}
[bumpversion:file:rust/ffi/node/Cargo.toml]
[bumpversion:file:rust/vectordb/Cargo.toml]
[bumpversion:file:rust/lancedb/Cargo.toml]

View File

@@ -26,4 +26,4 @@ jobs:
sudo apt install -y protobuf-compiler libssl-dev
- name: Publish the package
run: |
cargo publish -p vectordb --all-features --token ${{ secrets.CARGO_REGISTRY_TOKEN }}
cargo publish -p lancedb --all-features --token ${{ secrets.CARGO_REGISTRY_TOKEN }}

View File

@@ -1,5 +1,5 @@
[workspace]
members = ["rust/ffi/node", "rust/vectordb", "nodejs"]
members = ["rust/ffi/node", "rust/lancedb", "nodejs"]
# Python package needs to be built by maturin.
exclude = ["python"]
resolver = "2"

View File

@@ -57,6 +57,16 @@ plugins:
- https://arrow.apache.org/docs/objects.inv
- https://pandas.pydata.org/docs/objects.inv
- mkdocs-jupyter
- ultralytics:
verbose: True
enabled: True
default_image: "assets/lancedb_and_lance.png" # Default image for all pages
add_image: True # Automatically add meta image
add_keywords: True # Add page keywords in the header tag
add_share_buttons: True # Add social share buttons
add_authors: False # Display page authors
add_desc: False
add_dates: False
markdown_extensions:
- admonition
@@ -206,7 +216,6 @@ extra_css:
extra_javascript:
- "extra_js/init_ask_ai_widget.js"
- "extra_js/meta_tag.js"
extra:
analytics:

View File

@@ -2,4 +2,5 @@ mkdocs==1.5.3
mkdocs-jupyter==0.24.1
mkdocs-material==9.5.3
mkdocstrings[python]==0.20.0
pydantic
pydantic
mkdocs-ultralytics-plugin==0.0.44

View File

@@ -1,6 +0,0 @@
window.addEventListener('load', function() {
var meta = document.createElement('meta');
meta.setAttribute('property', 'og:image');
meta.setAttribute('content', '/assets/lancedb_and_lance.png');
document.head.appendChild(meta);
});

View File

@@ -636,6 +636,70 @@ The `values` parameter is used to provide the new values for the columns as lite
When rows are updated, they are moved out of the index. The row will still show up in ANN queries, but the query will not be as fast as it would be if the row was in the index. If you update a large proportion of rows, consider rebuilding the index afterwards.
## Consistency
In LanceDB OSS, users can set the `read_consistency_interval` parameter on connections to achieve different levels of read consistency. This parameter determines how frequently the database synchronizes with the underlying storage system to check for updates made by other processes. If another process updates a table, the database will not see the changes until the next synchronization.
There are three possible settings for `read_consistency_interval`:
1. **Unset (default)**: The database does not check for updates to tables made by other processes. This provides the best query performance, but means that clients may not see the most up-to-date data. This setting is suitable for applications where the data does not change during the lifetime of the table reference.
2. **Zero seconds (Strong consistency)**: The database checks for updates on every read. This provides the strongest consistency guarantees, ensuring that all clients see the latest committed data. However, it has the most overhead. This setting is suitable when consistency matters more than having high QPS.
3. **Custom interval (Eventual consistency)**: The database checks for updates at a custom interval, such as every 5 seconds. This provides eventual consistency, allowing for some lag between write and read operations. Performance-wise, this is a middle ground between strong consistency and no consistency check. This setting is suitable for applications where immediate consistency is not critical, but clients should see updated data eventually.
!!! tip "Consistency in LanceDB Cloud"
This is only tune-able in LanceDB OSS. In LanceDB Cloud, readers are always eventually consistent.
=== "Python"
To set strong consistency, use `timedelta(0)`:
```python
from datetime import timedelta
db = lancedb.connect("./.lancedb",. read_consistency_interval=timedelta(0))
table = db.open_table("my_table")
```
For eventual consistency, use a custom `timedelta`:
```python
from datetime import timedelta
db = lancedb.connect("./.lancedb", read_consistency_interval=timedelta(seconds=5))
table = db.open_table("my_table")
```
By default, a `Table` will never check for updates from other writers. To manually check for updates you can use `checkout_latest`:
```python
db = lancedb.connect("./.lancedb")
table = db.open_table("my_table")
# (Other writes happen to my_table from another process)
# Check for updates
table.checkout_latest()
```
=== "JavaScript/Typescript"
To set strong consistency, use `0`:
```javascript
const db = await lancedb.connect({ uri: "./.lancedb", readConsistencyInterval: 0 });
const table = await db.openTable("my_table");
```
For eventual consistency, specify the update interval as seconds:
```javascript
const db = await lancedb.connect({ uri: "./.lancedb", readConsistencyInterval: 5 });
const table = await db.openTable("my_table");
```
<!-- Node doesn't yet support the version time travel: https://github.com/lancedb/lancedb/issues/1007
Once it does, we can show manual consistency check for Node as well.
-->
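The same option is exposed on the Rust `ConnectBuilder` added in this release. A minimal sketch of the Rust-side usage, assuming the builder methods shown in the Rust changes later in this diff:

```rust
use std::time::Duration;
use lancedb::connect;

#[tokio::main]
async fn main() -> lancedb::Result<()> {
    // Zero seconds = strong consistency: every read checks for updates from
    // other writers. Use a non-zero Duration for eventual consistency, or
    // omit the call entirely to skip consistency checks (the default).
    let db = connect("./.lancedb")
        .read_consistency_interval(Duration::from_secs(0))
        .execute()
        .await?;
    let _table = db.open_table("my_table").execute().await?;
    Ok(())
}
```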
## What's next?
Learn the best practices on creating an ANN index and getting the most out of it.

View File

@@ -1,12 +1,12 @@
{
"name": "vectordb",
"version": "0.4.10",
"version": "0.4.11",
"description": " Serverless, low-latency vector database for AI applications",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"scripts": {
"tsc": "tsc -b",
"build": "npm run tsc && cargo-cp-artifact --artifact cdylib vectordb-node index.node -- cargo build --message-format=json",
"build": "npm run tsc && cargo-cp-artifact --artifact cdylib lancedb-node index.node -- cargo build --message-format=json",
"build-release": "npm run build -- --release",
"test": "npm run tsc && mocha -recursive dist/test",
"integration-test": "npm run tsc && mocha -recursive dist/integration_test",
@@ -85,10 +85,10 @@
}
},
"optionalDependencies": {
"@lancedb/vectordb-darwin-arm64": "0.4.10",
"@lancedb/vectordb-darwin-x64": "0.4.10",
"@lancedb/vectordb-linux-arm64-gnu": "0.4.10",
"@lancedb/vectordb-linux-x64-gnu": "0.4.10",
"@lancedb/vectordb-win32-x64-msvc": "0.4.10"
"@lancedb/vectordb-darwin-arm64": "0.4.11",
"@lancedb/vectordb-darwin-x64": "0.4.11",
"@lancedb/vectordb-linux-arm64-gnu": "0.4.11",
"@lancedb/vectordb-linux-x64-gnu": "0.4.11",
"@lancedb/vectordb-win32-x64-msvc": "0.4.11"
}
}
}

View File

@@ -96,6 +96,19 @@ export interface ConnectionOptions {
* This is useful for local testing.
*/
hostOverride?: string
/**
* (For LanceDB OSS only): The interval, in seconds, at which to check for
* updates to the table from other processes. If None, then consistency is not
* checked. For performance reasons, this is the default. For strong
* consistency, set this to zero seconds. Then every read will check for
* updates from other processes. As a compromise, you can set this to a
* non-zero value for eventual consistency. If more than that interval
* has passed since the last check, then the table will be checked for updates.
* Note: this consistency only applies to read operations. Write operations are
* always consistent.
*/
readConsistencyInterval?: number
}
function getAwsArgs (opts: ConnectionOptions): any[] {
@@ -181,7 +194,8 @@ export async function connect (
opts.awsCredentials?.accessKeyId,
opts.awsCredentials?.secretKey,
opts.awsCredentials?.sessionToken,
opts.awsRegion
opts.awsRegion,
opts.readConsistencyInterval
)
return new LocalConnection(db, opts)
}

View File

@@ -18,5 +18,5 @@ module.exports = {
"@typescript-eslint/method-signature-style": "off",
"@typescript-eslint/no-explicit-any": "off",
},
ignorePatterns: ["node_modules/", "dist/", "build/", "vectordb/native.*"],
ignorePatterns: ["node_modules/", "dist/", "build/", "lancedb/native.*"],
};

View File

@@ -1,5 +1,5 @@
[package]
name = "vectordb-nodejs"
name = "lancedb-nodejs"
edition.workspace = true
version = "0.0.0"
license.workspace = true
@@ -16,7 +16,7 @@ arrow-ipc.workspace = true
futures.workspace = true
lance-linalg.workspace = true
lance.workspace = true
vectordb = { path = "../rust/vectordb" }
lancedb = { path = "../rust/lancedb" }
napi = { version = "2.15", default-features = false, features = [
"napi7",
"async"

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
import { makeArrowTable, toBuffer } from "../vectordb/arrow";
import { makeArrowTable, toBuffer } from "../lancedb/arrow";
import {
Int64,
Field,

View File

@@ -29,6 +29,6 @@ test("open database", async () => {
const tbl = await db.createTable("test", [{ id: 1 }, { id: 2 }]);
expect(await db.tableNames()).toStrictEqual(["test"]);
const schema = tbl.schema;
const schema = await tbl.schema();
expect(schema).toEqual(new Schema([new Field("id", new Float64(), true)]));
});

View File

@@ -181,3 +181,37 @@ describe("Test creating index", () => {
// TODO: check index type.
});
});
describe("Read consistency interval", () => {
let tmpDir: string;
beforeEach(() => {
tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "read-consistency-"));
});
// const intervals = [undefined, 0, 0.1];
const intervals = [0];
test.each(intervals)("read consistency interval %p", async (interval) => {
const db = await connect({ uri: tmpDir });
const table = await db.createTable("my_table", [{ id: 1 }]);
const db2 = await connect({ uri: tmpDir, readConsistencyInterval: interval });
const table2 = await db2.openTable("my_table");
expect(await table2.countRows()).toEqual(await table.countRows());
await table.add([{ id: 2 }]);
if (interval === undefined) {
expect(await table2.countRows()).toEqual(1n);
// TODO: once we implement time travel we can uncomment this part of the test.
// await table2.checkout_latest();
// expect(await table2.countRows()).toEqual(2);
} else if (interval === 0) {
expect(await table2.countRows()).toEqual(2n);
} else {
// interval == 0.1
expect(await table2.countRows()).toEqual(1n);
await new Promise(r => setTimeout(r, 100));
expect(await table2.countRows()).toEqual(2n);
}
});
});

View File

@@ -53,12 +53,12 @@ export async function connect(
opts = Object.assign(
{
uri: "",
apiKey: "",
hostOverride: "",
apiKey: undefined,
hostOverride: undefined,
},
args
);
}
const nativeConn = await NativeConnection.new(opts.uri);
const nativeConn = await NativeConnection.new(opts);
return new Connection(nativeConn);
}

View File

@@ -16,6 +16,18 @@ export interface ConnectionOptions {
uri: string
apiKey?: string
hostOverride?: string
/**
* (For LanceDB OSS only): The interval, in seconds, at which to check for
* updates to the table from other processes. If None, then consistency is not
* checked. For performance reasons, this is the default. For strong
* consistency, set this to zero seconds. Then every read will check for
* updates from other processes. As a compromise, you can set this to a
* non-zero value for eventual consistency. If more than that interval
* has passed since the last check, then the table will be checked for updates.
* Note: this consistency only applies to read operations. Write operations are
* always consistent.
*/
readConsistencyInterval?: number
}
/** Write mode for writing a table. */
export const enum WriteMode {
@@ -30,7 +42,7 @@ export interface WriteOptions {
export function connect(options: ConnectionOptions): Promise<Connection>
export class Connection {
/** Create a new Connection instance from the given URI. */
static new(uri: string): Promise<Connection>
static new(options: ConnectionOptions): Promise<Connection>
/** List all tables in the dataset. */
tableNames(): Promise<Array<string>>
/**
@@ -71,7 +83,7 @@ export class Query {
}
export class Table {
/** Return Schema as empty Arrow IPC file. */
schema(): Buffer
schema(): Promise<Buffer>
add(buf: Buffer): Promise<void>
countRows(filter?: string | undefined | null): Promise<bigint>
delete(predicate: string): Promise<void>

View File

@@ -32,24 +32,24 @@ switch (platform) {
case 'android':
switch (arch) {
case 'arm64':
localFileExisted = existsSync(join(__dirname, 'vectordb-nodejs.android-arm64.node'))
localFileExisted = existsSync(join(__dirname, 'lancedb-nodejs.android-arm64.node'))
try {
if (localFileExisted) {
nativeBinding = require('./vectordb-nodejs.android-arm64.node')
nativeBinding = require('./lancedb-nodejs.android-arm64.node')
} else {
nativeBinding = require('vectordb-android-arm64')
nativeBinding = require('lancedb-android-arm64')
}
} catch (e) {
loadError = e
}
break
case 'arm':
localFileExisted = existsSync(join(__dirname, 'vectordb-nodejs.android-arm-eabi.node'))
localFileExisted = existsSync(join(__dirname, 'lancedb-nodejs.android-arm-eabi.node'))
try {
if (localFileExisted) {
nativeBinding = require('./vectordb-nodejs.android-arm-eabi.node')
nativeBinding = require('./lancedb-nodejs.android-arm-eabi.node')
} else {
nativeBinding = require('vectordb-android-arm-eabi')
nativeBinding = require('lancedb-android-arm-eabi')
}
} catch (e) {
loadError = e
@@ -63,13 +63,13 @@ switch (platform) {
switch (arch) {
case 'x64':
localFileExisted = existsSync(
join(__dirname, 'vectordb-nodejs.win32-x64-msvc.node')
join(__dirname, 'lancedb-nodejs.win32-x64-msvc.node')
)
try {
if (localFileExisted) {
nativeBinding = require('./vectordb-nodejs.win32-x64-msvc.node')
nativeBinding = require('./lancedb-nodejs.win32-x64-msvc.node')
} else {
nativeBinding = require('vectordb-win32-x64-msvc')
nativeBinding = require('lancedb-win32-x64-msvc')
}
} catch (e) {
loadError = e
@@ -77,13 +77,13 @@ switch (platform) {
break
case 'ia32':
localFileExisted = existsSync(
join(__dirname, 'vectordb-nodejs.win32-ia32-msvc.node')
join(__dirname, 'lancedb-nodejs.win32-ia32-msvc.node')
)
try {
if (localFileExisted) {
nativeBinding = require('./vectordb-nodejs.win32-ia32-msvc.node')
nativeBinding = require('./lancedb-nodejs.win32-ia32-msvc.node')
} else {
nativeBinding = require('vectordb-win32-ia32-msvc')
nativeBinding = require('lancedb-win32-ia32-msvc')
}
} catch (e) {
loadError = e
@@ -91,13 +91,13 @@ switch (platform) {
break
case 'arm64':
localFileExisted = existsSync(
join(__dirname, 'vectordb-nodejs.win32-arm64-msvc.node')
join(__dirname, 'lancedb-nodejs.win32-arm64-msvc.node')
)
try {
if (localFileExisted) {
nativeBinding = require('./vectordb-nodejs.win32-arm64-msvc.node')
nativeBinding = require('./lancedb-nodejs.win32-arm64-msvc.node')
} else {
nativeBinding = require('vectordb-win32-arm64-msvc')
nativeBinding = require('lancedb-win32-arm64-msvc')
}
} catch (e) {
loadError = e
@@ -108,23 +108,23 @@ switch (platform) {
}
break
case 'darwin':
localFileExisted = existsSync(join(__dirname, 'vectordb-nodejs.darwin-universal.node'))
localFileExisted = existsSync(join(__dirname, 'lancedb-nodejs.darwin-universal.node'))
try {
if (localFileExisted) {
nativeBinding = require('./vectordb-nodejs.darwin-universal.node')
nativeBinding = require('./lancedb-nodejs.darwin-universal.node')
} else {
nativeBinding = require('vectordb-darwin-universal')
nativeBinding = require('lancedb-darwin-universal')
}
break
} catch {}
switch (arch) {
case 'x64':
localFileExisted = existsSync(join(__dirname, 'vectordb-nodejs.darwin-x64.node'))
localFileExisted = existsSync(join(__dirname, 'lancedb-nodejs.darwin-x64.node'))
try {
if (localFileExisted) {
nativeBinding = require('./vectordb-nodejs.darwin-x64.node')
nativeBinding = require('./lancedb-nodejs.darwin-x64.node')
} else {
nativeBinding = require('vectordb-darwin-x64')
nativeBinding = require('lancedb-darwin-x64')
}
} catch (e) {
loadError = e
@@ -132,13 +132,13 @@ switch (platform) {
break
case 'arm64':
localFileExisted = existsSync(
join(__dirname, 'vectordb-nodejs.darwin-arm64.node')
join(__dirname, 'lancedb-nodejs.darwin-arm64.node')
)
try {
if (localFileExisted) {
nativeBinding = require('./vectordb-nodejs.darwin-arm64.node')
nativeBinding = require('./lancedb-nodejs.darwin-arm64.node')
} else {
nativeBinding = require('vectordb-darwin-arm64')
nativeBinding = require('lancedb-darwin-arm64')
}
} catch (e) {
loadError = e
@@ -152,12 +152,12 @@ switch (platform) {
if (arch !== 'x64') {
throw new Error(`Unsupported architecture on FreeBSD: ${arch}`)
}
localFileExisted = existsSync(join(__dirname, 'vectordb-nodejs.freebsd-x64.node'))
localFileExisted = existsSync(join(__dirname, 'lancedb-nodejs.freebsd-x64.node'))
try {
if (localFileExisted) {
nativeBinding = require('./vectordb-nodejs.freebsd-x64.node')
nativeBinding = require('./lancedb-nodejs.freebsd-x64.node')
} else {
nativeBinding = require('vectordb-freebsd-x64')
nativeBinding = require('lancedb-freebsd-x64')
}
} catch (e) {
loadError = e
@@ -168,26 +168,26 @@ switch (platform) {
case 'x64':
if (isMusl()) {
localFileExisted = existsSync(
join(__dirname, 'vectordb-nodejs.linux-x64-musl.node')
join(__dirname, 'lancedb-nodejs.linux-x64-musl.node')
)
try {
if (localFileExisted) {
nativeBinding = require('./vectordb-nodejs.linux-x64-musl.node')
nativeBinding = require('./lancedb-nodejs.linux-x64-musl.node')
} else {
nativeBinding = require('vectordb-linux-x64-musl')
nativeBinding = require('lancedb-linux-x64-musl')
}
} catch (e) {
loadError = e
}
} else {
localFileExisted = existsSync(
join(__dirname, 'vectordb-nodejs.linux-x64-gnu.node')
join(__dirname, 'lancedb-nodejs.linux-x64-gnu.node')
)
try {
if (localFileExisted) {
nativeBinding = require('./vectordb-nodejs.linux-x64-gnu.node')
nativeBinding = require('./lancedb-nodejs.linux-x64-gnu.node')
} else {
nativeBinding = require('vectordb-linux-x64-gnu')
nativeBinding = require('lancedb-linux-x64-gnu')
}
} catch (e) {
loadError = e
@@ -197,26 +197,26 @@ switch (platform) {
case 'arm64':
if (isMusl()) {
localFileExisted = existsSync(
join(__dirname, 'vectordb-nodejs.linux-arm64-musl.node')
join(__dirname, 'lancedb-nodejs.linux-arm64-musl.node')
)
try {
if (localFileExisted) {
nativeBinding = require('./vectordb-nodejs.linux-arm64-musl.node')
nativeBinding = require('./lancedb-nodejs.linux-arm64-musl.node')
} else {
nativeBinding = require('vectordb-linux-arm64-musl')
nativeBinding = require('lancedb-linux-arm64-musl')
}
} catch (e) {
loadError = e
}
} else {
localFileExisted = existsSync(
join(__dirname, 'vectordb-nodejs.linux-arm64-gnu.node')
join(__dirname, 'lancedb-nodejs.linux-arm64-gnu.node')
)
try {
if (localFileExisted) {
nativeBinding = require('./vectordb-nodejs.linux-arm64-gnu.node')
nativeBinding = require('./lancedb-nodejs.linux-arm64-gnu.node')
} else {
nativeBinding = require('vectordb-linux-arm64-gnu')
nativeBinding = require('lancedb-linux-arm64-gnu')
}
} catch (e) {
loadError = e
@@ -225,13 +225,13 @@ switch (platform) {
break
case 'arm':
localFileExisted = existsSync(
join(__dirname, 'vectordb-nodejs.linux-arm-gnueabihf.node')
join(__dirname, 'lancedb-nodejs.linux-arm-gnueabihf.node')
)
try {
if (localFileExisted) {
nativeBinding = require('./vectordb-nodejs.linux-arm-gnueabihf.node')
nativeBinding = require('./lancedb-nodejs.linux-arm-gnueabihf.node')
} else {
nativeBinding = require('vectordb-linux-arm-gnueabihf')
nativeBinding = require('lancedb-linux-arm-gnueabihf')
}
} catch (e) {
loadError = e
@@ -240,26 +240,26 @@ switch (platform) {
case 'riscv64':
if (isMusl()) {
localFileExisted = existsSync(
join(__dirname, 'vectordb-nodejs.linux-riscv64-musl.node')
join(__dirname, 'lancedb-nodejs.linux-riscv64-musl.node')
)
try {
if (localFileExisted) {
nativeBinding = require('./vectordb-nodejs.linux-riscv64-musl.node')
nativeBinding = require('./lancedb-nodejs.linux-riscv64-musl.node')
} else {
nativeBinding = require('vectordb-linux-riscv64-musl')
nativeBinding = require('lancedb-linux-riscv64-musl')
}
} catch (e) {
loadError = e
}
} else {
localFileExisted = existsSync(
join(__dirname, 'vectordb-nodejs.linux-riscv64-gnu.node')
join(__dirname, 'lancedb-nodejs.linux-riscv64-gnu.node')
)
try {
if (localFileExisted) {
nativeBinding = require('./vectordb-nodejs.linux-riscv64-gnu.node')
nativeBinding = require('./lancedb-nodejs.linux-riscv64-gnu.node')
} else {
nativeBinding = require('vectordb-linux-riscv64-gnu')
nativeBinding = require('lancedb-linux-riscv64-gnu')
}
} catch (e) {
loadError = e
@@ -268,13 +268,13 @@ switch (platform) {
break
case 's390x':
localFileExisted = existsSync(
join(__dirname, 'vectordb-nodejs.linux-s390x-gnu.node')
join(__dirname, 'lancedb-nodejs.linux-s390x-gnu.node')
)
try {
if (localFileExisted) {
nativeBinding = require('./vectordb-nodejs.linux-s390x-gnu.node')
nativeBinding = require('./lancedb-nodejs.linux-s390x-gnu.node')
} else {
nativeBinding = require('vectordb-linux-s390x-gnu')
nativeBinding = require('lancedb-linux-s390x-gnu')
}
} catch (e) {
loadError = e

View File

@@ -32,8 +32,8 @@ export class Table {
}
/** Get the schema of the table. */
get schema(): Schema {
const schemaBuf = this.inner.schema();
async schema(): Promise<Schema> {
const schemaBuf = await this.inner.schema();
const tbl = tableFromIPC(schemaBuf);
return tbl.schema;
}

View File

@@ -1,3 +1,3 @@
# `vectordb-darwin-arm64`
# `lancedb-darwin-arm64`
This is the **aarch64-apple-darwin** binary for `vectordb`
This is the **aarch64-apple-darwin** binary for `lancedb`

View File

@@ -1,5 +1,5 @@
{
"name": "vectordb-darwin-arm64",
"name": "lancedb-darwin-arm64",
"version": "0.4.3",
"os": [
"darwin"
@@ -7,9 +7,9 @@
"cpu": [
"arm64"
],
"main": "vectordb.darwin-arm64.node",
"main": "lancedb.darwin-arm64.node",
"files": [
"vectordb.darwin-arm64.node"
"lancedb.darwin-arm64.node"
],
"license": "MIT",
"engines": {

View File

@@ -1,3 +1,3 @@
# `vectordb-darwin-x64`
# `lancedb-darwin-x64`
This is the **x86_64-apple-darwin** binary for `vectordb`
This is the **x86_64-apple-darwin** binary for `lancedb`

View File

@@ -1,5 +1,5 @@
{
"name": "vectordb-darwin-x64",
"name": "lancedb-darwin-x64",
"version": "0.4.3",
"os": [
"darwin"
@@ -7,9 +7,9 @@
"cpu": [
"x64"
],
"main": "vectordb.darwin-x64.node",
"main": "lancedb.darwin-x64.node",
"files": [
"vectordb.darwin-x64.node"
"lancedb.darwin-x64.node"
],
"license": "MIT",
"engines": {

View File

@@ -1,3 +1,3 @@
# `vectordb-linux-arm64-gnu`
# `lancedb-linux-arm64-gnu`
This is the **aarch64-unknown-linux-gnu** binary for `vectordb`
This is the **aarch64-unknown-linux-gnu** binary for `lancedb`

View File

@@ -1,5 +1,5 @@
{
"name": "vectordb-linux-arm64-gnu",
"name": "lancedb-linux-arm64-gnu",
"version": "0.4.3",
"os": [
"linux"
@@ -7,9 +7,9 @@
"cpu": [
"arm64"
],
"main": "vectordb.linux-arm64-gnu.node",
"main": "lancedb.linux-arm64-gnu.node",
"files": [
"vectordb.linux-arm64-gnu.node"
"lancedb.linux-arm64-gnu.node"
],
"license": "MIT",
"engines": {

View File

@@ -1,3 +1,3 @@
# `vectordb-linux-x64-gnu`
# `lancedb-linux-x64-gnu`
This is the **x86_64-unknown-linux-gnu** binary for `vectordb`
This is the **x86_64-unknown-linux-gnu** binary for `lancedb`

View File

@@ -1,5 +1,5 @@
{
"name": "vectordb-linux-x64-gnu",
"name": "lancedb-linux-x64-gnu",
"version": "0.4.3",
"os": [
"linux"
@@ -7,9 +7,9 @@
"cpu": [
"x64"
],
"main": "vectordb.linux-x64-gnu.node",
"main": "lancedb.linux-x64-gnu.node",
"files": [
"vectordb.linux-x64-gnu.node"
"lancedb.linux-x64-gnu.node"
],
"license": "MIT",
"engines": {

1087 nodejs/package-lock.json (generated)

File diff suppressed because it is too large.

View File

@@ -1,10 +1,10 @@
{
"name": "vectordb",
"name": "lancedb",
"version": "0.4.3",
"main": "./dist/index.js",
"types": "./dist/index.d.ts",
"napi": {
"name": "vectordb-nodejs",
"name": "lancedb-nodejs",
"triples": {
"defaults": false,
"additional": [
@@ -18,7 +18,7 @@
"license": "Apache 2.0",
"devDependencies": {
"@napi-rs/cli": "^2.18.0",
"@types/jest": "^29.5.11",
"@types/jest": "^29.1.2",
"@typescript-eslint/eslint-plugin": "^6.19.0",
"@typescript-eslint/parser": "^6.19.0",
"eslint": "^8.56.0",
@@ -45,21 +45,22 @@
],
"scripts": {
"artifacts": "napi artifacts",
"build:native": "napi build --platform --release --js vectordb/native.js --dts vectordb/native.d.ts dist/",
"build:debug": "napi build --platform --dts ../vectordb/native.d.ts --js ../vectordb/native.js dist/",
"build:native": "napi build --platform --release --js lancedb/native.js --dts lancedb/native.d.ts dist/",
"build:debug": "napi build --platform --dts ../lancedb/native.d.ts --js ../lancedb/native.js dist/",
"build": "npm run build:debug && tsc -b",
"docs": "typedoc --plugin typedoc-plugin-markdown vectordb/index.ts",
"lint": "eslint vectordb --ext .js,.ts",
"docs": "typedoc --plugin typedoc-plugin-markdown lancedb/index.ts",
"lint": "eslint lancedb --ext .js,.ts",
"prepublishOnly": "napi prepublish -t npm",
"test": "npm run build && jest",
"//": "maxWorkers=1 is workaround for bigint issue in jest: https://github.com/jestjs/jest/issues/11617#issuecomment-1068732414",
"test": "npm run build && jest --maxWorkers=1",
"universal": "napi universal",
"version": "napi version"
},
"optionalDependencies": {
"vectordb-darwin-arm64": "0.4.3",
"vectordb-darwin-x64": "0.4.3",
"vectordb-linux-arm64-gnu": "0.4.3",
"vectordb-linux-x64-gnu": "0.4.3"
"lancedb-darwin-arm64": "0.4.3",
"lancedb-darwin-x64": "0.4.3",
"lancedb-linux-arm64-gnu": "0.4.3",
"lancedb-linux-x64-gnu": "0.4.3"
},
"dependencies": {
"apache-arrow": "^15.0.0"

View File

@@ -12,29 +12,40 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use napi::bindgen_prelude::*;
use napi_derive::*;
use crate::table::Table;
use vectordb::connection::{Connection as LanceDBConnection, Database};
use vectordb::ipc::ipc_file_to_batches;
use crate::ConnectionOptions;
use lancedb::connection::{ConnectBuilder, Connection as LanceDBConnection};
use lancedb::ipc::ipc_file_to_batches;
#[napi]
pub struct Connection {
conn: Arc<dyn LanceDBConnection>,
conn: LanceDBConnection,
}
#[napi]
impl Connection {
/// Create a new Connection instance from the given URI.
#[napi(factory)]
pub async fn new(uri: String) -> napi::Result<Self> {
pub async fn new(options: ConnectionOptions) -> napi::Result<Self> {
let mut builder = ConnectBuilder::new(&options.uri);
if let Some(api_key) = options.api_key {
builder = builder.api_key(&api_key);
}
if let Some(host_override) = options.host_override {
builder = builder.host_override(&host_override);
}
if let Some(interval) = options.read_consistency_interval {
builder =
builder.read_consistency_interval(std::time::Duration::from_secs_f64(interval));
}
Ok(Self {
conn: Arc::new(Database::connect(&uri).await.map_err(|e| {
napi::Error::from_reason(format!("Failed to connect to database: {}", e))
})?),
conn: builder
.execute()
.await
.map_err(|e| napi::Error::from_reason(format!("{}", e)))?,
})
}
@@ -59,7 +70,8 @@ impl Connection {
.map_err(|e| napi::Error::from_reason(format!("Failed to read IPC file: {}", e)))?;
let tbl = self
.conn
.create_table(&name, Box::new(batches), None)
.create_table(&name, Box::new(batches))
.execute()
.await
.map_err(|e| napi::Error::from_reason(format!("{}", e)))?;
Ok(Table::new(tbl))
@@ -70,6 +82,7 @@ impl Connection {
let tbl = self
.conn
.open_table(&name)
.execute()
.await
.map_err(|e| napi::Error::from_reason(format!("{}", e)))?;
Ok(Table::new(tbl))

View File

@@ -40,12 +40,12 @@ impl From<MetricType> for LanceMetricType {
#[napi]
pub struct IndexBuilder {
inner: vectordb::index::IndexBuilder,
inner: lancedb::index::IndexBuilder,
}
#[napi]
impl IndexBuilder {
pub fn new(tbl: &dyn vectordb::Table) -> Self {
pub fn new(tbl: &dyn lancedb::Table) -> Self {
let inner = tbl.create_index(&[]);
Self { inner }
}

View File

@@ -14,9 +14,9 @@
use futures::StreamExt;
use lance::io::RecordBatchStream;
use lancedb::ipc::batches_to_ipc_file;
use napi::bindgen_prelude::*;
use napi_derive::napi;
use vectordb::ipc::batches_to_ipc_file;
/** Typescript-style Async Iterator over RecordBatches */
#[napi]

View File

@@ -22,10 +22,21 @@ mod query;
mod table;
#[napi(object)]
#[derive(Debug)]
pub struct ConnectionOptions {
pub uri: String,
pub api_key: Option<String>,
pub host_override: Option<String>,
/// (For LanceDB OSS only): The interval, in seconds, at which to check for
/// updates to the table from other processes. If None, then consistency is not
/// checked. For performance reasons, this is the default. For strong
/// consistency, set this to zero seconds. Then every read will check for
/// updates from other processes. As a compromise, you can set this to a
/// non-zero value for eventual consistency. If more than that interval
/// has passed since the last check, then the table will be checked for updates.
/// Note: this consistency only applies to read operations. Write operations are
/// always consistent.
pub read_consistency_interval: Option<f64>,
}
/// Write mode for writing a table.
@@ -44,5 +55,5 @@ pub struct WriteOptions {
#[napi]
pub async fn connect(options: ConnectionOptions) -> napi::Result<Connection> {
Connection::new(options.uri.clone()).await
Connection::new(options).await
}

View File

@@ -12,9 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use lancedb::query::Query as LanceDBQuery;
use napi::bindgen_prelude::*;
use napi_derive::napi;
use vectordb::query::Query as LanceDBQuery;
use crate::{iterator::RecordBatchIterator, table::Table};

View File

@@ -13,9 +13,10 @@
// limitations under the License.
use arrow_ipc::writer::FileWriter;
use lancedb::table::AddDataOptions;
use lancedb::{ipc::ipc_file_to_batches, table::TableRef};
use napi::bindgen_prelude::*;
use napi_derive::napi;
use vectordb::{ipc::ipc_file_to_batches, table::TableRef};
use crate::index::IndexBuilder;
use crate::query::Query;
@@ -33,8 +34,12 @@ impl Table {
/// Return Schema as empty Arrow IPC file.
#[napi]
pub fn schema(&self) -> napi::Result<Buffer> {
let mut writer = FileWriter::try_new(vec![], &self.table.schema())
pub async fn schema(&self) -> napi::Result<Buffer> {
let schema =
self.table.schema().await.map_err(|e| {
napi::Error::from_reason(format!("Failed to create IPC file: {}", e))
})?;
let mut writer = FileWriter::try_new(vec![], &schema)
.map_err(|e| napi::Error::from_reason(format!("Failed to create IPC file: {}", e)))?;
writer
.finish()
@@ -48,12 +53,15 @@ impl Table {
pub async fn add(&self, buf: Buffer) -> napi::Result<()> {
let batches = ipc_file_to_batches(buf.to_vec())
.map_err(|e| napi::Error::from_reason(format!("Failed to read IPC file: {}", e)))?;
self.table.add(Box::new(batches), None).await.map_err(|e| {
napi::Error::from_reason(format!(
"Failed to add batches to table {}: {}",
self.table, e
))
})
self.table
.add(Box::new(batches), AddDataOptions::default())
.await
.map_err(|e| {
napi::Error::from_reason(format!(
"Failed to add batches to table {}: {}",
self.table, e
))
})
}
#[napi]

View File

@@ -1,8 +1,8 @@
{
"include": [
"vectordb/*.ts",
"vectordb/**/*.ts",
"vectordb/*.js",
"lancedb/*.ts",
"lancedb/**/*.ts",
"lancedb/*.js",
],
"compilerOptions": {
"target": "es2022",
@@ -18,7 +18,7 @@
],
"typedocOptions": {
"entryPoints": [
"vectordb/index.ts"
"lancedb/index.ts"
],
"out": "../docs/src/javascript/",
"visibilityFilters": {

View File

@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.5.6
current_version = 0.5.7
commit = True
message = [python] Bump version: {current_version} → {new_version}
tag = True

View File

@@ -0,0 +1,172 @@
# Copyright (c) 2023. LanceDB Developers
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from functools import cached_property
from typing import List, Union
import numpy as np
import pyarrow as pa
from ..util import attempt_import_or_raise
from .base import EmbeddingFunction
from .registry import register
from .utils import AUDIO, IMAGES, TEXT
@register("imagebind")
class ImageBindEmbeddings(EmbeddingFunction):
"""
An embedding function that uses the ImageBind API
for generating multi-modal embeddings across
six different modalities: images, text, audio, depth, thermal, and IMU data.
To download the package, run:
`pip install imagebind@git+https://github.com/raghavdixit99/ImageBind`
"""
name: str = "imagebind_huge"
device: str = "cpu"
normalize: bool = False
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._ndims = 1024
self._audio_extensions = (".mp3", ".wav", ".flac", ".ogg", ".aac")
self._image_extensions = (".jpg", ".jpeg", ".png", ".gif", ".bmp")
@cached_property
def embedding_model(self):
"""
Get the embedding model. This is cached so that the model is only loaded
once per process.
"""
return self.get_embedding_model()
@cached_property
def _data(self):
"""
Get the data module from imagebind
"""
data = attempt_import_or_raise("imagebind.data", "imagebind")
return data
@cached_property
def _ModalityType(self):
"""
Get the ModalityType from imagebind
"""
imagebind = attempt_import_or_raise("imagebind", "imagebind")
return imagebind.imagebind_model.ModalityType
def ndims(self):
return self._ndims
def compute_query_embeddings(
self, query: Union[str], *args, **kwargs
) -> List[np.ndarray]:
"""
Compute the embeddings for a given user query
Parameters
----------
query : Union[str]
The query to embed. A query can be either text, image paths or audio paths.
"""
query = self.sanitize_input(query)
if query[0].endswith(self._audio_extensions):
return [self.generate_audio_embeddings(query)]
elif query[0].endswith(self._image_extensions):
return [self.generate_image_embeddings(query)]
else:
return [self.generate_text_embeddings(query)]
def generate_image_embeddings(self, image: IMAGES) -> np.ndarray:
torch = attempt_import_or_raise("torch")
inputs = {
self._ModalityType.VISION: self._data.load_and_transform_vision_data(
image, self.device
)
}
with torch.no_grad():
image_features = self.embedding_model(inputs)[self._ModalityType.VISION]
if self.normalize:
image_features /= image_features.norm(dim=-1, keepdim=True)
return image_features.cpu().numpy().squeeze()
def generate_audio_embeddings(self, audio: AUDIO) -> np.ndarray:
torch = attempt_import_or_raise("torch")
inputs = {
self._ModalityType.AUDIO: self._data.load_and_transform_audio_data(
audio, self.device
)
}
with torch.no_grad():
audio_features = self.embedding_model(inputs)[self._ModalityType.AUDIO]
if self.normalize:
audio_features /= audio_features.norm(dim=-1, keepdim=True)
return audio_features.cpu().numpy().squeeze()
def generate_text_embeddings(self, text: TEXT) -> np.ndarray:
torch = attempt_import_or_raise("torch")
inputs = {
self._ModalityType.TEXT: self._data.load_and_transform_text(
text, self.device
)
}
with torch.no_grad():
text_features = self.embedding_model(inputs)[self._ModalityType.TEXT]
if self.normalize:
text_features /= text_features.norm(dim=-1, keepdim=True)
return text_features.cpu().numpy().squeeze()
def compute_source_embeddings(
self, source: Union[IMAGES, AUDIO], *args, **kwargs
) -> List[np.array]:
"""
Get the embeddings for the given source field column in the pydantic model.
"""
source = self.sanitize_input(source)
embeddings = []
if source[0].endswith(self._audio_extensions):
embeddings.extend(self.generate_audio_embeddings(source))
return embeddings
elif source[0].endswith(self._image_extensions):
embeddings.extend(self.generate_image_embeddings(source))
return embeddings
else:
embeddings.extend(self.generate_text_embeddings(source))
return embeddings
def sanitize_input(
self, input: Union[IMAGES, AUDIO]
) -> Union[List[bytes], np.ndarray]:
"""
Sanitize the input to the embedding function.
"""
if isinstance(input, (str, bytes)):
input = [input]
elif isinstance(input, pa.Array):
input = input.to_pylist()
elif isinstance(input, pa.ChunkedArray):
input = input.combine_chunks().to_pylist()
return input
def get_embedding_model(self):
"""
fetches the imagebind embedding model
"""
imagebind = attempt_import_or_raise("imagebind", "imagebind")
model = imagebind.imagebind_model.imagebind_huge(pretrained=True)
model.eval()
model.to(self.device)
return model

View File

@@ -36,6 +36,7 @@ TEXT = Union[str, List[str], pa.Array, pa.ChunkedArray, np.ndarray]
IMAGES = Union[
str, bytes, List[str], List[bytes], pa.Array, pa.ChunkedArray, np.ndarray
]
AUDIO = Union[str, bytes, List[str], List[bytes], pa.Array, pa.ChunkedArray, np.ndarray]
@deprecated

View File

@@ -1,9 +1,9 @@
[project]
name = "lancedb"
version = "0.5.6"
version = "0.5.7"
dependencies = [
"deprecation",
"pylance==0.9.16",
"pylance==0.9.18",
"ratelimiter~=1.0",
"retry>=0.9.2",
"tqdm>=4.27.0",
@@ -50,7 +50,7 @@ repository = "https://github.com/lancedb/lancedb"
[project.optional-dependencies]
tests = ["aiohttp", "pandas>=1.4", "pytest", "pytest-mock", "pytest-asyncio", "duckdb", "pytz", "polars>=0.19"]
dev = ["ruff", "pre-commit"]
docs = ["mkdocs", "mkdocs-jupyter", "mkdocs-material", "mkdocstrings[python]"]
docs = ["mkdocs", "mkdocs-jupyter", "mkdocs-material", "mkdocstrings[python]", "mkdocs-ultralytics-plugin==0.0.44"]
clip = ["torch", "pillow", "open-clip"]
embeddings = ["openai>=1.6.1", "sentence-transformers", "torch", "pillow", "open-clip-torch", "cohere", "huggingface_hub",
"InstructorEmbedding", "google.generativeai", "boto3>=1.28.57", "awscli>=1.29.57", "botocore>=1.31.57"]

View File

@@ -28,6 +28,23 @@ from lancedb.pydantic import LanceModel, Vector
# or connection to external api
try:
if importlib.util.find_spec("mlx.core") is not None:
_mlx = True
else:
_mlx = None
except Exception:
_mlx = None
try:
if importlib.util.find_spec("imagebind") is not None:
_imagebind = True
else:
_imagebind = None
except Exception:
_imagebind = None
@pytest.mark.slow
@pytest.mark.parametrize("alias", ["sentence-transformers", "openai"])
def test_basic_text_embeddings(alias, tmp_path):
@@ -158,6 +175,89 @@ def test_openclip(tmp_path):
)
@pytest.mark.skipif(
_imagebind is None,
reason="skip if imagebind not installed.",
)
@pytest.mark.slow
def test_imagebind(tmp_path):
import os
import shutil
import tempfile
import pandas as pd
import requests
import lancedb.embeddings.imagebind
from lancedb.embeddings import get_registry
from lancedb.pydantic import LanceModel, Vector
with tempfile.TemporaryDirectory() as temp_dir:
print(f"Created temporary directory {temp_dir}")
def download_images(image_uris):
downloaded_image_paths = []
for uri in image_uris:
try:
response = requests.get(uri, stream=True)
if response.status_code == 200:
# Extract image name from URI
image_name = os.path.basename(uri)
image_path = os.path.join(temp_dir, image_name)
with open(image_path, "wb") as out_file:
shutil.copyfileobj(response.raw, out_file)
downloaded_image_paths.append(image_path)
except Exception as e: # noqa: PERF203
print(f"Failed to download {uri}. Error: {e}")
return temp_dir, downloaded_image_paths
db = lancedb.connect(tmp_path)
registry = get_registry()
func = registry.get("imagebind").create(max_retries=0)
class Images(LanceModel):
label: str
image_uri: str = func.SourceField()
vector: Vector(func.ndims()) = func.VectorField()
table = db.create_table("images", schema=Images)
labels = ["cat", "cat", "dog", "dog", "horse", "horse"]
uris = [
"http://farm1.staticflickr.com/53/167798175_7c7845bbbd_z.jpg",
"http://farm1.staticflickr.com/134/332220238_da527d8140_z.jpg",
"http://farm9.staticflickr.com/8387/8602747737_2e5c2a45d4_z.jpg",
"http://farm5.staticflickr.com/4092/5017326486_1f46057f5f_z.jpg",
"http://farm9.staticflickr.com/8216/8434969557_d37882c42d_z.jpg",
"http://farm6.staticflickr.com/5142/5835678453_4f3a4edb45_z.jpg",
]
temp_dir, downloaded_images = download_images(uris)
table.add(pd.DataFrame({"label": labels, "image_uri": downloaded_images}))
# text search
actual = (
table.search("man's best friend", vector_column_name="vector")
.limit(1)
.to_pydantic(Images)[0]
)
assert actual.label == "dog"
# image search
query_image_uri = [
"https://live.staticflickr.com/65535/33336453970_491665f66e_h.jpg"
]
temp_dir, downloaded_images = download_images(query_image_uri)
query_image_uri = downloaded_images[0]
actual = (
table.search(query_image_uri, vector_column_name="vector")
.limit(1)
.to_pydantic(Images)[0]
)
assert actual.label == "dog"
if os.path.isdir(temp_dir):
shutil.rmtree(temp_dir)
print(f"Deleted temporary directory {temp_dir}")
@pytest.mark.slow
@pytest.mark.skipif(
os.environ.get("COHERE_API_KEY") is None, reason="COHERE_API_KEY not set"
@@ -217,13 +317,6 @@ def test_gemini_embedding(tmp_path):
assert tbl.search("hello").limit(1).to_pandas()["text"][0] == "hello world"
try:
if importlib.util.find_spec("mlx.core") is not None:
_mlx = True
except ImportError:
_mlx = None
@pytest.mark.skipif(
_mlx is None,
reason="mlx tests only required for apple users.",

View File

@@ -1,6 +1,6 @@
[package]
name = "vectordb-node"
version = "0.4.10"
name = "lancedb-node"
version = "0.4.11"
description = "Serverless, low-latency vector database for AI applications"
license.workspace = true
edition.workspace = true
@@ -24,9 +24,14 @@ half = { workspace = true }
lance = { workspace = true }
lance-index = { workspace = true }
lance-linalg = { workspace = true }
vectordb = { path = "../../vectordb" }
lancedb = { path = "../../lancedb" }
tokio = { version = "1.23", features = ["rt-multi-thread"] }
neon = {version = "0.10.1", default-features = false, features = ["channel-api", "napi-6", "promise-api", "task-api"] }
neon = { version = "0.10.1", default-features = false, features = [
"channel-api",
"napi-6",
"promise-api",
"task-api",
] }
object_store = { workspace = true, features = ["aws"] }
snafu = { workspace = true }
async-trait = "0"

View File

@@ -1,3 +1,3 @@
The LanceDB node bridge (vectordb-node) allows javascript applications to access LanceDB datasets.
The LanceDB node bridge (lancedb-node) allows javascript applications to access LanceDB datasets.
It is built using [Neon](https://neon-bindings.com). See the node project for an example of how it is used and for tests.

View File

@@ -34,8 +34,8 @@ pub enum Error {
pub type Result<T> = std::result::Result<T, Error>;
impl From<vectordb::error::Error> for Error {
fn from(e: vectordb::error::Error) -> Self {
impl From<lancedb::error::Error> for Error {
fn from(e: lancedb::error::Error) -> Self {
Self::LanceDB {
message: e.to_string(),
}

View File

@@ -19,7 +19,7 @@ use neon::{
};
use crate::{error::ResultExt, runtime, table::JsTable};
use vectordb::Table;
use lancedb::Table;
pub fn table_create_scalar_index(mut cx: FunctionContext) -> JsResult<JsPromise> {
let js_table = cx.this().downcast_or_throw::<JsBox<JsTable>, _>(&mut cx)?;

View File

@@ -13,10 +13,10 @@
// limitations under the License.
use lance_linalg::distance::MetricType;
use lancedb::index::IndexBuilder;
use neon::context::FunctionContext;
use neon::prelude::*;
use std::convert::TryFrom;
use vectordb::index::IndexBuilder;
use crate::error::Error::InvalidIndexType;
use crate::error::ResultExt;

View File

@@ -22,9 +22,9 @@ use object_store::CredentialProvider;
use once_cell::sync::OnceCell;
use tokio::runtime::Runtime;
use vectordb::connection::Database;
use vectordb::table::ReadParams;
use vectordb::{ConnectOptions, Connection};
use lancedb::connect;
use lancedb::connection::Connection;
use lancedb::table::ReadParams;
use crate::error::ResultExt;
use crate::query::JsQuery;
@@ -39,7 +39,7 @@ mod query;
mod table;
struct JsDatabase {
database: Arc<dyn Connection + 'static>,
database: Connection,
}
impl Finalize for JsDatabase {}
@@ -84,28 +84,36 @@ fn database_new(mut cx: FunctionContext) -> JsResult<JsPromise> {
let path = cx.argument::<JsString>(0)?.value(&mut cx);
let aws_creds = get_aws_creds(&mut cx, 1)?;
let region = get_aws_region(&mut cx, 4)?;
let read_consistency_interval = cx
.argument_opt(5)
.and_then(|arg| arg.downcast::<JsNumber, _>(&mut cx).ok())
.map(|v| v.value(&mut cx))
.map(std::time::Duration::from_secs_f64);
let rt = runtime(&mut cx)?;
let channel = cx.channel();
let (deferred, promise) = cx.promise();
let mut conn_options = ConnectOptions::new(&path);
let mut conn_builder = connect(&path);
if let Some(region) = region {
conn_options = conn_options.region(&region);
conn_builder = conn_builder.region(&region);
}
if let Some(aws_creds) = aws_creds {
conn_options = conn_options.aws_creds(AwsCredential {
conn_builder = conn_builder.aws_creds(AwsCredential {
key_id: aws_creds.key_id,
secret_key: aws_creds.secret_key,
token: aws_creds.token,
});
}
if let Some(interval) = read_consistency_interval {
conn_builder = conn_builder.read_consistency_interval(interval);
}
rt.spawn(async move {
let database = Database::connect_with_options(&conn_options).await;
let database = conn_builder.execute().await;
deferred.settle_with(&channel, move |mut cx| {
let db = JsDatabase {
database: Arc::new(database.or_throw(&mut cx)?),
database: database.or_throw(&mut cx)?,
};
Ok(cx.boxed(db))
});
@@ -217,7 +225,11 @@ fn database_open_table(mut cx: FunctionContext) -> JsResult<JsPromise> {
let (deferred, promise) = cx.promise();
rt.spawn(async move {
let table_rst = database.open_table_with_params(&table_name, params).await;
let table_rst = database
.open_table(&table_name)
.lance_read_params(params)
.execute()
.await;
deferred.settle_with(&channel, move |mut cx| {
let js_table = JsTable::from(table_rst.or_throw(&mut cx)?);

View File

@@ -93,7 +93,7 @@ impl JsQuery {
.and_then(|stream| {
stream
.try_collect::<Vec<_>>()
.map_err(vectordb::error::Error::from)
.map_err(lancedb::error::Error::from)
})
.await;

View File

@@ -18,12 +18,12 @@ use arrow_array::{RecordBatch, RecordBatchIterator};
use lance::dataset::optimize::CompactionOptions;
use lance::dataset::{WriteMode, WriteParams};
use lance::io::ObjectStoreParams;
use vectordb::table::OptimizeAction;
use lancedb::table::{AddDataOptions, OptimizeAction, WriteOptions};
use crate::arrow::{arrow_buffer_to_record_batch, record_batch_to_buffer};
use lancedb::TableRef;
use neon::prelude::*;
use neon::types::buffer::TypedArray;
use vectordb::TableRef;
use crate::error::ResultExt;
use crate::{convert, get_aws_credential_provider, get_aws_region, runtime, JsDatabase};
@@ -80,7 +80,11 @@ impl JsTable {
rt.spawn(async move {
let batch_reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema);
let table_rst = database
.create_table(&table_name, Box::new(batch_reader), Some(params))
.create_table(&table_name, Box::new(batch_reader))
.write_options(WriteOptions {
lance_write_params: Some(params),
})
.execute()
.await;
deferred.settle_with(&channel, move |mut cx| {
@@ -121,7 +125,13 @@ impl JsTable {
rt.spawn(async move {
let batch_reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema);
let add_result = table.add(Box::new(batch_reader), Some(params)).await;
let opts = AddDataOptions {
write_options: WriteOptions {
lance_write_params: Some(params),
},
..Default::default()
};
let add_result = table.add(Box::new(batch_reader), opts).await;
deferred.settle_with(&channel, move |mut cx| {
add_result.or_throw(&mut cx)?;
@@ -524,8 +534,9 @@ impl JsTable {
.value(&mut cx);
rt.spawn(async move {
let schema = table.schema().await;
deferred.settle_with(&channel, move |mut cx| {
let schema = table.schema();
let schema = schema.or_throw(&mut cx)?;
let batches = vec![RecordBatch::new_empty(schema)];
let buffer = record_batch_to_buffer(batches).or_throw(&mut cx)?;
convert::new_js_buffer(buffer, &mut cx, is_electron)

View File

@@ -1,6 +1,6 @@
[package]
name = "vectordb"
version = "0.4.10"
name = "lancedb"
version = "0.4.11"
edition.workspace = true
description = "LanceDB: A serverless, low-latency vector database for AI applications"
license.workspace = true

View File

@@ -19,8 +19,9 @@ use arrow_array::{FixedSizeListArray, Int32Array, RecordBatch, RecordBatchIterat
use arrow_schema::{DataType, Field, Schema};
use futures::TryStreamExt;
use vectordb::Connection;
use vectordb::{connect, Result, Table, TableRef};
use lancedb::connection::Connection;
use lancedb::table::AddDataOptions;
use lancedb::{connect, Result, Table, TableRef};
#[tokio::main]
async fn main() -> Result<()> {
@@ -29,18 +30,18 @@ async fn main() -> Result<()> {
}
// --8<-- [start:connect]
let uri = "data/sample-lancedb";
let db = connect(uri).await?;
let db = connect(uri).execute().await?;
// --8<-- [end:connect]
// --8<-- [start:list_names]
println!("{:?}", db.table_names().await?);
// --8<-- [end:list_names]
let tbl = create_table(db.clone()).await?;
let tbl = create_table(&db).await?;
create_index(tbl.as_ref()).await?;
let batches = search(tbl.as_ref()).await?;
println!("{:?}", batches);
create_empty_table(db.clone()).await.unwrap();
create_empty_table(&db).await.unwrap();
// --8<-- [start:delete]
tbl.delete("id > 24").await.unwrap();
@@ -55,17 +56,14 @@ async fn main() -> Result<()> {
#[allow(dead_code)]
async fn open_with_existing_tbl() -> Result<()> {
let uri = "data/sample-lancedb";
let db = connect(uri).await?;
let db = connect(uri).execute().await?;
// --8<-- [start:open_with_existing_file]
let _ = db
.open_table_with_params("my_table", Default::default())
.await
.unwrap();
let _ = db.open_table("my_table").execute().await.unwrap();
// --8<-- [end:open_with_existing_file]
Ok(())
}
async fn create_table(db: Arc<dyn Connection>) -> Result<TableRef> {
async fn create_table(db: &Connection) -> Result<TableRef> {
// --8<-- [start:create_table]
const TOTAL: usize = 1000;
const DIM: usize = 128;
@@ -102,7 +100,8 @@ async fn create_table(db: Arc<dyn Connection>) -> Result<TableRef> {
schema.clone(),
);
let tbl = db
.create_table("my_table", Box::new(batches), None)
.create_table("my_table", Box::new(batches))
.execute()
.await
.unwrap();
// --8<-- [end:create_table]
@@ -126,21 +125,21 @@ async fn create_table(db: Arc<dyn Connection>) -> Result<TableRef> {
schema.clone(),
);
// --8<-- [start:add]
tbl.add(Box::new(new_batches), None).await.unwrap();
tbl.add(Box::new(new_batches), AddDataOptions::default())
.await
.unwrap();
// --8<-- [end:add]
Ok(tbl)
}
async fn create_empty_table(db: Arc<dyn Connection>) -> Result<TableRef> {
async fn create_empty_table(db: &Connection) -> Result<TableRef> {
// --8<-- [start:create_empty_table]
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int32, false),
Field::new("item", DataType::Utf8, true),
]));
let batches = RecordBatchIterator::new(vec![], schema.clone());
db.create_table("empty_table", Box::new(batches), None)
.await
db.create_empty_table("empty_table", schema).execute().await
// --8<-- [end:create_empty_table]
}

View File

@@ -0,0 +1,740 @@
// Copyright 2023 LanceDB Developers.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! LanceDB Database
use std::fs::create_dir_all;
use std::path::Path;
use std::sync::Arc;
use arrow_array::{RecordBatchIterator, RecordBatchReader};
use arrow_schema::SchemaRef;
use lance::dataset::{ReadParams, WriteMode};
use lance::io::{ObjectStore, ObjectStoreParams, WrappingObjectStore};
use object_store::{
aws::AwsCredential, local::LocalFileSystem, CredentialProvider, StaticCredentialProvider,
};
use snafu::prelude::*;
use crate::error::{CreateDirSnafu, Error, InvalidTableNameSnafu, Result};
use crate::io::object_store::MirroringObjectStoreWrapper;
use crate::table::{NativeTable, TableRef, WriteOptions};
pub const LANCE_FILE_EXTENSION: &str = "lance";
pub type TableBuilderCallback = Box<dyn FnOnce(OpenTableBuilder) -> OpenTableBuilder + Send>;
/// Describes what happens when creating a table and a table with
/// the same name already exists
pub enum CreateTableMode {
/// If the table already exists, an error is returned
Create,
/// If the table already exists, it is opened. Any provided data is
/// ignored. The function will be passed an OpenTableBuilder to customize
/// how the table is opened
ExistOk(TableBuilderCallback),
/// If the table already exists, it is overwritten
Overwrite,
}
impl CreateTableMode {
pub fn exist_ok(
callback: impl FnOnce(OpenTableBuilder) -> OpenTableBuilder + Send + 'static,
) -> Self {
Self::ExistOk(Box::new(callback))
}
}
impl Default for CreateTableMode {
fn default() -> Self {
Self::Create
}
}
/// Describes what happens when a vector either contains NaN or
/// does not have enough values
#[derive(Clone, Debug, Default)]
enum BadVectorHandling {
/// An error is returned
#[default]
Error,
#[allow(dead_code)] // https://github.com/lancedb/lancedb/issues/992
/// The offending row is dropped
Drop,
#[allow(dead_code)] // https://github.com/lancedb/lancedb/issues/992
/// The invalid/missing items are replaced by fill_value
Fill(f32),
}
/// A builder for configuring a [`Connection::create_table`] operation
pub struct CreateTableBuilder<const HAS_DATA: bool> {
parent: Arc<dyn ConnectionInternal>,
name: String,
data: Option<Box<dyn RecordBatchReader + Send>>,
schema: Option<SchemaRef>,
mode: CreateTableMode,
write_options: WriteOptions,
}
// Builder methods that only apply when we have initial data
impl CreateTableBuilder<true> {
fn new(
parent: Arc<dyn ConnectionInternal>,
name: String,
data: Box<dyn RecordBatchReader + Send>,
) -> Self {
Self {
parent,
name,
data: Some(data),
schema: None,
mode: CreateTableMode::default(),
write_options: WriteOptions::default(),
}
}
/// Apply the given write options when writing the initial data
pub fn write_options(mut self, write_options: WriteOptions) -> Self {
self.write_options = write_options;
self
}
/// Execute the create table operation
pub async fn execute(self) -> Result<TableRef> {
self.parent.clone().do_create_table(self).await
}
}
// Builder methods that only apply when we do not have initial data
impl CreateTableBuilder<false> {
fn new(parent: Arc<dyn ConnectionInternal>, name: String, schema: SchemaRef) -> Self {
Self {
parent,
name,
data: None,
schema: Some(schema),
mode: CreateTableMode::default(),
write_options: WriteOptions::default(),
}
}
/// Execute the create table operation
pub async fn execute(self) -> Result<TableRef> {
self.parent.clone().do_create_empty_table(self).await
}
}
impl<const HAS_DATA: bool> CreateTableBuilder<HAS_DATA> {
/// Set the mode for creating the table
///
/// This controls what happens if a table with the given name already exists
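///
/// A minimal sketch of opening-or-creating a table with `exist_ok` (the
/// path, table name, and cache size below are placeholders, not required
/// values):
///
/// ```no_run
/// # use std::sync::Arc;
/// # use arrow_schema::{DataType, Field, Schema};
/// # use lancedb::connection::CreateTableMode;
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
/// # let db = lancedb::connect("data/sample-lancedb").execute().await.unwrap();
/// # let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
/// let tbl = db
/// .create_empty_table("my_table", schema)
/// .mode(CreateTableMode::exist_ok(|builder| builder.index_cache_size(16)))
/// .execute()
/// .await
/// .unwrap();
/// # });
/// ```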
pub fn mode(mut self, mode: CreateTableMode) -> Self {
self.mode = mode;
self
}
}
#[derive(Clone, Debug)]
pub struct OpenTableBuilder {
parent: Arc<dyn ConnectionInternal>,
name: String,
index_cache_size: u32,
lance_read_params: Option<ReadParams>,
}
impl OpenTableBuilder {
fn new(parent: Arc<dyn ConnectionInternal>, name: String) -> Self {
Self {
parent,
name,
index_cache_size: 256,
lance_read_params: None,
}
}
/// Set the size of the index cache, specified as a number of entries
///
/// The default value is 256
///
/// The exact meaning of an "entry" will depend on the type of index:
/// * IVF - there is one entry for each IVF partition
/// * BTREE - there is one entry for the entire index
///
/// This cache applies to the entire opened table, across all indices.
/// Setting this value higher will increase performance on larger datasets
/// at the expense of more RAM
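///
/// A minimal usage sketch (the path, table name, and cache size below are
/// placeholders):
///
/// ```no_run
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
/// let db = lancedb::connect("data/sample-lancedb").execute().await.unwrap();
/// let tbl = db
/// .open_table("my_table")
/// .index_cache_size(1024)
/// .execute()
/// .await
/// .unwrap();
/// # });
/// ```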
pub fn index_cache_size(mut self, index_cache_size: u32) -> Self {
self.index_cache_size = index_cache_size;
self
}
/// Advanced parameters that can be used to customize table reads
///
/// If set, these will take precedence over any overlapping `OpenTableBuilder` options
pub fn lance_read_params(mut self, params: ReadParams) -> Self {
self.lance_read_params = Some(params);
self
}
/// Open the table
pub async fn execute(self) -> Result<TableRef> {
self.parent.clone().do_open_table(self).await
}
}
#[async_trait::async_trait]
trait ConnectionInternal: Send + Sync + std::fmt::Debug + 'static {
async fn table_names(&self) -> Result<Vec<String>>;
async fn do_create_table(&self, options: CreateTableBuilder<true>) -> Result<TableRef>;
async fn do_open_table(&self, options: OpenTableBuilder) -> Result<TableRef>;
async fn drop_table(&self, name: &str) -> Result<()>;
async fn drop_db(&self) -> Result<()>;
async fn do_create_empty_table(&self, options: CreateTableBuilder<false>) -> Result<TableRef> {
let batches = RecordBatchIterator::new(vec![], options.schema.unwrap());
let opts = CreateTableBuilder::<true>::new(options.parent, options.name, Box::new(batches))
.mode(options.mode)
.write_options(options.write_options);
self.do_create_table(opts).await
}
}
/// A connection to LanceDB
#[derive(Clone)]
pub struct Connection {
uri: String,
internal: Arc<dyn ConnectionInternal>,
}
impl Connection {
/// Get the URI of the connection
pub fn uri(&self) -> &str {
self.uri.as_str()
}
/// Get the names of all tables in the database.
pub async fn table_names(&self) -> Result<Vec<String>> {
self.internal.table_names().await
}
/// Create a new table from data
///
/// # Parameters
///
/// * `name` - The name of the table
/// * `initial_data` - The initial data to write to the table
pub fn create_table(
&self,
name: impl Into<String>,
initial_data: Box<dyn RecordBatchReader + Send>,
) -> CreateTableBuilder<true> {
CreateTableBuilder::<true>::new(self.internal.clone(), name.into(), initial_data)
}
/// Create an empty table with a given schema
///
/// # Parameters
///
/// * `name` - The name of the table
/// * `schema` - The schema of the table
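///
/// A minimal sketch (the path and table name below are placeholders):
///
/// ```no_run
/// # use std::sync::Arc;
/// # use arrow_schema::{DataType, Field, Schema};
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
/// # let db = lancedb::connect("data/sample-lancedb").execute().await.unwrap();
/// let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
/// let tbl = db
/// .create_empty_table("empty_table", schema)
/// .execute()
/// .await
/// .unwrap();
/// # });
/// ```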
pub fn create_empty_table(
&self,
name: impl Into<String>,
schema: SchemaRef,
) -> CreateTableBuilder<false> {
CreateTableBuilder::<false>::new(self.internal.clone(), name.into(), schema)
}
/// Open an existing table in the database
///
/// # Arguments
/// * `name` - The name of the table
///
/// # Returns
/// An [`OpenTableBuilder`] which, once executed, yields the opened [`TableRef`], or [`Error::TableNotFound`] if the table does not exist.
pub fn open_table(&self, name: impl Into<String>) -> OpenTableBuilder {
OpenTableBuilder::new(self.internal.clone(), name.into())
}
/// Drop a table in the database.
///
/// # Arguments
/// * `name` - The name of the table to drop
pub async fn drop_table(&self, name: impl AsRef<str>) -> Result<()> {
self.internal.drop_table(name.as_ref()).await
}
/// Drop the database
///
/// This is the same as dropping all of the tables
pub async fn drop_db(&self) -> Result<()> {
self.internal.drop_db().await
}
}
#[derive(Debug)]
pub struct ConnectBuilder {
/// Database URI
///
/// ### Accepted URI formats
///
/// - `/path/to/database` - local database on file system.
/// - `s3://bucket/path/to/database` or `gs://bucket/path/to/database` - database on cloud object store
/// - `db://dbname` - LanceDB Cloud
uri: String,
/// LanceDB Cloud API key, required if using Lance Cloud
api_key: Option<String>,
/// LanceDB Cloud region, required if using Lance Cloud
region: Option<String>,
/// LanceDB Cloud host override, only required if using an on-premises Lance Cloud instance
host_override: Option<String>,
/// User provided AWS credentials
aws_creds: Option<AwsCredential>,
/// The interval at which to check for updates from other processes.
read_consistency_interval: Option<std::time::Duration>,
}
impl ConnectBuilder {
/// Create a new [`ConnectBuilder`] for the given database URI.
pub fn new(uri: &str) -> Self {
Self {
uri: uri.to_string(),
api_key: None,
region: None,
host_override: None,
aws_creds: None,
read_consistency_interval: None,
}
}
pub fn api_key(mut self, api_key: &str) -> Self {
self.api_key = Some(api_key.to_string());
self
}
pub fn region(mut self, region: &str) -> Self {
self.region = Some(region.to_string());
self
}
pub fn host_override(mut self, host_override: &str) -> Self {
self.host_override = Some(host_override.to_string());
self
}
/// [`AwsCredential`] to use when connecting to S3.
pub fn aws_creds(mut self, aws_creds: AwsCredential) -> Self {
self.aws_creds = Some(aws_creds);
self
}
/// The interval at which to check for updates from other processes. This
/// only affects LanceDB OSS.
///
/// If left unset, consistency is not checked. For maximum read
/// performance, this is the default. For strong consistency, set this to
/// zero seconds. Then every read will check for updates from other processes.
/// As a compromise, set this to a non-zero duration for eventual consistency.
/// If more than that duration has passed since the last read, the read will
/// check for updates from other processes.
///
/// This only affects read operations. Write operations are always
/// consistent.
///
/// LanceDB Cloud uses eventual consistency under the hood, and is not
/// currently configurable.
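///
/// A minimal sketch of both ends of the trade-off (the path and the five
/// second interval below are placeholders):
///
/// ```no_run
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
/// // Strong consistency: every read checks for updates from other processes.
/// let strong = lancedb::connect("data/sample-lancedb")
/// .read_consistency_interval(std::time::Duration::from_secs(0))
/// .execute()
/// .await
/// .unwrap();
/// // Eventual consistency: check for updates at most every five seconds.
/// let eventual = lancedb::connect("data/sample-lancedb")
/// .read_consistency_interval(std::time::Duration::from_secs(5))
/// .execute()
/// .await
/// .unwrap();
/// # });
/// ```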
pub fn read_consistency_interval(
mut self,
read_consistency_interval: std::time::Duration,
) -> Self {
self.read_consistency_interval = Some(read_consistency_interval);
self
}
/// Establishes a connection to the database
pub async fn execute(self) -> Result<Connection> {
let internal = Arc::new(Database::connect_with_options(&self).await?);
Ok(Connection {
internal,
uri: self.uri,
})
}
}
/// Connect to a LanceDB database.
///
/// # Arguments
///
/// * `uri` - URI where the database is located, can be a local directory, supported remote cloud storage,
/// or a LanceDB Cloud database. See [`ConnectBuilder`] for a list of accepted formats
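///
/// A minimal sketch (the path below is a placeholder for any local directory):
///
/// ```no_run
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
/// let db = lancedb::connect("data/sample-lancedb").execute().await.unwrap();
/// let table_names = db.table_names().await.unwrap();
/// # });
/// ```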
pub fn connect(uri: &str) -> ConnectBuilder {
ConnectBuilder::new(uri)
}
#[derive(Debug)]
struct Database {
object_store: ObjectStore,
query_string: Option<String>,
pub(crate) uri: String,
pub(crate) base_path: object_store::path::Path,
// the object store wrapper to use on write path
pub(crate) store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
read_consistency_interval: Option<std::time::Duration>,
}
const LANCE_EXTENSION: &str = "lance";
const ENGINE: &str = "engine";
const MIRRORED_STORE: &str = "mirroredStore";
/// A connection to LanceDB
impl Database {
async fn connect_with_options(options: &ConnectBuilder) -> Result<Self> {
let uri = &options.uri;
let parse_res = url::Url::parse(uri);
// TODO: pass params regardless of OS
match parse_res {
Ok(url) if url.scheme().len() == 1 && cfg!(windows) => {
Self::open_path(uri, options.read_consistency_interval).await
}
Ok(mut url) => {
// iter thru the query params and extract the commit store param
let mut engine = None;
let mut mirrored_store = None;
let mut filtered_querys = vec![];
// WARNING: specifying engine is NOT a publicly supported feature in lancedb yet
// THE API WILL CHANGE
for (key, value) in url.query_pairs() {
if key == ENGINE {
engine = Some(value.to_string());
} else if key == MIRRORED_STORE {
if cfg!(windows) {
return Err(Error::Lance {
message: "mirrored store is not supported on windows".into(),
});
}
mirrored_store = Some(value.to_string());
} else {
// to owned so we can modify the url
filtered_querys.push((key.to_string(), value.to_string()));
}
}
// Filter out the commit store query param -- it's a lancedb param
url.query_pairs_mut().clear();
url.query_pairs_mut().extend_pairs(filtered_querys);
// Take a copy of the query string so we can propagate it to lance
let query_string = url.query().map(|s| s.to_string());
// clear the query string so we can use the url as the base uri
// use .set_query(None) instead of .set_query("") because the latter
// will add a trailing '?' to the url
url.set_query(None);
let table_base_uri = if let Some(store) = engine {
static WARN_ONCE: std::sync::Once = std::sync::Once::new();
WARN_ONCE.call_once(|| {
log::warn!("Specifing engine is not a publicly supported feature in lancedb yet. THE API WILL CHANGE");
});
let old_scheme = url.scheme().to_string();
let new_scheme = format!("{}+{}", old_scheme, store);
url.to_string().replacen(&old_scheme, &new_scheme, 1)
} else {
url.to_string()
};
let plain_uri = url.to_string();
let os_params: ObjectStoreParams = if let Some(aws_creds) = &options.aws_creds {
let credential_provider: Arc<
dyn CredentialProvider<Credential = AwsCredential>,
> = Arc::new(StaticCredentialProvider::new(AwsCredential {
key_id: aws_creds.key_id.clone(),
secret_key: aws_creds.secret_key.clone(),
token: aws_creds.token.clone(),
}));
ObjectStoreParams::with_aws_credentials(
Some(credential_provider),
options.region.clone(),
)
} else {
ObjectStoreParams::default()
};
let (object_store, base_path) =
ObjectStore::from_uri_and_params(&plain_uri, &os_params).await?;
if object_store.is_local() {
Self::try_create_dir(&plain_uri).context(CreateDirSnafu { path: plain_uri })?;
}
let write_store_wrapper = match mirrored_store {
Some(path) => {
let mirrored_store = Arc::new(LocalFileSystem::new_with_prefix(path)?);
let wrapper = MirroringObjectStoreWrapper::new(mirrored_store);
Some(Arc::new(wrapper) as Arc<dyn WrappingObjectStore>)
}
None => None,
};
Ok(Self {
uri: table_base_uri,
query_string,
base_path,
object_store,
store_wrapper: write_store_wrapper,
read_consistency_interval: options.read_consistency_interval,
})
}
Err(_) => Self::open_path(uri, options.read_consistency_interval).await,
}
}
async fn open_path(
path: &str,
read_consistency_interval: Option<std::time::Duration>,
) -> Result<Self> {
let (object_store, base_path) = ObjectStore::from_uri(path).await?;
if object_store.is_local() {
Self::try_create_dir(path).context(CreateDirSnafu { path })?;
}
Ok(Self {
uri: path.to_string(),
query_string: None,
base_path,
object_store,
store_wrapper: None,
read_consistency_interval,
})
}
/// Try to create a local directory to store the lancedb dataset
fn try_create_dir(path: &str) -> core::result::Result<(), std::io::Error> {
let path = Path::new(path);
if !path.try_exists()? {
create_dir_all(path)?;
}
Ok(())
}
/// Get the URI of a table in the database.
fn table_uri(&self, name: &str) -> Result<String> {
let path = Path::new(&self.uri);
let table_uri = path.join(format!("{}.{}", name, LANCE_FILE_EXTENSION));
let mut uri = table_uri
.as_path()
.to_str()
.context(InvalidTableNameSnafu { name })?
.to_string();
// If there is a query string set on the connection, propagate it to lance
if let Some(query) = self.query_string.as_ref() {
uri.push('?');
uri.push_str(query.as_str());
}
Ok(uri)
}
}
#[async_trait::async_trait]
impl ConnectionInternal for Database {
async fn table_names(&self) -> Result<Vec<String>> {
let mut f = self
.object_store
.read_dir(self.base_path.clone())
.await?
.iter()
.map(Path::new)
.filter(|path| {
let is_lance = path
.extension()
.and_then(|e| e.to_str())
.map(|e| e == LANCE_EXTENSION);
is_lance.unwrap_or(false)
})
.filter_map(|p| p.file_stem().and_then(|s| s.to_str().map(String::from)))
.collect::<Vec<String>>();
f.sort();
Ok(f)
}
async fn do_create_table(&self, options: CreateTableBuilder<true>) -> Result<TableRef> {
let table_uri = self.table_uri(&options.name)?;
let mut write_params = options.write_options.lance_write_params.unwrap_or_default();
if matches!(&options.mode, CreateTableMode::Overwrite) {
write_params.mode = WriteMode::Overwrite;
}
match NativeTable::create(
&table_uri,
&options.name,
options.data.unwrap(),
self.store_wrapper.clone(),
Some(write_params),
self.read_consistency_interval,
)
.await
{
Ok(table) => Ok(Arc::new(table)),
Err(Error::TableAlreadyExists { name }) => match options.mode {
CreateTableMode::Create => Err(Error::TableAlreadyExists { name }),
CreateTableMode::ExistOk(callback) => {
let builder = OpenTableBuilder::new(options.parent, options.name);
let builder = (callback)(builder);
builder.execute().await
}
CreateTableMode::Overwrite => unreachable!(),
},
Err(err) => Err(err),
}
}
async fn do_open_table(&self, options: OpenTableBuilder) -> Result<TableRef> {
let table_uri = self.table_uri(&options.name)?;
Ok(Arc::new(
NativeTable::open_with_params(
&table_uri,
&options.name,
self.store_wrapper.clone(),
options.lance_read_params,
self.read_consistency_interval,
)
.await?,
))
}
async fn drop_table(&self, name: &str) -> Result<()> {
let dir_name = format!("{}.{}", name, LANCE_EXTENSION);
let full_path = self.base_path.child(dir_name.clone());
self.object_store.remove_dir_all(full_path).await?;
Ok(())
}
async fn drop_db(&self) -> Result<()> {
todo!()
}
}
#[cfg(test)]
mod tests {
use std::fs::create_dir_all;
use arrow_schema::{DataType, Field, Schema};
use tempfile::tempdir;
use super::*;
#[tokio::test]
async fn test_connect() {
let tmp_dir = tempdir().unwrap();
let uri = tmp_dir.path().to_str().unwrap();
let db = connect(uri).execute().await.unwrap();
assert_eq!(db.uri, uri);
}
#[cfg(not(windows))]
#[tokio::test]
async fn test_connect_relative() {
let tmp_dir = tempdir().unwrap();
let uri = std::fs::canonicalize(tmp_dir.path().to_str().unwrap()).unwrap();
let current_dir = std::env::current_dir().unwrap();
let ancestors = current_dir.ancestors();
let relative_ancestors = vec![".."; ancestors.count()];
let relative_root = std::path::PathBuf::from(relative_ancestors.join("/"));
let relative_uri = relative_root.join(&uri);
let db = connect(relative_uri.to_str().unwrap())
.execute()
.await
.unwrap();
assert_eq!(db.uri, relative_uri.to_str().unwrap().to_string());
}
#[tokio::test]
async fn test_table_names() {
let tmp_dir = tempdir().unwrap();
create_dir_all(tmp_dir.path().join("table1.lance")).unwrap();
create_dir_all(tmp_dir.path().join("table2.lance")).unwrap();
create_dir_all(tmp_dir.path().join("invalidlance")).unwrap();
let uri = tmp_dir.path().to_str().unwrap();
let db = connect(uri).execute().await.unwrap();
let tables = db.table_names().await.unwrap();
assert_eq!(tables.len(), 2);
assert!(tables[0].eq(&String::from("table1")));
assert!(tables[1].eq(&String::from("table2")));
}
#[tokio::test]
async fn test_connect_s3() {
// let db = Database::connect("s3://bucket/path/to/database").await.unwrap();
}
#[tokio::test]
async fn drop_table() {
let tmp_dir = tempdir().unwrap();
create_dir_all(tmp_dir.path().join("table1.lance")).unwrap();
let uri = tmp_dir.path().to_str().unwrap();
let db = connect(uri).execute().await.unwrap();
db.drop_table("table1").await.unwrap();
let tables = db.table_names().await.unwrap();
assert_eq!(tables.len(), 0);
}
#[tokio::test]
async fn test_create_table_already_exists() {
let tmp_dir = tempdir().unwrap();
let uri = tmp_dir.path().to_str().unwrap();
let db = connect(uri).execute().await.unwrap();
let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, false)]));
db.create_empty_table("test", schema.clone())
.execute()
.await
.unwrap();
// TODO: None of the open table options are "inspectable" right now but once one is we
// should assert we are passing these options in correctly
db.create_empty_table("test", schema)
.mode(CreateTableMode::exist_ok(|builder| {
builder.index_cache_size(16)
}))
.execute()
.await
.unwrap();
let other_schema = Arc::new(Schema::new(vec![Field::new("y", DataType::Int32, false)]));
assert!(db
.create_empty_table("test", other_schema.clone())
.execute()
.await
.is_err());
let overwritten = db
.create_empty_table("test", other_schema.clone())
.mode(CreateTableMode::Overwrite)
.execute()
.await
.unwrap();
assert_eq!(other_schema, overwritten.schema().await.unwrap());
}
}

View File

@@ -174,7 +174,6 @@ fn coerce_schema_batch(
}
/// Coerce the reader (input data) to match the given [Schema].
///
pub fn coerce_schema(
reader: impl RecordBatchReader + Send + 'static,
schema: Arc<Schema>,

View File

@@ -168,7 +168,7 @@ impl IndexBuilder {
/// Build the parameters.
pub async fn build(&self) -> Result<()> {
let schema = self.table.schema();
let schema = self.table.schema().await?;
// TODO: simplify this after GH lance#1864.
let mut index_type = &self.index_type;
@@ -230,7 +230,7 @@ impl IndexBuilder {
.table
.as_native()
.expect("Only native table is supported here");
let mut dataset = tbl.clone_inner_dataset();
let mut dataset = tbl.dataset.get_mut().await?;
match params {
IndexParams::Scalar { replace } => {
dataset
@@ -271,7 +271,6 @@ impl IndexBuilder {
.await?;
}
}
tbl.reset_dataset(dataset);
Ok(())
}
}

View File

@@ -342,7 +342,7 @@ mod test {
use object_store::local::LocalFileSystem;
use tempfile;
use crate::connection::{Connection, Database};
use crate::{connect, table::WriteOptions};
#[tokio::test]
async fn test_e2e() {
@@ -354,7 +354,7 @@ mod test {
secondary: Arc::new(secondary_store),
});
let db = Database::connect(dir1.to_str().unwrap()).await.unwrap();
let db = connect(dir1.to_str().unwrap()).execute().await.unwrap();
let mut param = WriteParams::default();
let store_params = ObjectStoreParams {
@@ -368,7 +368,11 @@ mod test {
datagen = datagen.col(Box::new(RandomVector::default().named("vector".into())));
let res = db
.create_table("test", Box::new(datagen.batch(100)), Some(param.clone()))
.create_table("test", Box::new(datagen.batch(100)))
.write_options(WriteOptions {
lance_write_params: Some(param),
})
.execute()
.await;
// leave this here for easy debugging

View File

@@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//! # VectorDB ([LanceDB](https://github.com/lancedb/lancedb)) -- Developer-friendly, serverless vector database for AI applications
//!
//! [LanceDB](https://github.com/lancedb/lancedb) is an open-source database for vector-search built with persistent storage,
//! which greatly simplifies retrieval, filtering and management of embeddings.
//!
@@ -33,7 +31,7 @@
//! LanceDB runs in process, to use it in your Rust project, put the following in your `Cargo.toml`:
//!
//! ```ignore
//! cargo install vectordb
//! cargo install lancedb
//! ```
//!
//! ### Quick Start
@@ -43,10 +41,9 @@
//! #### Connect to a database.
//!
//! ```rust
//! use vectordb::connect;
//! # use arrow_schema::{Field, Schema};
//! # tokio::runtime::Runtime::new().unwrap().block_on(async {
//! let db = connect("data/sample-lancedb").await.unwrap();
//! let db = lancedb::connect("data/sample-lancedb").execute().await.unwrap();
//! # });
//! ```
//!
@@ -56,14 +53,20 @@
//! - `s3://bucket/path/to/database` or `gs://bucket/path/to/database` - database on cloud object store
//! - `db://dbname` - Lance Cloud
//!
//! You can also use [`ConnectOptions`] to configure the connectoin to the database.
//! You can also use [`ConnectOptions`] to configure the connection to the database.
//!
//! ```rust
//! use vectordb::{connect_with_options, ConnectOptions};
//! use object_store::aws::AwsCredential;
//! # tokio::runtime::Runtime::new().unwrap().block_on(async {
//! let options = ConnectOptions::new("data/sample-lancedb")
//! .index_cache_size(1024);
//! let db = connect_with_options(&options).await.unwrap();
//! let db = lancedb::connect("data/sample-lancedb")
//! .aws_creds(AwsCredential {
//! key_id: "some_key".to_string(),
//! secret_key: "some_secret".to_string(),
//! token: None,
//! })
//! .execute()
//! .await
//! .unwrap();
//! # });
//! ```
//!
@@ -79,31 +82,44 @@
//!
//! ```rust
//! # use std::sync::Arc;
//! use arrow_schema::{DataType, Schema, Field};
//! use arrow_array::{RecordBatch, RecordBatchIterator};
//! use arrow_schema::{DataType, Field, Schema};
//! # use arrow_array::{FixedSizeListArray, Float32Array, Int32Array, types::Float32Type};
//! # use vectordb::connection::{Database, Connection};
//! # use vectordb::connect;
//!
//! # tokio::runtime::Runtime::new().unwrap().block_on(async {
//! # let tmpdir = tempfile::tempdir().unwrap();
//! # let db = connect(tmpdir.path().to_str().unwrap()).await.unwrap();
//! # let db = lancedb::connect(tmpdir.path().to_str().unwrap()).execute().await.unwrap();
//! let schema = Arc::new(Schema::new(vec![
//! Field::new("id", DataType::Int32, false),
//! Field::new("vector", DataType::FixedSizeList(
//! Arc::new(Field::new("item", DataType::Float32, true)), 128), true),
//! Field::new("id", DataType::Int32, false),
//! Field::new(
//! "vector",
//! DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, true)), 128),
//! true,
//! ),
//! ]));
//! // Create a RecordBatch stream.
//! let batches = RecordBatchIterator::new(vec![
//! RecordBatch::try_new(schema.clone(),
//! let batches = RecordBatchIterator::new(
//! vec![RecordBatch::try_new(
//! schema.clone(),
//! vec![
//! Arc::new(Int32Array::from_iter_values(0..1000)),
//! Arc::new(FixedSizeListArray::from_iter_primitive::<Float32Type, _, _>(
//! (0..1000).map(|_| Some(vec![Some(1.0); 128])), 128)),
//! ]).unwrap()
//! ].into_iter().map(Ok),
//! schema.clone());
//! db.create_table("my_table", Box::new(batches), None).await.unwrap();
//! Arc::new(Int32Array::from_iter_values(0..256)),
//! Arc::new(
//! FixedSizeListArray::from_iter_primitive::<Float32Type, _, _>(
//! (0..256).map(|_| Some(vec![Some(1.0); 128])),
//! 128,
//! ),
//! ),
//! ],
//! )
//! .unwrap()]
//! .into_iter()
//! .map(Ok),
//! schema.clone(),
//! );
//! db.create_table("my_table", Box::new(batches))
//! .execute()
//! .await
//! .unwrap();
//! # });
//! ```
//!
@@ -111,14 +127,13 @@
//!
//! ```no_run
//! # use std::sync::Arc;
//! # use vectordb::connect;
//! # use arrow_array::{FixedSizeListArray, types::Float32Type, RecordBatch,
//! # RecordBatchIterator, Int32Array};
//! # use arrow_schema::{Schema, Field, DataType};
//! # tokio::runtime::Runtime::new().unwrap().block_on(async {
//! # let tmpdir = tempfile::tempdir().unwrap();
//! # let db = connect(tmpdir.path().to_str().unwrap()).await.unwrap();
//! # let tbl = db.open_table("idx_test").await.unwrap();
//! # let db = lancedb::connect(tmpdir.path().to_str().unwrap()).execute().await.unwrap();
//! # let tbl = db.open_table("idx_test").execute().await.unwrap();
//! tbl.create_index(&["vector"])
//! .ivf_pq()
//! .num_partitions(256)
@@ -136,10 +151,9 @@
//! # use arrow_schema::{DataType, Schema, Field};
//! # use arrow_array::{RecordBatch, RecordBatchIterator};
//! # use arrow_array::{FixedSizeListArray, Float32Array, Int32Array, types::Float32Type};
//! # use vectordb::connection::{Database, Connection};
//! # tokio::runtime::Runtime::new().unwrap().block_on(async {
//! # let tmpdir = tempfile::tempdir().unwrap();
//! # let db = Database::connect(tmpdir.path().to_str().unwrap()).await.unwrap();
//! # let db = lancedb::connect(tmpdir.path().to_str().unwrap()).execute().await.unwrap();
//! # let schema = Arc::new(Schema::new(vec![
//! # Field::new("id", DataType::Int32, false),
//! # Field::new("vector", DataType::FixedSizeList(
@@ -154,8 +168,8 @@
//! # ]).unwrap()
//! # ].into_iter().map(Ok),
//! # schema.clone());
//! # db.create_table("my_table", Box::new(batches), None).await.unwrap();
//! # let table = db.open_table("my_table").await.unwrap();
//! # db.create_table("my_table", Box::new(batches)).execute().await.unwrap();
//! # let table = db.open_table("my_table").execute().await.unwrap();
//! let results = table
//! .search(&[1.0; 128])
//! .execute_stream()
@@ -165,8 +179,6 @@
//! .await
//! .unwrap();
//! # });
//!
//!
//! ```
pub mod connection;
@@ -179,10 +191,8 @@ pub mod query;
pub mod table;
pub mod utils;
pub use connection::{Connection, Database};
pub use error::{Error, Result};
pub use table::{Table, TableRef};
/// Connect to a database
pub use connection::{connect, connect_with_options, ConnectOptions};
pub use lance::dataset::WriteMode;
pub use connection::connect;

View File

@@ -12,15 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use arrow_array::Float32Array;
use arrow_schema::Schema;
use lance::dataset::scanner::{DatasetRecordBatchStream, Scanner};
use lance::dataset::Dataset;
use lance_linalg::distance::MetricType;
use crate::error::Result;
use crate::table::dataset::DatasetConsistencyWrapper;
use crate::utils::default_vector_column;
use crate::Error;
@@ -29,7 +27,7 @@ const DEFAULT_TOP_K: usize = 10;
/// A builder for nearest neighbor queries for LanceDB.
#[derive(Clone)]
pub struct Query {
dataset: Arc<Dataset>,
dataset: DatasetConsistencyWrapper,
// The column to run the query on. If not specified, we will attempt to guess
// the column based on the dataset's schema.
@@ -61,7 +59,7 @@ impl Query {
///
/// * `dataset` - Lance dataset.
///
pub(crate) fn new(dataset: Arc<Dataset>) -> Self {
pub(crate) fn new(dataset: DatasetConsistencyWrapper) -> Self {
Self {
dataset,
query_vector: None,
@@ -83,7 +81,8 @@ impl Query {
///
/// * A [DatasetRecordBatchStream] with the query's results.
pub async fn execute_stream(&self) -> Result<DatasetRecordBatchStream> {
let mut scanner: Scanner = self.dataset.scan();
let ds_ref = self.dataset.get().await?;
let mut scanner: Scanner = ds_ref.scan();
if let Some(query) = self.query_vector.as_ref() {
// If there is a vector query, default to limit=10 if unspecified
@@ -91,10 +90,10 @@ impl Query {
col.clone()
} else {
// Infer a vector column with the same dimension of the query vector.
let arrow_schema = Schema::from(self.dataset.schema());
let arrow_schema = Schema::from(ds_ref.schema());
default_vector_column(&arrow_schema, Some(query.len() as i32))?
};
let field = self.dataset.schema().field(&column).ok_or(Error::Store {
let field = ds_ref.schema().field(&column).ok_or(Error::Store {
message: format!("Column {} not found in dataset schema", column),
})?;
if !matches!(field.data_type(), arrow_schema::DataType::FixedSizeList(f, dim) if f.data_type().is_floating() && dim == query.len() as i32)
@@ -240,8 +239,10 @@ mod tests {
let batches = make_test_batches();
let ds = Dataset::write(batches, "memory://foo", None).await.unwrap();
let ds = DatasetConsistencyWrapper::new_latest(ds, None);
let vector = Some(Float32Array::from_iter_values([0.1, 0.2]));
let query = Query::new(Arc::new(ds)).nearest_to(&[0.1, 0.2]);
let query = Query::new(ds).nearest_to(&[0.1, 0.2]);
assert_eq!(query.query_vector, vector);
let new_vector = Float32Array::from_iter_values([9.8, 8.7]);
@@ -265,7 +266,9 @@ mod tests {
#[tokio::test]
async fn test_execute() {
let batches = make_non_empty_batches();
let ds = Arc::new(Dataset::write(batches, "memory://foo", None).await.unwrap());
let ds = Dataset::write(batches, "memory://foo", None).await.unwrap();
let ds = DatasetConsistencyWrapper::new_latest(ds, None);
let query = Query::new(ds.clone()).nearest_to(&[0.1; 4]);
let result = query.limit(10).filter("id % 2 == 0").execute_stream().await;
@@ -295,9 +298,11 @@ mod tests {
async fn test_execute_no_vector() {
// test that it's ok to not specify a query vector (just filter / limit)
let batches = make_non_empty_batches();
let ds = Arc::new(Dataset::write(batches, "memory://foo", None).await.unwrap());
let ds = Dataset::write(batches, "memory://foo", None).await.unwrap();
let query = Query::new(ds.clone());
let ds = DatasetConsistencyWrapper::new_latest(ds, None);
let query = Query::new(ds);
let result = query.filter("id % 2 == 0").execute_stream().await;
let mut stream = result.expect("should have result");
// should only have one batch

View File

@@ -15,9 +15,9 @@
//! LanceDB Table APIs
use std::path::Path;
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use arrow_array::RecordBatchReader;
use arrow_array::{RecordBatchIterator, RecordBatchReader};
use arrow_schema::{Schema, SchemaRef};
use async_trait::async_trait;
use chrono::Duration;
@@ -27,7 +27,7 @@ use lance::dataset::optimize::{
compact_files, CompactionMetrics, CompactionOptions, IndexRemapperOptions,
};
pub use lance::dataset::ReadParams;
use lance::dataset::{Dataset, UpdateBuilder, WhenMatched, WriteParams};
use lance::dataset::{Dataset, UpdateBuilder, WhenMatched, WriteMode, WriteParams};
use lance::dataset::{MergeInsertBuilder as LanceMergeInsertBuilder, WhenNotMatchedBySource};
use lance::io::WrappingObjectStore;
use lance_index::{optimize::OptimizeOptions, DatasetIndexExt};
@@ -38,10 +38,11 @@ use crate::index::vector::{VectorIndex, VectorIndexStatistics};
use crate::index::IndexBuilder;
use crate::query::Query;
use crate::utils::{PatchReadParam, PatchWriteParam};
use crate::WriteMode;
use self::dataset::DatasetConsistencyWrapper;
use self::merge::{MergeInsert, MergeInsertBuilder};
pub(crate) mod dataset;
pub mod merge;
/// Optimize the dataset.
@@ -85,6 +86,35 @@ pub struct OptimizeStats {
pub prune: Option<RemovalStats>,
}
/// Options to use when writing data
#[derive(Clone, Debug, Default)]
pub struct WriteOptions {
// Coming soon: https://github.com/lancedb/lancedb/issues/992
// /// What behavior to take if the data contains invalid vectors
// pub on_bad_vectors: BadVectorHandling,
/// Advanced parameters that can be used to customize table creation
///
/// If set, these will take precedence over any overlapping options (such as `AddDataOptions::mode`)
pub lance_write_params: Option<WriteParams>,
}
#[derive(Debug, Clone, Default)]
pub enum AddDataMode {
/// Rows will be appended to the table (the default)
#[default]
Append,
/// The existing table will be overwritten with the new data
Overwrite,
}
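/// Options to use when adding data to a table
///
/// A minimal sketch of an overwrite-style add (assumes `tbl` is an open
/// table and `batches` is a `RecordBatchReader` holding the new data):
///
/// ```ignore
/// tbl.add(
/// Box::new(batches),
/// AddDataOptions {
/// mode: AddDataMode::Overwrite,
/// ..Default::default()
/// },
/// )
/// .await
/// .unwrap();
/// ```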
#[derive(Debug, Default, Clone)]
pub struct AddDataOptions {
/// Whether to add new rows (the default) or replace the existing data
pub mode: AddDataMode,
/// Options to use when writing the data
pub write_options: WriteOptions,
}
/// A Table is a collection of strongly typed Rows.
///
/// The type of each row is defined in Apache Arrow [Schema].
@@ -99,7 +129,7 @@ pub trait Table: std::fmt::Display + Send + Sync {
fn name(&self) -> &str;
/// Get the arrow [Schema] of the table.
fn schema(&self) -> SchemaRef;
async fn schema(&self) -> Result<SchemaRef>;
/// Count the number of rows in this dataset.
///
@@ -112,12 +142,12 @@ pub trait Table: std::fmt::Display + Send + Sync {
///
/// # Arguments
///
/// * `batches` RecordBatch to be saved in the Table
/// * `params` Append / Overwrite existing records. Default: Append
/// * `batches` data to be added to the Table
/// * `options` options to control how data is added
async fn add(
&self,
batches: Box<dyn RecordBatchReader + Send>,
params: Option<WriteParams>,
options: AddDataOptions,
) -> Result<()>;
/// Delete the rows from table that match the predicate.
@@ -129,28 +159,43 @@ pub trait Table: std::fmt::Display + Send + Sync {
///
/// ```no_run
/// # use std::sync::Arc;
/// # use vectordb::connection::{Database, Connection};
/// # use arrow_array::{FixedSizeListArray, types::Float32Type, RecordBatch,
/// # RecordBatchIterator, Int32Array};
/// # use arrow_schema::{Schema, Field, DataType};
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
/// let tmpdir = tempfile::tempdir().unwrap();
/// let db = Database::connect(tmpdir.path().to_str().unwrap()).await.unwrap();
/// let db = lancedb::connect(tmpdir.path().to_str().unwrap())
/// .execute()
/// .await
/// .unwrap();
/// # let schema = Arc::new(Schema::new(vec![
/// # Field::new("id", DataType::Int32, false),
/// # Field::new("vector", DataType::FixedSizeList(
/// # Arc::new(Field::new("item", DataType::Float32, true)), 128), true),
/// # ]));
/// let batches = RecordBatchIterator::new(vec![
/// RecordBatch::try_new(schema.clone(),
/// vec![
/// Arc::new(Int32Array::from_iter_values(0..10)),
/// Arc::new(FixedSizeListArray::from_iter_primitive::<Float32Type, _, _>(
/// (0..10).map(|_| Some(vec![Some(1.0); 128])), 128)),
/// ]).unwrap()
/// ].into_iter().map(Ok),
/// schema.clone());
/// let tbl = db.create_table("delete_test", Box::new(batches), None).await.unwrap();
/// let batches = RecordBatchIterator::new(
/// vec![RecordBatch::try_new(
/// schema.clone(),
/// vec![
/// Arc::new(Int32Array::from_iter_values(0..10)),
/// Arc::new(
/// FixedSizeListArray::from_iter_primitive::<Float32Type, _, _>(
/// (0..10).map(|_| Some(vec![Some(1.0); 128])),
/// 128,
/// ),
/// ),
/// ],
/// )
/// .unwrap()]
/// .into_iter()
/// .map(Ok),
/// schema.clone(),
/// );
/// let tbl = db
/// .create_table("delete_test", Box::new(batches))
/// .execute()
/// .await
/// .unwrap();
/// tbl.delete("id > 5").await.unwrap();
/// # });
/// ```
@@ -162,14 +207,16 @@ pub trait Table: std::fmt::Display + Send + Sync {
///
/// ```no_run
/// # use std::sync::Arc;
/// # use vectordb::connection::{Database, Connection};
/// # use arrow_array::{FixedSizeListArray, types::Float32Type, RecordBatch,
/// # RecordBatchIterator, Int32Array};
/// # use arrow_schema::{Schema, Field, DataType};
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
/// let tmpdir = tempfile::tempdir().unwrap();
/// let db = Database::connect(tmpdir.path().to_str().unwrap()).await.unwrap();
/// # let tbl = db.open_table("idx_test").await.unwrap();
/// let db = lancedb::connect(tmpdir.path().to_str().unwrap())
/// .execute()
/// .await
/// .unwrap();
/// # let tbl = db.open_table("idx_test").execute().await.unwrap();
/// tbl.create_index(&["vector"])
/// .ivf_pq()
/// .num_partitions(256)
@@ -214,32 +261,44 @@ pub trait Table: std::fmt::Display + Send + Sync {
///
/// ```no_run
/// # use std::sync::Arc;
/// # use vectordb::connection::{Database, Connection};
/// # use arrow_array::{FixedSizeListArray, types::Float32Type, RecordBatch,
/// # RecordBatchIterator, Int32Array};
/// # use arrow_schema::{Schema, Field, DataType};
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
/// let tmpdir = tempfile::tempdir().unwrap();
/// let db = Database::connect(tmpdir.path().to_str().unwrap()).await.unwrap();
/// # let tbl = db.open_table("idx_test").await.unwrap();
/// let db = lancedb::connect(tmpdir.path().to_str().unwrap())
/// .execute()
/// .await
/// .unwrap();
/// # let tbl = db.open_table("idx_test").execute().await.unwrap();
/// # let schema = Arc::new(Schema::new(vec![
/// # Field::new("id", DataType::Int32, false),
/// # Field::new("vector", DataType::FixedSizeList(
/// # Arc::new(Field::new("item", DataType::Float32, true)), 128), true),
/// # ]));
/// let new_data = RecordBatchIterator::new(vec![
/// RecordBatch::try_new(schema.clone(),
/// vec![
/// Arc::new(Int32Array::from_iter_values(0..10)),
/// Arc::new(FixedSizeListArray::from_iter_primitive::<Float32Type, _, _>(
/// (0..10).map(|_| Some(vec![Some(1.0); 128])), 128)),
/// ]).unwrap()
/// ].into_iter().map(Ok),
/// schema.clone());
/// let new_data = RecordBatchIterator::new(
/// vec![RecordBatch::try_new(
/// schema.clone(),
/// vec![
/// Arc::new(Int32Array::from_iter_values(0..10)),
/// Arc::new(
/// FixedSizeListArray::from_iter_primitive::<Float32Type, _, _>(
/// (0..10).map(|_| Some(vec![Some(1.0); 128])),
/// 128,
/// ),
/// ),
/// ],
/// )
/// .unwrap()]
/// .into_iter()
/// .map(Ok),
/// schema.clone(),
/// );
/// // Perform an upsert operation
/// let mut merge_insert = tbl.merge_insert(&["id"]);
/// merge_insert.when_matched_update_all(None)
/// .when_not_matched_insert_all();
/// merge_insert
/// .when_matched_update_all(None)
/// .when_not_matched_insert_all();
/// merge_insert.execute(Box::new(new_data)).await.unwrap();
/// # });
/// ```
@@ -265,8 +324,11 @@ pub trait Table: std::fmt::Display + Send + Sync {
/// # use arrow_array::RecordBatch;
/// # use futures::TryStreamExt;
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
/// # let tbl = vectordb::table::NativeTable::open("/tmp/tbl").await.unwrap();
/// let stream = tbl.query().nearest_to(&[1.0, 2.0, 3.0])
/// # let tbl = lancedb::table::NativeTable::open("/tmp/tbl").await.unwrap();
/// use crate::lancedb::Table;
/// let stream = tbl
/// .query()
/// .nearest_to(&[1.0, 2.0, 3.0])
/// .refine_factor(5)
/// .nprobes(10)
/// .execute_stream()
@@ -281,7 +343,8 @@ pub trait Table: std::fmt::Display + Send + Sync {
/// # use arrow_array::RecordBatch;
/// # use futures::TryStreamExt;
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
/// # let tbl = vectordb::table::NativeTable::open("/tmp/tbl").await.unwrap();
/// # let tbl = lancedb::table::NativeTable::open("/tmp/tbl").await.unwrap();
/// use crate::lancedb::Table;
/// let stream = tbl
/// .query()
/// .filter("id > 5")
@@ -298,12 +361,9 @@ pub trait Table: std::fmt::Display + Send + Sync {
/// # use arrow_array::RecordBatch;
/// # use futures::TryStreamExt;
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
/// # let tbl = vectordb::table::NativeTable::open("/tmp/tbl").await.unwrap();
/// let stream = tbl
/// .query()
/// .execute_stream()
/// .await
/// .unwrap();
/// # let tbl = lancedb::table::NativeTable::open("/tmp/tbl").await.unwrap();
/// use crate::lancedb::Table;
/// let stream = tbl.query().execute_stream().await.unwrap();
/// let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
/// # });
/// ```
@@ -313,7 +373,7 @@ pub trait Table: std::fmt::Display + Send + Sync {
///
/// <section class="warning">Experimental API</section>
///
/// Modeled after ``VACCUM`` in PostgreSQL.
/// Modeled after ``VACUUM`` in PostgreSQL.
/// Not all implementations support explicit optimization.
async fn optimize(&self, action: OptimizeAction) -> Result<OptimizeStats>;
}
@@ -326,10 +386,14 @@ pub type TableRef = Arc<dyn Table>;
pub struct NativeTable {
name: String,
uri: String,
dataset: Arc<Mutex<Dataset>>,
pub(crate) dataset: dataset::DatasetConsistencyWrapper,
// the object store wrapper to use on write path
store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
// This comes from the connection options. We store it here so we can pass it down
// to the dataset when we recreate it (for example, in checkout_latest).
read_consistency_interval: Option<std::time::Duration>,
}
impl std::fmt::Display for NativeTable {
@@ -351,7 +415,7 @@ impl NativeTable {
/// * A [NativeTable] object.
pub async fn open(uri: &str) -> Result<Self> {
let name = Self::get_table_name(uri)?;
Self::open_with_params(uri, &name, None, ReadParams::default()).await
Self::open_with_params(uri, &name, None, None, None).await
}
/// Opens an existing Table
@@ -369,8 +433,10 @@ impl NativeTable {
uri: &str,
name: &str,
write_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
params: ReadParams,
params: Option<ReadParams>,
read_consistency_interval: Option<std::time::Duration>,
) -> Result<Self> {
let params = params.unwrap_or_default();
// patch the params if we have a write store wrapper
let params = match write_store_wrapper.clone() {
Some(wrapper) => params.patch_with_store_wrapper(wrapper)?,
@@ -389,24 +455,22 @@ impl NativeTable {
message: e.to_string(),
},
})?;
let dataset = DatasetConsistencyWrapper::new_latest(dataset, read_consistency_interval);
Ok(Self {
name: name.to_string(),
uri: uri.to_string(),
dataset: Arc::new(Mutex::new(dataset)),
dataset,
store_wrapper: write_store_wrapper,
read_consistency_interval,
})
}
/// Make a new clone of the internal lance dataset.
pub(crate) fn clone_inner_dataset(&self) -> Dataset {
self.dataset.lock().expect("Lock poison").clone()
}
/// Checkout a specific version of this [NativeTable]
///
pub async fn checkout(uri: &str, version: u64) -> Result<Self> {
let name = Self::get_table_name(uri)?;
Self::checkout_with_params(uri, &name, version, None, ReadParams::default()).await
Self::checkout_with_params(uri, &name, version, None, ReadParams::default(), None).await
}
pub async fn checkout_with_params(
@@ -415,44 +479,35 @@ impl NativeTable {
version: u64,
write_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
params: ReadParams,
read_consistency_interval: Option<std::time::Duration>,
) -> Result<Self> {
// patch the params if we have a write store wrapper
let params = match write_store_wrapper.clone() {
Some(wrapper) => params.patch_with_store_wrapper(wrapper)?,
None => params,
};
let dataset = Dataset::checkout_with_params(uri, version, &params)
.await
.map_err(|e| match e {
lance::Error::DatasetNotFound { .. } => Error::TableNotFound {
name: name.to_string(),
},
e => Error::Lance {
message: e.to_string(),
},
})?;
let dataset = DatasetBuilder::from_uri(uri)
.with_version(version)
.with_read_params(params)
.load()
.await?;
let dataset = DatasetConsistencyWrapper::new_time_travel(dataset, version);
Ok(Self {
name: name.to_string(),
uri: uri.to_string(),
dataset: Arc::new(Mutex::new(dataset)),
dataset,
store_wrapper: write_store_wrapper,
read_consistency_interval,
})
}
pub async fn checkout_latest(&self) -> Result<Self> {
let dataset = self.clone_inner_dataset();
let latest_version_id = dataset.latest_version_id().await?;
let dataset = if latest_version_id == dataset.version().version {
dataset
} else {
dataset.checkout_version(latest_version_id).await?
};
let mut dataset = self.dataset.duplicate().await;
dataset.as_latest(self.read_consistency_interval).await?;
Ok(Self {
name: self.name.clone(),
uri: self.uri.clone(),
dataset: Arc::new(Mutex::new(dataset)),
store_wrapper: self.store_wrapper.clone(),
dataset,
..self.clone()
})
}
@@ -488,14 +543,16 @@ impl NativeTable {
batches: impl RecordBatchReader + Send + 'static,
write_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
params: Option<WriteParams>,
read_consistency_interval: Option<std::time::Duration>,
) -> Result<Self> {
let params = params.unwrap_or_default();
// patch the params if we have a write store wrapper
let params = match write_store_wrapper.clone() {
Some(wrapper) => params.patch_with_store_wrapper(wrapper)?,
None => params,
};
let dataset = Dataset::write(batches, uri, params)
let dataset = Dataset::write(batches, uri, Some(params))
.await
.map_err(|e| match e {
lance::Error::DatasetAlreadyExists { .. } => Error::TableAlreadyExists {
@@ -508,34 +565,47 @@ impl NativeTable {
Ok(Self {
name: name.to_string(),
uri: uri.to_string(),
dataset: Arc::new(Mutex::new(dataset)),
dataset: DatasetConsistencyWrapper::new_latest(dataset, read_consistency_interval),
store_wrapper: write_store_wrapper,
read_consistency_interval,
})
}
pub async fn create_empty(
uri: &str,
name: &str,
schema: SchemaRef,
write_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
params: Option<WriteParams>,
read_consistency_interval: Option<std::time::Duration>,
) -> Result<Self> {
let batches = RecordBatchIterator::new(vec![], schema);
Self::create(
uri,
name,
batches,
write_store_wrapper,
params,
read_consistency_interval,
)
.await
}
/// Version of this Table
pub fn version(&self) -> u64 {
self.dataset.lock().expect("lock poison").version().version
pub async fn version(&self) -> Result<u64> {
Ok(self.dataset.get().await?.version().version)
}
async fn optimize_indices(&self, options: &OptimizeOptions) -> Result<()> {
info!("LanceDB: optimizing indices: {:?}", options);
let mut dataset = self.clone_inner_dataset();
dataset.optimize_indices(options).await?;
self.dataset
.get_mut()
.await?
.optimize_indices(options)
.await?;
Ok(())
}
pub fn query(&self) -> Query {
Query::new(self.clone_inner_dataset().into())
}
pub fn filter(&self, expr: String) -> Query {
Query::new(self.clone_inner_dataset().into()).filter(expr)
}
/// Returns the number of rows in this Table
/// Merge new data into this table.
pub async fn merge(
&mut self,
@@ -543,14 +613,17 @@ impl NativeTable {
left_on: &str,
right_on: &str,
) -> Result<()> {
let mut dataset = self.clone_inner_dataset();
dataset.merge(batches, left_on, right_on).await?;
self.dataset = Arc::new(Mutex::new(dataset));
self.dataset
.get_mut()
.await?
.merge(batches, left_on, right_on)
.await?;
Ok(())
}
pub async fn update(&self, predicate: Option<&str>, updates: Vec<(&str, &str)>) -> Result<()> {
let mut builder = UpdateBuilder::new(self.clone_inner_dataset().into());
let dataset = self.dataset.get().await?.clone();
let mut builder = UpdateBuilder::new(Arc::new(dataset));
if let Some(predicate) = predicate {
builder = builder.update_where(predicate)?;
}
@@ -561,7 +634,7 @@ impl NativeTable {
let operation = builder.build()?;
let ds = operation.execute().await?;
self.reset_dataset(ds.as_ref().clone());
self.dataset.set_latest(ds.as_ref().clone()).await;
Ok(())
}
@@ -581,8 +654,10 @@ impl NativeTable {
older_than: Duration,
delete_unverified: Option<bool>,
) -> Result<RemovalStats> {
let dataset = self.clone_inner_dataset();
Ok(dataset
Ok(self
.dataset
.get_mut()
.await?
.cleanup_old_versions(older_than, delete_unverified)
.await?)
}
@@ -598,24 +673,27 @@ impl NativeTable {
options: CompactionOptions,
remap_options: Option<Arc<dyn IndexRemapperOptions>>,
) -> Result<CompactionMetrics> {
let mut dataset = self.clone_inner_dataset();
let metrics = compact_files(&mut dataset, options, remap_options).await?;
self.reset_dataset(dataset);
let mut dataset_mut = self.dataset.get_mut().await?;
let metrics = compact_files(&mut dataset_mut, options, remap_options).await?;
Ok(metrics)
}
pub fn count_fragments(&self) -> usize {
self.dataset.lock().expect("lock poison").count_fragments()
// TODO: why are these individual methods and not some single "get_stats" method?
pub async fn count_fragments(&self) -> Result<usize> {
Ok(self.dataset.get().await?.count_fragments())
}
pub async fn count_deleted_rows(&self) -> Result<usize> {
let dataset = self.clone_inner_dataset();
Ok(dataset.count_deleted_rows().await?)
Ok(self.dataset.get().await?.count_deleted_rows().await?)
}
pub async fn num_small_files(&self, max_rows_per_group: usize) -> usize {
let dataset = self.clone_inner_dataset();
dataset.num_small_files(max_rows_per_group).await
pub async fn num_small_files(&self, max_rows_per_group: usize) -> Result<usize> {
Ok(self
.dataset
.get()
.await?
.num_small_files(max_rows_per_group)
.await)
}
pub async fn count_indexed_rows(&self, index_uuid: &str) -> Result<Option<usize>> {
@@ -633,7 +711,7 @@ impl NativeTable {
}
pub async fn load_indices(&self) -> Result<Vec<VectorIndex>> {
let dataset = self.clone_inner_dataset();
let dataset = self.dataset.get().await?;
let (indices, mf) = futures::try_join!(dataset.load_indices(), dataset.latest_manifest())?;
Ok(indices
.iter()
@@ -650,7 +728,7 @@ impl NativeTable {
if index.is_none() {
return Ok(None);
}
let dataset = self.clone_inner_dataset();
let dataset = self.dataset.get().await?;
let index_stats = dataset.index_statistics(&index.unwrap().index_name).await?;
let index_stats: VectorIndexStatistics =
serde_json::from_str(&index_stats).map_err(|e| Error::Lance {
@@ -662,10 +740,6 @@ impl NativeTable {
Ok(Some(index_stats))
}
pub(crate) fn reset_dataset(&self, dataset: Dataset) {
*self.dataset.lock().expect("lock poison") = dataset;
}
}
#[async_trait]
@@ -675,7 +749,7 @@ impl MergeInsert for NativeTable {
params: MergeInsertBuilder,
new_data: Box<dyn RecordBatchReader + Send>,
) -> Result<()> {
let dataset = Arc::new(self.clone_inner_dataset());
let dataset = Arc::new(self.dataset.get().await?.clone());
let mut builder = LanceMergeInsertBuilder::try_new(dataset.clone(), params.on)?;
match (
params.when_matched_update_all,
@@ -702,7 +776,7 @@ impl MergeInsert for NativeTable {
}
let job = builder.try_build()?;
let new_dataset = job.execute_reader(new_data).await?;
self.reset_dataset((*new_dataset).clone());
self.dataset.set_latest(new_dataset.as_ref().clone()).await;
Ok(())
}
}
@@ -721,13 +795,13 @@ impl Table for NativeTable {
self.name.as_str()
}
fn schema(&self) -> SchemaRef {
let lance_schema = { self.dataset.lock().expect("lock poison").schema().clone() };
Arc::new(Schema::from(&lance_schema))
async fn schema(&self) -> Result<SchemaRef> {
let lance_schema = self.dataset.get().await?.schema().clone();
Ok(Arc::new(Schema::from(&lance_schema)))
}
async fn count_rows(&self, filter: Option<String>) -> Result<usize> {
let dataset = { self.dataset.lock().expect("lock poison").clone() };
let dataset = self.dataset.get().await?;
if let Some(filter) = filter {
let mut scanner = dataset.scan();
scanner.filter(&filter)?;
@@ -740,20 +814,27 @@ impl Table for NativeTable {
async fn add(
&self,
batches: Box<dyn RecordBatchReader + Send>,
params: Option<WriteParams>,
params: AddDataOptions,
) -> Result<()> {
let params = Some(params.unwrap_or(WriteParams {
mode: WriteMode::Append,
..WriteParams::default()
}));
let lance_params = params
.write_options
.lance_write_params
.unwrap_or(WriteParams {
mode: match params.mode {
AddDataMode::Append => WriteMode::Append,
AddDataMode::Overwrite => WriteMode::Overwrite,
},
..Default::default()
});
// patch the params if we have a write store wrapper
let params = match self.store_wrapper.clone() {
Some(wrapper) => params.patch_with_store_wrapper(wrapper)?,
None => params,
let lance_params = match self.store_wrapper.clone() {
Some(wrapper) => lance_params.patch_with_store_wrapper(wrapper)?,
None => lance_params,
};
self.reset_dataset(Dataset::write(batches, &self.uri, params).await?);
let dataset = Dataset::write(batches, &self.uri, Some(lance_params)).await?;
self.dataset.set_latest(dataset).await;
Ok(())
}
@@ -767,14 +848,12 @@ impl Table for NativeTable {
}
fn query(&self) -> Query {
Query::new(Arc::new(self.dataset.lock().expect("lock poison").clone()))
Query::new(self.dataset.clone())
}
/// Delete rows from the table
async fn delete(&self, predicate: &str) -> Result<()> {
let mut dataset = self.clone_inner_dataset();
dataset.delete(predicate).await?;
self.reset_dataset(dataset);
self.dataset.get_mut().await?.delete(predicate).await?;
Ok(())
}
@@ -830,6 +909,7 @@ mod tests {
use std::iter;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use arrow_array::{
Array, BooleanArray, Date32Array, FixedSizeListArray, Float32Array, Float64Array,
@@ -845,6 +925,8 @@ mod tests {
use rand::Rng;
use tempfile::tempdir;
use crate::connection::ConnectBuilder;
use super::*;
#[tokio::test]
@@ -881,32 +963,13 @@ mod tests {
assert_eq!(c.to_str().unwrap(), "s3://bucket/path/to/file/subfile");
}
#[tokio::test]
async fn test_create_already_exists() {
let tmp_dir = tempdir().unwrap();
let uri = tmp_dir.path().to_str().unwrap();
let batches = make_test_batches();
let _ = batches.schema().clone();
NativeTable::create(uri, "test", batches, None, None)
.await
.unwrap();
let batches = make_test_batches();
let result = NativeTable::create(uri, "test", batches, None, None).await;
assert!(matches!(
result.unwrap_err(),
Error::TableAlreadyExists { .. }
));
}
#[tokio::test]
async fn test_count_rows() {
let tmp_dir = tempdir().unwrap();
let uri = tmp_dir.path().to_str().unwrap();
let batches = make_test_batches();
let table = NativeTable::create(uri, "test", batches, None, None)
let table = NativeTable::create(uri, "test", batches, None, None, None)
.await
.unwrap();
@@ -924,7 +987,7 @@ mod tests {
let batches = make_test_batches();
let schema = batches.schema().clone();
let table = NativeTable::create(uri, "test", batches, None, None)
let table = NativeTable::create(uri, "test", batches, None, None, None)
.await
.unwrap();
assert_eq!(table.count_rows(None).await.unwrap(), 10);
@@ -940,7 +1003,10 @@ mod tests {
schema.clone(),
);
table.add(Box::new(new_batches), None).await.unwrap();
table
.add(Box::new(new_batches), AddDataOptions::default())
.await
.unwrap();
assert_eq!(table.count_rows(None).await.unwrap(), 20);
assert_eq!(table.name, "test");
}
@@ -952,7 +1018,7 @@ mod tests {
// Create a dataset with i=0..10
let batches = merge_insert_test_batches(0, 0);
let table = NativeTable::create(uri, "test", batches, None, None)
let table = NativeTable::create(uri, "test", batches, None, None, None)
.await
.unwrap();
assert_eq!(table.count_rows(None).await.unwrap(), 10);
@@ -998,28 +1064,52 @@ mod tests {
let batches = make_test_batches();
let schema = batches.schema().clone();
let table = NativeTable::create(uri, "test", batches, None, None)
let table = NativeTable::create(uri, "test", batches, None, None, None)
.await
.unwrap();
assert_eq!(table.count_rows(None).await.unwrap(), 10);
let new_batches = RecordBatchIterator::new(
vec![RecordBatch::try_new(
schema.clone(),
vec![Arc::new(Int32Array::from_iter_values(100..110))],
)
.unwrap()]
.into_iter()
.map(Ok),
let batches = vec![RecordBatch::try_new(
schema.clone(),
);
vec![Arc::new(Int32Array::from_iter_values(100..110))],
)
.unwrap()]
.into_iter()
.map(Ok);
let new_batches = RecordBatchIterator::new(batches.clone(), schema.clone());
// Can overwrite using AddDataOptions::mode
table
.add(
Box::new(new_batches),
AddDataOptions {
mode: AddDataMode::Overwrite,
..Default::default()
},
)
.await
.unwrap();
assert_eq!(table.count_rows(None).await.unwrap(), 10);
assert_eq!(table.name, "test");
// Can overwrite using underlying WriteParams (which
// take precedence over AddDataOptions::mode)
let param: WriteParams = WriteParams {
mode: WriteMode::Overwrite,
..Default::default()
};
table.add(Box::new(new_batches), Some(param)).await.unwrap();
let opts = AddDataOptions {
write_options: WriteOptions {
lance_write_params: Some(param),
},
mode: AddDataMode::Append,
};
let new_batches = RecordBatchIterator::new(batches.clone(), schema.clone());
table.add(Box::new(new_batches), opts).await.unwrap();
assert_eq!(table.count_rows(None).await.unwrap(), 10);
assert_eq!(table.name, "test");
}
@@ -1329,7 +1419,7 @@ mod tests {
..Default::default()
};
assert!(!wrapper.called());
let _ = NativeTable::open_with_params(uri, "test", None, param)
let _ = NativeTable::open_with_params(uri, "test", None, Some(param), None)
.await
.unwrap();
assert!(wrapper.called());
@@ -1403,7 +1493,7 @@ mod tests {
schema,
);
let table = NativeTable::create(uri, "test", batches, None, None)
let table = NativeTable::create(uri, "test", batches, None, None, None)
.await
.unwrap();
@@ -1448,4 +1538,68 @@ mod tests {
Ok(FixedSizeListArray::from(data))
}
#[tokio::test]
async fn test_read_consistency_interval() {
let batch = RecordBatch::try_new(
Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)])),
vec![Arc::new(Int32Array::from(vec![1]))],
)
.unwrap();
let intervals = vec![
None,
Some(0),
Some(100), // 100 ms
];
for interval in intervals {
let tmp_dir = tempdir().unwrap();
let uri = tmp_dir.path().to_str().unwrap();
let conn1 = ConnectBuilder::new(uri).execute().await.unwrap();
let table1 = conn1
.create_empty_table("my_table", batch.schema())
.execute()
.await
.unwrap();
let mut conn2 = ConnectBuilder::new(uri);
if let Some(interval) = interval {
conn2 = conn2.read_consistency_interval(std::time::Duration::from_millis(interval));
}
let conn2 = conn2.execute().await.unwrap();
let table2 = conn2.open_table("my_table").execute().await.unwrap();
assert_eq!(table1.count_rows(None).await.unwrap(), 0);
assert_eq!(table2.count_rows(None).await.unwrap(), 0);
table1
.add(
Box::new(RecordBatchIterator::new(
vec![Ok(batch.clone())],
batch.schema(),
)),
AddDataOptions::default(),
)
.await
.unwrap();
assert_eq!(table1.count_rows(None).await.unwrap(), 1);
match interval {
None => {
assert_eq!(table2.count_rows(None).await.unwrap(), 0);
}
Some(0) => {
assert_eq!(table2.count_rows(None).await.unwrap(), 1);
}
Some(100) => {
assert_eq!(table2.count_rows(None).await.unwrap(), 0);
tokio::time::sleep(Duration::from_millis(100)).await;
assert_eq!(table2.count_rows(None).await.unwrap(), 1);
}
_ => unreachable!(),
}
}
}
}

View File

@@ -0,0 +1,234 @@
// Copyright 2024 LanceDB Developers.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::{
ops::{Deref, DerefMut},
sync::Arc,
time::{self, Duration, Instant},
};
use lance::Dataset;
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use crate::error::Result;
/// A wrapper around a [Dataset] that provides lazy-loading and consistency checks.
///
/// This can be cloned cheaply. It supports concurrent reads or exclusive writes.
#[derive(Debug, Clone)]
pub struct DatasetConsistencyWrapper(Arc<RwLock<DatasetRef>>);
/// A wrapper around a [Dataset] that provides consistency checks.
///
/// The dataset is lazily loaded, and starts off as None. On the first access,
/// the dataset is loaded.
#[derive(Debug, Clone)]
enum DatasetRef {
/// In this mode, the dataset is always the latest version.
Latest {
dataset: Dataset,
read_consistency_interval: Option<Duration>,
last_consistency_check: Option<time::Instant>,
},
/// In this mode, the dataset is a specific version. It cannot be mutated.
TimeTravel { dataset: Dataset, version: u64 },
}
impl DatasetRef {
/// Reload the dataset to the appropriate version.
async fn reload(&mut self) -> Result<()> {
match self {
Self::Latest {
dataset,
last_consistency_check,
..
} => {
*dataset = dataset
.checkout_version(dataset.latest_version_id().await?)
.await?;
last_consistency_check.replace(Instant::now());
}
Self::TimeTravel { dataset, version } => {
dataset.checkout_version(*version).await?;
}
}
Ok(())
}
async fn as_latest(&mut self, read_consistency_interval: Option<Duration>) -> Result<()> {
match self {
Self::Latest { .. } => Ok(()),
Self::TimeTravel { dataset, .. } => {
dataset
.checkout_version(dataset.latest_version_id().await?)
.await?;
*self = Self::Latest {
dataset: dataset.clone(),
read_consistency_interval,
last_consistency_check: Some(Instant::now()),
};
Ok(())
}
}
}
fn set_latest(&mut self, dataset: Dataset) {
match self {
Self::Latest {
dataset: ref mut ds,
..
} => {
*ds = dataset;
}
_ => unreachable!("Dataset should be in latest mode at this point"),
}
}
}
impl DatasetConsistencyWrapper {
/// Create a new wrapper in the latest version mode.
pub fn new_latest(dataset: Dataset, read_consistency_interval: Option<Duration>) -> Self {
Self(Arc::new(RwLock::new(DatasetRef::Latest {
dataset,
read_consistency_interval,
last_consistency_check: None,
})))
}
/// Create a new wrapper in the time travel mode.
pub fn new_time_travel(dataset: Dataset, version: u64) -> Self {
Self(Arc::new(RwLock::new(DatasetRef::TimeTravel {
dataset,
version,
})))
}
/// Create an independent copy of self.
///
/// Unlike Clone, this will track versions independently of the original wrapper and
/// will be tied to a different RwLock.
pub async fn duplicate(&self) -> Self {
let ds_ref = self.0.read().await;
Self(Arc::new(RwLock::new((*ds_ref).clone())))
}
/// Get an immutable reference to the dataset.
pub async fn get(&self) -> Result<DatasetReadGuard<'_>> {
self.ensure_up_to_date().await?;
Ok(DatasetReadGuard {
guard: self.0.read().await,
})
}
/// Get a mutable reference to the dataset.
pub async fn get_mut(&self) -> Result<DatasetWriteGuard<'_>> {
self.ensure_up_to_date().await?;
Ok(DatasetWriteGuard {
guard: self.0.write().await,
})
}
/// Convert into a wrapper in latest version mode
pub async fn as_latest(&mut self, read_consistency_interval: Option<Duration>) -> Result<()> {
self.0
.write()
.await
.as_latest(read_consistency_interval)
.await
}
/// Provide a known latest version of the dataset.
///
/// This is usually done after a write operation, which inherently
/// produces the latest version.
pub async fn set_latest(&self, dataset: Dataset) {
self.0.write().await.set_latest(dataset);
}
async fn reload(&self) -> Result<()> {
self.0.write().await.reload().await
}
async fn is_up_to_date(&self) -> Result<bool> {
let dataset_ref = self.0.read().await;
match &*dataset_ref {
DatasetRef::Latest {
read_consistency_interval,
last_consistency_check,
..
} => match (read_consistency_interval, last_consistency_check) {
(None, _) => Ok(true),
(Some(_), None) => Ok(false),
(Some(read_consistency_interval), Some(last_consistency_check)) => {
if &last_consistency_check.elapsed() < read_consistency_interval {
Ok(true)
} else {
Ok(false)
}
}
},
DatasetRef::TimeTravel { dataset, version } => {
Ok(dataset.version().version == *version)
}
}
}
/// Ensures that the dataset is loaded and up-to-date with consistency and
/// version parameters.
async fn ensure_up_to_date(&self) -> Result<()> {
if !self.is_up_to_date().await? {
self.reload().await?;
}
Ok(())
}
}
pub struct DatasetReadGuard<'a> {
guard: RwLockReadGuard<'a, DatasetRef>,
}
impl Deref for DatasetReadGuard<'_> {
type Target = Dataset;
fn deref(&self) -> &Self::Target {
match &*self.guard {
DatasetRef::Latest { dataset, .. } => dataset,
DatasetRef::TimeTravel { dataset, .. } => dataset,
}
}
}
pub struct DatasetWriteGuard<'a> {
guard: RwLockWriteGuard<'a, DatasetRef>,
}
impl Deref for DatasetWriteGuard<'_> {
type Target = Dataset;
fn deref(&self) -> &Self::Target {
match &*self.guard {
DatasetRef::Latest { dataset, .. } => dataset,
DatasetRef::TimeTravel { dataset, .. } => dataset,
}
}
}
impl DerefMut for DatasetWriteGuard<'_> {
fn deref_mut(&mut self) -> &mut Self::Target {
match &mut *self.guard {
DatasetRef::Latest { dataset, .. } => dataset,
DatasetRef::TimeTravel { dataset, .. } => dataset,
}
}
}
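// Editor's sketch (not part of this diff): typical use of the wrapper defined
// above. `dataset` is assumed to be an already-opened lance Dataset and the
// Result alias is the crate's.
async fn wrapper_example(dataset: Dataset) -> Result<()> {
    // Readers tolerate at most 5 seconds of staleness before a reload.
    let wrapper = DatasetConsistencyWrapper::new_latest(dataset, Some(Duration::from_secs(5)));

    // Shared read access; the guard derefs to the underlying Dataset.
    {
        let ds = wrapper.get().await?;
        let _version = ds.version().version;
    }

    // Exclusive write access; the guard also derefs mutably.
    {
        let _ds = wrapper.get_mut().await?;
        // ... mutate the dataset here ...
    }

    // An independent handle that tracks versions on its own lock.
    let _other = wrapper.duplicate().await;
    Ok(())
}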

View File

@@ -32,20 +32,17 @@ impl PatchStoreParam for Option<ObjectStoreParams> {
}
pub trait PatchWriteParam {
fn patch_with_store_wrapper(
self,
wrapper: Arc<dyn WrappingObjectStore>,
) -> Result<Option<WriteParams>>;
fn patch_with_store_wrapper(self, wrapper: Arc<dyn WrappingObjectStore>)
-> Result<WriteParams>;
}
impl PatchWriteParam for Option<WriteParams> {
impl PatchWriteParam for WriteParams {
fn patch_with_store_wrapper(
self,
mut self,
wrapper: Arc<dyn WrappingObjectStore>,
) -> Result<Option<WriteParams>> {
let mut params = self.unwrap_or_default();
params.store_params = params.store_params.patch_with_store_wrapper(wrapper)?;
Ok(Some(params))
) -> Result<WriteParams> {
self.store_params = self.store_params.patch_with_store_wrapper(wrapper)?;
Ok(self)
}
}
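// Editor's sketch (not part of this diff): with the trait now implemented on
// WriteParams itself rather than Option<WriteParams>, callers patch a concrete
// value and no longer go through unwrap_or_default. `wrapper` is assumed to be
// the connection's store wrapper.
fn patched_write_params(wrapper: Arc<dyn WrappingObjectStore>) -> Result<WriteParams> {
    WriteParams::default().patch_with_store_wrapper(wrapper)
}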

View File

@@ -1,471 +0,0 @@
// Copyright 2023 LanceDB Developers.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! LanceDB Database
//!
use std::fs::create_dir_all;
use std::path::Path;
use std::sync::Arc;
use arrow_array::RecordBatchReader;
use lance::dataset::WriteParams;
use lance::io::{ObjectStore, ObjectStoreParams, WrappingObjectStore};
use object_store::{
aws::AwsCredential, local::LocalFileSystem, CredentialProvider, StaticCredentialProvider,
};
use snafu::prelude::*;
use crate::error::{CreateDirSnafu, Error, InvalidTableNameSnafu, Result};
use crate::io::object_store::MirroringObjectStoreWrapper;
use crate::table::{NativeTable, ReadParams, TableRef};
pub const LANCE_FILE_EXTENSION: &str = "lance";
/// A connection to LanceDB
#[async_trait::async_trait]
pub trait Connection: Send + Sync {
/// Get the names of all tables in the database.
async fn table_names(&self) -> Result<Vec<String>>;
/// Create a new table in the database.
///
/// # Parameters
///
/// * `name` - The name of the table.
/// * `batches` - The initial data to write to the table.
/// * `params` - Optional [`WriteParams`] to create the table.
///
/// # Returns
/// Created [`TableRef`], or [`Err(Error::TableAlreadyExists)`] if the table already exists.
async fn create_table(
&self,
name: &str,
batches: Box<dyn RecordBatchReader + Send>,
params: Option<WriteParams>,
) -> Result<TableRef>;
async fn open_table(&self, name: &str) -> Result<TableRef> {
self.open_table_with_params(name, ReadParams::default())
.await
}
async fn open_table_with_params(&self, name: &str, params: ReadParams) -> Result<TableRef>;
/// Drop a table in the database.
///
/// # Arguments
/// * `name` - The name of the table.
async fn drop_table(&self, name: &str) -> Result<()>;
}
#[derive(Debug)]
pub struct ConnectOptions {
/// Database URI
///
/// # Accepted URI formats
///
/// - `/path/to/database` - local database on file system.
/// - `s3://bucket/path/to/database` or `gs://bucket/path/to/database` - database on cloud object store
/// - `db://dbname` - Lance Cloud
pub uri: String,
/// Lance Cloud API key
pub api_key: Option<String>,
/// Lance Cloud region
pub region: Option<String>,
/// Lance Cloud host override
pub host_override: Option<String>,
/// User provided AWS credentials
pub aws_creds: Option<AwsCredential>,
/// The maximum number of indices to cache in memory. Defaults to 256.
pub index_cache_size: u32,
}
impl ConnectOptions {
/// Create a new [`ConnectOptions`] with the given database URI.
pub fn new(uri: &str) -> Self {
Self {
uri: uri.to_string(),
api_key: None,
region: None,
host_override: None,
aws_creds: None,
index_cache_size: 256,
}
}
pub fn api_key(mut self, api_key: &str) -> Self {
self.api_key = Some(api_key.to_string());
self
}
pub fn region(mut self, region: &str) -> Self {
self.region = Some(region.to_string());
self
}
pub fn host_override(mut self, host_override: &str) -> Self {
self.host_override = Some(host_override.to_string());
self
}
/// [`AwsCredential`] to use when connecting to S3.
///
pub fn aws_creds(mut self, aws_creds: AwsCredential) -> Self {
self.aws_creds = Some(aws_creds);
self
}
pub fn index_cache_size(mut self, index_cache_size: u32) -> Self {
self.index_cache_size = index_cache_size;
self
}
}
/// Connect to a LanceDB database.
///
/// # Arguments
///
/// - `uri` - URI where the database is located, can be a local file or a supported remote cloud storage
///
/// ## Accepted URI formats
///
/// - `/path/to/database` - local database on file system.
/// - `s3://bucket/path/to/database` or `gs://bucket/path/to/database` - database on cloud object store
/// - `db://dbname` - Lance Cloud
///
pub async fn connect(uri: &str) -> Result<Arc<dyn Connection>> {
let options = ConnectOptions::new(uri);
connect_with_options(&options).await
}
/// Connect with [`ConnectOptions`].
///
/// # Arguments
/// - `options` - [`ConnectOptions`] to connect to the database.
pub async fn connect_with_options(options: &ConnectOptions) -> Result<Arc<dyn Connection>> {
let db = Database::connect(&options.uri).await?;
Ok(Arc::new(db))
}
pub struct Database {
object_store: ObjectStore,
query_string: Option<String>,
pub(crate) uri: String,
pub(crate) base_path: object_store::path::Path,
// the object store wrapper to use on write path
pub(crate) store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
}
const LANCE_EXTENSION: &str = "lance";
const ENGINE: &str = "engine";
const MIRRORED_STORE: &str = "mirroredStore";
/// A connection to LanceDB
impl Database {
/// Connects to LanceDB
///
/// # Arguments
///
/// * `uri` - URI where the database is located, can be a local file or a supported remote cloud storage
///
/// # Returns
///
/// * A [Database] object.
pub async fn connect(uri: &str) -> Result<Self> {
let options = ConnectOptions::new(uri);
Self::connect_with_options(&options).await
}
pub async fn connect_with_options(options: &ConnectOptions) -> Result<Self> {
let uri = &options.uri;
let parse_res = url::Url::parse(uri);
match parse_res {
Ok(url) if url.scheme().len() == 1 && cfg!(windows) => Self::open_path(uri).await,
Ok(mut url) => {
// iterate through the query params and extract the commit store param
let mut engine = None;
let mut mirrored_store = None;
let mut filtered_querys = vec![];
// WARNING: specifying engine is NOT a publicly supported feature in lancedb yet
// THE API WILL CHANGE
for (key, value) in url.query_pairs() {
if key == ENGINE {
engine = Some(value.to_string());
} else if key == MIRRORED_STORE {
if cfg!(windows) {
return Err(Error::Lance {
message: "mirrored store is not supported on windows".into(),
});
}
mirrored_store = Some(value.to_string());
} else {
// to owned so we can modify the url
filtered_querys.push((key.to_string(), value.to_string()));
}
}
// Filter out the commit store query param -- it's a lancedb param
url.query_pairs_mut().clear();
url.query_pairs_mut().extend_pairs(filtered_querys);
// Take a copy of the query string so we can propagate it to lance
let query_string = url.query().map(|s| s.to_string());
// clear the query string so we can use the url as the base uri
// use .set_query(None) instead of .set_query("") because the latter
// will add a trailing '?' to the url
url.set_query(None);
let table_base_uri = if let Some(store) = engine {
static WARN_ONCE: std::sync::Once = std::sync::Once::new();
WARN_ONCE.call_once(|| {
log::warn!("Specifing engine is not a publicly supported feature in lancedb yet. THE API WILL CHANGE");
});
let old_scheme = url.scheme().to_string();
let new_scheme = format!("{}+{}", old_scheme, store);
url.to_string().replacen(&old_scheme, &new_scheme, 1)
} else {
url.to_string()
};
let plain_uri = url.to_string();
let os_params: ObjectStoreParams = if let Some(aws_creds) = &options.aws_creds {
let credential_provider: Arc<
dyn CredentialProvider<Credential = AwsCredential>,
> = Arc::new(StaticCredentialProvider::new(AwsCredential {
key_id: aws_creds.key_id.clone(),
secret_key: aws_creds.secret_key.clone(),
token: aws_creds.token.clone(),
}));
ObjectStoreParams::with_aws_credentials(
Some(credential_provider),
options.region.clone(),
)
} else {
ObjectStoreParams::default()
};
let (object_store, base_path) =
ObjectStore::from_uri_and_params(&plain_uri, &os_params).await?;
if object_store.is_local() {
Self::try_create_dir(&plain_uri).context(CreateDirSnafu { path: plain_uri })?;
}
let write_store_wrapper = match mirrored_store {
Some(path) => {
let mirrored_store = Arc::new(LocalFileSystem::new_with_prefix(path)?);
let wrapper = MirroringObjectStoreWrapper::new(mirrored_store);
Some(Arc::new(wrapper) as Arc<dyn WrappingObjectStore>)
}
None => None,
};
Ok(Self {
uri: table_base_uri,
query_string,
base_path,
object_store,
store_wrapper: write_store_wrapper,
})
}
Err(_) => Self::open_path(uri).await,
}
}
async fn open_path(path: &str) -> Result<Self> {
let (object_store, base_path) = ObjectStore::from_uri(path).await?;
if object_store.is_local() {
Self::try_create_dir(path).context(CreateDirSnafu { path })?;
}
Ok(Self {
uri: path.to_string(),
query_string: None,
base_path,
object_store,
store_wrapper: None,
})
}
/// Try to create a local directory to store the lancedb dataset
fn try_create_dir(path: &str) -> core::result::Result<(), std::io::Error> {
let path = Path::new(path);
if !path.try_exists()? {
create_dir_all(path)?;
}
Ok(())
}
/// Get the URI of a table in the database.
fn table_uri(&self, name: &str) -> Result<String> {
let path = Path::new(&self.uri);
let table_uri = path.join(format!("{}.{}", name, LANCE_FILE_EXTENSION));
let mut uri = table_uri
.as_path()
.to_str()
.context(InvalidTableNameSnafu { name })?
.to_string();
// If there are query string set on the connection, propagate to lance
if let Some(query) = self.query_string.as_ref() {
uri.push('?');
uri.push_str(query.as_str());
}
Ok(uri)
}
}
#[async_trait::async_trait]
impl Connection for Database {
async fn table_names(&self) -> Result<Vec<String>> {
let mut f = self
.object_store
.read_dir(self.base_path.clone())
.await?
.iter()
.map(Path::new)
.filter(|path| {
let is_lance = path
.extension()
.and_then(|e| e.to_str())
.map(|e| e == LANCE_EXTENSION);
is_lance.unwrap_or(false)
})
.filter_map(|p| p.file_stem().and_then(|s| s.to_str().map(String::from)))
.collect::<Vec<String>>();
f.sort();
Ok(f)
}
async fn create_table(
&self,
name: &str,
batches: Box<dyn RecordBatchReader + Send>,
params: Option<WriteParams>,
) -> Result<TableRef> {
let table_uri = self.table_uri(name)?;
Ok(Arc::new(
NativeTable::create(
&table_uri,
name,
batches,
self.store_wrapper.clone(),
params,
)
.await?,
))
}
/// Open a table in the database.
///
/// # Arguments
/// * `name` - The name of the table.
/// * `params` - The parameters to open the table.
///
/// # Returns
///
/// * A [TableRef] object.
async fn open_table_with_params(&self, name: &str, params: ReadParams) -> Result<TableRef> {
let table_uri = self.table_uri(name)?;
Ok(Arc::new(
NativeTable::open_with_params(&table_uri, name, self.store_wrapper.clone(), params)
.await?,
))
}
async fn drop_table(&self, name: &str) -> Result<()> {
let dir_name = format!("{}.{}", name, LANCE_EXTENSION);
let full_path = self.base_path.child(dir_name.clone());
self.object_store.remove_dir_all(full_path).await?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use std::fs::create_dir_all;
use tempfile::tempdir;
use super::*;
#[tokio::test]
async fn test_connect() {
let tmp_dir = tempdir().unwrap();
let uri = tmp_dir.path().to_str().unwrap();
let db = Database::connect(uri).await.unwrap();
assert_eq!(db.uri, uri);
}
#[cfg(not(windows))]
#[tokio::test]
async fn test_connect_relative() {
let tmp_dir = tempdir().unwrap();
let uri = std::fs::canonicalize(tmp_dir.path().to_str().unwrap()).unwrap();
let current_dir = std::env::current_dir().unwrap();
let ancestors = current_dir.ancestors();
let relative_ancestors = vec![".."; ancestors.count()];
let relative_root = std::path::PathBuf::from(relative_ancestors.join("/"));
let relative_uri = relative_root.join(&uri);
let db = Database::connect(relative_uri.to_str().unwrap())
.await
.unwrap();
assert_eq!(db.uri, relative_uri.to_str().unwrap().to_string());
}
#[tokio::test]
async fn test_table_names() {
let tmp_dir = tempdir().unwrap();
create_dir_all(tmp_dir.path().join("table1.lance")).unwrap();
create_dir_all(tmp_dir.path().join("table2.lance")).unwrap();
create_dir_all(tmp_dir.path().join("invalidlance")).unwrap();
let uri = tmp_dir.path().to_str().unwrap();
let db = Database::connect(uri).await.unwrap();
let tables = db.table_names().await.unwrap();
assert_eq!(tables.len(), 2);
assert!(tables[0].eq(&String::from("table1")));
assert!(tables[1].eq(&String::from("table2")));
}
#[tokio::test]
async fn test_connect_s3() {
// let db = Database::connect("s3://bucket/path/to/database").await.unwrap();
}
#[tokio::test]
async fn drop_table() {
let tmp_dir = tempdir().unwrap();
create_dir_all(tmp_dir.path().join("table1.lance")).unwrap();
let uri = tmp_dir.path().to_str().unwrap();
let db = Database::connect(uri).await.unwrap();
db.drop_table("table1").await.unwrap();
let tables = db.table_names().await.unwrap();
assert_eq!(tables.len(), 0);
}
}
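// Editor's note (not part of this diff): this file is deleted as part of the
// Connection-trait stabilization; the sketch below shows the builder-style
// entry point used by the tests elsewhere in this compare, and is an inferred
// example rather than a definitive migration guide.
async fn connect_and_open(uri: &str) -> Result<()> {
    let conn = ConnectBuilder::new(uri).execute().await?;
    let table = conn.open_table("my_table").execute().await?;
    let _rows = table.count_rows(None).await?;
    Ok(())
}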