[Node] initial support of nodejs remote sdk (#333)

This commit is contained in:
Lei Xu
2023-07-18 16:15:27 -07:00
committed by GitHub
parent fb97b03a51
commit 980f910f50
7 changed files with 438 additions and 151 deletions

View File

@@ -14,15 +14,15 @@
import {
RecordBatchFileWriter,
type Table as ArrowTable,
tableFromIPC,
Vector
type Table as ArrowTable
} from 'apache-arrow'
import { fromRecordsToBuffer } from './arrow'
import type { EmbeddingFunction } from './embedding/embedding_function'
import { RemoteConnection } from './remote'
import { Query } from './query'
// eslint-disable-next-line @typescript-eslint/no-var-requires
const { databaseNew, databaseTableNames, databaseOpenTable, databaseDropTable, tableCreate, tableSearch, tableAdd, tableCreateVectorIndex, tableCountRows, tableDelete } = require('../native.js')
const { databaseNew, databaseTableNames, databaseOpenTable, databaseDropTable, tableCreate, tableAdd, tableCreateVectorIndex, tableCountRows, tableDelete } = require('../native.js')
export type { EmbeddingFunction }
export { OpenAIEmbeddingFunction } from './embedding/openai'
@@ -37,7 +37,13 @@ export interface AwsCredentials {
export interface ConnectionOptions {
// Location of the database. connect() routes URIs that start with `db://`
// to a RemoteConnection; any other URI is handed to the native local backend.
uri: string
// Credentials passed along with the options to the local connection.
// NOTE(review): presumably only relevant for cloud object-store URIs - confirm.
awsCredentials?: AwsCredentials
// API key for remote connections. RemoteConnection throws when it is undefined.
apiKey?: string
// Region of the remote database; it is interpolated into the remote server
// host name. connect() defaults it to 'us-west-2' when given an options object.
region?: string
}
/**
@@ -54,9 +60,16 @@ export async function connect (arg: string | Partial<ConnectionOptions>): Promis
// opts = { uri: arg.uri, awsCredentials = arg.awsCredentials }
opts = Object.assign({
uri: '',
awsCredentials: undefined
awsCredentials: undefined,
apiKey: undefined,
region: 'us-west-2'
}, arg)
}
if (opts.uri.startsWith('db://')) {
// Remote connection
return new RemoteConnection(opts)
}
const db = await databaseNew(opts.uri)
return new LocalConnection(db, opts)
}
@@ -191,8 +204,8 @@ export class LocalConnection implements Connection {
}
/**
 * Get the names of all tables in the database.
 */
async tableNames (): Promise<string[]> {
  // The native binding expects the database handle as its `this` value.
  const names: string[] = await databaseTableNames.call(this._db)
  return names
}
@@ -203,6 +216,7 @@ export class LocalConnection implements Connection {
* @param name The name of the table.
*/
async openTable (name: string): Promise<Table>
/**
* Open a table in the database.
*
@@ -308,7 +322,7 @@ export class LocalTable<T = number[]> implements Table<T> {
* @param query The query search term
*/
search (query: T): Query<T> {
  // Query (imported from './query') takes the search term first and the
  // native table handle second. The stale call that passed the handle first
  // has been dropped: it made the new-order call below unreachable.
  return new Query(query, this._tbl, this._embeddings)
}
/**
@@ -430,116 +444,6 @@ export interface IvfPQIndexConfig {
export type VectorIndexParams = IvfPQIndexConfig
/**
 * A builder for nearest neighbor queries for LanceDB.
 *
 * Every setter stores its value on the instance and returns the same instance,
 * so calls can be chained before `execute()` runs the search.
 */
export class Query<T = number[]> {
// Native table handle; used as the `this` value of the native search call.
private readonly _tbl: any
// The search term exactly as supplied by the caller.
private readonly _query: T
// Vector actually searched for; only populated by execute().
private _queryVector?: number[]
private _limit: number
private _refineFactor?: number
private _nprobes: number
private _select?: string[]
private _filter?: string
private _metricType?: MetricType
// Optional embedding function used to turn the search term into a vector.
private readonly _embeddings?: EmbeddingFunction<T>
/**
 * @param tbl native table handle the search runs against
 * @param query the search term
 * @param embeddings optional embedding function applied to `query` in execute()
 */
constructor (tbl: any, query: T, embeddings?: EmbeddingFunction<T>) {
this._tbl = tbl
this._query = query
// Defaults: return 10 results and use 20 probes unless overridden.
this._limit = 10
this._nprobes = 20
this._refineFactor = undefined
this._select = undefined
this._filter = undefined
this._metricType = undefined
this._embeddings = embeddings
}
/**
 * Sets the number of results that will be returned
 * @param value number of results
 */
limit (value: number): Query<T> {
this._limit = value
return this
}
/**
 * Refine the results by reading extra elements and re-ranking them in memory.
 * @param value refine factor to use in this query.
 */
refineFactor (value: number): Query<T> {
this._refineFactor = value
return this
}
/**
 * The number of probes used. A higher number makes search more accurate but also slower.
 * @param value The number of probes used.
 */
nprobes (value: number): Query<T> {
this._nprobes = value
return this
}
/**
 * A filter statement to be applied to this query.
 * @param value A filter in the same format used by a sql WHERE clause.
 */
filter (value: string): Query<T> {
this._filter = value
return this
}
// Alias of `filter`.
where = this.filter
/** Return only the specified columns.
 *
 * @param value Only select the specified columns. If not specified, all columns will be returned.
 */
select (value: string[]): Query<T> {
this._select = value
return this
}
/**
 * The MetricType used for this Query.
 * @param value The metric to use. @see MetricType for the different options
 */
metricType (value: MetricType): Query<T> {
this._metricType = value
return this
}
/**
 * Execute the query and return the results as an Array of Objects
 *
 * The search term is embedded when an embedding function was supplied;
 * otherwise it is used as the query vector as-is.
 * @returns one plain object per result row
 */
async execute<T = Record<string, unknown>> (): Promise<T[]> {
if (this._embeddings !== undefined) {
this._queryVector = (await this._embeddings.embed([this._query]))[0]
} else {
this._queryVector = this._query as number[]
}
// The whole builder is handed to the native search.
// NOTE(review): presumably the binding reads the `_`-prefixed fields from it - confirm.
const buffer = await tableSearch.call(this._tbl, this)
const data = tableFromIPC(buffer)
return data.toArray().map((entry: Record<string, unknown>) => {
const newObject: Record<string, unknown> = {}
Object.keys(entry).forEach((key: string) => {
// Arrow Vector values are replaced by the result of their toArray().
if (entry[key] instanceof Vector) {
newObject[key] = (entry[key] as Vector).toArray()
} else {
newObject[key] = entry[key]
}
})
return newObject as unknown as T
})
}
}
/**
* Write mode for writing a table.
*/

130
node/src/query.ts Normal file
View File

@@ -0,0 +1,130 @@
// Copyright 2023 LanceDB Developers.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import { Vector, tableFromIPC } from 'apache-arrow'
import { type EmbeddingFunction } from './embedding/embedding_function'
import { type MetricType } from '.'
// eslint-disable-next-line @typescript-eslint/no-var-requires
const { tableSearch } = require('../native.js')
/**
 * Fluent builder for a nearest neighbor search in LanceDB.
 *
 * Each setter records its value on the instance and hands the same instance
 * back, so options can be chained before `execute()` performs the search.
 */
export class Query<T = number[]> {
  private readonly _query: T
  private readonly _tbl?: any
  private _queryVector?: number[]
  private _limit: number
  private _refineFactor?: number
  private _nprobes: number
  private _select?: string[]
  private _filter?: string
  private _metricType?: MetricType
  protected readonly _embeddings?: EmbeddingFunction<T>

  /**
   * @param query the search term
   * @param tbl native table handle the search runs against, if any
   * @param embeddings optional embedding function applied to `query` in execute()
   */
  constructor (query: T, tbl?: any, embeddings?: EmbeddingFunction<T>) {
    this._tbl = tbl
    this._query = query
    // Out of the box a query returns 10 rows and uses 20 probes.
    this._limit = 10
    this._nprobes = 20
    this._refineFactor = undefined
    this._select = undefined
    this._filter = undefined
    this._metricType = undefined
    this._embeddings = embeddings
  }

  /**
   * Cap the number of rows the search returns.
   * @param value maximum number of results
   */
  limit (value: number): Query<T> {
    this._limit = value
    return this
  }

  /**
   * Read extra candidates and re-rank them in memory to refine the results.
   * @param value refine factor for this query
   */
  refineFactor (value: number): Query<T> {
    this._refineFactor = value
    return this
  }

  /**
   * Set the number of probes. More probes make the search more accurate but slower.
   * @param value number of probes to use
   */
  nprobes (value: number): Query<T> {
    this._nprobes = value
    return this
  }

  /**
   * Restrict the search with a filter statement.
   * @param value filter in the format of a sql WHERE clause
   */
  filter (value: string): Query<T> {
    this._filter = value
    return this
  }

  // Alias of `filter`.
  where = this.filter

  /**
   * Return only the listed columns; when never called, all columns are returned.
   * @param value names of the columns to return
   */
  select (value: string[]): Query<T> {
    this._select = value
    return this
  }

  /**
   * Choose the MetricType for this query.
   * @param value the metric to use. @see MetricType for the different options
   */
  metricType (value: MetricType): Query<T> {
    this._metricType = value
    return this
  }

  /**
   * Run the search and return the matching rows as an array of plain objects.
   *
   * The search term is embedded when an embedding function was supplied;
   * otherwise it is used as the query vector as-is.
   */
  async execute<T = Record<string, unknown>> (): Promise<T[]> {
    const embedder = this._embeddings
    this._queryVector = embedder !== undefined
      ? (await embedder.embed([this._query]))[0]
      : (this._query as number[])

    // The builder itself is the argument of the native search call.
    const ipcBytes = await tableSearch.call(this._tbl, this)
    return tableFromIPC(ipcBytes).toArray().map((row: Record<string, unknown>) => {
      const plain: Record<string, unknown> = {}
      for (const column of Object.keys(row)) {
        const cell = row[column]
        // Arrow Vector cells are replaced by the result of their toArray().
        plain[column] = cell instanceof Vector ? cell.toArray() : cell
      }
      return plain as unknown as T
    })
  }
}

69
node/src/remote/client.ts Normal file
View File

@@ -0,0 +1,69 @@
// Copyright 2023 LanceDB Developers.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import axios from 'axios'
import { tableFromIPC, type Table as ArrowTable } from 'apache-arrow'
/**
 * Minimal HTTP client for a remote LanceDB server.
 *
 * Holds the base URL of the server and the API key that is sent with every
 * request.
 */
export class HttpLancedbClient {
  private readonly _url: string

  /**
   * @param url base URL of the remote server; request paths are appended to it
   * @param _apiKey API key sent in the `x-api-key` header
   */
  public constructor (url: string, private readonly _apiKey: string) {
    this._url = url
  }

  /** The base URL this client sends its requests to. */
  get uri (): string {
    return this._url
  }

  /**
   * Run a vector search against a remote table.
   *
   * @param tableName name of the table to search
   * @param vector the query vector
   * @param k number of results requested
   * @param nprobes number of probes to use
   * @param refineFactor optional refine factor
   * @param columns optional list of columns to return
   * @param filter optional filter statement
   * @returns the Arrow table decoded from the IPC response body
   * @throws Error when the server cannot be reached or answers with a status
   *   other than 200
   */
  public async search (
    tableName: string,
    vector: number[],
    k: number,
    nprobes: number,
    refineFactor?: number,
    columns?: string[],
    filter?: string
  ): Promise<ArrowTable<any>> {
    let response
    try {
      response = await axios.post(
        // Escape the name so characters such as '/', '?' or ' ' cannot alter the path.
        `${this._url}/v1/table/${encodeURIComponent(tableName)}`,
        {
          vector,
          k,
          nprobes,
          refineFactor,
          columns,
          filter
        },
        {
          headers: {
            'Content-Type': 'application/json',
            'x-api-key': this._apiKey
          },
          responseType: 'arraybuffer',
          timeout: 10000
        }
      )
    } catch (err: any) {
      console.error('error: ', err)
      // Transport failures (timeout, DNS, refused connection) carry no
      // response; report them directly instead of dereferencing `undefined`.
      if (err?.response === undefined) {
        throw new Error(`Network Error: ${String(err?.message ?? err)}`)
      }
      // The server did answer, with an error status; it is reported below.
      response = err.response
    }
    if (response.status !== 200) {
      const errorData = new TextDecoder().decode(response.data)
      throw new Error(`Server Error, status: ${response.status as number}, message: ${response.statusText as string}: ${errorData}`)
    }

    return tableFromIPC(response.data)
  }
}

163
node/src/remote/index.ts Normal file
View File

@@ -0,0 +1,163 @@
// Copyright 2023 LanceDB Developers.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import {
type EmbeddingFunction, type Table, type VectorIndexParams, type Connection,
type ConnectionOptions
} from '../index'
import { Query } from '../query'
import { type Table as ArrowTable, Vector } from 'apache-arrow'
import { HttpLancedbClient } from './client'
/**
 * Remote connection.
 *
 * Talks to a remote LanceDB database addressed by a `db://<name>` URI. Only
 * `openTable` is implemented so far; every other operation throws
 * `Not implemented`.
 */
export class RemoteConnection implements Connection {
  private readonly _client: HttpLancedbClient
  private readonly _dbName: string

  /**
   * @param opts connection options; `uri` must start with `db://`, and both
   *   `apiKey` and `region` must be set
   * @throws Error when the URI is not a `db://` URI, or when the API key or
   *   the region is missing
   */
  constructor (opts: ConnectionOptions) {
    if (!opts.uri.startsWith('db://')) {
      throw new Error(`Invalid remote DB URI: ${opts.uri}`)
    }
    if (opts.apiKey === undefined || opts.region === undefined) {
      throw new Error('API key and region are required for remote connections')
    }

    this._dbName = opts.uri.slice('db://'.length)
    const server = `https://${this._dbName}.${opts.region}.api.lancedb.com`
    this._client = new HttpLancedbClient(server, opts.apiKey)
  }

  /** The `db://<name>` URI this connection was opened with. */
  get uri (): string {
    // Rebuild it from the database name: the client's own uri is the full
    // https server URL, which must not be prefixed with `db://`.
    return `db://${this._dbName}`
  }

  /** Not implemented for remote connections yet. */
  async tableNames (): Promise<string[]> {
    throw new Error('Not implemented')
  }

  /**
   * Open a remote table by name. No request is sent at this point.
   *
   * @param name the name of the table
   * @param embeddings optional embedding function for the table
   */
  async openTable (name: string): Promise<Table>
  async openTable<T> (name: string, embeddings: EmbeddingFunction<T>): Promise<Table<T>>
  async openTable<T> (name: string, embeddings?: EmbeddingFunction<T>): Promise<Table<T>> {
    // Two branches because RemoteTable's overloads do not accept an
    // explicitly undefined embedding function.
    if (embeddings !== undefined) {
      return new RemoteTable(this._client, name, embeddings)
    } else {
      return new RemoteTable(this._client, name)
    }
  }

  /** Not implemented for remote connections yet. */
  async createTable (name: string, data: Array<Record<string, unknown>>): Promise<Table>
  async createTable<T> (name: string, data: Array<Record<string, unknown>>, embeddings: EmbeddingFunction<T>): Promise<Table<T>>
  async createTable<T> (name: string, data: Array<Record<string, unknown>>, embeddings?: EmbeddingFunction<T>): Promise<Table<T>> {
    throw new Error('Not implemented')
  }

  /** Not implemented for remote connections yet. */
  async createTableArrow (name: string, table: ArrowTable): Promise<Table> {
    throw new Error('Not implemented')
  }

  /** Not implemented for remote connections yet. */
  async dropTable (name: string): Promise<void> {
    throw new Error('Not implemented')
  }
}
/**
 * Query builder whose search is carried out by a remote LanceDB server
 * through an HttpLancedbClient instead of the native table binding.
 */
export class RemoteQuery<T = number[]> extends Query<T> {
  /**
   * @param query the search term
   * @param _client client used to send the search request
   * @param _name name of the remote table to search
   * @param embeddings optional embedding function applied to `query` in execute()
   */
  constructor (query: T, private readonly _client: HttpLancedbClient,
    private readonly _name: string, embeddings?: EmbeddingFunction<T>) {
    // A remote query has no native table handle to give to the base class.
    super(query, undefined, embeddings)
  }

  // TODO: refactor this to a base class + queryImpl pattern
  /**
   * Run the search on the remote server and return the rows as plain objects.
   *
   * Limit, nprobes, refine factor, selected columns and filter are sent to
   * the server; the metric type is not part of the request.
   */
  async execute<T = Record<string, unknown>>(): Promise<T[]> {
    // TODO: remove as any hack once we refactor
    const self = this as any
    const embedder = this._embeddings
    const term = self._query
    // Embed the search term when possible, otherwise use it as the vector as-is.
    const queryVector: number[] = embedder !== undefined
      ? (await embedder.embed([term]))[0]
      : (term as number[])

    const result = await this._client.search(
      this._name,
      queryVector,
      self._limit,
      self._nprobes,
      self._refineFactor,
      self._select,
      self._filter
    )

    return result.toArray().map((row: Record<string, unknown>) => {
      const plain: Record<string, unknown> = {}
      for (const column of Object.keys(row)) {
        const cell = row[column]
        // Arrow Vector cells are replaced by the result of their toArray().
        plain[column] = cell instanceof Vector ? cell.toArray() : cell
      }
      return plain as unknown as T
    })
  }
}
// We are using extend until the next version is released.
// Table and Connection have both been refactored to interfaces.
/**
 * A table that lives on a remote LanceDB server.
 *
 * Only `search` is implemented so far; every other operation throws
 * `Not implemented`.
 */
export class RemoteTable<T = number[]> implements Table<T> {
  private readonly _client: HttpLancedbClient
  private readonly _embeddings?: EmbeddingFunction<T>
  private readonly _name: string

  /**
   * @param client client used to reach the remote server
   * @param name name of the remote table
   * @param embeddings optional embedding function applied to search terms
   */
  constructor (client: HttpLancedbClient, name: string)
  constructor (client: HttpLancedbClient, name: string, embeddings: EmbeddingFunction<T>)
  constructor (client: HttpLancedbClient, name: string, embeddings?: EmbeddingFunction<T>) {
    this._client = client
    this._name = name
    this._embeddings = embeddings
  }

  /** The name of this table. */
  get name (): string {
    return this._name
  }

  /**
   * Create a search query against this table.
   *
   * @param query the search term
   */
  search (query: T): Query<T> {
    // Forward the table's embedding function; without it RemoteQuery would
    // send the raw search term to the server as if it were already a vector.
    return new RemoteQuery(query, this._client, this._name, this._embeddings)
  }

  /** Not implemented for remote tables yet. */
  async add (data: Array<Record<string, unknown>>): Promise<number> {
    throw new Error('Not implemented')
  }

  /** Not implemented for remote tables yet. */
  async overwrite (data: Array<Record<string, unknown>>): Promise<number> {
    throw new Error('Not implemented')
  }

  /** Not implemented for remote tables yet. */
  async createIndex (indexParams: VectorIndexParams): Promise<any> {
    throw new Error('Not implemented')
  }

  /** Not implemented for remote tables yet. */
  async countRows (): Promise<number> {
    throw new Error('Not implemented')
  }

  /** Not implemented for remote tables yet. */
  async delete (filter: string): Promise<void> {
    throw new Error('Not implemented')
  }
}

View File

@@ -18,7 +18,8 @@ import * as chai from 'chai'
import * as chaiAsPromised from 'chai-as-promised'
import * as lancedb from '../index'
import { type AwsCredentials, type EmbeddingFunction, MetricType, Query, WriteMode } from '../index'
import { type AwsCredentials, type EmbeddingFunction, MetricType, WriteMode } from '../index'
import { Query } from '../query'
const expect = chai.expect
const assert = chai.assert
@@ -268,7 +269,7 @@ describe('LanceDB client', function () {
describe('Query object', function () {
it('sets custom parameters', async function () {
const query = new Query(undefined, [0.1, 0.3])
const query = new Query([0.1, 0.3])
.limit(1)
.metricType(MetricType.Cosine)
.refineFactor(100)