From 0554db03b35d6953668ff7398527a8cc2a02ba95 Mon Sep 17 00:00:00 2001 From: Rob Meng Date: Sat, 9 Sep 2023 13:33:16 -0400 Subject: [PATCH] propagate uri query string to lance; add aws integration tests (#486) # WARNING: specifying engine is NOT a publicly supported feature in lancedb yet. THE API WILL CHANGE. This PR exposes dynamodb based commit to `vectordb` and JS SDK (will do python in another PR since it's on a different release track) This PR also added aws integration test using `localstack` ## What? This PR adds uri parameters to DB connection string. User may specify `engine` in the connection string to let LanceDB know that the user wants to use an external store when reading and writing a table. User may also pass any parameters required by the commitStore in the connection string; these parameters will be propagated to lance. e.g. ``` vectordb.connect("s3://my-db-bucket?engine=ddb&ddbTableName=my-commit-table") ``` will automatically convert table path to ``` s3+ddb://my-db-bucket/my_table.lance?ddbTableName=my-commit-table ``` --- .github/workflows/node.yml | 53 +++++++++++++++ Cargo.toml | 3 +- docker-compose.yml | 15 +++++ node/package-lock.json | 31 ++++++++- node/package.json | 5 +- node/src/integration_test/test.ts | 43 ++++++++++++ rust/vectordb/Cargo.toml | 1 + rust/vectordb/src/database.rs | 106 +++++++++++++++++++++++++++--- 8 files changed, 245 insertions(+), 12 deletions(-) create mode 100644 docker-compose.yml create mode 100644 node/src/integration_test/test.ts diff --git a/.github/workflows/node.yml b/.github/workflows/node.yml index 47f9fb32..ebe253c4 100644 --- a/.github/workflows/node.yml +++ b/.github/workflows/node.yml @@ -107,3 +107,56 @@ jobs: - name: Test run: | npm run test + aws-integtest: + timeout-minutes: 45 + runs-on: "ubuntu-22.04" + defaults: + run: + shell: bash + working-directory: node + env: + AWS_ACCESS_KEY_ID: ACCESSKEY + AWS_SECRET_ACCESS_KEY: SECRETKEY + AWS_DEFAULT_REGION: us-west-2 + # this one is for s3 + 
AWS_ENDPOINT: http://localhost:4566 + # this one is for dynamodb + DYNAMODB_ENDPOINT: http://localhost:4566 + steps: + - uses: actions/checkout@v3 + with: + fetch-depth: 0 + lfs: true + - uses: actions/setup-node@v3 + with: + node-version: 18 + cache: 'npm' + cache-dependency-path: node/package-lock.json + - name: start local stack + run: docker compose -f ../docker-compose.yml up -d + - name: create s3 + run: aws s3 mb s3://lancedb-integtest --endpoint $AWS_ENDPOINT + - name: create ddb + run: | + aws dynamodb create-table \ + --table-name lancedb-integtest \ + --attribute-definitions '[{"AttributeName": "base_uri", "AttributeType": "S"}, {"AttributeName": "version", "AttributeType": "N"}]' \ + --key-schema '[{"AttributeName": "base_uri", "KeyType": "HASH"}, {"AttributeName": "version", "KeyType": "RANGE"}]' \ + --provisioned-throughput '{"ReadCapacityUnits": 10, "WriteCapacityUnits": 10}' \ + --endpoint-url $DYNAMODB_ENDPOINT + - uses: Swatinem/rust-cache@v2 + - name: Install dependencies + run: | + sudo apt update + sudo apt install -y protobuf-compiler libssl-dev + - name: Build + run: | + npm ci + npm run tsc + npm run build + npm run pack-build + npm install --no-save ./dist/lancedb-vectordb-*.tgz + # Remove index.node to test with dependency installed + rm index.node + - name: Test + run: npm run integration-test diff --git a/Cargo.toml b/Cargo.toml index 7ba4d995..537fd6bd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,7 +5,7 @@ exclude = ["python"] resolver = "2" [workspace.dependencies] -lance = "=0.7.3" +lance = { "version" = "=0.7.3", "features" = ["dynamodb"] } # Note that this one does not include pyarrow arrow = { version = "43.0.0", optional = false } arrow-array = "43.0" @@ -21,3 +21,4 @@ half = { "version" = "=2.2.1", default-features = false, features = [ log = "0.4" object_store = "0.6.1" snafu = "0.7.4" +url = "2" diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 00000000..8a8a924d --- /dev/null +++ 
b/docker-compose.yml @@ -0,0 +1,15 @@ +version: "3.9" +services: + localstack: + image: localstack/localstack:0.14 + ports: + - 4566:4566 + environment: + - SERVICES=s3,dynamodb + - DEBUG=1 + - LS_LOG=trace + - DOCKER_HOST=unix:///var/run/docker.sock + - AWS_ACCESS_KEY_ID=ACCESSKEY + - AWS_SECRET_ACCESS_KEY=SECRETKEY + healthcheck: + test: [ "CMD", "curl", "-f", "http://localhost:4566/health" ] diff --git a/node/package-lock.json b/node/package-lock.json index e777f9f8..370715b7 100644 --- a/node/package-lock.json +++ b/node/package-lock.json @@ -31,6 +31,7 @@ "@types/node": "^18.16.2", "@types/sinon": "^10.0.15", "@types/temp": "^0.9.1", + "@types/uuid": "^9.0.3", "@typescript-eslint/eslint-plugin": "^5.59.1", "cargo-cp-artifact": "^0.1", "chai": "^4.3.7", @@ -48,7 +49,8 @@ "ts-node-dev": "^2.0.0", "typedoc": "^0.24.7", "typedoc-plugin-markdown": "^3.15.3", - "typescript": "*" + "typescript": "*", + "uuid": "^9.0.0" }, "optionalDependencies": { "@lancedb/vectordb-darwin-arm64": "0.2.4", @@ -596,6 +598,12 @@ "@types/node": "*" } }, + "node_modules/@types/uuid": { + "version": "9.0.3", + "resolved": "https://registry.npmjs.org/@types/uuid/-/uuid-9.0.3.tgz", + "integrity": "sha512-taHQQH/3ZyI3zP8M/puluDEIEvtQHVYcC6y3N8ijFtAd28+Ey/G4sg1u2gB01S8MwybLOKAp9/yCMu/uR5l3Ug==", + "dev": true + }, "node_modules/@typescript-eslint/eslint-plugin": { "version": "5.59.1", "resolved": "https://registry.npmjs.org/@typescript-eslint/eslint-plugin/-/eslint-plugin-5.59.1.tgz", @@ -4451,6 +4459,15 @@ "punycode": "^2.1.0" } }, + "node_modules/uuid": { + "version": "9.0.0", + "resolved": "https://registry.npmjs.org/uuid/-/uuid-9.0.0.tgz", + "integrity": "sha512-MXcSTerfPa4uqyzStbRoTgt5XIe3x5+42+q1sDuy3R5MDk66URdLMOZe5aPX/SQd+kuYAh0FdP/pO28IkQyTeg==", + "dev": true, + "bin": { + "uuid": "dist/bin/uuid" + } + }, "node_modules/v8-compile-cache-lib": { "version": "3.0.1", "resolved": "https://registry.npmjs.org/v8-compile-cache-lib/-/v8-compile-cache-lib-3.0.1.tgz", @@ -5093,6 +5110,12 @@ 
"@types/node": "*" } }, + "@types/uuid": { + "version": "9.0.3", + "resolved": "https://registry.npmjs.org/@types/uuid/-/uuid-9.0.3.tgz", + "integrity": "sha512-taHQQH/3ZyI3zP8M/puluDEIEvtQHVYcC6y3N8ijFtAd28+Ey/G4sg1u2gB01S8MwybLOKAp9/yCMu/uR5l3Ug==", + "dev": true + }, "@typescript-eslint/eslint-plugin": { "version": "5.59.1", "resolved": "https://registry.npmjs.org/@typescript-eslint/eslint-plugin/-/eslint-plugin-5.59.1.tgz", @@ -7844,6 +7867,12 @@ "punycode": "^2.1.0" } }, + "uuid": { + "version": "9.0.0", + "resolved": "https://registry.npmjs.org/uuid/-/uuid-9.0.0.tgz", + "integrity": "sha512-MXcSTerfPa4uqyzStbRoTgt5XIe3x5+42+q1sDuy3R5MDk66URdLMOZe5aPX/SQd+kuYAh0FdP/pO28IkQyTeg==", + "dev": true + }, "v8-compile-cache-lib": { "version": "3.0.1", "resolved": "https://registry.npmjs.org/v8-compile-cache-lib/-/v8-compile-cache-lib-3.0.1.tgz", diff --git a/node/package.json b/node/package.json index 2c6c0458..bdd82027 100644 --- a/node/package.json +++ b/node/package.json @@ -9,6 +9,7 @@ "build": "cargo-cp-artifact --artifact cdylib vectordb-node index.node -- cargo build --message-format=json", "build-release": "npm run build -- --release", "test": "npm run tsc && mocha -recursive dist/test", + "integration-test": "npm run tsc && mocha -recursive dist/integration_test", "lint": "eslint native.js src --ext .js,.ts", "clean": "rm -rf node_modules *.node dist/", "pack-build": "neon pack-build", @@ -34,6 +35,7 @@ "@types/node": "^18.16.2", "@types/sinon": "^10.0.15", "@types/temp": "^0.9.1", + "@types/uuid": "^9.0.3", "@typescript-eslint/eslint-plugin": "^5.59.1", "cargo-cp-artifact": "^0.1", "chai": "^4.3.7", @@ -51,7 +53,8 @@ "ts-node-dev": "^2.0.0", "typedoc": "^0.24.7", "typedoc-plugin-markdown": "^3.15.3", - "typescript": "*" + "typescript": "*", + "uuid": "^9.0.0" }, "dependencies": { "@apache-arrow/ts": "^12.0.0", diff --git a/node/src/integration_test/test.ts b/node/src/integration_test/test.ts new file mode 100644 index 00000000..c138221c --- /dev/null +++ 
b/node/src/integration_test/test.ts @@ -0,0 +1,43 @@ +// Copyright 2023 LanceDB Developers. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import { describe } from 'mocha' +import * as chai from 'chai' +import * as chaiAsPromised from 'chai-as-promised' +import { v4 as uuidv4 } from 'uuid' + +import * as lancedb from '../index' + +const assert = chai.assert +chai.use(chaiAsPromised) + +describe('LanceDB AWS Integration test', function () { + it('s3+ddb schema is processed correctly', async function () { + this.timeout(5000) + + // WARNING: specifying engine is NOT a publicly supported feature in lancedb yet + // THE API WILL CHANGE + const conn = await lancedb.connect('s3://lancedb-integtest?engine=ddb&ddbTableName=lancedb-integtest') + const data = [{ vector: Array(128).fill(1.0) }] + + const tableName = uuidv4() + let table = await conn.createTable(tableName, data, { writeMode: lancedb.WriteMode.Overwrite }) + + const futs = [table.add(data), table.add(data), table.add(data), table.add(data), table.add(data)] + await Promise.allSettled(futs) + + table = await conn.openTable(tableName) + assert.equal(await table.countRows(), 6) + }) +}) diff --git a/rust/vectordb/Cargo.toml b/rust/vectordb/Cargo.toml index 34247aa2..b075a3e7 100644 --- a/rust/vectordb/Cargo.toml +++ b/rust/vectordb/Cargo.toml @@ -23,6 +23,7 @@ lance = { workspace = true } tokio = { version = "1.23", features = ["rt-multi-thread"] } log = { workspace = true } num-traits = "0" +url = { 
workspace = true } [dev-dependencies] tempfile = "3.5.0" diff --git a/rust/vectordb/src/database.rs b/rust/vectordb/src/database.rs index 3292df04..4b3710b7 100644 --- a/rust/vectordb/src/database.rs +++ b/rust/vectordb/src/database.rs @@ -20,19 +20,32 @@ use lance::dataset::WriteParams; use lance::io::object_store::ObjectStore; use snafu::prelude::*; -use crate::error::{CreateDirSnafu, InvalidTableNameSnafu, Result}; +use crate::error::{CreateDirSnafu, Error, InvalidTableNameSnafu, Result}; use crate::table::{ReadParams, Table}; pub const LANCE_FILE_EXTENSION: &str = "lance"; pub struct Database { object_store: ObjectStore, + query_string: Option, pub(crate) uri: String, pub(crate) base_path: object_store::path::Path, } const LANCE_EXTENSION: &str = "lance"; +const ENGINE: &str = "engine"; + +/// Parse a url; if it's not a valid url, assume it's a local file +/// and try to parse it again with file:// prepended +fn parse_url(url: &str) -> Result { + match url::Url::parse(url) { + Ok(url) => Ok(url), + Err(_) => url::Url::parse(format!("file://{}", url).as_str()).map_err(|e| Error::Lance { + message: format!("Failed to parse uri: {}", e), + }), + } +} /// A connection to LanceDB impl Database { @@ -46,12 +59,71 @@ impl Database { /// /// * A [Database] object. 
pub async fn connect(uri: &str) -> Result { - let (object_store, base_path) = ObjectStore::from_uri(uri).await?; - if object_store.is_local() { - Self::try_create_dir(uri).context(CreateDirSnafu { path: uri })?; + // For a native (using lance directly) connection + // The DB doesn't use any uri parameters, but lance does + // So we need to parse the uri, extract the query string, and propagate it to lance + let mut url = parse_url(uri)?; + + // special handling for windows + if url.scheme().len() == 1 && cfg!(windows) { + let (object_store, base_path) = ObjectStore::from_uri(uri).await?; + if object_store.is_local() { + Self::try_create_dir(uri).context(CreateDirSnafu { path: uri })?; + } + return Ok(Database { + uri: uri.to_string(), + query_string: None, + base_path, + object_store, + }); } + + // iterate through the query params and extract the commit store param + let mut engine = None; + let mut filtered_querys = vec![]; + + // WARNING: specifying engine is NOT a publicly supported feature in lancedb yet + // THE API WILL CHANGE + for (key, value) in url.query_pairs() { + if key == ENGINE { + engine = Some(value.to_string()); + } else { + // to owned so we can modify the url + filtered_querys.push((key.to_string(), value.to_string())); + } + } + + // Filter out the commit store query param -- it's a lancedb param + url.query_pairs_mut().clear(); + url.query_pairs_mut().extend_pairs(filtered_querys); + // Take a copy of the query string so we can propagate it to lance + let query_string = url.query().map(|s| s.to_string()); + // clear the query string so we can use the url as the base uri + // use .set_query(None) instead of .set_query("") because the latter + // will add a trailing '?' to the url + url.set_query(None); + + let table_base_uri = if let Some(store) = engine { + static WARN_ONCE: std::sync::Once = std::sync::Once::new(); + WARN_ONCE.call_once(|| { + log::warn!("Specifing engine is not a publicly supported feature in lancedb yet. 
THE API WILL CHANGE"); + }); + let old_scheme = url.scheme().to_string(); + let new_scheme = format!("{}+{}", old_scheme, store); + url.to_string().replacen(&old_scheme, &new_scheme, 1) + } else { + url.to_string() + }; + + let plain_uri = url.to_string(); + let (object_store, base_path) = ObjectStore::from_uri(&plain_uri).await?; + if object_store.is_local() { + Self::try_create_dir(&plain_uri).context(CreateDirSnafu { path: plain_uri })?; + } + Ok(Database { - uri: uri.to_string(), + uri: table_base_uri, + query_string, base_path, object_store, }) @@ -149,11 +221,19 @@ impl Database { let path = Path::new(&self.uri); let table_uri = path.join(format!("{}.{}", name, LANCE_FILE_EXTENSION)); - let uri = table_uri + let mut uri = table_uri .as_path() .to_str() - .context(InvalidTableNameSnafu { name })?; - Ok(uri.to_string()) + .context(InvalidTableNameSnafu { name })? + .to_string(); + + // If a query string is set on the connection, propagate it to lance + if let Some(query) = self.query_string.as_ref() { + uri.push('?'); + uri.push_str(query.as_str()); + } + + Ok(uri) } } @@ -170,7 +250,15 @@ mod tests { let uri = tmp_dir.path().to_str().unwrap(); let db = Database::connect(uri).await.unwrap(); - assert_eq!(db.uri, uri); + // file:// scheme should be automatically prepended if not specified + // Windows paths come with a drive letter, so file:// won't be prepended + let expected = if cfg!(windows) { + uri.to_string() + } else { + format!("file://{}", uri) + }; + + assert_eq!(db.uri, expected); } #[tokio::test]