mirror of https://github.com/lancedb/lancedb.git · synced 2025-12-25 14:29:56 +00:00
Prior to this commit, issuing drop_all_tables on a listing database with an external manifest store would delete the physical tables but leave stale references behind in the manifest store. The drop itself would succeed, but subsequently creating a table with the same name would fail with a conflict. With this patch, the external manifest store is updated to account for the dropped tables, so dropped table names can be reused.
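In user terms, a drop-and-recreate cycle against a DynamoDB-backed URI now works end to end. A minimal sketch of that flow, assuming the published @lancedb/lancedb package and placeholder bucket/commit-table names:

import { connect } from "@lancedb/lancedb";

async function dropAndRecreate() {
  // "my-bucket" and "my-commit-table" are hypothetical, pre-existing resources.
  const uri = "s3+ddb://my-bucket/db?ddbTableName=my-commit-table";
  const db = await connect(uri);

  await db.createTable("foo", [{ a: 1 }]);
  // Deletes the physical table and, with this fix, its manifest-store entries.
  await db.dropAllTables();
  // Before the fix, this second create failed with a name conflict.
  await db.createTable("foo", [{ a: 1 }]);
}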
334 lines
8.9 KiB
TypeScript
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors

/* eslint-disable @typescript-eslint/naming-convention */

import {
  CreateTableCommand,
  DeleteTableCommand,
  DynamoDBClient,
} from "@aws-sdk/client-dynamodb";
import {
  CreateKeyCommand,
  KMSClient,
  ScheduleKeyDeletionCommand,
} from "@aws-sdk/client-kms";
import {
  CreateBucketCommand,
  DeleteBucketCommand,
  DeleteObjectCommand,
  HeadObjectCommand,
  ListObjectsV2Command,
  S3Client,
} from "@aws-sdk/client-s3";
import { connect } from "../lancedb";

// Skip these tests unless the S3_TEST environment variable is set
const maybeDescribe = process.env.S3_TEST ? describe : describe.skip;

// These are all keys that are accepted by storage_options
const CONFIG = {
  allowHttp: "true",
  awsAccessKeyId: "ACCESSKEY",
  awsSecretAccessKey: "SECRETKEY",
  awsEndpoint: "http://127.0.0.1:4566",
  dynamodbEndpoint: "http://127.0.0.1:4566",
  awsRegion: "us-east-1",
};

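// Test helper that creates and deletes an S3 bucket against the endpoint in
// CONFIG, and can assert that every object under a prefix is KMS-encrypted.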
class S3Bucket {
  name: string;
  constructor(name: string) {
    this.name = name;
  }

  static s3Client() {
    return new S3Client({
      region: CONFIG.awsRegion,
      credentials: {
        accessKeyId: CONFIG.awsAccessKeyId,
        secretAccessKey: CONFIG.awsSecretAccessKey,
      },
      endpoint: CONFIG.awsEndpoint,
    });
  }

  public static async create(name: string): Promise<S3Bucket> {
    const client = this.s3Client();
    // Delete the bucket if it already exists
    try {
      await this.deleteBucket(client, name);
    } catch {
      // It's fine if the bucket doesn't exist
    }
    await client.send(new CreateBucketCommand({ Bucket: name }));
    return new S3Bucket(name);
  }

  public async delete() {
    const client = S3Bucket.s3Client();
    await S3Bucket.deleteBucket(client, this.name);
  }

  static async deleteBucket(client: S3Client, name: string) {
    // Must delete all objects before we can delete the bucket
    const objects = await client.send(
      new ListObjectsV2Command({ Bucket: name }),
    );
    if (objects.Contents) {
      for (const object of objects.Contents) {
        await client.send(
          new DeleteObjectCommand({ Bucket: name, Key: object.Key }),
        );
      }
    }

    await client.send(new DeleteBucketCommand({ Bucket: name }));
  }

  public async assertAllEncrypted(path: string, keyId: string) {
    const client = S3Bucket.s3Client();
    const objects = await client.send(
      new ListObjectsV2Command({ Bucket: this.name, Prefix: path }),
    );
    if (objects.Contents) {
      for (const object of objects.Contents) {
        const metadata = await client.send(
          new HeadObjectCommand({ Bucket: this.name, Key: object.Key }),
        );
        expect(metadata.ServerSideEncryption).toBe("aws:kms");
        expect(metadata.SSEKMSKeyId).toContain(keyId);
      }
    }
  }
}

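// Test helper that creates a KMS key for the encryption tests and schedules
// it for deletion during cleanup.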
class KmsKey {
  keyId: string;
  constructor(keyId: string) {
    this.keyId = keyId;
  }

  static kmsClient() {
    return new KMSClient({
      region: CONFIG.awsRegion,
      credentials: {
        accessKeyId: CONFIG.awsAccessKeyId,
        secretAccessKey: CONFIG.awsSecretAccessKey,
      },
      endpoint: CONFIG.awsEndpoint,
    });
  }

  public static async create(): Promise<KmsKey> {
    const client = this.kmsClient();
    const key = await client.send(new CreateKeyCommand({}));
    const keyId = key?.KeyMetadata?.KeyId;
    if (!keyId) {
      throw new Error("Failed to create KMS key");
    }
    return new KmsKey(keyId);
  }

  public async delete() {
    const client = KmsKey.kmsClient();
    await client.send(new ScheduleKeyDeletionCommand({ KeyId: this.keyId }));
  }
}

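// Verifies that storageOptions passed to connect() and createTable() configure
// credentials, endpoints, and server-side encryption.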
maybeDescribe("storage_options", () => {
  let bucket: S3Bucket;
  let kmsKey: KmsKey;
  beforeAll(async () => {
    bucket = await S3Bucket.create("lancedb");
    kmsKey = await KmsKey.create();
  });
  afterAll(async () => {
    await kmsKey.delete();
    await bucket.delete();
  });

  it("can be used to configure auth and endpoints", async () => {
    const uri = `s3://${bucket.name}/test`;
    const db = await connect(uri, { storageOptions: CONFIG });

    let table = await db.createTable("test", [{ a: 1, b: 2 }]);

    let rowCount = await table.countRows();
    expect(rowCount).toBe(1);

    let tableNames = await db.tableNames();
    expect(tableNames).toEqual(["test"]);

    table = await db.openTable("test");
    rowCount = await table.countRows();
    expect(rowCount).toBe(1);

    await table.add([
      { a: 2, b: 3 },
      { a: 3, b: 4 },
    ]);
    rowCount = await table.countRows();
    expect(rowCount).toBe(3);

    await db.dropTable("test");

    tableNames = await db.tableNames();
    expect(tableNames).toEqual([]);

    await db.dropAllTables();
  });

  it("can configure encryption at connection and table level", async () => {
    const uri = `s3://${bucket.name}/test`;
    let db = await connect(uri, { storageOptions: CONFIG });

    let table = await db.createTable("table1", [{ a: 1, b: 2 }], {
      storageOptions: {
        awsServerSideEncryption: "aws:kms",
        awsSseKmsKeyId: kmsKey.keyId,
      },
    });

    let rowCount = await table.countRows();
    expect(rowCount).toBe(1);

    await table.add([{ a: 2, b: 3 }]);

    await bucket.assertAllEncrypted("test/table1.lance", kmsKey.keyId);

    // Now with encryption settings at connection level
    db = await connect(uri, {
      storageOptions: {
        ...CONFIG,
        awsServerSideEncryption: "aws:kms",
        awsSseKmsKeyId: kmsKey.keyId,
      },
    });
    table = await db.createTable("table2", [{ a: 1, b: 2 }]);
    rowCount = await table.countRows();
    expect(rowCount).toBe(1);

    await table.add([{ a: 2, b: 3 }]);

    await bucket.assertAllEncrypted("test/table2.lance", kmsKey.keyId);

    await db.dropAllTables();
  });
});

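// Test helper that creates the DynamoDB table used as the external commit
// store; entries are keyed by (base_uri, version).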
class DynamoDBCommitTable {
  name: string;
  constructor(name: string) {
    this.name = name;
  }

  static dynamoClient() {
    return new DynamoDBClient({
      region: CONFIG.awsRegion,
      credentials: {
        accessKeyId: CONFIG.awsAccessKeyId,
        secretAccessKey: CONFIG.awsSecretAccessKey,
      },
      endpoint: CONFIG.awsEndpoint,
    });
  }

  public static async create(name: string): Promise<DynamoDBCommitTable> {
    const client = DynamoDBCommitTable.dynamoClient();
    const command = new CreateTableCommand({
      TableName: name,
      AttributeDefinitions: [
        {
          AttributeName: "base_uri",
          AttributeType: "S",
        },
        {
          AttributeName: "version",
          AttributeType: "N",
        },
      ],
      KeySchema: [
        { AttributeName: "base_uri", KeyType: "HASH" },
        { AttributeName: "version", KeyType: "RANGE" },
      ],
      ProvisionedThroughput: {
        ReadCapacityUnits: 1,
        WriteCapacityUnits: 1,
      },
    });
    await client.send(command);
    return new DynamoDBCommitTable(name);
  }

  public async delete() {
    const client = DynamoDBCommitTable.dynamoClient();
    await client.send(new DeleteTableCommand({ TableName: this.name }));
  }
}

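// Exercises the s3+ddb:// scheme, where the DynamoDB table above serves as the
// external manifest store / commit lock for concurrent writers.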
maybeDescribe("DynamoDB Lock", () => {
  let bucket: S3Bucket;
  let commitTable: DynamoDBCommitTable;

  beforeAll(async () => {
    bucket = await S3Bucket.create("lancedb2");
    commitTable = await DynamoDBCommitTable.create("commitTable");
  });

  afterAll(async () => {
    await commitTable.delete();
    await bucket.delete();
  });

  it("can be used to configure a DynamoDB table for commit log", async () => {
    const uri = `s3+ddb://${bucket.name}/test?ddbTableName=${commitTable.name}`;
    const db = await connect(uri, {
      storageOptions: CONFIG,
      readConsistencyInterval: 0,
    });

    const table = await db.createTable("test", [{ a: 1, b: 2 }]);

    // 5 concurrent appends
    const futs = Array.from({ length: 5 }, async () => {
      // Open a table so each append has a separate table reference. Otherwise
      // they will share the same table reference and the internal ReadWriteLock
      // will prevent any real concurrency.
      const table = await db.openTable("test");
      await table.add([{ a: 2, b: 3 }]);
    });
    await Promise.all(futs);

    const rowCount = await table.countRows();
    expect(rowCount).toBe(6);

    await db.dropAllTables();
  });

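  // Regression test for the fix described above: dropAllTables() must also
  // remove the dropped tables' entries from the external manifest store so
  // that their names can be reused.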
  it("clears dynamodb state after dropping all tables", async () => {
    const uri = `s3+ddb://${bucket.name}/test?ddbTableName=${commitTable.name}`;
    const db = await connect(uri, {
      storageOptions: CONFIG,
      readConsistencyInterval: 0,
    });

    await db.createTable("foo", [{ a: 1, b: 2 }]);
    await db.createTable("bar", [{ a: 1, b: 2 }]);

    let tableNames = await db.tableNames();
    expect(tableNames).toEqual(["bar", "foo"]);

    await db.dropAllTables();
    tableNames = await db.tableNames();
    expect(tableNames).toEqual([]);

    // We can create a new table with the same name as the one we dropped.
    await db.createTable("foo", [{ a: 1, b: 2 }]);
    tableNames = await db.tableNames();
    expect(tableNames).toEqual(["foo"]);

    await db.dropAllTables();
  });
});