feat(node): support inserting and upserting subschemas (#2100)

Fixes #2095
Closes #1832
This commit is contained in:
Will Jones
2025-02-07 09:30:18 -08:00
committed by GitHub
parent e7574698eb
commit 2e3b34e79b
5 changed files with 436 additions and 119 deletions

View File

@@ -22,8 +22,6 @@ when creating a table or adding data to it)
This function converts an array of Record<String, any> (row-major JS objects)
to an Arrow Table (a columnar structure)
Note that it currently does not support nulls.
If a schema is provided then it will be used to determine the resulting array
types. Fields will also be reordered to fit the order defined by the schema.
@@ -31,6 +29,9 @@ If a schema is not provided then the types will be inferred and the field order
will be controlled by the order of properties in the first record. If a type
is inferred it will always be nullable.
If not all fields are found in the data, then a subset of the schema will be
returned.
If the input is empty then a schema must be provided to create an empty table.
When a schema is not specified then data types will be inferred. The inference
@@ -38,6 +39,7 @@ rules are as follows:
- boolean => Bool
- number => Float64
- bigint => Int64
- String => Utf8
- Buffer => Binary
- Record<String, any> => Struct

View File

@@ -55,6 +55,7 @@ describe.each([arrow15, arrow16, arrow17, arrow18])(
Float64,
Struct,
List,
Int16,
Int32,
Int64,
Float,
@@ -108,13 +109,16 @@ describe.each([arrow15, arrow16, arrow17, arrow18])(
false,
),
]);
const table = (await tableCreationMethod(
records,
recordsReversed,
schema,
// biome-ignore lint/suspicious/noExplicitAny: <explanation>
)) as any;
// We expect deterministic ordering of the fields
expect(table.schema.names).toEqual(schema.names);
schema.fields.forEach(
(
// biome-ignore lint/suspicious/noExplicitAny: <explanation>
@@ -141,13 +145,13 @@ describe.each([arrow15, arrow16, arrow17, arrow18])(
describe("The function makeArrowTable", function () {
it("will use data types from a provided schema instead of inference", async function () {
const schema = new Schema([
new Field("a", new Int32()),
new Field("b", new Float32()),
new Field("a", new Int32(), false),
new Field("b", new Float32(), true),
new Field(
"c",
new FixedSizeList(3, new Field("item", new Float16())),
),
new Field("d", new Int64()),
new Field("d", new Int64(), true),
]);
const table = makeArrowTable(
[
@@ -165,12 +169,15 @@ describe.each([arrow15, arrow16, arrow17, arrow18])(
expect(actual.numRows).toBe(3);
const actualSchema = actual.schema;
expect(actualSchema).toEqual(schema);
expect(table.getChild("a")?.toJSON()).toEqual([1, 4, 7]);
expect(table.getChild("b")?.toJSON()).toEqual([2, 5, 8]);
expect(table.getChild("d")?.toJSON()).toEqual([9n, 10n, null]);
});
it("will assume the column `vector` is FixedSizeList<Float32> by default", async function () {
const schema = new Schema([
new Field("a", new Float(Precision.DOUBLE), true),
new Field("b", new Float(Precision.DOUBLE), true),
new Field("b", new Int64(), true),
new Field(
"vector",
new FixedSizeList(
@@ -181,9 +188,9 @@ describe.each([arrow15, arrow16, arrow17, arrow18])(
),
]);
const table = makeArrowTable([
{ a: 1, b: 2, vector: [1, 2, 3] },
{ a: 4, b: 5, vector: [4, 5, 6] },
{ a: 7, b: 8, vector: [7, 8, 9] },
{ a: 1, b: 2n, vector: [1, 2, 3] },
{ a: 4, b: 5n, vector: [4, 5, 6] },
{ a: 7, b: 8n, vector: [7, 8, 9] },
]);
const buf = await fromTableToBuffer(table);
@@ -193,6 +200,19 @@ describe.each([arrow15, arrow16, arrow17, arrow18])(
expect(actual.numRows).toBe(3);
const actualSchema = actual.schema;
expect(actualSchema).toEqual(schema);
expect(table.getChild("a")?.toJSON()).toEqual([1, 4, 7]);
expect(table.getChild("b")?.toJSON()).toEqual([2n, 5n, 8n]);
expect(
table
.getChild("vector")
?.toJSON()
.map((v) => v.toJSON()),
).toEqual([
[1, 2, 3],
[4, 5, 6],
[7, 8, 9],
]);
});
it("can support multiple vector columns", async function () {
@@ -206,7 +226,7 @@ describe.each([arrow15, arrow16, arrow17, arrow18])(
),
new Field(
"vec2",
new FixedSizeList(3, new Field("item", new Float16(), true)),
new FixedSizeList(3, new Field("item", new Float64(), true)),
true,
),
]);
@@ -219,7 +239,7 @@ describe.each([arrow15, arrow16, arrow17, arrow18])(
{
vectorColumns: {
vec1: { type: new Float16() },
vec2: { type: new Float16() },
vec2: { type: new Float64() },
},
},
);
@@ -307,6 +327,53 @@ describe.each([arrow15, arrow16, arrow17, arrow18])(
false,
);
});
it("will allow subsets of columns if nullable", async function () {
const schema = new Schema([
new Field("a", new Int64(), true),
new Field(
"s",
new Struct([
new Field("x", new Int32(), true),
new Field("y", new Int32(), true),
]),
true,
),
new Field("d", new Int16(), true),
]);
const table = makeArrowTable([{ a: 1n }], { schema });
expect(table.numCols).toBe(1);
expect(table.numRows).toBe(1);
const table2 = makeArrowTable([{ a: 1n, d: 2 }], { schema });
expect(table2.numCols).toBe(2);
const table3 = makeArrowTable([{ s: { y: 3 } }], { schema });
expect(table3.numCols).toBe(1);
const expectedSchema = new Schema([
new Field("s", new Struct([new Field("y", new Int32(), true)]), true),
]);
expect(table3.schema).toEqual(expectedSchema);
});
it("will work even if columns are sparsely provided", async function () {
const sparseRecords = [{ a: 1n }, { b: 2n }, { c: 3n }, { d: 4n }];
const table = makeArrowTable(sparseRecords);
expect(table.numCols).toBe(4);
expect(table.numRows).toBe(4);
const schema = new Schema([
new Field("a", new Int64(), true),
new Field("b", new Int32(), true),
new Field("c", new Int64(), true),
new Field("d", new Int16(), true),
]);
const table2 = makeArrowTable(sparseRecords, { schema });
expect(table2.numCols).toBe(4);
expect(table2.numRows).toBe(4);
expect(table2.schema).toEqual(schema);
});
});
class DummyEmbedding extends EmbeddingFunction<string> {

View File

@@ -253,6 +253,31 @@ describe.each([arrow15, arrow16, arrow17, arrow18])(
const arrowTbl = await table.toArrow();
expect(arrowTbl).toBeInstanceOf(ArrowTable);
});
it("should be able to handle missing fields", async () => {
const schema = new arrow.Schema([
new arrow.Field("id", new arrow.Int32(), true),
new arrow.Field("y", new arrow.Int32(), true),
new arrow.Field("z", new arrow.Int64(), true),
]);
const db = await connect(tmpDir.name);
const table = await db.createEmptyTable("testNull", schema);
await table.add([{ id: 1, y: 2 }]);
await table.add([{ id: 2 }]);
await table
.mergeInsert("id")
.whenNotMatchedInsertAll()
.execute([
{ id: 3, z: 3 },
{ id: 4, z: 5 },
]);
const res = await table.query().toArrow();
expect(res.getChild("id")?.toJSON()).toEqual([1, 2, 3, 4]);
expect(res.getChild("y")?.toJSON()).toEqual([2, null, null, null]);
expect(res.getChild("z")?.toJSON()).toEqual([null, null, 3n, 5n]);
});
},
);

View File

@@ -42,4 +42,4 @@ test("full text search", async () => {
expect(result.length).toBe(10);
// --8<-- [end:full_text_search]
});
});
}, 10_000);

View File

@@ -2,31 +2,37 @@
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
import {
Data as ArrowData,
Table as ArrowTable,
Binary,
Bool,
BufferType,
DataType,
Dictionary,
Field,
FixedSizeBinary,
FixedSizeList,
Float,
Float32,
Float64,
Int,
Int32,
Int64,
LargeBinary,
List,
Null,
RecordBatch,
RecordBatchFileReader,
RecordBatchFileWriter,
RecordBatchReader,
RecordBatchStreamWriter,
Schema,
Struct,
Utf8,
Vector,
makeVector as arrowMakeVector,
makeBuilder,
makeData,
type makeTable,
makeTable,
vectorFromArray,
} from "apache-arrow";
import { Buffers } from "apache-arrow/data";
@@ -236,8 +242,6 @@ export class MakeArrowTableOptions {
* This function converts an array of Record<String, any> (row-major JS objects)
* to an Arrow Table (a columnar structure)
*
* Note that it currently does not support nulls.
*
* If a schema is provided then it will be used to determine the resulting array
* types. Fields will also be reordered to fit the order defined by the schema.
*
@@ -245,6 +249,9 @@ export class MakeArrowTableOptions {
* will be controlled by the order of properties in the first record. If a type
* is inferred it will always be nullable.
*
* If not all fields are found in the data, then a subset of the schema will be
* returned.
*
* If the input is empty then a schema must be provided to create an empty table.
*
* When a schema is not specified then data types will be inferred. The inference
@@ -252,6 +259,7 @@ export class MakeArrowTableOptions {
*
* - boolean => Bool
* - number => Float64
* - bigint => Int64
* - String => Utf8
* - Buffer => Binary
* - Record<String, any> => Struct
@@ -322,126 +330,316 @@ export function makeArrowTable(
options?: Partial<MakeArrowTableOptions>,
metadata?: Map<string, string>,
): ArrowTable {
const opt = new MakeArrowTableOptions(options !== undefined ? options : {});
let schema: Schema | undefined = undefined;
if (opt.schema !== undefined && opt.schema !== null) {
schema = sanitizeSchema(opt.schema);
schema = validateSchemaEmbeddings(
schema as Schema,
data,
options?.embeddingFunction,
);
}
let schemaMetadata = schema?.metadata || new Map<string, string>();
if (metadata !== undefined) {
schemaMetadata = new Map([...schemaMetadata, ...metadata]);
}
if (
data.length === 0 &&
(options?.schema === undefined || options?.schema === null)
) {
throw new Error("At least one record or a schema needs to be provided");
}
const opt = new MakeArrowTableOptions(options !== undefined ? options : {});
if (opt.schema !== undefined && opt.schema !== null) {
opt.schema = sanitizeSchema(opt.schema);
opt.schema = validateSchemaEmbeddings(
opt.schema as Schema,
data,
options?.embeddingFunction,
);
}
const columns: Record<string, Vector> = {};
// TODO: sample dataset to find missing columns
// Prefer the field ordering of the schema, if present
const columnNames =
opt.schema != null ? (opt.schema.names as string[]) : Object.keys(data[0]);
for (const colName of columnNames) {
if (
data.length !== 0 &&
!Object.prototype.hasOwnProperty.call(data[0], colName)
) {
// The field is present in the schema, but not in the data, skip it
continue;
}
// Extract a single column from the records (transpose from row-major to col-major)
let values = data.map((datum) => datum[colName]);
// By default (type === undefined) arrow will infer the type from the JS type
let type;
if (opt.schema !== undefined) {
// If there is a schema provided, then use that for the type instead
type = opt.schema?.fields.filter((f) => f.name === colName)[0]?.type;
if (DataType.isInt(type) && type.bitWidth === 64) {
// wrap in BigInt to avoid bug: https://github.com/apache/arrow/issues/40051
values = values.map((v) => {
if (v === null) {
return v;
}
if (typeof v === "bigint") {
return v;
}
if (typeof v === "number") {
return BigInt(v);
}
throw new Error(
`Expected BigInt or number for column ${colName}, got ${typeof v}`,
);
});
}
} else if (data.length === 0) {
if (schema === undefined) {
throw new Error("A schema must be provided if data is empty");
} else {
// Otherwise, check to see if this column is one of the vector columns
// defined by opt.vectorColumns and, if so, use the fixed size list type
const vectorColumnOptions = opt.vectorColumns[colName];
if (vectorColumnOptions !== undefined) {
const firstNonNullValue = values.find((v) => v !== null);
if (Array.isArray(firstNonNullValue)) {
type = newVectorType(
firstNonNullValue.length,
vectorColumnOptions.type,
);
schema = new Schema(schema.fields, schemaMetadata);
return new ArrowTable(schema);
}
}
let inferredSchema = inferSchema(data, schema, opt);
inferredSchema = new Schema(inferredSchema.fields, schemaMetadata);
const finalColumns: Record<string, Vector> = {};
for (const field of inferredSchema.fields) {
finalColumns[field.name] = transposeData(data, field);
}
return new ArrowTable(inferredSchema, finalColumns);
}
function inferSchema(
data: Array<Record<string, unknown>>,
schema: Schema | undefined,
opts: MakeArrowTableOptions,
): Schema {
// We will collect all fields we see in the data.
const pathTree = new PathTree<DataType>();
for (const [rowI, row] of data.entries()) {
for (const [path, value] of rowPathsAndValues(row)) {
if (!pathTree.has(path)) {
// First time seeing this field.
if (schema !== undefined) {
const field = getFieldForPath(schema, path);
if (field === undefined) {
throw new Error(
`Found field not in schema: ${path.join(".")} at row ${rowI}`,
);
} else {
pathTree.set(path, field.type);
}
} else {
throw new Error(
`Column ${colName} is expected to be a vector column but first non-null value is not an array. Could not determine size of vector column`,
);
const inferredType = inferType(value, path, opts);
if (inferredType === undefined) {
throw new Error(`Failed to infer data type for field ${path.join(".")} at row ${rowI}. \
Consider providing an explicit schema.`);
}
pathTree.set(path, inferredType);
}
} else if (schema === undefined) {
const currentType = pathTree.get(path);
const newType = inferType(value, path, opts);
if (currentType !== newType) {
new Error(`Failed to infer schema for data. Previously inferred type \
${currentType} but found ${newType} at row ${rowI}. Consider \
providing an explicit schema.`);
}
}
}
try {
// Convert an Array of JS values to an arrow vector
columns[colName] = makeVector(values, type, opt.dictionaryEncodeStrings);
} catch (error: unknown) {
// eslint-disable-next-line @typescript-eslint/restrict-template-expressions
throw Error(`Could not convert column "${colName}" to Arrow: ${error}`);
}
}
if (opt.schema != null) {
// `new ArrowTable(columns)` infers a schema which may sometimes have
// incorrect nullability (it assumes nullable=true always)
//
// `new ArrowTable(schema, columns)` will also fail because it will create a
// batch with an inferred schema and then complain that the batch schema
// does not match the provided schema.
//
// To work around this we first create a table with the wrong schema and
// then patch the schema of the batches so we can use
// `new ArrowTable(schema, batches)` which does not do any schema inference
const firstTable = new ArrowTable(columns);
const batchesFixed = firstTable.batches.map(
(batch) => new RecordBatch(opt.schema as Schema, batch.data),
);
let schema: Schema;
if (metadata !== undefined) {
let schemaMetadata = opt.schema.metadata;
if (schemaMetadata.size === 0) {
schemaMetadata = metadata;
} else {
for (const [key, entry] of schemaMetadata.entries()) {
schemaMetadata.set(key, entry);
if (schema === undefined) {
function fieldsFromPathTree(pathTree: PathTree<DataType>): Field[] {
const fields = [];
for (const [name, value] of pathTree.map.entries()) {
if (value instanceof PathTree) {
const children = fieldsFromPathTree(value);
fields.push(new Field(name, new Struct(children), true));
} else {
fields.push(new Field(name, value, true));
}
}
return fields;
}
const fields = fieldsFromPathTree(pathTree);
return new Schema(fields);
} else {
function takeMatchingFields(
fields: Field[],
pathTree: PathTree<DataType>,
): Field[] {
const outFields = [];
for (const field of fields) {
if (pathTree.map.has(field.name)) {
const value = pathTree.get([field.name]);
if (value instanceof PathTree) {
const struct = field.type as Struct;
const children = takeMatchingFields(struct.children, value);
outFields.push(
new Field(field.name, new Struct(children), field.nullable),
);
} else {
outFields.push(
new Field(field.name, value as DataType, field.nullable),
);
}
}
}
return outFields;
}
const fields = takeMatchingFields(schema.fields, pathTree);
return new Schema(fields);
}
}
schema = new Schema(opt.schema.fields as Field[], schemaMetadata);
function* rowPathsAndValues(
row: Record<string, unknown>,
basePath: string[] = [],
): Generator<[string[], unknown]> {
for (const [key, value] of Object.entries(row)) {
if (isObject(value)) {
yield* rowPathsAndValues(value, [...basePath, key]);
} else {
schema = opt.schema as Schema;
yield [[...basePath, key], value];
}
return new ArrowTable(schema, batchesFixed);
}
const tbl = new ArrowTable(columns);
if (metadata !== undefined) {
// biome-ignore lint/suspicious/noExplicitAny: <explanation>
(<any>tbl.schema).metadata = metadata;
}
/**
 * Narrow `value` to a plain nested record: a non-null object that is not an
 * array and not one of the special object kinds treated as leaf values
 * (RegExp, Date, Set, Map, Buffer).
 */
function isObject(value: unknown): value is Record<string, unknown> {
  if (typeof value !== "object" || value === null || Array.isArray(value)) {
    return false;
  }
  // These object kinds are leaves, never traversed as nested records.
  const leafKinds = [RegExp, Date, Set, Map, Buffer];
  return leafKinds.every((ctor) => !(value instanceof ctor));
}
/**
 * Resolve a dotted field path against a schema, descending through struct
 * fields. Returns the matching Field, or undefined when the path is empty,
 * a segment is missing, or an intermediate segment lands on a non-struct.
 */
function getFieldForPath(schema: Schema, path: string[]): Field | undefined {
  let node: Field | Schema = schema;
  for (const segment of path) {
    // Determine which child fields we can search at this level.
    let children: Field[];
    if (node instanceof Schema) {
      children = node.fields;
    } else if (node instanceof Field && DataType.isStruct(node.type)) {
      children = (node.type as Struct).children;
    } else {
      // Leaf (non-struct) field but path continues: no match.
      return undefined;
    }
    const next: Field | undefined = children.find((f) => f.name === segment);
    if (next === undefined) {
      return undefined;
    }
    node = next;
  }
  // An empty path leaves us on the Schema itself, which is not a field.
  return node instanceof Field ? node : undefined;
}
/**
 * Try to infer which Arrow type to use for a given value.
 *
 * May return undefined if the type cannot be inferred.
 */
function inferType(
  value: unknown,
  path: string[],
  opts: MakeArrowTableOptions,
): DataType | undefined {
  switch (typeof value) {
    case "bigint":
      return new Int64();
    case "number":
      // Even if it's an integer, it's safer to assume Float64. Users can
      // always provide an explicit schema or use BigInt if they mean integer.
      return new Float64();
    case "string":
      return opts.dictionaryEncodeStrings
        ? new Dictionary(new Utf8(), new Int32())
        : new Utf8();
    case "boolean":
      return new Bool();
  }
  if (value instanceof Buffer) {
    return new Binary();
  }
  if (!Array.isArray(value)) {
    // TODO: timestamp
    return undefined;
  }
  if (value.length === 0) {
    return undefined; // Without any values we can't infer the type
  }
  // Top-level columns registered in opts.vectorColumns become fixed-size
  // lists of the configured element type.
  if (path.length === 1 && Object.hasOwn(opts.vectorColumns, path[0])) {
    const itemType = sanitizeType(opts.vectorColumns[path[0]].type);
    return new FixedSizeList(value.length, new Field("item", itemType, true));
  }
  const elementType = inferType(value[0], path, opts);
  if (elementType === undefined) {
    return undefined;
  }
  // Try to automatically detect embedding columns: float arrays named
  // "vector" default to FixedSizeList<Float32>.
  if (elementType instanceof Float && path[path.length - 1] === "vector") {
    return new FixedSizeList(
      value.length,
      new Field("item", new Float32(), true),
    );
  }
  return new List(new Field("item", elementType, true));
}
/**
 * A trie mapping nested field paths (arrays of key segments) to values.
 * Interior nodes are themselves PathTrees; leaves hold a V.
 */
class PathTree<V> {
  map: Map<string, V | PathTree<V>>;

  constructor(entries?: [string[], V][]) {
    this.map = new Map();
    for (const [path, value] of entries ?? []) {
      this.set(path, value);
    }
  }

  /** True when `path` resolves to any node (a leaf value or a subtree). */
  has(path: string[]): boolean {
    let node: PathTree<V> = this;
    for (const segment of path) {
      if (!(node instanceof PathTree) || !node.map.has(segment)) {
        return false;
      }
      node = node.map.get(segment) as PathTree<V>;
    }
    return true;
  }

  /** The value (or subtree) stored at `path`, or undefined when absent. */
  get(path: string[]): V | undefined {
    let node: PathTree<V> = this;
    for (const segment of path) {
      if (!(node instanceof PathTree) || !node.map.has(segment)) {
        return undefined;
      }
      node = node.map.get(segment) as PathTree<V>;
    }
    return node as V;
  }

  /** Store `value` at `path`, creating intermediate subtrees as needed. */
  set(path: string[], value: V): void {
    let node: PathTree<V> = this;
    for (const segment of path.slice(0, -1)) {
      if (!node.map.has(segment)) {
        node.map.set(segment, new PathTree<V>());
      }
      node = node.map.get(segment) as PathTree<V>;
    }
    node.map.set(path[path.length - 1], value);
  }
}
/**
 * Extract a single (possibly struct-nested) column from row-major records and
 * build an Arrow Vector for it.
 *
 * Struct fields are built recursively from their children; leaf fields walk
 * each record down `path` + the field name, substituting null for any record
 * where a segment along the way is missing (this is what allows sparse /
 * subset records).
 *
 * @param data - row-major records
 * @param field - the field (top-level or struct child) to extract
 * @param path - key segments leading to this field's parent; empty at top level
 */
function transposeData(
  data: Record<string, unknown>[],
  field: Field,
  path: string[] = [],
): Vector {
  if (field.type instanceof Struct) {
    const childFields = field.type.children;
    const childVectors = childFields.map((child) => {
      // Children live under THIS field's name: extending the path with
      // `child.name` here would make the leaf below build a doubled path
      // (e.g. ["x","x"] instead of ["s","x"]) and read nothing but nulls.
      return transposeData(data, child, [...path, field.name]);
    });
    const structData = makeData({
      type: field.type,
      children: childVectors as unknown as ArrowData<DataType>[],
    });
    return arrowMakeVector(structData);
  } else {
    const valuesPath = [...path, field.name];
    const values = data.map((datum) => {
      let current: unknown = datum;
      for (const key of valuesPath) {
        if (isObject(current) && Object.hasOwn(current, key)) {
          current = current[key];
        } else {
          // Missing segment => this record contributes a null.
          return null;
        }
      }
      return current;
    });
    return makeVector(values, field.type);
  }
}
/**
@@ -491,6 +689,31 @@ function makeVector(
): Vector<any> {
if (type !== undefined) {
// No need for inference, let Arrow create it
if (type instanceof Int) {
if (DataType.isInt(type) && type.bitWidth === 64) {
// wrap in BigInt to avoid bug: https://github.com/apache/arrow/issues/40051
values = values.map((v) => {
if (v === null) {
return v;
} else if (typeof v === "bigint") {
return v;
} else if (typeof v === "number") {
return BigInt(v);
} else {
return v;
}
});
} else {
// Similarly, bigint isn't supported for 16 or 32-bit ints.
values = values.map((v) => {
if (typeof v == "bigint") {
return Number(v);
} else {
return v;
}
});
}
}
return vectorFromArray(values, type);
}
if (values.length === 0) {
@@ -902,7 +1125,7 @@ function validateSchemaEmbeddings(
schema: Schema,
data: Array<Record<string, unknown>>,
embeddings: EmbeddingFunctionConfig | undefined,
) {
): Schema {
const fields = [];
const missingEmbeddingFields = [];