feat(node): support table.schema for LocalTable (#789)

Close #773 

we pass an empty table over IPC so we don't need to manually deal with
serde. Then we just return the schema attribute from the empty table.

---------

Co-authored-by: albertlockett <albert.lockett@gmail.com>
This commit is contained in:
Chang She
2024-01-08 21:12:48 -08:00
committed by Weston Pace
parent 073a2a1b28
commit a758876a65
7 changed files with 98 additions and 25 deletions

View File

@@ -14,7 +14,8 @@
import {
type Schema,
Table as ArrowTable
Table as ArrowTable,
tableFromIPC
} from 'apache-arrow'
import { createEmptyTable, fromRecordsToBuffer, fromTableToBuffer } from './arrow'
import type { EmbeddingFunction } from './embedding/embedding_function'
@@ -24,7 +25,7 @@ import { isEmbeddingFunction } from './embedding/embedding_function'
import { type Literal, toSQL } from './util'
// eslint-disable-next-line @typescript-eslint/no-var-requires
const { databaseNew, databaseTableNames, databaseOpenTable, databaseDropTable, tableCreate, tableAdd, tableCreateScalarIndex, tableCreateVectorIndex, tableCountRows, tableDelete, tableUpdate, tableCleanupOldVersions, tableCompactFiles, tableListIndices, tableIndexStats } = require('../native.js')
const { databaseNew, databaseTableNames, databaseOpenTable, databaseDropTable, tableCreate, tableAdd, tableCreateScalarIndex, tableCreateVectorIndex, tableCountRows, tableDelete, tableUpdate, tableCleanupOldVersions, tableCompactFiles, tableListIndices, tableIndexStats, tableSchema } = require('../native.js')
export { Query }
export type { EmbeddingFunction }
@@ -354,6 +355,8 @@ export interface Table<T = number[]> {
* Get statistics about an index.
*/
indexStats: (indexUuid: string) => Promise<IndexStats>
schema: Promise<Schema>
}
export interface UpdateArgs {
@@ -508,6 +511,7 @@ export class LocalConnection implements Connection {
export class LocalTable<T = number[]> implements Table<T> {
private _tbl: any
private readonly _name: string
private readonly _isElectron: boolean
private readonly _embeddings?: EmbeddingFunction<T>
private readonly _options: () => ConnectionOptions
@@ -524,6 +528,7 @@ export class LocalTable<T = number[]> implements Table<T> {
this._name = name
this._embeddings = embeddings
this._options = () => options
this._isElectron = this.checkElectron()
}
get name (): string {
@@ -682,6 +687,27 @@ export class LocalTable<T = number[]> implements Table<T> {
async indexStats (indexUuid: string): Promise<IndexStats> {
return tableIndexStats.call(this._tbl, indexUuid)
}
get schema (): Promise<Schema> {
// empty table
return this.getSchema()
}
private async getSchema (): Promise<Schema> {
const buffer = await tableSchema.call(this._tbl, this._isElectron)
const table = tableFromIPC(buffer)
return table.schema
}
// See https://github.com/electron/electron/issues/2288
private checkElectron (): boolean {
try {
// eslint-disable-next-line no-prototype-builtins
return (process?.versions?.hasOwnProperty('electron') || navigator?.userAgent?.toLowerCase()?.includes(' electron'))
} catch (e) {
return false
}
}
}
export interface CleanupStats {

View File

@@ -498,6 +498,27 @@ describe('LanceDB client', function () {
assert.equal(results.length, 2)
})
})
describe('when inspecting the schema', function () {
it('should return the schema', async function () {
const uri = await createTestDB()
const db = await lancedb.connect(uri)
// the fsl inner field must be named 'item' and be nullable
const expectedSchema = new Schema(
[
new Field('id', new Int32()),
new Field('vector', new FixedSizeList(128, new Field('item', new Float32(), true))),
new Field('s', new Utf8())
]
)
const table = await db.createTable({
name: 'some_table',
schema: expectedSchema
})
const schema = await table.schema
assert.deepEqual(expectedSchema, schema)
})
})
})
describe('Remote LanceDB client', function () {

View File

@@ -36,7 +36,7 @@ fn validate_vector_column(record_batch: &RecordBatch) -> Result<()> {
pub(crate) fn arrow_buffer_to_record_batch(slice: &[u8]) -> Result<(Vec<RecordBatch>, SchemaRef)> {
let mut batches: Vec<RecordBatch> = Vec::new();
let file_reader = FileReader::try_new(Cursor::new(slice), None)?;
let schema = file_reader.schema().clone();
let schema = file_reader.schema();
for b in file_reader {
let record_batch = b?;
validate_vector_column(&record_batch)?;

View File

@@ -13,6 +13,9 @@
// limitations under the License.
use neon::prelude::*;
use neon::types::buffer::TypedArray;
use crate::error::ResultExt;
pub(crate) fn vec_str_to_array<'a, C: Context<'a>>(
vec: &Vec<String>,
@@ -34,3 +37,20 @@ pub(crate) fn js_array_to_vec(array: &JsArray, cx: &mut FunctionContext) -> Vec<
}
query_vec
}
// Creates a new JsBuffer from a rust buffer with a special logic for electron
pub(crate) fn new_js_buffer<'a>(
buffer: Vec<u8>,
cx: &mut TaskContext<'a>,
is_electron: bool,
) -> NeonResult<Handle<'a, JsBuffer>> {
if is_electron {
// Electron does not support `external`: https://github.com/neon-bindings/neon/pull/937
let mut js_buffer = JsBuffer::new(cx, buffer.len()).or_throw(cx)?;
let buffer_data = js_buffer.as_mut_slice(cx);
buffer_data.copy_from_slice(buffer.as_slice());
Ok(js_buffer)
} else {
Ok(JsBuffer::external(cx, buffer))
}
}

View File

@@ -250,5 +250,6 @@ fn main(mut cx: ModuleContext) -> NeonResult<()> {
"tableCreateVectorIndex",
index::vector::table_create_vector_index,
)?;
cx.export_function("tableSchema", JsTable::js_schema)?;
Ok(())
}

View File

@@ -7,7 +7,6 @@ use lance_linalg::distance::MetricType;
use neon::context::FunctionContext;
use neon::handle::Handle;
use neon::prelude::*;
use neon::types::buffer::TypedArray;
use crate::arrow::record_batch_to_buffer;
use crate::error::ResultExt;
@@ -96,26 +95,9 @@ impl JsQuery {
deferred.settle_with(&channel, move |mut cx| {
let results = results.or_throw(&mut cx)?;
let buffer = record_batch_to_buffer(results).or_throw(&mut cx)?;
Self::new_js_buffer(buffer, &mut cx, is_electron)
convert::new_js_buffer(buffer, &mut cx, is_electron)
});
});
Ok(promise)
}
// Creates a new JsBuffer from a rust buffer with a special logic for electron
fn new_js_buffer<'a>(
buffer: Vec<u8>,
cx: &mut TaskContext<'a>,
is_electron: bool,
) -> NeonResult<Handle<'a, JsBuffer>> {
if is_electron {
// Electron does not support `external`: https://github.com/neon-bindings/neon/pull/937
let mut js_buffer = JsBuffer::new(cx, buffer.len()).or_throw(cx)?;
let buffer_data = js_buffer.as_mut_slice(cx);
buffer_data.copy_from_slice(buffer.as_slice());
Ok(js_buffer)
} else {
Ok(JsBuffer::external(cx, buffer))
}
}
}

