mirror of https://github.com/lancedb/lancedb.git (synced 2026-01-10 05:42:58 +00:00)

Compare commits
3 commits: python-v0.… → v0.4.11

Commits: 4605359d3b, f1596122e6, 3aa0c40168
@@ -1,5 +1,5 @@
 [bumpversion]
-current_version = 0.4.10
+current_version = 0.4.11
 commit = True
 message = Bump version: {current_version} → {new_version}
 tag = True
@@ -9,4 +9,4 @@ tag_name = v{new_version}
 
 [bumpversion:file:rust/ffi/node/Cargo.toml]
 
-[bumpversion:file:rust/vectordb/Cargo.toml]
+[bumpversion:file:rust/lancedb/Cargo.toml]

.github/workflows/cargo-publish.yml (vendored, 2 changed lines)
@@ -26,4 +26,4 @@ jobs:
       sudo apt install -y protobuf-compiler libssl-dev
     - name: Publish the package
       run: |
-        cargo publish -p vectordb --all-features --token ${{ secrets.CARGO_REGISTRY_TOKEN }}
+        cargo publish -p lancedb --all-features --token ${{ secrets.CARGO_REGISTRY_TOKEN }}

@@ -1,5 +1,5 @@
 [workspace]
-members = ["rust/ffi/node", "rust/vectordb", "nodejs"]
+members = ["rust/ffi/node", "rust/lancedb", "nodejs"]
 # Python package needs to be built by maturin.
 exclude = ["python"]
 resolver = "2"
@@ -636,6 +636,70 @@ The `values` parameter is used to provide the new values for the columns as literals
 
 When rows are updated, they are moved out of the index. The row will still show up in ANN queries, but the query will not be as fast as it would be if the row was in the index. If you update a large proportion of rows, consider rebuilding the index afterwards.
 
+## Consistency
+
+In LanceDB OSS, users can set the `read_consistency_interval` parameter on connections to achieve different levels of read consistency. This parameter determines how frequently the database synchronizes with the underlying storage system to check for updates made by other processes. If another process updates a table, the database will not see the changes until the next synchronization.
+
+There are three possible settings for `read_consistency_interval`:
+
+1. **Unset (default)**: The database does not check for updates to tables made by other processes. This provides the best query performance, but means that clients may not see the most up-to-date data. This setting is suitable for applications where the data does not change during the lifetime of the table reference.
+2. **Zero seconds (Strong consistency)**: The database checks for updates on every read. This provides the strongest consistency guarantees, ensuring that all clients see the latest committed data. However, it has the most overhead. This setting is suitable when consistency matters more than having high QPS.
+3. **Custom interval (Eventual consistency)**: The database checks for updates at a custom interval, such as every 5 seconds. This provides eventual consistency, allowing for some lag between write and read operations. Performance-wise, this is a middle ground between strong consistency and no consistency check. This setting is suitable for applications where immediate consistency is not critical, but clients should see updated data eventually.
+
+!!! tip "Consistency in LanceDB Cloud"
+
+    This is only tunable in LanceDB OSS. In LanceDB Cloud, readers are always eventually consistent.
+
+=== "Python"
+
+    To set strong consistency, use `timedelta(0)`:
+
+    ```python
+    from datetime import timedelta
+    db = lancedb.connect("./.lancedb", read_consistency_interval=timedelta(0))
+    table = db.open_table("my_table")
+    ```
+
+    For eventual consistency, use a custom `timedelta`:
+
+    ```python
+    from datetime import timedelta
+    db = lancedb.connect("./.lancedb", read_consistency_interval=timedelta(seconds=5))
+    table = db.open_table("my_table")
+    ```
+
+    By default, a `Table` will never check for updates from other writers. To manually check for updates you can use `checkout_latest`:
+
+    ```python
+    db = lancedb.connect("./.lancedb")
+    table = db.open_table("my_table")
+
+    # (Other writes happen to my_table from another process)
+
+    # Check for updates
+    table.checkout_latest()
+    ```
+
+=== "JavaScript/Typescript"
+
+    To set strong consistency, use `0`:
+
+    ```javascript
+    const db = await lancedb.connect({ uri: "./.lancedb", readConsistencyInterval: 0 });
+    const table = await db.openTable("my_table");
+    ```
+
+    For eventual consistency, specify the update interval as seconds:
+
+    ```javascript
+    const db = await lancedb.connect({ uri: "./.lancedb", readConsistencyInterval: 5 });
+    const table = await db.openTable("my_table");
+    ```
+
+    <!-- Node doesn't yet support the version time travel: https://github.com/lancedb/lancedb/issues/1007
+    Once it does, we can show manual consistency check for Node as well.
+    -->
+
 ## What's next?
 
 Learn the best practices on creating an ANN index and getting the most out of it.
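The documentation hunk above notes that updated rows fall out of the ANN index and suggests rebuilding it after large updates. For reference, a minimal Python sketch of what that rebuild could look like, assuming the documented `update`/`create_index` APIs; the table name, filter, and index parameters are illustrative placeholders, not values taken from this change.

```python
import lancedb

db = lancedb.connect("./.lancedb")
table = db.open_table("my_table")

# A bulk update moves the affected rows out of the ANN index,
# so queries touching them fall back to a slower unindexed scan.
table.update(where="category = 'stale'", values={"category": "fresh"})

# Rebuild the vector index so the updated rows are indexed again.
# Parameter values here are placeholders; tune them for your data.
table.create_index(
    metric="cosine",
    num_partitions=256,
    num_sub_vectors=96,
    replace=True,  # overwrite the existing index
)
```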
@@ -1,12 +1,12 @@
 {
   "name": "vectordb",
-  "version": "0.4.10",
+  "version": "0.4.11",
   "description": " Serverless, low-latency vector database for AI applications",
   "main": "dist/index.js",
   "types": "dist/index.d.ts",
   "scripts": {
     "tsc": "tsc -b",
-    "build": "npm run tsc && cargo-cp-artifact --artifact cdylib vectordb-node index.node -- cargo build --message-format=json",
+    "build": "npm run tsc && cargo-cp-artifact --artifact cdylib lancedb-node index.node -- cargo build --message-format=json",
     "build-release": "npm run build -- --release",
     "test": "npm run tsc && mocha -recursive dist/test",
     "integration-test": "npm run tsc && mocha -recursive dist/integration_test",
@@ -85,10 +85,10 @@
     }
   },
   "optionalDependencies": {
-    "@lancedb/vectordb-darwin-arm64": "0.4.10",
+    "@lancedb/vectordb-darwin-arm64": "0.4.11",
-    "@lancedb/vectordb-darwin-x64": "0.4.10",
+    "@lancedb/vectordb-darwin-x64": "0.4.11",
-    "@lancedb/vectordb-linux-arm64-gnu": "0.4.10",
+    "@lancedb/vectordb-linux-arm64-gnu": "0.4.11",
-    "@lancedb/vectordb-linux-x64-gnu": "0.4.10",
+    "@lancedb/vectordb-linux-x64-gnu": "0.4.11",
-    "@lancedb/vectordb-win32-x64-msvc": "0.4.10"
+    "@lancedb/vectordb-win32-x64-msvc": "0.4.11"
   }
 }

@@ -96,6 +96,19 @@ export interface ConnectionOptions {
    * This is useful for local testing.
    */
   hostOverride?: string
+
+  /**
+   * (For LanceDB OSS only): The interval, in seconds, at which to check for
+   * updates to the table from other processes. If None, then consistency is not
+   * checked. For performance reasons, this is the default. For strong
+   * consistency, set this to zero seconds. Then every read will check for
+   * updates from other processes. As a compromise, you can set this to a
+   * non-zero value for eventual consistency. If more than that interval
+   * has passed since the last check, then the table will be checked for updates.
+   * Note: this consistency only applies to read operations. Write operations are
+   * always consistent.
+   */
+  readConsistencyInterval?: number
 }
 
 function getAwsArgs (opts: ConnectionOptions): any[] {
@@ -181,7 +194,8 @@ export async function connect (
     opts.awsCredentials?.accessKeyId,
     opts.awsCredentials?.secretKey,
     opts.awsCredentials?.sessionToken,
-    opts.awsRegion
+    opts.awsRegion,
+    opts.readConsistencyInterval
   )
   return new LocalConnection(db, opts)
 }

@@ -18,5 +18,5 @@ module.exports = {
     "@typescript-eslint/method-signature-style": "off",
     "@typescript-eslint/no-explicit-any": "off",
   },
-  ignorePatterns: ["node_modules/", "dist/", "build/", "vectordb/native.*"],
+  ignorePatterns: ["node_modules/", "dist/", "build/", "lancedb/native.*"],
 };

@@ -1,5 +1,5 @@
 [package]
-name = "vectordb-nodejs"
+name = "lancedb-nodejs"
 edition.workspace = true
 version = "0.0.0"
 license.workspace = true
@@ -16,7 +16,7 @@ arrow-ipc.workspace = true
 futures.workspace = true
 lance-linalg.workspace = true
 lance.workspace = true
-vectordb = { path = "../rust/vectordb" }
+lancedb = { path = "../rust/lancedb" }
 napi = { version = "2.15", default-features = false, features = [
   "napi7",
   "async"

@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-import { makeArrowTable, toBuffer } from "../vectordb/arrow";
+import { makeArrowTable, toBuffer } from "../lancedb/arrow";
 import {
   Int64,
   Field,
@@ -29,6 +29,6 @@ test("open database", async () => {
   const tbl = await db.createTable("test", [{ id: 1 }, { id: 2 }]);
   expect(await db.tableNames()).toStrictEqual(["test"]);
 
-  const schema = tbl.schema;
+  const schema = await tbl.schema();
   expect(schema).toEqual(new Schema([new Field("id", new Float64(), true)]));
 });

@@ -181,3 +181,37 @@ describe("Test creating index", () => {
     // TODO: check index type.
   });
 });
+
+describe("Read consistency interval", () => {
+  let tmpDir: string;
+  beforeEach(() => {
+    tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "read-consistency-"));
+  });
+
+  // const intervals = [undefined, 0, 0.1];
+  const intervals = [0];
+  test.each(intervals)("read consistency interval %p", async (interval) => {
+    const db = await connect({ uri: tmpDir });
+    const table = await db.createTable("my_table", [{ id: 1 }]);
+
+    const db2 = await connect({ uri: tmpDir, readConsistencyInterval: interval });
+    const table2 = await db2.openTable("my_table");
+    expect(await table2.countRows()).toEqual(await table.countRows());
+
+    await table.add([{ id: 2 }]);
+
+    if (interval === undefined) {
+      expect(await table2.countRows()).toEqual(1n);
+      // TODO: once we implement time travel we can uncomment this part of the test.
+      // await table2.checkout_latest();
+      // expect(await table2.countRows()).toEqual(2);
+    } else if (interval === 0) {
+      expect(await table2.countRows()).toEqual(2n);
+    } else {
+      // interval == 0.1
+      expect(await table2.countRows()).toEqual(1n);
+      await new Promise(r => setTimeout(r, 100));
+      expect(await table2.countRows()).toEqual(2n);
+    }
+  });
+});
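The Jest test added above exercises `readConsistencyInterval` through the Node bindings. For comparison, a rough Python sketch of the same scenario, using the `read_consistency_interval` option documented earlier in this change; the directory path and sample rows are placeholders, and a zero interval is used so the second connection sees the write immediately.

```python
from datetime import timedelta
import lancedb

uri = "/tmp/read-consistency-demo"  # placeholder path

# Writer connection: creates the table and later adds a row.
db = lancedb.connect(uri)
table = db.create_table("my_table", [{"id": 1}])

# Reader connection with strong consistency (check on every read).
db2 = lancedb.connect(uri, read_consistency_interval=timedelta(0))
table2 = db2.open_table("my_table")
assert table2.count_rows() == table.count_rows() == 1

# The writer adds another row, as another process might.
table.add([{"id": 2}])

# With a zero interval, the reader sees the new row on the next read.
assert table2.count_rows() == 2
```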
@@ -53,12 +53,12 @@ export async function connect(
     opts = Object.assign(
       {
         uri: "",
-        apiKey: "",
-        hostOverride: "",
+        apiKey: undefined,
+        hostOverride: undefined,
       },
       args
     );
   }
-  const nativeConn = await NativeConnection.new(opts.uri);
+  const nativeConn = await NativeConnection.new(opts);
   return new Connection(nativeConn);
 }

@@ -16,6 +16,18 @@ export interface ConnectionOptions {
   uri: string
   apiKey?: string
   hostOverride?: string
+  /**
+   * (For LanceDB OSS only): The interval, in seconds, at which to check for
+   * updates to the table from other processes. If None, then consistency is not
+   * checked. For performance reasons, this is the default. For strong
+   * consistency, set this to zero seconds. Then every read will check for
+   * updates from other processes. As a compromise, you can set this to a
+   * non-zero value for eventual consistency. If more than that interval
+   * has passed since the last check, then the table will be checked for updates.
+   * Note: this consistency only applies to read operations. Write operations are
+   * always consistent.
+   */
+  readConsistencyInterval?: number
 }
 /** Write mode for writing a table. */
 export const enum WriteMode {
@@ -30,7 +42,7 @@ export interface WriteOptions {
 export function connect(options: ConnectionOptions): Promise<Connection>
 export class Connection {
   /** Create a new Connection instance from the given URI. */
-  static new(uri: string): Promise<Connection>
+  static new(options: ConnectionOptions): Promise<Connection>
   /** List all tables in the dataset. */
   tableNames(): Promise<Array<string>>
   /**
@@ -71,7 +83,7 @@ export class Query {
 }
 export class Table {
   /** Return Schema as empty Arrow IPC file. */
-  schema(): Buffer
+  schema(): Promise<Buffer>
   add(buf: Buffer): Promise<void>
   countRows(filter?: string | undefined | null): Promise<bigint>
   delete(predicate: string): Promise<void>

@@ -32,24 +32,24 @@ switch (platform) {
   case 'android':
     switch (arch) {
       case 'arm64':
-        localFileExisted = existsSync(join(__dirname, 'vectordb-nodejs.android-arm64.node'))
+        localFileExisted = existsSync(join(__dirname, 'lancedb-nodejs.android-arm64.node'))
         try {
           if (localFileExisted) {
-            nativeBinding = require('./vectordb-nodejs.android-arm64.node')
+            nativeBinding = require('./lancedb-nodejs.android-arm64.node')
           } else {
-            nativeBinding = require('vectordb-android-arm64')
+            nativeBinding = require('lancedb-android-arm64')
           }
         } catch (e) {
          loadError = e
         }
         break
       case 'arm':
-        localFileExisted = existsSync(join(__dirname, 'vectordb-nodejs.android-arm-eabi.node'))
+        localFileExisted = existsSync(join(__dirname, 'lancedb-nodejs.android-arm-eabi.node'))
         try {
           if (localFileExisted) {
-            nativeBinding = require('./vectordb-nodejs.android-arm-eabi.node')
+            nativeBinding = require('./lancedb-nodejs.android-arm-eabi.node')
           } else {
-            nativeBinding = require('vectordb-android-arm-eabi')
+            nativeBinding = require('lancedb-android-arm-eabi')
           }
         } catch (e) {
           loadError = e
@@ -63,13 +63,13 @@ switch (platform) {
     switch (arch) {
      case 'x64':
        localFileExisted = existsSync(
-          join(__dirname, 'vectordb-nodejs.win32-x64-msvc.node')
+          join(__dirname, 'lancedb-nodejs.win32-x64-msvc.node')
        )
        try {
          if (localFileExisted) {
-            nativeBinding = require('./vectordb-nodejs.win32-x64-msvc.node')
+            nativeBinding = require('./lancedb-nodejs.win32-x64-msvc.node')
          } else {
-            nativeBinding = require('vectordb-win32-x64-msvc')
+            nativeBinding = require('lancedb-win32-x64-msvc')
          }
        } catch (e) {
          loadError = e
@@ -77,13 +77,13 @@ switch (platform) {
        break
      case 'ia32':
        localFileExisted = existsSync(
-          join(__dirname, 'vectordb-nodejs.win32-ia32-msvc.node')
+          join(__dirname, 'lancedb-nodejs.win32-ia32-msvc.node')
        )
        try {
          if (localFileExisted) {
-            nativeBinding = require('./vectordb-nodejs.win32-ia32-msvc.node')
+            nativeBinding = require('./lancedb-nodejs.win32-ia32-msvc.node')
          } else {
-            nativeBinding = require('vectordb-win32-ia32-msvc')
+            nativeBinding = require('lancedb-win32-ia32-msvc')
          }
        } catch (e) {
          loadError = e
@@ -91,13 +91,13 @@ switch (platform) {
        break
      case 'arm64':
        localFileExisted = existsSync(
-          join(__dirname, 'vectordb-nodejs.win32-arm64-msvc.node')
+          join(__dirname, 'lancedb-nodejs.win32-arm64-msvc.node')
        )
        try {
          if (localFileExisted) {
-            nativeBinding = require('./vectordb-nodejs.win32-arm64-msvc.node')
+            nativeBinding = require('./lancedb-nodejs.win32-arm64-msvc.node')
          } else {
-            nativeBinding = require('vectordb-win32-arm64-msvc')
+            nativeBinding = require('lancedb-win32-arm64-msvc')
          }
        } catch (e) {
          loadError = e
@@ -108,23 +108,23 @@ switch (platform) {
    }
    break
  case 'darwin':
-    localFileExisted = existsSync(join(__dirname, 'vectordb-nodejs.darwin-universal.node'))
+    localFileExisted = existsSync(join(__dirname, 'lancedb-nodejs.darwin-universal.node'))
    try {
      if (localFileExisted) {
-        nativeBinding = require('./vectordb-nodejs.darwin-universal.node')
+        nativeBinding = require('./lancedb-nodejs.darwin-universal.node')
      } else {
-        nativeBinding = require('vectordb-darwin-universal')
+        nativeBinding = require('lancedb-darwin-universal')
      }
      break
    } catch {}
    switch (arch) {
      case 'x64':
-        localFileExisted = existsSync(join(__dirname, 'vectordb-nodejs.darwin-x64.node'))
+        localFileExisted = existsSync(join(__dirname, 'lancedb-nodejs.darwin-x64.node'))
        try {
          if (localFileExisted) {
-            nativeBinding = require('./vectordb-nodejs.darwin-x64.node')
+            nativeBinding = require('./lancedb-nodejs.darwin-x64.node')
          } else {
-            nativeBinding = require('vectordb-darwin-x64')
+            nativeBinding = require('lancedb-darwin-x64')
          }
        } catch (e) {
          loadError = e
@@ -132,13 +132,13 @@ switch (platform) {
        break
      case 'arm64':
        localFileExisted = existsSync(
-          join(__dirname, 'vectordb-nodejs.darwin-arm64.node')
+          join(__dirname, 'lancedb-nodejs.darwin-arm64.node')
        )
        try {
          if (localFileExisted) {
-            nativeBinding = require('./vectordb-nodejs.darwin-arm64.node')
+            nativeBinding = require('./lancedb-nodejs.darwin-arm64.node')
          } else {
-            nativeBinding = require('vectordb-darwin-arm64')
+            nativeBinding = require('lancedb-darwin-arm64')
          }
        } catch (e) {
          loadError = e
@@ -152,12 +152,12 @@ switch (platform) {
    if (arch !== 'x64') {
      throw new Error(`Unsupported architecture on FreeBSD: ${arch}`)
    }
-    localFileExisted = existsSync(join(__dirname, 'vectordb-nodejs.freebsd-x64.node'))
+    localFileExisted = existsSync(join(__dirname, 'lancedb-nodejs.freebsd-x64.node'))
    try {
      if (localFileExisted) {
-        nativeBinding = require('./vectordb-nodejs.freebsd-x64.node')
+        nativeBinding = require('./lancedb-nodejs.freebsd-x64.node')
      } else {
-        nativeBinding = require('vectordb-freebsd-x64')
+        nativeBinding = require('lancedb-freebsd-x64')
      }
    } catch (e) {
      loadError = e
@@ -168,26 +168,26 @@ switch (platform) {
      case 'x64':
        if (isMusl()) {
          localFileExisted = existsSync(
-            join(__dirname, 'vectordb-nodejs.linux-x64-musl.node')
+            join(__dirname, 'lancedb-nodejs.linux-x64-musl.node')
          )
          try {
            if (localFileExisted) {
-              nativeBinding = require('./vectordb-nodejs.linux-x64-musl.node')
+              nativeBinding = require('./lancedb-nodejs.linux-x64-musl.node')
            } else {
-              nativeBinding = require('vectordb-linux-x64-musl')
+              nativeBinding = require('lancedb-linux-x64-musl')
            }
          } catch (e) {
            loadError = e
          }
        } else {
          localFileExisted = existsSync(
-            join(__dirname, 'vectordb-nodejs.linux-x64-gnu.node')
+            join(__dirname, 'lancedb-nodejs.linux-x64-gnu.node')
          )
          try {
            if (localFileExisted) {
-              nativeBinding = require('./vectordb-nodejs.linux-x64-gnu.node')
+              nativeBinding = require('./lancedb-nodejs.linux-x64-gnu.node')
            } else {
-              nativeBinding = require('vectordb-linux-x64-gnu')
+              nativeBinding = require('lancedb-linux-x64-gnu')
            }
          } catch (e) {
            loadError = e
@@ -197,26 +197,26 @@ switch (platform) {
      case 'arm64':
        if (isMusl()) {
          localFileExisted = existsSync(
-            join(__dirname, 'vectordb-nodejs.linux-arm64-musl.node')
+            join(__dirname, 'lancedb-nodejs.linux-arm64-musl.node')
          )
          try {
            if (localFileExisted) {
-              nativeBinding = require('./vectordb-nodejs.linux-arm64-musl.node')
+              nativeBinding = require('./lancedb-nodejs.linux-arm64-musl.node')
            } else {
-              nativeBinding = require('vectordb-linux-arm64-musl')
+              nativeBinding = require('lancedb-linux-arm64-musl')
            }
          } catch (e) {
            loadError = e
          }
        } else {
          localFileExisted = existsSync(
-            join(__dirname, 'vectordb-nodejs.linux-arm64-gnu.node')
+            join(__dirname, 'lancedb-nodejs.linux-arm64-gnu.node')
          )
          try {
            if (localFileExisted) {
-              nativeBinding = require('./vectordb-nodejs.linux-arm64-gnu.node')
+              nativeBinding = require('./lancedb-nodejs.linux-arm64-gnu.node')
            } else {
-              nativeBinding = require('vectordb-linux-arm64-gnu')
+              nativeBinding = require('lancedb-linux-arm64-gnu')
            }
          } catch (e) {
            loadError = e
@@ -225,13 +225,13 @@ switch (platform) {
        break
      case 'arm':
        localFileExisted = existsSync(
-          join(__dirname, 'vectordb-nodejs.linux-arm-gnueabihf.node')
+          join(__dirname, 'lancedb-nodejs.linux-arm-gnueabihf.node')
        )
        try {
          if (localFileExisted) {
-            nativeBinding = require('./vectordb-nodejs.linux-arm-gnueabihf.node')
+            nativeBinding = require('./lancedb-nodejs.linux-arm-gnueabihf.node')
          } else {
-            nativeBinding = require('vectordb-linux-arm-gnueabihf')
+            nativeBinding = require('lancedb-linux-arm-gnueabihf')
          }
        } catch (e) {
          loadError = e
@@ -240,26 +240,26 @@ switch (platform) {
      case 'riscv64':
        if (isMusl()) {
          localFileExisted = existsSync(
-            join(__dirname, 'vectordb-nodejs.linux-riscv64-musl.node')
+            join(__dirname, 'lancedb-nodejs.linux-riscv64-musl.node')
          )
          try {
            if (localFileExisted) {
-              nativeBinding = require('./vectordb-nodejs.linux-riscv64-musl.node')
+              nativeBinding = require('./lancedb-nodejs.linux-riscv64-musl.node')
            } else {
-              nativeBinding = require('vectordb-linux-riscv64-musl')
+              nativeBinding = require('lancedb-linux-riscv64-musl')
            }
          } catch (e) {
            loadError = e
          }
        } else {
          localFileExisted = existsSync(
-            join(__dirname, 'vectordb-nodejs.linux-riscv64-gnu.node')
+            join(__dirname, 'lancedb-nodejs.linux-riscv64-gnu.node')
          )
          try {
            if (localFileExisted) {
-              nativeBinding = require('./vectordb-nodejs.linux-riscv64-gnu.node')
+              nativeBinding = require('./lancedb-nodejs.linux-riscv64-gnu.node')
            } else {
-              nativeBinding = require('vectordb-linux-riscv64-gnu')
+              nativeBinding = require('lancedb-linux-riscv64-gnu')
            }
          } catch (e) {
            loadError = e
@@ -268,13 +268,13 @@ switch (platform) {
        break
      case 's390x':
        localFileExisted = existsSync(
-          join(__dirname, 'vectordb-nodejs.linux-s390x-gnu.node')
+          join(__dirname, 'lancedb-nodejs.linux-s390x-gnu.node')
        )
        try {
          if (localFileExisted) {
-            nativeBinding = require('./vectordb-nodejs.linux-s390x-gnu.node')
+            nativeBinding = require('./lancedb-nodejs.linux-s390x-gnu.node')
          } else {
-            nativeBinding = require('vectordb-linux-s390x-gnu')
+            nativeBinding = require('lancedb-linux-s390x-gnu')
          }
        } catch (e) {
          loadError = e
@@ -32,8 +32,8 @@ export class Table {
   }
 
   /** Get the schema of the table. */
-  get schema(): Schema {
-    const schemaBuf = this.inner.schema();
+  async schema(): Promise<Schema> {
+    const schemaBuf = await this.inner.schema();
     const tbl = tableFromIPC(schemaBuf);
     return tbl.schema;
   }

@@ -1,3 +1,3 @@
-# `vectordb-darwin-arm64`
+# `lancedb-darwin-arm64`
 
-This is the **aarch64-apple-darwin** binary for `vectordb`
+This is the **aarch64-apple-darwin** binary for `lancedb`

@@ -1,5 +1,5 @@
 {
-  "name": "vectordb-darwin-arm64",
+  "name": "lancedb-darwin-arm64",
   "version": "0.4.3",
   "os": [
     "darwin"
@@ -7,9 +7,9 @@
   "cpu": [
     "arm64"
   ],
-  "main": "vectordb.darwin-arm64.node",
+  "main": "lancedb.darwin-arm64.node",
   "files": [
-    "vectordb.darwin-arm64.node"
+    "lancedb.darwin-arm64.node"
   ],
   "license": "MIT",
   "engines": {

@@ -1,3 +1,3 @@
-# `vectordb-darwin-x64`
+# `lancedb-darwin-x64`
 
-This is the **x86_64-apple-darwin** binary for `vectordb`
+This is the **x86_64-apple-darwin** binary for `lancedb`

@@ -1,5 +1,5 @@
 {
-  "name": "vectordb-darwin-x64",
+  "name": "lancedb-darwin-x64",
   "version": "0.4.3",
   "os": [
     "darwin"
@@ -7,9 +7,9 @@
   "cpu": [
     "x64"
   ],
-  "main": "vectordb.darwin-x64.node",
+  "main": "lancedb.darwin-x64.node",
   "files": [
-    "vectordb.darwin-x64.node"
+    "lancedb.darwin-x64.node"
   ],
   "license": "MIT",
   "engines": {

@@ -1,3 +1,3 @@
-# `vectordb-linux-arm64-gnu`
+# `lancedb-linux-arm64-gnu`
 
-This is the **aarch64-unknown-linux-gnu** binary for `vectordb`
+This is the **aarch64-unknown-linux-gnu** binary for `lancedb`

@@ -1,5 +1,5 @@
 {
-  "name": "vectordb-linux-arm64-gnu",
+  "name": "lancedb-linux-arm64-gnu",
   "version": "0.4.3",
   "os": [
     "linux"
@@ -7,9 +7,9 @@
   "cpu": [
     "arm64"
   ],
-  "main": "vectordb.linux-arm64-gnu.node",
+  "main": "lancedb.linux-arm64-gnu.node",
   "files": [
-    "vectordb.linux-arm64-gnu.node"
+    "lancedb.linux-arm64-gnu.node"
  ],
  "license": "MIT",
  "engines": {

@@ -1,3 +1,3 @@
-# `vectordb-linux-x64-gnu`
+# `lancedb-linux-x64-gnu`
 
-This is the **x86_64-unknown-linux-gnu** binary for `vectordb`
+This is the **x86_64-unknown-linux-gnu** binary for `lancedb`

@@ -1,5 +1,5 @@
 {
-  "name": "vectordb-linux-x64-gnu",
+  "name": "lancedb-linux-x64-gnu",
   "version": "0.4.3",
   "os": [
     "linux"
@@ -7,9 +7,9 @@
   "cpu": [
     "x64"
   ],
-  "main": "vectordb.linux-x64-gnu.node",
+  "main": "lancedb.linux-x64-gnu.node",
   "files": [
-    "vectordb.linux-x64-gnu.node"
+    "lancedb.linux-x64-gnu.node"
  ],
  "license": "MIT",
  "engines": {

nodejs/package-lock.json (generated, 1087 changed lines): file diff suppressed because it is too large.
@@ -1,10 +1,10 @@
 {
-  "name": "vectordb",
+  "name": "lancedb",
   "version": "0.4.3",
   "main": "./dist/index.js",
   "types": "./dist/index.d.ts",
   "napi": {
-    "name": "vectordb-nodejs",
+    "name": "lancedb-nodejs",
     "triples": {
       "defaults": false,
       "additional": [
@@ -18,7 +18,7 @@
   "license": "Apache 2.0",
   "devDependencies": {
     "@napi-rs/cli": "^2.18.0",
-    "@types/jest": "^29.5.11",
+    "@types/jest": "^29.1.2",
     "@typescript-eslint/eslint-plugin": "^6.19.0",
     "@typescript-eslint/parser": "^6.19.0",
     "eslint": "^8.56.0",
@@ -45,21 +45,22 @@
   ],
   "scripts": {
     "artifacts": "napi artifacts",
-    "build:native": "napi build --platform --release --js vectordb/native.js --dts vectordb/native.d.ts dist/",
+    "build:native": "napi build --platform --release --js lancedb/native.js --dts lancedb/native.d.ts dist/",
-    "build:debug": "napi build --platform --dts ../vectordb/native.d.ts --js ../vectordb/native.js dist/",
+    "build:debug": "napi build --platform --dts ../lancedb/native.d.ts --js ../lancedb/native.js dist/",
     "build": "npm run build:debug && tsc -b",
-    "docs": "typedoc --plugin typedoc-plugin-markdown vectordb/index.ts",
+    "docs": "typedoc --plugin typedoc-plugin-markdown lancedb/index.ts",
-    "lint": "eslint vectordb --ext .js,.ts",
+    "lint": "eslint lancedb --ext .js,.ts",
     "prepublishOnly": "napi prepublish -t npm",
-    "test": "npm run build && jest",
+    "//": "maxWorkers=1 is workaround for bigint issue in jest: https://github.com/jestjs/jest/issues/11617#issuecomment-1068732414",
+    "test": "npm run build && jest --maxWorkers=1",
     "universal": "napi universal",
     "version": "napi version"
   },
   "optionalDependencies": {
-    "vectordb-darwin-arm64": "0.4.3",
+    "lancedb-darwin-arm64": "0.4.3",
-    "vectordb-darwin-x64": "0.4.3",
+    "lancedb-darwin-x64": "0.4.3",
-    "vectordb-linux-arm64-gnu": "0.4.3",
+    "lancedb-linux-arm64-gnu": "0.4.3",
-    "vectordb-linux-x64-gnu": "0.4.3"
+    "lancedb-linux-x64-gnu": "0.4.3"
   },
   "dependencies": {
     "apache-arrow": "^15.0.0"

@@ -16,8 +16,9 @@ use napi::bindgen_prelude::*;
 use napi_derive::*;
 
 use crate::table::Table;
-use vectordb::connection::Connection as LanceDBConnection;
-use vectordb::ipc::ipc_file_to_batches;
+use crate::ConnectionOptions;
+use lancedb::connection::{ConnectBuilder, Connection as LanceDBConnection};
+use lancedb::ipc::ipc_file_to_batches;
 
 #[napi]
 pub struct Connection {
@@ -28,11 +29,23 @@ pub struct Connection {
 impl Connection {
     /// Create a new Connection instance from the given URI.
     #[napi(factory)]
-    pub async fn new(uri: String) -> napi::Result<Self> {
+    pub async fn new(options: ConnectionOptions) -> napi::Result<Self> {
+        let mut builder = ConnectBuilder::new(&options.uri);
+        if let Some(api_key) = options.api_key {
+            builder = builder.api_key(&api_key);
+        }
+        if let Some(host_override) = options.host_override {
+            builder = builder.host_override(&host_override);
+        }
+        if let Some(interval) = options.read_consistency_interval {
+            builder =
+                builder.read_consistency_interval(std::time::Duration::from_secs_f64(interval));
+        }
         Ok(Self {
-            conn: vectordb::connect(&uri).execute().await.map_err(|e| {
-                napi::Error::from_reason(format!("Failed to connect to database: {}", e))
-            })?,
+            conn: builder
+                .execute()
+                .await
+                .map_err(|e| napi::Error::from_reason(format!("{}", e)))?,
         })
     }
 

@@ -40,12 +40,12 @@ impl From<MetricType> for LanceMetricType {
 
 #[napi]
 pub struct IndexBuilder {
-    inner: vectordb::index::IndexBuilder,
+    inner: lancedb::index::IndexBuilder,
 }
 
 #[napi]
 impl IndexBuilder {
-    pub fn new(tbl: &dyn vectordb::Table) -> Self {
+    pub fn new(tbl: &dyn lancedb::Table) -> Self {
         let inner = tbl.create_index(&[]);
         Self { inner }
     }

@@ -14,9 +14,9 @@
 
 use futures::StreamExt;
 use lance::io::RecordBatchStream;
+use lancedb::ipc::batches_to_ipc_file;
 use napi::bindgen_prelude::*;
 use napi_derive::napi;
-use vectordb::ipc::batches_to_ipc_file;
 
 /** Typescript-style Async Iterator over RecordBatches */
 #[napi]

@@ -22,10 +22,21 @@ mod query;
 mod table;
 
 #[napi(object)]
+#[derive(Debug)]
 pub struct ConnectionOptions {
     pub uri: String,
     pub api_key: Option<String>,
     pub host_override: Option<String>,
+    /// (For LanceDB OSS only): The interval, in seconds, at which to check for
+    /// updates to the table from other processes. If None, then consistency is not
+    /// checked. For performance reasons, this is the default. For strong
+    /// consistency, set this to zero seconds. Then every read will check for
+    /// updates from other processes. As a compromise, you can set this to a
+    /// non-zero value for eventual consistency. If more than that interval
+    /// has passed since the last check, then the table will be checked for updates.
+    /// Note: this consistency only applies to read operations. Write operations are
+    /// always consistent.
+    pub read_consistency_interval: Option<f64>,
 }
 
 /// Write mode for writing a table.
@@ -44,5 +55,5 @@ pub struct WriteOptions {
 
 #[napi]
 pub async fn connect(options: ConnectionOptions) -> napi::Result<Connection> {
-    Connection::new(options.uri.clone()).await
+    Connection::new(options).await
 }

@@ -12,9 +12,9 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use lancedb::query::Query as LanceDBQuery;
 use napi::bindgen_prelude::*;
 use napi_derive::napi;
-use vectordb::query::Query as LanceDBQuery;
 
 use crate::{iterator::RecordBatchIterator, table::Table};
 

@@ -13,10 +13,10 @@
 // limitations under the License.
 
 use arrow_ipc::writer::FileWriter;
+use lancedb::table::AddDataOptions;
+use lancedb::{ipc::ipc_file_to_batches, table::TableRef};
 use napi::bindgen_prelude::*;
 use napi_derive::napi;
-use vectordb::table::AddDataOptions;
-use vectordb::{ipc::ipc_file_to_batches, table::TableRef};
 
 use crate::index::IndexBuilder;
 use crate::query::Query;
@@ -34,8 +34,12 @@ impl Table {
 
     /// Return Schema as empty Arrow IPC file.
     #[napi]
-    pub fn schema(&self) -> napi::Result<Buffer> {
-        let mut writer = FileWriter::try_new(vec![], &self.table.schema())
+    pub async fn schema(&self) -> napi::Result<Buffer> {
+        let schema =
+            self.table.schema().await.map_err(|e| {
+                napi::Error::from_reason(format!("Failed to create IPC file: {}", e))
+            })?;
+        let mut writer = FileWriter::try_new(vec![], &schema)
             .map_err(|e| napi::Error::from_reason(format!("Failed to create IPC file: {}", e)))?;
         writer
             .finish()

@@ -1,8 +1,8 @@
 {
   "include": [
-    "vectordb/*.ts",
-    "vectordb/**/*.ts",
-    "vectordb/*.js",
+    "lancedb/*.ts",
+    "lancedb/**/*.ts",
+    "lancedb/*.js",
   ],
   "compilerOptions": {
     "target": "es2022",
@@ -18,7 +18,7 @@
   ],
   "typedocOptions": {
     "entryPoints": [
-      "vectordb/index.ts"
+      "lancedb/index.ts"
     ],
     "out": "../docs/src/javascript/",
     "visibilityFilters": {
@@ -1,6 +1,6 @@
 [package]
-name = "vectordb-node"
+name = "lancedb-node"
-version = "0.4.10"
+version = "0.4.11"
 description = "Serverless, low-latency vector database for AI applications"
 license.workspace = true
 edition.workspace = true
@@ -24,9 +24,14 @@ half = { workspace = true }
 lance = { workspace = true }
 lance-index = { workspace = true }
 lance-linalg = { workspace = true }
-vectordb = { path = "../../vectordb" }
+lancedb = { path = "../../lancedb" }
 tokio = { version = "1.23", features = ["rt-multi-thread"] }
-neon = {version = "0.10.1", default-features = false, features = ["channel-api", "napi-6", "promise-api", "task-api"] }
+neon = { version = "0.10.1", default-features = false, features = [
+    "channel-api",
+    "napi-6",
+    "promise-api",
+    "task-api",
+] }
 object_store = { workspace = true, features = ["aws"] }
 snafu = { workspace = true }
 async-trait = "0"

@@ -1,3 +1,3 @@
-The LanceDB node bridge (vectordb-node) allows javascript applications to access LanceDB datasets.
+The LanceDB node bridge (lancedb-node) allows javascript applications to access LanceDB datasets.
 
 It is build using [Neon](https://neon-bindings.com). See the node project for an example of how it is used / tests

@@ -34,8 +34,8 @@ pub enum Error {
 
 pub type Result<T> = std::result::Result<T, Error>;
 
-impl From<vectordb::error::Error> for Error {
-    fn from(e: vectordb::error::Error) -> Self {
+impl From<lancedb::error::Error> for Error {
+    fn from(e: lancedb::error::Error) -> Self {
         Self::LanceDB {
             message: e.to_string(),
         }

@@ -19,7 +19,7 @@ use neon::{
 };
 
 use crate::{error::ResultExt, runtime, table::JsTable};
-use vectordb::Table;
+use lancedb::Table;
 
 pub fn table_create_scalar_index(mut cx: FunctionContext) -> JsResult<JsPromise> {
     let js_table = cx.this().downcast_or_throw::<JsBox<JsTable>, _>(&mut cx)?;

@@ -13,10 +13,10 @@
 // limitations under the License.
 
 use lance_linalg::distance::MetricType;
+use lancedb::index::IndexBuilder;
 use neon::context::FunctionContext;
 use neon::prelude::*;
 use std::convert::TryFrom;
-use vectordb::index::IndexBuilder;
 
 use crate::error::Error::InvalidIndexType;
 use crate::error::ResultExt;

@@ -22,9 +22,9 @@ use object_store::CredentialProvider;
 use once_cell::sync::OnceCell;
 use tokio::runtime::Runtime;
 
-use vectordb::connect;
-use vectordb::connection::Connection;
-use vectordb::table::ReadParams;
+use lancedb::connect;
+use lancedb::connection::Connection;
+use lancedb::table::ReadParams;
 
 use crate::error::ResultExt;
 use crate::query::JsQuery;
@@ -84,6 +84,11 @@ fn database_new(mut cx: FunctionContext) -> JsResult<JsPromise> {
     let path = cx.argument::<JsString>(0)?.value(&mut cx);
     let aws_creds = get_aws_creds(&mut cx, 1)?;
     let region = get_aws_region(&mut cx, 4)?;
+    let read_consistency_interval = cx
+        .argument_opt(5)
+        .and_then(|arg| arg.downcast::<JsNumber, _>(&mut cx).ok())
+        .map(|v| v.value(&mut cx))
+        .map(std::time::Duration::from_secs_f64);
 
     let rt = runtime(&mut cx)?;
     let channel = cx.channel();
@@ -100,6 +105,9 @@ fn database_new(mut cx: FunctionContext) -> JsResult<JsPromise> {
             token: aws_creds.token,
         });
     }
+    if let Some(interval) = read_consistency_interval {
+        conn_builder = conn_builder.read_consistency_interval(interval);
+    }
     rt.spawn(async move {
         let database = conn_builder.execute().await;
 

@@ -93,7 +93,7 @@ impl JsQuery {
             .and_then(|stream| {
                 stream
                     .try_collect::<Vec<_>>()
-                    .map_err(vectordb::error::Error::from)
+                    .map_err(lancedb::error::Error::from)
             })
             .await;
 

@@ -18,12 +18,12 @@ use arrow_array::{RecordBatch, RecordBatchIterator};
 use lance::dataset::optimize::CompactionOptions;
 use lance::dataset::{WriteMode, WriteParams};
 use lance::io::ObjectStoreParams;
-use vectordb::table::{AddDataOptions, OptimizeAction, WriteOptions};
+use lancedb::table::{AddDataOptions, OptimizeAction, WriteOptions};
 
 use crate::arrow::{arrow_buffer_to_record_batch, record_batch_to_buffer};
+use lancedb::TableRef;
 use neon::prelude::*;
 use neon::types::buffer::TypedArray;
-use vectordb::TableRef;
 
 use crate::error::ResultExt;
 use crate::{convert, get_aws_credential_provider, get_aws_region, runtime, JsDatabase};
@@ -534,8 +534,9 @@ impl JsTable {
             .value(&mut cx);
 
         rt.spawn(async move {
+            let schema = table.schema().await;
             deferred.settle_with(&channel, move |mut cx| {
-                let schema = table.schema();
+                let schema = schema.or_throw(&mut cx)?;
                 let batches = vec![RecordBatch::new_empty(schema)];
                 let buffer = record_batch_to_buffer(batches).or_throw(&mut cx)?;
                 convert::new_js_buffer(buffer, &mut cx, is_electron)
|
|||||||
[package]
|
[package]
|
||||||
name = "vectordb"
|
name = "lancedb"
|
||||||
version = "0.4.10"
|
version = "0.4.11"
|
||||||
edition.workspace = true
|
edition.workspace = true
|
||||||
description = "LanceDB: A serverless, low-latency vector database for AI applications"
|
description = "LanceDB: A serverless, low-latency vector database for AI applications"
|
||||||
license.workspace = true
|
license.workspace = true
|
||||||
@@ -19,9 +19,9 @@ use arrow_array::{FixedSizeListArray, Int32Array, RecordBatch, RecordBatchIterat
|
|||||||
use arrow_schema::{DataType, Field, Schema};
|
use arrow_schema::{DataType, Field, Schema};
|
||||||
use futures::TryStreamExt;
|
use futures::TryStreamExt;
|
||||||
|
|
||||||
use vectordb::connection::Connection;
|
use lancedb::connection::Connection;
|
||||||
use vectordb::table::AddDataOptions;
|
use lancedb::table::AddDataOptions;
|
||||||
use vectordb::{connect, Result, Table, TableRef};
|
use lancedb::{connect, Result, Table, TableRef};
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() -> Result<()> {
|
async fn main() -> Result<()> {
|
||||||
@@ -303,6 +303,9 @@ pub struct ConnectBuilder {
|
|||||||
|
|
||||||
/// User provided AWS credentials
|
/// User provided AWS credentials
|
||||||
aws_creds: Option<AwsCredential>,
|
aws_creds: Option<AwsCredential>,
|
||||||
|
|
||||||
|
/// The interval at which to check for updates from other processes.
|
||||||
|
read_consistency_interval: Option<std::time::Duration>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ConnectBuilder {
|
impl ConnectBuilder {
|
||||||
@@ -314,6 +317,7 @@ impl ConnectBuilder {
|
|||||||
region: None,
|
region: None,
|
||||||
host_override: None,
|
host_override: None,
|
||||||
aws_creds: None,
|
aws_creds: None,
|
||||||
|
read_consistency_interval: None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -338,6 +342,29 @@ impl ConnectBuilder {
|
|||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// The interval at which to check for updates from other processes. This
|
||||||
|
/// only affects LanceDB OSS.
|
||||||
|
///
|
||||||
|
/// If left unset, consistency is not checked. For maximum read
|
||||||
|
/// performance, this is the default. For strong consistency, set this to
|
||||||
|
/// zero seconds. Then every read will check for updates from other processes.
|
||||||
|
/// As a compromise, set this to a non-zero duration for eventual consistency.
|
||||||
|
/// If more than that duration has passed since the last read, the read will
|
||||||
|
/// check for updates from other processes.
|
||||||
|
///
|
||||||
|
/// This only affects read operations. Write operations are always
|
||||||
|
/// consistent.
|
||||||
|
///
|
||||||
|
/// LanceDB Cloud uses eventual consistency under the hood, and is not
|
||||||
|
/// currently configurable.
|
||||||
|
pub fn read_consistency_interval(
|
||||||
|
mut self,
|
||||||
|
read_consistency_interval: std::time::Duration,
|
||||||
|
) -> Self {
|
||||||
|
self.read_consistency_interval = Some(read_consistency_interval);
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
/// Establishes a connection to the database
|
/// Establishes a connection to the database
|
||||||
pub async fn execute(self) -> Result<Connection> {
|
pub async fn execute(self) -> Result<Connection> {
|
||||||
let internal = Arc::new(Database::connect_with_options(&self).await?);
|
let internal = Arc::new(Database::connect_with_options(&self).await?);
|
||||||
@@ -368,6 +395,8 @@ struct Database {
|
|||||||
|
|
||||||
// the object store wrapper to use on write path
|
// the object store wrapper to use on write path
|
||||||
pub(crate) store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
|
pub(crate) store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
|
||||||
|
|
||||||
|
read_consistency_interval: Option<std::time::Duration>,
|
||||||
}
|
}
|
||||||
|
|
||||||
const LANCE_EXTENSION: &str = "lance";
|
const LANCE_EXTENSION: &str = "lance";
|
||||||
@@ -380,8 +409,11 @@ impl Database {
|
|||||||
let uri = &options.uri;
|
let uri = &options.uri;
|
||||||
let parse_res = url::Url::parse(uri);
|
let parse_res = url::Url::parse(uri);
|
||||||
|
|
||||||
|
// TODO: pass params regardless of OS
|
||||||
match parse_res {
|
match parse_res {
|
||||||
Ok(url) if url.scheme().len() == 1 && cfg!(windows) => Self::open_path(uri).await,
|
Ok(url) if url.scheme().len() == 1 && cfg!(windows) => {
|
||||||
|
Self::open_path(uri, options.read_consistency_interval).await
|
||||||
|
}
|
||||||
Ok(mut url) => {
|
Ok(mut url) => {
|
||||||
// iter thru the query params and extract the commit store param
|
// iter thru the query params and extract the commit store param
|
||||||
let mut engine = None;
|
let mut engine = None;
|
||||||
@@ -465,13 +497,17 @@ impl Database {
|
|||||||
base_path,
|
base_path,
|
||||||
object_store,
|
object_store,
|
||||||
store_wrapper: write_store_wrapper,
|
store_wrapper: write_store_wrapper,
|
||||||
|
read_consistency_interval: options.read_consistency_interval,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
Err(_) => Self::open_path(uri).await,
|
Err(_) => Self::open_path(uri, options.read_consistency_interval).await,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn open_path(path: &str) -> Result<Self> {
|
async fn open_path(
|
||||||
|
path: &str,
|
||||||
|
read_consistency_interval: Option<std::time::Duration>,
|
||||||
|
) -> Result<Self> {
|
||||||
let (object_store, base_path) = ObjectStore::from_uri(path).await?;
|
let (object_store, base_path) = ObjectStore::from_uri(path).await?;
|
||||||
if object_store.is_local() {
|
if object_store.is_local() {
|
||||||
Self::try_create_dir(path).context(CreateDirSnafu { path })?;
|
Self::try_create_dir(path).context(CreateDirSnafu { path })?;
|
||||||
@@ -482,6 +518,7 @@ impl Database {
|
|||||||
base_path,
|
base_path,
|
||||||
object_store,
|
object_store,
|
||||||
store_wrapper: None,
|
store_wrapper: None,
|
||||||
|
read_consistency_interval,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -551,6 +588,7 @@ impl ConnectionInternal for Database {
|
|||||||
options.data.unwrap(),
|
options.data.unwrap(),
|
||||||
self.store_wrapper.clone(),
|
self.store_wrapper.clone(),
|
||||||
Some(write_params),
|
Some(write_params),
|
||||||
|
self.read_consistency_interval,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
@@ -576,6 +614,7 @@ impl ConnectionInternal for Database {
|
|||||||
&options.name,
|
&options.name,
|
||||||
self.store_wrapper.clone(),
|
self.store_wrapper.clone(),
|
||||||
options.lance_read_params,
|
options.lance_read_params,
|
||||||
|
self.read_consistency_interval,
|
||||||
)
|
)
|
||||||
.await?,
|
.await?,
|
||||||
))
|
))
|
||||||
@@ -696,6 +735,6 @@ mod tests {
|
|||||||
.execute()
|
.execute()
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
assert_eq!(other_schema, overwritten.schema());
|
assert_eq!(other_schema, overwritten.schema().await.unwrap());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -168,7 +168,7 @@ impl IndexBuilder {
|
|||||||
|
|
||||||
/// Build the parameters.
|
/// Build the parameters.
|
||||||
pub async fn build(&self) -> Result<()> {
|
pub async fn build(&self) -> Result<()> {
|
||||||
let schema = self.table.schema();
|
let schema = self.table.schema().await?;
|
||||||
|
|
||||||
// TODO: simplify this after GH lance#1864.
|
// TODO: simplify this after GH lance#1864.
|
||||||
let mut index_type = &self.index_type;
|
let mut index_type = &self.index_type;
|
||||||
@@ -230,7 +230,7 @@ impl IndexBuilder {
|
|||||||
.table
|
.table
|
||||||
.as_native()
|
.as_native()
|
||||||
.expect("Only native table is supported here");
|
.expect("Only native table is supported here");
|
||||||
let mut dataset = tbl.clone_inner_dataset();
|
let mut dataset = tbl.dataset.get_mut().await?;
|
||||||
match params {
|
match params {
|
||||||
IndexParams::Scalar { replace } => {
|
IndexParams::Scalar { replace } => {
|
||||||
dataset
|
dataset
|
||||||
@@ -271,7 +271,6 @@ impl IndexBuilder {
|
|||||||
.await?;
|
.await?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
tbl.reset_dataset(dataset);
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
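
For orientation on the call site this hunk touches, a sketch of driving the index builder from an opened table, assuming the `create_index`/`ivf_pq` surface shown in the crate docs later in this diff; the column name is a placeholder:

```rust
use lancedb::{Result, TableRef};

async fn build_vector_index(tbl: &TableRef) -> Result<()> {
    // IVF_PQ index over the "vector" column. build() now awaits the table
    // schema and borrows the dataset through the consistency wrapper
    // instead of cloning the inner Dataset.
    tbl.create_index(&["vector"]).ivf_pq().build().await
}
```
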

@@ -12,8 +12,6 @@
|
|||||||
// See the License for the specific language governing permissions and
|
// See the License for the specific language governing permissions and
|
||||||
// limitations under the License.
|
// limitations under the License.
|
||||||
|
|
||||||
//! # VectorDB ([LanceDB](https://github.com/lancedb/lancedb)) -- Developer-friendly, serverless vector database for AI applications
|
|
||||||
//!
|
|
||||||
//! [LanceDB](https://github.com/lancedb/lancedb) is an open-source database for vector-search built with persistent storage,
|
//! [LanceDB](https://github.com/lancedb/lancedb) is an open-source database for vector-search built with persistent storage,
|
||||||
//! which greatly simplifies retrieval, filtering and management of embeddings.
|
//! which greatly simplifies retrieval, filtering and management of embeddings.
|
||||||
//!
|
//!
|
||||||
@@ -33,7 +31,7 @@
|
|||||||
//! LanceDB runs in process, to use it in your Rust project, put the following in your `Cargo.toml`:
|
//! LanceDB runs in process, to use it in your Rust project, put the following in your `Cargo.toml`:
|
||||||
//!
|
//!
|
||||||
//! ```ignore
|
//! ```ignore
|
||||||
//! cargo install vectordb
|
//! cargo install lancedb
|
||||||
//! ```
|
//! ```
|
||||||
//!
|
//!
|
||||||
//! ### Quick Start
|
//! ### Quick Start
|
||||||
@@ -45,7 +43,7 @@
|
|||||||
//! ```rust
|
//! ```rust
|
||||||
//! # use arrow_schema::{Field, Schema};
|
//! # use arrow_schema::{Field, Schema};
|
||||||
//! # tokio::runtime::Runtime::new().unwrap().block_on(async {
|
//! # tokio::runtime::Runtime::new().unwrap().block_on(async {
|
||||||
//! let db = vectordb::connect("data/sample-lancedb").execute().await.unwrap();
|
//! let db = lancedb::connect("data/sample-lancedb").execute().await.unwrap();
|
||||||
//! # });
|
//! # });
|
||||||
//! ```
|
//! ```
|
||||||
//!
|
//!
|
||||||
@@ -60,7 +58,7 @@
|
|||||||
//! ```rust
|
//! ```rust
|
||||||
//! use object_store::aws::AwsCredential;
|
//! use object_store::aws::AwsCredential;
|
||||||
//! # tokio::runtime::Runtime::new().unwrap().block_on(async {
|
//! # tokio::runtime::Runtime::new().unwrap().block_on(async {
|
||||||
//! let db = vectordb::connect("data/sample-lancedb")
|
//! let db = lancedb::connect("data/sample-lancedb")
|
||||||
//! .aws_creds(AwsCredential {
|
//! .aws_creds(AwsCredential {
|
||||||
//! key_id: "some_key".to_string(),
|
//! key_id: "some_key".to_string(),
|
||||||
//! secret_key: "some_secret".to_string(),
|
//! secret_key: "some_secret".to_string(),
|
||||||
@@ -90,7 +88,7 @@
|
|||||||
//!
|
//!
|
||||||
//! # tokio::runtime::Runtime::new().unwrap().block_on(async {
|
//! # tokio::runtime::Runtime::new().unwrap().block_on(async {
|
||||||
//! # let tmpdir = tempfile::tempdir().unwrap();
|
//! # let tmpdir = tempfile::tempdir().unwrap();
|
||||||
//! # let db = vectordb::connect(tmpdir.path().to_str().unwrap()).execute().await.unwrap();
|
//! # let db = lancedb::connect(tmpdir.path().to_str().unwrap()).execute().await.unwrap();
|
||||||
//! let schema = Arc::new(Schema::new(vec![
|
//! let schema = Arc::new(Schema::new(vec![
|
||||||
//! Field::new("id", DataType::Int32, false),
|
//! Field::new("id", DataType::Int32, false),
|
||||||
//! Field::new(
|
//! Field::new(
|
||||||
@@ -134,7 +132,7 @@
|
|||||||
//! # use arrow_schema::{Schema, Field, DataType};
|
//! # use arrow_schema::{Schema, Field, DataType};
|
||||||
//! # tokio::runtime::Runtime::new().unwrap().block_on(async {
|
//! # tokio::runtime::Runtime::new().unwrap().block_on(async {
|
||||||
//! # let tmpdir = tempfile::tempdir().unwrap();
|
//! # let tmpdir = tempfile::tempdir().unwrap();
|
||||||
//! # let db = vectordb::connect(tmpdir.path().to_str().unwrap()).execute().await.unwrap();
|
//! # let db = lancedb::connect(tmpdir.path().to_str().unwrap()).execute().await.unwrap();
|
||||||
//! # let tbl = db.open_table("idx_test").execute().await.unwrap();
|
//! # let tbl = db.open_table("idx_test").execute().await.unwrap();
|
||||||
//! tbl.create_index(&["vector"])
|
//! tbl.create_index(&["vector"])
|
||||||
//! .ivf_pq()
|
//! .ivf_pq()
|
||||||
@@ -155,7 +153,7 @@
|
|||||||
//! # use arrow_array::{FixedSizeListArray, Float32Array, Int32Array, types::Float32Type};
|
//! # use arrow_array::{FixedSizeListArray, Float32Array, Int32Array, types::Float32Type};
|
||||||
//! # tokio::runtime::Runtime::new().unwrap().block_on(async {
|
//! # tokio::runtime::Runtime::new().unwrap().block_on(async {
|
||||||
//! # let tmpdir = tempfile::tempdir().unwrap();
|
//! # let tmpdir = tempfile::tempdir().unwrap();
|
||||||
//! # let db = vectordb::connect(tmpdir.path().to_str().unwrap()).execute().await.unwrap();
|
//! # let db = lancedb::connect(tmpdir.path().to_str().unwrap()).execute().await.unwrap();
|
||||||
//! # let schema = Arc::new(Schema::new(vec![
|
//! # let schema = Arc::new(Schema::new(vec![
|
||||||
//! # Field::new("id", DataType::Int32, false),
|
//! # Field::new("id", DataType::Int32, false),
|
||||||
//! # Field::new("vector", DataType::FixedSizeList(
|
//! # Field::new("vector", DataType::FixedSizeList(
|
||||||
@@ -12,15 +12,13 @@
|
|||||||
// See the License for the specific language governing permissions and
|
// See the License for the specific language governing permissions and
|
||||||
// limitations under the License.
|
// limitations under the License.
|
||||||
|
|
||||||
use std::sync::Arc;
|
|
||||||
|
|
||||||
use arrow_array::Float32Array;
|
use arrow_array::Float32Array;
|
||||||
use arrow_schema::Schema;
|
use arrow_schema::Schema;
|
||||||
use lance::dataset::scanner::{DatasetRecordBatchStream, Scanner};
|
use lance::dataset::scanner::{DatasetRecordBatchStream, Scanner};
|
||||||
use lance::dataset::Dataset;
|
|
||||||
use lance_linalg::distance::MetricType;
|
use lance_linalg::distance::MetricType;
|
||||||
|
|
||||||
use crate::error::Result;
|
use crate::error::Result;
|
||||||
|
use crate::table::dataset::DatasetConsistencyWrapper;
|
||||||
use crate::utils::default_vector_column;
|
use crate::utils::default_vector_column;
|
||||||
use crate::Error;
|
use crate::Error;
|
||||||
|
|
||||||
@@ -29,7 +27,7 @@ const DEFAULT_TOP_K: usize = 10;
|
|||||||
/// A builder for nearest neighbor queries for LanceDB.
|
/// A builder for nearest neighbor queries for LanceDB.
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct Query {
|
pub struct Query {
|
||||||
dataset: Arc<Dataset>,
|
dataset: DatasetConsistencyWrapper,
|
||||||
|
|
||||||
// The column to run the query on. If not specified, we will attempt to guess
|
// The column to run the query on. If not specified, we will attempt to guess
|
||||||
// the column based on the dataset's schema.
|
// the column based on the dataset's schema.
|
||||||
@@ -60,7 +58,8 @@ impl Query {
|
|||||||
/// # Arguments
|
/// # Arguments
|
||||||
///
|
///
|
||||||
/// * `dataset` - Lance dataset.
|
/// * `dataset` - Lance dataset.
|
||||||
pub(crate) fn new(dataset: Arc<Dataset>) -> Self {
|
///
|
||||||
|
pub(crate) fn new(dataset: DatasetConsistencyWrapper) -> Self {
|
||||||
Self {
|
Self {
|
||||||
dataset,
|
dataset,
|
||||||
query_vector: None,
|
query_vector: None,
|
||||||
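
The public query surface is unchanged by this refactor; a sketch of the usual path through it, mirroring the trait documentation later in this diff (`tbl` is assumed to be an already-opened table with a 2-dimensional vector column):

```rust
use arrow_array::RecordBatch;
use futures::TryStreamExt;
use lancedb::TableRef;

async fn nearest(tbl: &TableRef) -> Vec<RecordBatch> {
    // Build a nearest-neighbor query and collect the result stream.
    let stream = tbl
        .query()
        .nearest_to(&[0.1, 0.2])
        .limit(10)
        .execute_stream()
        .await
        .unwrap();
    stream.try_collect().await.unwrap()
}
```
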
@@ -82,7 +81,8 @@ impl Query {
|
|||||||
///
|
///
|
||||||
/// * A [DatasetRecordBatchStream] with the query's results.
|
/// * A [DatasetRecordBatchStream] with the query's results.
|
||||||
pub async fn execute_stream(&self) -> Result<DatasetRecordBatchStream> {
|
pub async fn execute_stream(&self) -> Result<DatasetRecordBatchStream> {
|
||||||
let mut scanner: Scanner = self.dataset.scan();
|
let ds_ref = self.dataset.get().await?;
|
||||||
|
let mut scanner: Scanner = ds_ref.scan();
|
||||||
|
|
||||||
if let Some(query) = self.query_vector.as_ref() {
|
if let Some(query) = self.query_vector.as_ref() {
|
||||||
// If there is a vector query, default to limit=10 if unspecified
|
// If there is a vector query, default to limit=10 if unspecified
|
||||||
@@ -90,10 +90,10 @@ impl Query {
|
|||||||
col.clone()
|
col.clone()
|
||||||
} else {
|
} else {
|
||||||
// Infer a vector column with the same dimension of the query vector.
|
// Infer a vector column with the same dimension of the query vector.
|
||||||
let arrow_schema = Schema::from(self.dataset.schema());
|
let arrow_schema = Schema::from(ds_ref.schema());
|
||||||
default_vector_column(&arrow_schema, Some(query.len() as i32))?
|
default_vector_column(&arrow_schema, Some(query.len() as i32))?
|
||||||
};
|
};
|
||||||
let field = self.dataset.schema().field(&column).ok_or(Error::Store {
|
let field = ds_ref.schema().field(&column).ok_or(Error::Store {
|
||||||
message: format!("Column {} not found in dataset schema", column),
|
message: format!("Column {} not found in dataset schema", column),
|
||||||
})?;
|
})?;
|
||||||
if !matches!(field.data_type(), arrow_schema::DataType::FixedSizeList(f, dim) if f.data_type().is_floating() && dim == query.len() as i32)
|
if !matches!(field.data_type(), arrow_schema::DataType::FixedSizeList(f, dim) if f.data_type().is_floating() && dim == query.len() as i32)
|
||||||
@@ -239,8 +239,10 @@ mod tests {
|
|||||||
let batches = make_test_batches();
|
let batches = make_test_batches();
|
||||||
let ds = Dataset::write(batches, "memory://foo", None).await.unwrap();
|
let ds = Dataset::write(batches, "memory://foo", None).await.unwrap();
|
||||||
|
|
||||||
|
let ds = DatasetConsistencyWrapper::new_latest(ds, None);
|
||||||
|
|
||||||
let vector = Some(Float32Array::from_iter_values([0.1, 0.2]));
|
let vector = Some(Float32Array::from_iter_values([0.1, 0.2]));
|
||||||
let query = Query::new(Arc::new(ds)).nearest_to(&[0.1, 0.2]);
|
let query = Query::new(ds).nearest_to(&[0.1, 0.2]);
|
||||||
assert_eq!(query.query_vector, vector);
|
assert_eq!(query.query_vector, vector);
|
||||||
|
|
||||||
let new_vector = Float32Array::from_iter_values([9.8, 8.7]);
|
let new_vector = Float32Array::from_iter_values([9.8, 8.7]);
|
||||||
@@ -264,7 +266,9 @@ mod tests {
|
|||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_execute() {
|
async fn test_execute() {
|
||||||
let batches = make_non_empty_batches();
|
let batches = make_non_empty_batches();
|
||||||
let ds = Arc::new(Dataset::write(batches, "memory://foo", None).await.unwrap());
|
let ds = Dataset::write(batches, "memory://foo", None).await.unwrap();
|
||||||
|
|
||||||
|
let ds = DatasetConsistencyWrapper::new_latest(ds, None);
|
||||||
|
|
||||||
let query = Query::new(ds.clone()).nearest_to(&[0.1; 4]);
|
let query = Query::new(ds.clone()).nearest_to(&[0.1; 4]);
|
||||||
let result = query.limit(10).filter("id % 2 == 0").execute_stream().await;
|
let result = query.limit(10).filter("id % 2 == 0").execute_stream().await;
|
||||||
@@ -294,9 +298,11 @@ mod tests {
|
|||||||
async fn test_execute_no_vector() {
|
async fn test_execute_no_vector() {
|
||||||
// test that it's ok to not specify a query vector (just filter / limit)
|
// test that it's ok to not specify a query vector (just filter / limit)
|
||||||
let batches = make_non_empty_batches();
|
let batches = make_non_empty_batches();
|
||||||
let ds = Arc::new(Dataset::write(batches, "memory://foo", None).await.unwrap());
|
let ds = Dataset::write(batches, "memory://foo", None).await.unwrap();
|
||||||
|
|
||||||
let query = Query::new(ds.clone());
|
let ds = DatasetConsistencyWrapper::new_latest(ds, None);
|
||||||
|
|
||||||
|
let query = Query::new(ds);
|
||||||
let result = query.filter("id % 2 == 0").execute_stream().await;
|
let result = query.filter("id % 2 == 0").execute_stream().await;
|
||||||
let mut stream = result.expect("should have result");
|
let mut stream = result.expect("should have result");
|
||||||
// should only have one batch
|
// should only have one batch
|
||||||
@@ -15,7 +15,7 @@
|
|||||||
//! LanceDB Table APIs
|
//! LanceDB Table APIs
|
||||||
|
|
||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
use std::sync::{Arc, Mutex};
|
use std::sync::Arc;
|
||||||
|
|
||||||
use arrow_array::{RecordBatchIterator, RecordBatchReader};
|
use arrow_array::{RecordBatchIterator, RecordBatchReader};
|
||||||
use arrow_schema::{Schema, SchemaRef};
|
use arrow_schema::{Schema, SchemaRef};
|
||||||
@@ -39,8 +39,10 @@ use crate::index::IndexBuilder;
|
|||||||
use crate::query::Query;
|
use crate::query::Query;
|
||||||
use crate::utils::{PatchReadParam, PatchWriteParam};
|
use crate::utils::{PatchReadParam, PatchWriteParam};
|
||||||
|
|
||||||
|
use self::dataset::DatasetConsistencyWrapper;
|
||||||
use self::merge::{MergeInsert, MergeInsertBuilder};
|
use self::merge::{MergeInsert, MergeInsertBuilder};
|
||||||
|
|
||||||
|
pub(crate) mod dataset;
|
||||||
pub mod merge;
|
pub mod merge;
|
||||||
|
|
||||||
/// Optimize the dataset.
|
/// Optimize the dataset.
|
||||||
@@ -127,7 +129,7 @@ pub trait Table: std::fmt::Display + Send + Sync {
|
|||||||
fn name(&self) -> &str;
|
fn name(&self) -> &str;
|
||||||
|
|
||||||
/// Get the arrow [Schema] of the table.
|
/// Get the arrow [Schema] of the table.
|
||||||
fn schema(&self) -> SchemaRef;
|
async fn schema(&self) -> Result<SchemaRef>;
|
||||||
|
|
||||||
/// Count the number of rows in this dataset.
|
/// Count the number of rows in this dataset.
|
||||||
///
|
///
|
||||||
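
Since `schema()` may now refresh the underlying dataset before answering, it becomes async and fallible; a call-site sketch of the migration:

```rust
use arrow_schema::SchemaRef;
use lancedb::{Result, TableRef};

async fn print_width(tbl: &TableRef) -> Result<()> {
    // Previously: let schema = tbl.schema();
    let schema: SchemaRef = tbl.schema().await?;
    println!("table has {} columns", schema.fields().len());
    Ok(())
}
```
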
@@ -162,7 +164,7 @@ pub trait Table: std::fmt::Display + Send + Sync {
|
|||||||
/// # use arrow_schema::{Schema, Field, DataType};
|
/// # use arrow_schema::{Schema, Field, DataType};
|
||||||
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
|
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
|
||||||
/// let tmpdir = tempfile::tempdir().unwrap();
|
/// let tmpdir = tempfile::tempdir().unwrap();
|
||||||
/// let db = vectordb::connect(tmpdir.path().to_str().unwrap())
|
/// let db = lancedb::connect(tmpdir.path().to_str().unwrap())
|
||||||
/// .execute()
|
/// .execute()
|
||||||
/// .await
|
/// .await
|
||||||
/// .unwrap();
|
/// .unwrap();
|
||||||
@@ -210,7 +212,7 @@ pub trait Table: std::fmt::Display + Send + Sync {
|
|||||||
/// # use arrow_schema::{Schema, Field, DataType};
|
/// # use arrow_schema::{Schema, Field, DataType};
|
||||||
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
|
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
|
||||||
/// let tmpdir = tempfile::tempdir().unwrap();
|
/// let tmpdir = tempfile::tempdir().unwrap();
|
||||||
/// let db = vectordb::connect(tmpdir.path().to_str().unwrap())
|
/// let db = lancedb::connect(tmpdir.path().to_str().unwrap())
|
||||||
/// .execute()
|
/// .execute()
|
||||||
/// .await
|
/// .await
|
||||||
/// .unwrap();
|
/// .unwrap();
|
||||||
@@ -264,7 +266,7 @@ pub trait Table: std::fmt::Display + Send + Sync {
|
|||||||
/// # use arrow_schema::{Schema, Field, DataType};
|
/// # use arrow_schema::{Schema, Field, DataType};
|
||||||
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
|
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
|
||||||
/// let tmpdir = tempfile::tempdir().unwrap();
|
/// let tmpdir = tempfile::tempdir().unwrap();
|
||||||
/// let db = vectordb::connect(tmpdir.path().to_str().unwrap())
|
/// let db = lancedb::connect(tmpdir.path().to_str().unwrap())
|
||||||
/// .execute()
|
/// .execute()
|
||||||
/// .await
|
/// .await
|
||||||
/// .unwrap();
|
/// .unwrap();
|
||||||
@@ -322,7 +324,8 @@ pub trait Table: std::fmt::Display + Send + Sync {
|
|||||||
/// # use arrow_array::RecordBatch;
|
/// # use arrow_array::RecordBatch;
|
||||||
/// # use futures::TryStreamExt;
|
/// # use futures::TryStreamExt;
|
||||||
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
|
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
|
||||||
/// # let tbl = vectordb::table::NativeTable::open("/tmp/tbl").await.unwrap();
|
/// # let tbl = lancedb::table::NativeTable::open("/tmp/tbl").await.unwrap();
|
||||||
|
/// use crate::lancedb::Table;
|
||||||
/// let stream = tbl
|
/// let stream = tbl
|
||||||
/// .query()
|
/// .query()
|
||||||
/// .nearest_to(&[1.0, 2.0, 3.0])
|
/// .nearest_to(&[1.0, 2.0, 3.0])
|
||||||
@@ -340,7 +343,8 @@ pub trait Table: std::fmt::Display + Send + Sync {
|
|||||||
/// # use arrow_array::RecordBatch;
|
/// # use arrow_array::RecordBatch;
|
||||||
/// # use futures::TryStreamExt;
|
/// # use futures::TryStreamExt;
|
||||||
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
|
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
|
||||||
/// # let tbl = vectordb::table::NativeTable::open("/tmp/tbl").await.unwrap();
|
/// # let tbl = lancedb::table::NativeTable::open("/tmp/tbl").await.unwrap();
|
||||||
|
/// use crate::lancedb::Table;
|
||||||
/// let stream = tbl
|
/// let stream = tbl
|
||||||
/// .query()
|
/// .query()
|
||||||
/// .filter("id > 5")
|
/// .filter("id > 5")
|
||||||
@@ -357,7 +361,8 @@ pub trait Table: std::fmt::Display + Send + Sync {
|
|||||||
/// # use arrow_array::RecordBatch;
|
/// # use arrow_array::RecordBatch;
|
||||||
/// # use futures::TryStreamExt;
|
/// # use futures::TryStreamExt;
|
||||||
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
|
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
|
||||||
/// # let tbl = vectordb::table::NativeTable::open("/tmp/tbl").await.unwrap();
|
/// # let tbl = lancedb::table::NativeTable::open("/tmp/tbl").await.unwrap();
|
||||||
|
/// use crate::lancedb::Table;
|
||||||
/// let stream = tbl.query().execute_stream().await.unwrap();
|
/// let stream = tbl.query().execute_stream().await.unwrap();
|
||||||
/// let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
|
/// let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
|
||||||
/// # });
|
/// # });
|
||||||
@@ -368,7 +373,7 @@ pub trait Table: std::fmt::Display + Send + Sync {
|
|||||||
///
|
///
|
||||||
/// <section class="warning">Experimental API</section>
|
/// <section class="warning">Experimental API</section>
|
||||||
///
|
///
|
||||||
/// Modeled after ``VACCUM`` in PostgreSQL.
|
/// Modeled after ``VACUUM`` in PostgreSQL.
|
||||||
/// Not all implementations support explicit optimization.
|
/// Not all implementations support explicit optimization.
|
||||||
async fn optimize(&self, action: OptimizeAction) -> Result<OptimizeStats>;
|
async fn optimize(&self, action: OptimizeAction) -> Result<OptimizeStats>;
|
||||||
}
|
}
|
||||||
@@ -381,10 +386,14 @@ pub type TableRef = Arc<dyn Table>;
|
|||||||
pub struct NativeTable {
|
pub struct NativeTable {
|
||||||
name: String,
|
name: String,
|
||||||
uri: String,
|
uri: String,
|
||||||
dataset: Arc<Mutex<Dataset>>,
|
pub(crate) dataset: dataset::DatasetConsistencyWrapper,
|
||||||
|
|
||||||
// the object store wrapper to use on write path
|
// the object store wrapper to use on write path
|
||||||
store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
|
store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
|
||||||
|
|
||||||
|
// This comes from the connection options. We store here so we can pass down
|
||||||
|
// to the dataset when we recreate it (for example, in checkout_latest).
|
||||||
|
read_consistency_interval: Option<std::time::Duration>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl std::fmt::Display for NativeTable {
|
impl std::fmt::Display for NativeTable {
|
||||||
@@ -406,7 +415,7 @@ impl NativeTable {
|
|||||||
/// * A [NativeTable] object.
|
/// * A [NativeTable] object.
|
||||||
pub async fn open(uri: &str) -> Result<Self> {
|
pub async fn open(uri: &str) -> Result<Self> {
|
||||||
let name = Self::get_table_name(uri)?;
|
let name = Self::get_table_name(uri)?;
|
||||||
Self::open_with_params(uri, &name, None, None).await
|
Self::open_with_params(uri, &name, None, None, None).await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Opens an existing Table
|
/// Opens an existing Table
|
||||||
@@ -425,6 +434,7 @@ impl NativeTable {
|
|||||||
name: &str,
|
name: &str,
|
||||||
write_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
|
write_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
|
||||||
params: Option<ReadParams>,
|
params: Option<ReadParams>,
|
||||||
|
read_consistency_interval: Option<std::time::Duration>,
|
||||||
) -> Result<Self> {
|
) -> Result<Self> {
|
||||||
let params = params.unwrap_or_default();
|
let params = params.unwrap_or_default();
|
||||||
// patch the params if we have a write store wrapper
|
// patch the params if we have a write store wrapper
|
||||||
@@ -445,23 +455,22 @@ impl NativeTable {
|
|||||||
message: e.to_string(),
|
message: e.to_string(),
|
||||||
},
|
},
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
|
let dataset = DatasetConsistencyWrapper::new_latest(dataset, read_consistency_interval);
|
||||||
|
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
name: name.to_string(),
|
name: name.to_string(),
|
||||||
uri: uri.to_string(),
|
uri: uri.to_string(),
|
||||||
dataset: Arc::new(Mutex::new(dataset)),
|
dataset,
|
||||||
store_wrapper: write_store_wrapper,
|
store_wrapper: write_store_wrapper,
|
||||||
|
read_consistency_interval,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Make a new clone of the internal lance dataset.
|
|
||||||
pub(crate) fn clone_inner_dataset(&self) -> Dataset {
|
|
||||||
self.dataset.lock().expect("Lock poison").clone()
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Checkout a specific version of this [NativeTable]
|
/// Checkout a specific version of this [NativeTable]
|
||||||
pub async fn checkout(uri: &str, version: u64) -> Result<Self> {
|
pub async fn checkout(uri: &str, version: u64) -> Result<Self> {
|
||||||
let name = Self::get_table_name(uri)?;
|
let name = Self::get_table_name(uri)?;
|
||||||
Self::checkout_with_params(uri, &name, version, None, ReadParams::default()).await
|
Self::checkout_with_params(uri, &name, version, None, ReadParams::default(), None).await
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn checkout_with_params(
|
pub async fn checkout_with_params(
|
||||||
@@ -470,44 +479,35 @@ impl NativeTable {
|
|||||||
version: u64,
|
version: u64,
|
||||||
write_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
|
write_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
|
||||||
params: ReadParams,
|
params: ReadParams,
|
||||||
|
read_consistency_interval: Option<std::time::Duration>,
|
||||||
) -> Result<Self> {
|
) -> Result<Self> {
|
||||||
// patch the params if we have a write store wrapper
|
// patch the params if we have a write store wrapper
|
||||||
let params = match write_store_wrapper.clone() {
|
let params = match write_store_wrapper.clone() {
|
||||||
Some(wrapper) => params.patch_with_store_wrapper(wrapper)?,
|
Some(wrapper) => params.patch_with_store_wrapper(wrapper)?,
|
||||||
None => params,
|
None => params,
|
||||||
};
|
};
|
||||||
let dataset = Dataset::checkout_with_params(uri, version, ¶ms)
|
let dataset = DatasetBuilder::from_uri(uri)
|
||||||
.await
|
.with_version(version)
|
||||||
.map_err(|e| match e {
|
.with_read_params(params)
|
||||||
lance::Error::DatasetNotFound { .. } => Error::TableNotFound {
|
.load()
|
||||||
name: name.to_string(),
|
.await?;
|
||||||
},
|
let dataset = DatasetConsistencyWrapper::new_time_travel(dataset, version);
|
||||||
e => Error::Lance {
|
|
||||||
message: e.to_string(),
|
|
||||||
},
|
|
||||||
})?;
|
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
name: name.to_string(),
|
name: name.to_string(),
|
||||||
uri: uri.to_string(),
|
uri: uri.to_string(),
|
||||||
dataset: Arc::new(Mutex::new(dataset)),
|
dataset,
|
||||||
store_wrapper: write_store_wrapper,
|
store_wrapper: write_store_wrapper,
|
||||||
|
read_consistency_interval,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn checkout_latest(&self) -> Result<Self> {
|
pub async fn checkout_latest(&self) -> Result<Self> {
|
||||||
let dataset = self.clone_inner_dataset();
|
let mut dataset = self.dataset.duplicate().await;
|
||||||
let latest_version_id = dataset.latest_version_id().await?;
|
dataset.as_latest(self.read_consistency_interval).await?;
|
||||||
let dataset = if latest_version_id == dataset.version().version {
|
|
||||||
dataset
|
|
||||||
} else {
|
|
||||||
dataset.checkout_version(latest_version_id).await?
|
|
||||||
};
|
|
||||||
|
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
name: self.name.clone(),
|
dataset,
|
||||||
uri: self.uri.clone(),
|
..self.clone()
|
||||||
dataset: Arc::new(Mutex::new(dataset)),
|
|
||||||
store_wrapper: self.store_wrapper.clone(),
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
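
A sketch of the two checkout paths on `NativeTable` after this change; the URI and version number are placeholders supplied by the caller:

```rust
use lancedb::table::NativeTable;
use lancedb::Result;

async fn time_travel(uri: &str, version: u64) -> Result<()> {
    // A handle pinned to a historical version (a read-only view)...
    let pinned = NativeTable::checkout(uri, version).await?;
    // ...and an independent handle that tracks the latest version again.
    let latest = pinned.checkout_latest().await?;
    println!("now at version {}", latest.version().await?);
    Ok(())
}
```
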
@@ -543,6 +543,7 @@ impl NativeTable {
|
|||||||
batches: impl RecordBatchReader + Send + 'static,
|
batches: impl RecordBatchReader + Send + 'static,
|
||||||
write_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
|
write_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
|
||||||
params: Option<WriteParams>,
|
params: Option<WriteParams>,
|
||||||
|
read_consistency_interval: Option<std::time::Duration>,
|
||||||
) -> Result<Self> {
|
) -> Result<Self> {
|
||||||
let params = params.unwrap_or_default();
|
let params = params.unwrap_or_default();
|
||||||
// patch the params if we have a write store wrapper
|
// patch the params if we have a write store wrapper
|
||||||
@@ -564,8 +565,9 @@ impl NativeTable {
|
|||||||
Ok(Self {
|
Ok(Self {
|
||||||
name: name.to_string(),
|
name: name.to_string(),
|
||||||
uri: uri.to_string(),
|
uri: uri.to_string(),
|
||||||
dataset: Arc::new(Mutex::new(dataset)),
|
dataset: DatasetConsistencyWrapper::new_latest(dataset, read_consistency_interval),
|
||||||
store_wrapper: write_store_wrapper,
|
store_wrapper: write_store_wrapper,
|
||||||
|
read_consistency_interval,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -575,34 +577,35 @@ impl NativeTable {
|
|||||||
schema: SchemaRef,
|
schema: SchemaRef,
|
||||||
write_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
|
write_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
|
||||||
params: Option<WriteParams>,
|
params: Option<WriteParams>,
|
||||||
|
read_consistency_interval: Option<std::time::Duration>,
|
||||||
) -> Result<Self> {
|
) -> Result<Self> {
|
||||||
let batches = RecordBatchIterator::new(vec![], schema);
|
let batches = RecordBatchIterator::new(vec![], schema);
|
||||||
Self::create(uri, name, batches, write_store_wrapper, params).await
|
Self::create(
|
||||||
|
uri,
|
||||||
|
name,
|
||||||
|
batches,
|
||||||
|
write_store_wrapper,
|
||||||
|
params,
|
||||||
|
read_consistency_interval,
|
||||||
|
)
|
||||||
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Version of this Table
|
/// Version of this Table
|
||||||
pub fn version(&self) -> u64 {
|
pub async fn version(&self) -> Result<u64> {
|
||||||
self.dataset.lock().expect("lock poison").version().version
|
Ok(self.dataset.get().await?.version().version)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn optimize_indices(&self, options: &OptimizeOptions) -> Result<()> {
|
async fn optimize_indices(&self, options: &OptimizeOptions) -> Result<()> {
|
||||||
info!("LanceDB: optimizing indices: {:?}", options);
|
info!("LanceDB: optimizing indices: {:?}", options);
|
||||||
let mut dataset = self.clone_inner_dataset();
|
self.dataset
|
||||||
dataset.optimize_indices(options).await?;
|
.get_mut()
|
||||||
|
.await?
|
||||||
|
.optimize_indices(options)
|
||||||
|
.await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn query(&self) -> Query {
|
|
||||||
Query::new(self.clone_inner_dataset().into())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn filter(&self, expr: String) -> Query {
|
|
||||||
Query::new(self.clone_inner_dataset().into()).filter(expr)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns the number of rows in this Table
|
|
||||||
|
|
||||||
/// Merge new data into this table.
|
/// Merge new data into this table.
|
||||||
pub async fn merge(
|
pub async fn merge(
|
||||||
&mut self,
|
&mut self,
|
||||||
@@ -610,14 +613,17 @@ impl NativeTable {
|
|||||||
left_on: &str,
|
left_on: &str,
|
||||||
right_on: &str,
|
right_on: &str,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let mut dataset = self.clone_inner_dataset();
|
self.dataset
|
||||||
dataset.merge(batches, left_on, right_on).await?;
|
.get_mut()
|
||||||
self.dataset = Arc::new(Mutex::new(dataset));
|
.await?
|
||||||
|
.merge(batches, left_on, right_on)
|
||||||
|
.await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn update(&self, predicate: Option<&str>, updates: Vec<(&str, &str)>) -> Result<()> {
|
pub async fn update(&self, predicate: Option<&str>, updates: Vec<(&str, &str)>) -> Result<()> {
|
||||||
let mut builder = UpdateBuilder::new(self.clone_inner_dataset().into());
|
let dataset = self.dataset.get().await?.clone();
|
||||||
|
let mut builder = UpdateBuilder::new(Arc::new(dataset));
|
||||||
if let Some(predicate) = predicate {
|
if let Some(predicate) = predicate {
|
||||||
builder = builder.update_where(predicate)?;
|
builder = builder.update_where(predicate)?;
|
||||||
}
|
}
|
||||||
@@ -628,7 +634,7 @@ impl NativeTable {
|
|||||||
|
|
||||||
let operation = builder.build()?;
|
let operation = builder.build()?;
|
||||||
let ds = operation.execute().await?;
|
let ds = operation.execute().await?;
|
||||||
self.reset_dataset(ds.as_ref().clone());
|
self.dataset.set_latest(ds.as_ref().clone()).await;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
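
A sketch of calling the reworked `update` method; the predicate, column name, and value expression are placeholders:

```rust
use lancedb::table::NativeTable;
use lancedb::Result;

async fn bump_price(tbl: &NativeTable) -> Result<()> {
    // SQL-style predicate and literal expressions, applied against the
    // consistency wrapper's current snapshot of the dataset.
    tbl.update(Some("id > 5"), vec![("price", "price + 1")]).await
}
```
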
@@ -648,8 +654,10 @@ impl NativeTable {
|
|||||||
older_than: Duration,
|
older_than: Duration,
|
||||||
delete_unverified: Option<bool>,
|
delete_unverified: Option<bool>,
|
||||||
) -> Result<RemovalStats> {
|
) -> Result<RemovalStats> {
|
||||||
let dataset = self.clone_inner_dataset();
|
Ok(self
|
||||||
Ok(dataset
|
.dataset
|
||||||
|
.get_mut()
|
||||||
|
.await?
|
||||||
.cleanup_old_versions(older_than, delete_unverified)
|
.cleanup_old_versions(older_than, delete_unverified)
|
||||||
.await?)
|
.await?)
|
||||||
}
|
}
|
||||||
@@ -665,24 +673,27 @@ impl NativeTable {
|
|||||||
options: CompactionOptions,
|
options: CompactionOptions,
|
||||||
remap_options: Option<Arc<dyn IndexRemapperOptions>>,
|
remap_options: Option<Arc<dyn IndexRemapperOptions>>,
|
||||||
) -> Result<CompactionMetrics> {
|
) -> Result<CompactionMetrics> {
|
||||||
let mut dataset = self.clone_inner_dataset();
|
let mut dataset_mut = self.dataset.get_mut().await?;
|
||||||
let metrics = compact_files(&mut dataset, options, remap_options).await?;
|
let metrics = compact_files(&mut dataset_mut, options, remap_options).await?;
|
||||||
self.reset_dataset(dataset);
|
|
||||||
Ok(metrics)
|
Ok(metrics)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn count_fragments(&self) -> usize {
|
// TODO: why are these individual methods and not some single "get_stats" method?
|
||||||
self.dataset.lock().expect("lock poison").count_fragments()
|
pub async fn count_fragments(&self) -> Result<usize> {
|
||||||
|
Ok(self.dataset.get().await?.count_fragments())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn count_deleted_rows(&self) -> Result<usize> {
|
pub async fn count_deleted_rows(&self) -> Result<usize> {
|
||||||
let dataset = self.clone_inner_dataset();
|
Ok(self.dataset.get().await?.count_deleted_rows().await?)
|
||||||
Ok(dataset.count_deleted_rows().await?)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn num_small_files(&self, max_rows_per_group: usize) -> usize {
|
pub async fn num_small_files(&self, max_rows_per_group: usize) -> Result<usize> {
|
||||||
let dataset = self.clone_inner_dataset();
|
Ok(self
|
||||||
dataset.num_small_files(max_rows_per_group).await
|
.dataset
|
||||||
|
.get()
|
||||||
|
.await?
|
||||||
|
.num_small_files(max_rows_per_group)
|
||||||
|
.await)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn count_indexed_rows(&self, index_uuid: &str) -> Result<Option<usize>> {
|
pub async fn count_indexed_rows(&self, index_uuid: &str) -> Result<Option<usize>> {
|
||||||
@@ -700,7 +711,7 @@ impl NativeTable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub async fn load_indices(&self) -> Result<Vec<VectorIndex>> {
|
pub async fn load_indices(&self) -> Result<Vec<VectorIndex>> {
|
||||||
let dataset = self.clone_inner_dataset();
|
let dataset = self.dataset.get().await?;
|
||||||
let (indices, mf) = futures::try_join!(dataset.load_indices(), dataset.latest_manifest())?;
|
let (indices, mf) = futures::try_join!(dataset.load_indices(), dataset.latest_manifest())?;
|
||||||
Ok(indices
|
Ok(indices
|
||||||
.iter()
|
.iter()
|
||||||
@@ -717,7 +728,7 @@ impl NativeTable {
|
|||||||
if index.is_none() {
|
if index.is_none() {
|
||||||
return Ok(None);
|
return Ok(None);
|
||||||
}
|
}
|
||||||
let dataset = self.clone_inner_dataset();
|
let dataset = self.dataset.get().await?;
|
||||||
let index_stats = dataset.index_statistics(&index.unwrap().index_name).await?;
|
let index_stats = dataset.index_statistics(&index.unwrap().index_name).await?;
|
||||||
let index_stats: VectorIndexStatistics =
|
let index_stats: VectorIndexStatistics =
|
||||||
serde_json::from_str(&index_stats).map_err(|e| Error::Lance {
|
serde_json::from_str(&index_stats).map_err(|e| Error::Lance {
|
||||||
@@ -729,10 +740,6 @@ impl NativeTable {
|
|||||||
|
|
||||||
Ok(Some(index_stats))
|
Ok(Some(index_stats))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn reset_dataset(&self, dataset: Dataset) {
|
|
||||||
*self.dataset.lock().expect("lock poison") = dataset;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
@@ -742,7 +749,7 @@ impl MergeInsert for NativeTable {
|
|||||||
params: MergeInsertBuilder,
|
params: MergeInsertBuilder,
|
||||||
new_data: Box<dyn RecordBatchReader + Send>,
|
new_data: Box<dyn RecordBatchReader + Send>,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let dataset = Arc::new(self.clone_inner_dataset());
|
let dataset = Arc::new(self.dataset.get().await?.clone());
|
||||||
let mut builder = LanceMergeInsertBuilder::try_new(dataset.clone(), params.on)?;
|
let mut builder = LanceMergeInsertBuilder::try_new(dataset.clone(), params.on)?;
|
||||||
match (
|
match (
|
||||||
params.when_matched_update_all,
|
params.when_matched_update_all,
|
||||||
@@ -769,7 +776,7 @@ impl MergeInsert for NativeTable {
|
|||||||
}
|
}
|
||||||
let job = builder.try_build()?;
|
let job = builder.try_build()?;
|
||||||
let new_dataset = job.execute_reader(new_data).await?;
|
let new_dataset = job.execute_reader(new_data).await?;
|
||||||
self.reset_dataset((*new_dataset).clone());
|
self.dataset.set_latest(new_dataset.as_ref().clone()).await;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -788,13 +795,13 @@ impl Table for NativeTable {
|
|||||||
self.name.as_str()
|
self.name.as_str()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn schema(&self) -> SchemaRef {
|
async fn schema(&self) -> Result<SchemaRef> {
|
||||||
let lance_schema = { self.dataset.lock().expect("lock poison").schema().clone() };
|
let lance_schema = self.dataset.get().await?.schema().clone();
|
||||||
Arc::new(Schema::from(&lance_schema))
|
Ok(Arc::new(Schema::from(&lance_schema)))
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn count_rows(&self, filter: Option<String>) -> Result<usize> {
|
async fn count_rows(&self, filter: Option<String>) -> Result<usize> {
|
||||||
let dataset = { self.dataset.lock().expect("lock poison").clone() };
|
let dataset = self.dataset.get().await?;
|
||||||
if let Some(filter) = filter {
|
if let Some(filter) = filter {
|
||||||
let mut scanner = dataset.scan();
|
let mut scanner = dataset.scan();
|
||||||
scanner.filter(&filter)?;
|
scanner.filter(&filter)?;
|
||||||
@@ -826,7 +833,8 @@ impl Table for NativeTable {
|
|||||||
None => lance_params,
|
None => lance_params,
|
||||||
};
|
};
|
||||||
|
|
||||||
self.reset_dataset(Dataset::write(batches, &self.uri, Some(lance_params)).await?);
|
let dataset = Dataset::write(batches, &self.uri, Some(lance_params)).await?;
|
||||||
|
self.dataset.set_latest(dataset).await;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -840,14 +848,12 @@ impl Table for NativeTable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn query(&self) -> Query {
|
fn query(&self) -> Query {
|
||||||
Query::new(Arc::new(self.dataset.lock().expect("lock poison").clone()))
|
Query::new(self.dataset.clone())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Delete rows from the table
|
/// Delete rows from the table
|
||||||
async fn delete(&self, predicate: &str) -> Result<()> {
|
async fn delete(&self, predicate: &str) -> Result<()> {
|
||||||
let mut dataset = self.clone_inner_dataset();
|
self.dataset.get_mut().await?.delete(predicate).await?;
|
||||||
dataset.delete(predicate).await?;
|
|
||||||
self.reset_dataset(dataset);
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -903,6 +909,7 @@ mod tests {
|
|||||||
use std::iter;
|
use std::iter;
|
||||||
use std::sync::atomic::{AtomicBool, Ordering};
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
use arrow_array::{
|
use arrow_array::{
|
||||||
Array, BooleanArray, Date32Array, FixedSizeListArray, Float32Array, Float64Array,
|
Array, BooleanArray, Date32Array, FixedSizeListArray, Float32Array, Float64Array,
|
||||||
@@ -918,6 +925,8 @@ mod tests {
|
|||||||
use rand::Rng;
|
use rand::Rng;
|
||||||
use tempfile::tempdir;
|
use tempfile::tempdir;
|
||||||
|
|
||||||
|
use crate::connection::ConnectBuilder;
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
@@ -960,7 +969,7 @@ mod tests {
|
|||||||
let uri = tmp_dir.path().to_str().unwrap();
|
let uri = tmp_dir.path().to_str().unwrap();
|
||||||
|
|
||||||
let batches = make_test_batches();
|
let batches = make_test_batches();
|
||||||
let table = NativeTable::create(uri, "test", batches, None, None)
|
let table = NativeTable::create(uri, "test", batches, None, None, None)
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
@@ -978,7 +987,7 @@ mod tests {
|
|||||||
|
|
||||||
let batches = make_test_batches();
|
let batches = make_test_batches();
|
||||||
let schema = batches.schema().clone();
|
let schema = batches.schema().clone();
|
||||||
let table = NativeTable::create(uri, "test", batches, None, None)
|
let table = NativeTable::create(uri, "test", batches, None, None, None)
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
assert_eq!(table.count_rows(None).await.unwrap(), 10);
|
assert_eq!(table.count_rows(None).await.unwrap(), 10);
|
||||||
@@ -1009,7 +1018,7 @@ mod tests {
|
|||||||
|
|
||||||
// Create a dataset with i=0..10
|
// Create a dataset with i=0..10
|
||||||
let batches = merge_insert_test_batches(0, 0);
|
let batches = merge_insert_test_batches(0, 0);
|
||||||
let table = NativeTable::create(uri, "test", batches, None, None)
|
let table = NativeTable::create(uri, "test", batches, None, None, None)
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
assert_eq!(table.count_rows(None).await.unwrap(), 10);
|
assert_eq!(table.count_rows(None).await.unwrap(), 10);
|
||||||
@@ -1055,7 +1064,7 @@ mod tests {
|
|||||||
|
|
||||||
let batches = make_test_batches();
|
let batches = make_test_batches();
|
||||||
let schema = batches.schema().clone();
|
let schema = batches.schema().clone();
|
||||||
let table = NativeTable::create(uri, "test", batches, None, None)
|
let table = NativeTable::create(uri, "test", batches, None, None, None)
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
assert_eq!(table.count_rows(None).await.unwrap(), 10);
|
assert_eq!(table.count_rows(None).await.unwrap(), 10);
|
||||||
@@ -1410,7 +1419,7 @@ mod tests {
|
|||||||
..Default::default()
|
..Default::default()
|
||||||
};
|
};
|
||||||
assert!(!wrapper.called());
|
assert!(!wrapper.called());
|
||||||
let _ = NativeTable::open_with_params(uri, "test", None, Some(param))
|
let _ = NativeTable::open_with_params(uri, "test", None, Some(param), None)
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
assert!(wrapper.called());
|
assert!(wrapper.called());
|
||||||
@@ -1484,7 +1493,7 @@ mod tests {
|
|||||||
schema,
|
schema,
|
||||||
);
|
);
|
||||||
|
|
||||||
let table = NativeTable::create(uri, "test", batches, None, None)
|
let table = NativeTable::create(uri, "test", batches, None, None, None)
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
@@ -1529,4 +1538,68 @@ mod tests {
|
|||||||
|
|
||||||
Ok(FixedSizeListArray::from(data))
|
Ok(FixedSizeListArray::from(data))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_read_consistency_interval() {
|
||||||
|
let batch = RecordBatch::try_new(
|
||||||
|
Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)])),
|
||||||
|
vec![Arc::new(Int32Array::from(vec![1]))],
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let intervals = vec![
|
||||||
|
None,
|
||||||
|
Some(0),
|
||||||
|
Some(100), // 100 ms
|
||||||
|
];
|
||||||
|
|
||||||
|
for interval in intervals {
|
||||||
|
let tmp_dir = tempdir().unwrap();
|
||||||
|
let uri = tmp_dir.path().to_str().unwrap();
|
||||||
|
|
||||||
|
let conn1 = ConnectBuilder::new(uri).execute().await.unwrap();
|
||||||
|
let table1 = conn1
|
||||||
|
.create_empty_table("my_table", batch.schema())
|
||||||
|
.execute()
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let mut conn2 = ConnectBuilder::new(uri);
|
||||||
|
if let Some(interval) = interval {
|
||||||
|
conn2 = conn2.read_consistency_interval(std::time::Duration::from_millis(interval));
|
||||||
|
}
|
||||||
|
let conn2 = conn2.execute().await.unwrap();
|
||||||
|
let table2 = conn2.open_table("my_table").execute().await.unwrap();
|
||||||
|
|
||||||
|
assert_eq!(table1.count_rows(None).await.unwrap(), 0);
|
||||||
|
assert_eq!(table2.count_rows(None).await.unwrap(), 0);
|
||||||
|
|
||||||
|
table1
|
||||||
|
.add(
|
||||||
|
Box::new(RecordBatchIterator::new(
|
||||||
|
vec![Ok(batch.clone())],
|
||||||
|
batch.schema(),
|
||||||
|
)),
|
||||||
|
AddDataOptions::default(),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
assert_eq!(table1.count_rows(None).await.unwrap(), 1);
|
||||||
|
|
||||||
|
match interval {
|
||||||
|
None => {
|
||||||
|
assert_eq!(table2.count_rows(None).await.unwrap(), 0);
|
||||||
|
}
|
||||||
|
Some(0) => {
|
||||||
|
assert_eq!(table2.count_rows(None).await.unwrap(), 1);
|
||||||
|
}
|
||||||
|
Some(100) => {
|
||||||
|
assert_eq!(table2.count_rows(None).await.unwrap(), 0);
|
||||||
|
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||||
|
assert_eq!(table2.count_rows(None).await.unwrap(), 1);
|
||||||
|
}
|
||||||
|
_ => unreachable!(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
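
The test above exercises the internal `ConnectBuilder`; the same behavior through the public entry point looks roughly like this (URI and table name are placeholders):

```rust
use std::time::Duration;

use lancedb::connect;

#[tokio::main]
async fn main() -> lancedb::Result<()> {
    let uri = "data/consistency-demo";

    // Writer connection: defaults are fine, writes are always consistent.
    let writer = connect(uri).execute().await?;

    // Reader that re-checks for new commits at most every 100 ms.
    let reader = connect(uri)
        .read_consistency_interval(Duration::from_millis(100))
        .execute()
        .await?;

    // Tables opened from `reader` will observe commits made through
    // `writer` once the interval has elapsed, as in the test above.
    let _w = writer.open_table("my_table").execute().await?;
    let _r = reader.open_table("my_table").execute().await?;
    Ok(())
}
```
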
234
rust/lancedb/src/table/dataset.rs
Normal file
234
rust/lancedb/src/table/dataset.rs
Normal file
@@ -0,0 +1,234 @@
|
|||||||
|
// Copyright 2024 LanceDB Developers.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
use std::{
|
||||||
|
ops::{Deref, DerefMut},
|
||||||
|
sync::Arc,
|
||||||
|
time::{self, Duration, Instant},
|
||||||
|
};
|
||||||
|
|
||||||
|
use lance::Dataset;
|
||||||
|
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
|
||||||
|
|
||||||
|
use crate::error::Result;
|
||||||
|
|
||||||
|
/// A wrapper around a [Dataset] that provides lazy-loading and consistency checks.
|
||||||
|
///
|
||||||
|
/// This can be cloned cheaply. It supports concurrent reads or exclusive writes.
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct DatasetConsistencyWrapper(Arc<RwLock<DatasetRef>>);
|
||||||
|
|
||||||
|
/// A wrapper around a [Dataset] that provides consistency checks.
|
||||||
|
///
|
||||||
|
/// The dataset is lazily loaded, and starts off as None. On the first access,
|
||||||
|
/// the dataset is loaded.
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
enum DatasetRef {
|
||||||
|
/// In this mode, the dataset is always the latest version.
|
||||||
|
Latest {
|
||||||
|
dataset: Dataset,
|
||||||
|
read_consistency_interval: Option<Duration>,
|
||||||
|
last_consistency_check: Option<time::Instant>,
|
||||||
|
},
|
||||||
|
/// In this mode, the dataset is a specific version. It cannot be mutated.
|
||||||
|
TimeTravel { dataset: Dataset, version: u64 },
|
||||||
|
}
|
||||||
|
|
||||||
|
impl DatasetRef {
|
||||||
|
/// Reload the dataset to the appropriate version.
|
||||||
|
async fn reload(&mut self) -> Result<()> {
|
||||||
|
match self {
|
||||||
|
Self::Latest {
|
||||||
|
dataset,
|
||||||
|
last_consistency_check,
|
||||||
|
..
|
||||||
|
} => {
|
||||||
|
*dataset = dataset
|
||||||
|
.checkout_version(dataset.latest_version_id().await?)
|
||||||
|
.await?;
|
||||||
|
last_consistency_check.replace(Instant::now());
|
||||||
|
}
|
||||||
|
Self::TimeTravel { dataset, version } => {
|
||||||
|
dataset.checkout_version(*version).await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn as_latest(&mut self, read_consistency_interval: Option<Duration>) -> Result<()> {
|
||||||
|
match self {
|
||||||
|
Self::Latest { .. } => Ok(()),
|
||||||
|
Self::TimeTravel { dataset, .. } => {
|
||||||
|
dataset
|
||||||
|
.checkout_version(dataset.latest_version_id().await?)
|
||||||
|
.await?;
|
||||||
|
*self = Self::Latest {
|
||||||
|
dataset: dataset.clone(),
|
||||||
|
read_consistency_interval,
|
||||||
|
last_consistency_check: Some(Instant::now()),
|
||||||
|
};
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_latest(&mut self, dataset: Dataset) {
|
||||||
|
match self {
|
||||||
|
Self::Latest {
|
||||||
|
dataset: ref mut ds,
|
||||||
|
..
|
||||||
|
} => {
|
||||||
|
*ds = dataset;
|
||||||
|
}
|
||||||
|
_ => unreachable!("Dataset should be in latest mode at this point"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl DatasetConsistencyWrapper {
|
||||||
|
/// Create a new wrapper in the latest version mode.
|
||||||
|
pub fn new_latest(dataset: Dataset, read_consistency_interval: Option<Duration>) -> Self {
|
||||||
|
Self(Arc::new(RwLock::new(DatasetRef::Latest {
|
||||||
|
dataset,
|
||||||
|
read_consistency_interval,
|
||||||
|
last_consistency_check: None,
|
||||||
|
})))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Create a new wrapper in the time travel mode.
|
||||||
|
pub fn new_time_travel(dataset: Dataset, version: u64) -> Self {
|
||||||
|
Self(Arc::new(RwLock::new(DatasetRef::TimeTravel {
|
||||||
|
dataset,
|
||||||
|
version,
|
||||||
|
})))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Create an independent copy of self.
|
||||||
|
///
|
||||||
|
/// Unlike Clone, this will track versions independently of the original wrapper and
|
||||||
|
/// will be tied to a different RwLock.
|
||||||
|
pub async fn duplicate(&self) -> Self {
|
||||||
|
let ds_ref = self.0.read().await;
|
||||||
|
Self(Arc::new(RwLock::new((*ds_ref).clone())))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get an immutable reference to the dataset.
|
||||||
|
pub async fn get(&self) -> Result<DatasetReadGuard<'_>> {
|
||||||
|
self.ensure_up_to_date().await?;
|
||||||
|
Ok(DatasetReadGuard {
|
||||||
|
guard: self.0.read().await,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get a mutable reference to the dataset.
|
||||||
|
pub async fn get_mut(&self) -> Result<DatasetWriteGuard<'_>> {
|
||||||
|
self.ensure_up_to_date().await?;
|
||||||
|
Ok(DatasetWriteGuard {
|
||||||
|
guard: self.0.write().await,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Convert into a wrapper in latest version mode
|
||||||
|
pub async fn as_latest(&mut self, read_consistency_interval: Option<Duration>) -> Result<()> {
|
||||||
|
self.0
|
||||||
|
.write()
|
||||||
|
.await
|
||||||
|
.as_latest(read_consistency_interval)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Provide a known latest version of the dataset.
|
||||||
|
///
|
||||||
|
/// This is usually done after some write operation, which inherently will
|
||||||
|
/// have the latest version.
|
||||||
|
pub async fn set_latest(&self, dataset: Dataset) {
|
||||||
|
self.0.write().await.set_latest(dataset);
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn reload(&self) -> Result<()> {
|
||||||
|
self.0.write().await.reload().await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn is_up_to_date(&self) -> Result<bool> {
|
||||||
|
let dataset_ref = self.0.read().await;
|
||||||
|
match &*dataset_ref {
|
||||||
|
DatasetRef::Latest {
|
||||||
|
read_consistency_interval,
|
||||||
|
last_consistency_check,
|
||||||
|
..
|
||||||
|
} => match (read_consistency_interval, last_consistency_check) {
|
||||||
|
(None, _) => Ok(true),
|
||||||
|
(Some(_), None) => Ok(false),
|
||||||
|
(Some(read_consistency_interval), Some(last_consistency_check)) => {
|
||||||
|
if &last_consistency_check.elapsed() < read_consistency_interval {
|
||||||
|
Ok(true)
|
||||||
|
} else {
|
||||||
|
Ok(false)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
DatasetRef::TimeTravel { dataset, version } => {
|
||||||
|
Ok(dataset.version().version == *version)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Ensures that the dataset is loaded and up-to-date with consistency and
|
||||||
|
/// version parameters.
|
||||||
|
async fn ensure_up_to_date(&self) -> Result<()> {
|
||||||
|
if !self.is_up_to_date().await? {
|
||||||
|
self.reload().await?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct DatasetReadGuard<'a> {
|
||||||
|
guard: RwLockReadGuard<'a, DatasetRef>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Deref for DatasetReadGuard<'_> {
|
||||||
|
type Target = Dataset;
|
||||||
|
|
||||||
|
fn deref(&self) -> &Self::Target {
|
||||||
|
match &*self.guard {
|
||||||
|
DatasetRef::Latest { dataset, .. } => dataset,
|
||||||
|
DatasetRef::TimeTravel { dataset, .. } => dataset,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct DatasetWriteGuard<'a> {
|
||||||
|
guard: RwLockWriteGuard<'a, DatasetRef>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Deref for DatasetWriteGuard<'_> {
|
||||||
|
type Target = Dataset;
|
||||||
|
|
||||||
|
fn deref(&self) -> &Self::Target {
|
||||||
|
match &*self.guard {
|
||||||
|
DatasetRef::Latest { dataset, .. } => dataset,
|
||||||
|
DatasetRef::TimeTravel { dataset, .. } => dataset,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl DerefMut for DatasetWriteGuard<'_> {
|
||||||
|
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||||
|
match &mut *self.guard {
|
||||||
|
DatasetRef::Latest { dataset, .. } => dataset,
|
||||||
|
DatasetRef::TimeTravel { dataset, .. } => dataset,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
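
The heart of `ensure_up_to_date` is the elapsed-time comparison in `is_up_to_date`; a standalone sketch of that decision, with a hypothetical free function standing in for the crate-private method:

```rust
use std::time::{Duration, Instant};

// Hypothetical standalone version of the staleness decision made for
// DatasetRef::Latest above: no interval means "never refresh", a missing
// timestamp means "refresh now", otherwise compare elapsed time.
fn is_up_to_date(
    read_consistency_interval: Option<Duration>,
    last_consistency_check: Option<Instant>,
) -> bool {
    match (read_consistency_interval, last_consistency_check) {
        (None, _) => true,
        (Some(_), None) => false,
        (Some(interval), Some(checked_at)) => checked_at.elapsed() < interval,
    }
}

fn main() {
    assert!(is_up_to_date(None, None));
    assert!(!is_up_to_date(Some(Duration::from_millis(100)), None));
    assert!(is_up_to_date(
        Some(Duration::from_secs(60)),
        Some(Instant::now()),
    ));
}
```
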