From 2e3b34e79bd61ea886b16a4c69d39c99bb3161b6 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Fri, 7 Feb 2025 09:30:18 -0800 Subject: [PATCH] feat(node): support inserting and upserting subschemas (#2100) Fixes #2095 Closes #1832 --- docs/src/js/functions/makeArrowTable.md | 6 +- nodejs/__test__/arrow.test.ts | 87 ++++- nodejs/__test__/table.test.ts | 25 ++ nodejs/examples/full_text_search.test.ts | 2 +- nodejs/lancedb/arrow.ts | 435 +++++++++++++++++------ 5 files changed, 436 insertions(+), 119 deletions(-) diff --git a/docs/src/js/functions/makeArrowTable.md b/docs/src/js/functions/makeArrowTable.md index e8916612..83389550 100644 --- a/docs/src/js/functions/makeArrowTable.md +++ b/docs/src/js/functions/makeArrowTable.md @@ -22,8 +22,6 @@ when creating a table or adding data to it) This function converts an array of Record (row-major JS objects) to an Arrow Table (a columnar structure) -Note that it currently does not support nulls. - If a schema is provided then it will be used to determine the resulting array types. Fields will also be reordered to fit the order defined by the schema. @@ -31,6 +29,9 @@ If a schema is not provided then the types will be inferred and the field order will be controlled by the order of properties in the first record. If a type is inferred it will always be nullable. +If not all fields are found in the data, then a subset of the schema will be +returned. + If the input is empty then a schema must be provided to create an empty table. When a schema is not specified then data types will be inferred. The inference @@ -38,6 +39,7 @@ rules are as follows: - boolean => Bool - number => Float64 + - bigint => Int64 - String => Utf8 - Buffer => Binary - Record => Struct diff --git a/nodejs/__test__/arrow.test.ts b/nodejs/__test__/arrow.test.ts index 0f5b6e37..0cb23866 100644 --- a/nodejs/__test__/arrow.test.ts +++ b/nodejs/__test__/arrow.test.ts @@ -55,6 +55,7 @@ describe.each([arrow15, arrow16, arrow17, arrow18])( Float64, Struct, List, + Int16, Int32, Int64, Float, @@ -108,13 +109,16 @@ describe.each([arrow15, arrow16, arrow17, arrow18])( false, ), ]); - const table = (await tableCreationMethod( records, recordsReversed, schema, // biome-ignore lint/suspicious/noExplicitAny: )) as any; + + // We expect deterministic ordering of the fields + expect(table.schema.names).toEqual(schema.names); + schema.fields.forEach( ( // biome-ignore lint/suspicious/noExplicitAny: @@ -141,13 +145,13 @@ describe.each([arrow15, arrow16, arrow17, arrow18])( describe("The function makeArrowTable", function () { it("will use data types from a provided schema instead of inference", async function () { const schema = new Schema([ - new Field("a", new Int32()), - new Field("b", new Float32()), + new Field("a", new Int32(), false), + new Field("b", new Float32(), true), new Field( "c", new FixedSizeList(3, new Field("item", new Float16())), ), - new Field("d", new Int64()), + new Field("d", new Int64(), true), ]); const table = makeArrowTable( [ @@ -165,12 +169,15 @@ describe.each([arrow15, arrow16, arrow17, arrow18])( expect(actual.numRows).toBe(3); const actualSchema = actual.schema; expect(actualSchema).toEqual(schema); + expect(table.getChild("a")?.toJSON()).toEqual([1, 4, 7]); + expect(table.getChild("b")?.toJSON()).toEqual([2, 5, 8]); + expect(table.getChild("d")?.toJSON()).toEqual([9n, 10n, null]); }); it("will assume the column `vector` is FixedSizeList by default", async function () { const schema = new Schema([ new Field("a", new Float(Precision.DOUBLE), true), - new Field("b", new Float(Precision.DOUBLE), true), + new Field("b", new Int64(), true), new Field( "vector", new FixedSizeList( @@ -181,9 +188,9 @@ describe.each([arrow15, arrow16, arrow17, arrow18])( ), ]); const table = makeArrowTable([ - { a: 1, b: 2, vector: [1, 2, 3] }, - { a: 4, b: 5, vector: [4, 5, 6] }, - { a: 7, b: 8, vector: [7, 8, 9] }, + { a: 1, b: 2n, vector: [1, 2, 3] }, + { a: 4, b: 5n, vector: [4, 5, 6] }, + { a: 7, b: 8n, vector: [7, 8, 9] }, ]); const buf = await fromTableToBuffer(table); @@ -193,6 +200,19 @@ describe.each([arrow15, arrow16, arrow17, arrow18])( expect(actual.numRows).toBe(3); const actualSchema = actual.schema; expect(actualSchema).toEqual(schema); + + expect(table.getChild("a")?.toJSON()).toEqual([1, 4, 7]); + expect(table.getChild("b")?.toJSON()).toEqual([2n, 5n, 8n]); + expect( + table + .getChild("vector") + ?.toJSON() + .map((v) => v.toJSON()), + ).toEqual([ + [1, 2, 3], + [4, 5, 6], + [7, 8, 9], + ]); }); it("can support multiple vector columns", async function () { @@ -206,7 +226,7 @@ describe.each([arrow15, arrow16, arrow17, arrow18])( ), new Field( "vec2", - new FixedSizeList(3, new Field("item", new Float16(), true)), + new FixedSizeList(3, new Field("item", new Float64(), true)), true, ), ]); @@ -219,7 +239,7 @@ describe.each([arrow15, arrow16, arrow17, arrow18])( { vectorColumns: { vec1: { type: new Float16() }, - vec2: { type: new Float16() }, + vec2: { type: new Float64() }, }, }, ); @@ -307,6 +327,53 @@ describe.each([arrow15, arrow16, arrow17, arrow18])( false, ); }); + + it("will allow subsets of columns if nullable", async function () { + const schema = new Schema([ + new Field("a", new Int64(), true), + new Field( + "s", + new Struct([ + new Field("x", new Int32(), true), + new Field("y", new Int32(), true), + ]), + true, + ), + new Field("d", new Int16(), true), + ]); + + const table = makeArrowTable([{ a: 1n }], { schema }); + expect(table.numCols).toBe(1); + expect(table.numRows).toBe(1); + + const table2 = makeArrowTable([{ a: 1n, d: 2 }], { schema }); + expect(table2.numCols).toBe(2); + + const table3 = makeArrowTable([{ s: { y: 3 } }], { schema }); + expect(table3.numCols).toBe(1); + const expectedSchema = new Schema([ + new Field("s", new Struct([new Field("y", new Int32(), true)]), true), + ]); + expect(table3.schema).toEqual(expectedSchema); + }); + + it("will work even if columns are sparsely provided", async function () { + const sparseRecords = [{ a: 1n }, { b: 2n }, { c: 3n }, { d: 4n }]; + const table = makeArrowTable(sparseRecords); + expect(table.numCols).toBe(4); + expect(table.numRows).toBe(4); + + const schema = new Schema([ + new Field("a", new Int64(), true), + new Field("b", new Int32(), true), + new Field("c", new Int64(), true), + new Field("d", new Int16(), true), + ]); + const table2 = makeArrowTable(sparseRecords, { schema }); + expect(table2.numCols).toBe(4); + expect(table2.numRows).toBe(4); + expect(table2.schema).toEqual(schema); + }); }); class DummyEmbedding extends EmbeddingFunction { diff --git a/nodejs/__test__/table.test.ts b/nodejs/__test__/table.test.ts index c366dbc7..50740309 100644 --- a/nodejs/__test__/table.test.ts +++ b/nodejs/__test__/table.test.ts @@ -253,6 +253,31 @@ describe.each([arrow15, arrow16, arrow17, arrow18])( const arrowTbl = await table.toArrow(); expect(arrowTbl).toBeInstanceOf(ArrowTable); }); + + it("should be able to handle missing fields", async () => { + const schema = new arrow.Schema([ + new arrow.Field("id", new arrow.Int32(), true), + new arrow.Field("y", new arrow.Int32(), true), + new arrow.Field("z", new arrow.Int64(), true), + ]); + const db = await connect(tmpDir.name); + const table = await db.createEmptyTable("testNull", schema); + await table.add([{ id: 1, y: 2 }]); + await table.add([{ id: 2 }]); + + await table + .mergeInsert("id") + .whenNotMatchedInsertAll() + .execute([ + { id: 3, z: 3 }, + { id: 4, z: 5 }, + ]); + + const res = await table.query().toArrow(); + expect(res.getChild("id")?.toJSON()).toEqual([1, 2, 3, 4]); + expect(res.getChild("y")?.toJSON()).toEqual([2, null, null, null]); + expect(res.getChild("z")?.toJSON()).toEqual([null, null, 3n, 5n]); + }); }, ); diff --git a/nodejs/examples/full_text_search.test.ts b/nodejs/examples/full_text_search.test.ts index 44587f22..9777bda7 100644 --- a/nodejs/examples/full_text_search.test.ts +++ b/nodejs/examples/full_text_search.test.ts @@ -42,4 +42,4 @@ test("full text search", async () => { expect(result.length).toBe(10); // --8<-- [end:full_text_search] }); -}); +}, 10_000); diff --git a/nodejs/lancedb/arrow.ts b/nodejs/lancedb/arrow.ts index cedd08ce..8daca04c 100644 --- a/nodejs/lancedb/arrow.ts +++ b/nodejs/lancedb/arrow.ts @@ -2,31 +2,37 @@ // SPDX-FileCopyrightText: Copyright The LanceDB Authors import { + Data as ArrowData, Table as ArrowTable, Binary, + Bool, BufferType, DataType, + Dictionary, Field, FixedSizeBinary, FixedSizeList, Float, Float32, + Float64, Int, + Int32, + Int64, LargeBinary, List, Null, RecordBatch, RecordBatchFileReader, RecordBatchFileWriter, - RecordBatchReader, RecordBatchStreamWriter, Schema, Struct, Utf8, Vector, + makeVector as arrowMakeVector, makeBuilder, makeData, - type makeTable, + makeTable, vectorFromArray, } from "apache-arrow"; import { Buffers } from "apache-arrow/data"; @@ -236,8 +242,6 @@ export class MakeArrowTableOptions { * This function converts an array of Record (row-major JS objects) * to an Arrow Table (a columnar structure) * - * Note that it currently does not support nulls. - * * If a schema is provided then it will be used to determine the resulting array * types. Fields will also be reordered to fit the order defined by the schema. * @@ -245,6 +249,9 @@ export class MakeArrowTableOptions { * will be controlled by the order of properties in the first record. If a type * is inferred it will always be nullable. * + * If not all fields are found in the data, then a subset of the schema will be + * returned. + * * If the input is empty then a schema must be provided to create an empty table. * * When a schema is not specified then data types will be inferred. The inference @@ -252,6 +259,7 @@ export class MakeArrowTableOptions { * * - boolean => Bool * - number => Float64 + * - bigint => Int64 * - String => Utf8 * - Buffer => Binary * - Record => Struct @@ -322,126 +330,316 @@ export function makeArrowTable( options?: Partial, metadata?: Map, ): ArrowTable { + const opt = new MakeArrowTableOptions(options !== undefined ? options : {}); + let schema: Schema | undefined = undefined; + if (opt.schema !== undefined && opt.schema !== null) { + schema = sanitizeSchema(opt.schema); + schema = validateSchemaEmbeddings( + schema as Schema, + data, + options?.embeddingFunction, + ); + } + + let schemaMetadata = schema?.metadata || new Map(); + if (metadata !== undefined) { + schemaMetadata = new Map([...schemaMetadata, ...metadata]); + } + if ( data.length === 0 && (options?.schema === undefined || options?.schema === null) ) { throw new Error("At least one record or a schema needs to be provided"); - } - - const opt = new MakeArrowTableOptions(options !== undefined ? options : {}); - if (opt.schema !== undefined && opt.schema !== null) { - opt.schema = sanitizeSchema(opt.schema); - opt.schema = validateSchemaEmbeddings( - opt.schema as Schema, - data, - options?.embeddingFunction, - ); - } - const columns: Record = {}; - // TODO: sample dataset to find missing columns - // Prefer the field ordering of the schema, if present - const columnNames = - opt.schema != null ? (opt.schema.names as string[]) : Object.keys(data[0]); - for (const colName of columnNames) { - if ( - data.length !== 0 && - !Object.prototype.hasOwnProperty.call(data[0], colName) - ) { - // The field is present in the schema, but not in the data, skip it - continue; - } - // Extract a single column from the records (transpose from row-major to col-major) - let values = data.map((datum) => datum[colName]); - - // By default (type === undefined) arrow will infer the type from the JS type - let type; - if (opt.schema !== undefined) { - // If there is a schema provided, then use that for the type instead - type = opt.schema?.fields.filter((f) => f.name === colName)[0]?.type; - if (DataType.isInt(type) && type.bitWidth === 64) { - // wrap in BigInt to avoid bug: https://github.com/apache/arrow/issues/40051 - values = values.map((v) => { - if (v === null) { - return v; - } - if (typeof v === "bigint") { - return v; - } - if (typeof v === "number") { - return BigInt(v); - } - throw new Error( - `Expected BigInt or number for column ${colName}, got ${typeof v}`, - ); - }); - } + } else if (data.length === 0) { + if (schema === undefined) { + throw new Error("A schema must be provided if data is empty"); } else { - // Otherwise, check to see if this column is one of the vector columns - // defined by opt.vectorColumns and, if so, use the fixed size list type - const vectorColumnOptions = opt.vectorColumns[colName]; - if (vectorColumnOptions !== undefined) { - const firstNonNullValue = values.find((v) => v !== null); - if (Array.isArray(firstNonNullValue)) { - type = newVectorType( - firstNonNullValue.length, - vectorColumnOptions.type, - ); + schema = new Schema(schema.fields, schemaMetadata); + return new ArrowTable(schema); + } + } + + let inferredSchema = inferSchema(data, schema, opt); + inferredSchema = new Schema(inferredSchema.fields, schemaMetadata); + + const finalColumns: Record = {}; + for (const field of inferredSchema.fields) { + finalColumns[field.name] = transposeData(data, field); + } + + return new ArrowTable(inferredSchema, finalColumns); +} + +function inferSchema( + data: Array>, + schema: Schema | undefined, + opts: MakeArrowTableOptions, +): Schema { + // We will collect all fields we see in the data. + const pathTree = new PathTree(); + + for (const [rowI, row] of data.entries()) { + for (const [path, value] of rowPathsAndValues(row)) { + if (!pathTree.has(path)) { + // First time seeing this field. + if (schema !== undefined) { + const field = getFieldForPath(schema, path); + if (field === undefined) { + throw new Error( + `Found field not in schema: ${path.join(".")} at row ${rowI}`, + ); + } else { + pathTree.set(path, field.type); + } } else { - throw new Error( - `Column ${colName} is expected to be a vector column but first non-null value is not an array. Could not determine size of vector column`, - ); + const inferredType = inferType(value, path, opts); + if (inferredType === undefined) { + throw new Error(`Failed to infer data type for field ${path.join(".")} at row ${rowI}. \ + Consider providing an explicit schema.`); + } + pathTree.set(path, inferredType); + } + } else if (schema === undefined) { + const currentType = pathTree.get(path); + const newType = inferType(value, path, opts); + if (currentType !== newType) { + new Error(`Failed to infer schema for data. Previously inferred type \ + ${currentType} but found ${newType} at row ${rowI}. Consider \ + providing an explicit schema.`); } } } - - try { - // Convert an Array of JS values to an arrow vector - columns[colName] = makeVector(values, type, opt.dictionaryEncodeStrings); - } catch (error: unknown) { - // eslint-disable-next-line @typescript-eslint/restrict-template-expressions - throw Error(`Could not convert column "${colName}" to Arrow: ${error}`); - } } - if (opt.schema != null) { - // `new ArrowTable(columns)` infers a schema which may sometimes have - // incorrect nullability (it assumes nullable=true always) - // - // `new ArrowTable(schema, columns)` will also fail because it will create a - // batch with an inferred schema and then complain that the batch schema - // does not match the provided schema. - // - // To work around this we first create a table with the wrong schema and - // then patch the schema of the batches so we can use - // `new ArrowTable(schema, batches)` which does not do any schema inference - const firstTable = new ArrowTable(columns); - const batchesFixed = firstTable.batches.map( - (batch) => new RecordBatch(opt.schema as Schema, batch.data), - ); - let schema: Schema; - if (metadata !== undefined) { - let schemaMetadata = opt.schema.metadata; - if (schemaMetadata.size === 0) { - schemaMetadata = metadata; - } else { - for (const [key, entry] of schemaMetadata.entries()) { - schemaMetadata.set(key, entry); + if (schema === undefined) { + function fieldsFromPathTree(pathTree: PathTree): Field[] { + const fields = []; + for (const [name, value] of pathTree.map.entries()) { + if (value instanceof PathTree) { + const children = fieldsFromPathTree(value); + fields.push(new Field(name, new Struct(children), true)); + } else { + fields.push(new Field(name, value, true)); } } + return fields; + } + const fields = fieldsFromPathTree(pathTree); + return new Schema(fields); + } else { + function takeMatchingFields( + fields: Field[], + pathTree: PathTree, + ): Field[] { + const outFields = []; + for (const field of fields) { + if (pathTree.map.has(field.name)) { + const value = pathTree.get([field.name]); + if (value instanceof PathTree) { + const struct = field.type as Struct; + const children = takeMatchingFields(struct.children, value); + outFields.push( + new Field(field.name, new Struct(children), field.nullable), + ); + } else { + outFields.push( + new Field(field.name, value as DataType, field.nullable), + ); + } + } + } + return outFields; + } + const fields = takeMatchingFields(schema.fields, pathTree); + return new Schema(fields); + } +} - schema = new Schema(opt.schema.fields as Field[], schemaMetadata); +function* rowPathsAndValues( + row: Record, + basePath: string[] = [], +): Generator<[string[], unknown]> { + for (const [key, value] of Object.entries(row)) { + if (isObject(value)) { + yield* rowPathsAndValues(value, [...basePath, key]); } else { - schema = opt.schema as Schema; + yield [[...basePath, key], value]; } - return new ArrowTable(schema, batchesFixed); } - const tbl = new ArrowTable(columns); - if (metadata !== undefined) { - // biome-ignore lint/suspicious/noExplicitAny: - (tbl.schema).metadata = metadata; +} + +function isObject(value: unknown): value is Record { + return ( + typeof value === "object" && + value !== null && + !Array.isArray(value) && + !(value instanceof RegExp) && + !(value instanceof Date) && + !(value instanceof Set) && + !(value instanceof Map) && + !(value instanceof Buffer) + ); +} + +function getFieldForPath(schema: Schema, path: string[]): Field | undefined { + let current: Field | Schema = schema; + for (const key of path) { + if (current instanceof Schema) { + const field: Field | undefined = current.fields.find( + (f) => f.name === key, + ); + if (field === undefined) { + return undefined; + } + current = field; + } else if (current instanceof Field && DataType.isStruct(current.type)) { + const struct: Struct = current.type; + const field = struct.children.find((f) => f.name === key); + if (field === undefined) { + return undefined; + } + current = field; + } else { + return undefined; + } + } + if (current instanceof Field) { + return current; + } else { + return undefined; + } +} + +/** + * Try to infer which Arrow type to use for a given value. + * + * May return undefined if the type cannot be inferred. + */ +function inferType( + value: unknown, + path: string[], + opts: MakeArrowTableOptions, +): DataType | undefined { + if (typeof value === "bigint") { + return new Int64(); + } else if (typeof value === "number") { + // Even if it's an integer, it's safer to assume Float64. Users can + // always provide an explicit schema or use BigInt if they mean integer. + return new Float64(); + } else if (typeof value === "string") { + if (opts.dictionaryEncodeStrings) { + return new Dictionary(new Utf8(), new Int32()); + } else { + return new Utf8(); + } + } else if (typeof value === "boolean") { + return new Bool(); + } else if (value instanceof Buffer) { + return new Binary(); + } else if (Array.isArray(value)) { + if (value.length === 0) { + return undefined; // Without any values we can't infer the type + } + if (path.length === 1 && Object.hasOwn(opts.vectorColumns, path[0])) { + const floatType = sanitizeType(opts.vectorColumns[path[0]].type); + return new FixedSizeList( + value.length, + new Field("item", floatType, true), + ); + } + const valueType = inferType(value[0], path, opts); + if (valueType === undefined) { + return undefined; + } + // Try to automatically detect embedding columns. + if (valueType instanceof Float && path[path.length - 1] === "vector") { + // We default to Float32 for vectors. + const child = new Field("item", new Float32(), true); + return new FixedSizeList(value.length, child); + } else { + const child = new Field("item", valueType, true); + return new List(child); + } + } else { + // TODO: timestamp + return undefined; + } +} + +class PathTree { + map: Map>; + + constructor(entries?: [string[], V][]) { + this.map = new Map(); + if (entries !== undefined) { + for (const [path, value] of entries) { + this.set(path, value); + } + } + } + has(path: string[]): boolean { + let ref: PathTree = this; + for (const part of path) { + if (!(ref instanceof PathTree) || !ref.map.has(part)) { + return false; + } + ref = ref.map.get(part) as PathTree; + } + return true; + } + get(path: string[]): V | undefined { + let ref: PathTree = this; + for (const part of path) { + if (!(ref instanceof PathTree) || !ref.map.has(part)) { + return undefined; + } + ref = ref.map.get(part) as PathTree; + } + return ref as V; + } + set(path: string[], value: V): void { + let ref: PathTree = this; + for (const part of path.slice(0, path.length - 1)) { + if (!ref.map.has(part)) { + ref.map.set(part, new PathTree()); + } + ref = ref.map.get(part) as PathTree; + } + ref.map.set(path[path.length - 1], value); + } +} + +function transposeData( + data: Record[], + field: Field, + path: string[] = [], +): Vector { + if (field.type instanceof Struct) { + const childFields = field.type.children; + const childVectors = childFields.map((child) => { + return transposeData(data, child, [...path, child.name]); + }); + const structData = makeData({ + type: field.type, + children: childVectors as unknown as ArrowData[], + }); + return arrowMakeVector(structData); + } else { + const valuesPath = [...path, field.name]; + const values = data.map((datum) => { + let current: unknown = datum; + for (const key of valuesPath) { + if (isObject(current) && Object.hasOwn(current, key)) { + current = current[key]; + } else { + return null; + } + } + return current; + }); + return makeVector(values, field.type); } - return tbl; } /** @@ -491,6 +689,31 @@ function makeVector( ): Vector { if (type !== undefined) { // No need for inference, let Arrow create it + if (type instanceof Int) { + if (DataType.isInt(type) && type.bitWidth === 64) { + // wrap in BigInt to avoid bug: https://github.com/apache/arrow/issues/40051 + values = values.map((v) => { + if (v === null) { + return v; + } else if (typeof v === "bigint") { + return v; + } else if (typeof v === "number") { + return BigInt(v); + } else { + return v; + } + }); + } else { + // Similarly, bigint isn't supported for 16 or 32-bit ints. + values = values.map((v) => { + if (typeof v == "bigint") { + return Number(v); + } else { + return v; + } + }); + } + } return vectorFromArray(values, type); } if (values.length === 0) { @@ -902,7 +1125,7 @@ function validateSchemaEmbeddings( schema: Schema, data: Array>, embeddings: EmbeddingFunctionConfig | undefined, -) { +): Schema { const fields = []; const missingEmbeddingFields = [];