feat(node): add read_consistency_interval to Node and Rust (#1002)

This PR adds the same consistency semantics that were added in #828. It
*does not* add the same lazy loading of tables, since that breaks some
existing tests.

This closes #998.
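
For reference, a minimal sketch of how the new option is used from the Node API, modeled on the test added in this change (the import path and data directory are illustrative, not part of this diff):

```ts
import { connect } from "lancedb"; // illustrative import; use the package's actual entry point

async function demo() {
  // Writer connection with default settings: reads do not re-check for updates.
  const db = await connect({ uri: "/tmp/read-consistency-demo" });
  const table = await db.createTable("my_table", [{ id: 1 }]);

  // Reader connection with readConsistencyInterval of 0 seconds: strong
  // consistency, so every read checks for updates from other processes.
  const db2 = await connect({ uri: "/tmp/read-consistency-demo", readConsistencyInterval: 0 });
  const table2 = await db2.openTable("my_table");

  await table.add([{ id: 2 }]);
  console.log(await table2.countRows()); // 2n -- the new row is visible immediately
}

demo();
```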

---------

Co-authored-by: Weston Pace <weston.pace@gmail.com>
Author: Will Jones
Date: 2024-02-22 15:04:30 -08:00
Committed by: Weston Pace
Parent: ef54bd5ba2
Commit: c5b0934bfb
19 changed files with 735 additions and 1127 deletions


@@ -29,6 +29,6 @@ test("open database", async () => {
   const tbl = await db.createTable("test", [{ id: 1 }, { id: 2 }]);
   expect(await db.tableNames()).toStrictEqual(["test"]);
-  const schema = tbl.schema;
+  const schema = await tbl.schema();
   expect(schema).toEqual(new Schema([new Field("id", new Float64(), true)]));
 });


@@ -181,3 +181,37 @@ describe("Test creating index", () => {
     // TODO: check index type.
   });
 });
+
+describe("Read consistency interval", () => {
+  let tmpDir: string;
+  beforeEach(() => {
+    tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "read-consistency-"));
+  });
+
+  // const intervals = [undefined, 0, 0.1];
+  const intervals = [0];
+  test.each(intervals)("read consistency interval %p", async (interval) => {
+    const db = await connect({ uri: tmpDir });
+    const table = await db.createTable("my_table", [{ id: 1 }]);
+
+    const db2 = await connect({ uri: tmpDir, readConsistencyInterval: interval });
+    const table2 = await db2.openTable("my_table");
+    expect(await table2.countRows()).toEqual(await table.countRows());
+
+    await table.add([{ id: 2 }]);
+
+    if (interval === undefined) {
+      expect(await table2.countRows()).toEqual(1n);
+      // TODO: once we implement time travel we can uncomment this part of the test.
+      // await table2.checkout_latest();
+      // expect(await table2.countRows()).toEqual(2);
+    } else if (interval === 0) {
+      expect(await table2.countRows()).toEqual(2n);
+    } else {
+      // interval == 0.1
+      expect(await table2.countRows()).toEqual(1n);
+      await new Promise(r => setTimeout(r, 100));
+      expect(await table2.countRows()).toEqual(2n);
+    }
+  });
+});

nodejs/package-lock.json (generated, 1087 changed lines): diff suppressed because it is too large.


@@ -18,7 +18,7 @@
   "license": "Apache 2.0",
   "devDependencies": {
     "@napi-rs/cli": "^2.18.0",
-    "@types/jest": "^29.5.11",
+    "@types/jest": "^29.1.2",
     "@typescript-eslint/eslint-plugin": "^6.19.0",
     "@typescript-eslint/parser": "^6.19.0",
     "eslint": "^8.56.0",
@@ -51,7 +51,8 @@
     "docs": "typedoc --plugin typedoc-plugin-markdown vectordb/index.ts",
     "lint": "eslint vectordb --ext .js,.ts",
     "prepublishOnly": "napi prepublish -t npm",
-    "test": "npm run build && jest",
+    "//": "maxWorkers=1 is workaround for bigint issue in jest: https://github.com/jestjs/jest/issues/11617#issuecomment-1068732414",
+    "test": "npm run build && jest --maxWorkers=1",
     "universal": "napi universal",
     "version": "napi version"
   },


@@ -16,7 +16,8 @@ use napi::bindgen_prelude::*;
 use napi_derive::*;
 
 use crate::table::Table;
-use vectordb::connection::Connection as LanceDBConnection;
+use crate::ConnectionOptions;
+use vectordb::connection::{ConnectBuilder, Connection as LanceDBConnection};
 use vectordb::ipc::ipc_file_to_batches;
 
 #[napi]
@@ -28,11 +29,23 @@ pub struct Connection {
 
 impl Connection {
     /// Create a new Connection instance from the given URI.
     #[napi(factory)]
-    pub async fn new(uri: String) -> napi::Result<Self> {
+    pub async fn new(options: ConnectionOptions) -> napi::Result<Self> {
+        let mut builder = ConnectBuilder::new(&options.uri);
+        if let Some(api_key) = options.api_key {
+            builder = builder.api_key(&api_key);
+        }
+        if let Some(host_override) = options.host_override {
+            builder = builder.host_override(&host_override);
+        }
+        if let Some(interval) = options.read_consistency_interval {
+            builder =
+                builder.read_consistency_interval(std::time::Duration::from_secs_f64(interval));
+        }
         Ok(Self {
-            conn: vectordb::connect(&uri).execute().await.map_err(|e| {
-                napi::Error::from_reason(format!("Failed to connect to database: {}", e))
-            })?,
+            conn: builder
+                .execute()
+                .await
+                .map_err(|e| napi::Error::from_reason(format!("{}", e)))?,
         })
     }


@@ -22,10 +22,21 @@ mod query;
 mod table;
 
 #[napi(object)]
+#[derive(Debug)]
 pub struct ConnectionOptions {
     pub uri: String,
     pub api_key: Option<String>,
     pub host_override: Option<String>,
+    /// (For LanceDB OSS only): The interval, in seconds, at which to check for
+    /// updates to the table from other processes. If None, then consistency is not
+    /// checked. For performance reasons, this is the default. For strong
+    /// consistency, set this to zero seconds. Then every read will check for
+    /// updates from other processes. As a compromise, you can set this to a
+    /// non-zero value for eventual consistency. If more than that interval
+    /// has passed since the last check, then the table will be checked for updates.
+    /// Note: this consistency only applies to read operations. Write operations are
+    /// always consistent.
+    pub read_consistency_interval: Option<f64>,
 }
 
 /// Write mode for writing a table.
@@ -44,5 +55,5 @@ pub struct WriteOptions {
 #[napi]
 pub async fn connect(options: ConnectionOptions) -> napi::Result<Connection> {
-    Connection::new(options.uri.clone()).await
+    Connection::new(options).await
 }


@@ -34,8 +34,12 @@ impl Table {
 
     /// Return Schema as empty Arrow IPC file.
     #[napi]
-    pub fn schema(&self) -> napi::Result<Buffer> {
-        let mut writer = FileWriter::try_new(vec![], &self.table.schema())
+    pub async fn schema(&self) -> napi::Result<Buffer> {
+        let schema =
+            self.table.schema().await.map_err(|e| {
+                napi::Error::from_reason(format!("Failed to create IPC file: {}", e))
+            })?;
+        let mut writer = FileWriter::try_new(vec![], &schema)
             .map_err(|e| napi::Error::from_reason(format!("Failed to create IPC file: {}", e)))?;
         writer
             .finish()


@@ -53,12 +53,12 @@ export async function connect(
     opts = Object.assign(
       {
         uri: "",
-        apiKey: "",
-        hostOverride: "",
+        apiKey: undefined,
+        hostOverride: undefined,
       },
       args
     );
   }
-  const nativeConn = await NativeConnection.new(opts.uri);
+  const nativeConn = await NativeConnection.new(opts);
   return new Connection(nativeConn);
 }


@@ -16,6 +16,18 @@ export interface ConnectionOptions {
   uri: string
   apiKey?: string
   hostOverride?: string
+  /**
+   * (For LanceDB OSS only): The interval, in seconds, at which to check for
+   * updates to the table from other processes. If None, then consistency is not
+   * checked. For performance reasons, this is the default. For strong
+   * consistency, set this to zero seconds. Then every read will check for
+   * updates from other processes. As a compromise, you can set this to a
+   * non-zero value for eventual consistency. If more than that interval
+   * has passed since the last check, then the table will be checked for updates.
+   * Note: this consistency only applies to read operations. Write operations are
+   * always consistent.
+   */
+  readConsistencyInterval?: number
 }
 /** Write mode for writing a table. */
 export const enum WriteMode {
@@ -30,7 +42,7 @@ export interface WriteOptions {
 export function connect(options: ConnectionOptions): Promise<Connection>
 export class Connection {
   /** Create a new Connection instance from the given URI. */
-  static new(uri: string): Promise<Connection>
+  static new(options: ConnectionOptions): Promise<Connection>
   /** List all tables in the dataset. */
   tableNames(): Promise<Array<string>>
   /**
@@ -71,7 +83,7 @@ export class Query {
 }
 export class Table {
   /** Return Schema as empty Arrow IPC file. */
-  schema(): Buffer
+  schema(): Promise<Buffer>
   add(buf: Buffer): Promise<void>
   countRows(filter?: string | undefined | null): Promise<bigint>
   delete(predicate: string): Promise<void>
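
The doc comment above describes three modes: leave readConsistencyInterval unset to never re-check (fastest), set it to 0 for strong consistency, or set it to a non-zero number of seconds for eventual consistency. A hedged sketch of the eventual-consistency mode (the import path, interval, and table name are illustrative):

```ts
import { connect } from "lancedb"; // illustrative import path

// Check for updates from other processes at most once every 5 seconds.
async function openWithEventualConsistency(uri: string) {
  const db = await connect({ uri, readConsistencyInterval: 5 });
  const table = await db.openTable("my_table");
  // Reads within the 5-second window may not see writes made by other
  // processes yet; once the interval has elapsed, the next read checks for
  // updates. Writes themselves are always consistent.
  return table;
}
```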


@@ -32,8 +32,8 @@ export class Table {
   }
 
   /** Get the schema of the table. */
-  get schema(): Schema {
-    const schemaBuf = this.inner.schema();
+  async schema(): Promise<Schema> {
+    const schemaBuf = await this.inner.schema();
     const tbl = tableFromIPC(schemaBuf);
     return tbl.schema;
   }
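
Since `schema` changes here from a synchronous getter to an async method (the native `schema()` call now awaits the underlying table), call sites need a small migration. A sketch, assuming `tbl` is an already-opened Table:

```ts
// before: const schema = tbl.schema;           // synchronous getter
// after:
const schema = await tbl.schema();              // async method returning an Arrow Schema
console.log(schema.fields.map((f) => f.name));  // e.g. ["id"]
```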