Compare commits


3 Commits

Author SHA1 Message Date
Lance Release
4605359d3b Bump version: 0.4.10 → 0.4.11 2024-02-23 03:57:28 +00:00
Weston Pace
f1596122e6 refactor: rename the rust crate from vectordb to lancedb (#1012)
This also renames the new experimental node package to lancedb. The
classic node package remains named vectordb.

The goal here is to avoid introducing piecemeal breaking changes to the
vectordb crate. Instead, once the new API is stabilized, we will
officially release the lancedb crate and deprecate the vectordb crate.
The same pattern will eventually happen with the npm package vectordb.
2024-02-22 19:56:39 -08:00
Will Jones
3aa0c40168 feat(node): add read_consistency_interval to Node and Rust (#1002)
This PR adds the same consistency semantics as were added in #828. It
*does not* add the same lazy-loading of tables, since that breaks some
existing tests.

This closes #998.

---------

Co-authored-by: Weston Pace <weston.pace@gmail.com>
2024-02-22 15:04:30 -08:00
64 changed files with 880 additions and 1269 deletions

View File

@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.4.10
current_version = 0.4.11
commit = True
message = Bump version: {current_version} → {new_version}
tag = True
@@ -9,4 +9,4 @@ tag_name = v{new_version}
[bumpversion:file:rust/ffi/node/Cargo.toml]
[bumpversion:file:rust/vectordb/Cargo.toml]
[bumpversion:file:rust/lancedb/Cargo.toml]

View File

@@ -26,4 +26,4 @@ jobs:
sudo apt install -y protobuf-compiler libssl-dev
- name: Publish the package
run: |
cargo publish -p vectordb --all-features --token ${{ secrets.CARGO_REGISTRY_TOKEN }}
cargo publish -p lancedb --all-features --token ${{ secrets.CARGO_REGISTRY_TOKEN }}

View File

@@ -1,5 +1,5 @@
[workspace]
members = ["rust/ffi/node", "rust/vectordb", "nodejs"]
members = ["rust/ffi/node", "rust/lancedb", "nodejs"]
# Python package needs to be built by maturin.
exclude = ["python"]
resolver = "2"

View File

@@ -636,6 +636,70 @@ The `values` parameter is used to provide the new values for the columns as lite
When rows are updated, they are moved out of the index. The row will still show up in ANN queries, but the query will not be as fast as it would be if the row were in the index. If you update a large proportion of rows, consider rebuilding the index afterwards.
## Consistency
In LanceDB OSS, users can set the `read_consistency_interval` parameter on connections to achieve different levels of read consistency. This parameter determines how frequently the database synchronizes with the underlying storage system to check for updates made by other processes. If another process updates a table, the database will not see the changes until the next synchronization.
There are three possible settings for `read_consistency_interval`:
1. **Unset (default)**: The database does not check for updates to tables made by other processes. This provides the best query performance, but means that clients may not see the most up-to-date data. This setting is suitable for applications where the data does not change during the lifetime of the table reference.
2. **Zero seconds (Strong consistency)**: The database checks for updates on every read. This provides the strongest consistency guarantees, ensuring that all clients see the latest committed data. However, it has the most overhead. This setting is suitable when consistency matters more than having high QPS.
3. **Custom interval (Eventual consistency)**: The database checks for updates at a custom interval, such as every 5 seconds. This provides eventual consistency, allowing for some lag between write and read operations. Performance-wise, this is a middle ground between strong consistency and no consistency check. This setting is suitable for applications where immediate consistency is not critical, but clients should see updated data eventually.
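The decision logic behind these three settings can be modeled with a short sketch. This is plain Python with a hypothetical `should_check_for_updates` helper, not part of the LanceDB API:

```python
from datetime import timedelta
from typing import Optional

def should_check_for_updates(
    read_consistency_interval: Optional[timedelta],
    elapsed_since_last_check: timedelta,
) -> bool:
    """Model of when a read re-synchronizes with the underlying storage."""
    if read_consistency_interval is None:
        return False  # unset: never check (best query performance)
    if read_consistency_interval == timedelta(0):
        return True   # strong consistency: check on every read
    # eventual consistency: check only once the interval has elapsed
    return elapsed_since_last_check >= read_consistency_interval

# Unset: never checks, no matter how much time has passed
assert not should_check_for_updates(None, timedelta(hours=1))
# Zero: checks on every read
assert should_check_for_updates(timedelta(0), timedelta(0))
# Custom interval: checks only after enough time has passed
assert not should_check_for_updates(timedelta(seconds=5), timedelta(seconds=2))
assert should_check_for_updates(timedelta(seconds=5), timedelta(seconds=6))
```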
!!! tip "Consistency in LanceDB Cloud"
This is only tunable in LanceDB OSS. In LanceDB Cloud, readers are always eventually consistent.
=== "Python"
To set strong consistency, use `timedelta(0)`:
```python
from datetime import timedelta
db = lancedb.connect("./.lancedb", read_consistency_interval=timedelta(0))
table = db.open_table("my_table")
```
For eventual consistency, use a custom `timedelta`:
```python
from datetime import timedelta
db = lancedb.connect("./.lancedb", read_consistency_interval=timedelta(seconds=5))
table = db.open_table("my_table")
```
By default, a `Table` will never check for updates from other writers. To manually check for updates you can use `checkout_latest`:
```python
db = lancedb.connect("./.lancedb")
table = db.open_table("my_table")
# (Other writes happen to my_table from another process)
# Check for updates
table.checkout_latest()
```
=== "JavaScript/Typescript"
To set strong consistency, use `0`:
```javascript
const db = await lancedb.connect({ uri: "./.lancedb", readConsistencyInterval: 0 });
const table = await db.openTable("my_table");
```
For eventual consistency, specify the update interval in seconds:
```javascript
const db = await lancedb.connect({ uri: "./.lancedb", readConsistencyInterval: 5 });
const table = await db.openTable("my_table");
```
<!-- Node doesn't yet support the version time travel: https://github.com/lancedb/lancedb/issues/1007
Once it does, we can show manual consistency check for Node as well.
-->
## What's next?
Learn the best practices on creating an ANN index and getting the most out of it.

View File

@@ -1,12 +1,12 @@
{
"name": "vectordb",
"version": "0.4.10",
"version": "0.4.11",
"description": " Serverless, low-latency vector database for AI applications",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"scripts": {
"tsc": "tsc -b",
"build": "npm run tsc && cargo-cp-artifact --artifact cdylib vectordb-node index.node -- cargo build --message-format=json",
"build": "npm run tsc && cargo-cp-artifact --artifact cdylib lancedb-node index.node -- cargo build --message-format=json",
"build-release": "npm run build -- --release",
"test": "npm run tsc && mocha --recursive dist/test",
"integration-test": "npm run tsc && mocha --recursive dist/integration_test",
@@ -85,10 +85,10 @@
}
},
"optionalDependencies": {
"@lancedb/vectordb-darwin-arm64": "0.4.10",
"@lancedb/vectordb-darwin-x64": "0.4.10",
"@lancedb/vectordb-linux-arm64-gnu": "0.4.10",
"@lancedb/vectordb-linux-x64-gnu": "0.4.10",
"@lancedb/vectordb-win32-x64-msvc": "0.4.10"
"@lancedb/vectordb-darwin-arm64": "0.4.11",
"@lancedb/vectordb-darwin-x64": "0.4.11",
"@lancedb/vectordb-linux-arm64-gnu": "0.4.11",
"@lancedb/vectordb-linux-x64-gnu": "0.4.11",
"@lancedb/vectordb-win32-x64-msvc": "0.4.11"
}
}
}

View File

@@ -96,6 +96,19 @@ export interface ConnectionOptions {
* This is useful for local testing.
*/
hostOverride?: string
/**
* (For LanceDB OSS only): The interval, in seconds, at which to check for
* updates to the table from other processes. If None, then consistency is not
* checked. For performance reasons, this is the default. For strong
* consistency, set this to zero seconds. Then every read will check for
* updates from other processes. As a compromise, you can set this to a
* non-zero value for eventual consistency. If more than that interval
* has passed since the last check, then the table will be checked for updates.
* Note: this consistency only applies to read operations. Write operations are
* always consistent.
*/
readConsistencyInterval?: number
}
function getAwsArgs (opts: ConnectionOptions): any[] {
@@ -181,7 +194,8 @@ export async function connect (
opts.awsCredentials?.accessKeyId,
opts.awsCredentials?.secretKey,
opts.awsCredentials?.sessionToken,
opts.awsRegion
opts.awsRegion,
opts.readConsistencyInterval
)
return new LocalConnection(db, opts)
}

View File

@@ -18,5 +18,5 @@ module.exports = {
"@typescript-eslint/method-signature-style": "off",
"@typescript-eslint/no-explicit-any": "off",
},
ignorePatterns: ["node_modules/", "dist/", "build/", "vectordb/native.*"],
ignorePatterns: ["node_modules/", "dist/", "build/", "lancedb/native.*"],
};

View File

@@ -1,5 +1,5 @@
[package]
name = "vectordb-nodejs"
name = "lancedb-nodejs"
edition.workspace = true
version = "0.0.0"
license.workspace = true
@@ -16,7 +16,7 @@ arrow-ipc.workspace = true
futures.workspace = true
lance-linalg.workspace = true
lance.workspace = true
vectordb = { path = "../rust/vectordb" }
lancedb = { path = "../rust/lancedb" }
napi = { version = "2.15", default-features = false, features = [
"napi7",
"async"

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
import { makeArrowTable, toBuffer } from "../vectordb/arrow";
import { makeArrowTable, toBuffer } from "../lancedb/arrow";
import {
Int64,
Field,

View File

@@ -29,6 +29,6 @@ test("open database", async () => {
const tbl = await db.createTable("test", [{ id: 1 }, { id: 2 }]);
expect(await db.tableNames()).toStrictEqual(["test"]);
const schema = tbl.schema;
const schema = await tbl.schema();
expect(schema).toEqual(new Schema([new Field("id", new Float64(), true)]));
});

View File

@@ -181,3 +181,37 @@ describe("Test creating index", () => {
// TODO: check index type.
});
});
describe("Read consistency interval", () => {
let tmpDir: string;
beforeEach(() => {
tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "read-consistency-"));
});
// const intervals = [undefined, 0, 0.1];
const intervals = [0];
test.each(intervals)("read consistency interval %p", async (interval) => {
const db = await connect({ uri: tmpDir });
const table = await db.createTable("my_table", [{ id: 1 }]);
const db2 = await connect({ uri: tmpDir, readConsistencyInterval: interval });
const table2 = await db2.openTable("my_table");
expect(await table2.countRows()).toEqual(await table.countRows());
await table.add([{ id: 2 }]);
if (interval === undefined) {
expect(await table2.countRows()).toEqual(1n);
// TODO: once we implement time travel we can uncomment this part of the test.
// await table2.checkout_latest();
// expect(await table2.countRows()).toEqual(2);
} else if (interval === 0) {
expect(await table2.countRows()).toEqual(2n);
} else {
// interval == 0.1
expect(await table2.countRows()).toEqual(1n);
await new Promise(r => setTimeout(r, 100));
expect(await table2.countRows()).toEqual(2n);
}
});
});

View File

@@ -53,12 +53,12 @@ export async function connect(
opts = Object.assign(
{
uri: "",
apiKey: "",
hostOverride: "",
apiKey: undefined,
hostOverride: undefined,
},
args
);
}
const nativeConn = await NativeConnection.new(opts.uri);
const nativeConn = await NativeConnection.new(opts);
return new Connection(nativeConn);
}

View File

@@ -16,6 +16,18 @@ export interface ConnectionOptions {
uri: string
apiKey?: string
hostOverride?: string
/**
* (For LanceDB OSS only): The interval, in seconds, at which to check for
* updates to the table from other processes. If None, then consistency is not
* checked. For performance reasons, this is the default. For strong
* consistency, set this to zero seconds. Then every read will check for
* updates from other processes. As a compromise, you can set this to a
* non-zero value for eventual consistency. If more than that interval
* has passed since the last check, then the table will be checked for updates.
* Note: this consistency only applies to read operations. Write operations are
* always consistent.
*/
readConsistencyInterval?: number
}
/** Write mode for writing a table. */
export const enum WriteMode {
@@ -30,7 +42,7 @@ export interface WriteOptions {
export function connect(options: ConnectionOptions): Promise<Connection>
export class Connection {
/** Create a new Connection instance from the given URI. */
static new(uri: string): Promise<Connection>
static new(options: ConnectionOptions): Promise<Connection>
/** List all tables in the dataset. */
tableNames(): Promise<Array<string>>
/**
@@ -71,7 +83,7 @@ export class Query {
}
export class Table {
/** Return Schema as empty Arrow IPC file. */
schema(): Buffer
schema(): Promise<Buffer>
add(buf: Buffer): Promise<void>
countRows(filter?: string | undefined | null): Promise<bigint>
delete(predicate: string): Promise<void>

View File

@@ -32,24 +32,24 @@ switch (platform) {
case 'android':
switch (arch) {
case 'arm64':
localFileExisted = existsSync(join(__dirname, 'vectordb-nodejs.android-arm64.node'))
localFileExisted = existsSync(join(__dirname, 'lancedb-nodejs.android-arm64.node'))
try {
if (localFileExisted) {
nativeBinding = require('./vectordb-nodejs.android-arm64.node')
nativeBinding = require('./lancedb-nodejs.android-arm64.node')
} else {
nativeBinding = require('vectordb-android-arm64')
nativeBinding = require('lancedb-android-arm64')
}
} catch (e) {
loadError = e
}
break
case 'arm':
localFileExisted = existsSync(join(__dirname, 'vectordb-nodejs.android-arm-eabi.node'))
localFileExisted = existsSync(join(__dirname, 'lancedb-nodejs.android-arm-eabi.node'))
try {
if (localFileExisted) {
nativeBinding = require('./vectordb-nodejs.android-arm-eabi.node')
nativeBinding = require('./lancedb-nodejs.android-arm-eabi.node')
} else {
nativeBinding = require('vectordb-android-arm-eabi')
nativeBinding = require('lancedb-android-arm-eabi')
}
} catch (e) {
loadError = e
@@ -63,13 +63,13 @@ switch (platform) {
switch (arch) {
case 'x64':
localFileExisted = existsSync(
join(__dirname, 'vectordb-nodejs.win32-x64-msvc.node')
join(__dirname, 'lancedb-nodejs.win32-x64-msvc.node')
)
try {
if (localFileExisted) {
nativeBinding = require('./vectordb-nodejs.win32-x64-msvc.node')
nativeBinding = require('./lancedb-nodejs.win32-x64-msvc.node')
} else {
nativeBinding = require('vectordb-win32-x64-msvc')
nativeBinding = require('lancedb-win32-x64-msvc')
}
} catch (e) {
loadError = e
@@ -77,13 +77,13 @@ switch (platform) {
break
case 'ia32':
localFileExisted = existsSync(
join(__dirname, 'vectordb-nodejs.win32-ia32-msvc.node')
join(__dirname, 'lancedb-nodejs.win32-ia32-msvc.node')
)
try {
if (localFileExisted) {
nativeBinding = require('./vectordb-nodejs.win32-ia32-msvc.node')
nativeBinding = require('./lancedb-nodejs.win32-ia32-msvc.node')
} else {
nativeBinding = require('vectordb-win32-ia32-msvc')
nativeBinding = require('lancedb-win32-ia32-msvc')
}
} catch (e) {
loadError = e
@@ -91,13 +91,13 @@ switch (platform) {
break
case 'arm64':
localFileExisted = existsSync(
join(__dirname, 'vectordb-nodejs.win32-arm64-msvc.node')
join(__dirname, 'lancedb-nodejs.win32-arm64-msvc.node')
)
try {
if (localFileExisted) {
nativeBinding = require('./vectordb-nodejs.win32-arm64-msvc.node')
nativeBinding = require('./lancedb-nodejs.win32-arm64-msvc.node')
} else {
nativeBinding = require('vectordb-win32-arm64-msvc')
nativeBinding = require('lancedb-win32-arm64-msvc')
}
} catch (e) {
loadError = e
@@ -108,23 +108,23 @@ switch (platform) {
}
break
case 'darwin':
localFileExisted = existsSync(join(__dirname, 'vectordb-nodejs.darwin-universal.node'))
localFileExisted = existsSync(join(__dirname, 'lancedb-nodejs.darwin-universal.node'))
try {
if (localFileExisted) {
nativeBinding = require('./vectordb-nodejs.darwin-universal.node')
nativeBinding = require('./lancedb-nodejs.darwin-universal.node')
} else {
nativeBinding = require('vectordb-darwin-universal')
nativeBinding = require('lancedb-darwin-universal')
}
break
} catch {}
switch (arch) {
case 'x64':
localFileExisted = existsSync(join(__dirname, 'vectordb-nodejs.darwin-x64.node'))
localFileExisted = existsSync(join(__dirname, 'lancedb-nodejs.darwin-x64.node'))
try {
if (localFileExisted) {
nativeBinding = require('./vectordb-nodejs.darwin-x64.node')
nativeBinding = require('./lancedb-nodejs.darwin-x64.node')
} else {
nativeBinding = require('vectordb-darwin-x64')
nativeBinding = require('lancedb-darwin-x64')
}
} catch (e) {
loadError = e
@@ -132,13 +132,13 @@ switch (platform) {
break
case 'arm64':
localFileExisted = existsSync(
join(__dirname, 'vectordb-nodejs.darwin-arm64.node')
join(__dirname, 'lancedb-nodejs.darwin-arm64.node')
)
try {
if (localFileExisted) {
nativeBinding = require('./vectordb-nodejs.darwin-arm64.node')
nativeBinding = require('./lancedb-nodejs.darwin-arm64.node')
} else {
nativeBinding = require('vectordb-darwin-arm64')
nativeBinding = require('lancedb-darwin-arm64')
}
} catch (e) {
loadError = e
@@ -152,12 +152,12 @@ switch (platform) {
if (arch !== 'x64') {
throw new Error(`Unsupported architecture on FreeBSD: ${arch}`)
}
localFileExisted = existsSync(join(__dirname, 'vectordb-nodejs.freebsd-x64.node'))
localFileExisted = existsSync(join(__dirname, 'lancedb-nodejs.freebsd-x64.node'))
try {
if (localFileExisted) {
nativeBinding = require('./vectordb-nodejs.freebsd-x64.node')
nativeBinding = require('./lancedb-nodejs.freebsd-x64.node')
} else {
nativeBinding = require('vectordb-freebsd-x64')
nativeBinding = require('lancedb-freebsd-x64')
}
} catch (e) {
loadError = e
@@ -168,26 +168,26 @@ switch (platform) {
case 'x64':
if (isMusl()) {
localFileExisted = existsSync(
join(__dirname, 'vectordb-nodejs.linux-x64-musl.node')
join(__dirname, 'lancedb-nodejs.linux-x64-musl.node')
)
try {
if (localFileExisted) {
nativeBinding = require('./vectordb-nodejs.linux-x64-musl.node')
nativeBinding = require('./lancedb-nodejs.linux-x64-musl.node')
} else {
nativeBinding = require('vectordb-linux-x64-musl')
nativeBinding = require('lancedb-linux-x64-musl')
}
} catch (e) {
loadError = e
}
} else {
localFileExisted = existsSync(
join(__dirname, 'vectordb-nodejs.linux-x64-gnu.node')
join(__dirname, 'lancedb-nodejs.linux-x64-gnu.node')
)
try {
if (localFileExisted) {
nativeBinding = require('./vectordb-nodejs.linux-x64-gnu.node')
nativeBinding = require('./lancedb-nodejs.linux-x64-gnu.node')
} else {
nativeBinding = require('vectordb-linux-x64-gnu')
nativeBinding = require('lancedb-linux-x64-gnu')
}
} catch (e) {
loadError = e
@@ -197,26 +197,26 @@ switch (platform) {
case 'arm64':
if (isMusl()) {
localFileExisted = existsSync(
join(__dirname, 'vectordb-nodejs.linux-arm64-musl.node')
join(__dirname, 'lancedb-nodejs.linux-arm64-musl.node')
)
try {
if (localFileExisted) {
nativeBinding = require('./vectordb-nodejs.linux-arm64-musl.node')
nativeBinding = require('./lancedb-nodejs.linux-arm64-musl.node')
} else {
nativeBinding = require('vectordb-linux-arm64-musl')
nativeBinding = require('lancedb-linux-arm64-musl')
}
} catch (e) {
loadError = e
}
} else {
localFileExisted = existsSync(
join(__dirname, 'vectordb-nodejs.linux-arm64-gnu.node')
join(__dirname, 'lancedb-nodejs.linux-arm64-gnu.node')
)
try {
if (localFileExisted) {
nativeBinding = require('./vectordb-nodejs.linux-arm64-gnu.node')
nativeBinding = require('./lancedb-nodejs.linux-arm64-gnu.node')
} else {
nativeBinding = require('vectordb-linux-arm64-gnu')
nativeBinding = require('lancedb-linux-arm64-gnu')
}
} catch (e) {
loadError = e
@@ -225,13 +225,13 @@ switch (platform) {
break
case 'arm':
localFileExisted = existsSync(
join(__dirname, 'vectordb-nodejs.linux-arm-gnueabihf.node')
join(__dirname, 'lancedb-nodejs.linux-arm-gnueabihf.node')
)
try {
if (localFileExisted) {
nativeBinding = require('./vectordb-nodejs.linux-arm-gnueabihf.node')
nativeBinding = require('./lancedb-nodejs.linux-arm-gnueabihf.node')
} else {
nativeBinding = require('vectordb-linux-arm-gnueabihf')
nativeBinding = require('lancedb-linux-arm-gnueabihf')
}
} catch (e) {
loadError = e
@@ -240,26 +240,26 @@ switch (platform) {
case 'riscv64':
if (isMusl()) {
localFileExisted = existsSync(
join(__dirname, 'vectordb-nodejs.linux-riscv64-musl.node')
join(__dirname, 'lancedb-nodejs.linux-riscv64-musl.node')
)
try {
if (localFileExisted) {
nativeBinding = require('./vectordb-nodejs.linux-riscv64-musl.node')
nativeBinding = require('./lancedb-nodejs.linux-riscv64-musl.node')
} else {
nativeBinding = require('vectordb-linux-riscv64-musl')
nativeBinding = require('lancedb-linux-riscv64-musl')
}
} catch (e) {
loadError = e
}
} else {
localFileExisted = existsSync(
join(__dirname, 'vectordb-nodejs.linux-riscv64-gnu.node')
join(__dirname, 'lancedb-nodejs.linux-riscv64-gnu.node')
)
try {
if (localFileExisted) {
nativeBinding = require('./vectordb-nodejs.linux-riscv64-gnu.node')
nativeBinding = require('./lancedb-nodejs.linux-riscv64-gnu.node')
} else {
nativeBinding = require('vectordb-linux-riscv64-gnu')
nativeBinding = require('lancedb-linux-riscv64-gnu')
}
} catch (e) {
loadError = e
@@ -268,13 +268,13 @@ switch (platform) {
break
case 's390x':
localFileExisted = existsSync(
join(__dirname, 'vectordb-nodejs.linux-s390x-gnu.node')
join(__dirname, 'lancedb-nodejs.linux-s390x-gnu.node')
)
try {
if (localFileExisted) {
nativeBinding = require('./vectordb-nodejs.linux-s390x-gnu.node')
nativeBinding = require('./lancedb-nodejs.linux-s390x-gnu.node')
} else {
nativeBinding = require('vectordb-linux-s390x-gnu')
nativeBinding = require('lancedb-linux-s390x-gnu')
}
} catch (e) {
loadError = e

View File

@@ -32,8 +32,8 @@ export class Table {
}
/** Get the schema of the table. */
get schema(): Schema {
const schemaBuf = this.inner.schema();
async schema(): Promise<Schema> {
const schemaBuf = await this.inner.schema();
const tbl = tableFromIPC(schemaBuf);
return tbl.schema;
}

View File

@@ -1,3 +1,3 @@
# `vectordb-darwin-arm64`
# `lancedb-darwin-arm64`
This is the **aarch64-apple-darwin** binary for `vectordb`
This is the **aarch64-apple-darwin** binary for `lancedb`

View File

@@ -1,5 +1,5 @@
{
"name": "vectordb-darwin-arm64",
"name": "lancedb-darwin-arm64",
"version": "0.4.3",
"os": [
"darwin"
@@ -7,9 +7,9 @@
"cpu": [
"arm64"
],
"main": "vectordb.darwin-arm64.node",
"main": "lancedb.darwin-arm64.node",
"files": [
"vectordb.darwin-arm64.node"
"lancedb.darwin-arm64.node"
],
"license": "MIT",
"engines": {

View File

@@ -1,3 +1,3 @@
# `vectordb-darwin-x64`
# `lancedb-darwin-x64`
This is the **x86_64-apple-darwin** binary for `vectordb`
This is the **x86_64-apple-darwin** binary for `lancedb`

View File

@@ -1,5 +1,5 @@
{
"name": "vectordb-darwin-x64",
"name": "lancedb-darwin-x64",
"version": "0.4.3",
"os": [
"darwin"
@@ -7,9 +7,9 @@
"cpu": [
"x64"
],
"main": "vectordb.darwin-x64.node",
"main": "lancedb.darwin-x64.node",
"files": [
"vectordb.darwin-x64.node"
"lancedb.darwin-x64.node"
],
"license": "MIT",
"engines": {

View File

@@ -1,3 +1,3 @@
# `vectordb-linux-arm64-gnu`
# `lancedb-linux-arm64-gnu`
This is the **aarch64-unknown-linux-gnu** binary for `vectordb`
This is the **aarch64-unknown-linux-gnu** binary for `lancedb`

View File

@@ -1,5 +1,5 @@
{
"name": "vectordb-linux-arm64-gnu",
"name": "lancedb-linux-arm64-gnu",
"version": "0.4.3",
"os": [
"linux"
@@ -7,9 +7,9 @@
"cpu": [
"arm64"
],
"main": "vectordb.linux-arm64-gnu.node",
"main": "lancedb.linux-arm64-gnu.node",
"files": [
"vectordb.linux-arm64-gnu.node"
"lancedb.linux-arm64-gnu.node"
],
"license": "MIT",
"engines": {

View File

@@ -1,3 +1,3 @@
# `vectordb-linux-x64-gnu`
# `lancedb-linux-x64-gnu`
This is the **x86_64-unknown-linux-gnu** binary for `vectordb`
This is the **x86_64-unknown-linux-gnu** binary for `lancedb`

View File

@@ -1,5 +1,5 @@
{
"name": "vectordb-linux-x64-gnu",
"name": "lancedb-linux-x64-gnu",
"version": "0.4.3",
"os": [
"linux"
@@ -7,9 +7,9 @@
"cpu": [
"x64"
],
"main": "vectordb.linux-x64-gnu.node",
"main": "lancedb.linux-x64-gnu.node",
"files": [
"vectordb.linux-x64-gnu.node"
"lancedb.linux-x64-gnu.node"
],
"license": "MIT",
"engines": {

nodejs/package-lock.json (generated, 1087 lines changed)

File diff suppressed because it is too large

View File

@@ -1,10 +1,10 @@
{
"name": "vectordb",
"name": "lancedb",
"version": "0.4.3",
"main": "./dist/index.js",
"types": "./dist/index.d.ts",
"napi": {
"name": "vectordb-nodejs",
"name": "lancedb-nodejs",
"triples": {
"defaults": false,
"additional": [
@@ -18,7 +18,7 @@
"license": "Apache 2.0",
"devDependencies": {
"@napi-rs/cli": "^2.18.0",
"@types/jest": "^29.5.11",
"@types/jest": "^29.1.2",
"@typescript-eslint/eslint-plugin": "^6.19.0",
"@typescript-eslint/parser": "^6.19.0",
"eslint": "^8.56.0",
@@ -45,21 +45,22 @@
],
"scripts": {
"artifacts": "napi artifacts",
"build:native": "napi build --platform --release --js vectordb/native.js --dts vectordb/native.d.ts dist/",
"build:debug": "napi build --platform --dts ../vectordb/native.d.ts --js ../vectordb/native.js dist/",
"build:native": "napi build --platform --release --js lancedb/native.js --dts lancedb/native.d.ts dist/",
"build:debug": "napi build --platform --dts ../lancedb/native.d.ts --js ../lancedb/native.js dist/",
"build": "npm run build:debug && tsc -b",
"docs": "typedoc --plugin typedoc-plugin-markdown vectordb/index.ts",
"lint": "eslint vectordb --ext .js,.ts",
"docs": "typedoc --plugin typedoc-plugin-markdown lancedb/index.ts",
"lint": "eslint lancedb --ext .js,.ts",
"prepublishOnly": "napi prepublish -t npm",
"test": "npm run build && jest",
"//": "maxWorkers=1 is workaround for bigint issue in jest: https://github.com/jestjs/jest/issues/11617#issuecomment-1068732414",
"test": "npm run build && jest --maxWorkers=1",
"universal": "napi universal",
"version": "napi version"
},
"optionalDependencies": {
"vectordb-darwin-arm64": "0.4.3",
"vectordb-darwin-x64": "0.4.3",
"vectordb-linux-arm64-gnu": "0.4.3",
"vectordb-linux-x64-gnu": "0.4.3"
"lancedb-darwin-arm64": "0.4.3",
"lancedb-darwin-x64": "0.4.3",
"lancedb-linux-arm64-gnu": "0.4.3",
"lancedb-linux-x64-gnu": "0.4.3"
},
"dependencies": {
"apache-arrow": "^15.0.0"

View File

@@ -16,8 +16,9 @@ use napi::bindgen_prelude::*;
use napi_derive::*;
use crate::table::Table;
use vectordb::connection::Connection as LanceDBConnection;
use vectordb::ipc::ipc_file_to_batches;
use crate::ConnectionOptions;
use lancedb::connection::{ConnectBuilder, Connection as LanceDBConnection};
use lancedb::ipc::ipc_file_to_batches;
#[napi]
pub struct Connection {
@@ -28,11 +29,23 @@ pub struct Connection {
impl Connection {
/// Create a new Connection instance from the given URI.
#[napi(factory)]
pub async fn new(uri: String) -> napi::Result<Self> {
pub async fn new(options: ConnectionOptions) -> napi::Result<Self> {
let mut builder = ConnectBuilder::new(&options.uri);
if let Some(api_key) = options.api_key {
builder = builder.api_key(&api_key);
}
if let Some(host_override) = options.host_override {
builder = builder.host_override(&host_override);
}
if let Some(interval) = options.read_consistency_interval {
builder =
builder.read_consistency_interval(std::time::Duration::from_secs_f64(interval));
}
Ok(Self {
conn: vectordb::connect(&uri).execute().await.map_err(|e| {
napi::Error::from_reason(format!("Failed to connect to database: {}", e))
})?,
conn: builder
.execute()
.await
.map_err(|e| napi::Error::from_reason(format!("{}", e)))?,
})
}

View File

@@ -40,12 +40,12 @@ impl From<MetricType> for LanceMetricType {
#[napi]
pub struct IndexBuilder {
inner: vectordb::index::IndexBuilder,
inner: lancedb::index::IndexBuilder,
}
#[napi]
impl IndexBuilder {
pub fn new(tbl: &dyn vectordb::Table) -> Self {
pub fn new(tbl: &dyn lancedb::Table) -> Self {
let inner = tbl.create_index(&[]);
Self { inner }
}

View File

@@ -14,9 +14,9 @@
use futures::StreamExt;
use lance::io::RecordBatchStream;
use lancedb::ipc::batches_to_ipc_file;
use napi::bindgen_prelude::*;
use napi_derive::napi;
use vectordb::ipc::batches_to_ipc_file;
/** Typescript-style Async Iterator over RecordBatches */
#[napi]

View File

@@ -22,10 +22,21 @@ mod query;
mod table;
#[napi(object)]
#[derive(Debug)]
pub struct ConnectionOptions {
pub uri: String,
pub api_key: Option<String>,
pub host_override: Option<String>,
/// (For LanceDB OSS only): The interval, in seconds, at which to check for
/// updates to the table from other processes. If None, then consistency is not
/// checked. For performance reasons, this is the default. For strong
/// consistency, set this to zero seconds. Then every read will check for
/// updates from other processes. As a compromise, you can set this to a
/// non-zero value for eventual consistency. If more than that interval
/// has passed since the last check, then the table will be checked for updates.
/// Note: this consistency only applies to read operations. Write operations are
/// always consistent.
pub read_consistency_interval: Option<f64>,
}
/// Write mode for writing a table.
@@ -44,5 +55,5 @@ pub struct WriteOptions {
#[napi]
pub async fn connect(options: ConnectionOptions) -> napi::Result<Connection> {
Connection::new(options.uri.clone()).await
Connection::new(options).await
}

View File

@@ -12,9 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use lancedb::query::Query as LanceDBQuery;
use napi::bindgen_prelude::*;
use napi_derive::napi;
use vectordb::query::Query as LanceDBQuery;
use crate::{iterator::RecordBatchIterator, table::Table};

View File

@@ -13,10 +13,10 @@
// limitations under the License.
use arrow_ipc::writer::FileWriter;
use lancedb::table::AddDataOptions;
use lancedb::{ipc::ipc_file_to_batches, table::TableRef};
use napi::bindgen_prelude::*;
use napi_derive::napi;
use vectordb::table::AddDataOptions;
use vectordb::{ipc::ipc_file_to_batches, table::TableRef};
use crate::index::IndexBuilder;
use crate::query::Query;
@@ -34,8 +34,12 @@ impl Table {
/// Return Schema as empty Arrow IPC file.
#[napi]
pub fn schema(&self) -> napi::Result<Buffer> {
let mut writer = FileWriter::try_new(vec![], &self.table.schema())
pub async fn schema(&self) -> napi::Result<Buffer> {
let schema =
self.table.schema().await.map_err(|e| {
napi::Error::from_reason(format!("Failed to create IPC file: {}", e))
})?;
let mut writer = FileWriter::try_new(vec![], &schema)
.map_err(|e| napi::Error::from_reason(format!("Failed to create IPC file: {}", e)))?;
writer
.finish()

View File

@@ -1,8 +1,8 @@
{
"include": [
"vectordb/*.ts",
"vectordb/**/*.ts",
"vectordb/*.js",
"lancedb/*.ts",
"lancedb/**/*.ts",
"lancedb/*.js",
],
"compilerOptions": {
"target": "es2022",
@@ -18,7 +18,7 @@
],
"typedocOptions": {
"entryPoints": [
"vectordb/index.ts"
"lancedb/index.ts"
],
"out": "../docs/src/javascript/",
"visibilityFilters": {

View File

@@ -1,6 +1,6 @@
[package]
name = "vectordb-node"
version = "0.4.10"
name = "lancedb-node"
version = "0.4.11"
description = "Serverless, low-latency vector database for AI applications"
license.workspace = true
edition.workspace = true
@@ -24,9 +24,14 @@ half = { workspace = true }
lance = { workspace = true }
lance-index = { workspace = true }
lance-linalg = { workspace = true }
vectordb = { path = "../../vectordb" }
lancedb = { path = "../../lancedb" }
tokio = { version = "1.23", features = ["rt-multi-thread"] }
neon = {version = "0.10.1", default-features = false, features = ["channel-api", "napi-6", "promise-api", "task-api"] }
neon = { version = "0.10.1", default-features = false, features = [
"channel-api",
"napi-6",
"promise-api",
"task-api",
] }
object_store = { workspace = true, features = ["aws"] }
snafu = { workspace = true }
async-trait = "0"

View File

@@ -1,3 +1,3 @@
The LanceDB node bridge (vectordb-node) allows javascript applications to access LanceDB datasets.
The LanceDB node bridge (lancedb-node) allows javascript applications to access LanceDB datasets.
It is built using [Neon](https://neon-bindings.com). See the node project for examples of how it is used and tested.

View File

@@ -34,8 +34,8 @@ pub enum Error {
pub type Result<T> = std::result::Result<T, Error>;
impl From<vectordb::error::Error> for Error {
fn from(e: vectordb::error::Error) -> Self {
impl From<lancedb::error::Error> for Error {
fn from(e: lancedb::error::Error) -> Self {
Self::LanceDB {
message: e.to_string(),
}

View File

@@ -19,7 +19,7 @@ use neon::{
};
use crate::{error::ResultExt, runtime, table::JsTable};
use vectordb::Table;
use lancedb::Table;
pub fn table_create_scalar_index(mut cx: FunctionContext) -> JsResult<JsPromise> {
let js_table = cx.this().downcast_or_throw::<JsBox<JsTable>, _>(&mut cx)?;

View File

@@ -13,10 +13,10 @@
// limitations under the License.
use lance_linalg::distance::MetricType;
use lancedb::index::IndexBuilder;
use neon::context::FunctionContext;
use neon::prelude::*;
use std::convert::TryFrom;
use vectordb::index::IndexBuilder;
use crate::error::Error::InvalidIndexType;
use crate::error::ResultExt;

View File

@@ -22,9 +22,9 @@ use object_store::CredentialProvider;
use once_cell::sync::OnceCell;
use tokio::runtime::Runtime;
use vectordb::connect;
use vectordb::connection::Connection;
use vectordb::table::ReadParams;
use lancedb::connect;
use lancedb::connection::Connection;
use lancedb::table::ReadParams;
use crate::error::ResultExt;
use crate::query::JsQuery;
@@ -84,6 +84,11 @@ fn database_new(mut cx: FunctionContext) -> JsResult<JsPromise> {
let path = cx.argument::<JsString>(0)?.value(&mut cx);
let aws_creds = get_aws_creds(&mut cx, 1)?;
let region = get_aws_region(&mut cx, 4)?;
let read_consistency_interval = cx
.argument_opt(5)
.and_then(|arg| arg.downcast::<JsNumber, _>(&mut cx).ok())
.map(|v| v.value(&mut cx))
.map(std::time::Duration::from_secs_f64);
let rt = runtime(&mut cx)?;
let channel = cx.channel();
@@ -100,6 +105,9 @@ fn database_new(mut cx: FunctionContext) -> JsResult<JsPromise> {
token: aws_creds.token,
});
}
if let Some(interval) = read_consistency_interval {
conn_builder = conn_builder.read_consistency_interval(interval);
}
rt.spawn(async move {
let database = conn_builder.execute().await;

View File

@@ -93,7 +93,7 @@ impl JsQuery {
.and_then(|stream| {
stream
.try_collect::<Vec<_>>()
.map_err(vectordb::error::Error::from)
.map_err(lancedb::error::Error::from)
})
.await;

View File

@@ -18,12 +18,12 @@ use arrow_array::{RecordBatch, RecordBatchIterator};
use lance::dataset::optimize::CompactionOptions;
use lance::dataset::{WriteMode, WriteParams};
use lance::io::ObjectStoreParams;
use vectordb::table::{AddDataOptions, OptimizeAction, WriteOptions};
use lancedb::table::{AddDataOptions, OptimizeAction, WriteOptions};
use crate::arrow::{arrow_buffer_to_record_batch, record_batch_to_buffer};
use lancedb::TableRef;
use neon::prelude::*;
use neon::types::buffer::TypedArray;
use vectordb::TableRef;
use crate::error::ResultExt;
use crate::{convert, get_aws_credential_provider, get_aws_region, runtime, JsDatabase};
@@ -534,8 +534,9 @@ impl JsTable {
.value(&mut cx);
rt.spawn(async move {
let schema = table.schema().await;
deferred.settle_with(&channel, move |mut cx| {
let schema = table.schema();
let schema = schema.or_throw(&mut cx)?;
let batches = vec![RecordBatch::new_empty(schema)];
let buffer = record_batch_to_buffer(batches).or_throw(&mut cx)?;
convert::new_js_buffer(buffer, &mut cx, is_electron)

View File

@@ -1,6 +1,6 @@
[package]
name = "vectordb"
version = "0.4.10"
name = "lancedb"
version = "0.4.11"
edition.workspace = true
description = "LanceDB: A serverless, low-latency vector database for AI applications"
license.workspace = true

View File

@@ -19,9 +19,9 @@ use arrow_array::{FixedSizeListArray, Int32Array, RecordBatch, RecordBatchIterat
use arrow_schema::{DataType, Field, Schema};
use futures::TryStreamExt;
use vectordb::connection::Connection;
use vectordb::table::AddDataOptions;
use vectordb::{connect, Result, Table, TableRef};
use lancedb::connection::Connection;
use lancedb::table::AddDataOptions;
use lancedb::{connect, Result, Table, TableRef};
#[tokio::main]
async fn main() -> Result<()> {

View File

@@ -303,6 +303,9 @@ pub struct ConnectBuilder {
/// User provided AWS credentials
aws_creds: Option<AwsCredential>,
/// The interval at which to check for updates from other processes.
read_consistency_interval: Option<std::time::Duration>,
}
impl ConnectBuilder {
@@ -314,6 +317,7 @@ impl ConnectBuilder {
region: None,
host_override: None,
aws_creds: None,
read_consistency_interval: None,
}
}
@@ -338,6 +342,29 @@ impl ConnectBuilder {
self
}
/// The interval at which to check for updates from other processes. This
/// only affects LanceDB OSS.
///
/// If left unset, consistency is not checked. For maximum read
/// performance, this is the default. For strong consistency, set this to
/// zero seconds. Then every read will check for updates from other processes.
/// As a compromise, set this to a non-zero duration for eventual consistency.
/// If more than that duration has passed since the last read, the read will
/// check for updates from other processes.
///
/// This only affects read operations. Write operations are always
/// consistent.
///
/// LanceDB Cloud uses eventual consistency under the hood, and is not
/// currently configurable.
pub fn read_consistency_interval(
mut self,
read_consistency_interval: std::time::Duration,
) -> Self {
self.read_consistency_interval = Some(read_consistency_interval);
self
}
/// Establishes a connection to the database
pub async fn execute(self) -> Result<Connection> {
let internal = Arc::new(Database::connect_with_options(&self).await?);
@@ -368,6 +395,8 @@ struct Database {
// the object store wrapper to use on write path
pub(crate) store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
read_consistency_interval: Option<std::time::Duration>,
}
const LANCE_EXTENSION: &str = "lance";
@@ -380,8 +409,11 @@ impl Database {
let uri = &options.uri;
let parse_res = url::Url::parse(uri);
// TODO: pass params regardless of OS
match parse_res {
Ok(url) if url.scheme().len() == 1 && cfg!(windows) => Self::open_path(uri).await,
Ok(url) if url.scheme().len() == 1 && cfg!(windows) => {
Self::open_path(uri, options.read_consistency_interval).await
}
Ok(mut url) => {
// iter thru the query params and extract the commit store param
let mut engine = None;
@@ -465,13 +497,17 @@ impl Database {
base_path,
object_store,
store_wrapper: write_store_wrapper,
read_consistency_interval: options.read_consistency_interval,
})
}
Err(_) => Self::open_path(uri).await,
Err(_) => Self::open_path(uri, options.read_consistency_interval).await,
}
}
async fn open_path(path: &str) -> Result<Self> {
async fn open_path(
path: &str,
read_consistency_interval: Option<std::time::Duration>,
) -> Result<Self> {
let (object_store, base_path) = ObjectStore::from_uri(path).await?;
if object_store.is_local() {
Self::try_create_dir(path).context(CreateDirSnafu { path })?;
@@ -482,6 +518,7 @@ impl Database {
base_path,
object_store,
store_wrapper: None,
read_consistency_interval,
})
}
@@ -551,6 +588,7 @@ impl ConnectionInternal for Database {
options.data.unwrap(),
self.store_wrapper.clone(),
Some(write_params),
self.read_consistency_interval,
)
.await
{
@@ -576,6 +614,7 @@ impl ConnectionInternal for Database {
&options.name,
self.store_wrapper.clone(),
options.lance_read_params,
self.read_consistency_interval,
)
.await?,
))
@@ -696,6 +735,6 @@ mod tests {
.execute()
.await
.unwrap();
assert_eq!(other_schema, overwritten.schema());
assert_eq!(other_schema, overwritten.schema().await.unwrap());
}
}
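The `read_consistency_interval` docs above describe three regimes: unset (never re-check), zero (check on every read), and a non-zero duration (re-check once that much time has passed). A std-only sketch of that decision rule, with an illustrative function name that is not part of the crate's API:

```rust
use std::time::{Duration, Instant};

/// Decide whether a read should check for updates from other processes:
/// `None` never checks, `Some(ZERO)` always checks (strong consistency),
/// and `Some(d)` checks only once `d` has elapsed since the last check.
fn needs_consistency_check(
    interval: Option<Duration>,
    last_check: Option<Instant>,
) -> bool {
    match (interval, last_check) {
        (None, _) => false,
        (Some(_), None) => true, // never checked yet
        (Some(d), Some(t)) => t.elapsed() >= d,
    }
}

fn main() {
    // Unset: maximum read performance, no checks.
    assert!(!needs_consistency_check(None, None));
    // Zero interval: every read re-checks.
    assert!(needs_consistency_check(Some(Duration::ZERO), Some(Instant::now())));
    // Non-zero interval with no prior check: check now.
    assert!(needs_consistency_check(Some(Duration::from_millis(5)), None));
}
```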

View File

@@ -168,7 +168,7 @@ impl IndexBuilder {
/// Build the parameters.
pub async fn build(&self) -> Result<()> {
let schema = self.table.schema();
let schema = self.table.schema().await?;
// TODO: simplify this after GH lance#1864.
let mut index_type = &self.index_type;
@@ -230,7 +230,7 @@ impl IndexBuilder {
.table
.as_native()
.expect("Only native table is supported here");
let mut dataset = tbl.clone_inner_dataset();
let mut dataset = tbl.dataset.get_mut().await?;
match params {
IndexParams::Scalar { replace } => {
dataset
@@ -271,7 +271,6 @@ impl IndexBuilder {
.await?;
}
}
tbl.reset_dataset(dataset);
Ok(())
}
}

View File

@@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//! # VectorDB ([LanceDB](https://github.com/lancedb/lancedb)) -- Developer-friendly, serverless vector database for AI applications
//!
//! [LanceDB](https://github.com/lancedb/lancedb) is an open-source database for vector-search built with persistent storage,
//! which greatly simplifies retrieval, filtering and management of embeddings.
//!
@@ -33,7 +31,7 @@
//! LanceDB runs in process, to use it in your Rust project, put the following in your `Cargo.toml`:
//!
//! ```ignore
//! cargo install vectordb
//! cargo install lancedb
//! ```
//!
//! ### Quick Start
@@ -45,7 +43,7 @@
//! ```rust
//! # use arrow_schema::{Field, Schema};
//! # tokio::runtime::Runtime::new().unwrap().block_on(async {
//! let db = vectordb::connect("data/sample-lancedb").execute().await.unwrap();
//! let db = lancedb::connect("data/sample-lancedb").execute().await.unwrap();
//! # });
//! ```
//!
@@ -60,7 +58,7 @@
//! ```rust
//! use object_store::aws::AwsCredential;
//! # tokio::runtime::Runtime::new().unwrap().block_on(async {
//! let db = vectordb::connect("data/sample-lancedb")
//! let db = lancedb::connect("data/sample-lancedb")
//! .aws_creds(AwsCredential {
//! key_id: "some_key".to_string(),
//! secret_key: "some_secret".to_string(),
@@ -90,7 +88,7 @@
//!
//! # tokio::runtime::Runtime::new().unwrap().block_on(async {
//! # let tmpdir = tempfile::tempdir().unwrap();
//! # let db = vectordb::connect(tmpdir.path().to_str().unwrap()).execute().await.unwrap();
//! # let db = lancedb::connect(tmpdir.path().to_str().unwrap()).execute().await.unwrap();
//! let schema = Arc::new(Schema::new(vec![
//! Field::new("id", DataType::Int32, false),
//! Field::new(
@@ -134,7 +132,7 @@
//! # use arrow_schema::{Schema, Field, DataType};
//! # tokio::runtime::Runtime::new().unwrap().block_on(async {
//! # let tmpdir = tempfile::tempdir().unwrap();
//! # let db = vectordb::connect(tmpdir.path().to_str().unwrap()).execute().await.unwrap();
//! # let db = lancedb::connect(tmpdir.path().to_str().unwrap()).execute().await.unwrap();
//! # let tbl = db.open_table("idx_test").execute().await.unwrap();
//! tbl.create_index(&["vector"])
//! .ivf_pq()
@@ -155,7 +153,7 @@
//! # use arrow_array::{FixedSizeListArray, Float32Array, Int32Array, types::Float32Type};
//! # tokio::runtime::Runtime::new().unwrap().block_on(async {
//! # let tmpdir = tempfile::tempdir().unwrap();
//! # let db = vectordb::connect(tmpdir.path().to_str().unwrap()).execute().await.unwrap();
//! # let db = lancedb::connect(tmpdir.path().to_str().unwrap()).execute().await.unwrap();
//! # let schema = Arc::new(Schema::new(vec![
//! # Field::new("id", DataType::Int32, false),
//! # Field::new("vector", DataType::FixedSizeList(

View File

@@ -12,15 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use arrow_array::Float32Array;
use arrow_schema::Schema;
use lance::dataset::scanner::{DatasetRecordBatchStream, Scanner};
use lance::dataset::Dataset;
use lance_linalg::distance::MetricType;
use crate::error::Result;
use crate::table::dataset::DatasetConsistencyWrapper;
use crate::utils::default_vector_column;
use crate::Error;
@@ -29,7 +27,7 @@ const DEFAULT_TOP_K: usize = 10;
/// A builder for nearest neighbor queries for LanceDB.
#[derive(Clone)]
pub struct Query {
dataset: Arc<Dataset>,
dataset: DatasetConsistencyWrapper,
// The column to run the query on. If not specified, we will attempt to guess
// the column based on the dataset's schema.
@@ -60,7 +58,8 @@ impl Query {
/// # Arguments
///
/// * `dataset` - Lance dataset.
pub(crate) fn new(dataset: Arc<Dataset>) -> Self {
///
pub(crate) fn new(dataset: DatasetConsistencyWrapper) -> Self {
Self {
dataset,
query_vector: None,
@@ -82,7 +81,8 @@ impl Query {
///
/// * A [DatasetRecordBatchStream] with the query's results.
pub async fn execute_stream(&self) -> Result<DatasetRecordBatchStream> {
let mut scanner: Scanner = self.dataset.scan();
let ds_ref = self.dataset.get().await?;
let mut scanner: Scanner = ds_ref.scan();
if let Some(query) = self.query_vector.as_ref() {
// If there is a vector query, default to limit=10 if unspecified
@@ -90,10 +90,10 @@ impl Query {
col.clone()
} else {
// Infer a vector column with the same dimension of the query vector.
let arrow_schema = Schema::from(self.dataset.schema());
let arrow_schema = Schema::from(ds_ref.schema());
default_vector_column(&arrow_schema, Some(query.len() as i32))?
};
let field = self.dataset.schema().field(&column).ok_or(Error::Store {
let field = ds_ref.schema().field(&column).ok_or(Error::Store {
message: format!("Column {} not found in dataset schema", column),
})?;
if !matches!(field.data_type(), arrow_schema::DataType::FixedSizeList(f, dim) if f.data_type().is_floating() && dim == query.len() as i32)
@@ -239,8 +239,10 @@ mod tests {
let batches = make_test_batches();
let ds = Dataset::write(batches, "memory://foo", None).await.unwrap();
let ds = DatasetConsistencyWrapper::new_latest(ds, None);
let vector = Some(Float32Array::from_iter_values([0.1, 0.2]));
let query = Query::new(Arc::new(ds)).nearest_to(&[0.1, 0.2]);
let query = Query::new(ds).nearest_to(&[0.1, 0.2]);
assert_eq!(query.query_vector, vector);
let new_vector = Float32Array::from_iter_values([9.8, 8.7]);
@@ -264,7 +266,9 @@ mod tests {
#[tokio::test]
async fn test_execute() {
let batches = make_non_empty_batches();
let ds = Arc::new(Dataset::write(batches, "memory://foo", None).await.unwrap());
let ds = Dataset::write(batches, "memory://foo", None).await.unwrap();
let ds = DatasetConsistencyWrapper::new_latest(ds, None);
let query = Query::new(ds.clone()).nearest_to(&[0.1; 4]);
let result = query.limit(10).filter("id % 2 == 0").execute_stream().await;
@@ -294,9 +298,11 @@ mod tests {
async fn test_execute_no_vector() {
// test that it's ok to not specify a query vector (just filter / limit)
let batches = make_non_empty_batches();
let ds = Arc::new(Dataset::write(batches, "memory://foo", None).await.unwrap());
let ds = Dataset::write(batches, "memory://foo", None).await.unwrap();
let query = Query::new(ds.clone());
let ds = DatasetConsistencyWrapper::new_latest(ds, None);
let query = Query::new(ds);
let result = query.filter("id % 2 == 0").execute_stream().await;
let mut stream = result.expect("should have result");
// should only have one batch
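When no column is specified, the query above infers a vector column whose `FixedSizeList` dimension matches the query vector's length via `default_vector_column`. A rough std-only sketch of that inference — the real function works on an Arrow schema; the tuple representation here is an assumption for illustration:

```rust
/// Mock schema field: (name, dim), where `dim` is Some(n) for a
/// FixedSizeList<float, n> column and None for non-vector columns.
/// Picks the unique vector column whose dimension matches the query.
fn default_vector_column(fields: &[(&str, Option<i32>)], query_dim: i32) -> Option<String> {
    let mut candidates = fields
        .iter()
        .filter(|(_, dim)| *dim == Some(query_dim))
        .map(|(name, _)| name.to_string());
    let first = candidates.next();
    // Ambiguous if more than one column matches; the real crate errors here.
    if candidates.next().is_some() {
        None
    } else {
        first
    }
}

fn main() {
    let fields = [("id", None), ("vector", Some(2)), ("embedding", Some(128))];
    // A 2-dim query vector matches only the "vector" column.
    assert_eq!(default_vector_column(&fields, 2), Some("vector".to_string()));
    // No column has dimension 4: no inference possible.
    assert_eq!(default_vector_column(&fields, 4), None);
}
```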

View File

@@ -15,7 +15,7 @@
//! LanceDB Table APIs
use std::path::Path;
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use arrow_array::{RecordBatchIterator, RecordBatchReader};
use arrow_schema::{Schema, SchemaRef};
@@ -39,8 +39,10 @@ use crate::index::IndexBuilder;
use crate::query::Query;
use crate::utils::{PatchReadParam, PatchWriteParam};
use self::dataset::DatasetConsistencyWrapper;
use self::merge::{MergeInsert, MergeInsertBuilder};
pub(crate) mod dataset;
pub mod merge;
/// Optimize the dataset.
@@ -127,7 +129,7 @@ pub trait Table: std::fmt::Display + Send + Sync {
fn name(&self) -> &str;
/// Get the arrow [Schema] of the table.
fn schema(&self) -> SchemaRef;
async fn schema(&self) -> Result<SchemaRef>;
/// Count the number of rows in this dataset.
///
@@ -162,7 +164,7 @@ pub trait Table: std::fmt::Display + Send + Sync {
/// # use arrow_schema::{Schema, Field, DataType};
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
/// let tmpdir = tempfile::tempdir().unwrap();
/// let db = vectordb::connect(tmpdir.path().to_str().unwrap())
/// let db = lancedb::connect(tmpdir.path().to_str().unwrap())
/// .execute()
/// .await
/// .unwrap();
@@ -210,7 +212,7 @@ pub trait Table: std::fmt::Display + Send + Sync {
/// # use arrow_schema::{Schema, Field, DataType};
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
/// let tmpdir = tempfile::tempdir().unwrap();
/// let db = vectordb::connect(tmpdir.path().to_str().unwrap())
/// let db = lancedb::connect(tmpdir.path().to_str().unwrap())
/// .execute()
/// .await
/// .unwrap();
@@ -264,7 +266,7 @@ pub trait Table: std::fmt::Display + Send + Sync {
/// # use arrow_schema::{Schema, Field, DataType};
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
/// let tmpdir = tempfile::tempdir().unwrap();
/// let db = vectordb::connect(tmpdir.path().to_str().unwrap())
/// let db = lancedb::connect(tmpdir.path().to_str().unwrap())
/// .execute()
/// .await
/// .unwrap();
@@ -322,7 +324,8 @@ pub trait Table: std::fmt::Display + Send + Sync {
/// # use arrow_array::RecordBatch;
/// # use futures::TryStreamExt;
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
/// # let tbl = vectordb::table::NativeTable::open("/tmp/tbl").await.unwrap();
/// # let tbl = lancedb::table::NativeTable::open("/tmp/tbl").await.unwrap();
/// use crate::lancedb::Table;
/// let stream = tbl
/// .query()
/// .nearest_to(&[1.0, 2.0, 3.0])
@@ -340,7 +343,8 @@ pub trait Table: std::fmt::Display + Send + Sync {
/// # use arrow_array::RecordBatch;
/// # use futures::TryStreamExt;
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
/// # let tbl = vectordb::table::NativeTable::open("/tmp/tbl").await.unwrap();
/// # let tbl = lancedb::table::NativeTable::open("/tmp/tbl").await.unwrap();
/// use crate::lancedb::Table;
/// let stream = tbl
/// .query()
/// .filter("id > 5")
@@ -357,7 +361,8 @@ pub trait Table: std::fmt::Display + Send + Sync {
/// # use arrow_array::RecordBatch;
/// # use futures::TryStreamExt;
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
/// # let tbl = vectordb::table::NativeTable::open("/tmp/tbl").await.unwrap();
/// # let tbl = lancedb::table::NativeTable::open("/tmp/tbl").await.unwrap();
/// use crate::lancedb::Table;
/// let stream = tbl.query().execute_stream().await.unwrap();
/// let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
/// # });
@@ -368,7 +373,7 @@ pub trait Table: std::fmt::Display + Send + Sync {
///
/// <section class="warning">Experimental API</section>
///
/// Modeled after ``VACCUM`` in PostgreSQL.
/// Modeled after ``VACUUM`` in PostgreSQL.
/// Not all implementations support explicit optimization.
async fn optimize(&self, action: OptimizeAction) -> Result<OptimizeStats>;
}
@@ -381,10 +386,14 @@ pub type TableRef = Arc<dyn Table>;
pub struct NativeTable {
name: String,
uri: String,
dataset: Arc<Mutex<Dataset>>,
pub(crate) dataset: dataset::DatasetConsistencyWrapper,
// the object store wrapper to use on write path
store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
// This comes from the connection options. We store here so we can pass down
// to the dataset when we recreate it (for example, in checkout_latest).
read_consistency_interval: Option<std::time::Duration>,
}
impl std::fmt::Display for NativeTable {
@@ -406,7 +415,7 @@ impl NativeTable {
/// * A [NativeTable] object.
pub async fn open(uri: &str) -> Result<Self> {
let name = Self::get_table_name(uri)?;
Self::open_with_params(uri, &name, None, None).await
Self::open_with_params(uri, &name, None, None, None).await
}
/// Opens an existing Table
@@ -425,6 +434,7 @@ impl NativeTable {
name: &str,
write_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
params: Option<ReadParams>,
read_consistency_interval: Option<std::time::Duration>,
) -> Result<Self> {
let params = params.unwrap_or_default();
// patch the params if we have a write store wrapper
@@ -445,23 +455,22 @@ impl NativeTable {
message: e.to_string(),
},
})?;
let dataset = DatasetConsistencyWrapper::new_latest(dataset, read_consistency_interval);
Ok(Self {
name: name.to_string(),
uri: uri.to_string(),
dataset: Arc::new(Mutex::new(dataset)),
dataset,
store_wrapper: write_store_wrapper,
read_consistency_interval,
})
}
/// Make a new clone of the internal lance dataset.
pub(crate) fn clone_inner_dataset(&self) -> Dataset {
self.dataset.lock().expect("Lock poison").clone()
}
/// Checkout a specific version of this [NativeTable]
pub async fn checkout(uri: &str, version: u64) -> Result<Self> {
let name = Self::get_table_name(uri)?;
Self::checkout_with_params(uri, &name, version, None, ReadParams::default()).await
Self::checkout_with_params(uri, &name, version, None, ReadParams::default(), None).await
}
pub async fn checkout_with_params(
@@ -470,44 +479,35 @@ impl NativeTable {
version: u64,
write_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
params: ReadParams,
read_consistency_interval: Option<std::time::Duration>,
) -> Result<Self> {
// patch the params if we have a write store wrapper
let params = match write_store_wrapper.clone() {
Some(wrapper) => params.patch_with_store_wrapper(wrapper)?,
None => params,
};
let dataset = Dataset::checkout_with_params(uri, version, &params)
.await
.map_err(|e| match e {
lance::Error::DatasetNotFound { .. } => Error::TableNotFound {
name: name.to_string(),
},
e => Error::Lance {
message: e.to_string(),
},
})?;
let dataset = DatasetBuilder::from_uri(uri)
.with_version(version)
.with_read_params(params)
.load()
.await?;
let dataset = DatasetConsistencyWrapper::new_time_travel(dataset, version);
Ok(Self {
name: name.to_string(),
uri: uri.to_string(),
dataset: Arc::new(Mutex::new(dataset)),
dataset,
store_wrapper: write_store_wrapper,
read_consistency_interval,
})
}
pub async fn checkout_latest(&self) -> Result<Self> {
let dataset = self.clone_inner_dataset();
let latest_version_id = dataset.latest_version_id().await?;
let dataset = if latest_version_id == dataset.version().version {
dataset
} else {
dataset.checkout_version(latest_version_id).await?
};
let mut dataset = self.dataset.duplicate().await;
dataset.as_latest(self.read_consistency_interval).await?;
Ok(Self {
name: self.name.clone(),
uri: self.uri.clone(),
dataset: Arc::new(Mutex::new(dataset)),
store_wrapper: self.store_wrapper.clone(),
dataset,
..self.clone()
})
}
@@ -543,6 +543,7 @@ impl NativeTable {
batches: impl RecordBatchReader + Send + 'static,
write_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
params: Option<WriteParams>,
read_consistency_interval: Option<std::time::Duration>,
) -> Result<Self> {
let params = params.unwrap_or_default();
// patch the params if we have a write store wrapper
@@ -564,8 +565,9 @@ impl NativeTable {
Ok(Self {
name: name.to_string(),
uri: uri.to_string(),
dataset: Arc::new(Mutex::new(dataset)),
dataset: DatasetConsistencyWrapper::new_latest(dataset, read_consistency_interval),
store_wrapper: write_store_wrapper,
read_consistency_interval,
})
}
@@ -575,34 +577,35 @@ impl NativeTable {
schema: SchemaRef,
write_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
params: Option<WriteParams>,
read_consistency_interval: Option<std::time::Duration>,
) -> Result<Self> {
let batches = RecordBatchIterator::new(vec![], schema);
Self::create(uri, name, batches, write_store_wrapper, params).await
Self::create(
uri,
name,
batches,
write_store_wrapper,
params,
read_consistency_interval,
)
.await
}
/// Version of this Table
pub fn version(&self) -> u64 {
self.dataset.lock().expect("lock poison").version().version
pub async fn version(&self) -> Result<u64> {
Ok(self.dataset.get().await?.version().version)
}
async fn optimize_indices(&self, options: &OptimizeOptions) -> Result<()> {
info!("LanceDB: optimizing indices: {:?}", options);
let mut dataset = self.clone_inner_dataset();
dataset.optimize_indices(options).await?;
self.dataset
.get_mut()
.await?
.optimize_indices(options)
.await?;
Ok(())
}
pub fn query(&self) -> Query {
Query::new(self.clone_inner_dataset().into())
}
pub fn filter(&self, expr: String) -> Query {
Query::new(self.clone_inner_dataset().into()).filter(expr)
}
/// Returns the number of rows in this Table
/// Merge new data into this table.
pub async fn merge(
&mut self,
@@ -610,14 +613,17 @@ impl NativeTable {
left_on: &str,
right_on: &str,
) -> Result<()> {
let mut dataset = self.clone_inner_dataset();
dataset.merge(batches, left_on, right_on).await?;
self.dataset = Arc::new(Mutex::new(dataset));
self.dataset
.get_mut()
.await?
.merge(batches, left_on, right_on)
.await?;
Ok(())
}
pub async fn update(&self, predicate: Option<&str>, updates: Vec<(&str, &str)>) -> Result<()> {
let mut builder = UpdateBuilder::new(self.clone_inner_dataset().into());
let dataset = self.dataset.get().await?.clone();
let mut builder = UpdateBuilder::new(Arc::new(dataset));
if let Some(predicate) = predicate {
builder = builder.update_where(predicate)?;
}
@@ -628,7 +634,7 @@ impl NativeTable {
let operation = builder.build()?;
let ds = operation.execute().await?;
self.reset_dataset(ds.as_ref().clone());
self.dataset.set_latest(ds.as_ref().clone()).await;
Ok(())
}
@@ -648,8 +654,10 @@ impl NativeTable {
older_than: Duration,
delete_unverified: Option<bool>,
) -> Result<RemovalStats> {
let dataset = self.clone_inner_dataset();
Ok(dataset
Ok(self
.dataset
.get_mut()
.await?
.cleanup_old_versions(older_than, delete_unverified)
.await?)
}
@@ -665,24 +673,27 @@ impl NativeTable {
options: CompactionOptions,
remap_options: Option<Arc<dyn IndexRemapperOptions>>,
) -> Result<CompactionMetrics> {
let mut dataset = self.clone_inner_dataset();
let metrics = compact_files(&mut dataset, options, remap_options).await?;
self.reset_dataset(dataset);
let mut dataset_mut = self.dataset.get_mut().await?;
let metrics = compact_files(&mut dataset_mut, options, remap_options).await?;
Ok(metrics)
}
pub fn count_fragments(&self) -> usize {
self.dataset.lock().expect("lock poison").count_fragments()
// TODO: why are these individual methods and not some single "get_stats" method?
pub async fn count_fragments(&self) -> Result<usize> {
Ok(self.dataset.get().await?.count_fragments())
}
pub async fn count_deleted_rows(&self) -> Result<usize> {
let dataset = self.clone_inner_dataset();
Ok(dataset.count_deleted_rows().await?)
Ok(self.dataset.get().await?.count_deleted_rows().await?)
}
pub async fn num_small_files(&self, max_rows_per_group: usize) -> usize {
let dataset = self.clone_inner_dataset();
dataset.num_small_files(max_rows_per_group).await
pub async fn num_small_files(&self, max_rows_per_group: usize) -> Result<usize> {
Ok(self
.dataset
.get()
.await?
.num_small_files(max_rows_per_group)
.await)
}
pub async fn count_indexed_rows(&self, index_uuid: &str) -> Result<Option<usize>> {
@@ -700,7 +711,7 @@ impl NativeTable {
}
pub async fn load_indices(&self) -> Result<Vec<VectorIndex>> {
let dataset = self.clone_inner_dataset();
let dataset = self.dataset.get().await?;
let (indices, mf) = futures::try_join!(dataset.load_indices(), dataset.latest_manifest())?;
Ok(indices
.iter()
@@ -717,7 +728,7 @@ impl NativeTable {
if index.is_none() {
return Ok(None);
}
let dataset = self.clone_inner_dataset();
let dataset = self.dataset.get().await?;
let index_stats = dataset.index_statistics(&index.unwrap().index_name).await?;
let index_stats: VectorIndexStatistics =
serde_json::from_str(&index_stats).map_err(|e| Error::Lance {
@@ -729,10 +740,6 @@ impl NativeTable {
Ok(Some(index_stats))
}
pub(crate) fn reset_dataset(&self, dataset: Dataset) {
*self.dataset.lock().expect("lock poison") = dataset;
}
}
#[async_trait]
@@ -742,7 +749,7 @@ impl MergeInsert for NativeTable {
params: MergeInsertBuilder,
new_data: Box<dyn RecordBatchReader + Send>,
) -> Result<()> {
let dataset = Arc::new(self.clone_inner_dataset());
let dataset = Arc::new(self.dataset.get().await?.clone());
let mut builder = LanceMergeInsertBuilder::try_new(dataset.clone(), params.on)?;
match (
params.when_matched_update_all,
@@ -769,7 +776,7 @@ impl MergeInsert for NativeTable {
}
let job = builder.try_build()?;
let new_dataset = job.execute_reader(new_data).await?;
self.reset_dataset((*new_dataset).clone());
self.dataset.set_latest(new_dataset.as_ref().clone()).await;
Ok(())
}
}
@@ -788,13 +795,13 @@ impl Table for NativeTable {
self.name.as_str()
}
fn schema(&self) -> SchemaRef {
let lance_schema = { self.dataset.lock().expect("lock poison").schema().clone() };
Arc::new(Schema::from(&lance_schema))
async fn schema(&self) -> Result<SchemaRef> {
let lance_schema = self.dataset.get().await?.schema().clone();
Ok(Arc::new(Schema::from(&lance_schema)))
}
async fn count_rows(&self, filter: Option<String>) -> Result<usize> {
let dataset = { self.dataset.lock().expect("lock poison").clone() };
let dataset = self.dataset.get().await?;
if let Some(filter) = filter {
let mut scanner = dataset.scan();
scanner.filter(&filter)?;
@@ -826,7 +833,8 @@ impl Table for NativeTable {
None => lance_params,
};
self.reset_dataset(Dataset::write(batches, &self.uri, Some(lance_params)).await?);
let dataset = Dataset::write(batches, &self.uri, Some(lance_params)).await?;
self.dataset.set_latest(dataset).await;
Ok(())
}
@@ -840,14 +848,12 @@ impl Table for NativeTable {
}
fn query(&self) -> Query {
Query::new(Arc::new(self.dataset.lock().expect("lock poison").clone()))
Query::new(self.dataset.clone())
}
/// Delete rows from the table
async fn delete(&self, predicate: &str) -> Result<()> {
let mut dataset = self.clone_inner_dataset();
dataset.delete(predicate).await?;
self.reset_dataset(dataset);
self.dataset.get_mut().await?.delete(predicate).await?;
Ok(())
}
@@ -903,6 +909,7 @@ mod tests {
use std::iter;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use arrow_array::{
Array, BooleanArray, Date32Array, FixedSizeListArray, Float32Array, Float64Array,
@@ -918,6 +925,8 @@ mod tests {
use rand::Rng;
use tempfile::tempdir;
use crate::connection::ConnectBuilder;
use super::*;
#[tokio::test]
@@ -960,7 +969,7 @@ mod tests {
let uri = tmp_dir.path().to_str().unwrap();
let batches = make_test_batches();
let table = NativeTable::create(uri, "test", batches, None, None)
let table = NativeTable::create(uri, "test", batches, None, None, None)
.await
.unwrap();
@@ -978,7 +987,7 @@ mod tests {
let batches = make_test_batches();
let schema = batches.schema().clone();
let table = NativeTable::create(uri, "test", batches, None, None)
let table = NativeTable::create(uri, "test", batches, None, None, None)
.await
.unwrap();
assert_eq!(table.count_rows(None).await.unwrap(), 10);
@@ -1009,7 +1018,7 @@ mod tests {
// Create a dataset with i=0..10
let batches = merge_insert_test_batches(0, 0);
let table = NativeTable::create(uri, "test", batches, None, None)
let table = NativeTable::create(uri, "test", batches, None, None, None)
.await
.unwrap();
assert_eq!(table.count_rows(None).await.unwrap(), 10);
@@ -1055,7 +1064,7 @@ mod tests {
let batches = make_test_batches();
let schema = batches.schema().clone();
let table = NativeTable::create(uri, "test", batches, None, None)
let table = NativeTable::create(uri, "test", batches, None, None, None)
.await
.unwrap();
assert_eq!(table.count_rows(None).await.unwrap(), 10);
@@ -1410,7 +1419,7 @@ mod tests {
..Default::default()
};
assert!(!wrapper.called());
let _ = NativeTable::open_with_params(uri, "test", None, Some(param))
let _ = NativeTable::open_with_params(uri, "test", None, Some(param), None)
.await
.unwrap();
assert!(wrapper.called());
@@ -1484,7 +1493,7 @@ mod tests {
schema,
);
let table = NativeTable::create(uri, "test", batches, None, None)
let table = NativeTable::create(uri, "test", batches, None, None, None)
.await
.unwrap();
@@ -1529,4 +1538,68 @@ mod tests {
Ok(FixedSizeListArray::from(data))
}
#[tokio::test]
async fn test_read_consistency_interval() {
let batch = RecordBatch::try_new(
Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)])),
vec![Arc::new(Int32Array::from(vec![1]))],
)
.unwrap();
let intervals = vec![
None,
Some(0),
Some(100), // 100 ms
];
for interval in intervals {
let tmp_dir = tempdir().unwrap();
let uri = tmp_dir.path().to_str().unwrap();
let conn1 = ConnectBuilder::new(uri).execute().await.unwrap();
let table1 = conn1
.create_empty_table("my_table", batch.schema())
.execute()
.await
.unwrap();
let mut conn2 = ConnectBuilder::new(uri);
if let Some(interval) = interval {
conn2 = conn2.read_consistency_interval(std::time::Duration::from_millis(interval));
}
let conn2 = conn2.execute().await.unwrap();
let table2 = conn2.open_table("my_table").execute().await.unwrap();
assert_eq!(table1.count_rows(None).await.unwrap(), 0);
assert_eq!(table2.count_rows(None).await.unwrap(), 0);
table1
.add(
Box::new(RecordBatchIterator::new(
vec![Ok(batch.clone())],
batch.schema(),
)),
AddDataOptions::default(),
)
.await
.unwrap();
assert_eq!(table1.count_rows(None).await.unwrap(), 1);
match interval {
None => {
assert_eq!(table2.count_rows(None).await.unwrap(), 0);
}
Some(0) => {
assert_eq!(table2.count_rows(None).await.unwrap(), 1);
}
Some(100) => {
assert_eq!(table2.count_rows(None).await.unwrap(), 0);
tokio::time::sleep(Duration::from_millis(100)).await;
assert_eq!(table2.count_rows(None).await.unwrap(), 1);
}
_ => unreachable!(),
}
}
}
}

View File

@@ -0,0 +1,234 @@
// Copyright 2024 LanceDB Developers.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::{
ops::{Deref, DerefMut},
sync::Arc,
time::{self, Duration, Instant},
};
use lance::Dataset;
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use crate::error::Result;
/// A wrapper around a [Dataset] that provides consistency checks.
///
/// This can be cloned cheaply. It supports concurrent reads or exclusive writes.
#[derive(Debug, Clone)]
pub struct DatasetConsistencyWrapper(Arc<RwLock<DatasetRef>>);
/// A wrapper around a [Dataset] that provides consistency checks.
///
/// In `Latest` mode the dataset tracks the newest version, refreshed according
/// to the configured read consistency interval; in `TimeTravel` mode it is
/// pinned to a specific version.
#[derive(Debug, Clone)]
enum DatasetRef {
/// In this mode, the dataset is always the latest version.
Latest {
dataset: Dataset,
read_consistency_interval: Option<Duration>,
last_consistency_check: Option<time::Instant>,
},
/// In this mode, the dataset is a specific version. It cannot be mutated.
TimeTravel { dataset: Dataset, version: u64 },
}
impl DatasetRef {
/// Reload the dataset to the appropriate version.
async fn reload(&mut self) -> Result<()> {
match self {
Self::Latest {
dataset,
last_consistency_check,
..
} => {
*dataset = dataset
.checkout_version(dataset.latest_version_id().await?)
.await?;
last_consistency_check.replace(Instant::now());
}
Self::TimeTravel { dataset, version } => {
*dataset = dataset.checkout_version(*version).await?;
}
}
Ok(())
}
async fn as_latest(&mut self, read_consistency_interval: Option<Duration>) -> Result<()> {
match self {
Self::Latest { .. } => Ok(()),
Self::TimeTravel { dataset, .. } => {
let dataset = dataset
.checkout_version(dataset.latest_version_id().await?)
.await?;
*self = Self::Latest {
dataset,
read_consistency_interval,
last_consistency_check: Some(Instant::now()),
};
Ok(())
}
}
}
fn set_latest(&mut self, dataset: Dataset) {
match self {
Self::Latest {
dataset: ref mut ds,
..
} => {
*ds = dataset;
}
_ => unreachable!("Dataset should be in latest mode at this point"),
}
}
}
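The split between the two variants is the core of the design: `reload` moves a `Latest` handle forward to the head version but leaves a pinned `TimeTravel` handle where it is. A minimal standalone sketch of that state machine (hypothetical `Mode` type, with a plain version counter standing in for a `Dataset`):

```rust
// Hypothetical model of the two DatasetRef modes: `Latest` follows the head
// version on reload, `TimeTravel` stays pinned to the version it was opened at.
#[derive(Debug, PartialEq)]
enum Mode {
    Latest { version: u64 },
    TimeTravel { version: u64 },
}

impl Mode {
    // Mirror of `reload`: only a Latest handle advances to the head version.
    fn reload(&mut self, head: u64) {
        if let Mode::Latest { version } = self {
            *version = head;
        }
    }
}

fn main() {
    let mut latest = Mode::Latest { version: 1 };
    let mut pinned = Mode::TimeTravel { version: 1 };
    latest.reload(5);
    pinned.reload(5);
    assert_eq!(latest, Mode::Latest { version: 5 });
    assert_eq!(pinned, Mode::TimeTravel { version: 1 });
}
```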
impl DatasetConsistencyWrapper {
/// Create a new wrapper in the latest version mode.
pub fn new_latest(dataset: Dataset, read_consistency_interval: Option<Duration>) -> Self {
Self(Arc::new(RwLock::new(DatasetRef::Latest {
dataset,
read_consistency_interval,
last_consistency_check: None,
})))
}
/// Create a new wrapper in the time travel mode.
pub fn new_time_travel(dataset: Dataset, version: u64) -> Self {
Self(Arc::new(RwLock::new(DatasetRef::TimeTravel {
dataset,
version,
})))
}
/// Create an independent copy of self.
///
/// Unlike Clone, this will track versions independently of the original wrapper and
/// will be tied to a different RwLock.
pub async fn duplicate(&self) -> Self {
let ds_ref = self.0.read().await;
Self(Arc::new(RwLock::new((*ds_ref).clone())))
}
/// Get an immutable reference to the dataset.
pub async fn get(&self) -> Result<DatasetReadGuard<'_>> {
self.ensure_up_to_date().await?;
Ok(DatasetReadGuard {
guard: self.0.read().await,
})
}
/// Get a mutable reference to the dataset.
pub async fn get_mut(&self) -> Result<DatasetWriteGuard<'_>> {
self.ensure_up_to_date().await?;
Ok(DatasetWriteGuard {
guard: self.0.write().await,
})
}
/// Convert into a wrapper in latest version mode
pub async fn as_latest(&mut self, read_consistency_interval: Option<Duration>) -> Result<()> {
self.0
.write()
.await
.as_latest(read_consistency_interval)
.await
}
/// Provide a known latest version of the dataset.
///
/// This is usually done after some write operation, which inherently will
/// have the latest version.
pub async fn set_latest(&self, dataset: Dataset) {
self.0.write().await.set_latest(dataset);
}
async fn reload(&self) -> Result<()> {
self.0.write().await.reload().await
}
async fn is_up_to_date(&self) -> Result<bool> {
let dataset_ref = self.0.read().await;
match &*dataset_ref {
DatasetRef::Latest {
read_consistency_interval,
last_consistency_check,
..
} => match (read_consistency_interval, last_consistency_check) {
(None, _) => Ok(true),
(Some(_), None) => Ok(false),
(Some(read_consistency_interval), Some(last_consistency_check)) => {
Ok(&last_consistency_check.elapsed() < read_consistency_interval)
}
},
DatasetRef::TimeTravel { dataset, version } => {
Ok(dataset.version().version == *version)
}
}
}
/// Ensures that the dataset is loaded and up-to-date with consistency and
/// version parameters.
async fn ensure_up_to_date(&self) -> Result<()> {
if !self.is_up_to_date().await? {
self.reload().await?;
}
Ok(())
}
}
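The `is_up_to_date` match encodes a small decision table: no interval configured means reads never trigger a reload, an interval with no prior check forces one, and otherwise freshness depends on how long ago the last check ran. A self-contained sketch of just that predicate (hypothetical `is_fresh` helper, not the crate's API):

```rust
use std::time::{Duration, Instant};

// Hypothetical standalone version of the freshness predicate in `is_up_to_date`.
fn is_fresh(interval: Option<Duration>, last_check: Option<Instant>) -> bool {
    match (interval, last_check) {
        (None, _) => true,                     // no interval: reads never reload
        (Some(_), None) => false,              // never checked: must reload
        (Some(i), Some(t)) => t.elapsed() < i, // fresh only within the interval
    }
}

fn main() {
    assert!(is_fresh(None, None));
    assert!(!is_fresh(Some(Duration::from_millis(50)), None));
    let checked = Instant::now();
    assert!(is_fresh(Some(Duration::from_secs(3600)), Some(checked)));
    std::thread::sleep(Duration::from_millis(5));
    assert!(!is_fresh(Some(Duration::from_millis(1)), Some(checked)));
}
```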
pub struct DatasetReadGuard<'a> {
guard: RwLockReadGuard<'a, DatasetRef>,
}
impl Deref for DatasetReadGuard<'_> {
type Target = Dataset;
fn deref(&self) -> &Self::Target {
match &*self.guard {
DatasetRef::Latest { dataset, .. } => dataset,
DatasetRef::TimeTravel { dataset, .. } => dataset,
}
}
}
pub struct DatasetWriteGuard<'a> {
guard: RwLockWriteGuard<'a, DatasetRef>,
}
impl Deref for DatasetWriteGuard<'_> {
type Target = Dataset;
fn deref(&self) -> &Self::Target {
match &*self.guard {
DatasetRef::Latest { dataset, .. } => dataset,
DatasetRef::TimeTravel { dataset, .. } => dataset,
}
}
}
impl DerefMut for DatasetWriteGuard<'_> {
fn deref_mut(&mut self) -> &mut Self::Target {
match &mut *self.guard {
DatasetRef::Latest { dataset, .. } => dataset,
DatasetRef::TimeTravel { dataset, .. } => dataset,
}
}
}
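The guard types at the end of the file let callers borrow the inner `Dataset` through the lock without ever seeing the `DatasetRef` enum. The same pattern, reduced to std types (hypothetical `Inner`/`ReadGuard` names, a `String` standing in for the dataset):

```rust
use std::ops::Deref;
use std::sync::{RwLock, RwLockReadGuard};

// Hypothetical reduction of DatasetReadGuard: the lock guards an enum, but
// Deref hands callers the inner value directly, whichever variant is active.
enum Inner {
    Latest(String),
    TimeTravel(String),
}

struct ReadGuard<'a> {
    guard: RwLockReadGuard<'a, Inner>,
}

impl Deref for ReadGuard<'_> {
    type Target = String;
    fn deref(&self) -> &String {
        match &*self.guard {
            Inner::Latest(s) => s,
            Inner::TimeTravel(s) => s,
        }
    }
}

fn main() {
    let lock = RwLock::new(Inner::Latest("v5".to_string()));
    let guard = ReadGuard { guard: lock.read().unwrap() };
    assert_eq!(guard.as_str(), "v5"); // deref coercion reaches the String
    drop(guard);
    let lock2 = RwLock::new(Inner::TimeTravel("v2".to_string()));
    let guard2 = ReadGuard { guard: lock2.read().unwrap() };
    assert_eq!(guard2.as_str(), "v2");
}
```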