View File

@@ -12,18 +12,18 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use arrow_array::RecordBatchIterator;
use arrow_array::{RecordBatch, RecordBatchIterator};
use lance::dataset::optimize::CompactionOptions;
use lance::dataset::{WriteMode, WriteParams};
use lance::io::object_store::ObjectStoreParams;
use crate::arrow::arrow_buffer_to_record_batch;
use crate::arrow::{arrow_buffer_to_record_batch, record_batch_to_buffer};
use neon::prelude::*;
use neon::types::buffer::TypedArray;
use vectordb::Table;
use crate::error::ResultExt;
use crate::{get_aws_creds, get_aws_region, runtime, JsDatabase};
use crate::{convert, get_aws_creds, get_aws_region, runtime, JsDatabase};
pub(crate) struct JsTable {
pub table: Table,
@@ -426,4 +426,27 @@ impl JsTable {
Ok(promise)
}
pub(crate) fn js_schema(mut cx: FunctionContext) -> JsResult<JsPromise> {
let js_table = cx.this().downcast_or_throw::<JsBox<JsTable>, _>(&mut cx)?;
let rt = runtime(&mut cx)?;
let (deferred, promise) = cx.promise();
let channel = cx.channel();
let table = js_table.table.clone();
let is_electron = cx
.argument::<JsBoolean>(0)
.or_throw(&mut cx)?
.value(&mut cx);
rt.spawn(async move {
deferred.settle_with(&channel, move |mut cx| {
let schema = table.schema();
let batches = vec![RecordBatch::new_empty(schema)];
let buffer = record_batch_to_buffer(batches).or_throw(&mut cx)?;
convert::new_js_buffer(buffer, &mut cx, is_electron)
})
});
Ok(promise)
}
}