Compare commits

...

3 Commits

Author SHA1 Message Date
Lance Release
4605359d3b Bump version: 0.4.10 → 0.4.11 2024-02-23 03:57:28 +00:00
Weston Pace
f1596122e6 refactor: rename the rust crate from vectordb to lancedb (#1012)
This also renames the new experimental node package to lancedb. The
classic node package remains named vectordb.

The goal here is to avoid introducing piecemeal breaking changes to the
vectordb crate. Instead, once the new API is stabilized, we will
officially release the lancedb crate and deprecate the vectordb crate.
The same pattern will eventually happen with the npm package vectordb.
2024-02-22 19:56:39 -08:00
Will Jones
3aa0c40168 feat(node): add read_consistency_interval to Node and Rust (#1002)
This PR adds the same consistency semantics that were added in #828. It
*does not* add the same lazy-loading of tables, since that breaks some
existing tests.

This closes #998.

---------

Co-authored-by: Weston Pace <weston.pace@gmail.com>
2024-02-22 15:04:30 -08:00
64 changed files with 880 additions and 1269 deletions

View File

@@ -1,5 +1,5 @@
[bumpversion] [bumpversion]
current_version = 0.4.10 current_version = 0.4.11
commit = True commit = True
message = Bump version: {current_version} → {new_version} message = Bump version: {current_version} → {new_version}
tag = True tag = True
@@ -9,4 +9,4 @@ tag_name = v{new_version}
[bumpversion:file:rust/ffi/node/Cargo.toml] [bumpversion:file:rust/ffi/node/Cargo.toml]
[bumpversion:file:rust/vectordb/Cargo.toml] [bumpversion:file:rust/lancedb/Cargo.toml]

View File

@@ -26,4 +26,4 @@ jobs:
sudo apt install -y protobuf-compiler libssl-dev sudo apt install -y protobuf-compiler libssl-dev
- name: Publish the package - name: Publish the package
run: | run: |
cargo publish -p vectordb --all-features --token ${{ secrets.CARGO_REGISTRY_TOKEN }} cargo publish -p lancedb --all-features --token ${{ secrets.CARGO_REGISTRY_TOKEN }}

View File

@@ -1,5 +1,5 @@
[workspace] [workspace]
members = ["rust/ffi/node", "rust/vectordb", "nodejs"] members = ["rust/ffi/node", "rust/lancedb", "nodejs"]
# Python package needs to be built by maturin. # Python package needs to be built by maturin.
exclude = ["python"] exclude = ["python"]
resolver = "2" resolver = "2"

View File

@@ -636,6 +636,70 @@ The `values` parameter is used to provide the new values for the columns as lite
When rows are updated, they are moved out of the index. The row will still show up in ANN queries, but the query will not be as fast as it would be if the row was in the index. If you update a large proportion of rows, consider rebuilding the index afterwards. When rows are updated, they are moved out of the index. The row will still show up in ANN queries, but the query will not be as fast as it would be if the row was in the index. If you update a large proportion of rows, consider rebuilding the index afterwards.
## Consistency
In LanceDB OSS, users can set the `read_consistency_interval` parameter on connections to achieve different levels of read consistency. This parameter determines how frequently the database synchronizes with the underlying storage system to check for updates made by other processes. If another process updates a table, the database will not see the changes until the next synchronization.
There are three possible settings for `read_consistency_interval`:
1. **Unset (default)**: The database does not check for updates to tables made by other processes. This provides the best query performance, but means that clients may not see the most up-to-date data. This setting is suitable for applications where the data does not change during the lifetime of the table reference.
2. **Zero seconds (Strong consistency)**: The database checks for updates on every read. This provides the strongest consistency guarantees, ensuring that all clients see the latest committed data. However, it has the most overhead. This setting is suitable when consistency matters more than having high QPS.
3. **Custom interval (Eventual consistency)**: The database checks for updates at a custom interval, such as every 5 seconds. This provides eventual consistency, allowing for some lag between write and read operations. Performance-wise, this is a middle ground between strong consistency and no consistency check. This setting is suitable for applications where immediate consistency is not critical, but clients should see updated data eventually.
!!! tip "Consistency in LanceDB Cloud"
This is only tunable in LanceDB OSS. In LanceDB Cloud, readers are always eventually consistent.
=== "Python"
To set strong consistency, use `timedelta(0)`:
```python
from datetime import timedelta
db = lancedb.connect("./.lancedb",. read_consistency_interval=timedelta(0))
table = db.open_table("my_table")
```
For eventual consistency, use a custom `timedelta`:
```python
from datetime import timedelta
db = lancedb.connect("./.lancedb", read_consistency_interval=timedelta(seconds=5))
table = db.open_table("my_table")
```
By default, a `Table` will never check for updates from other writers. To manually check for updates you can use `checkout_latest`:
```python
db = lancedb.connect("./.lancedb")
table = db.open_table("my_table")
# (Other writes happen to my_table from another process)
# Check for updates
table.checkout_latest()
```
=== "JavaScript/Typescript"
To set strong consistency, use `0`:
```javascript
const db = await lancedb.connect({ uri: "./.lancedb", readConsistencyInterval: 0 });
const table = await db.openTable("my_table");
```
For eventual consistency, specify the update interval in seconds:
```javascript
const db = await lancedb.connect({ uri: "./.lancedb", readConsistencyInterval: 5 });
const table = await db.openTable("my_table");
```
<!-- Node doesn't yet support the version time travel: https://github.com/lancedb/lancedb/issues/1007
Once it does, we can show manual consistency check for Node as well.
-->
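=== "Rust"
This change also adds the option to the Rust crate. Below is a minimal sketch using the `ConnectBuilder::read_consistency_interval` method introduced in this PR; the `lancedb` crate is not yet officially released, so treat the exact API as subject to change:
```rust
use std::time::Duration;

#[tokio::main]
async fn main() -> lancedb::Result<()> {
    // Strong consistency: check for updates from other processes on every read.
    // For eventual consistency, use a non-zero duration such as Duration::from_secs(5).
    let db = lancedb::connect("./.lancedb")
        .read_consistency_interval(Duration::from_secs(0))
        .execute()
        .await?;
    let _table = db.open_table("my_table").execute().await?;
    Ok(())
}
```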
## What's next? ## What's next?
Learn the best practices on creating an ANN index and getting the most out of it. Learn the best practices on creating an ANN index and getting the most out of it.

View File

@@ -1,12 +1,12 @@
{ {
"name": "vectordb", "name": "vectordb",
"version": "0.4.10", "version": "0.4.11",
"description": " Serverless, low-latency vector database for AI applications", "description": " Serverless, low-latency vector database for AI applications",
"main": "dist/index.js", "main": "dist/index.js",
"types": "dist/index.d.ts", "types": "dist/index.d.ts",
"scripts": { "scripts": {
"tsc": "tsc -b", "tsc": "tsc -b",
"build": "npm run tsc && cargo-cp-artifact --artifact cdylib vectordb-node index.node -- cargo build --message-format=json", "build": "npm run tsc && cargo-cp-artifact --artifact cdylib lancedb-node index.node -- cargo build --message-format=json",
"build-release": "npm run build -- --release", "build-release": "npm run build -- --release",
"test": "npm run tsc && mocha -recursive dist/test", "test": "npm run tsc && mocha -recursive dist/test",
"integration-test": "npm run tsc && mocha -recursive dist/integration_test", "integration-test": "npm run tsc && mocha -recursive dist/integration_test",
@@ -85,10 +85,10 @@
} }
}, },
"optionalDependencies": { "optionalDependencies": {
"@lancedb/vectordb-darwin-arm64": "0.4.10", "@lancedb/vectordb-darwin-arm64": "0.4.11",
"@lancedb/vectordb-darwin-x64": "0.4.10", "@lancedb/vectordb-darwin-x64": "0.4.11",
"@lancedb/vectordb-linux-arm64-gnu": "0.4.10", "@lancedb/vectordb-linux-arm64-gnu": "0.4.11",
"@lancedb/vectordb-linux-x64-gnu": "0.4.10", "@lancedb/vectordb-linux-x64-gnu": "0.4.11",
"@lancedb/vectordb-win32-x64-msvc": "0.4.10" "@lancedb/vectordb-win32-x64-msvc": "0.4.11"
} }
} }

View File

@@ -96,6 +96,19 @@ export interface ConnectionOptions {
* This is useful for local testing. * This is useful for local testing.
*/ */
hostOverride?: string hostOverride?: string
/**
* (For LanceDB OSS only): The interval, in seconds, at which to check for
* updates to the table from other processes. If None, then consistency is not
* checked. For performance reasons, this is the default. For strong
* consistency, set this to zero seconds. Then every read will check for
* updates from other processes. As a compromise, you can set this to a
* non-zero value for eventual consistency. If more than that interval
* has passed since the last check, then the table will be checked for updates.
* Note: this consistency only applies to read operations. Write operations are
* always consistent.
*/
readConsistencyInterval?: number
} }
function getAwsArgs (opts: ConnectionOptions): any[] { function getAwsArgs (opts: ConnectionOptions): any[] {
@@ -181,7 +194,8 @@ export async function connect (
opts.awsCredentials?.accessKeyId, opts.awsCredentials?.accessKeyId,
opts.awsCredentials?.secretKey, opts.awsCredentials?.secretKey,
opts.awsCredentials?.sessionToken, opts.awsCredentials?.sessionToken,
opts.awsRegion opts.awsRegion,
opts.readConsistencyInterval
) )
return new LocalConnection(db, opts) return new LocalConnection(db, opts)
} }

View File

@@ -18,5 +18,5 @@ module.exports = {
"@typescript-eslint/method-signature-style": "off", "@typescript-eslint/method-signature-style": "off",
"@typescript-eslint/no-explicit-any": "off", "@typescript-eslint/no-explicit-any": "off",
}, },
ignorePatterns: ["node_modules/", "dist/", "build/", "vectordb/native.*"], ignorePatterns: ["node_modules/", "dist/", "build/", "lancedb/native.*"],
}; };

View File

@@ -1,5 +1,5 @@
[package] [package]
name = "vectordb-nodejs" name = "lancedb-nodejs"
edition.workspace = true edition.workspace = true
version = "0.0.0" version = "0.0.0"
license.workspace = true license.workspace = true
@@ -16,7 +16,7 @@ arrow-ipc.workspace = true
futures.workspace = true futures.workspace = true
lance-linalg.workspace = true lance-linalg.workspace = true
lance.workspace = true lance.workspace = true
vectordb = { path = "../rust/vectordb" } lancedb = { path = "../rust/lancedb" }
napi = { version = "2.15", default-features = false, features = [ napi = { version = "2.15", default-features = false, features = [
"napi7", "napi7",
"async" "async"

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
import { makeArrowTable, toBuffer } from "../vectordb/arrow"; import { makeArrowTable, toBuffer } from "../lancedb/arrow";
import { import {
Int64, Int64,
Field, Field,

View File

@@ -29,6 +29,6 @@ test("open database", async () => {
const tbl = await db.createTable("test", [{ id: 1 }, { id: 2 }]); const tbl = await db.createTable("test", [{ id: 1 }, { id: 2 }]);
expect(await db.tableNames()).toStrictEqual(["test"]); expect(await db.tableNames()).toStrictEqual(["test"]);
const schema = tbl.schema; const schema = await tbl.schema();
expect(schema).toEqual(new Schema([new Field("id", new Float64(), true)])); expect(schema).toEqual(new Schema([new Field("id", new Float64(), true)]));
}); });

View File

@@ -181,3 +181,37 @@ describe("Test creating index", () => {
// TODO: check index type. // TODO: check index type.
}); });
}); });
describe("Read consistency interval", () => {
let tmpDir: string;
beforeEach(() => {
tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "read-consistency-"));
});
// const intervals = [undefined, 0, 0.1];
const intervals = [0];
test.each(intervals)("read consistency interval %p", async (interval) => {
const db = await connect({ uri: tmpDir });
const table = await db.createTable("my_table", [{ id: 1 }]);
const db2 = await connect({ uri: tmpDir, readConsistencyInterval: interval });
const table2 = await db2.openTable("my_table");
expect(await table2.countRows()).toEqual(await table.countRows());
await table.add([{ id: 2 }]);
if (interval === undefined) {
expect(await table2.countRows()).toEqual(1n);
// TODO: once we implement time travel we can uncomment this part of the test.
// await table2.checkout_latest();
// expect(await table2.countRows()).toEqual(2);
} else if (interval === 0) {
expect(await table2.countRows()).toEqual(2n);
} else {
// interval == 0.1
expect(await table2.countRows()).toEqual(1n);
await new Promise(r => setTimeout(r, 100));
expect(await table2.countRows()).toEqual(2n);
}
});
});

View File

@@ -53,12 +53,12 @@ export async function connect(
opts = Object.assign( opts = Object.assign(
{ {
uri: "", uri: "",
apiKey: "", apiKey: undefined,
hostOverride: "", hostOverride: undefined,
}, },
args args
); );
} }
const nativeConn = await NativeConnection.new(opts.uri); const nativeConn = await NativeConnection.new(opts);
return new Connection(nativeConn); return new Connection(nativeConn);
} }

View File

@@ -16,6 +16,18 @@ export interface ConnectionOptions {
uri: string uri: string
apiKey?: string apiKey?: string
hostOverride?: string hostOverride?: string
/**
* (For LanceDB OSS only): The interval, in seconds, at which to check for
* updates to the table from other processes. If None, then consistency is not
* checked. For performance reasons, this is the default. For strong
* consistency, set this to zero seconds. Then every read will check for
* updates from other processes. As a compromise, you can set this to a
* non-zero value for eventual consistency. If more than that interval
* has passed since the last check, then the table will be checked for updates.
* Note: this consistency only applies to read operations. Write operations are
* always consistent.
*/
readConsistencyInterval?: number
} }
/** Write mode for writing a table. */ /** Write mode for writing a table. */
export const enum WriteMode { export const enum WriteMode {
@@ -30,7 +42,7 @@ export interface WriteOptions {
export function connect(options: ConnectionOptions): Promise<Connection> export function connect(options: ConnectionOptions): Promise<Connection>
export class Connection { export class Connection {
/** Create a new Connection instance from the given URI. */ /** Create a new Connection instance from the given URI. */
static new(uri: string): Promise<Connection> static new(options: ConnectionOptions): Promise<Connection>
/** List all tables in the dataset. */ /** List all tables in the dataset. */
tableNames(): Promise<Array<string>> tableNames(): Promise<Array<string>>
/** /**
@@ -71,7 +83,7 @@ export class Query {
} }
export class Table { export class Table {
/** Return Schema as empty Arrow IPC file. */ /** Return Schema as empty Arrow IPC file. */
schema(): Buffer schema(): Promise<Buffer>
add(buf: Buffer): Promise<void> add(buf: Buffer): Promise<void>
countRows(filter?: string | undefined | null): Promise<bigint> countRows(filter?: string | undefined | null): Promise<bigint>
delete(predicate: string): Promise<void> delete(predicate: string): Promise<void>

View File

@@ -32,24 +32,24 @@ switch (platform) {
case 'android': case 'android':
switch (arch) { switch (arch) {
case 'arm64': case 'arm64':
localFileExisted = existsSync(join(__dirname, 'vectordb-nodejs.android-arm64.node')) localFileExisted = existsSync(join(__dirname, 'lancedb-nodejs.android-arm64.node'))
try { try {
if (localFileExisted) { if (localFileExisted) {
nativeBinding = require('./vectordb-nodejs.android-arm64.node') nativeBinding = require('./lancedb-nodejs.android-arm64.node')
} else { } else {
nativeBinding = require('vectordb-android-arm64') nativeBinding = require('lancedb-android-arm64')
} }
} catch (e) { } catch (e) {
loadError = e loadError = e
} }
break break
case 'arm': case 'arm':
localFileExisted = existsSync(join(__dirname, 'vectordb-nodejs.android-arm-eabi.node')) localFileExisted = existsSync(join(__dirname, 'lancedb-nodejs.android-arm-eabi.node'))
try { try {
if (localFileExisted) { if (localFileExisted) {
nativeBinding = require('./vectordb-nodejs.android-arm-eabi.node') nativeBinding = require('./lancedb-nodejs.android-arm-eabi.node')
} else { } else {
nativeBinding = require('vectordb-android-arm-eabi') nativeBinding = require('lancedb-android-arm-eabi')
} }
} catch (e) { } catch (e) {
loadError = e loadError = e
@@ -63,13 +63,13 @@ switch (platform) {
switch (arch) { switch (arch) {
case 'x64': case 'x64':
localFileExisted = existsSync( localFileExisted = existsSync(
join(__dirname, 'vectordb-nodejs.win32-x64-msvc.node') join(__dirname, 'lancedb-nodejs.win32-x64-msvc.node')
) )
try { try {
if (localFileExisted) { if (localFileExisted) {
nativeBinding = require('./vectordb-nodejs.win32-x64-msvc.node') nativeBinding = require('./lancedb-nodejs.win32-x64-msvc.node')
} else { } else {
nativeBinding = require('vectordb-win32-x64-msvc') nativeBinding = require('lancedb-win32-x64-msvc')
} }
} catch (e) { } catch (e) {
loadError = e loadError = e
@@ -77,13 +77,13 @@ switch (platform) {
break break
case 'ia32': case 'ia32':
localFileExisted = existsSync( localFileExisted = existsSync(
join(__dirname, 'vectordb-nodejs.win32-ia32-msvc.node') join(__dirname, 'lancedb-nodejs.win32-ia32-msvc.node')
) )
try { try {
if (localFileExisted) { if (localFileExisted) {
nativeBinding = require('./vectordb-nodejs.win32-ia32-msvc.node') nativeBinding = require('./lancedb-nodejs.win32-ia32-msvc.node')
} else { } else {
nativeBinding = require('vectordb-win32-ia32-msvc') nativeBinding = require('lancedb-win32-ia32-msvc')
} }
} catch (e) { } catch (e) {
loadError = e loadError = e
@@ -91,13 +91,13 @@ switch (platform) {
break break
case 'arm64': case 'arm64':
localFileExisted = existsSync( localFileExisted = existsSync(
join(__dirname, 'vectordb-nodejs.win32-arm64-msvc.node') join(__dirname, 'lancedb-nodejs.win32-arm64-msvc.node')
) )
try { try {
if (localFileExisted) { if (localFileExisted) {
nativeBinding = require('./vectordb-nodejs.win32-arm64-msvc.node') nativeBinding = require('./lancedb-nodejs.win32-arm64-msvc.node')
} else { } else {
nativeBinding = require('vectordb-win32-arm64-msvc') nativeBinding = require('lancedb-win32-arm64-msvc')
} }
} catch (e) { } catch (e) {
loadError = e loadError = e
@@ -108,23 +108,23 @@ switch (platform) {
} }
break break
case 'darwin': case 'darwin':
localFileExisted = existsSync(join(__dirname, 'vectordb-nodejs.darwin-universal.node')) localFileExisted = existsSync(join(__dirname, 'lancedb-nodejs.darwin-universal.node'))
try { try {
if (localFileExisted) { if (localFileExisted) {
nativeBinding = require('./vectordb-nodejs.darwin-universal.node') nativeBinding = require('./lancedb-nodejs.darwin-universal.node')
} else { } else {
nativeBinding = require('vectordb-darwin-universal') nativeBinding = require('lancedb-darwin-universal')
} }
break break
} catch {} } catch {}
switch (arch) { switch (arch) {
case 'x64': case 'x64':
localFileExisted = existsSync(join(__dirname, 'vectordb-nodejs.darwin-x64.node')) localFileExisted = existsSync(join(__dirname, 'lancedb-nodejs.darwin-x64.node'))
try { try {
if (localFileExisted) { if (localFileExisted) {
nativeBinding = require('./vectordb-nodejs.darwin-x64.node') nativeBinding = require('./lancedb-nodejs.darwin-x64.node')
} else { } else {
nativeBinding = require('vectordb-darwin-x64') nativeBinding = require('lancedb-darwin-x64')
} }
} catch (e) { } catch (e) {
loadError = e loadError = e
@@ -132,13 +132,13 @@ switch (platform) {
break break
case 'arm64': case 'arm64':
localFileExisted = existsSync( localFileExisted = existsSync(
join(__dirname, 'vectordb-nodejs.darwin-arm64.node') join(__dirname, 'lancedb-nodejs.darwin-arm64.node')
) )
try { try {
if (localFileExisted) { if (localFileExisted) {
nativeBinding = require('./vectordb-nodejs.darwin-arm64.node') nativeBinding = require('./lancedb-nodejs.darwin-arm64.node')
} else { } else {
nativeBinding = require('vectordb-darwin-arm64') nativeBinding = require('lancedb-darwin-arm64')
} }
} catch (e) { } catch (e) {
loadError = e loadError = e
@@ -152,12 +152,12 @@ switch (platform) {
if (arch !== 'x64') { if (arch !== 'x64') {
throw new Error(`Unsupported architecture on FreeBSD: ${arch}`) throw new Error(`Unsupported architecture on FreeBSD: ${arch}`)
} }
localFileExisted = existsSync(join(__dirname, 'vectordb-nodejs.freebsd-x64.node')) localFileExisted = existsSync(join(__dirname, 'lancedb-nodejs.freebsd-x64.node'))
try { try {
if (localFileExisted) { if (localFileExisted) {
nativeBinding = require('./vectordb-nodejs.freebsd-x64.node') nativeBinding = require('./lancedb-nodejs.freebsd-x64.node')
} else { } else {
nativeBinding = require('vectordb-freebsd-x64') nativeBinding = require('lancedb-freebsd-x64')
} }
} catch (e) { } catch (e) {
loadError = e loadError = e
@@ -168,26 +168,26 @@ switch (platform) {
case 'x64': case 'x64':
if (isMusl()) { if (isMusl()) {
localFileExisted = existsSync( localFileExisted = existsSync(
join(__dirname, 'vectordb-nodejs.linux-x64-musl.node') join(__dirname, 'lancedb-nodejs.linux-x64-musl.node')
) )
try { try {
if (localFileExisted) { if (localFileExisted) {
nativeBinding = require('./vectordb-nodejs.linux-x64-musl.node') nativeBinding = require('./lancedb-nodejs.linux-x64-musl.node')
} else { } else {
nativeBinding = require('vectordb-linux-x64-musl') nativeBinding = require('lancedb-linux-x64-musl')
} }
} catch (e) { } catch (e) {
loadError = e loadError = e
} }
} else { } else {
localFileExisted = existsSync( localFileExisted = existsSync(
join(__dirname, 'vectordb-nodejs.linux-x64-gnu.node') join(__dirname, 'lancedb-nodejs.linux-x64-gnu.node')
) )
try { try {
if (localFileExisted) { if (localFileExisted) {
nativeBinding = require('./vectordb-nodejs.linux-x64-gnu.node') nativeBinding = require('./lancedb-nodejs.linux-x64-gnu.node')
} else { } else {
nativeBinding = require('vectordb-linux-x64-gnu') nativeBinding = require('lancedb-linux-x64-gnu')
} }
} catch (e) { } catch (e) {
loadError = e loadError = e
@@ -197,26 +197,26 @@ switch (platform) {
case 'arm64': case 'arm64':
if (isMusl()) { if (isMusl()) {
localFileExisted = existsSync( localFileExisted = existsSync(
join(__dirname, 'vectordb-nodejs.linux-arm64-musl.node') join(__dirname, 'lancedb-nodejs.linux-arm64-musl.node')
) )
try { try {
if (localFileExisted) { if (localFileExisted) {
nativeBinding = require('./vectordb-nodejs.linux-arm64-musl.node') nativeBinding = require('./lancedb-nodejs.linux-arm64-musl.node')
} else { } else {
nativeBinding = require('vectordb-linux-arm64-musl') nativeBinding = require('lancedb-linux-arm64-musl')
} }
} catch (e) { } catch (e) {
loadError = e loadError = e
} }
} else { } else {
localFileExisted = existsSync( localFileExisted = existsSync(
join(__dirname, 'vectordb-nodejs.linux-arm64-gnu.node') join(__dirname, 'lancedb-nodejs.linux-arm64-gnu.node')
) )
try { try {
if (localFileExisted) { if (localFileExisted) {
nativeBinding = require('./vectordb-nodejs.linux-arm64-gnu.node') nativeBinding = require('./lancedb-nodejs.linux-arm64-gnu.node')
} else { } else {
nativeBinding = require('vectordb-linux-arm64-gnu') nativeBinding = require('lancedb-linux-arm64-gnu')
} }
} catch (e) { } catch (e) {
loadError = e loadError = e
@@ -225,13 +225,13 @@ switch (platform) {
break break
case 'arm': case 'arm':
localFileExisted = existsSync( localFileExisted = existsSync(
join(__dirname, 'vectordb-nodejs.linux-arm-gnueabihf.node') join(__dirname, 'lancedb-nodejs.linux-arm-gnueabihf.node')
) )
try { try {
if (localFileExisted) { if (localFileExisted) {
nativeBinding = require('./vectordb-nodejs.linux-arm-gnueabihf.node') nativeBinding = require('./lancedb-nodejs.linux-arm-gnueabihf.node')
} else { } else {
nativeBinding = require('vectordb-linux-arm-gnueabihf') nativeBinding = require('lancedb-linux-arm-gnueabihf')
} }
} catch (e) { } catch (e) {
loadError = e loadError = e
@@ -240,26 +240,26 @@ switch (platform) {
case 'riscv64': case 'riscv64':
if (isMusl()) { if (isMusl()) {
localFileExisted = existsSync( localFileExisted = existsSync(
join(__dirname, 'vectordb-nodejs.linux-riscv64-musl.node') join(__dirname, 'lancedb-nodejs.linux-riscv64-musl.node')
) )
try { try {
if (localFileExisted) { if (localFileExisted) {
nativeBinding = require('./vectordb-nodejs.linux-riscv64-musl.node') nativeBinding = require('./lancedb-nodejs.linux-riscv64-musl.node')
} else { } else {
nativeBinding = require('vectordb-linux-riscv64-musl') nativeBinding = require('lancedb-linux-riscv64-musl')
} }
} catch (e) { } catch (e) {
loadError = e loadError = e
} }
} else { } else {
localFileExisted = existsSync( localFileExisted = existsSync(
join(__dirname, 'vectordb-nodejs.linux-riscv64-gnu.node') join(__dirname, 'lancedb-nodejs.linux-riscv64-gnu.node')
) )
try { try {
if (localFileExisted) { if (localFileExisted) {
nativeBinding = require('./vectordb-nodejs.linux-riscv64-gnu.node') nativeBinding = require('./lancedb-nodejs.linux-riscv64-gnu.node')
} else { } else {
nativeBinding = require('vectordb-linux-riscv64-gnu') nativeBinding = require('lancedb-linux-riscv64-gnu')
} }
} catch (e) { } catch (e) {
loadError = e loadError = e
@@ -268,13 +268,13 @@ switch (platform) {
break break
case 's390x': case 's390x':
localFileExisted = existsSync( localFileExisted = existsSync(
join(__dirname, 'vectordb-nodejs.linux-s390x-gnu.node') join(__dirname, 'lancedb-nodejs.linux-s390x-gnu.node')
) )
try { try {
if (localFileExisted) { if (localFileExisted) {
nativeBinding = require('./vectordb-nodejs.linux-s390x-gnu.node') nativeBinding = require('./lancedb-nodejs.linux-s390x-gnu.node')
} else { } else {
nativeBinding = require('vectordb-linux-s390x-gnu') nativeBinding = require('lancedb-linux-s390x-gnu')
} }
} catch (e) { } catch (e) {
loadError = e loadError = e

View File

@@ -32,8 +32,8 @@ export class Table {
} }
/** Get the schema of the table. */ /** Get the schema of the table. */
get schema(): Schema { async schema(): Promise<Schema> {
const schemaBuf = this.inner.schema(); const schemaBuf = await this.inner.schema();
const tbl = tableFromIPC(schemaBuf); const tbl = tableFromIPC(schemaBuf);
return tbl.schema; return tbl.schema;
} }

View File

@@ -1,3 +1,3 @@
# `vectordb-darwin-arm64` # `lancedb-darwin-arm64`
This is the **aarch64-apple-darwin** binary for `vectordb` This is the **aarch64-apple-darwin** binary for `lancedb`

View File

@@ -1,5 +1,5 @@
{ {
"name": "vectordb-darwin-arm64", "name": "lancedb-darwin-arm64",
"version": "0.4.3", "version": "0.4.3",
"os": [ "os": [
"darwin" "darwin"
@@ -7,9 +7,9 @@
"cpu": [ "cpu": [
"arm64" "arm64"
], ],
"main": "vectordb.darwin-arm64.node", "main": "lancedb.darwin-arm64.node",
"files": [ "files": [
"vectordb.darwin-arm64.node" "lancedb.darwin-arm64.node"
], ],
"license": "MIT", "license": "MIT",
"engines": { "engines": {

View File

@@ -1,3 +1,3 @@
# `vectordb-darwin-x64` # `lancedb-darwin-x64`
This is the **x86_64-apple-darwin** binary for `vectordb` This is the **x86_64-apple-darwin** binary for `lancedb`

View File

@@ -1,5 +1,5 @@
{ {
"name": "vectordb-darwin-x64", "name": "lancedb-darwin-x64",
"version": "0.4.3", "version": "0.4.3",
"os": [ "os": [
"darwin" "darwin"
@@ -7,9 +7,9 @@
"cpu": [ "cpu": [
"x64" "x64"
], ],
"main": "vectordb.darwin-x64.node", "main": "lancedb.darwin-x64.node",
"files": [ "files": [
"vectordb.darwin-x64.node" "lancedb.darwin-x64.node"
], ],
"license": "MIT", "license": "MIT",
"engines": { "engines": {

View File

@@ -1,3 +1,3 @@
# `vectordb-linux-arm64-gnu` # `lancedb-linux-arm64-gnu`
This is the **aarch64-unknown-linux-gnu** binary for `vectordb` This is the **aarch64-unknown-linux-gnu** binary for `lancedb`

View File

@@ -1,5 +1,5 @@
{ {
"name": "vectordb-linux-arm64-gnu", "name": "lancedb-linux-arm64-gnu",
"version": "0.4.3", "version": "0.4.3",
"os": [ "os": [
"linux" "linux"
@@ -7,9 +7,9 @@
"cpu": [ "cpu": [
"arm64" "arm64"
], ],
"main": "vectordb.linux-arm64-gnu.node", "main": "lancedb.linux-arm64-gnu.node",
"files": [ "files": [
"vectordb.linux-arm64-gnu.node" "lancedb.linux-arm64-gnu.node"
], ],
"license": "MIT", "license": "MIT",
"engines": { "engines": {

View File

@@ -1,3 +1,3 @@
# `vectordb-linux-x64-gnu` # `lancedb-linux-x64-gnu`
This is the **x86_64-unknown-linux-gnu** binary for `vectordb` This is the **x86_64-unknown-linux-gnu** binary for `lancedb`

View File

@@ -1,5 +1,5 @@
{ {
"name": "vectordb-linux-x64-gnu", "name": "lancedb-linux-x64-gnu",
"version": "0.4.3", "version": "0.4.3",
"os": [ "os": [
"linux" "linux"
@@ -7,9 +7,9 @@
"cpu": [ "cpu": [
"x64" "x64"
], ],
"main": "vectordb.linux-x64-gnu.node", "main": "lancedb.linux-x64-gnu.node",
"files": [ "files": [
"vectordb.linux-x64-gnu.node" "lancedb.linux-x64-gnu.node"
], ],
"license": "MIT", "license": "MIT",
"engines": { "engines": {

nodejs/package-lock.json (generated, 1087 changed lines)

File diff suppressed because it is too large

View File

@@ -1,10 +1,10 @@
{ {
"name": "vectordb", "name": "lancedb",
"version": "0.4.3", "version": "0.4.3",
"main": "./dist/index.js", "main": "./dist/index.js",
"types": "./dist/index.d.ts", "types": "./dist/index.d.ts",
"napi": { "napi": {
"name": "vectordb-nodejs", "name": "lancedb-nodejs",
"triples": { "triples": {
"defaults": false, "defaults": false,
"additional": [ "additional": [
@@ -18,7 +18,7 @@
"license": "Apache 2.0", "license": "Apache 2.0",
"devDependencies": { "devDependencies": {
"@napi-rs/cli": "^2.18.0", "@napi-rs/cli": "^2.18.0",
"@types/jest": "^29.5.11", "@types/jest": "^29.1.2",
"@typescript-eslint/eslint-plugin": "^6.19.0", "@typescript-eslint/eslint-plugin": "^6.19.0",
"@typescript-eslint/parser": "^6.19.0", "@typescript-eslint/parser": "^6.19.0",
"eslint": "^8.56.0", "eslint": "^8.56.0",
@@ -45,21 +45,22 @@
], ],
"scripts": { "scripts": {
"artifacts": "napi artifacts", "artifacts": "napi artifacts",
"build:native": "napi build --platform --release --js vectordb/native.js --dts vectordb/native.d.ts dist/", "build:native": "napi build --platform --release --js lancedb/native.js --dts lancedb/native.d.ts dist/",
"build:debug": "napi build --platform --dts ../vectordb/native.d.ts --js ../vectordb/native.js dist/", "build:debug": "napi build --platform --dts ../lancedb/native.d.ts --js ../lancedb/native.js dist/",
"build": "npm run build:debug && tsc -b", "build": "npm run build:debug && tsc -b",
"docs": "typedoc --plugin typedoc-plugin-markdown vectordb/index.ts", "docs": "typedoc --plugin typedoc-plugin-markdown lancedb/index.ts",
"lint": "eslint vectordb --ext .js,.ts", "lint": "eslint lancedb --ext .js,.ts",
"prepublishOnly": "napi prepublish -t npm", "prepublishOnly": "napi prepublish -t npm",
"test": "npm run build && jest", "//": "maxWorkers=1 is workaround for bigint issue in jest: https://github.com/jestjs/jest/issues/11617#issuecomment-1068732414",
"test": "npm run build && jest --maxWorkers=1",
"universal": "napi universal", "universal": "napi universal",
"version": "napi version" "version": "napi version"
}, },
"optionalDependencies": { "optionalDependencies": {
"vectordb-darwin-arm64": "0.4.3", "lancedb-darwin-arm64": "0.4.3",
"vectordb-darwin-x64": "0.4.3", "lancedb-darwin-x64": "0.4.3",
"vectordb-linux-arm64-gnu": "0.4.3", "lancedb-linux-arm64-gnu": "0.4.3",
"vectordb-linux-x64-gnu": "0.4.3" "lancedb-linux-x64-gnu": "0.4.3"
}, },
"dependencies": { "dependencies": {
"apache-arrow": "^15.0.0" "apache-arrow": "^15.0.0"

View File

@@ -16,8 +16,9 @@ use napi::bindgen_prelude::*;
use napi_derive::*; use napi_derive::*;
use crate::table::Table; use crate::table::Table;
use vectordb::connection::Connection as LanceDBConnection; use crate::ConnectionOptions;
use vectordb::ipc::ipc_file_to_batches; use lancedb::connection::{ConnectBuilder, Connection as LanceDBConnection};
use lancedb::ipc::ipc_file_to_batches;
#[napi] #[napi]
pub struct Connection { pub struct Connection {
@@ -28,11 +29,23 @@ pub struct Connection {
impl Connection { impl Connection {
/// Create a new Connection instance from the given URI. /// Create a new Connection instance from the given URI.
#[napi(factory)] #[napi(factory)]
pub async fn new(uri: String) -> napi::Result<Self> { pub async fn new(options: ConnectionOptions) -> napi::Result<Self> {
let mut builder = ConnectBuilder::new(&options.uri);
if let Some(api_key) = options.api_key {
builder = builder.api_key(&api_key);
}
if let Some(host_override) = options.host_override {
builder = builder.host_override(&host_override);
}
if let Some(interval) = options.read_consistency_interval {
builder =
builder.read_consistency_interval(std::time::Duration::from_secs_f64(interval));
}
Ok(Self { Ok(Self {
conn: vectordb::connect(&uri).execute().await.map_err(|e| { conn: builder
napi::Error::from_reason(format!("Failed to connect to database: {}", e)) .execute()
})?, .await
.map_err(|e| napi::Error::from_reason(format!("{}", e)))?,
}) })
} }

View File

@@ -40,12 +40,12 @@ impl From<MetricType> for LanceMetricType {
#[napi] #[napi]
pub struct IndexBuilder { pub struct IndexBuilder {
inner: vectordb::index::IndexBuilder, inner: lancedb::index::IndexBuilder,
} }
#[napi] #[napi]
impl IndexBuilder { impl IndexBuilder {
pub fn new(tbl: &dyn vectordb::Table) -> Self { pub fn new(tbl: &dyn lancedb::Table) -> Self {
let inner = tbl.create_index(&[]); let inner = tbl.create_index(&[]);
Self { inner } Self { inner }
} }

View File

@@ -14,9 +14,9 @@
use futures::StreamExt; use futures::StreamExt;
use lance::io::RecordBatchStream; use lance::io::RecordBatchStream;
use lancedb::ipc::batches_to_ipc_file;
use napi::bindgen_prelude::*; use napi::bindgen_prelude::*;
use napi_derive::napi; use napi_derive::napi;
use vectordb::ipc::batches_to_ipc_file;
/** Typescript-style Async Iterator over RecordBatches */ /** Typescript-style Async Iterator over RecordBatches */
#[napi] #[napi]

View File

@@ -22,10 +22,21 @@ mod query;
mod table; mod table;
#[napi(object)] #[napi(object)]
#[derive(Debug)]
pub struct ConnectionOptions { pub struct ConnectionOptions {
pub uri: String, pub uri: String,
pub api_key: Option<String>, pub api_key: Option<String>,
pub host_override: Option<String>, pub host_override: Option<String>,
/// (For LanceDB OSS only): The interval, in seconds, at which to check for
/// updates to the table from other processes. If None, then consistency is not
/// checked. For performance reasons, this is the default. For strong
/// consistency, set this to zero seconds. Then every read will check for
/// updates from other processes. As a compromise, you can set this to a
/// non-zero value for eventual consistency. If more than that interval
/// has passed since the last check, then the table will be checked for updates.
/// Note: this consistency only applies to read operations. Write operations are
/// always consistent.
pub read_consistency_interval: Option<f64>,
} }
/// Write mode for writing a table. /// Write mode for writing a table.
@@ -44,5 +55,5 @@ pub struct WriteOptions {
#[napi] #[napi]
pub async fn connect(options: ConnectionOptions) -> napi::Result<Connection> { pub async fn connect(options: ConnectionOptions) -> napi::Result<Connection> {
Connection::new(options.uri.clone()).await Connection::new(options).await
} }

View File

@@ -12,9 +12,9 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use lancedb::query::Query as LanceDBQuery;
use napi::bindgen_prelude::*; use napi::bindgen_prelude::*;
use napi_derive::napi; use napi_derive::napi;
use vectordb::query::Query as LanceDBQuery;
use crate::{iterator::RecordBatchIterator, table::Table}; use crate::{iterator::RecordBatchIterator, table::Table};

View File

@@ -13,10 +13,10 @@
// limitations under the License. // limitations under the License.
use arrow_ipc::writer::FileWriter; use arrow_ipc::writer::FileWriter;
use lancedb::table::AddDataOptions;
use lancedb::{ipc::ipc_file_to_batches, table::TableRef};
use napi::bindgen_prelude::*; use napi::bindgen_prelude::*;
use napi_derive::napi; use napi_derive::napi;
use vectordb::table::AddDataOptions;
use vectordb::{ipc::ipc_file_to_batches, table::TableRef};
use crate::index::IndexBuilder; use crate::index::IndexBuilder;
use crate::query::Query; use crate::query::Query;
@@ -34,8 +34,12 @@ impl Table {
/// Return Schema as empty Arrow IPC file. /// Return Schema as empty Arrow IPC file.
#[napi] #[napi]
pub fn schema(&self) -> napi::Result<Buffer> { pub async fn schema(&self) -> napi::Result<Buffer> {
let mut writer = FileWriter::try_new(vec![], &self.table.schema()) let schema =
self.table.schema().await.map_err(|e| {
napi::Error::from_reason(format!("Failed to create IPC file: {}", e))
})?;
let mut writer = FileWriter::try_new(vec![], &schema)
.map_err(|e| napi::Error::from_reason(format!("Failed to create IPC file: {}", e)))?; .map_err(|e| napi::Error::from_reason(format!("Failed to create IPC file: {}", e)))?;
writer writer
.finish() .finish()

View File

@@ -1,8 +1,8 @@
{ {
"include": [ "include": [
"vectordb/*.ts", "lancedb/*.ts",
"vectordb/**/*.ts", "lancedb/**/*.ts",
"vectordb/*.js", "lancedb/*.js",
], ],
"compilerOptions": { "compilerOptions": {
"target": "es2022", "target": "es2022",
@@ -18,7 +18,7 @@
], ],
"typedocOptions": { "typedocOptions": {
"entryPoints": [ "entryPoints": [
"vectordb/index.ts" "lancedb/index.ts"
], ],
"out": "../docs/src/javascript/", "out": "../docs/src/javascript/",
"visibilityFilters": { "visibilityFilters": {

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "vectordb-node" name = "lancedb-node"
version = "0.4.10" version = "0.4.11"
description = "Serverless, low-latency vector database for AI applications" description = "Serverless, low-latency vector database for AI applications"
license.workspace = true license.workspace = true
edition.workspace = true edition.workspace = true
@@ -24,9 +24,14 @@ half = { workspace = true }
lance = { workspace = true } lance = { workspace = true }
lance-index = { workspace = true } lance-index = { workspace = true }
lance-linalg = { workspace = true } lance-linalg = { workspace = true }
vectordb = { path = "../../vectordb" } lancedb = { path = "../../lancedb" }
tokio = { version = "1.23", features = ["rt-multi-thread"] } tokio = { version = "1.23", features = ["rt-multi-thread"] }
neon = {version = "0.10.1", default-features = false, features = ["channel-api", "napi-6", "promise-api", "task-api"] } neon = { version = "0.10.1", default-features = false, features = [
"channel-api",
"napi-6",
"promise-api",
"task-api",
] }
object_store = { workspace = true, features = ["aws"] } object_store = { workspace = true, features = ["aws"] }
snafu = { workspace = true } snafu = { workspace = true }
async-trait = "0" async-trait = "0"

View File

@@ -1,3 +1,3 @@
The LanceDB node bridge (vectordb-node) allows javascript applications to access LanceDB datasets. The LanceDB node bridge (lancedb-node) allows javascript applications to access LanceDB datasets.
It is built using [Neon](https://neon-bindings.com). See the node project for an example of how it is used / tests

View File

@@ -34,8 +34,8 @@ pub enum Error {
pub type Result<T> = std::result::Result<T, Error>; pub type Result<T> = std::result::Result<T, Error>;
impl From<vectordb::error::Error> for Error { impl From<lancedb::error::Error> for Error {
fn from(e: vectordb::error::Error) -> Self { fn from(e: lancedb::error::Error) -> Self {
Self::LanceDB { Self::LanceDB {
message: e.to_string(), message: e.to_string(),
} }

View File

@@ -19,7 +19,7 @@ use neon::{
}; };
use crate::{error::ResultExt, runtime, table::JsTable}; use crate::{error::ResultExt, runtime, table::JsTable};
use vectordb::Table; use lancedb::Table;
pub fn table_create_scalar_index(mut cx: FunctionContext) -> JsResult<JsPromise> { pub fn table_create_scalar_index(mut cx: FunctionContext) -> JsResult<JsPromise> {
let js_table = cx.this().downcast_or_throw::<JsBox<JsTable>, _>(&mut cx)?; let js_table = cx.this().downcast_or_throw::<JsBox<JsTable>, _>(&mut cx)?;

View File

@@ -13,10 +13,10 @@
// limitations under the License. // limitations under the License.
use lance_linalg::distance::MetricType; use lance_linalg::distance::MetricType;
use lancedb::index::IndexBuilder;
use neon::context::FunctionContext; use neon::context::FunctionContext;
use neon::prelude::*; use neon::prelude::*;
use std::convert::TryFrom; use std::convert::TryFrom;
use vectordb::index::IndexBuilder;
use crate::error::Error::InvalidIndexType; use crate::error::Error::InvalidIndexType;
use crate::error::ResultExt; use crate::error::ResultExt;

View File

@@ -22,9 +22,9 @@ use object_store::CredentialProvider;
use once_cell::sync::OnceCell; use once_cell::sync::OnceCell;
use tokio::runtime::Runtime; use tokio::runtime::Runtime;
use vectordb::connect; use lancedb::connect;
use vectordb::connection::Connection; use lancedb::connection::Connection;
use vectordb::table::ReadParams; use lancedb::table::ReadParams;
use crate::error::ResultExt; use crate::error::ResultExt;
use crate::query::JsQuery; use crate::query::JsQuery;
@@ -84,6 +84,11 @@ fn database_new(mut cx: FunctionContext) -> JsResult<JsPromise> {
let path = cx.argument::<JsString>(0)?.value(&mut cx); let path = cx.argument::<JsString>(0)?.value(&mut cx);
let aws_creds = get_aws_creds(&mut cx, 1)?; let aws_creds = get_aws_creds(&mut cx, 1)?;
let region = get_aws_region(&mut cx, 4)?; let region = get_aws_region(&mut cx, 4)?;
let read_consistency_interval = cx
.argument_opt(5)
.and_then(|arg| arg.downcast::<JsNumber, _>(&mut cx).ok())
.map(|v| v.value(&mut cx))
.map(std::time::Duration::from_secs_f64);
let rt = runtime(&mut cx)?; let rt = runtime(&mut cx)?;
let channel = cx.channel(); let channel = cx.channel();
@@ -100,6 +105,9 @@ fn database_new(mut cx: FunctionContext) -> JsResult<JsPromise> {
token: aws_creds.token, token: aws_creds.token,
}); });
} }
if let Some(interval) = read_consistency_interval {
conn_builder = conn_builder.read_consistency_interval(interval);
}
rt.spawn(async move { rt.spawn(async move {
let database = conn_builder.execute().await; let database = conn_builder.execute().await;

View File

@@ -93,7 +93,7 @@ impl JsQuery {
.and_then(|stream| { .and_then(|stream| {
stream stream
.try_collect::<Vec<_>>() .try_collect::<Vec<_>>()
.map_err(vectordb::error::Error::from) .map_err(lancedb::error::Error::from)
}) })
.await; .await;

View File

@@ -18,12 +18,12 @@ use arrow_array::{RecordBatch, RecordBatchIterator};
use lance::dataset::optimize::CompactionOptions; use lance::dataset::optimize::CompactionOptions;
use lance::dataset::{WriteMode, WriteParams}; use lance::dataset::{WriteMode, WriteParams};
use lance::io::ObjectStoreParams; use lance::io::ObjectStoreParams;
use vectordb::table::{AddDataOptions, OptimizeAction, WriteOptions}; use lancedb::table::{AddDataOptions, OptimizeAction, WriteOptions};
use crate::arrow::{arrow_buffer_to_record_batch, record_batch_to_buffer}; use crate::arrow::{arrow_buffer_to_record_batch, record_batch_to_buffer};
use lancedb::TableRef;
use neon::prelude::*; use neon::prelude::*;
use neon::types::buffer::TypedArray; use neon::types::buffer::TypedArray;
use vectordb::TableRef;
use crate::error::ResultExt; use crate::error::ResultExt;
use crate::{convert, get_aws_credential_provider, get_aws_region, runtime, JsDatabase}; use crate::{convert, get_aws_credential_provider, get_aws_region, runtime, JsDatabase};
@@ -534,8 +534,9 @@ impl JsTable {
.value(&mut cx); .value(&mut cx);
rt.spawn(async move { rt.spawn(async move {
let schema = table.schema().await;
deferred.settle_with(&channel, move |mut cx| { deferred.settle_with(&channel, move |mut cx| {
let schema = table.schema(); let schema = schema.or_throw(&mut cx)?;
let batches = vec![RecordBatch::new_empty(schema)]; let batches = vec![RecordBatch::new_empty(schema)];
let buffer = record_batch_to_buffer(batches).or_throw(&mut cx)?; let buffer = record_batch_to_buffer(batches).or_throw(&mut cx)?;
convert::new_js_buffer(buffer, &mut cx, is_electron) convert::new_js_buffer(buffer, &mut cx, is_electron)

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "vectordb" name = "lancedb"
version = "0.4.10" version = "0.4.11"
edition.workspace = true edition.workspace = true
description = "LanceDB: A serverless, low-latency vector database for AI applications" description = "LanceDB: A serverless, low-latency vector database for AI applications"
license.workspace = true license.workspace = true

View File

@@ -19,9 +19,9 @@ use arrow_array::{FixedSizeListArray, Int32Array, RecordBatch, RecordBatchIterat
use arrow_schema::{DataType, Field, Schema}; use arrow_schema::{DataType, Field, Schema};
use futures::TryStreamExt; use futures::TryStreamExt;
use vectordb::connection::Connection; use lancedb::connection::Connection;
use vectordb::table::AddDataOptions; use lancedb::table::AddDataOptions;
use vectordb::{connect, Result, Table, TableRef}; use lancedb::{connect, Result, Table, TableRef};
#[tokio::main] #[tokio::main]
async fn main() -> Result<()> { async fn main() -> Result<()> {

View File

@@ -303,6 +303,9 @@ pub struct ConnectBuilder {
/// User provided AWS credentials /// User provided AWS credentials
aws_creds: Option<AwsCredential>, aws_creds: Option<AwsCredential>,
/// The interval at which to check for updates from other processes.
read_consistency_interval: Option<std::time::Duration>,
} }
impl ConnectBuilder { impl ConnectBuilder {
@@ -314,6 +317,7 @@ impl ConnectBuilder {
region: None, region: None,
host_override: None, host_override: None,
aws_creds: None, aws_creds: None,
read_consistency_interval: None,
} }
} }
@@ -338,6 +342,29 @@ impl ConnectBuilder {
self self
} }
/// The interval at which to check for updates from other processes. This
/// only affects LanceDB OSS.
///
/// If left unset, consistency is not checked. For maximum read
/// performance, this is the default. For strong consistency, set this to
/// zero seconds. Then every read will check for updates from other processes.
/// As a compromise, set this to a non-zero duration for eventual consistency.
/// If more than that duration has passed since the last read, the read will
/// check for updates from other processes.
///
/// This only affects read operations. Write operations are always
/// consistent.
///
/// LanceDB Cloud uses eventual consistency under the hood, and is not
/// currently configurable.
pub fn read_consistency_interval(
mut self,
read_consistency_interval: std::time::Duration,
) -> Self {
self.read_consistency_interval = Some(read_consistency_interval);
self
}
/// Establishes a connection to the database /// Establishes a connection to the database
pub async fn execute(self) -> Result<Connection> { pub async fn execute(self) -> Result<Connection> {
let internal = Arc::new(Database::connect_with_options(&self).await?); let internal = Arc::new(Database::connect_with_options(&self).await?);
@@ -368,6 +395,8 @@ struct Database {
// the object store wrapper to use on write path // the object store wrapper to use on write path
pub(crate) store_wrapper: Option<Arc<dyn WrappingObjectStore>>, pub(crate) store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
read_consistency_interval: Option<std::time::Duration>,
} }
const LANCE_EXTENSION: &str = "lance"; const LANCE_EXTENSION: &str = "lance";
@@ -380,8 +409,11 @@ impl Database {
let uri = &options.uri; let uri = &options.uri;
let parse_res = url::Url::parse(uri); let parse_res = url::Url::parse(uri);
// TODO: pass params regardless of OS
match parse_res { match parse_res {
Ok(url) if url.scheme().len() == 1 && cfg!(windows) => Self::open_path(uri).await, Ok(url) if url.scheme().len() == 1 && cfg!(windows) => {
Self::open_path(uri, options.read_consistency_interval).await
}
Ok(mut url) => { Ok(mut url) => {
// iter thru the query params and extract the commit store param // iter thru the query params and extract the commit store param
let mut engine = None; let mut engine = None;
@@ -465,13 +497,17 @@ impl Database {
base_path, base_path,
object_store, object_store,
store_wrapper: write_store_wrapper, store_wrapper: write_store_wrapper,
read_consistency_interval: options.read_consistency_interval,
}) })
} }
Err(_) => Self::open_path(uri).await, Err(_) => Self::open_path(uri, options.read_consistency_interval).await,
} }
} }
async fn open_path(path: &str) -> Result<Self> { async fn open_path(
path: &str,
read_consistency_interval: Option<std::time::Duration>,
) -> Result<Self> {
let (object_store, base_path) = ObjectStore::from_uri(path).await?; let (object_store, base_path) = ObjectStore::from_uri(path).await?;
if object_store.is_local() { if object_store.is_local() {
Self::try_create_dir(path).context(CreateDirSnafu { path })?; Self::try_create_dir(path).context(CreateDirSnafu { path })?;
@@ -482,6 +518,7 @@ impl Database {
base_path, base_path,
object_store, object_store,
store_wrapper: None, store_wrapper: None,
read_consistency_interval,
}) })
} }
@@ -551,6 +588,7 @@ impl ConnectionInternal for Database {
options.data.unwrap(), options.data.unwrap(),
self.store_wrapper.clone(), self.store_wrapper.clone(),
Some(write_params), Some(write_params),
self.read_consistency_interval,
) )
.await .await
{ {
@@ -576,6 +614,7 @@ impl ConnectionInternal for Database {
&options.name, &options.name,
self.store_wrapper.clone(), self.store_wrapper.clone(),
options.lance_read_params, options.lance_read_params,
self.read_consistency_interval,
) )
.await?, .await?,
)) ))
@@ -696,6 +735,6 @@ mod tests {
.execute() .execute()
.await .await
.unwrap(); .unwrap();
assert_eq!(other_schema, overwritten.schema()); assert_eq!(other_schema, overwritten.schema().await.unwrap());
} }
} }

View File

@@ -168,7 +168,7 @@ impl IndexBuilder {
/// Build the parameters. /// Build the parameters.
pub async fn build(&self) -> Result<()> { pub async fn build(&self) -> Result<()> {
let schema = self.table.schema(); let schema = self.table.schema().await?;
// TODO: simplify this after GH lance#1864. // TODO: simplify this after GH lance#1864.
let mut index_type = &self.index_type; let mut index_type = &self.index_type;
@@ -230,7 +230,7 @@ impl IndexBuilder {
.table .table
.as_native() .as_native()
.expect("Only native table is supported here"); .expect("Only native table is supported here");
let mut dataset = tbl.clone_inner_dataset(); let mut dataset = tbl.dataset.get_mut().await?;
match params { match params {
IndexParams::Scalar { replace } => { IndexParams::Scalar { replace } => {
dataset dataset
@@ -271,7 +271,6 @@ impl IndexBuilder {
.await?; .await?;
} }
} }
tbl.reset_dataset(dataset);
Ok(()) Ok(())
} }
} }

View File

@@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
//! # VectorDB ([LanceDB](https://github.com/lancedb/lancedb)) -- Developer-friendly, serverless vector database for AI applications
//!
//! [LanceDB](https://github.com/lancedb/lancedb) is an open-source database for vector-search built with persistent storage, //! [LanceDB](https://github.com/lancedb/lancedb) is an open-source database for vector-search built with persistent storage,
//! which greatly simplifies retrieval, filtering and management of embeddings. //! which greatly simplifies retrieval, filtering and management of embeddings.
//! //!
@@ -33,7 +31,7 @@
//! LanceDB runs in process, to use it in your Rust project, put the following in your `Cargo.toml`: //! LanceDB runs in process, to use it in your Rust project, put the following in your `Cargo.toml`:
//! //!
//! ```ignore //! ```ignore
//! cargo install vectordb //! cargo install lancedb
//! ``` //! ```
//! //!
//! ### Quick Start //! ### Quick Start
@@ -45,7 +43,7 @@
//! ```rust //! ```rust
//! # use arrow_schema::{Field, Schema}; //! # use arrow_schema::{Field, Schema};
//! # tokio::runtime::Runtime::new().unwrap().block_on(async { //! # tokio::runtime::Runtime::new().unwrap().block_on(async {
//! let db = vectordb::connect("data/sample-lancedb").execute().await.unwrap(); //! let db = lancedb::connect("data/sample-lancedb").execute().await.unwrap();
//! # }); //! # });
//! ``` //! ```
//! //!
@@ -60,7 +58,7 @@
//! ```rust //! ```rust
//! use object_store::aws::AwsCredential; //! use object_store::aws::AwsCredential;
//! # tokio::runtime::Runtime::new().unwrap().block_on(async { //! # tokio::runtime::Runtime::new().unwrap().block_on(async {
//! let db = vectordb::connect("data/sample-lancedb") //! let db = lancedb::connect("data/sample-lancedb")
//! .aws_creds(AwsCredential { //! .aws_creds(AwsCredential {
//! key_id: "some_key".to_string(), //! key_id: "some_key".to_string(),
//! secret_key: "some_secret".to_string(), //! secret_key: "some_secret".to_string(),
@@ -90,7 +88,7 @@
//! //!
//! # tokio::runtime::Runtime::new().unwrap().block_on(async { //! # tokio::runtime::Runtime::new().unwrap().block_on(async {
//! # let tmpdir = tempfile::tempdir().unwrap(); //! # let tmpdir = tempfile::tempdir().unwrap();
//! # let db = vectordb::connect(tmpdir.path().to_str().unwrap()).execute().await.unwrap(); //! # let db = lancedb::connect(tmpdir.path().to_str().unwrap()).execute().await.unwrap();
//! let schema = Arc::new(Schema::new(vec![ //! let schema = Arc::new(Schema::new(vec![
//! Field::new("id", DataType::Int32, false), //! Field::new("id", DataType::Int32, false),
//! Field::new( //! Field::new(
@@ -134,7 +132,7 @@
//! # use arrow_schema::{Schema, Field, DataType}; //! # use arrow_schema::{Schema, Field, DataType};
//! # tokio::runtime::Runtime::new().unwrap().block_on(async { //! # tokio::runtime::Runtime::new().unwrap().block_on(async {
//! # let tmpdir = tempfile::tempdir().unwrap(); //! # let tmpdir = tempfile::tempdir().unwrap();
//! # let db = vectordb::connect(tmpdir.path().to_str().unwrap()).execute().await.unwrap(); //! # let db = lancedb::connect(tmpdir.path().to_str().unwrap()).execute().await.unwrap();
//! # let tbl = db.open_table("idx_test").execute().await.unwrap(); //! # let tbl = db.open_table("idx_test").execute().await.unwrap();
//! tbl.create_index(&["vector"]) //! tbl.create_index(&["vector"])
//! .ivf_pq() //! .ivf_pq()
@@ -155,7 +153,7 @@
//! # use arrow_array::{FixedSizeListArray, Float32Array, Int32Array, types::Float32Type}; //! # use arrow_array::{FixedSizeListArray, Float32Array, Int32Array, types::Float32Type};
//! # tokio::runtime::Runtime::new().unwrap().block_on(async { //! # tokio::runtime::Runtime::new().unwrap().block_on(async {
//! # let tmpdir = tempfile::tempdir().unwrap(); //! # let tmpdir = tempfile::tempdir().unwrap();
//! # let db = vectordb::connect(tmpdir.path().to_str().unwrap()).execute().await.unwrap(); //! # let db = lancedb::connect(tmpdir.path().to_str().unwrap()).execute().await.unwrap();
//! # let schema = Arc::new(Schema::new(vec![ //! # let schema = Arc::new(Schema::new(vec![
//! # Field::new("id", DataType::Int32, false), //! # Field::new("id", DataType::Int32, false),
//! # Field::new("vector", DataType::FixedSizeList( //! # Field::new("vector", DataType::FixedSizeList(

View File

@@ -12,15 +12,13 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use std::sync::Arc;
use arrow_array::Float32Array; use arrow_array::Float32Array;
use arrow_schema::Schema; use arrow_schema::Schema;
use lance::dataset::scanner::{DatasetRecordBatchStream, Scanner}; use lance::dataset::scanner::{DatasetRecordBatchStream, Scanner};
use lance::dataset::Dataset;
use lance_linalg::distance::MetricType; use lance_linalg::distance::MetricType;
use crate::error::Result; use crate::error::Result;
use crate::table::dataset::DatasetConsistencyWrapper;
use crate::utils::default_vector_column; use crate::utils::default_vector_column;
use crate::Error; use crate::Error;
@@ -29,7 +27,7 @@ const DEFAULT_TOP_K: usize = 10;
/// A builder for nearest neighbor queries for LanceDB. /// A builder for nearest neighbor queries for LanceDB.
#[derive(Clone)] #[derive(Clone)]
pub struct Query { pub struct Query {
dataset: Arc<Dataset>, dataset: DatasetConsistencyWrapper,
// The column to run the query on. If not specified, we will attempt to guess // The column to run the query on. If not specified, we will attempt to guess
// the column based on the dataset's schema. // the column based on the dataset's schema.
@@ -60,7 +58,8 @@ impl Query {
/// # Arguments /// # Arguments
/// ///
/// * `dataset` - Lance dataset. /// * `dataset` - Lance dataset.
pub(crate) fn new(dataset: Arc<Dataset>) -> Self { ///
pub(crate) fn new(dataset: DatasetConsistencyWrapper) -> Self {
Self { Self {
dataset, dataset,
query_vector: None, query_vector: None,
@@ -82,7 +81,8 @@ impl Query {
/// ///
/// * A [DatasetRecordBatchStream] with the query's results. /// * A [DatasetRecordBatchStream] with the query's results.
pub async fn execute_stream(&self) -> Result<DatasetRecordBatchStream> { pub async fn execute_stream(&self) -> Result<DatasetRecordBatchStream> {
let mut scanner: Scanner = self.dataset.scan(); let ds_ref = self.dataset.get().await?;
let mut scanner: Scanner = ds_ref.scan();
if let Some(query) = self.query_vector.as_ref() { if let Some(query) = self.query_vector.as_ref() {
// If there is a vector query, default to limit=10 if unspecified // If there is a vector query, default to limit=10 if unspecified
@@ -90,10 +90,10 @@ impl Query {
col.clone() col.clone()
} else { } else {
// Infer a vector column with the same dimension of the query vector. // Infer a vector column with the same dimension of the query vector.
let arrow_schema = Schema::from(self.dataset.schema()); let arrow_schema = Schema::from(ds_ref.schema());
default_vector_column(&arrow_schema, Some(query.len() as i32))? default_vector_column(&arrow_schema, Some(query.len() as i32))?
}; };
let field = self.dataset.schema().field(&column).ok_or(Error::Store { let field = ds_ref.schema().field(&column).ok_or(Error::Store {
message: format!("Column {} not found in dataset schema", column), message: format!("Column {} not found in dataset schema", column),
})?; })?;
if !matches!(field.data_type(), arrow_schema::DataType::FixedSizeList(f, dim) if f.data_type().is_floating() && dim == query.len() as i32) if !matches!(field.data_type(), arrow_schema::DataType::FixedSizeList(f, dim) if f.data_type().is_floating() && dim == query.len() as i32)
@@ -239,8 +239,10 @@ mod tests {
let batches = make_test_batches(); let batches = make_test_batches();
let ds = Dataset::write(batches, "memory://foo", None).await.unwrap(); let ds = Dataset::write(batches, "memory://foo", None).await.unwrap();
let ds = DatasetConsistencyWrapper::new_latest(ds, None);
let vector = Some(Float32Array::from_iter_values([0.1, 0.2])); let vector = Some(Float32Array::from_iter_values([0.1, 0.2]));
let query = Query::new(Arc::new(ds)).nearest_to(&[0.1, 0.2]); let query = Query::new(ds).nearest_to(&[0.1, 0.2]);
assert_eq!(query.query_vector, vector); assert_eq!(query.query_vector, vector);
let new_vector = Float32Array::from_iter_values([9.8, 8.7]); let new_vector = Float32Array::from_iter_values([9.8, 8.7]);
@@ -264,7 +266,9 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_execute() { async fn test_execute() {
let batches = make_non_empty_batches(); let batches = make_non_empty_batches();
let ds = Arc::new(Dataset::write(batches, "memory://foo", None).await.unwrap()); let ds = Dataset::write(batches, "memory://foo", None).await.unwrap();
let ds = DatasetConsistencyWrapper::new_latest(ds, None);
let query = Query::new(ds.clone()).nearest_to(&[0.1; 4]); let query = Query::new(ds.clone()).nearest_to(&[0.1; 4]);
let result = query.limit(10).filter("id % 2 == 0").execute_stream().await; let result = query.limit(10).filter("id % 2 == 0").execute_stream().await;
@@ -294,9 +298,11 @@ mod tests {
async fn test_execute_no_vector() { async fn test_execute_no_vector() {
// test that it's ok to not specify a query vector (just filter / limit) // test that it's ok to not specify a query vector (just filter / limit)
let batches = make_non_empty_batches(); let batches = make_non_empty_batches();
let ds = Arc::new(Dataset::write(batches, "memory://foo", None).await.unwrap()); let ds = Dataset::write(batches, "memory://foo", None).await.unwrap();
let query = Query::new(ds.clone()); let ds = DatasetConsistencyWrapper::new_latest(ds, None);
let query = Query::new(ds);
let result = query.filter("id % 2 == 0").execute_stream().await; let result = query.filter("id % 2 == 0").execute_stream().await;
let mut stream = result.expect("should have result"); let mut stream = result.expect("should have result");
// should only have one batch // should only have one batch
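The hunks above change Query to hold the shared DatasetConsistencyWrapper instead of a pinned Arc<Dataset>. A minimal crate-internal sketch of what that means for construction and execution, using only items that appear in this diff (the in-memory URI and helper name are illustrative, not part of the change):

```rust
use arrow_array::RecordBatchReader;
use lance::dataset::Dataset;

use crate::error::Result;
use crate::query::Query;
use crate::table::dataset::DatasetConsistencyWrapper;

// Illustrative helper: not part of the diff.
async fn query_through_wrapper(
    batches: impl RecordBatchReader + Send + 'static,
) -> Result<()> {
    let ds = Dataset::write(batches, "memory://query_example", None).await?;
    // Latest mode with no read consistency interval: reads do not trigger reloads.
    let wrapper = DatasetConsistencyWrapper::new_latest(ds, None);

    // Query::new is pub(crate), so this only compiles inside the lancedb crate.
    // Each execute_stream() call re-resolves the dataset through the wrapper.
    let _stream = Query::new(wrapper.clone())
        .nearest_to(&[0.1, 0.2])
        .execute_stream()
        .await?;
    Ok(())
}
```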

View File

@@ -15,7 +15,7 @@
//! LanceDB Table APIs //! LanceDB Table APIs
use std::path::Path; use std::path::Path;
use std::sync::{Arc, Mutex}; use std::sync::Arc;
use arrow_array::{RecordBatchIterator, RecordBatchReader}; use arrow_array::{RecordBatchIterator, RecordBatchReader};
use arrow_schema::{Schema, SchemaRef}; use arrow_schema::{Schema, SchemaRef};
@@ -39,8 +39,10 @@ use crate::index::IndexBuilder;
use crate::query::Query; use crate::query::Query;
use crate::utils::{PatchReadParam, PatchWriteParam}; use crate::utils::{PatchReadParam, PatchWriteParam};
use self::dataset::DatasetConsistencyWrapper;
use self::merge::{MergeInsert, MergeInsertBuilder}; use self::merge::{MergeInsert, MergeInsertBuilder};
pub(crate) mod dataset;
pub mod merge; pub mod merge;
/// Optimize the dataset. /// Optimize the dataset.
@@ -127,7 +129,7 @@ pub trait Table: std::fmt::Display + Send + Sync {
fn name(&self) -> &str; fn name(&self) -> &str;
/// Get the arrow [Schema] of the table. /// Get the arrow [Schema] of the table.
fn schema(&self) -> SchemaRef; async fn schema(&self) -> Result<SchemaRef>;
/// Count the number of rows in this dataset. /// Count the number of rows in this dataset.
/// ///
@@ -162,7 +164,7 @@ pub trait Table: std::fmt::Display + Send + Sync {
/// # use arrow_schema::{Schema, Field, DataType}; /// # use arrow_schema::{Schema, Field, DataType};
/// # tokio::runtime::Runtime::new().unwrap().block_on(async { /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
/// let tmpdir = tempfile::tempdir().unwrap(); /// let tmpdir = tempfile::tempdir().unwrap();
/// let db = vectordb::connect(tmpdir.path().to_str().unwrap()) /// let db = lancedb::connect(tmpdir.path().to_str().unwrap())
/// .execute() /// .execute()
/// .await /// .await
/// .unwrap(); /// .unwrap();
@@ -210,7 +212,7 @@ pub trait Table: std::fmt::Display + Send + Sync {
/// # use arrow_schema::{Schema, Field, DataType}; /// # use arrow_schema::{Schema, Field, DataType};
/// # tokio::runtime::Runtime::new().unwrap().block_on(async { /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
/// let tmpdir = tempfile::tempdir().unwrap(); /// let tmpdir = tempfile::tempdir().unwrap();
/// let db = vectordb::connect(tmpdir.path().to_str().unwrap()) /// let db = lancedb::connect(tmpdir.path().to_str().unwrap())
/// .execute() /// .execute()
/// .await /// .await
/// .unwrap(); /// .unwrap();
@@ -264,7 +266,7 @@ pub trait Table: std::fmt::Display + Send + Sync {
/// # use arrow_schema::{Schema, Field, DataType}; /// # use arrow_schema::{Schema, Field, DataType};
/// # tokio::runtime::Runtime::new().unwrap().block_on(async { /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
/// let tmpdir = tempfile::tempdir().unwrap(); /// let tmpdir = tempfile::tempdir().unwrap();
/// let db = vectordb::connect(tmpdir.path().to_str().unwrap()) /// let db = lancedb::connect(tmpdir.path().to_str().unwrap())
/// .execute() /// .execute()
/// .await /// .await
/// .unwrap(); /// .unwrap();
@@ -322,7 +324,8 @@ pub trait Table: std::fmt::Display + Send + Sync {
/// # use arrow_array::RecordBatch; /// # use arrow_array::RecordBatch;
/// # use futures::TryStreamExt; /// # use futures::TryStreamExt;
/// # tokio::runtime::Runtime::new().unwrap().block_on(async { /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
/// # let tbl = vectordb::table::NativeTable::open("/tmp/tbl").await.unwrap(); /// # let tbl = lancedb::table::NativeTable::open("/tmp/tbl").await.unwrap();
/// use crate::lancedb::Table;
/// let stream = tbl /// let stream = tbl
/// .query() /// .query()
/// .nearest_to(&[1.0, 2.0, 3.0]) /// .nearest_to(&[1.0, 2.0, 3.0])
@@ -340,7 +343,8 @@ pub trait Table: std::fmt::Display + Send + Sync {
/// # use arrow_array::RecordBatch; /// # use arrow_array::RecordBatch;
/// # use futures::TryStreamExt; /// # use futures::TryStreamExt;
/// # tokio::runtime::Runtime::new().unwrap().block_on(async { /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
/// # let tbl = vectordb::table::NativeTable::open("/tmp/tbl").await.unwrap(); /// # let tbl = lancedb::table::NativeTable::open("/tmp/tbl").await.unwrap();
/// use crate::lancedb::Table;
/// let stream = tbl /// let stream = tbl
/// .query() /// .query()
/// .filter("id > 5") /// .filter("id > 5")
@@ -357,7 +361,8 @@ pub trait Table: std::fmt::Display + Send + Sync {
/// # use arrow_array::RecordBatch; /// # use arrow_array::RecordBatch;
/// # use futures::TryStreamExt; /// # use futures::TryStreamExt;
/// # tokio::runtime::Runtime::new().unwrap().block_on(async { /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
/// # let tbl = vectordb::table::NativeTable::open("/tmp/tbl").await.unwrap(); /// # let tbl = lancedb::table::NativeTable::open("/tmp/tbl").await.unwrap();
/// use crate::lancedb::Table;
/// let stream = tbl.query().execute_stream().await.unwrap(); /// let stream = tbl.query().execute_stream().await.unwrap();
/// let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap(); /// let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
/// # }); /// # });
@@ -368,7 +373,7 @@ pub trait Table: std::fmt::Display + Send + Sync {
/// ///
/// <section class="warning">Experimental API</section> /// <section class="warning">Experimental API</section>
/// ///
/// Modeled after ``VACCUM`` in PostgreSQL. /// Modeled after ``VACUUM`` in PostgreSQL.
/// Not all implementations support explicit optimization. /// Not all implementations support explicit optimization.
async fn optimize(&self, action: OptimizeAction) -> Result<OptimizeStats>; async fn optimize(&self, action: OptimizeAction) -> Result<OptimizeStats>;
} }
@@ -381,10 +386,14 @@ pub type TableRef = Arc<dyn Table>;
pub struct NativeTable { pub struct NativeTable {
name: String, name: String,
uri: String, uri: String,
dataset: Arc<Mutex<Dataset>>, pub(crate) dataset: dataset::DatasetConsistencyWrapper,
// the object store wrapper to use on write path // the object store wrapper to use on write path
store_wrapper: Option<Arc<dyn WrappingObjectStore>>, store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
// This comes from the connection options. We store it here so we can pass it down
// to the dataset when we recreate it (for example, in checkout_latest).
read_consistency_interval: Option<std::time::Duration>,
} }
impl std::fmt::Display for NativeTable { impl std::fmt::Display for NativeTable {
@@ -406,7 +415,7 @@ impl NativeTable {
/// * A [NativeTable] object. /// * A [NativeTable] object.
pub async fn open(uri: &str) -> Result<Self> { pub async fn open(uri: &str) -> Result<Self> {
let name = Self::get_table_name(uri)?; let name = Self::get_table_name(uri)?;
Self::open_with_params(uri, &name, None, None).await Self::open_with_params(uri, &name, None, None, None).await
} }
/// Opens an existing Table /// Opens an existing Table
@@ -425,6 +434,7 @@ impl NativeTable {
name: &str, name: &str,
write_store_wrapper: Option<Arc<dyn WrappingObjectStore>>, write_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
params: Option<ReadParams>, params: Option<ReadParams>,
read_consistency_interval: Option<std::time::Duration>,
) -> Result<Self> { ) -> Result<Self> {
let params = params.unwrap_or_default(); let params = params.unwrap_or_default();
// patch the params if we have a write store wrapper // patch the params if we have a write store wrapper
@@ -445,23 +455,22 @@ impl NativeTable {
message: e.to_string(), message: e.to_string(),
}, },
})?; })?;
let dataset = DatasetConsistencyWrapper::new_latest(dataset, read_consistency_interval);
Ok(Self { Ok(Self {
name: name.to_string(), name: name.to_string(),
uri: uri.to_string(), uri: uri.to_string(),
dataset: Arc::new(Mutex::new(dataset)), dataset,
store_wrapper: write_store_wrapper, store_wrapper: write_store_wrapper,
read_consistency_interval,
}) })
} }
/// Make a new clone of the internal lance dataset.
pub(crate) fn clone_inner_dataset(&self) -> Dataset {
self.dataset.lock().expect("Lock poison").clone()
}
/// Checkout a specific version of this [NativeTable] /// Checkout a specific version of this [NativeTable]
pub async fn checkout(uri: &str, version: u64) -> Result<Self> { pub async fn checkout(uri: &str, version: u64) -> Result<Self> {
let name = Self::get_table_name(uri)?; let name = Self::get_table_name(uri)?;
Self::checkout_with_params(uri, &name, version, None, ReadParams::default()).await Self::checkout_with_params(uri, &name, version, None, ReadParams::default(), None).await
} }
pub async fn checkout_with_params( pub async fn checkout_with_params(
@@ -470,44 +479,35 @@ impl NativeTable {
version: u64, version: u64,
write_store_wrapper: Option<Arc<dyn WrappingObjectStore>>, write_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
params: ReadParams, params: ReadParams,
read_consistency_interval: Option<std::time::Duration>,
) -> Result<Self> { ) -> Result<Self> {
// patch the params if we have a write store wrapper // patch the params if we have a write store wrapper
let params = match write_store_wrapper.clone() { let params = match write_store_wrapper.clone() {
Some(wrapper) => params.patch_with_store_wrapper(wrapper)?, Some(wrapper) => params.patch_with_store_wrapper(wrapper)?,
None => params, None => params,
}; };
let dataset = Dataset::checkout_with_params(uri, version, &params) let dataset = DatasetBuilder::from_uri(uri)
.await .with_version(version)
.map_err(|e| match e { .with_read_params(params)
lance::Error::DatasetNotFound { .. } => Error::TableNotFound { .load()
name: name.to_string(), .await?;
}, let dataset = DatasetConsistencyWrapper::new_time_travel(dataset, version);
e => Error::Lance {
message: e.to_string(),
},
})?;
Ok(Self { Ok(Self {
name: name.to_string(), name: name.to_string(),
uri: uri.to_string(), uri: uri.to_string(),
dataset: Arc::new(Mutex::new(dataset)), dataset,
store_wrapper: write_store_wrapper, store_wrapper: write_store_wrapper,
read_consistency_interval,
}) })
} }
pub async fn checkout_latest(&self) -> Result<Self> { pub async fn checkout_latest(&self) -> Result<Self> {
let dataset = self.clone_inner_dataset(); let mut dataset = self.dataset.duplicate().await;
let latest_version_id = dataset.latest_version_id().await?; dataset.as_latest(self.read_consistency_interval).await?;
let dataset = if latest_version_id == dataset.version().version {
dataset
} else {
dataset.checkout_version(latest_version_id).await?
};
Ok(Self { Ok(Self {
name: self.name.clone(), dataset,
uri: self.uri.clone(), ..self.clone()
dataset: Arc::new(Mutex::new(dataset)),
store_wrapper: self.store_wrapper.clone(),
}) })
} }
@@ -543,6 +543,7 @@ impl NativeTable {
batches: impl RecordBatchReader + Send + 'static, batches: impl RecordBatchReader + Send + 'static,
write_store_wrapper: Option<Arc<dyn WrappingObjectStore>>, write_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
params: Option<WriteParams>, params: Option<WriteParams>,
read_consistency_interval: Option<std::time::Duration>,
) -> Result<Self> { ) -> Result<Self> {
let params = params.unwrap_or_default(); let params = params.unwrap_or_default();
// patch the params if we have a write store wrapper // patch the params if we have a write store wrapper
@@ -564,8 +565,9 @@ impl NativeTable {
Ok(Self { Ok(Self {
name: name.to_string(), name: name.to_string(),
uri: uri.to_string(), uri: uri.to_string(),
dataset: Arc::new(Mutex::new(dataset)), dataset: DatasetConsistencyWrapper::new_latest(dataset, read_consistency_interval),
store_wrapper: write_store_wrapper, store_wrapper: write_store_wrapper,
read_consistency_interval,
}) })
} }
@@ -575,34 +577,35 @@ impl NativeTable {
schema: SchemaRef, schema: SchemaRef,
write_store_wrapper: Option<Arc<dyn WrappingObjectStore>>, write_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
params: Option<WriteParams>, params: Option<WriteParams>,
read_consistency_interval: Option<std::time::Duration>,
) -> Result<Self> { ) -> Result<Self> {
let batches = RecordBatchIterator::new(vec![], schema); let batches = RecordBatchIterator::new(vec![], schema);
Self::create(uri, name, batches, write_store_wrapper, params).await Self::create(
uri,
name,
batches,
write_store_wrapper,
params,
read_consistency_interval,
)
.await
} }
/// Version of this Table /// Version of this Table
pub fn version(&self) -> u64 { pub async fn version(&self) -> Result<u64> {
self.dataset.lock().expect("lock poison").version().version Ok(self.dataset.get().await?.version().version)
} }
async fn optimize_indices(&self, options: &OptimizeOptions) -> Result<()> { async fn optimize_indices(&self, options: &OptimizeOptions) -> Result<()> {
info!("LanceDB: optimizing indices: {:?}", options); info!("LanceDB: optimizing indices: {:?}", options);
let mut dataset = self.clone_inner_dataset(); self.dataset
dataset.optimize_indices(options).await?; .get_mut()
.await?
.optimize_indices(options)
.await?;
Ok(()) Ok(())
} }
pub fn query(&self) -> Query {
Query::new(self.clone_inner_dataset().into())
}
pub fn filter(&self, expr: String) -> Query {
Query::new(self.clone_inner_dataset().into()).filter(expr)
}
/// Returns the number of rows in this Table
/// Merge new data into this table. /// Merge new data into this table.
pub async fn merge( pub async fn merge(
&mut self, &mut self,
@@ -610,14 +613,17 @@ impl NativeTable {
left_on: &str, left_on: &str,
right_on: &str, right_on: &str,
) -> Result<()> { ) -> Result<()> {
let mut dataset = self.clone_inner_dataset(); self.dataset
dataset.merge(batches, left_on, right_on).await?; .get_mut()
self.dataset = Arc::new(Mutex::new(dataset)); .await?
.merge(batches, left_on, right_on)
.await?;
Ok(()) Ok(())
} }
pub async fn update(&self, predicate: Option<&str>, updates: Vec<(&str, &str)>) -> Result<()> { pub async fn update(&self, predicate: Option<&str>, updates: Vec<(&str, &str)>) -> Result<()> {
let mut builder = UpdateBuilder::new(self.clone_inner_dataset().into()); let dataset = self.dataset.get().await?.clone();
let mut builder = UpdateBuilder::new(Arc::new(dataset));
if let Some(predicate) = predicate { if let Some(predicate) = predicate {
builder = builder.update_where(predicate)?; builder = builder.update_where(predicate)?;
} }
@@ -628,7 +634,7 @@ impl NativeTable {
let operation = builder.build()?; let operation = builder.build()?;
let ds = operation.execute().await?; let ds = operation.execute().await?;
self.reset_dataset(ds.as_ref().clone()); self.dataset.set_latest(ds.as_ref().clone()).await;
Ok(()) Ok(())
} }
@@ -648,8 +654,10 @@ impl NativeTable {
older_than: Duration, older_than: Duration,
delete_unverified: Option<bool>, delete_unverified: Option<bool>,
) -> Result<RemovalStats> { ) -> Result<RemovalStats> {
let dataset = self.clone_inner_dataset(); Ok(self
Ok(dataset .dataset
.get_mut()
.await?
.cleanup_old_versions(older_than, delete_unverified) .cleanup_old_versions(older_than, delete_unverified)
.await?) .await?)
} }
@@ -665,24 +673,27 @@ impl NativeTable {
options: CompactionOptions, options: CompactionOptions,
remap_options: Option<Arc<dyn IndexRemapperOptions>>, remap_options: Option<Arc<dyn IndexRemapperOptions>>,
) -> Result<CompactionMetrics> { ) -> Result<CompactionMetrics> {
let mut dataset = self.clone_inner_dataset(); let mut dataset_mut = self.dataset.get_mut().await?;
let metrics = compact_files(&mut dataset, options, remap_options).await?; let metrics = compact_files(&mut dataset_mut, options, remap_options).await?;
self.reset_dataset(dataset);
Ok(metrics) Ok(metrics)
} }
pub fn count_fragments(&self) -> usize { // TODO: why are these individual methods and not some single "get_stats" method?
self.dataset.lock().expect("lock poison").count_fragments() pub async fn count_fragments(&self) -> Result<usize> {
Ok(self.dataset.get().await?.count_fragments())
} }
pub async fn count_deleted_rows(&self) -> Result<usize> { pub async fn count_deleted_rows(&self) -> Result<usize> {
let dataset = self.clone_inner_dataset(); Ok(self.dataset.get().await?.count_deleted_rows().await?)
Ok(dataset.count_deleted_rows().await?)
} }
pub async fn num_small_files(&self, max_rows_per_group: usize) -> usize { pub async fn num_small_files(&self, max_rows_per_group: usize) -> Result<usize> {
let dataset = self.clone_inner_dataset(); Ok(self
dataset.num_small_files(max_rows_per_group).await .dataset
.get()
.await?
.num_small_files(max_rows_per_group)
.await)
} }
pub async fn count_indexed_rows(&self, index_uuid: &str) -> Result<Option<usize>> { pub async fn count_indexed_rows(&self, index_uuid: &str) -> Result<Option<usize>> {
@@ -700,7 +711,7 @@ impl NativeTable {
} }
pub async fn load_indices(&self) -> Result<Vec<VectorIndex>> { pub async fn load_indices(&self) -> Result<Vec<VectorIndex>> {
let dataset = self.clone_inner_dataset(); let dataset = self.dataset.get().await?;
let (indices, mf) = futures::try_join!(dataset.load_indices(), dataset.latest_manifest())?; let (indices, mf) = futures::try_join!(dataset.load_indices(), dataset.latest_manifest())?;
Ok(indices Ok(indices
.iter() .iter()
@@ -717,7 +728,7 @@ impl NativeTable {
if index.is_none() { if index.is_none() {
return Ok(None); return Ok(None);
} }
let dataset = self.clone_inner_dataset(); let dataset = self.dataset.get().await?;
let index_stats = dataset.index_statistics(&index.unwrap().index_name).await?; let index_stats = dataset.index_statistics(&index.unwrap().index_name).await?;
let index_stats: VectorIndexStatistics = let index_stats: VectorIndexStatistics =
serde_json::from_str(&index_stats).map_err(|e| Error::Lance { serde_json::from_str(&index_stats).map_err(|e| Error::Lance {
@@ -729,10 +740,6 @@ impl NativeTable {
Ok(Some(index_stats)) Ok(Some(index_stats))
} }
pub(crate) fn reset_dataset(&self, dataset: Dataset) {
*self.dataset.lock().expect("lock poison") = dataset;
}
} }
#[async_trait] #[async_trait]
@@ -742,7 +749,7 @@ impl MergeInsert for NativeTable {
params: MergeInsertBuilder, params: MergeInsertBuilder,
new_data: Box<dyn RecordBatchReader + Send>, new_data: Box<dyn RecordBatchReader + Send>,
) -> Result<()> { ) -> Result<()> {
let dataset = Arc::new(self.clone_inner_dataset()); let dataset = Arc::new(self.dataset.get().await?.clone());
let mut builder = LanceMergeInsertBuilder::try_new(dataset.clone(), params.on)?; let mut builder = LanceMergeInsertBuilder::try_new(dataset.clone(), params.on)?;
match ( match (
params.when_matched_update_all, params.when_matched_update_all,
@@ -769,7 +776,7 @@ impl MergeInsert for NativeTable {
} }
let job = builder.try_build()?; let job = builder.try_build()?;
let new_dataset = job.execute_reader(new_data).await?; let new_dataset = job.execute_reader(new_data).await?;
self.reset_dataset((*new_dataset).clone()); self.dataset.set_latest(new_dataset.as_ref().clone()).await;
Ok(()) Ok(())
} }
} }
@@ -788,13 +795,13 @@ impl Table for NativeTable {
self.name.as_str() self.name.as_str()
} }
fn schema(&self) -> SchemaRef { async fn schema(&self) -> Result<SchemaRef> {
let lance_schema = { self.dataset.lock().expect("lock poison").schema().clone() }; let lance_schema = self.dataset.get().await?.schema().clone();
Arc::new(Schema::from(&lance_schema)) Ok(Arc::new(Schema::from(&lance_schema)))
} }
async fn count_rows(&self, filter: Option<String>) -> Result<usize> { async fn count_rows(&self, filter: Option<String>) -> Result<usize> {
let dataset = { self.dataset.lock().expect("lock poison").clone() }; let dataset = self.dataset.get().await?;
if let Some(filter) = filter { if let Some(filter) = filter {
let mut scanner = dataset.scan(); let mut scanner = dataset.scan();
scanner.filter(&filter)?; scanner.filter(&filter)?;
@@ -826,7 +833,8 @@ impl Table for NativeTable {
None => lance_params, None => lance_params,
}; };
self.reset_dataset(Dataset::write(batches, &self.uri, Some(lance_params)).await?); let dataset = Dataset::write(batches, &self.uri, Some(lance_params)).await?;
self.dataset.set_latest(dataset).await;
Ok(()) Ok(())
} }
@@ -840,14 +848,12 @@ impl Table for NativeTable {
} }
fn query(&self) -> Query { fn query(&self) -> Query {
Query::new(Arc::new(self.dataset.lock().expect("lock poison").clone())) Query::new(self.dataset.clone())
} }
/// Delete rows from the table /// Delete rows from the table
async fn delete(&self, predicate: &str) -> Result<()> { async fn delete(&self, predicate: &str) -> Result<()> {
let mut dataset = self.clone_inner_dataset(); self.dataset.get_mut().await?.delete(predicate).await?;
dataset.delete(predicate).await?;
self.reset_dataset(dataset);
Ok(()) Ok(())
} }
@@ -903,6 +909,7 @@ mod tests {
use std::iter; use std::iter;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration;
use arrow_array::{ use arrow_array::{
Array, BooleanArray, Date32Array, FixedSizeListArray, Float32Array, Float64Array, Array, BooleanArray, Date32Array, FixedSizeListArray, Float32Array, Float64Array,
@@ -918,6 +925,8 @@ mod tests {
use rand::Rng; use rand::Rng;
use tempfile::tempdir; use tempfile::tempdir;
use crate::connection::ConnectBuilder;
use super::*; use super::*;
#[tokio::test] #[tokio::test]
@@ -960,7 +969,7 @@ mod tests {
let uri = tmp_dir.path().to_str().unwrap(); let uri = tmp_dir.path().to_str().unwrap();
let batches = make_test_batches(); let batches = make_test_batches();
let table = NativeTable::create(uri, "test", batches, None, None) let table = NativeTable::create(uri, "test", batches, None, None, None)
.await .await
.unwrap(); .unwrap();
@@ -978,7 +987,7 @@ mod tests {
let batches = make_test_batches(); let batches = make_test_batches();
let schema = batches.schema().clone(); let schema = batches.schema().clone();
let table = NativeTable::create(uri, "test", batches, None, None) let table = NativeTable::create(uri, "test", batches, None, None, None)
.await .await
.unwrap(); .unwrap();
assert_eq!(table.count_rows(None).await.unwrap(), 10); assert_eq!(table.count_rows(None).await.unwrap(), 10);
@@ -1009,7 +1018,7 @@ mod tests {
// Create a dataset with i=0..10 // Create a dataset with i=0..10
let batches = merge_insert_test_batches(0, 0); let batches = merge_insert_test_batches(0, 0);
let table = NativeTable::create(uri, "test", batches, None, None) let table = NativeTable::create(uri, "test", batches, None, None, None)
.await .await
.unwrap(); .unwrap();
assert_eq!(table.count_rows(None).await.unwrap(), 10); assert_eq!(table.count_rows(None).await.unwrap(), 10);
@@ -1055,7 +1064,7 @@ mod tests {
let batches = make_test_batches(); let batches = make_test_batches();
let schema = batches.schema().clone(); let schema = batches.schema().clone();
let table = NativeTable::create(uri, "test", batches, None, None) let table = NativeTable::create(uri, "test", batches, None, None, None)
.await .await
.unwrap(); .unwrap();
assert_eq!(table.count_rows(None).await.unwrap(), 10); assert_eq!(table.count_rows(None).await.unwrap(), 10);
@@ -1410,7 +1419,7 @@ mod tests {
..Default::default() ..Default::default()
}; };
assert!(!wrapper.called()); assert!(!wrapper.called());
let _ = NativeTable::open_with_params(uri, "test", None, Some(param)) let _ = NativeTable::open_with_params(uri, "test", None, Some(param), None)
.await .await
.unwrap(); .unwrap();
assert!(wrapper.called()); assert!(wrapper.called());
@@ -1484,7 +1493,7 @@ mod tests {
schema, schema,
); );
let table = NativeTable::create(uri, "test", batches, None, None) let table = NativeTable::create(uri, "test", batches, None, None, None)
.await .await
.unwrap(); .unwrap();
@@ -1529,4 +1538,68 @@ mod tests {
Ok(FixedSizeListArray::from(data)) Ok(FixedSizeListArray::from(data))
} }
#[tokio::test]
async fn test_read_consistency_interval() {
let batch = RecordBatch::try_new(
Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)])),
vec![Arc::new(Int32Array::from(vec![1]))],
)
.unwrap();
let intervals = vec![
None,
Some(0),
Some(100), // 100 ms
];
for interval in intervals {
let tmp_dir = tempdir().unwrap();
let uri = tmp_dir.path().to_str().unwrap();
let conn1 = ConnectBuilder::new(uri).execute().await.unwrap();
let table1 = conn1
.create_empty_table("my_table", batch.schema())
.execute()
.await
.unwrap();
let mut conn2 = ConnectBuilder::new(uri);
if let Some(interval) = interval {
conn2 = conn2.read_consistency_interval(std::time::Duration::from_millis(interval));
}
let conn2 = conn2.execute().await.unwrap();
let table2 = conn2.open_table("my_table").execute().await.unwrap();
assert_eq!(table1.count_rows(None).await.unwrap(), 0);
assert_eq!(table2.count_rows(None).await.unwrap(), 0);
table1
.add(
Box::new(RecordBatchIterator::new(
vec![Ok(batch.clone())],
batch.schema(),
)),
AddDataOptions::default(),
)
.await
.unwrap();
assert_eq!(table1.count_rows(None).await.unwrap(), 1);
match interval {
None => {
assert_eq!(table2.count_rows(None).await.unwrap(), 0);
}
Some(0) => {
assert_eq!(table2.count_rows(None).await.unwrap(), 1);
}
Some(100) => {
assert_eq!(table2.count_rows(None).await.unwrap(), 0);
tokio::time::sleep(Duration::from_millis(100)).await;
assert_eq!(table2.count_rows(None).await.unwrap(), 1);
}
_ => unreachable!(),
}
}
}
} }
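With the wrapper in place, several accessors above become async and fallible (schema, version, count_rows, count_fragments, num_small_files), since each call may first re-check the dataset for staleness. A rough sketch of the resulting calling convention, assuming the crate-root re-exports used in the doc examples (`lancedb::Table`, `lancedb::table::NativeTable`) and the `lancedb::error::Result` path:

```rust
use lancedb::table::NativeTable;
use lancedb::Table; // trait import, as in the doc examples above

// Illustrative helper; the `lancedb::error::Result` path is an assumption.
async fn describe(table: &NativeTable) -> lancedb::error::Result<()> {
    let schema = table.schema().await?;    // was: fn schema(&self) -> SchemaRef
    let version = table.version().await?;  // was: fn version(&self) -> u64
    let rows = table.count_rows(None).await?;
    println!("version {version}: {rows} rows, {} columns", schema.fields().len());
    Ok(())
}
```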

View File

@@ -0,0 +1,234 @@
// Copyright 2024 LanceDB Developers.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::{
ops::{Deref, DerefMut},
sync::Arc,
time::{self, Duration, Instant},
};
use lance::Dataset;
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use crate::error::Result;
/// A wrapper around a [Dataset] that provides lazy-loading and consistency checks.
///
/// This can be cloned cheaply. It supports concurrent reads or exclusive writes.
#[derive(Debug, Clone)]
pub struct DatasetConsistencyWrapper(Arc<RwLock<DatasetRef>>);
/// A wrapper around a [Dataset] that provides consistency checks.
///
/// The dataset is either tracked as the latest version (and reloaded according
/// to the read consistency interval) or pinned to a specific version.
#[derive(Debug, Clone)]
enum DatasetRef {
/// In this mode, the dataset is always the latest version.
Latest {
dataset: Dataset,
read_consistency_interval: Option<Duration>,
last_consistency_check: Option<time::Instant>,
},
/// In this mode, the dataset is a specific version. It cannot be mutated.
TimeTravel { dataset: Dataset, version: u64 },
}
impl DatasetRef {
/// Reload the dataset to the appropriate version.
async fn reload(&mut self) -> Result<()> {
match self {
Self::Latest {
dataset,
last_consistency_check,
..
} => {
*dataset = dataset
.checkout_version(dataset.latest_version_id().await?)
.await?;
last_consistency_check.replace(Instant::now());
}
Self::TimeTravel { dataset, version } => {
dataset.checkout_version(*version).await?;
}
}
Ok(())
}
async fn as_latest(&mut self, read_consistency_interval: Option<Duration>) -> Result<()> {
match self {
Self::Latest { .. } => Ok(()),
Self::TimeTravel { dataset, .. } => {
dataset
.checkout_version(dataset.latest_version_id().await?)
.await?;
*self = Self::Latest {
dataset: dataset.clone(),
read_consistency_interval,
last_consistency_check: Some(Instant::now()),
};
Ok(())
}
}
}
fn set_latest(&mut self, dataset: Dataset) {
match self {
Self::Latest {
dataset: ref mut ds,
..
} => {
*ds = dataset;
}
_ => unreachable!("Dataset should be in latest mode at this point"),
}
}
}
impl DatasetConsistencyWrapper {
/// Create a new wrapper in the latest version mode.
pub fn new_latest(dataset: Dataset, read_consistency_interval: Option<Duration>) -> Self {
Self(Arc::new(RwLock::new(DatasetRef::Latest {
dataset,
read_consistency_interval,
last_consistency_check: None,
})))
}
/// Create a new wrapper in the time travel mode.
pub fn new_time_travel(dataset: Dataset, version: u64) -> Self {
Self(Arc::new(RwLock::new(DatasetRef::TimeTravel {
dataset,
version,
})))
}
/// Create an independent copy of self.
///
/// Unlike Clone, this will track versions independently of the original wrapper and
/// will be tied to a different RwLock.
pub async fn duplicate(&self) -> Self {
let ds_ref = self.0.read().await;
Self(Arc::new(RwLock::new((*ds_ref).clone())))
}
/// Get an immutable reference to the dataset.
pub async fn get(&self) -> Result<DatasetReadGuard<'_>> {
self.ensure_up_to_date().await?;
Ok(DatasetReadGuard {
guard: self.0.read().await,
})
}
/// Get a mutable reference to the dataset.
pub async fn get_mut(&self) -> Result<DatasetWriteGuard<'_>> {
self.ensure_up_to_date().await?;
Ok(DatasetWriteGuard {
guard: self.0.write().await,
})
}
/// Convert into a wrapper in latest version mode
pub async fn as_latest(&mut self, read_consistency_interval: Option<Duration>) -> Result<()> {
self.0
.write()
.await
.as_latest(read_consistency_interval)
.await
}
/// Provide a known latest version of the dataset.
///
/// This is usually done after a write operation, which inherently produces
/// the latest version of the dataset.
pub async fn set_latest(&self, dataset: Dataset) {
self.0.write().await.set_latest(dataset);
}
async fn reload(&self) -> Result<()> {
self.0.write().await.reload().await
}
async fn is_up_to_date(&self) -> Result<bool> {
let dataset_ref = self.0.read().await;
match &*dataset_ref {
DatasetRef::Latest {
read_consistency_interval,
last_consistency_check,
..
} => match (read_consistency_interval, last_consistency_check) {
(None, _) => Ok(true),
(Some(_), None) => Ok(false),
(Some(read_consistency_interval), Some(last_consistency_check)) => {
if &last_consistency_check.elapsed() < read_consistency_interval {
Ok(true)
} else {
Ok(false)
}
}
},
DatasetRef::TimeTravel { dataset, version } => {
Ok(dataset.version().version == *version)
}
}
}
/// Ensures that the dataset is up-to-date with respect to the configured read
/// consistency and version parameters.
async fn ensure_up_to_date(&self) -> Result<()> {
if !self.is_up_to_date().await? {
self.reload().await?;
}
Ok(())
}
}
pub struct DatasetReadGuard<'a> {
guard: RwLockReadGuard<'a, DatasetRef>,
}
impl Deref for DatasetReadGuard<'_> {
type Target = Dataset;
fn deref(&self) -> &Self::Target {
match &*self.guard {
DatasetRef::Latest { dataset, .. } => dataset,
DatasetRef::TimeTravel { dataset, .. } => dataset,
}
}
}
pub struct DatasetWriteGuard<'a> {
guard: RwLockWriteGuard<'a, DatasetRef>,
}
impl Deref for DatasetWriteGuard<'_> {
type Target = Dataset;
fn deref(&self) -> &Self::Target {
match &*self.guard {
DatasetRef::Latest { dataset, .. } => dataset,
DatasetRef::TimeTravel { dataset, .. } => dataset,
}
}
}
impl DerefMut for DatasetWriteGuard<'_> {
fn deref_mut(&mut self) -> &mut Self::Target {
match &mut *self.guard {
DatasetRef::Latest { dataset, .. } => dataset,
DatasetRef::TimeTravel { dataset, .. } => dataset,
}
}
}
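
To make the new wrapper's surface concrete, here is a small crate-internal usage sketch (not part of the diff; the predicate and 5-second interval are made up) showing the read/write guard pattern used throughout table.rs above:

```rust
use std::time::Duration;

use lance::Dataset;

use crate::error::Result;
use crate::table::dataset::DatasetConsistencyWrapper;

// Illustrative helper: not part of the diff.
async fn wrapper_usage(dataset: Dataset) -> Result<()> {
    // Latest mode: re-check for a newer version at most every 5 seconds.
    let wrapper = DatasetConsistencyWrapper::new_latest(dataset, Some(Duration::from_secs(5)));

    // Reads take a shared guard that derefs to the Dataset; the wrapper reloads
    // first if the consistency interval has elapsed.
    let version = wrapper.get().await?.version().version;

    // Mutations take the exclusive guard.
    wrapper.get_mut().await?.delete("id = 42").await?;

    // Clones share the same lock and version tracking; duplicate() creates an
    // independent copy backed by its own lock.
    let _shared = wrapper.clone();
    let _independent = wrapper.duplicate().await;

    let _ = version;
    Ok(())
}
```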