Files
lancedb/node/src/index.ts
2024-04-14 03:32:57 +05:30

1342 lines
39 KiB
TypeScript

// Copyright 2023 Lance Developers.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import { type Schema, Table as ArrowTable, tableFromIPC } from 'apache-arrow'
import {
createEmptyTable,
fromRecordsToBuffer,
fromTableToBuffer,
makeArrowTable
} from './arrow'
import type { EmbeddingFunction } from './embedding/embedding_function'
import { RemoteConnection } from './remote'
import { Query } from './query'
import { isEmbeddingFunction } from './embedding/embedding_function'
import { type Literal, toSQL } from './util'
import { type HttpMiddleware } from './middleware'
const {
databaseNew,
databaseTableNames,
databaseOpenTable,
databaseDropTable,
tableCreate,
tableAdd,
tableCreateScalarIndex,
tableCreateVectorIndex,
tableCountRows,
tableDelete,
tableUpdate,
tableMergeInsert,
tableCleanupOldVersions,
tableCompactFiles,
tableListIndices,
tableIndexStats,
tableSchema,
tableAddColumns,
tableAlterColumns,
tableDropColumns
// eslint-disable-next-line @typescript-eslint/no-var-requires
} = require('../native.js')
export { Query }
export type { EmbeddingFunction }
export { OpenAIEmbeddingFunction } from './embedding/openai'
export { convertToTable, makeArrowTable, type MakeArrowTableOptions } from './arrow'
const defaultAwsRegion = 'us-west-2'
export interface AwsCredentials {
accessKeyId: string
secretKey: string
sessionToken?: string
}
export interface ConnectionOptions {
/**
* LanceDB database URI.
*
* - `/path/to/database` - local database
* - `s3://bucket/path/to/database` or `gs://bucket/path/to/database` - database on cloud storage
* - `db://host:port` - remote database (LanceDB cloud)
*/
uri: string
/** User provided AWS crednetials.
*
* If not provided, LanceDB will use the default credentials provider chain.
*
* @deprecated Pass `aws_access_key_id`, `aws_secret_access_key`, and `aws_session_token`
* through `storageOptions` instead.
*/
awsCredentials?: AwsCredentials
/** AWS region to connect to. Default is {@link defaultAwsRegion}
*
* @deprecated Pass `region` through `storageOptions` instead.
*/
awsRegion?: string
/**
* User provided options for object storage. For example, S3 credentials or request timeouts.
*
* The various options are described at https://lancedb.github.io/lancedb/guides/storage/
*/
storageOptions?: Record<string, string>
/**
* API key for the remote connections
*
* Can also be passed by setting environment variable `LANCEDB_API_KEY`
*/
apiKey?: string
/** Region to connect */
region?: string
/**
* Override the host URL for the remote connection.
*
* This is useful for local testing.
*/
hostOverride?: string
/**
* (For LanceDB OSS only): The interval, in seconds, at which to check for
* updates to the table from other processes. If None, then consistency is not
* checked. For performance reasons, this is the default. For strong
* consistency, set this to zero seconds. Then every read will check for
* updates from other processes. As a compromise, you can set this to a
* non-zero value for eventual consistency. If more than that interval
* has passed since the last check, then the table will be checked for updates.
* Note: this consistency only applies to read operations. Write operations are
* always consistent.
*/
readConsistencyInterval?: number
}
function getAwsArgs (opts: ConnectionOptions): any[] {
const callArgs: any[] = []
const awsCredentials = opts.awsCredentials
if (awsCredentials !== undefined) {
callArgs.push(awsCredentials.accessKeyId)
callArgs.push(awsCredentials.secretKey)
callArgs.push(awsCredentials.sessionToken)
} else {
callArgs.fill(undefined, 0, 3)
}
callArgs.push(opts.awsRegion)
return callArgs
}
export interface CreateTableOptions<T> {
// Name of Table
name: string
// Data to insert into the Table
data?: Array<Record<string, unknown>> | ArrowTable | undefined
// Optional Arrow Schema for this table
schema?: Schema | undefined
// Optional embedding function used to create embeddings
embeddingFunction?: EmbeddingFunction<T> | undefined
// WriteOptions for this operation
writeOptions?: WriteOptions | undefined
}
/**
* Connect to a LanceDB instance at the given URI.
*
* Accepted formats:
*
* - `/path/to/database` - local database
* - `s3://bucket/path/to/database` or `gs://bucket/path/to/database` - database on cloud storage
* - `db://host:port` - remote database (LanceDB cloud)
*
* @param uri The uri of the database. If the database uri starts with `db://` then it connects to a remote database.
*
* @see {@link ConnectionOptions} for more details on the URI format.
*/
export async function connect (uri: string): Promise<Connection>
/**
* Connect to a LanceDB instance with connection options.
*
* @param opts The {@link ConnectionOptions} to use when connecting to the database.
*/
export async function connect (
opts: Partial<ConnectionOptions>
): Promise<Connection>
export async function connect (
arg: string | Partial<ConnectionOptions>
): Promise<Connection> {
let opts: ConnectionOptions
if (typeof arg === 'string') {
opts = { uri: arg }
} else {
const keys = Object.keys(arg)
if (keys.length === 1 && keys[0] === 'uri' && typeof arg.uri === 'string') {
opts = { uri: arg.uri }
} else {
opts = Object.assign(
{
uri: '',
awsCredentials: undefined,
awsRegion: defaultAwsRegion,
apiKey: undefined,
region: defaultAwsRegion
},
arg
)
}
}
if (opts.uri.startsWith('db://')) {
// Remote connection
return new RemoteConnection(opts)
}
const storageOptions = opts.storageOptions ?? {};
if (opts.awsCredentials?.accessKeyId !== undefined) {
storageOptions.aws_access_key_id = opts.awsCredentials.accessKeyId
}
if (opts.awsCredentials?.secretKey !== undefined) {
storageOptions.aws_secret_access_key = opts.awsCredentials.secretKey
}
if (opts.awsCredentials?.sessionToken !== undefined) {
storageOptions.aws_session_token = opts.awsCredentials.sessionToken
}
if (opts.awsRegion !== undefined) {
storageOptions.region = opts.awsRegion
}
// It's a pain to pass a record to Rust, so we convert it to an array of key-value pairs
const storageOptionsArr = Object.entries(storageOptions);
const db = await databaseNew(
opts.uri,
storageOptionsArr,
opts.readConsistencyInterval
)
return new LocalConnection(db, opts)
}
/**
* A LanceDB Connection that allows you to open tables and create new ones.
*
* Connection could be local against filesystem or remote against a server.
*/
export interface Connection {
uri: string
tableNames(): Promise<string[]>
/**
* Open a table in the database.
*
* @param name The name of the table.
* @param embeddings An embedding function to use on this table
*/
openTable<T>(
name: string,
embeddings?: EmbeddingFunction<T>
): Promise<Table<T>>
/**
* Creates a new Table, optionally initializing it with new data.
*
* @param {string} name - The name of the table.
* @param data - Array of Records to be inserted into the table
* @param schema - An Arrow Schema that describe this table columns
* @param {EmbeddingFunction} embeddings - An embedding function to use on this table
* @param {WriteOptions} writeOptions - The write options to use when creating the table.
*/
createTable<T>({
name,
data,
schema,
embeddingFunction,
writeOptions
}: CreateTableOptions<T>): Promise<Table<T>>
/**
* Creates a new Table and initialize it with new data.
*
* @param {string} name - The name of the table.
* @param data - Non-empty Array of Records to be inserted into the table
*/
createTable(
name: string,
data: Array<Record<string, unknown>> | ArrowTable
): Promise<Table>
/**
* Creates a new Table and initialize it with new data.
*
* @param {string} name - The name of the table.
* @param data - Non-empty Array of Records to be inserted into the table
* @param {WriteOptions} options - The write options to use when creating the table.
*/
createTable(
name: string,
data: Array<Record<string, unknown>> | ArrowTable,
options: WriteOptions
): Promise<Table>
/**
* Creates a new Table and initialize it with new data.
*
* @param {string} name - The name of the table.
* @param data - Non-empty Array of Records to be inserted into the table
* @param {EmbeddingFunction} embeddings - An embedding function to use on this table
*/
createTable<T>(
name: string,
data: Array<Record<string, unknown>> | ArrowTable,
embeddings: EmbeddingFunction<T>
): Promise<Table<T>>
/**
* Creates a new Table and initialize it with new data.
*
* @param {string} name - The name of the table.
* @param data - Non-empty Array of Records to be inserted into the table
* @param {EmbeddingFunction} embeddings - An embedding function to use on this table
* @param {WriteOptions} options - The write options to use when creating the table.
*/
createTable<T>(
name: string,
data: Array<Record<string, unknown>> | ArrowTable,
embeddings: EmbeddingFunction<T>,
options: WriteOptions
): Promise<Table<T>>
/**
* Drop an existing table.
* @param name The name of the table to drop.
*/
dropTable(name: string): Promise<void>
/**
* Instrument the behavior of this Connection with middleware.
*
* The middleware will be called in the order they are added.
*
* Currently this functionality is only supported for remote Connections.
*
* @param {HttpMiddleware} - Middleware which will instrument the Connection.
* @returns - this Connection instrumented by the passed middleware
*/
withMiddleware(middleware: HttpMiddleware): Connection
}
/**
* A LanceDB Table is the collection of Records. Each Record has one or more vector fields.
*/
export interface Table<T = number[]> {
name: string
/**
* Creates a search query to find the nearest neighbors of the given search term
* @param query The query search term
*/
search: (query: T) => Query<T>
/**
* Insert records into this Table.
*
* @param data Records to be inserted into the Table
* @return The number of rows added to the table
*/
add: (data: Array<Record<string, unknown>> | ArrowTable) => Promise<number>
/**
* Insert records into this Table, replacing its contents.
*
* @param data Records to be inserted into the Table
* @return The number of rows added to the table
*/
overwrite: (
data: Array<Record<string, unknown>> | ArrowTable
) => Promise<number>
/**
* Create an ANN index on this Table vector index.
*
* @param indexParams The parameters of this Index, @see VectorIndexParams.
*/
createIndex: (indexParams: VectorIndexParams) => Promise<any>
/**
* Create a scalar index on this Table for the given column
*
* @param column The column to index
* @param replace If false, fail if an index already exists on the column
* it is always set to true for remote connections
*
* Scalar indices, like vector indices, can be used to speed up scans. A scalar
* index can speed up scans that contain filter expressions on the indexed column.
* For example, the following scan will be faster if the column `my_col` has
* a scalar index:
*
* ```ts
* const con = await lancedb.connect('./.lancedb');
* const table = await con.openTable('images');
* const results = await table.where('my_col = 7').execute();
* ```
*
* Scalar indices can also speed up scans containing a vector search and a
* prefilter:
*
* ```ts
* const con = await lancedb.connect('././lancedb');
* const table = await con.openTable('images');
* const results = await table.search([1.0, 2.0]).where('my_col != 7').prefilter(true);
* ```
*
* Scalar indices can only speed up scans for basic filters using
* equality, comparison, range (e.g. `my_col BETWEEN 0 AND 100`), and set
* membership (e.g. `my_col IN (0, 1, 2)`)
*
* Scalar indices can be used if the filter contains multiple indexed columns and
* the filter criteria are AND'd or OR'd together
* (e.g. `my_col < 0 AND other_col> 100`)
*
* Scalar indices may be used if the filter contains non-indexed columns but,
* depending on the structure of the filter, they may not be usable. For example,
* if the column `not_indexed` does not have a scalar index then the filter
* `my_col = 0 OR not_indexed = 1` will not be able to use any scalar index on
* `my_col`.
*
* @examples
*
* ```ts
* const con = await lancedb.connect('././lancedb')
* const table = await con.openTable('images')
* await table.createScalarIndex('my_col')
* ```
*/
createScalarIndex: (column: string, replace?: boolean) => Promise<void>
/**
* Returns the number of rows in this table.
*/
countRows: (filter?: string) => Promise<number>
/**
* Delete rows from this table.
*
* This can be used to delete a single row, many rows, all rows, or
* sometimes no rows (if your predicate matches nothing).
*
* @param filter A filter in the same format used by a sql WHERE clause. The
* filter must not be empty.
*
* @examples
*
* ```ts
* const con = await lancedb.connect("./.lancedb")
* const data = [
* {id: 1, vector: [1, 2]},
* {id: 2, vector: [3, 4]},
* {id: 3, vector: [5, 6]},
* ];
* const tbl = await con.createTable("my_table", data)
* await tbl.delete("id = 2")
* await tbl.countRows() // Returns 2
* ```
*
* If you have a list of values to delete, you can combine them into a
* stringified list and use the `IN` operator:
*
* ```ts
* const to_remove = [1, 5];
* await tbl.delete(`id IN (${to_remove.join(",")})`)
* await tbl.countRows() // Returns 1
* ```
*/
delete: (filter: string) => Promise<void>
/**
* Update rows in this table.
*
* This can be used to update a single row, many rows, all rows, or
* sometimes no rows (if your predicate matches nothing).
*
* @param args see {@link UpdateArgs} and {@link UpdateSqlArgs} for more details
*
* @examples
*
* ```ts
* const con = await lancedb.connect("./.lancedb")
* const data = [
* {id: 1, vector: [3, 3], name: 'Ye'},
* {id: 2, vector: [4, 4], name: 'Mike'},
* ];
* const tbl = await con.createTable("my_table", data)
*
* await tbl.update({
* where: "id = 2",
* values: { vector: [2, 2], name: "Michael" },
* })
*
* let results = await tbl.search([1, 1]).execute();
* // Returns [
* // {id: 2, vector: [2, 2], name: 'Michael'}
* // {id: 1, vector: [3, 3], name: 'Ye'}
* // ]
* ```
*
*/
update: (args: UpdateArgs | UpdateSqlArgs) => Promise<void>
/**
* Runs a "merge insert" operation on the table
*
* This operation can add rows, update rows, and remove rows all in a single
* transaction. It is a very generic tool that can be used to create
* behaviors like "insert if not exists", "update or insert (i.e. upsert)",
* or even replace a portion of existing data with new data (e.g. replace
* all data where month="january")
*
* The merge insert operation works by combining new data from a
* **source table** with existing data in a **target table** by using a
* join. There are three categories of records.
*
* "Matched" records are records that exist in both the source table and
* the target table. "Not matched" records exist only in the source table
* (e.g. these are new data) "Not matched by source" records exist only
* in the target table (this is old data)
*
* The MergeInsertArgs can be used to customize what should happen for
* each category of data.
*
* Please note that the data may appear to be reordered as part of this
* operation. This is because updated rows will be deleted from the
* dataset and then reinserted at the end with the new values.
*
* @param on a column to join on. This is how records from the source
* table and target table are matched.
* @param data the new data to insert
* @param args parameters controlling how the operation should behave
*/
mergeInsert: (on: string, data: Array<Record<string, unknown>> | ArrowTable, args: MergeInsertArgs) => Promise<void>
/**
* List the indicies on this table.
*/
listIndices: () => Promise<VectorIndex[]>
/**
* Get statistics about an index.
*/
indexStats: (indexUuid: string) => Promise<IndexStats>
filter(value: string): Query<T>
schema: Promise<Schema>
// TODO: Support BatchUDF
/**
* Add new columns with defined values.
*
* @param newColumnTransforms pairs of column names and the SQL expression to use
* to calculate the value of the new column. These
* expressions will be evaluated for each row in the
* table, and can reference existing columns in the table.
*/
addColumns(newColumnTransforms: Array<{ name: string, valueSql: string }>): Promise<void>
/**
* Alter the name or nullability of columns.
*
* @param columnAlterations One or more alterations to apply to columns.
*/
alterColumns(columnAlterations: ColumnAlteration[]): Promise<void>
/**
* Drop one or more columns from the dataset
*
* This is a metadata-only operation and does not remove the data from the
* underlying storage. In order to remove the data, you must subsequently
* call ``compact_files`` to rewrite the data without the removed columns and
* then call ``cleanup_files`` to remove the old files.
*
* @param columnNames The names of the columns to drop. These can be nested
* column references (e.g. "a.b.c") or top-level column
* names (e.g. "a").
*/
dropColumns(columnNames: string[]): Promise<void>
/**
* Instrument the behavior of this Table with middleware.
*
* The middleware will be called in the order they are added.
*
* Currently this functionality is only supported for remote tables.
*
* @param {HttpMiddleware} - Middleware which will instrument the Table.
* @returns - this Table instrumented by the passed middleware
*/
withMiddleware(middleware: HttpMiddleware): Table<T>
}
/**
* A definition of a column alteration. The alteration changes the column at
* `path` to have the new name `name`, to be nullable if `nullable` is true,
* and to have the data type `data_type`. At least one of `rename` or `nullable`
* must be provided.
*/
export interface ColumnAlteration {
/**
* The path to the column to alter. This is a dot-separated path to the column.
* If it is a top-level column then it is just the name of the column. If it is
* a nested column then it is the path to the column, e.g. "a.b.c" for a column
* `c` nested inside a column `b` nested inside a column `a`.
*/
path: string
rename?: string
/**
* Set the new nullability. Note that a nullable column cannot be made non-nullable.
*/
nullable?: boolean
}
export interface UpdateArgs {
/**
* A filter in the same format used by a sql WHERE clause. The filter may be empty,
* in which case all rows will be updated.
*/
where?: string
/**
* A key-value map of updates. The keys are the column names, and the values are the
* new values to set
*/
values: Record<string, Literal>
}
export interface UpdateSqlArgs {
/**
* A filter in the same format used by a sql WHERE clause. The filter may be empty,
* in which case all rows will be updated.
*/
where?: string
/**
* A key-value map of updates. The keys are the column names, and the values are the
* new values to set as SQL expressions.
*/
valuesSql: Record<string, string>
}
export interface MergeInsertArgs {
/**
* If true then rows that exist in both the source table (new data) and
* the target table (old data) will be updated, replacing the old row
* with the corresponding matching row.
*
* If there are multiple matches then the behavior is undefined.
* Currently this causes multiple copies of the row to be created
* but that behavior is subject to change.
*
* Optionally, a filter can be specified. This should be an SQL
* filter where fields with the prefix "target." refer to fields
* in the target table (old data) and fields with the prefix
* "source." refer to fields in the source table (new data). For
* example, the filter "target.lastUpdated < source.lastUpdated" will
* only update matched rows when the incoming `lastUpdated` value is
* newer.
*
* Rows that do not match the filter will not be updated. Rows that
* do not match the filter do become "not matched" rows.
*/
whenMatchedUpdateAll?: string | boolean
/**
* If true then rows that exist only in the source table (new data)
* will be inserted into the target table.
*/
whenNotMatchedInsertAll?: boolean
/**
* If true then rows that exist only in the target table (old data)
* will be deleted.
*
* If this is a string then it will be treated as an SQL filter and
* only rows that both do not match any row in the source table and
* match the given filter will be deleted.
*
* This can be used to replace a selection of existing data with
* new data.
*/
whenNotMatchedBySourceDelete?: string | boolean
}
export interface VectorIndex {
columns: string[]
name: string
uuid: string
}
export interface IndexStats {
numIndexedRows: number | null
numUnindexedRows: number | null
}
/**
* A connection to a LanceDB database.
*/
export class LocalConnection implements Connection {
private readonly _options: () => ConnectionOptions
private readonly _db: any
constructor (db: any, options: ConnectionOptions) {
this._options = () => options
this._db = db
}
get uri (): string {
return this._options().uri
}
/**
* Get the names of all tables in the database.
*/
async tableNames (): Promise<string[]> {
return databaseTableNames.call(this._db)
}
/**
* Open a table in the database.
*
* @param name The name of the table.
*/
async openTable (name: string): Promise<Table>
/**
* Open a table in the database.
*
* @param name The name of the table.
* @param embeddings An embedding function to use on this Table
*/
async openTable<T>(
name: string,
embeddings: EmbeddingFunction<T>
): Promise<Table<T>>
async openTable<T>(
name: string,
embeddings?: EmbeddingFunction<T>
): Promise<Table<T>>
async openTable<T>(
name: string,
embeddings?: EmbeddingFunction<T>
): Promise<Table<T>> {
const tbl = await databaseOpenTable.call(
this._db,
name,
)
if (embeddings !== undefined) {
return new LocalTable(tbl, name, this._options(), embeddings)
} else {
return new LocalTable(tbl, name, this._options())
}
}
async createTable<T>(
name: string | CreateTableOptions<T>,
data?: Array<Record<string, unknown>> | ArrowTable,
optsOrEmbedding?: WriteOptions | EmbeddingFunction<T>,
opt?: WriteOptions
): Promise<Table<T>> {
if (typeof name === 'string') {
let writeOptions: WriteOptions = new DefaultWriteOptions()
if (opt !== undefined && isWriteOptions(opt)) {
writeOptions = opt
} else if (
optsOrEmbedding !== undefined &&
isWriteOptions(optsOrEmbedding)
) {
writeOptions = optsOrEmbedding
}
let embeddings: undefined | EmbeddingFunction<T>
if (
optsOrEmbedding !== undefined &&
isEmbeddingFunction(optsOrEmbedding)
) {
embeddings = optsOrEmbedding
}
return await this.createTableImpl({
name,
data,
embeddingFunction: embeddings,
writeOptions
})
}
return await this.createTableImpl(name)
}
private async createTableImpl<T>({
name,
data,
schema,
embeddingFunction,
writeOptions = new DefaultWriteOptions()
}: {
name: string
data?: Array<Record<string, unknown>> | ArrowTable | undefined
schema?: Schema | undefined
embeddingFunction?: EmbeddingFunction<T> | undefined
writeOptions?: WriteOptions | undefined
}): Promise<Table<T>> {
let buffer: Buffer
function isEmpty (
data: Array<Record<string, unknown>> | ArrowTable<any>
): boolean {
if (data instanceof ArrowTable) {
return data.data.length === 0
}
return data.length === 0
}
if (data === undefined || isEmpty(data)) {
if (schema === undefined) {
throw new Error('Either data or schema needs to defined')
}
buffer = await fromTableToBuffer(createEmptyTable(schema))
} else if (data instanceof ArrowTable) {
buffer = await fromTableToBuffer(data, embeddingFunction, schema)
} else {
// data is Array<Record<...>>
buffer = await fromRecordsToBuffer(data, embeddingFunction, schema)
}
const tbl = await tableCreate.call(
this._db,
name,
buffer,
writeOptions?.writeMode?.toString(),
...getAwsArgs(this._options())
)
if (embeddingFunction !== undefined) {
return new LocalTable(tbl, name, this._options(), embeddingFunction)
} else {
return new LocalTable(tbl, name, this._options())
}
}
/**
* Drop an existing table.
* @param name The name of the table to drop.
*/
async dropTable (name: string): Promise<void> {
await databaseDropTable.call(this._db, name)
}
withMiddleware (middleware: HttpMiddleware): Connection {
return this
}
}
export class LocalTable<T = number[]> implements Table<T> {
private _tbl: any
private readonly _name: string
private readonly _isElectron: boolean
private readonly _embeddings?: EmbeddingFunction<T>
private readonly _options: () => ConnectionOptions
constructor (tbl: any, name: string, options: ConnectionOptions)
/**
* @param tbl
* @param name
* @param options
* @param embeddings An embedding function to use when interacting with this table
*/
constructor (
tbl: any,
name: string,
options: ConnectionOptions,
embeddings: EmbeddingFunction<T>
)
constructor (
tbl: any,
name: string,
options: ConnectionOptions,
embeddings?: EmbeddingFunction<T>
) {
this._tbl = tbl
this._name = name
this._embeddings = embeddings
this._options = () => options
this._isElectron = this.checkElectron()
}
get name (): string {
return this._name
}
/**
* Creates a search query to find the nearest neighbors of the given search term
* @param query The query search term
*/
search (query: T): Query<T> {
return new Query(query, this._tbl, this._embeddings)
}
/**
* Creates a filter query to find all rows matching the specified criteria
* @param value The filter criteria (like SQL where clause syntax)
*/
filter (value: string): Query<T> {
return new Query(undefined, this._tbl, this._embeddings).filter(value)
}
where = this.filter
/**
* Insert records into this Table.
*
* @param data Records to be inserted into the Table
* @return The number of rows added to the table
*/
async add (
data: Array<Record<string, unknown>> | ArrowTable
): Promise<number> {
const schema = await this.schema
let tbl: ArrowTable
if (data instanceof ArrowTable) {
tbl = data
} else {
tbl = makeArrowTable(data, { schema })
}
return tableAdd
.call(
this._tbl,
await fromTableToBuffer(tbl, this._embeddings, schema),
WriteMode.Append.toString(),
...getAwsArgs(this._options())
)
.then((newTable: any) => {
this._tbl = newTable
})
}
/**
* Insert records into this Table, replacing its contents.
*
* @param data Records to be inserted into the Table
* @return The number of rows added to the table
*/
async overwrite (
data: Array<Record<string, unknown>> | ArrowTable
): Promise<number> {
let buffer: Buffer
if (data instanceof ArrowTable) {
buffer = await fromTableToBuffer(data, this._embeddings)
} else {
buffer = await fromRecordsToBuffer(data, this._embeddings)
}
return tableAdd
.call(
this._tbl,
buffer,
WriteMode.Overwrite.toString(),
...getAwsArgs(this._options())
)
.then((newTable: any) => {
this._tbl = newTable
})
}
/**
* Create an ANN index on this Table vector index.
*
* @param indexParams The parameters of this Index, @see VectorIndexParams.
*/
async createIndex (indexParams: VectorIndexParams): Promise<any> {
return tableCreateVectorIndex
.call(this._tbl, indexParams)
.then((newTable: any) => {
this._tbl = newTable
})
}
async createScalarIndex (column: string, replace?: boolean): Promise<void> {
if (replace === undefined) {
replace = true
}
return tableCreateScalarIndex.call(this._tbl, column, replace)
}
/**
* Returns the number of rows in this table.
*/
async countRows (filter?: string): Promise<number> {
return tableCountRows.call(this._tbl, filter)
}
/**
* Delete rows from this table.
*
* @param filter A filter in the same format used by a sql WHERE clause.
*/
async delete (filter: string): Promise<void> {
return tableDelete.call(this._tbl, filter).then((newTable: any) => {
this._tbl = newTable
})
}
/**
* Update rows in this table.
*
* @param args see {@link UpdateArgs} and {@link UpdateSqlArgs} for more details
*
* @returns
*/
async update (args: UpdateArgs | UpdateSqlArgs): Promise<void> {
let filter: string | null
let updates: Record<string, string>
if ('valuesSql' in args) {
filter = args.where ?? null
updates = args.valuesSql
} else {
filter = args.where ?? null
updates = {}
for (const [key, value] of Object.entries(args.values)) {
updates[key] = toSQL(value)
}
}
return tableUpdate
.call(this._tbl, filter, updates)
.then((newTable: any) => {
this._tbl = newTable
})
}
async mergeInsert (on: string, data: Array<Record<string, unknown>> | ArrowTable, args: MergeInsertArgs): Promise<void> {
let whenMatchedUpdateAll = false
let whenMatchedUpdateAllFilt = null
if (args.whenMatchedUpdateAll !== undefined && args.whenMatchedUpdateAll !== null) {
whenMatchedUpdateAll = true
if (args.whenMatchedUpdateAll !== true) {
whenMatchedUpdateAllFilt = args.whenMatchedUpdateAll
}
}
const whenNotMatchedInsertAll = args.whenNotMatchedInsertAll ?? false
let whenNotMatchedBySourceDelete = false
let whenNotMatchedBySourceDeleteFilt = null
if (args.whenNotMatchedBySourceDelete !== undefined && args.whenNotMatchedBySourceDelete !== null) {
whenNotMatchedBySourceDelete = true
if (args.whenNotMatchedBySourceDelete !== true) {
whenNotMatchedBySourceDeleteFilt = args.whenNotMatchedBySourceDelete
}
}
const schema = await this.schema
let tbl: ArrowTable
if (data instanceof ArrowTable) {
tbl = data
} else {
tbl = makeArrowTable(data, { schema })
}
const buffer = await fromTableToBuffer(tbl, this._embeddings, schema)
this._tbl = await tableMergeInsert.call(
this._tbl,
on,
whenMatchedUpdateAll,
whenMatchedUpdateAllFilt,
whenNotMatchedInsertAll,
whenNotMatchedBySourceDelete,
whenNotMatchedBySourceDeleteFilt,
buffer
)
}
/**
* Clean up old versions of the table, freeing disk space.
*
* @param olderThan The minimum age in minutes of the versions to delete. If not
* provided, defaults to two weeks.
* @param deleteUnverified Because they may be part of an in-progress
* transaction, uncommitted files newer than 7 days old are
* not deleted by default. This means that failed transactions
* can leave around data that takes up disk space for up to
* 7 days. You can override this safety mechanism by setting
* this option to `true`, only if you promise there are no
* in progress writes while you run this operation. Failure to
* uphold this promise can lead to corrupted tables.
* @returns
*/
async cleanupOldVersions (
olderThan?: number,
deleteUnverified?: boolean
): Promise<CleanupStats> {
return tableCleanupOldVersions
.call(this._tbl, olderThan, deleteUnverified)
.then((res: { newTable: any, metrics: CleanupStats }) => {
this._tbl = res.newTable
return res.metrics
})
}
/**
* Run the compaction process on the table.
*
* This can be run after making several small appends to optimize the table
* for faster reads.
*
* @param options Advanced options configuring compaction. In most cases, you
* can omit this arguments, as the default options are sensible
* for most tables.
* @returns Metrics about the compaction operation.
*/
async compactFiles (options?: CompactionOptions): Promise<CompactionMetrics> {
const optionsArg = options ?? {}
return tableCompactFiles
.call(this._tbl, optionsArg)
.then((res: { newTable: any, metrics: CompactionMetrics }) => {
this._tbl = res.newTable
return res.metrics
})
}
async listIndices (): Promise<VectorIndex[]> {
return tableListIndices.call(this._tbl)
}
async indexStats (indexUuid: string): Promise<IndexStats> {
return tableIndexStats.call(this._tbl, indexUuid)
}
get schema (): Promise<Schema> {
// empty table
return this.getSchema()
}
private async getSchema (): Promise<Schema> {
const buffer = await tableSchema.call(this._tbl, this._isElectron)
const table = tableFromIPC(buffer)
return table.schema
}
// See https://github.com/electron/electron/issues/2288
private checkElectron (): boolean {
try {
// eslint-disable-next-line no-prototype-builtins
return (
Object.prototype.hasOwnProperty.call(process?.versions, 'electron') ||
navigator?.userAgent?.toLowerCase()?.includes(' electron')
)
} catch (e) {
return false
}
}
async addColumns (newColumnTransforms: Array<{ name: string, valueSql: string }>): Promise<void> {
return tableAddColumns.call(this._tbl, newColumnTransforms)
}
async alterColumns (columnAlterations: ColumnAlteration[]): Promise<void> {
return tableAlterColumns.call(this._tbl, columnAlterations)
}
async dropColumns (columnNames: string[]): Promise<void> {
return tableDropColumns.call(this._tbl, columnNames)
}
withMiddleware (middleware: HttpMiddleware): Table<T> {
return this
}
}
export interface CleanupStats {
/**
* The number of bytes removed from disk.
*/
bytesRemoved: number
/**
* The number of old table versions removed.
*/
oldVersions: number
}
export interface CompactionOptions {
/**
* The number of rows per fragment to target. Fragments that have fewer rows
* will be compacted into adjacent fragments to produce larger fragments.
* Defaults to 1024 * 1024.
*/
targetRowsPerFragment?: number
/**
* The maximum number of rows per group. Defaults to 1024.
*/
maxRowsPerGroup?: number
/**
* If true, fragments that have rows that are deleted may be compacted to
* remove the deleted rows. This can improve the performance of queries.
* Default is true.
*/
materializeDeletions?: boolean
/**
* A number between 0 and 1, representing the proportion of rows that must be
* marked deleted before a fragment is a candidate for compaction to remove
* the deleted rows. Default is 10%.
*/
materializeDeletionsThreshold?: number
/**
* The number of threads to use for compaction. If not provided, defaults to
* the number of cores on the machine.
*/
numThreads?: number
}
export interface CompactionMetrics {
/**
* The number of fragments that were removed.
*/
fragmentsRemoved: number
/**
* The number of new fragments that were created.
*/
fragmentsAdded: number
/**
* The number of files that were removed. Each fragment may have more than one
* file.
*/
filesRemoved: number
/**
* The number of files added. This is typically equal to the number of
* fragments added.
*/
filesAdded: number
}
/// Config to build IVF_PQ index.
///
export interface IvfPQIndexConfig {
/**
* The column to be indexed
*/
column?: string
/**
* A unique name for the index
*/
index_name?: string
/**
* Metric type, L2 or Cosine
*/
metric_type?: MetricType
/**
* The number of partitions this index
*/
num_partitions?: number
/**
* The max number of iterations for kmeans training.
*/
max_iters?: number
/**
* Train as optimized product quantization.
*/
use_opq?: boolean
/**
* Number of subvectors to build PQ code
*/
num_sub_vectors?: number
/**
* The number of bits to present one PQ centroid.
*/
num_bits?: number
/**
* Max number of iterations to train OPQ, if `use_opq` is true.
*/
max_opq_iters?: number
/**
* Replace an existing index with the same name if it exists.
*/
replace?: boolean
/**
* Cache size of the index
*/
index_cache_size?: number
type: 'ivf_pq'
}
export type VectorIndexParams = IvfPQIndexConfig
/**
* Write mode for writing a table.
*/
export enum WriteMode {
/** Create a new {@link Table}. */
Create = 'create',
/** Overwrite the existing {@link Table} if presented. */
Overwrite = 'overwrite',
/** Append new data to the table. */
Append = 'append',
}
/**
* Write options when creating a Table.
*/
export interface WriteOptions {
/** A {@link WriteMode} to use on this operation */
writeMode?: WriteMode
}
export class DefaultWriteOptions implements WriteOptions {
writeMode = WriteMode.Create
}
export function isWriteOptions (value: any): value is WriteOptions {
return (
Object.keys(value).length === 1 &&
(value.writeMode === undefined || typeof value.writeMode === 'string')
)
}
/**
* Distance metrics type.
*/
export enum MetricType {
/**
* Euclidean distance
*/
L2 = 'l2',
/**
* Cosine distance
*/
Cosine = 'cosine',
/**
* Dot product
*/
Dot = 'dot',
}