implement remote api calls for table mutation (#567)

Add more APIs to remote table for Node SDK
* `add` rows
* `overwrite` table with rows
* `create` table

This has been tested against dev stack
This commit is contained in:
Rob Meng
2023-10-16 11:07:58 -04:00
committed by GitHub
parent 043e388254
commit 345c136cfb
3 changed files with 122 additions and 9 deletions

View File

@@ -20,7 +20,7 @@ import {
Utf8,
type Vector,
FixedSizeList,
vectorFromArray, type Schema, Table as ArrowTable
vectorFromArray, type Schema, Table as ArrowTable, RecordBatchStreamWriter
} from 'apache-arrow'
import { type EmbeddingFunction } from './index'
@@ -77,7 +77,9 @@ function newVectorBuilder (dim: number): FixedSizeListBuilder<Float32> {
// Creates the Arrow Type for a Vector column with dimension `dim`
function newVectorType (dim: number): FixedSizeList<Float32> {
const children = new Field<Float32>('item', new Float32())
// Somewhere we always default to have the elements nullable, so we need to set it to true
// otherwise we often get schema mismatches because the stored data always has schema with nullable elements
const children = new Field<Float32>('item', new Float32(), true)
return new FixedSizeList(dim, children)
}
@@ -88,6 +90,13 @@ export async function fromRecordsToBuffer<T> (data: Array<Record<string, unknown
return Buffer.from(await writer.toUint8Array())
}
// Converts an Array of records into Arrow IPC stream format
export async function fromRecordsToStreamBuffer<T> (data: Array<Record<string, unknown>>, embeddings?: EmbeddingFunction<T>): Promise<Buffer> {
const table = await convertToTable(data, embeddings)
const writer = RecordBatchStreamWriter.writeAll(table)
return Buffer.from(await writer.toUint8Array())
}
// Converts an Arrow Table into Arrow IPC format
export async function fromTableToBuffer<T> (table: ArrowTable, embeddings?: EmbeddingFunction<T>): Promise<Buffer> {
if (embeddings !== undefined) {
@@ -105,6 +114,23 @@ export async function fromTableToBuffer<T> (table: ArrowTable, embeddings?: Embe
return Buffer.from(await writer.toUint8Array())
}
// Converts an Arrow Table into Arrow IPC stream format
export async function fromTableToStreamBuffer<T> (table: ArrowTable, embeddings?: EmbeddingFunction<T>): Promise<Buffer> {
if (embeddings !== undefined) {
const source = table.getChild(embeddings.sourceColumn)
if (source === null) {
throw new Error(`The embedding source column ${embeddings.sourceColumn} was not found in the Arrow Table`)
}
const vectors = await embeddings.embed(source.toArray() as T[])
const column = vectorFromArray(vectors, newVectorType(vectors[0].length))
table = table.assign(new ArrowTable({ vector: column }))
}
const writer = RecordBatchStreamWriter.writeAll(table)
return Buffer.from(await writer.toUint8Array())
}
// Creates an empty Arrow Table
export function createEmptyTable (schema: Schema): ArrowTable {
return new ArrowTable(schema)