Compare commits


1 commit

Author: Lance Release
SHA1: 639c943dee
Message: Bump version: 0.21.4-beta.0 → 0.21.4-beta.1
Date: 2025-08-22 03:54:52 +00:00
54 changed files with 798 additions and 3368 deletions

View File

@@ -1,5 +1,5 @@
[tool.bumpversion]
current_version = "0.22.0-beta.1"
current_version = "0.21.4-beta.1"
parse = """(?x)
(?P<major>0|[1-9]\\d*)\\.
(?P<minor>0|[1-9]\\d*)\\.

Cargo.lock (generated): 1626 lines changed

File diff suppressed because it is too large.

View File

@@ -15,14 +15,14 @@ categories = ["database-implementations"]
rust-version = "1.78.0"
[workspace.dependencies]
lance = { "version" = "=0.35.0", default-features = false, "features" = ["dynamodb"] }
lance-io = { "version" = "=0.35.0", default-features = false }
lance-index = { "version" = "=0.35.0" }
lance-linalg = { "version" = "=0.35.0" }
lance-table = { "version" = "=0.35.0" }
lance-testing = { "version" = "=0.35.0" }
lance-datafusion = { "version" = "=0.35.0" }
lance-encoding = { "version" = "=0.35.0" }
lance = { "version" = "=0.33.0", default-features = false, "features" = ["dynamodb"], "tag" = "v0.33.0-beta.4", "git" = "https://github.com/lancedb/lance.git" }
lance-io = { "version" = "=0.33.0", default-features = false, "tag" = "v0.33.0-beta.4", "git" = "https://github.com/lancedb/lance.git" }
lance-index = { "version" = "=0.33.0", "tag" = "v0.33.0-beta.4", "git" = "https://github.com/lancedb/lance.git" }
lance-linalg = { "version" = "=0.33.0", "tag" = "v0.33.0-beta.4", "git" = "https://github.com/lancedb/lance.git" }
lance-table = { "version" = "=0.33.0", "tag" = "v0.33.0-beta.4", "git" = "https://github.com/lancedb/lance.git" }
lance-testing = { "version" = "=0.33.0", "tag" = "v0.33.0-beta.4", "git" = "https://github.com/lancedb/lance.git" }
lance-datafusion = { "version" = "=0.33.0", "tag" = "v0.33.0-beta.4", "git" = "https://github.com/lancedb/lance.git" }
lance-encoding = { "version" = "=0.33.0", "tag" = "v0.33.0-beta.4", "git" = "https://github.com/lancedb/lance.git" }
# Note that this one does not include pyarrow
arrow = { version = "55.1", optional = false }
arrow-array = "55.1"

View File

@@ -8,7 +8,7 @@
<parent>
<groupId>com.lancedb</groupId>
<artifactId>lancedb-parent</artifactId>
<version>0.22.0-beta.1</version>
<version>0.21.4-beta.1</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@@ -8,7 +8,7 @@
<parent>
<groupId>com.lancedb</groupId>
<artifactId>lancedb-parent</artifactId>
<version>0.22.0-beta.1</version>
<version>0.21.4-beta.1</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@@ -6,7 +6,7 @@
<groupId>com.lancedb</groupId>
<artifactId>lancedb-parent</artifactId>
<version>0.22.0-beta.1</version>
<version>0.21.4-beta.1</version>
<packaging>pom</packaging>
<name>${project.artifactId}</name>
<description>LanceDB Java SDK Parent POM</description>

View File

@@ -1,7 +1,7 @@
[package]
name = "lancedb-nodejs"
edition.workspace = true
version = "0.22.0-beta.1"
version = "0.21.4-beta.1"
license.workspace = true
description.workspace = true
repository.workspace = true

View File

@@ -159,33 +159,17 @@ export abstract class Connection {
*
* Tables will be returned in lexicographical order.
* @param {Partial<TableNamesOptions>} options - options to control the
* paging / start point (backwards compatibility)
*
*/
abstract tableNames(options?: Partial<TableNamesOptions>): Promise<string[]>;
/**
* List all the table names in this database.
*
* Tables will be returned in lexicographical order.
* @param {string[]} namespace - The namespace to list tables from (defaults to root namespace)
* @param {Partial<TableNamesOptions>} options - options to control the
* paging / start point
*
*/
abstract tableNames(
namespace?: string[],
options?: Partial<TableNamesOptions>,
): Promise<string[]>;
abstract tableNames(options?: Partial<TableNamesOptions>): Promise<string[]>;
/**
* Open a table in the database.
* @param {string} name - The name of the table
* @param {string[]} namespace - The namespace of the table (defaults to root namespace)
* @param {Partial<OpenTableOptions>} options - Additional options
*/
abstract openTable(
name: string,
namespace?: string[],
options?: Partial<OpenTableOptions>,
): Promise<Table>;
@@ -194,7 +178,6 @@ export abstract class Connection {
* @param {object} options - The options object.
* @param {string} options.name - The name of the table.
* @param {Data} options.data - Non-empty Array of Records to be inserted into the table
* @param {string[]} namespace - The namespace to create the table in (defaults to root namespace)
*
*/
abstract createTable(
@@ -202,72 +185,40 @@ export abstract class Connection {
name: string;
data: Data;
} & Partial<CreateTableOptions>,
namespace?: string[],
): Promise<Table>;
/**
* Creates a new Table and initialize it with new data.
* @param {string} name - The name of the table.
* @param {Record<string, unknown>[] | TableLike} data - Non-empty Array of Records
* to be inserted into the table
* @param {Partial<CreateTableOptions>} options - Additional options (backwards compatibility)
*/
abstract createTable(
name: string,
data: Record<string, unknown>[] | TableLike,
options?: Partial<CreateTableOptions>,
): Promise<Table>;
/**
* Creates a new Table and initialize it with new data.
* @param {string} name - The name of the table.
* @param {Record<string, unknown>[] | TableLike} data - Non-empty Array of Records
* to be inserted into the table
* @param {string[]} namespace - The namespace to create the table in (defaults to root namespace)
* @param {Partial<CreateTableOptions>} options - Additional options
*/
abstract createTable(
name: string,
data: Record<string, unknown>[] | TableLike,
namespace?: string[],
options?: Partial<CreateTableOptions>,
): Promise<Table>;
/**
* Creates a new empty Table
* @param {string} name - The name of the table.
* @param {Schema} schema - The schema of the table
* @param {Partial<CreateTableOptions>} options - Additional options (backwards compatibility)
*/
abstract createEmptyTable(
name: string,
schema: import("./arrow").SchemaLike,
options?: Partial<CreateTableOptions>,
): Promise<Table>;
/**
* Creates a new empty Table
* @param {string} name - The name of the table.
* @param {Schema} schema - The schema of the table
* @param {string[]} namespace - The namespace to create the table in (defaults to root namespace)
* @param {Partial<CreateTableOptions>} options - Additional options
*/
abstract createEmptyTable(
name: string,
schema: import("./arrow").SchemaLike,
namespace?: string[],
options?: Partial<CreateTableOptions>,
): Promise<Table>;
/**
* Drop an existing table.
* @param {string} name The name of the table to drop.
* @param {string[]} namespace The namespace of the table (defaults to root namespace).
*/
abstract dropTable(name: string, namespace?: string[]): Promise<void>;
abstract dropTable(name: string): Promise<void>;
/**
* Drop all tables in the database.
* @param {string[]} namespace The namespace to drop tables from (defaults to root namespace).
*/
abstract dropAllTables(namespace?: string[]): Promise<void>;
abstract dropAllTables(): Promise<void>;
}
/** @hideconstructor */
@@ -292,39 +243,16 @@ export class LocalConnection extends Connection {
return this.inner.display();
}
async tableNames(
namespaceOrOptions?: string[] | Partial<TableNamesOptions>,
options?: Partial<TableNamesOptions>,
): Promise<string[]> {
// Detect if first argument is namespace array or options object
let namespace: string[] | undefined;
let tableNamesOptions: Partial<TableNamesOptions> | undefined;
if (Array.isArray(namespaceOrOptions)) {
// First argument is namespace array
namespace = namespaceOrOptions;
tableNamesOptions = options;
} else {
// First argument is options object (backwards compatibility)
namespace = undefined;
tableNamesOptions = namespaceOrOptions;
}
return this.inner.tableNames(
namespace ?? [],
tableNamesOptions?.startAfter,
tableNamesOptions?.limit,
);
async tableNames(options?: Partial<TableNamesOptions>): Promise<string[]> {
return this.inner.tableNames(options?.startAfter, options?.limit);
}
async openTable(
name: string,
namespace?: string[],
options?: Partial<OpenTableOptions>,
): Promise<Table> {
const innerTable = await this.inner.openTable(
name,
namespace ?? [],
cleanseStorageOptions(options?.storageOptions),
options?.indexCacheSize,
);
@@ -358,44 +286,14 @@ export class LocalConnection extends Connection {
nameOrOptions:
| string
| ({ name: string; data: Data } & Partial<CreateTableOptions>),
dataOrNamespace?: Record<string, unknown>[] | TableLike | string[],
namespaceOrOptions?: string[] | Partial<CreateTableOptions>,
data?: Record<string, unknown>[] | TableLike,
options?: Partial<CreateTableOptions>,
): Promise<Table> {
if (typeof nameOrOptions !== "string" && "name" in nameOrOptions) {
// First overload: createTable(options, namespace?)
const { name, data, ...createOptions } = nameOrOptions;
const namespace = dataOrNamespace as string[] | undefined;
return this._createTableImpl(name, data, namespace, createOptions);
const { name, data, ...options } = nameOrOptions;
return this.createTable(name, data, options);
}
// Second overload: createTable(name, data, namespace?, options?)
const name = nameOrOptions;
const data = dataOrNamespace as Record<string, unknown>[] | TableLike;
// Detect if third argument is namespace array or options object
let namespace: string[] | undefined;
let createOptions: Partial<CreateTableOptions> | undefined;
if (Array.isArray(namespaceOrOptions)) {
// Third argument is namespace array
namespace = namespaceOrOptions;
createOptions = options;
} else {
// Third argument is options object (backwards compatibility)
namespace = undefined;
createOptions = namespaceOrOptions;
}
return this._createTableImpl(name, data, namespace, createOptions);
}
private async _createTableImpl(
name: string,
data: Data,
namespace?: string[],
options?: Partial<CreateTableOptions>,
): Promise<Table> {
if (data === undefined) {
throw new Error("data is required");
}
@@ -404,10 +302,9 @@ export class LocalConnection extends Connection {
const storageOptions = this.getStorageOptions(options);
const innerTable = await this.inner.createTable(
name,
nameOrOptions,
buf,
mode,
namespace ?? [],
storageOptions,
);
@@ -417,55 +314,39 @@ export class LocalConnection extends Connection {
async createEmptyTable(
name: string,
schema: import("./arrow").SchemaLike,
namespaceOrOptions?: string[] | Partial<CreateTableOptions>,
options?: Partial<CreateTableOptions>,
): Promise<Table> {
// Detect if third argument is namespace array or options object
let namespace: string[] | undefined;
let createOptions: Partial<CreateTableOptions> | undefined;
if (Array.isArray(namespaceOrOptions)) {
// Third argument is namespace array
namespace = namespaceOrOptions;
createOptions = options;
} else {
// Third argument is options object (backwards compatibility)
namespace = undefined;
createOptions = namespaceOrOptions;
}
let mode: string = createOptions?.mode ?? "create";
const existOk = createOptions?.existOk ?? false;
let mode: string = options?.mode ?? "create";
const existOk = options?.existOk ?? false;
if (mode === "create" && existOk) {
mode = "exist_ok";
}
let metadata: Map<string, string> | undefined = undefined;
if (createOptions?.embeddingFunction !== undefined) {
const embeddingFunction = createOptions.embeddingFunction;
if (options?.embeddingFunction !== undefined) {
const embeddingFunction = options.embeddingFunction;
const registry = getRegistry();
metadata = registry.getTableMetadata([embeddingFunction]);
}
const storageOptions = this.getStorageOptions(createOptions);
const storageOptions = this.getStorageOptions(options);
const table = makeEmptyTable(schema, metadata);
const buf = await fromTableToBuffer(table);
const innerTable = await this.inner.createEmptyTable(
name,
buf,
mode,
namespace ?? [],
storageOptions,
);
return new LocalTable(innerTable);
}
async dropTable(name: string, namespace?: string[]): Promise<void> {
return this.inner.dropTable(name, namespace ?? []);
async dropTable(name: string): Promise<void> {
return this.inner.dropTable(name);
}
async dropAllTables(namespace?: string[]): Promise<void> {
return this.inner.dropAllTables(namespace ?? []);
async dropAllTables(): Promise<void> {
return this.inner.dropAllTables();
}
}

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-darwin-arm64",
"version": "0.22.0-beta.1",
"version": "0.21.4-beta.1",
"os": ["darwin"],
"cpu": ["arm64"],
"main": "lancedb.darwin-arm64.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-darwin-x64",
"version": "0.22.0-beta.1",
"version": "0.21.4-beta.1",
"os": ["darwin"],
"cpu": ["x64"],
"main": "lancedb.darwin-x64.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-arm64-gnu",
"version": "0.22.0-beta.1",
"version": "0.21.4-beta.1",
"os": ["linux"],
"cpu": ["arm64"],
"main": "lancedb.linux-arm64-gnu.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-arm64-musl",
"version": "0.22.0-beta.1",
"version": "0.21.4-beta.1",
"os": ["linux"],
"cpu": ["arm64"],
"main": "lancedb.linux-arm64-musl.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-x64-gnu",
"version": "0.22.0-beta.1",
"version": "0.21.4-beta.1",
"os": ["linux"],
"cpu": ["x64"],
"main": "lancedb.linux-x64-gnu.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-x64-musl",
"version": "0.22.0-beta.1",
"version": "0.21.4-beta.1",
"os": ["linux"],
"cpu": ["x64"],
"main": "lancedb.linux-x64-musl.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-win32-arm64-msvc",
"version": "0.22.0-beta.1",
"version": "0.21.4-beta.1",
"os": [
"win32"
],

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-win32-x64-msvc",
"version": "0.22.0-beta.1",
"version": "0.21.4-beta.1",
"os": ["win32"],
"cpu": ["x64"],
"main": "lancedb.win32-x64-msvc.node",

View File

@@ -1,12 +1,12 @@
{
"name": "@lancedb/lancedb",
"version": "0.22.0-beta.1",
"version": "0.21.4-beta.0",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "@lancedb/lancedb",
"version": "0.22.0-beta.1",
"version": "0.21.4-beta.0",
"cpu": [
"x64",
"arm64"

View File

@@ -11,7 +11,7 @@
"ann"
],
"private": false,
"version": "0.22.0-beta.1",
"version": "0.21.4-beta.1",
"main": "dist/index.js",
"exports": {
".": "./dist/index.js",

View File

@@ -100,12 +100,10 @@ impl Connection {
#[napi(catch_unwind)]
pub async fn table_names(
&self,
namespace: Vec<String>,
start_after: Option<String>,
limit: Option<u32>,
) -> napi::Result<Vec<String>> {
let mut op = self.get_inner()?.table_names();
op = op.namespace(namespace);
if let Some(start_after) = start_after {
op = op.start_after(start_after);
}
@@ -127,7 +125,6 @@ impl Connection {
name: String,
buf: Buffer,
mode: String,
namespace: Vec<String>,
storage_options: Option<HashMap<String, String>>,
) -> napi::Result<Table> {
let batches = ipc_file_to_batches(buf.to_vec())
@@ -135,8 +132,6 @@ impl Connection {
let mode = Self::parse_create_mode_str(&mode)?;
let mut builder = self.get_inner()?.create_table(&name, batches).mode(mode);
builder = builder.namespace(namespace);
if let Some(storage_options) = storage_options {
for (key, value) in storage_options {
builder = builder.storage_option(key, value);
@@ -152,7 +147,6 @@ impl Connection {
name: String,
schema_buf: Buffer,
mode: String,
namespace: Vec<String>,
storage_options: Option<HashMap<String, String>>,
) -> napi::Result<Table> {
let schema = ipc_file_to_schema(schema_buf.to_vec()).map_err(|e| {
@@ -163,9 +157,6 @@ impl Connection {
.get_inner()?
.create_empty_table(&name, schema)
.mode(mode);
builder = builder.namespace(namespace);
if let Some(storage_options) = storage_options {
for (key, value) in storage_options {
builder = builder.storage_option(key, value);
@@ -179,14 +170,10 @@ impl Connection {
pub async fn open_table(
&self,
name: String,
namespace: Vec<String>,
storage_options: Option<HashMap<String, String>>,
index_cache_size: Option<u32>,
) -> napi::Result<Table> {
let mut builder = self.get_inner()?.open_table(&name);
builder = builder.namespace(namespace);
if let Some(storage_options) = storage_options {
for (key, value) in storage_options {
builder = builder.storage_option(key, value);
@@ -201,18 +188,12 @@ impl Connection {
/// Drop table with the name. Or raise an error if the table does not exist.
#[napi(catch_unwind)]
pub async fn drop_table(&self, name: String, namespace: Vec<String>) -> napi::Result<()> {
self.get_inner()?
.drop_table(&name, &namespace)
.await
.default_error()
pub async fn drop_table(&self, name: String) -> napi::Result<()> {
self.get_inner()?.drop_table(&name).await.default_error()
}
#[napi(catch_unwind)]
pub async fn drop_all_tables(&self, namespace: Vec<String>) -> napi::Result<()> {
self.get_inner()?
.drop_all_tables(&namespace)
.await
.default_error()
pub async fn drop_all_tables(&self) -> napi::Result<()> {
self.get_inner()?.drop_all_tables().await.default_error()
}
}

View File

@@ -480,7 +480,6 @@ impl JsFullTextQuery {
}
#[napi(factory)]
#[allow(clippy::use_self)] // NAPI doesn't allow Self here but clippy reports it
pub fn boolean_query(queries: Vec<(String, &JsFullTextQuery)>) -> napi::Result<Self> {
let mut sub_queries = Vec::with_capacity(queries.len());
for (occur, q) in queries {

View File

@@ -76,7 +76,6 @@ pub struct ClientConfig {
pub retry_config: Option<RetryConfig>,
pub timeout_config: Option<TimeoutConfig>,
pub extra_headers: Option<HashMap<String, String>>,
pub id_delimiter: Option<String>,
}
impl From<TimeoutConfig> for lancedb::remote::TimeoutConfig {
@@ -116,7 +115,6 @@ impl From<ClientConfig> for lancedb::remote::ClientConfig {
retry_config: config.retry_config.map(Into::into).unwrap_or_default(),
timeout_config: config.timeout_config.map(Into::into).unwrap_or_default(),
extra_headers: config.extra_headers.unwrap_or_default(),
id_delimiter: config.id_delimiter,
}
}
}

View File

@@ -94,7 +94,7 @@ impl napi::bindgen_prelude::FromNapiValue for Session {
env: napi::sys::napi_env,
napi_val: napi::sys::napi_value,
) -> napi::Result<Self> {
let object: napi::bindgen_prelude::ClassInstance<Self> =
let object: napi::bindgen_prelude::ClassInstance<Session> =
napi::bindgen_prelude::ClassInstance::from_napi_value(env, napi_val)?;
let copy = object.clone();
Ok(copy)

View File

@@ -1,5 +1,5 @@
[tool.bumpversion]
current_version = "0.25.0"
current_version = "0.24.4-beta.1"
parse = """(?x)
(?P<major>0|[1-9]\\d*)\\.
(?P<minor>0|[1-9]\\d*)\\.

View File

@@ -1,6 +1,6 @@
[package]
name = "lancedb-python"
version = "0.25.0"
version = "0.24.4-beta.1"
edition.workspace = true
description = "Python bindings for LanceDB"
license.workspace = true

View File

@@ -21,28 +21,14 @@ class Session:
class Connection(object):
uri: str
async def is_open(self): ...
async def close(self): ...
async def list_namespaces(
self,
namespace: List[str],
page_token: Optional[str],
limit: Optional[int],
) -> List[str]: ...
async def create_namespace(self, namespace: List[str]) -> None: ...
async def drop_namespace(self, namespace: List[str]) -> None: ...
async def table_names(
self,
namespace: List[str],
start_after: Optional[str],
limit: Optional[int],
self, start_after: Optional[str], limit: Optional[int]
) -> list[str]: ...
async def create_table(
self,
name: str,
mode: str,
data: pa.RecordBatchReader,
namespace: List[str] = [],
storage_options: Optional[Dict[str, str]] = None,
) -> Table: ...
async def create_empty_table(
@@ -50,25 +36,10 @@ class Connection(object):
name: str,
mode: str,
schema: pa.Schema,
namespace: List[str] = [],
storage_options: Optional[Dict[str, str]] = None,
) -> Table: ...
async def open_table(
self,
name: str,
namespace: List[str] = [],
storage_options: Optional[Dict[str, str]] = None,
index_cache_size: Optional[int] = None,
) -> Table: ...
async def rename_table(
self,
cur_name: str,
new_name: str,
cur_namespace: List[str] = [],
new_namespace: List[str] = [],
) -> None: ...
async def drop_table(self, name: str, namespace: List[str] = []) -> None: ...
async def drop_all_tables(self, namespace: List[str] = []) -> None: ...
async def rename_table(self, old_name: str, new_name: str) -> None: ...
async def drop_table(self, name: str) -> None: ...
class Table:
def name(self) -> str: ...

View File

@@ -43,70 +43,14 @@ if TYPE_CHECKING:
class DBConnection(EnforceOverrides):
"""An active LanceDB connection interface."""
def list_namespaces(
self,
namespace: List[str] = [],
page_token: Optional[str] = None,
limit: int = 10,
) -> Iterable[str]:
"""List immediate child namespace names in the given namespace.
Parameters
----------
namespace: List[str], default []
The parent namespace to list namespaces in.
Empty list represents root namespace.
page_token: str, optional
The token to use for pagination. If not present, start from the beginning.
limit: int, default 10
The size of the page to return.
Returns
-------
Iterable of str
List of immediate child namespace names
"""
return []
def create_namespace(self, namespace: List[str]) -> None:
"""Create a new namespace.
Parameters
----------
namespace: List[str]
The namespace identifier to create.
"""
raise NotImplementedError(
"Namespace operations are not supported for this connection type"
)
def drop_namespace(self, namespace: List[str]) -> None:
"""Drop a namespace.
Parameters
----------
namespace: List[str]
The namespace identifier to drop.
"""
raise NotImplementedError(
"Namespace operations are not supported for this connection type"
)
@abstractmethod
def table_names(
self,
page_token: Optional[str] = None,
limit: int = 10,
*,
namespace: List[str] = [],
self, page_token: Optional[str] = None, limit: int = 10
) -> Iterable[str]:
"""List all tables in this database, in sorted order
Parameters
----------
namespace: List[str], default []
The namespace to list tables in.
Empty list represents root namespace.
page_token: str, optional
The token to use for pagination. If not present, start from the beginning.
Typically, this token is last table name from the previous page.
@@ -133,7 +77,6 @@ class DBConnection(EnforceOverrides):
fill_value: float = 0.0,
embedding_functions: Optional[List[EmbeddingFunctionConfig]] = None,
*,
namespace: List[str] = [],
storage_options: Optional[Dict[str, str]] = None,
data_storage_version: Optional[str] = None,
enable_v2_manifest_paths: Optional[bool] = None,
@@ -144,9 +87,6 @@ class DBConnection(EnforceOverrides):
----------
name: str
The name of the table.
namespace: List[str], default []
The namespace to create the table in.
Empty list represents root namespace.
data: The data to initialize the table, *optional*
User must provide at least one of `data` or `schema`.
Acceptable types are:
@@ -298,7 +238,6 @@ class DBConnection(EnforceOverrides):
self,
name: str,
*,
namespace: List[str] = [],
storage_options: Optional[Dict[str, str]] = None,
index_cache_size: Optional[int] = None,
) -> Table:
@@ -308,9 +247,6 @@ class DBConnection(EnforceOverrides):
----------
name: str
The name of the table.
namespace: List[str], optional
The namespace to open the table from.
None or empty list represents root namespace.
index_cache_size: int, default 256
**Deprecated**: Use session-level cache configuration instead.
Create a Session with custom cache sizes and pass it to lancedb.connect().
@@ -336,26 +272,17 @@ class DBConnection(EnforceOverrides):
"""
raise NotImplementedError
def drop_table(self, name: str, namespace: List[str] = []):
def drop_table(self, name: str):
"""Drop a table from the database.
Parameters
----------
name: str
The name of the table.
namespace: List[str], default []
The namespace to drop the table from.
Empty list represents root namespace.
"""
raise NotImplementedError
def rename_table(
self,
cur_name: str,
new_name: str,
cur_namespace: List[str] = [],
new_namespace: List[str] = [],
):
def rename_table(self, cur_name: str, new_name: str):
"""Rename a table in the database.
Parameters
@@ -364,12 +291,6 @@ class DBConnection(EnforceOverrides):
The current name of the table.
new_name: str
The new name of the table.
cur_namespace: List[str], optional
The namespace of the current table.
None or empty list represents root namespace.
new_namespace: List[str], optional
The namespace to move the table to.
If not specified, defaults to the same as cur_namespace.
"""
raise NotImplementedError
@@ -380,15 +301,9 @@ class DBConnection(EnforceOverrides):
"""
raise NotImplementedError
def drop_all_tables(self, namespace: List[str] = []):
def drop_all_tables(self):
"""
Drop all tables from the database
Parameters
----------
namespace: List[str], optional
The namespace to drop all tables from.
None or empty list represents root namespace.
"""
raise NotImplementedError
@@ -489,87 +404,18 @@ class LanceDBConnection(DBConnection):
conn = AsyncConnection(await lancedb_connect(self.uri))
return await conn.table_names(start_after=start_after, limit=limit)
@override
def list_namespaces(
self,
namespace: List[str] = [],
page_token: Optional[str] = None,
limit: int = 10,
) -> Iterable[str]:
"""List immediate child namespace names in the given namespace.
Parameters
----------
namespace: List[str], optional
The parent namespace to list namespaces in.
None or empty list represents root namespace.
page_token: str, optional
The token to use for pagination. If not present, start from the beginning.
limit: int, default 10
The size of the page to return.
Returns
-------
Iterable of str
List of immediate child namespace names
"""
return LOOP.run(
self._conn.list_namespaces(
namespace=namespace, page_token=page_token, limit=limit
)
)
@override
def create_namespace(self, namespace: List[str]) -> None:
"""Create a new namespace.
Parameters
----------
namespace: List[str]
The namespace identifier to create.
"""
LOOP.run(self._conn.create_namespace(namespace=namespace))
@override
def drop_namespace(self, namespace: List[str]) -> None:
"""Drop a namespace.
Parameters
----------
namespace: List[str]
The namespace identifier to drop.
"""
return LOOP.run(self._conn.drop_namespace(namespace=namespace))
@override
def table_names(
self,
page_token: Optional[str] = None,
limit: int = 10,
*,
namespace: List[str] = [],
self, page_token: Optional[str] = None, limit: int = 10
) -> Iterable[str]:
"""Get the names of all tables in the database. The names are sorted.
Parameters
----------
namespace: List[str], optional
The namespace to list tables in.
page_token: str, optional
The token to use for pagination.
limit: int, default 10
The maximum number of tables to return.
Returns
-------
Iterator of str.
A list of table names.
"""
return LOOP.run(
self._conn.table_names(
namespace=namespace, start_after=page_token, limit=limit
)
)
return LOOP.run(self._conn.table_names(start_after=page_token, limit=limit))
def __len__(self) -> int:
return len(self.table_names())
@@ -589,18 +435,12 @@ class LanceDBConnection(DBConnection):
fill_value: float = 0.0,
embedding_functions: Optional[List[EmbeddingFunctionConfig]] = None,
*,
namespace: List[str] = [],
storage_options: Optional[Dict[str, str]] = None,
data_storage_version: Optional[str] = None,
enable_v2_manifest_paths: Optional[bool] = None,
) -> LanceTable:
"""Create a table in the database.
Parameters
----------
namespace: List[str], optional
The namespace to create the table in.
See
---
DBConnection.create_table
@@ -619,7 +459,6 @@ class LanceDBConnection(DBConnection):
on_bad_vectors=on_bad_vectors,
fill_value=fill_value,
embedding_functions=embedding_functions,
namespace=namespace,
storage_options=storage_options,
)
return tbl
@@ -629,7 +468,6 @@ class LanceDBConnection(DBConnection):
self,
name: str,
*,
namespace: List[str] = [],
storage_options: Optional[Dict[str, str]] = None,
index_cache_size: Optional[int] = None,
) -> LanceTable:
@@ -639,8 +477,6 @@ class LanceDBConnection(DBConnection):
----------
name: str
The name of the table.
namespace: List[str], optional
The namespace to open the table from.
Returns
-------
@@ -660,68 +496,26 @@ class LanceDBConnection(DBConnection):
return LanceTable.open(
self,
name,
namespace=namespace,
storage_options=storage_options,
index_cache_size=index_cache_size,
)
@override
def drop_table(
self,
name: str,
namespace: List[str] = [],
ignore_missing: bool = False,
):
def drop_table(self, name: str, ignore_missing: bool = False):
"""Drop a table from the database.
Parameters
----------
name: str
The name of the table.
namespace: List[str], optional
The namespace to drop the table from.
ignore_missing: bool, default False
If True, ignore if the table does not exist.
"""
LOOP.run(
self._conn.drop_table(
name, namespace=namespace, ignore_missing=ignore_missing
)
)
LOOP.run(self._conn.drop_table(name, ignore_missing=ignore_missing))
@override
def drop_all_tables(self, namespace: List[str] = []):
LOOP.run(self._conn.drop_all_tables(namespace=namespace))
@override
def rename_table(
self,
cur_name: str,
new_name: str,
cur_namespace: List[str] = [],
new_namespace: List[str] = [],
):
"""Rename a table in the database.
Parameters
----------
cur_name: str
The current name of the table.
new_name: str
The new name of the table.
cur_namespace: List[str], optional
The namespace of the current table.
new_namespace: List[str], optional
The namespace to move the table to.
"""
LOOP.run(
self._conn.rename_table(
cur_name,
new_name,
cur_namespace=cur_namespace,
new_namespace=new_namespace,
)
)
def drop_all_tables(self):
LOOP.run(self._conn.drop_all_tables())
@deprecation.deprecated(
deprecated_in="0.15.1",
@@ -794,67 +588,13 @@ class AsyncConnection(object):
def uri(self) -> str:
return self._inner.uri
async def list_namespaces(
self,
namespace: List[str] = [],
page_token: Optional[str] = None,
limit: int = 10,
) -> Iterable[str]:
"""List immediate child namespace names in the given namespace.
Parameters
----------
namespace: List[str], optional
The parent namespace to list namespaces in.
None or empty list represents root namespace.
page_token: str, optional
The token to use for pagination. If not present, start from the beginning.
limit: int, default 10
The size of the page to return.
Returns
-------
Iterable of str
List of immediate child namespace names (not full paths)
"""
return await self._inner.list_namespaces(
namespace=namespace, page_token=page_token, limit=limit
)
async def create_namespace(self, namespace: List[str]) -> None:
"""Create a new namespace.
Parameters
----------
namespace: List[str]
The namespace identifier to create.
"""
await self._inner.create_namespace(namespace)
async def drop_namespace(self, namespace: List[str]) -> None:
"""Drop a namespace.
Parameters
----------
namespace: List[str]
The namespace identifier to drop.
"""
await self._inner.drop_namespace(namespace)
async def table_names(
self,
*,
namespace: List[str] = [],
start_after: Optional[str] = None,
limit: Optional[int] = None,
self, *, start_after: Optional[str] = None, limit: Optional[int] = None
) -> Iterable[str]:
"""List all tables in this database, in sorted order
Parameters
----------
namespace: List[str], optional
The namespace to list tables in.
None or empty list represents root namespace.
start_after: str, optional
If present, only return names that come lexicographically after the supplied
value.
@@ -868,9 +608,7 @@ class AsyncConnection(object):
-------
Iterable of str
"""
return await self._inner.table_names(
namespace=namespace, start_after=start_after, limit=limit
)
return await self._inner.table_names(start_after=start_after, limit=limit)
async def create_table(
self,
@@ -883,7 +621,6 @@ class AsyncConnection(object):
fill_value: Optional[float] = None,
storage_options: Optional[Dict[str, str]] = None,
*,
namespace: List[str] = [],
embedding_functions: Optional[List[EmbeddingFunctionConfig]] = None,
) -> AsyncTable:
"""Create an [AsyncTable][lancedb.table.AsyncTable] in the database.
@@ -892,9 +629,6 @@ class AsyncConnection(object):
----------
name: str
The name of the table.
namespace: List[str], default []
The namespace to create the table in.
Empty list represents root namespace.
data: The data to initialize the table, *optional*
User must provide at least one of `data` or `schema`.
Acceptable types are:
@@ -1073,7 +807,6 @@ class AsyncConnection(object):
name,
mode,
schema,
namespace=namespace,
storage_options=storage_options,
)
else:
@@ -1082,7 +815,6 @@ class AsyncConnection(object):
name,
mode,
data,
namespace=namespace,
storage_options=storage_options,
)
@@ -1091,8 +823,6 @@ class AsyncConnection(object):
async def open_table(
self,
name: str,
*,
namespace: List[str] = [],
storage_options: Optional[Dict[str, str]] = None,
index_cache_size: Optional[int] = None,
) -> AsyncTable:
@@ -1102,9 +832,6 @@ class AsyncConnection(object):
----------
name: str
The name of the table.
namespace: List[str], optional
The namespace to open the table from.
None or empty list represents root namespace.
storage_options: dict, optional
Additional options for the storage backend. Options already set on the
connection will be inherited by the table, but can be overridden here.
@@ -1128,77 +855,42 @@ class AsyncConnection(object):
-------
A LanceTable object representing the table.
"""
table = await self._inner.open_table(
name,
namespace=namespace,
storage_options=storage_options,
index_cache_size=index_cache_size,
)
table = await self._inner.open_table(name, storage_options, index_cache_size)
return AsyncTable(table)
async def rename_table(
self,
cur_name: str,
new_name: str,
cur_namespace: List[str] = [],
new_namespace: List[str] = [],
):
async def rename_table(self, old_name: str, new_name: str):
"""Rename a table in the database.
Parameters
----------
cur_name: str
old_name: str
The current name of the table.
new_name: str
The new name of the table.
cur_namespace: List[str], optional
The namespace of the current table.
None or empty list represents root namespace.
new_namespace: List[str], optional
The namespace to move the table to.
If not specified, defaults to the same as cur_namespace.
"""
await self._inner.rename_table(
cur_name, new_name, cur_namespace=cur_namespace, new_namespace=new_namespace
)
await self._inner.rename_table(old_name, new_name)
async def drop_table(
self,
name: str,
*,
namespace: List[str] = [],
ignore_missing: bool = False,
):
async def drop_table(self, name: str, *, ignore_missing: bool = False):
"""Drop a table from the database.
Parameters
----------
name: str
The name of the table.
namespace: List[str], default []
The namespace to drop the table from.
Empty list represents root namespace.
ignore_missing: bool, default False
If True, ignore if the table does not exist.
"""
try:
await self._inner.drop_table(name, namespace=namespace)
await self._inner.drop_table(name)
except ValueError as e:
if not ignore_missing:
raise e
if f"Table '{name}' was not found" not in str(e):
raise e
async def drop_all_tables(self, namespace: List[str] = []):
"""Drop all tables from the database.
Parameters
----------
namespace: List[str], optional
The namespace to drop all tables from.
None or empty list represents root namespace.
"""
await self._inner.drop_all_tables(namespace=namespace)
async def drop_all_tables(self):
"""Drop all tables from the database."""
await self._inner.drop_all_tables()
@deprecation.deprecated(
deprecated_in="0.15.1",

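For orientation, here is a minimal sketch of how the simplified Python connection API reads after this diff, with the namespace arguments removed. It assumes the public `lancedb.connect` / `lancedb.connect_async` entry points; the database path and the sample rows are placeholders.

```python
# Sketch only: table operations no longer accept a `namespace` argument.
import asyncio
import lancedb

def sync_example(uri: str = "data/example-db"):  # placeholder local path
    db = lancedb.connect(uri)
    db.create_table("items", data=[{"vector": [0.1, 0.2], "item": "foo"}])
    print(list(db.table_names(limit=10)))        # page_token / limit only
    tbl = db.open_table("items")                 # no namespace parameter
    db.drop_table("items", ignore_missing=True)
    db.drop_all_tables()

async def async_example(uri: str = "data/example-db"):
    db = await lancedb.connect_async(uri)
    await db.create_table("items", data=[{"vector": [0.1, 0.2], "item": "foo"}])
    print(await db.table_names(start_after=None, limit=10))
    tbl = await db.open_table("items")           # no namespace parameter
    await db.drop_table("items", ignore_missing=True)
    await db.drop_all_tables()

if __name__ == "__main__":
    sync_example()
    asyncio.run(async_example())
```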
View File

@@ -26,9 +26,6 @@ from lance_namespace_urllib3_client.models import (
DescribeTableRequest,
CreateTableRequest,
DropTableRequest,
ListNamespacesRequest,
CreateNamespaceRequest,
DropNamespaceRequest,
JsonArrowSchema,
JsonArrowField,
JsonArrowDataType,
@@ -137,13 +134,10 @@ class LanceNamespaceDBConnection(DBConnection):
@override
def table_names(
self,
page_token: Optional[str] = None,
limit: int = 10,
*,
namespace: List[str] = [],
self, page_token: Optional[str] = None, limit: int = 10
) -> Iterable[str]:
request = ListTablesRequest(id=namespace, page_token=page_token, limit=limit)
# Use namespace to list tables
request = ListTablesRequest(id=None, page_token=page_token, limit=limit)
response = self._ns.list_tables(request)
return response.tables if response.tables else []
@@ -159,7 +153,6 @@ class LanceNamespaceDBConnection(DBConnection):
fill_value: float = 0.0,
embedding_functions: Optional[List[EmbeddingFunctionConfig]] = None,
*,
namespace: List[str] = [],
storage_options: Optional[Dict[str, str]] = None,
data_storage_version: Optional[str] = None,
enable_v2_manifest_paths: Optional[bool] = None,
@@ -190,9 +183,8 @@ class LanceNamespaceDBConnection(DBConnection):
# Convert PyArrow schema to JsonArrowSchema
json_schema = _convert_pyarrow_schema_to_json(schema)
# Create table request with namespace
table_id = namespace + [name]
request = CreateTableRequest(id=table_id, var_schema=json_schema)
# Create table request
request = CreateTableRequest(id=[name], var_schema=json_schema)
# Create empty Arrow IPC stream bytes
import pyarrow.ipc as ipc
@@ -207,21 +199,17 @@ class LanceNamespaceDBConnection(DBConnection):
request_data = buffer.getvalue()
self._ns.create_table(request, request_data)
return self.open_table(
name, namespace=namespace, storage_options=storage_options
)
return self.open_table(name, storage_options=storage_options)
@override
def open_table(
self,
name: str,
*,
namespace: List[str] = [],
storage_options: Optional[Dict[str, str]] = None,
index_cache_size: Optional[int] = None,
) -> Table:
table_id = namespace + [name]
request = DescribeTableRequest(id=table_id)
request = DescribeTableRequest(id=[name])
response = self._ns.describe_table(request)
merged_storage_options = dict()
@@ -237,20 +225,13 @@ class LanceNamespaceDBConnection(DBConnection):
)
@override
def drop_table(self, name: str, namespace: List[str] = []):
def drop_table(self, name: str):
# Use namespace drop_table directly
table_id = namespace + [name]
request = DropTableRequest(id=table_id)
request = DropTableRequest(id=[name])
self._ns.drop_table(request)
@override
def rename_table(
self,
cur_name: str,
new_name: str,
cur_namespace: List[str] = [],
new_namespace: List[str] = [],
):
def rename_table(self, cur_name: str, new_name: str):
raise NotImplementedError(
"rename_table is not supported for namespace connections"
)
@@ -262,66 +243,9 @@ class LanceNamespaceDBConnection(DBConnection):
)
@override
def drop_all_tables(self, namespace: List[str] = []):
for table_name in self.table_names(namespace=namespace):
self.drop_table(table_name, namespace=namespace)
@override
def list_namespaces(
self,
namespace: List[str] = [],
page_token: Optional[str] = None,
limit: int = 10,
) -> Iterable[str]:
"""
List child namespaces under the given namespace.
Parameters
----------
namespace : Optional[List[str]]
The parent namespace to list children from.
If None, lists root-level namespaces.
page_token : Optional[str]
Pagination token for listing results.
limit : int
Maximum number of namespaces to return.
Returns
-------
Iterable[str]
Names of child namespaces.
"""
request = ListNamespacesRequest(
id=namespace, page_token=page_token, limit=limit
)
response = self._ns.list_namespaces(request)
return response.namespaces if response.namespaces else []
@override
def create_namespace(self, namespace: List[str]) -> None:
"""
Create a new namespace.
Parameters
----------
namespace : List[str]
The namespace path to create.
"""
request = CreateNamespaceRequest(id=namespace)
self._ns.create_namespace(request)
@override
def drop_namespace(self, namespace: List[str]) -> None:
"""
Drop a namespace.
Parameters
----------
namespace : List[str]
The namespace path to drop.
"""
request = DropNamespaceRequest(id=namespace)
self._ns.drop_namespace(request)
def drop_all_tables(self):
for table_name in self.table_names():
self.drop_table(table_name)
def _lance_table_from_uri(
self,

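As a quick illustration grounded in the imports above: after this diff a table identifier passed to the namespace client is a single-element list (root namespace only) rather than `namespace + [name]`.

```python
# Sketch of the request id layout used after this change.
from lance_namespace_urllib3_client.models import (
    DescribeTableRequest,
    DropTableRequest,
    ListTablesRequest,
)

name = "my_table"  # placeholder table name
describe = DescribeTableRequest(id=[name])   # previously: id=namespace + [name]
drop = DropTableRequest(id=[name])
listing = ListTablesRequest(id=None, page_token=None, limit=10)
```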
View File

@@ -943,22 +943,20 @@ class LanceQueryBuilder(ABC):
>>> query = [100, 100]
>>> plan = table.search(query).analyze_plan()
>>> print(plan) # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
AnalyzeExec verbose=true, metrics=[], cumulative_cpu=...
TracedExec, metrics=[], cumulative_cpu=...
ProjectionExec: expr=[...], metrics=[...], cumulative_cpu=...
GlobalLimitExec: skip=0, fetch=10, metrics=[...], cumulative_cpu=...
AnalyzeExec verbose=true, metrics=[]
TracedExec, metrics=[]
ProjectionExec: expr=[...], metrics=[...]
GlobalLimitExec: skip=0, fetch=10, metrics=[...]
FilterExec: _distance@2 IS NOT NULL,
metrics=[output_rows=..., elapsed_compute=...], cumulative_cpu=...
metrics=[output_rows=..., elapsed_compute=...]
SortExec: TopK(fetch=10), expr=[...],
preserve_partitioning=[...],
metrics=[output_rows=..., elapsed_compute=..., row_replacements=...],
cumulative_cpu=...
metrics=[output_rows=..., elapsed_compute=..., row_replacements=...]
KNNVectorDistance: metric=l2,
metrics=[output_rows=..., elapsed_compute=..., output_batches=...],
cumulative_cpu=...
metrics=[output_rows=..., elapsed_compute=..., output_batches=...]
LanceRead: uri=..., projection=[vector], ...
metrics=[output_rows=..., elapsed_compute=...,
bytes_read=..., iops=..., requests=...], cumulative_cpu=...
bytes_read=..., iops=..., requests=...]
Returns
-------

View File

@@ -118,7 +118,6 @@ class ClientConfig:
retry_config: RetryConfig = field(default_factory=RetryConfig)
timeout_config: Optional[TimeoutConfig] = field(default_factory=TimeoutConfig)
extra_headers: Optional[dict] = None
id_delimiter: Optional[str] = None
def __post_init__(self):
if isinstance(self.retry_config, dict):

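For context, a hedged sketch of building the remote client configuration once `id_delimiter` is gone; it assumes `ClientConfig`, `RetryConfig`, and `TimeoutConfig` are importable from `lancedb.remote`, and the extra header is a placeholder.

```python
# Sketch: ClientConfig after this change (no id_delimiter field).
from lancedb.remote import ClientConfig, RetryConfig, TimeoutConfig

config = ClientConfig(
    retry_config=RetryConfig(),
    timeout_config=TimeoutConfig(),
    extra_headers={"x-request-source": "example"},  # placeholder header
)
```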
View File

@@ -96,73 +96,14 @@ class RemoteDBConnection(DBConnection):
def __repr__(self) -> str:
return f"RemoteConnect(name={self.db_name})"
@override
def list_namespaces(
self,
namespace: List[str] = [],
page_token: Optional[str] = None,
limit: int = 10,
) -> Iterable[str]:
"""List immediate child namespace names in the given namespace.
Parameters
----------
namespace: List[str], optional
The parent namespace to list namespaces in.
None or empty list represents root namespace.
page_token: str, optional
The token to use for pagination. If not present, start from the beginning.
limit: int, default 10
The size of the page to return.
Returns
-------
Iterable of str
List of immediate child namespace names
"""
return LOOP.run(
self._conn.list_namespaces(
namespace=namespace, page_token=page_token, limit=limit
)
)
@override
def create_namespace(self, namespace: List[str]) -> None:
"""Create a new namespace.
Parameters
----------
namespace: List[str]
The namespace identifier to create.
"""
LOOP.run(self._conn.create_namespace(namespace=namespace))
@override
def drop_namespace(self, namespace: List[str]) -> None:
"""Drop a namespace.
Parameters
----------
namespace: List[str]
The namespace identifier to drop.
"""
return LOOP.run(self._conn.drop_namespace(namespace=namespace))
@override
def table_names(
self,
page_token: Optional[str] = None,
limit: int = 10,
*,
namespace: List[str] = [],
self, page_token: Optional[str] = None, limit: int = 10
) -> Iterable[str]:
"""List the names of all tables in the database.
Parameters
----------
namespace: List[str], default []
The namespace to list tables in.
Empty list represents root namespace.
page_token: str
The last token to start the new page.
limit: int, default 10
@@ -172,18 +113,13 @@ class RemoteDBConnection(DBConnection):
-------
An iterator of table names.
"""
return LOOP.run(
self._conn.table_names(
namespace=namespace, start_after=page_token, limit=limit
)
)
return LOOP.run(self._conn.table_names(start_after=page_token, limit=limit))
@override
def open_table(
self,
name: str,
*,
namespace: List[str] = [],
storage_options: Optional[Dict[str, str]] = None,
index_cache_size: Optional[int] = None,
) -> Table:
@@ -193,9 +129,6 @@ class RemoteDBConnection(DBConnection):
----------
name: str
The name of the table.
namespace: List[str], optional
The namespace to open the table from.
None or empty list represents root namespace.
Returns
-------
@@ -209,7 +142,7 @@ class RemoteDBConnection(DBConnection):
" (there is no local cache to configure)"
)
table = LOOP.run(self._conn.open_table(name, namespace=namespace))
table = LOOP.run(self._conn.open_table(name))
return RemoteTable(table, self.db_name)
@override
@@ -222,8 +155,6 @@ class RemoteDBConnection(DBConnection):
fill_value: float = 0.0,
mode: Optional[str] = None,
embedding_functions: Optional[List[EmbeddingFunctionConfig]] = None,
*,
namespace: List[str] = [],
) -> Table:
"""Create a [Table][lancedb.table.Table] in the database.
@@ -231,9 +162,6 @@ class RemoteDBConnection(DBConnection):
----------
name: str
The name of the table.
namespace: List[str], optional
The namespace to create the table in.
None or empty list represents root namespace.
data: The data to initialize the table, *optional*
User must provide at least one of `data` or `schema`.
Acceptable types are:
@@ -334,7 +262,6 @@ class RemoteDBConnection(DBConnection):
self._conn.create_table(
name,
data,
namespace=namespace,
mode=mode,
schema=schema,
on_bad_vectors=on_bad_vectors,
@@ -344,27 +271,18 @@ class RemoteDBConnection(DBConnection):
return RemoteTable(table, self.db_name)
@override
def drop_table(self, name: str, namespace: List[str] = []):
def drop_table(self, name: str):
"""Drop a table from the database.
Parameters
----------
name: str
The name of the table.
namespace: List[str], optional
The namespace to drop the table from.
None or empty list represents root namespace.
"""
LOOP.run(self._conn.drop_table(name, namespace=namespace))
LOOP.run(self._conn.drop_table(name))
@override
def rename_table(
self,
cur_name: str,
new_name: str,
cur_namespace: List[str] = [],
new_namespace: List[str] = [],
):
def rename_table(self, cur_name: str, new_name: str):
"""Rename a table in the database.
Parameters
@@ -374,14 +292,7 @@ class RemoteDBConnection(DBConnection):
new_name: str
The new name of the table.
"""
LOOP.run(
self._conn.rename_table(
cur_name,
new_name,
cur_namespace=cur_namespace,
new_namespace=new_namespace,
)
)
LOOP.run(self._conn.rename_table(cur_name, new_name))
async def close(self):
"""Close the connection to the database."""

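Similarly, a sketch of the remote connection surface after this diff; the `db://` URI, API key, region, and table names are placeholders.

```python
# Sketch: remote connection calls without namespace arguments.
import lancedb

db = lancedb.connect("db://my-database", api_key="my-api-key", region="us-east-1")
print(list(db.table_names(limit=10)))   # page_token / limit only
tbl = db.open_table("items")            # no namespace parameter
db.rename_table("items", "items_v2")    # cur_name, new_name only
db.drop_table("items_v2")
```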
View File

@@ -115,7 +115,6 @@ class RemoteTable(Table):
*,
replace: bool = False,
wait_timeout: timedelta = None,
name: Optional[str] = None,
):
"""Creates a scalar index
Parameters
@@ -140,11 +139,7 @@ class RemoteTable(Table):
LOOP.run(
self._table.create_index(
column,
config=config,
replace=replace,
wait_timeout=wait_timeout,
name=name,
column, config=config, replace=replace, wait_timeout=wait_timeout
)
)
@@ -166,7 +161,6 @@ class RemoteTable(Table):
ngram_min_length: int = 3,
ngram_max_length: int = 3,
prefix_only: bool = False,
name: Optional[str] = None,
):
config = FTS(
with_position=with_position,
@@ -183,11 +177,7 @@ class RemoteTable(Table):
)
LOOP.run(
self._table.create_index(
column,
config=config,
replace=replace,
wait_timeout=wait_timeout,
name=name,
column, config=config, replace=replace, wait_timeout=wait_timeout
)
)

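For illustration, index creation on a remote table after the `name` parameter is dropped; `tbl` is assumed to be a `RemoteTable` obtained from a remote connection.

```python
# Sketch: index creation no longer accepts a custom index name.
tbl.create_scalar_index("id", replace=True)
tbl.create_fts_index("text", with_position=True, replace=True)
```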
View File

@@ -783,7 +783,6 @@ class Table(ABC):
replace: bool = True,
index_type: ScalarIndexType = "BTREE",
wait_timeout: Optional[timedelta] = None,
name: Optional[str] = None,
):
"""Create a scalar index on a column.
@@ -798,8 +797,6 @@ class Table(ABC):
The type of index to create.
wait_timeout: timedelta, optional
The timeout to wait if indexing is asynchronous.
name: str, optional
The name of the index. If not provided, a default name will be generated.
Examples
--------
@@ -862,7 +859,6 @@ class Table(ABC):
ngram_max_length: int = 3,
prefix_only: bool = False,
wait_timeout: Optional[timedelta] = None,
name: Optional[str] = None,
):
"""Create a full-text search index on the table.
@@ -927,8 +923,6 @@ class Table(ABC):
Whether to only index the prefix of the token for ngram tokenizer.
wait_timeout: timedelta, optional
The timeout to wait if indexing is asynchronous.
name: str, optional
The name of the index. If not provided, a default name will be generated.
"""
raise NotImplementedError
@@ -1119,9 +1113,7 @@ class Table(ABC):
raise NotImplementedError
@abstractmethod
def take_offsets(
self, offsets: list[int], *, with_row_id: bool = False
) -> LanceTakeQueryBuilder:
def take_offsets(self, offsets: list[int]) -> LanceTakeQueryBuilder:
"""
Take a list of offsets from the table.
@@ -1147,60 +1139,8 @@ class Table(ABC):
A record batch containing the rows at the given offsets.
"""
def __getitems__(self, offsets: list[int]) -> pa.RecordBatch:
"""
Take a list of offsets from the table and return as a record batch.
This method uses the `take_offsets` method to take the rows. However, it
aligns the offsets to the passed in offsets. This means the return type
is a record batch (and so users should take care not to pass in too many
offsets)
Note: this method is primarily intended to fulfill the Dataset contract
for pytorch.
Parameters
----------
offsets: list[int]
The offsets to take.
Returns
-------
pa.RecordBatch
A record batch containing the rows at the given offsets.
"""
# We don't know the order of the results at all. So we calculate a permutation
# for ordering the given offsets. Then we load the data with the _rowoffset
# column. Then we sort by _rowoffset and apply the inverse of the permutation
# that we calculated.
#
# Note: this is potentially a lot of memory copy if we're operating on large
# batches :(
num_offsets = len(offsets)
indices = list(range(num_offsets))
permutation = sorted(indices, key=lambda idx: offsets[idx])
permutation_inv = [0] * num_offsets
for i in range(num_offsets):
permutation_inv[permutation[i]] = i
columns = self.schema.names
columns.append("_rowoffset")
tbl = (
self.take_offsets(offsets)
.select(columns)
.to_arrow()
.sort_by("_rowoffset")
.take(permutation_inv)
.combine_chunks()
.drop_columns(["_rowoffset"])
)
return tbl
@abstractmethod
def take_row_ids(
self, row_ids: list[int], *, with_row_id: bool = False
) -> LanceTakeQueryBuilder:
def take_row_ids(self, row_ids: list[int]) -> LanceTakeQueryBuilder:
"""
Take a list of row ids from the table.
@@ -1706,16 +1646,13 @@ class LanceTable(Table):
connection: "LanceDBConnection",
name: str,
*,
namespace: List[str] = [],
storage_options: Optional[Dict[str, str]] = None,
index_cache_size: Optional[int] = None,
):
self._conn = connection
self._namespace = namespace
self._table = LOOP.run(
connection._conn.open_table(
name,
namespace=namespace,
storage_options=storage_options,
index_cache_size=index_cache_size,
)
@@ -1726,8 +1663,8 @@ class LanceTable(Table):
return self._table.name
@classmethod
def open(cls, db, name, *, namespace: List[str] = [], **kwargs):
tbl = cls(db, name, namespace=namespace, **kwargs)
def open(cls, db, name, **kwargs):
tbl = cls(db, name, **kwargs)
# check the dataset exists
try:
@@ -2111,7 +2048,6 @@ class LanceTable(Table):
*,
replace: bool = True,
index_type: ScalarIndexType = "BTREE",
name: Optional[str] = None,
):
if index_type == "BTREE":
config = BTree()
@@ -2122,7 +2058,7 @@ class LanceTable(Table):
else:
raise ValueError(f"Unknown index type {index_type}")
return LOOP.run(
self._table.create_index(column, replace=replace, config=config, name=name)
self._table.create_index(column, replace=replace, config=config)
)
def create_fts_index(
@@ -2146,7 +2082,6 @@ class LanceTable(Table):
ngram_min_length: int = 3,
ngram_max_length: int = 3,
prefix_only: bool = False,
name: Optional[str] = None,
):
if not use_tantivy:
if not isinstance(field_names, str):
@@ -2184,7 +2119,6 @@ class LanceTable(Table):
field_names,
replace=replace,
config=config,
name=name,
)
)
return
@@ -2551,7 +2485,6 @@ class LanceTable(Table):
fill_value: float = 0.0,
embedding_functions: Optional[List[EmbeddingFunctionConfig]] = None,
*,
namespace: List[str] = [],
storage_options: Optional[Dict[str, str | bool]] = None,
data_storage_version: Optional[str] = None,
enable_v2_manifest_paths: Optional[bool] = None,
@@ -2611,7 +2544,6 @@ class LanceTable(Table):
"""
self = cls.__new__(cls)
self._conn = db
self._namespace = namespace
if data_storage_version is not None:
warnings.warn(
@@ -2644,7 +2576,6 @@ class LanceTable(Table):
on_bad_vectors=on_bad_vectors,
fill_value=fill_value,
embedding_functions=embedding_functions,
namespace=namespace,
storage_options=storage_options,
)
)

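A sketch of the take builders after the `with_row_id` keyword is removed; `tbl` is assumed to be an open `LanceTable`, and the column names are placeholders.

```python
# Sketch: take rows by offset / row id after this change.
batch_table = (
    tbl.take_offsets([7, 0, 3])      # no with_row_id keyword
    .select(["vector", "item"])      # placeholder columns
    .to_arrow()
)
by_row_id = tbl.take_row_ids([42, 43]).to_arrow()
```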
View File

@@ -175,18 +175,6 @@ def test_table_names(tmp_db: lancedb.DBConnection):
tmp_db.create_table("test3", data=data)
assert tmp_db.table_names() == ["test1", "test2", "test3"]
# Test that positional arguments for page_token and limit
result = list(tmp_db.table_names("test1", 1)) # page_token="test1", limit=1
assert result == ["test2"], f"Expected ['test2'], got {result}"
# Test mixed positional and keyword arguments
result = list(tmp_db.table_names("test2", limit=2))
assert result == ["test3"], f"Expected ['test3'], got {result}"
# Test that namespace parameter can be passed as keyword
result = list(tmp_db.table_names(namespace=[]))
assert len(result) == 3
@pytest.mark.asyncio
async def test_table_names_async(tmp_path):
@@ -740,93 +728,3 @@ def test_bypass_vector_index_sync(tmp_db: lancedb.DBConnection):
table.search(sample_key).bypass_vector_index().explain_plan(verbose=True)
)
assert "KNN" in plan_without_index
def test_local_namespace_operations(tmp_path):
"""Test that local mode namespace operations behave as expected."""
# Create a local database connection
db = lancedb.connect(tmp_path)
# Test list_namespaces returns empty list
namespaces = list(db.list_namespaces())
assert namespaces == []
# Test list_namespaces with parameters still returns empty list
namespaces_with_params = list(
db.list_namespaces(namespace=["test"], page_token="token", limit=5)
)
assert namespaces_with_params == []
def test_local_create_namespace_not_supported(tmp_path):
"""Test that create_namespace is not supported in local mode."""
db = lancedb.connect(tmp_path)
with pytest.raises(
NotImplementedError,
match="Namespace operations are not supported for listing database",
):
db.create_namespace(["test_namespace"])
def test_local_drop_namespace_not_supported(tmp_path):
"""Test that drop_namespace is not supported in local mode."""
db = lancedb.connect(tmp_path)
with pytest.raises(
NotImplementedError,
match="Namespace operations are not supported for listing database",
):
db.drop_namespace(["test_namespace"])
def test_local_table_operations_with_namespace_raise_error(tmp_path):
"""
Test that table operations with namespace parameter
raise ValueError in local mode.
"""
db = lancedb.connect(tmp_path)
# Create some test data
data = [{"vector": [1.0, 2.0], "item": "test"}]
schema = pa.schema(
[pa.field("vector", pa.list_(pa.float32(), 2)), pa.field("item", pa.string())]
)
# Test create_table with namespace - should raise ValueError
with pytest.raises(
NotImplementedError,
match="Namespace parameter is not supported for listing database",
):
db.create_table(
"test_table_with_ns", data=data, schema=schema, namespace=["test_ns"]
)
# Create table normally for other tests
db.create_table("test_table", data=data, schema=schema)
assert "test_table" in db.table_names()
# Test open_table with namespace - should raise ValueError
with pytest.raises(
NotImplementedError,
match="Namespace parameter is not supported for listing database",
):
db.open_table("test_table", namespace=["test_ns"])
# Test table_names with namespace - should raise ValueError
with pytest.raises(
NotImplementedError,
match="Namespace parameter is not supported for listing database",
):
list(db.table_names(namespace=["test_ns"]))
# Test drop_table with namespace - should raise ValueError
with pytest.raises(
NotImplementedError,
match="Namespace parameter is not supported for listing database",
):
db.drop_table("test_table", namespace=["test_ns"])
# Test table_names without namespace - should work normally
tables_root = list(db.table_names())
assert "test_table" in tables_root

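The deleted assertions above exercised positional `page_token` / `limit` and the namespace keyword; the positional form should still be accepted by the simplified `table_names(page_token=None, limit=10)` signature shown earlier. A sketch, with a placeholder path:

```python
# Sketch: paging through table names with positional page_token / limit.
import lancedb

db = lancedb.connect("data/example-db")  # placeholder path
first_page = list(db.table_names(limit=2))
if first_page:
    next_page = list(db.table_names(first_page[-1], 2))  # names after the last seen
```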
View File

@@ -157,16 +157,7 @@ def test_create_index_with_stemming(tmp_path, table):
def test_create_inverted_index(table, use_tantivy, with_position):
if use_tantivy and not with_position:
pytest.skip("we don't support building a tantivy index without position")
table.create_fts_index(
"text",
use_tantivy=use_tantivy,
with_position=with_position,
name="custom_fts_index",
)
if not use_tantivy:
indices = table.list_indices()
fts_indices = [i for i in indices if i.index_type == "FTS"]
assert any(i.name == "custom_fts_index" for i in fts_indices)
table.create_fts_index("text", use_tantivy=use_tantivy, with_position=with_position)
def test_populate_index(tmp_path, table):

View File

@@ -23,12 +23,6 @@ from lance_namespace_urllib3_client.models import (
CreateTableResponse,
DropTableRequest,
DropTableResponse,
ListNamespacesRequest,
ListNamespacesResponse,
CreateNamespaceRequest,
CreateNamespaceResponse,
DropNamespaceRequest,
DropNamespaceResponse,
)
@@ -37,8 +31,6 @@ class TempNamespace(LanceNamespace):
# Class-level storage to persist table registry across instances
_global_registry: Dict[str, Dict[str, str]] = {}
# Class-level storage for namespaces (supporting 1-level namespace)
_global_namespaces: Dict[str, set] = {}
def __init__(self, **properties):
"""Initialize the test namespace.
@@ -52,48 +44,20 @@ class TempNamespace(LanceNamespace):
root = self.config.root
if root not in self._global_registry:
self._global_registry[root] = {}
if root not in self._global_namespaces:
self._global_namespaces[root] = set()
self.tables = self._global_registry[root] # Reference to shared registry
self.namespaces = self._global_namespaces[
root
] # Reference to shared namespaces
def list_tables(self, request: ListTablesRequest) -> ListTablesResponse:
"""List all tables in the namespace."""
if not request.id:
# List all tables in root namespace
tables = [name for name in self.tables.keys() if "." not in name]
else:
# List tables in specific namespace (1-level only)
if len(request.id) == 1:
namespace_name = request.id[0]
prefix = f"{namespace_name}."
tables = [
name[len(prefix) :]
for name in self.tables.keys()
if name.startswith(prefix)
]
else:
# Multi-level namespaces not supported
raise ValueError("Only 1-level namespaces are supported")
# For simplicity, ignore namespace ID validation
tables = list(self.tables.keys())
return ListTablesResponse(tables=tables)
def describe_table(self, request: DescribeTableRequest) -> DescribeTableResponse:
"""Describe a table by returning its location."""
if not request.id:
if not request.id or len(request.id) != 1:
raise ValueError("Invalid table ID")
if len(request.id) == 1:
# Root namespace table
table_name = request.id[0]
elif len(request.id) == 2:
# Namespaced table (1-level namespace)
namespace_name, table_name = request.id
table_name = f"{namespace_name}.{table_name}"
else:
raise ValueError("Only 1-level namespaces are supported")
table_name = request.id[0]
if table_name not in self.tables:
raise RuntimeError(f"Table does not exist: {table_name}")
@@ -104,22 +68,10 @@ class TempNamespace(LanceNamespace):
self, request: CreateTableRequest, request_data: bytes
) -> CreateTableResponse:
"""Create a table in the namespace."""
if not request.id:
if not request.id or len(request.id) != 1:
raise ValueError("Invalid table ID")
if len(request.id) == 1:
# Root namespace table
table_name = request.id[0]
table_uri = f"{self.config.root}/{table_name}.lance"
elif len(request.id) == 2:
# Namespaced table (1-level namespace)
namespace_name, base_table_name = request.id
# Add namespace to our namespace set
self.namespaces.add(namespace_name)
table_name = f"{namespace_name}.{base_table_name}"
table_uri = f"{self.config.root}/{namespace_name}/{base_table_name}.lance"
else:
raise ValueError("Only 1-level namespaces are supported")
table_name = request.id[0]
# Check if table already exists
if table_name in self.tables:
@@ -129,14 +81,13 @@ class TempNamespace(LanceNamespace):
else:
raise RuntimeError(f"Table already exists: {table_name}")
# Generate table URI based on root directory
table_uri = f"{self.config.root}/{table_name}.lance"
# Parse the Arrow IPC stream to get the schema and create the actual table
import pyarrow.ipc as ipc
import io
import lance
import os
# Create directory if needed for namespaced tables
os.makedirs(os.path.dirname(table_uri), exist_ok=True)
# Read the IPC stream
reader = ipc.open_stream(io.BytesIO(request_data))
@@ -152,19 +103,10 @@ class TempNamespace(LanceNamespace):
def drop_table(self, request: DropTableRequest) -> DropTableResponse:
"""Drop a table from the namespace."""
if not request.id:
if not request.id or len(request.id) != 1:
raise ValueError("Invalid table ID")
if len(request.id) == 1:
# Root namespace table
table_name = request.id[0]
elif len(request.id) == 2:
# Namespaced table (1-level namespace)
namespace_name, base_table_name = request.id
table_name = f"{namespace_name}.{base_table_name}"
else:
raise ValueError("Only 1-level namespaces are supported")
table_name = request.id[0]
if table_name not in self.tables:
raise RuntimeError(f"Table does not exist: {table_name}")
@@ -210,78 +152,6 @@ class TempNamespace(LanceNamespace):
del self.tables[table_name]
return DeregisterTableResponse()
def list_namespaces(self, request: ListNamespacesRequest) -> ListNamespacesResponse:
"""List child namespaces."""
if not request.id:
# List root-level namespaces
namespaces = list(self.namespaces)
elif len(request.id) == 1:
# For 1-level namespace, there are no child namespaces
namespaces = []
else:
raise ValueError("Only 1-level namespaces are supported")
return ListNamespacesResponse(namespaces=namespaces)
def create_namespace(
self, request: CreateNamespaceRequest
) -> CreateNamespaceResponse:
"""Create a namespace."""
if not request.id:
raise ValueError("Invalid namespace ID")
if len(request.id) == 1:
# Create 1-level namespace
namespace_name = request.id[0]
self.namespaces.add(namespace_name)
# Create directory for the namespace
import os
namespace_dir = f"{self.config.root}/{namespace_name}"
os.makedirs(namespace_dir, exist_ok=True)
else:
raise ValueError("Only 1-level namespaces are supported")
return CreateNamespaceResponse()
def drop_namespace(self, request: DropNamespaceRequest) -> DropNamespaceResponse:
"""Drop a namespace."""
if not request.id:
raise ValueError("Invalid namespace ID")
if len(request.id) == 1:
# Drop 1-level namespace
namespace_name = request.id[0]
if namespace_name not in self.namespaces:
raise RuntimeError(f"Namespace does not exist: {namespace_name}")
# Check if namespace has any tables
prefix = f"{namespace_name}."
tables_in_namespace = [
name for name in self.tables.keys() if name.startswith(prefix)
]
if tables_in_namespace:
raise RuntimeError(
f"Cannot drop namespace '{namespace_name}': contains tables"
)
# Remove namespace
self.namespaces.remove(namespace_name)
# Remove directory
import shutil
import os
namespace_dir = f"{self.config.root}/{namespace_name}"
if os.path.exists(namespace_dir):
shutil.rmtree(namespace_dir, ignore_errors=True)
else:
raise ValueError("Only 1-level namespaces are supported")
return DropNamespaceResponse()
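# Illustrative state (a hedged sketch, not part of the original fixture): after
# create_namespace(["ns"]) followed by a create_table in that namespace, the
# shared class-level state above would look roughly like
#
#     _global_namespaces[root] == {"ns"}
#     _global_registry[root]   == {"ns.table1": "<root>/ns/table1.lance"}
#
# i.e. a 1-level namespace is flattened into a "namespace.table" registry key,
# and the table data is placed under "<root>/<namespace>/".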
class TempNamespaceConfig:
"""Configuration for TestNamespace."""
@@ -317,16 +187,12 @@ class TestNamespaceConnection:
# Clear the TempNamespace registry for this test
if self.temp_dir in TempNamespace._global_registry:
TempNamespace._global_registry[self.temp_dir].clear()
if self.temp_dir in TempNamespace._global_namespaces:
TempNamespace._global_namespaces[self.temp_dir].clear()
def teardown_method(self):
"""Clean up test fixtures."""
# Clear the TempNamespace registry
if self.temp_dir in TempNamespace._global_registry:
del TempNamespace._global_registry[self.temp_dir]
if self.temp_dir in TempNamespace._global_namespaces:
del TempNamespace._global_namespaces[self.temp_dir]
shutil.rmtree(self.temp_dir, ignore_errors=True)
def test_connect_namespace_test(self):
@@ -420,10 +286,6 @@ class TestNamespaceConnection:
assert "table2" in table_names
assert len(table_names) == 1
# Test that drop_table works without explicit namespace parameter
db.drop_table("table2")
assert len(list(db.table_names())) == 0
# Should not be able to open dropped table
with pytest.raises(RuntimeError):
db.open_table("table1")
@@ -491,11 +353,6 @@ class TestNamespaceConnection:
# Verify all tables are gone
assert len(list(db.table_names())) == 0
# Test that table_names works with keyword-only namespace parameter
db.create_table("test_table", schema=schema)
result = list(db.table_names(namespace=[]))
assert "test_table" in result
def test_table_operations(self):
"""Test various table operations through namespace."""
db = lancedb.connect_namespace("temp", {"root": self.temp_dir})
@@ -555,153 +412,3 @@ class TestNamespaceConnection:
]
)
db.create_table("test_table", schema=schema, storage_options=table_opts)
def test_namespace_operations(self):
"""Test namespace management operations."""
db = lancedb.connect_namespace("temp", {"root": self.temp_dir})
# Initially no namespaces
assert len(list(db.list_namespaces())) == 0
# Create a namespace
db.create_namespace(["test_namespace"])
# Verify namespace exists
namespaces = list(db.list_namespaces())
assert "test_namespace" in namespaces
assert len(namespaces) == 1
# Create table in namespace
schema = pa.schema(
[
pa.field("id", pa.int64()),
pa.field("vector", pa.list_(pa.float32(), 2)),
]
)
table = db.create_table(
"test_table", schema=schema, namespace=["test_namespace"]
)
assert table is not None
# Verify table exists in namespace
tables_in_namespace = list(db.table_names(namespace=["test_namespace"]))
assert "test_table" in tables_in_namespace
assert len(tables_in_namespace) == 1
# Open table from namespace
table = db.open_table("test_table", namespace=["test_namespace"])
assert table is not None
assert table.name == "test_table"
# Drop table from namespace
db.drop_table("test_table", namespace=["test_namespace"])
# Verify table no longer exists in namespace
tables_in_namespace = list(db.table_names(namespace=["test_namespace"]))
assert len(tables_in_namespace) == 0
# Drop namespace
db.drop_namespace(["test_namespace"])
# Verify namespace no longer exists
namespaces = list(db.list_namespaces())
assert len(namespaces) == 0
def test_namespace_with_tables_cannot_be_dropped(self):
"""Test that namespaces containing tables cannot be dropped."""
db = lancedb.connect_namespace("temp", {"root": self.temp_dir})
# Create namespace and table
db.create_namespace(["test_namespace"])
schema = pa.schema(
[
pa.field("id", pa.int64()),
pa.field("vector", pa.list_(pa.float32(), 2)),
]
)
db.create_table("test_table", schema=schema, namespace=["test_namespace"])
# Try to drop namespace with tables - should fail
with pytest.raises(RuntimeError, match="contains tables"):
db.drop_namespace(["test_namespace"])
# Drop table first
db.drop_table("test_table", namespace=["test_namespace"])
# Now dropping namespace should work
db.drop_namespace(["test_namespace"])
def test_same_table_name_different_namespaces(self):
db = lancedb.connect_namespace("temp", {"root": self.temp_dir})
# Create two namespaces
db.create_namespace(["namespace_a"])
db.create_namespace(["namespace_b"])
# Define schema
schema = pa.schema(
[
pa.field("id", pa.int64()),
pa.field("vector", pa.list_(pa.float32(), 2)),
pa.field("text", pa.string()),
]
)
# Create table with same name in both namespaces
table_a = db.create_table(
"same_name_table", schema=schema, namespace=["namespace_a"]
)
table_b = db.create_table(
"same_name_table", schema=schema, namespace=["namespace_b"]
)
# Add different data to each table
data_a = [
{"id": 1, "vector": [1.0, 2.0], "text": "data_from_namespace_a"},
{"id": 2, "vector": [3.0, 4.0], "text": "also_from_namespace_a"},
]
table_a.add(data_a)
data_b = [
{"id": 10, "vector": [10.0, 20.0], "text": "data_from_namespace_b"},
{"id": 20, "vector": [30.0, 40.0], "text": "also_from_namespace_b"},
{"id": 30, "vector": [50.0, 60.0], "text": "more_from_namespace_b"},
]
table_b.add(data_b)
# Verify data in namespace_a table
opened_table_a = db.open_table("same_name_table", namespace=["namespace_a"])
result_a = opened_table_a.to_pandas().sort_values("id").reset_index(drop=True)
assert len(result_a) == 2
assert result_a["id"].tolist() == [1, 2]
assert result_a["text"].tolist() == [
"data_from_namespace_a",
"also_from_namespace_a",
]
assert [v.tolist() for v in result_a["vector"]] == [[1.0, 2.0], [3.0, 4.0]]
# Verify data in namespace_b table
opened_table_b = db.open_table("same_name_table", namespace=["namespace_b"])
result_b = opened_table_b.to_pandas().sort_values("id").reset_index(drop=True)
assert len(result_b) == 3
assert result_b["id"].tolist() == [10, 20, 30]
assert result_b["text"].tolist() == [
"data_from_namespace_b",
"also_from_namespace_b",
"more_from_namespace_b",
]
assert [v.tolist() for v in result_b["vector"]] == [
[10.0, 20.0],
[30.0, 40.0],
[50.0, 60.0],
]
# Verify root namespace doesn't have this table
root_tables = list(db.table_names())
assert "same_name_table" not in root_tables
# Clean up
db.drop_table("same_name_table", namespace=["namespace_a"])
db.drop_table("same_name_table", namespace=["namespace_b"])
db.drop_namespace(["namespace_a"])
db.drop_namespace(["namespace_b"])

View File

@@ -5,7 +5,6 @@ from typing import List, Union
import unittest.mock as mock
from datetime import timedelta
from pathlib import Path
import random
import lancedb
from lancedb.db import AsyncConnection
@@ -1356,27 +1355,6 @@ def test_take_queries(tmp_path):
]
def test_getitems(tmp_path):
db = lancedb.connect(tmp_path)
data = pa.table(
{
"idx": range(100),
}
)
# Make two fragments
table = db.create_table("test", data)
table.add(pa.table({"idx": range(100, 200)}))
assert table.__getitems__([5, 2, 117]) == pa.table(
{
"idx": [5, 2, 117],
}
)
offsets = random.sample(range(200), 10)
assert table.__getitems__(offsets) == pa.table({"idx": offsets})
@pytest.mark.asyncio
async def test_query_timeout_async(tmp_path):
db = await lancedb.connect_async(tmp_path)

View File

@@ -271,21 +271,12 @@ def test_table_add_in_threadpool():
def test_table_create_indices():
# Track received index creation requests to validate name parameter
received_requests = []
def handler(request):
index_stats = dict(
index_type="IVF_PQ", num_indexed_rows=1000, num_unindexed_rows=0
)
if request.path == "/v1/table/test/create_index/":
# Capture the request body to validate name parameter
content_len = int(request.headers.get("Content-Length", 0))
if content_len > 0:
body = request.rfile.read(content_len)
body_data = json.loads(body)
received_requests.append(body_data)
request.send_response(200)
request.end_headers()
elif request.path == "/v1/table/test/create/?mode=create":
@@ -316,34 +307,34 @@ def test_table_create_indices():
dict(
indexes=[
{
"index_name": "custom_scalar_idx",
"index_name": "id_idx",
"columns": ["id"],
},
{
"index_name": "custom_fts_idx",
"index_name": "text_idx",
"columns": ["text"],
},
{
"index_name": "custom_vector_idx",
"index_name": "vector_idx",
"columns": ["vector"],
},
]
)
)
request.wfile.write(payload.encode())
elif request.path == "/v1/table/test/index/custom_scalar_idx/stats/":
elif request.path == "/v1/table/test/index/id_idx/stats/":
request.send_response(200)
request.send_header("Content-Type", "application/json")
request.end_headers()
payload = json.dumps(index_stats)
request.wfile.write(payload.encode())
elif request.path == "/v1/table/test/index/custom_fts_idx/stats/":
elif request.path == "/v1/table/test/index/text_idx/stats/":
request.send_response(200)
request.send_header("Content-Type", "application/json")
request.end_headers()
payload = json.dumps(index_stats)
request.wfile.write(payload.encode())
elif request.path == "/v1/table/test/index/custom_vector_idx/stats/":
elif request.path == "/v1/table/test/index/vector_idx/stats/":
request.send_response(200)
request.send_header("Content-Type", "application/json")
request.end_headers()
@@ -360,49 +351,16 @@ def test_table_create_indices():
# Parameters are well-tested through local and async tests.
# This is a smoke-test.
table = db.create_table("test", [{"id": 1}])
# Test create_scalar_index with custom name
table.create_scalar_index(
"id", wait_timeout=timedelta(seconds=2), name="custom_scalar_idx"
)
# Test create_fts_index with custom name
table.create_fts_index(
"text", wait_timeout=timedelta(seconds=2), name="custom_fts_idx"
)
# Test create_index with custom name
table.create_scalar_index("id", wait_timeout=timedelta(seconds=2))
table.create_fts_index("text", wait_timeout=timedelta(seconds=2))
table.create_index(
vector_column_name="vector",
wait_timeout=timedelta(seconds=10),
name="custom_vector_idx",
vector_column_name="vector", wait_timeout=timedelta(seconds=10)
)
# Validate that the name parameter was passed correctly in requests
assert len(received_requests) == 3
# Check scalar index request has custom name
scalar_req = received_requests[0]
assert "name" in scalar_req
assert scalar_req["name"] == "custom_scalar_idx"
# Check FTS index request has custom name
fts_req = received_requests[1]
assert "name" in fts_req
assert fts_req["name"] == "custom_fts_idx"
# Check vector index request has custom name
vector_req = received_requests[2]
assert "name" in vector_req
assert vector_req["name"] == "custom_vector_idx"
table.wait_for_index(["custom_scalar_idx"], timedelta(seconds=2))
table.wait_for_index(
["custom_fts_idx", "custom_vector_idx"], timedelta(seconds=2)
)
table.drop_index("custom_vector_idx")
table.drop_index("custom_scalar_idx")
table.drop_index("custom_fts_idx")
table.wait_for_index(["id_idx"], timedelta(seconds=2))
table.wait_for_index(["text_idx", "vector_idx"], timedelta(seconds=2))
table.drop_index("vector_idx")
table.drop_index("id_idx")
table.drop_index("text_idx")
def test_table_wait_for_index_timeout():

View File

@@ -1274,13 +1274,11 @@ def test_create_scalar_index(mem_db: DBConnection):
"my_table",
data=test_data,
)
# Test with default name
table.create_scalar_index("x")
indices = table.list_indices()
assert len(indices) == 1
scalar_index = indices[0]
assert scalar_index.index_type == "BTree"
assert scalar_index.name == "x_idx" # Default name
# Confirm that prefiltering still works with the scalar index column
results = table.search().where("x = 'c'").to_arrow()
@@ -1294,14 +1292,6 @@ def test_create_scalar_index(mem_db: DBConnection):
indices = table.list_indices()
assert len(indices) == 0
# Test with custom name
table.create_scalar_index("y", name="custom_y_index")
indices = table.list_indices()
assert len(indices) == 1
scalar_index = indices[0]
assert scalar_index.index_type == "BTree"
assert scalar_index.name == "custom_y_index"
def test_empty_query(mem_db: DBConnection):
table = mem_db.create_table(

View File

@@ -1,26 +0,0 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright The LanceDB Authors
import pyarrow as pa
import pytest
torch = pytest.importorskip("torch")
def tbl_to_tensor(tbl):
def to_tensor(col: pa.ChunkedArray):
if col.num_chunks > 1:
raise Exception("Single batch was too large to fit into a one-chunk table")
return torch.from_dlpack(col.chunk(0))
return torch.stack([to_tensor(tbl.column(i)) for i in range(tbl.num_columns)])
def test_table_dataloader(mem_db):
table = mem_db.create_table("test_table", pa.table({"a": range(1000)}))
dataloader = torch.utils.data.DataLoader(
table, collate_fn=tbl_to_tensor, batch_size=10, shuffle=True
)
for batch in dataloader:
assert batch.size(0) == 1
assert batch.size(1) == 10

View File

@@ -63,16 +63,14 @@ impl Connection {
self.get_inner().map(|inner| inner.uri().to_string())
}
#[pyo3(signature = (namespace=vec![], start_after=None, limit=None))]
#[pyo3(signature = (start_after=None, limit=None))]
pub fn table_names(
self_: PyRef<'_, Self>,
namespace: Vec<String>,
start_after: Option<String>,
limit: Option<u32>,
) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.get_inner()?.clone();
let mut op = inner.table_names();
op = op.namespace(namespace);
if let Some(start_after) = start_after {
op = op.start_after(start_after);
}
@@ -82,13 +80,12 @@ impl Connection {
future_into_py(self_.py(), async move { op.execute().await.infer_error() })
}
#[pyo3(signature = (name, mode, data, namespace=vec![], storage_options=None))]
#[pyo3(signature = (name, mode, data, storage_options=None))]
pub fn create_table<'a>(
self_: PyRef<'a, Self>,
name: String,
mode: &str,
data: Bound<'_, PyAny>,
namespace: Vec<String>,
storage_options: Option<HashMap<String, String>>,
) -> PyResult<Bound<'a, PyAny>> {
let inner = self_.get_inner()?.clone();
@@ -96,10 +93,8 @@ impl Connection {
let mode = Self::parse_create_mode_str(mode)?;
let batches = ArrowArrayStreamReader::from_pyarrow_bound(&data)?;
let mut builder = inner.create_table(name, batches).mode(mode);
builder = builder.namespace(namespace);
if let Some(storage_options) = storage_options {
builder = builder.storage_options(storage_options);
}
@@ -110,13 +105,12 @@ impl Connection {
})
}
#[pyo3(signature = (name, mode, schema, namespace=vec![], storage_options=None))]
#[pyo3(signature = (name, mode, schema, storage_options=None))]
pub fn create_empty_table<'a>(
self_: PyRef<'a, Self>,
name: String,
mode: &str,
schema: Bound<'_, PyAny>,
namespace: Vec<String>,
storage_options: Option<HashMap<String, String>>,
) -> PyResult<Bound<'a, PyAny>> {
let inner = self_.get_inner()?.clone();
@@ -127,7 +121,6 @@ impl Connection {
let mut builder = inner.create_empty_table(name, Arc::new(schema)).mode(mode);
builder = builder.namespace(namespace);
if let Some(storage_options) = storage_options {
builder = builder.storage_options(storage_options);
}
@@ -138,115 +131,49 @@ impl Connection {
})
}
#[pyo3(signature = (name, namespace=vec![], storage_options = None, index_cache_size = None))]
#[pyo3(signature = (name, storage_options = None, index_cache_size = None))]
pub fn open_table(
self_: PyRef<'_, Self>,
name: String,
namespace: Vec<String>,
storage_options: Option<HashMap<String, String>>,
index_cache_size: Option<u32>,
) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.get_inner()?.clone();
let mut builder = inner.open_table(name);
builder = builder.namespace(namespace);
if let Some(storage_options) = storage_options {
builder = builder.storage_options(storage_options);
}
if let Some(index_cache_size) = index_cache_size {
builder = builder.index_cache_size(index_cache_size);
}
future_into_py(self_.py(), async move {
let table = builder.execute().await.infer_error()?;
Ok(Table::new(table))
})
}
#[pyo3(signature = (cur_name, new_name, cur_namespace=vec![], new_namespace=vec![]))]
pub fn rename_table(
self_: PyRef<'_, Self>,
cur_name: String,
old_name: String,
new_name: String,
cur_namespace: Vec<String>,
new_namespace: Vec<String>,
) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.get_inner()?.clone();
future_into_py(self_.py(), async move {
inner
.rename_table(cur_name, new_name, &cur_namespace, &new_namespace)
.await
.infer_error()
inner.rename_table(old_name, new_name).await.infer_error()
})
}
#[pyo3(signature = (name, namespace=vec![]))]
pub fn drop_table(
self_: PyRef<'_, Self>,
name: String,
namespace: Vec<String>,
) -> PyResult<Bound<'_, PyAny>> {
pub fn drop_table(self_: PyRef<'_, Self>, name: String) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.get_inner()?.clone();
future_into_py(self_.py(), async move {
inner.drop_table(name, &namespace).await.infer_error()
inner.drop_table(name).await.infer_error()
})
}
#[pyo3(signature = (namespace=vec![],))]
pub fn drop_all_tables(
self_: PyRef<'_, Self>,
namespace: Vec<String>,
) -> PyResult<Bound<'_, PyAny>> {
pub fn drop_all_tables(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.get_inner()?.clone();
future_into_py(self_.py(), async move {
inner.drop_all_tables(&namespace).await.infer_error()
})
}
// Namespace management methods
#[pyo3(signature = (namespace=vec![], page_token=None, limit=None))]
pub fn list_namespaces(
self_: PyRef<'_, Self>,
namespace: Vec<String>,
page_token: Option<String>,
limit: Option<u32>,
) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.get_inner()?.clone();
future_into_py(self_.py(), async move {
use lancedb::database::ListNamespacesRequest;
let request = ListNamespacesRequest {
namespace,
page_token,
limit,
};
inner.list_namespaces(request).await.infer_error()
})
}
#[pyo3(signature = (namespace,))]
pub fn create_namespace(
self_: PyRef<'_, Self>,
namespace: Vec<String>,
) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.get_inner()?.clone();
future_into_py(self_.py(), async move {
use lancedb::database::CreateNamespaceRequest;
let request = CreateNamespaceRequest { namespace };
inner.create_namespace(request).await.infer_error()
})
}
#[pyo3(signature = (namespace,))]
pub fn drop_namespace(
self_: PyRef<'_, Self>,
namespace: Vec<String>,
) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.get_inner()?.clone();
future_into_py(self_.py(), async move {
use lancedb::database::DropNamespaceRequest;
let request = DropNamespaceRequest { namespace };
inner.drop_namespace(request).await.infer_error()
inner.drop_all_tables().await.infer_error()
})
}
}
@@ -300,7 +227,6 @@ pub struct PyClientConfig {
retry_config: Option<PyClientRetryConfig>,
timeout_config: Option<PyClientTimeoutConfig>,
extra_headers: Option<HashMap<String, String>>,
id_delimiter: Option<String>,
}
#[derive(FromPyObject)]
@@ -355,7 +281,6 @@ impl From<PyClientConfig> for lancedb::remote::ClientConfig {
retry_config: value.retry_config.map(Into::into).unwrap_or_default(),
timeout_config: value.timeout_config.map(Into::into).unwrap_or_default(),
extra_headers: value.extra_headers.unwrap_or_default(),
id_delimiter: value.id_delimiter,
}
}
}

View File

@@ -1,6 +1,6 @@
[package]
name = "lancedb"
version = "0.22.0-beta.1"
version = "0.21.4-beta.1"
edition.workspace = true
description = "LanceDB: A serverless, low-latency vector database for AI applications"
license.workspace = true

View File

@@ -62,8 +62,10 @@ async fn main() -> Result<()> {
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
for text in out.iter().flatten() {
println!("Result: {}", text);
for text in out.iter() {
if let Some(text) = text {
println!("Result: {}", text);
}
}
}

View File

@@ -43,7 +43,7 @@ async fn main() -> Result<()> {
// --8<-- [end:delete]
// --8<-- [start:drop_table]
db.drop_table("my_table", &[]).await.unwrap();
db.drop_table("my_table").await.unwrap();
// --8<-- [end:drop_table]
Ok(())
}

View File

@@ -379,7 +379,6 @@ mod tests {
data: CreateTableData::Empty(TableDefinition::new_from_schema(dummy_schema)),
mode: Default::default(),
write_options: Default::default(),
namespace: vec![],
})
.await
.unwrap();
@@ -415,7 +414,6 @@ mod tests {
data: CreateTableData::Empty(TableDefinition::new_from_schema(dummy_schema)),
mode: Default::default(),
write_options: Default::default(),
namespace: vec![],
})
.await
.unwrap();

View File

@@ -19,9 +19,8 @@ use crate::database::listing::{
ListingDatabase, OPT_NEW_TABLE_STORAGE_VERSION, OPT_NEW_TABLE_V2_MANIFEST_PATHS,
};
use crate::database::{
CreateNamespaceRequest, CreateTableData, CreateTableMode, CreateTableRequest, Database,
DatabaseOptions, DropNamespaceRequest, ListNamespacesRequest, OpenTableRequest,
TableNamesRequest,
CreateTableData, CreateTableMode, CreateTableRequest, Database, DatabaseOptions,
OpenTableRequest, TableNamesRequest,
};
use crate::embeddings::{
EmbeddingDefinition, EmbeddingFunction, EmbeddingRegistry, MemoryRegistry, WithEmbeddings,
@@ -68,12 +67,6 @@ impl TableNamesBuilder {
self
}
/// Set the namespace to list tables from
pub fn namespace(mut self, namespace: Vec<String>) -> Self {
self.request.namespace = namespace;
self
}
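// Illustrative usage of this builder (a hedged sketch; `conn` is assumed to be
// a `Connection` from this crate):
//
//     let names = conn
//         .table_names()
//         .namespace(vec!["ns1".to_string()])
//         .execute()
//         .await?;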
/// Execute the table names operation
pub async fn execute(self) -> Result<Vec<String>> {
self.parent.clone().table_names(self.request).await
@@ -355,12 +348,6 @@ impl<const HAS_DATA: bool> CreateTableBuilder<HAS_DATA> {
);
self
}
/// Set the namespace for the table
pub fn namespace(mut self, namespace: Vec<String>) -> Self {
self.request.namespace = namespace;
self
}
}
#[derive(Clone, Debug)]
@@ -380,7 +367,6 @@ impl OpenTableBuilder {
parent,
request: OpenTableRequest {
name,
namespace: vec![],
index_cache_size: None,
lance_read_params: None,
},
@@ -456,12 +442,6 @@ impl OpenTableBuilder {
self
}
/// Set the namespace for the table
pub fn namespace(mut self, namespace: Vec<String>) -> Self {
self.request.namespace = namespace;
self
}
/// Open the table
pub async fn execute(self) -> Result<Table> {
Ok(Table::new_with_embedding_registry(
@@ -584,16 +564,9 @@ impl Connection {
&self,
old_name: impl AsRef<str>,
new_name: impl AsRef<str>,
cur_namespace: &[String],
new_namespace: &[String],
) -> Result<()> {
self.internal
.rename_table(
old_name.as_ref(),
new_name.as_ref(),
cur_namespace,
new_namespace,
)
.rename_table(old_name.as_ref(), new_name.as_ref())
.await
}
@@ -601,9 +574,8 @@ impl Connection {
///
/// # Arguments
/// * `name` - The name of the table to drop
/// * `namespace` - The namespace to drop the table from
pub async fn drop_table(&self, name: impl AsRef<str>, namespace: &[String]) -> Result<()> {
self.internal.drop_table(name.as_ref(), namespace).await
pub async fn drop_table(&self, name: impl AsRef<str>) -> Result<()> {
self.internal.drop_table(name.as_ref()).await
}
/// Drop the database
@@ -611,30 +583,12 @@ impl Connection {
/// This is the same as dropping all of the tables
#[deprecated(since = "0.15.1", note = "Use `drop_all_tables` instead")]
pub async fn drop_db(&self) -> Result<()> {
self.internal.drop_all_tables(&[]).await
self.internal.drop_all_tables().await
}
/// Drops all tables in the database
///
/// # Arguments
/// * `namespace` - The namespace to drop all tables from. Empty slice represents root namespace.
pub async fn drop_all_tables(&self, namespace: &[String]) -> Result<()> {
self.internal.drop_all_tables(namespace).await
}
/// List immediate child namespace names in the given namespace
pub async fn list_namespaces(&self, request: ListNamespacesRequest) -> Result<Vec<String>> {
self.internal.list_namespaces(request).await
}
/// Create a new namespace
pub async fn create_namespace(&self, request: CreateNamespaceRequest) -> Result<()> {
self.internal.create_namespace(request).await
}
/// Drop a namespace
pub async fn drop_namespace(&self, request: DropNamespaceRequest) -> Result<()> {
self.internal.drop_namespace(request).await
pub async fn drop_all_tables(&self) -> Result<()> {
self.internal.drop_all_tables().await
}
/// Get the in-memory embedding registry.
@@ -1266,12 +1220,12 @@ mod tests {
// drop non-exist table
assert!(matches!(
db.drop_table("invalid_table", &[]).await,
db.drop_table("invalid_table").await,
Err(crate::Error::TableNotFound { .. }),
));
create_dir_all(tmp_dir.path().join("table1.lance")).unwrap();
db.drop_table("table1", &[]).await.unwrap();
db.drop_table("table1").await.unwrap();
let tables = db.table_names().execute().await.unwrap();
assert_eq!(tables.len(), 0);

View File

@@ -34,36 +34,9 @@ pub trait DatabaseOptions {
fn serialize_into_map(&self, map: &mut HashMap<String, String>);
}
/// A request to list namespaces in the database
#[derive(Clone, Debug, Default)]
pub struct ListNamespacesRequest {
/// The parent namespace to list namespaces in. Empty list represents root namespace.
pub namespace: Vec<String>,
/// If present, only return names that come lexicographically after the supplied value.
pub page_token: Option<String>,
/// The maximum number of namespace names to return
pub limit: Option<u32>,
}
/// A request to create a namespace
#[derive(Clone, Debug)]
pub struct CreateNamespaceRequest {
/// The namespace identifier to create
pub namespace: Vec<String>,
}
/// A request to drop a namespace
#[derive(Clone, Debug)]
pub struct DropNamespaceRequest {
/// The namespace identifier to drop
pub namespace: Vec<String>,
}
/// A request to list names of tables in the database
#[derive(Clone, Debug, Default)]
pub struct TableNamesRequest {
/// The namespace to list tables in. Empty list represents root namespace.
pub namespace: Vec<String>,
/// If present, only return names that come lexicographically after the supplied
/// value.
///
@@ -78,8 +51,6 @@ pub struct TableNamesRequest {
#[derive(Clone, Debug)]
pub struct OpenTableRequest {
pub name: String,
/// The namespace to open the table from. Empty list represents root namespace.
pub namespace: Vec<String>,
pub index_cache_size: Option<u32>,
pub lance_read_params: Option<ReadParams>,
}
@@ -154,8 +125,6 @@ impl StreamingWriteSource for CreateTableData {
pub struct CreateTableRequest {
/// The name of the new table
pub name: String,
/// The namespace to create the table in. Empty list represents root namespace.
pub namespace: Vec<String>,
/// Initial data to write to the table, can be None to create an empty table
pub data: CreateTableData,
/// The mode to use when creating the table
@@ -168,7 +137,6 @@ impl CreateTableRequest {
pub fn new(name: String, data: CreateTableData) -> Self {
Self {
name,
namespace: vec![],
data,
mode: CreateTableMode::default(),
write_options: WriteOptions::default(),
@@ -183,12 +151,6 @@ impl CreateTableRequest {
pub trait Database:
Send + Sync + std::any::Any + std::fmt::Debug + std::fmt::Display + 'static
{
/// List immediate child namespace names in the given namespace
async fn list_namespaces(&self, request: ListNamespacesRequest) -> Result<Vec<String>>;
/// Create a new namespace
async fn create_namespace(&self, request: CreateNamespaceRequest) -> Result<()>;
/// Drop a namespace
async fn drop_namespace(&self, request: DropNamespaceRequest) -> Result<()>;
/// List the names of tables in the database
async fn table_names(&self, request: TableNamesRequest) -> Result<Vec<String>>;
/// Create a table in the database
@@ -196,16 +158,10 @@ pub trait Database:
/// Open a table in the database
async fn open_table(&self, request: OpenTableRequest) -> Result<Arc<dyn BaseTable>>;
/// Rename a table in the database
async fn rename_table(
&self,
cur_name: &str,
new_name: &str,
cur_namespace: &[String],
new_namespace: &[String],
) -> Result<()>;
async fn rename_table(&self, old_name: &str, new_name: &str) -> Result<()>;
/// Drop a table in the database
async fn drop_table(&self, name: &str, namespace: &[String]) -> Result<()>;
async fn drop_table(&self, name: &str) -> Result<()>;
/// Drop all tables in the database
async fn drop_all_tables(&self, namespace: &[String]) -> Result<()>;
async fn drop_all_tables(&self) -> Result<()>;
fn as_any(&self) -> &dyn std::any::Any;
}

View File

@@ -22,8 +22,7 @@ use crate::table::NativeTable;
use crate::utils::validate_table_name;
use super::{
BaseTable, CreateNamespaceRequest, CreateTableMode, CreateTableRequest, Database,
DatabaseOptions, DropNamespaceRequest, ListNamespacesRequest, OpenTableRequest,
BaseTable, CreateTableMode, CreateTableRequest, Database, DatabaseOptions, OpenTableRequest,
TableNamesRequest,
};
@@ -552,7 +551,6 @@ impl ListingDatabase {
async fn handle_table_exists(
&self,
table_name: &str,
namespace: Vec<String>,
mode: CreateTableMode,
data_schema: &arrow_schema::Schema,
) -> Result<Arc<dyn BaseTable>> {
@@ -563,7 +561,6 @@ impl ListingDatabase {
CreateTableMode::ExistOk(callback) => {
let req = OpenTableRequest {
name: table_name.to_string(),
namespace: namespace.clone(),
index_cache_size: None,
lance_read_params: None,
};
@@ -587,28 +584,7 @@ impl ListingDatabase {
#[async_trait::async_trait]
impl Database for ListingDatabase {
async fn list_namespaces(&self, _request: ListNamespacesRequest) -> Result<Vec<String>> {
Ok(Vec::new())
}
async fn create_namespace(&self, _request: CreateNamespaceRequest) -> Result<()> {
Err(Error::NotSupported {
message: "Namespace operations are not supported for listing database".into(),
})
}
async fn drop_namespace(&self, _request: DropNamespaceRequest) -> Result<()> {
Err(Error::NotSupported {
message: "Namespace operations are not supported for listing database".into(),
})
}
async fn table_names(&self, request: TableNamesRequest) -> Result<Vec<String>> {
if !request.namespace.is_empty() {
return Err(Error::NotSupported {
message: "Namespace parameter is not supported for listing database. Only root namespace is supported.".into(),
});
}
let mut f = self
.object_store
.read_dir(self.base_path.clone())
@@ -639,11 +615,6 @@ impl Database for ListingDatabase {
}
async fn create_table(&self, request: CreateTableRequest) -> Result<Arc<dyn BaseTable>> {
if !request.namespace.is_empty() {
return Err(Error::NotSupported {
message: "Namespace parameter is not supported for listing database. Only root namespace is supported.".into(),
});
}
let table_uri = self.table_uri(&request.name)?;
let (storage_version_override, v2_manifest_override) =
@@ -666,24 +637,14 @@ impl Database for ListingDatabase {
{
Ok(table) => Ok(Arc::new(table)),
Err(Error::TableAlreadyExists { .. }) => {
self.handle_table_exists(
&request.name,
request.namespace.clone(),
request.mode,
&data_schema,
)
.await
self.handle_table_exists(&request.name, request.mode, &data_schema)
.await
}
Err(err) => Err(err),
}
}
async fn open_table(&self, mut request: OpenTableRequest) -> Result<Arc<dyn BaseTable>> {
if !request.namespace.is_empty() {
return Err(Error::NotSupported {
message: "Namespace parameter is not supported for listing database. Only root namespace is supported.".into(),
});
}
let table_uri = self.table_uri(&request.name)?;
// Only modify the storage options if we actually have something to
@@ -733,44 +694,17 @@ impl Database for ListingDatabase {
Ok(native_table)
}
async fn rename_table(
&self,
_cur_name: &str,
_new_name: &str,
cur_namespace: &[String],
new_namespace: &[String],
) -> Result<()> {
if !cur_namespace.is_empty() {
return Err(Error::NotSupported {
message: "Namespace parameter is not supported for listing database.".into(),
});
}
if !new_namespace.is_empty() {
return Err(Error::NotSupported {
message: "Namespace parameter is not supported for listing database.".into(),
});
}
async fn rename_table(&self, _old_name: &str, _new_name: &str) -> Result<()> {
Err(Error::NotSupported {
message: "rename_table is not supported in LanceDB OSS".into(),
message: "rename_table is not supported in LanceDB OSS".to_string(),
})
}
async fn drop_table(&self, name: &str, namespace: &[String]) -> Result<()> {
if !namespace.is_empty() {
return Err(Error::NotSupported {
message: "Namespace parameter is not supported for listing database.".into(),
});
}
async fn drop_table(&self, name: &str) -> Result<()> {
self.drop_tables(vec![name.to_string()]).await
}
async fn drop_all_tables(&self, namespace: &[String]) -> Result<()> {
// Check if namespace parameter is provided
if !namespace.is_empty() {
return Err(Error::NotSupported {
message: "Namespace parameter is not supported for listing database.".into(),
});
}
async fn drop_all_tables(&self) -> Result<()> {
let tables = self.table_names(TableNamesRequest::default()).await?;
self.drop_tables(tables).await
}

View File

@@ -9,7 +9,7 @@ use futures::{stream::BoxStream, TryFutureExt};
use lance::io::WrappingObjectStore;
use object_store::{
path::Path, Error, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore,
PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, UploadPart,
PutMultipartOpts, PutOptions, PutPayload, PutResult, Result, UploadPart,
};
use async_trait::async_trait;
@@ -73,7 +73,7 @@ impl ObjectStore for MirroringObjectStore {
async fn put_multipart_opts(
&self,
location: &Path,
opts: PutMultipartOptions,
opts: PutMultipartOpts,
) -> Result<Box<dyn MultipartUpload>> {
if location.primary_only() {
return self.primary.put_multipart_opts(location, opts).await;
@@ -170,11 +170,7 @@ impl MirroringObjectStoreWrapper {
}
impl WrappingObjectStore for MirroringObjectStoreWrapper {
fn wrap(
&self,
primary: Arc<dyn ObjectStore>,
_storage_options: Option<&std::collections::HashMap<String, String>>,
) -> Arc<dyn ObjectStore> {
fn wrap(&self, primary: Arc<dyn ObjectStore>) -> Arc<dyn ObjectStore> {
Arc::new(MirroringObjectStore {
primary,
secondary: self.secondary.clone(),

View File

@@ -11,7 +11,7 @@ use futures::stream::BoxStream;
use lance::io::WrappingObjectStore;
use object_store::{
path::Path, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore,
PutMultipartOptions, PutOptions, PutPayload, PutResult, Result as OSResult, UploadPart,
PutMultipartOpts, PutOptions, PutPayload, PutResult, Result as OSResult, UploadPart,
};
#[derive(Debug, Default)]
@@ -50,11 +50,7 @@ impl IoStatsHolder {
}
impl WrappingObjectStore for IoStatsHolder {
fn wrap(
&self,
target: Arc<dyn ObjectStore>,
_storage_options: Option<&std::collections::HashMap<String, String>>,
) -> Arc<dyn ObjectStore> {
fn wrap(&self, target: Arc<dyn ObjectStore>) -> Arc<dyn ObjectStore> {
Arc::new(IoTrackingStore {
target,
stats: self.0.clone(),
@@ -110,7 +106,7 @@ impl ObjectStore for IoTrackingStore {
async fn put_multipart_opts(
&self,
location: &Path,
opts: PutMultipartOptions,
opts: PutMultipartOpts,
) -> OSResult<Box<dyn MultipartUpload>> {
let target = self.target.put_multipart_opts(location, opts).await?;
Ok(Box::new(IoTrackingMultipartUpload {

View File

@@ -25,9 +25,6 @@ pub struct ClientConfig {
pub user_agent: String,
// TODO: how to configure request ids?
pub extra_headers: HashMap<String, String>,
/// The delimiter to use when constructing object identifiers.
/// If not the default, it is passed as a query parameter.
pub id_delimiter: Option<String>,
}
impl Default for ClientConfig {
@@ -37,7 +34,6 @@ impl Default for ClientConfig {
retry_config: RetryConfig::default(),
user_agent: concat!("LanceDB-Rust-Client/", env!("CARGO_PKG_VERSION")).into(),
extra_headers: HashMap::new(),
id_delimiter: None,
}
}
}
@@ -149,7 +145,6 @@ pub struct RestfulLanceDbClient<S: HttpSend = Sender> {
host: String,
pub(crate) retry_config: ResolvedRetryConfig,
pub(crate) sender: S,
pub(crate) id_delimiter: String,
}
pub trait HttpSend: Clone + Send + Sync + std::fmt::Debug + 'static {
@@ -273,7 +268,6 @@ impl RestfulLanceDbClient<Sender> {
host,
retry_config,
sender: Sender,
id_delimiter: client_config.id_delimiter.unwrap_or("$".to_string()),
})
}
}
@@ -362,22 +356,12 @@ impl<S: HttpSend> RestfulLanceDbClient<S> {
pub fn get(&self, uri: &str) -> RequestBuilder {
let full_uri = format!("{}{}", self.host, uri);
let builder = self.client.get(full_uri);
self.add_id_delimiter_query_param(builder)
self.client.get(full_uri)
}
pub fn post(&self, uri: &str) -> RequestBuilder {
let full_uri = format!("{}{}", self.host, uri);
let builder = self.client.post(full_uri);
self.add_id_delimiter_query_param(builder)
}
fn add_id_delimiter_query_param(&self, req: RequestBuilder) -> RequestBuilder {
if self.id_delimiter != "$" {
req.query(&[("delimiter", self.id_delimiter.clone())])
} else {
req
}
self.client.post(full_uri)
}
pub async fn send(&self, req: RequestBuilder) -> Result<(String, Response)> {
@@ -610,7 +594,6 @@ pub mod test_utils {
sender: MockSender {
f: Arc::new(wrapper),
},
id_delimiter: "$".to_string(),
}
}
}

View File

@@ -14,9 +14,8 @@ use serde::Deserialize;
use tokio::task::spawn_blocking;
use crate::database::{
CreateNamespaceRequest, CreateTableData, CreateTableMode, CreateTableRequest, Database,
DatabaseOptions, DropNamespaceRequest, ListNamespacesRequest, OpenTableRequest,
TableNamesRequest,
CreateTableData, CreateTableMode, CreateTableRequest, Database, DatabaseOptions,
OpenTableRequest, TableNamesRequest,
};
use crate::error::Result;
use crate::table::BaseTable;
@@ -246,61 +245,10 @@ impl From<&CreateTableMode> for &'static str {
}
}
fn build_table_identifier(name: &str, namespace: &[String], delimiter: &str) -> String {
if !namespace.is_empty() {
let mut parts = namespace.to_vec();
parts.push(name.to_string());
parts.join(delimiter)
} else {
name.to_string()
}
}
fn build_namespace_identifier(namespace: &[String], delimiter: &str) -> String {
if namespace.is_empty() {
// According to the namespace spec, use delimiter to represent root namespace
delimiter.to_string()
} else {
namespace.join(delimiter)
}
}
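// Worked examples (illustrative only, derived from the two helpers above):
//   build_table_identifier("t1", &["ns1".to_string(), "ns2".to_string()], "$") == "ns1$ns2$t1"
//   build_table_identifier("t1", &[], "$") == "t1"
//   build_namespace_identifier(&["ns1".to_string()], "$") == "ns1"
//   build_namespace_identifier(&[], "$") == "$"   // the delimiter alone denotes the root namespace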
/// Build a secure cache key using length prefixes.
/// This format is completely unambiguous regardless of delimiter or content.
/// Format: [u32_len][namespace1][u32_len][namespace2]...[u32_len][table_name]
/// Returns a hex-encoded string for use as a cache key.
fn build_cache_key(name: &str, namespace: &[String]) -> String {
let mut key = Vec::new();
// Add each namespace component with length prefix
for ns in namespace {
let bytes = ns.as_bytes();
key.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
key.extend_from_slice(bytes);
}
// Add table name with length prefix
let name_bytes = name.as_bytes();
key.extend_from_slice(&(name_bytes.len() as u32).to_le_bytes());
key.extend_from_slice(name_bytes);
// Convert to hex string for use as a cache key
key.iter().map(|b| format!("{:02x}", b)).collect()
}
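// Worked example (illustrative, not part of the original source): for
// build_cache_key("t1", &["ns".to_string()]) the buffer is
//   02 00 00 00 6e 73   (len("ns") as a little-endian u32, then the bytes of "ns")
//   02 00 00 00 74 31   (len("t1") as a little-endian u32, then the bytes of "t1")
// which hex-encodes to "020000006e73020000007431". A root-namespace table
// literally named "ns$t1" would instead yield "050000006e73247431", so the two
// can never collide even though they render to the same delimited identifier.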
#[async_trait]
impl<S: HttpSend> Database for RemoteDatabase<S> {
async fn table_names(&self, request: TableNamesRequest) -> Result<Vec<String>> {
let mut req = if !request.namespace.is_empty() {
let namespace_id =
build_namespace_identifier(&request.namespace, &self.client.id_delimiter);
self.client
.get(&format!("/v1/namespace/{}/table/list", namespace_id))
} else {
// TODO: use new API for all listing operations once stable
self.client.get("/v1/table/")
};
let mut req = self.client.get("/v1/table/");
if let Some(limit) = request.limit {
req = req.query(&[("limit", limit)]);
}
@@ -316,17 +264,12 @@ impl<S: HttpSend> Database for RemoteDatabase<S> {
.err_to_http(request_id)?
.tables;
for table in &tables {
let table_identifier =
build_table_identifier(table, &request.namespace, &self.client.id_delimiter);
let cache_key = build_cache_key(table, &request.namespace);
let remote_table = Arc::new(RemoteTable::new(
self.client.clone(),
table.clone(),
request.namespace.clone(),
table_identifier.clone(),
version.clone(),
));
self.table_cache.insert(cache_key, remote_table).await;
self.table_cache.insert(table.clone(), remote_table).await;
}
Ok(tables)
}
@@ -352,11 +295,9 @@ impl<S: HttpSend> Database for RemoteDatabase<S> {
.await
.unwrap()?;
let identifier =
build_table_identifier(&request.name, &request.namespace, &self.client.id_delimiter);
let req = self
.client
.post(&format!("/v1/table/{}/create/", identifier))
.post(&format!("/v1/table/{}/create/", request.name))
.query(&[("mode", Into::<&str>::into(&request.mode))])
.body(data_buffer)
.header(CONTENT_TYPE, ARROW_STREAM_CONTENT_TYPE);
@@ -373,7 +314,6 @@ impl<S: HttpSend> Database for RemoteDatabase<S> {
CreateTableMode::ExistOk(callback) => {
let req = OpenTableRequest {
name: request.name.clone(),
namespace: request.namespace.clone(),
index_cache_size: None,
lance_read_params: None,
};
@@ -402,160 +342,70 @@ impl<S: HttpSend> Database for RemoteDatabase<S> {
}
let rsp = self.client.check_response(&request_id, rsp).await?;
let version = parse_server_version(&request_id, &rsp)?;
let table_identifier =
build_table_identifier(&request.name, &request.namespace, &self.client.id_delimiter);
let cache_key = build_cache_key(&request.name, &request.namespace);
let table = Arc::new(RemoteTable::new(
self.client.clone(),
request.name.clone(),
request.namespace.clone(),
table_identifier,
version,
));
self.table_cache.insert(cache_key, table.clone()).await;
self.table_cache
.insert(request.name.clone(), table.clone())
.await;
Ok(table)
}
async fn open_table(&self, request: OpenTableRequest) -> Result<Arc<dyn BaseTable>> {
let identifier =
build_table_identifier(&request.name, &request.namespace, &self.client.id_delimiter);
let cache_key = build_cache_key(&request.name, &request.namespace);
// We describe the table to confirm it exists before moving on.
if let Some(table) = self.table_cache.get(&cache_key).await {
if let Some(table) = self.table_cache.get(&request.name).await {
Ok(table.clone())
} else {
let req = self
.client
.post(&format!("/v1/table/{}/describe/", identifier));
.post(&format!("/v1/table/{}/describe/", request.name));
let (request_id, rsp) = self.client.send_with_retry(req, None, true).await?;
if rsp.status() == StatusCode::NOT_FOUND {
return Err(crate::Error::TableNotFound {
name: identifier.clone(),
});
return Err(crate::Error::TableNotFound { name: request.name });
}
let rsp = self.client.check_response(&request_id, rsp).await?;
let version = parse_server_version(&request_id, &rsp)?;
let table_identifier = build_table_identifier(
&request.name,
&request.namespace,
&self.client.id_delimiter,
);
let table = Arc::new(RemoteTable::new(
self.client.clone(),
request.name.clone(),
request.namespace.clone(),
table_identifier,
version,
));
let cache_key = build_cache_key(&request.name, &request.namespace);
self.table_cache.insert(cache_key, table.clone()).await;
self.table_cache.insert(request.name, table.clone()).await;
Ok(table)
}
}
async fn rename_table(
&self,
current_name: &str,
new_name: &str,
cur_namespace: &[String],
new_namespace: &[String],
) -> Result<()> {
let current_identifier =
build_table_identifier(current_name, cur_namespace, &self.client.id_delimiter);
let current_cache_key = build_cache_key(current_name, cur_namespace);
let new_cache_key = build_cache_key(new_name, new_namespace);
let mut body = serde_json::json!({ "new_table_name": new_name });
if !new_namespace.is_empty() {
body["new_namespace"] = serde_json::Value::Array(
new_namespace
.iter()
.map(|s| serde_json::Value::String(s.clone()))
.collect(),
);
}
async fn rename_table(&self, current_name: &str, new_name: &str) -> Result<()> {
let req = self
.client
.post(&format!("/v1/table/{}/rename/", current_identifier))
.json(&body);
.post(&format!("/v1/table/{}/rename/", current_name));
let req = req.json(&serde_json::json!({ "new_table_name": new_name }));
let (request_id, resp) = self.client.send(req).await?;
self.client.check_response(&request_id, resp).await?;
let table = self.table_cache.remove(&current_cache_key).await;
let table = self.table_cache.remove(current_name).await;
if let Some(table) = table {
self.table_cache.insert(new_cache_key, table).await;
self.table_cache.insert(new_name.into(), table).await;
}
Ok(())
}
async fn drop_table(&self, name: &str, namespace: &[String]) -> Result<()> {
let identifier = build_table_identifier(name, namespace, &self.client.id_delimiter);
let cache_key = build_cache_key(name, namespace);
let req = self.client.post(&format!("/v1/table/{}/drop/", identifier));
async fn drop_table(&self, name: &str) -> Result<()> {
let req = self.client.post(&format!("/v1/table/{}/drop/", name));
let (request_id, resp) = self.client.send(req).await?;
self.client.check_response(&request_id, resp).await?;
self.table_cache.remove(&cache_key).await;
self.table_cache.remove(name).await;
Ok(())
}
async fn drop_all_tables(&self, namespace: &[String]) -> Result<()> {
// TODO: Implement namespace-aware drop_all_tables
let _namespace = namespace; // Suppress unused warning for now
async fn drop_all_tables(&self) -> Result<()> {
Err(crate::Error::NotSupported {
message: "Dropping all tables is not currently supported in the remote API".to_string(),
message: "Dropping databases is not supported in the remote API".to_string(),
})
}
async fn list_namespaces(&self, request: ListNamespacesRequest) -> Result<Vec<String>> {
let namespace_id =
build_namespace_identifier(request.namespace.as_slice(), &self.client.id_delimiter);
let mut req = self
.client
.get(&format!("/v1/namespace/{}/list", namespace_id));
if let Some(limit) = request.limit {
req = req.query(&[("limit", limit)]);
}
if let Some(page_token) = request.page_token {
req = req.query(&[("page_token", page_token)]);
}
let (request_id, resp) = self.client.send(req).await?;
let resp = self.client.check_response(&request_id, resp).await?;
#[derive(Deserialize)]
struct ListNamespacesResponse {
namespaces: Vec<String>,
}
let parsed: ListNamespacesResponse = resp.json().await.map_err(|e| Error::Runtime {
message: format!("Failed to parse namespace response: {}", e),
})?;
Ok(parsed.namespaces)
}
async fn create_namespace(&self, request: CreateNamespaceRequest) -> Result<()> {
let namespace_id =
build_namespace_identifier(request.namespace.as_slice(), &self.client.id_delimiter);
let req = self
.client
.post(&format!("/v1/namespace/{}/create", namespace_id));
let (request_id, resp) = self.client.send(req).await?;
self.client.check_response(&request_id, resp).await?;
Ok(())
}
async fn drop_namespace(&self, request: DropNamespaceRequest) -> Result<()> {
let namespace_id =
build_namespace_identifier(request.namespace.as_slice(), &self.client.id_delimiter);
let req = self
.client
.post(&format!("/v1/namespace/{}/drop", namespace_id));
let (request_id, resp) = self.client.send(req).await?;
self.client.check_response(&request_id, resp).await?;
Ok(())
}
fn as_any(&self) -> &dyn std::any::Any {
self
}
@@ -586,7 +436,6 @@ impl From<StorageOptions> for RemoteOptions {
#[cfg(test)]
mod tests {
use super::build_cache_key;
use std::sync::{Arc, OnceLock};
use arrow_array::{Int32Array, RecordBatch, RecordBatchIterator};
@@ -599,38 +448,6 @@ mod tests {
Connection, Error,
};
#[test]
fn test_cache_key_security() {
// Test that cache keys are unique regardless of delimiter manipulation
// Case 1: Different delimiters should not affect cache key
let key1 = build_cache_key("table1", &["ns1".to_string(), "ns2".to_string()]);
let key2 = build_cache_key("table1", &["ns1$ns2".to_string()]);
assert_ne!(
key1, key2,
"Cache keys should differ for different namespace structures"
);
// Case 2: Table name containing delimiter should not cause collision
let key3 = build_cache_key("ns2$table1", &["ns1".to_string()]);
assert_ne!(
key1, key3,
"Cache key should be different when table name contains delimiter"
);
// Case 3: Empty namespace vs namespace with empty string
let key4 = build_cache_key("table1", &[]);
let key5 = build_cache_key("table1", &["".to_string()]);
assert_ne!(
key4, key5,
"Empty namespace should differ from namespace with empty string"
);
// Case 4: Verify same inputs produce same key (consistency)
let key6 = build_cache_key("table1", &["ns1".to_string(), "ns2".to_string()]);
assert_eq!(key1, key6, "Same inputs should produce same cache key");
}
#[tokio::test]
async fn test_retries() {
// We'll record the request_id here, to check it matches the one in the error.
@@ -894,7 +711,7 @@ mod tests {
http::Response::builder().status(200).body("").unwrap()
});
conn.drop_table("table1", &[]).await.unwrap();
conn.drop_table("table1").await.unwrap();
// NOTE: the API will return 200 even if the table does not exist. So we shouldn't expect 404.
}
@@ -914,9 +731,7 @@ mod tests {
http::Response::builder().status(200).body("").unwrap()
});
conn.rename_table("table1", "table2", &[], &[])
.await
.unwrap();
conn.rename_table("table1", "table2").await.unwrap();
}
#[tokio::test]
@@ -930,186 +745,4 @@ mod tests {
.await
.unwrap();
}
#[tokio::test]
async fn test_table_names_with_root_namespace() {
// When namespace is empty (root namespace), should use /v1/table/ for backwards compatibility
let conn = Connection::new_with_handler(|request| {
assert_eq!(request.method(), &reqwest::Method::GET);
assert_eq!(request.url().path(), "/v1/table/");
assert_eq!(request.url().query(), None);
http::Response::builder()
.status(200)
.body(r#"{"tables": ["table1", "table2"]}"#)
.unwrap()
});
let names = conn
.table_names()
.namespace(vec![])
.execute()
.await
.unwrap();
assert_eq!(names, vec!["table1", "table2"]);
}
#[tokio::test]
async fn test_table_names_with_namespace() {
// When namespace is non-empty, should use /v1/namespace/{id}/table/list
let conn = Connection::new_with_handler(|request| {
assert_eq!(request.method(), &reqwest::Method::GET);
assert_eq!(request.url().path(), "/v1/namespace/test/table/list");
assert_eq!(request.url().query(), None);
http::Response::builder()
.status(200)
.body(r#"{"tables": ["table1", "table2"]}"#)
.unwrap()
});
let names = conn
.table_names()
.namespace(vec!["test".to_string()])
.execute()
.await
.unwrap();
assert_eq!(names, vec!["table1", "table2"]);
}
#[tokio::test]
async fn test_table_names_with_nested_namespace() {
// When namespace is vec!["ns1", "ns2"], should use /v1/namespace/ns1$ns2/table/list
let conn = Connection::new_with_handler(|request| {
assert_eq!(request.method(), &reqwest::Method::GET);
assert_eq!(request.url().path(), "/v1/namespace/ns1$ns2/table/list");
assert_eq!(request.url().query(), None);
http::Response::builder()
.status(200)
.body(r#"{"tables": ["ns1$ns2$table1", "ns1$ns2$table2"]}"#)
.unwrap()
});
let names = conn
.table_names()
.namespace(vec!["ns1".to_string(), "ns2".to_string()])
.execute()
.await
.unwrap();
assert_eq!(names, vec!["ns1$ns2$table1", "ns1$ns2$table2"]);
}
#[tokio::test]
async fn test_open_table_with_namespace() {
let conn = Connection::new_with_handler(|request| {
assert_eq!(request.method(), &reqwest::Method::POST);
assert_eq!(request.url().path(), "/v1/table/ns1$ns2$table1/describe/");
assert_eq!(request.url().query(), None);
http::Response::builder()
.status(200)
.body(r#"{"table": "table1"}"#)
.unwrap()
});
let table = conn
.open_table("table1")
.namespace(vec!["ns1".to_string(), "ns2".to_string()])
.execute()
.await
.unwrap();
assert_eq!(table.name(), "table1");
}
#[tokio::test]
async fn test_create_table_with_namespace() {
let conn = Connection::new_with_handler(|request| {
assert_eq!(request.method(), &reqwest::Method::POST);
assert_eq!(request.url().path(), "/v1/table/ns1$table1/create/");
assert_eq!(
request
.headers()
.get(reqwest::header::CONTENT_TYPE)
.unwrap(),
ARROW_STREAM_CONTENT_TYPE.as_bytes()
);
http::Response::builder().status(200).body("").unwrap()
});
let data = RecordBatch::try_new(
Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])),
vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
)
.unwrap();
let reader = RecordBatchIterator::new([Ok(data.clone())], data.schema());
let table = conn
.create_table("table1", reader)
.namespace(vec!["ns1".to_string()])
.execute()
.await
.unwrap();
assert_eq!(table.name(), "table1");
}
#[tokio::test]
async fn test_drop_table_with_namespace() {
let conn = Connection::new_with_handler(|request| {
assert_eq!(request.method(), &reqwest::Method::POST);
assert_eq!(request.url().path(), "/v1/table/ns1$ns2$table1/drop/");
assert_eq!(request.url().query(), None);
assert!(request.body().is_none());
http::Response::builder().status(200).body("").unwrap()
});
conn.drop_table("table1", &["ns1".to_string(), "ns2".to_string()])
.await
.unwrap();
}
#[tokio::test]
async fn test_rename_table_with_namespace() {
let conn = Connection::new_with_handler(|request| {
assert_eq!(request.method(), &reqwest::Method::POST);
assert_eq!(request.url().path(), "/v1/table/ns1$table1/rename/");
assert_eq!(
request.headers().get("Content-Type").unwrap(),
JSON_CONTENT_TYPE
);
let body = request.body().unwrap().as_bytes().unwrap();
let body: serde_json::Value = serde_json::from_slice(body).unwrap();
assert_eq!(body["new_table_name"], "table2");
assert_eq!(body["new_namespace"], serde_json::json!(["ns2"]));
http::Response::builder().status(200).body("").unwrap()
});
conn.rename_table(
"table1",
"table2",
&["ns1".to_string()],
&["ns2".to_string()],
)
.await
.unwrap();
}
#[tokio::test]
async fn test_create_empty_table_with_namespace() {
let conn = Connection::new_with_handler(|request| {
assert_eq!(request.method(), &reqwest::Method::POST);
assert_eq!(request.url().path(), "/v1/table/prod$data$metrics/create/");
assert_eq!(
request
.headers()
.get(reqwest::header::CONTENT_TYPE)
.unwrap(),
ARROW_STREAM_CONTENT_TYPE.as_bytes()
);
http::Response::builder().status(200).body("").unwrap()
});
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
conn.create_empty_table("metrics", schema)
.namespace(vec!["prod".to_string(), "data".to_string()])
.execute()
.await
.unwrap();
}
}
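The tests above all rely on the same convention: namespace components are joined with `$`, the table identifier is the joined namespace followed by the table name with the same separator, and an empty namespace (the root) falls back to the legacy /v1/table/ routes. A minimal sketch of that mapping, with helper names that are illustrative rather than the crate's actual API:

fn table_identifier(namespace: &[String], name: &str) -> String {
    if namespace.is_empty() {
        // Root namespace: the identifier is just the table name.
        name.to_string()
    } else {
        // e.g. ["ns1", "ns2"] + "table1" -> "ns1$ns2$table1"
        format!("{}${}", namespace.join("$"), name)
    }
}

fn table_list_path(namespace: &[String]) -> String {
    if namespace.is_empty() {
        // Root namespace keeps the legacy listing path for backwards compatibility.
        "/v1/table/".to_string()
    } else {
        // e.g. ["ns1", "ns2"] -> "/v1/namespace/ns1$ns2/table/list"
        format!("/v1/namespace/{}/table/list", namespace.join("$"))
    }
}

#[test]
fn identifier_examples() {
    assert_eq!(table_identifier(&[], "table1"), "table1");
    let ns = vec!["ns1".to_string(), "ns2".to_string()];
    assert_eq!(table_identifier(&ns, "table1"), "ns1$ns2$table1");
    assert_eq!(table_list_path(&ns), "/v1/namespace/ns1$ns2/table/list");
}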

View File

@@ -70,7 +70,7 @@ impl<S: HttpSend + 'static> Tags for RemoteTags<'_, S> {
let request = self
.inner
.client
.post(&format!("/v1/table/{}/tags/list/", self.inner.identifier));
.post(&format!("/v1/table/{}/tags/list/", self.inner.name));
let (request_id, response) = self.inner.send(request, true).await?;
let response = self
.inner
@@ -104,10 +104,7 @@ impl<S: HttpSend + 'static> Tags for RemoteTags<'_, S> {
let request = self
.inner
.client
.post(&format!(
"/v1/table/{}/tags/version/",
self.inner.identifier
))
.post(&format!("/v1/table/{}/tags/version/", self.inner.name))
.json(&serde_json::json!({ "tag": tag }));
let (request_id, response) = self.inner.send(request, true).await?;
@@ -149,7 +146,7 @@ impl<S: HttpSend + 'static> Tags for RemoteTags<'_, S> {
let request = self
.inner
.client
.post(&format!("/v1/table/{}/tags/create/", self.inner.identifier))
.post(&format!("/v1/table/{}/tags/create/", self.inner.name))
.json(&serde_json::json!({
"tag": tag,
"version": version
@@ -166,7 +163,7 @@ impl<S: HttpSend + 'static> Tags for RemoteTags<'_, S> {
let request = self
.inner
.client
.post(&format!("/v1/table/{}/tags/delete/", self.inner.identifier))
.post(&format!("/v1/table/{}/tags/delete/", self.inner.name))
.json(&serde_json::json!({ "tag": tag }));
let (request_id, response) = self.inner.send(request, true).await?;
@@ -180,7 +177,7 @@ impl<S: HttpSend + 'static> Tags for RemoteTags<'_, S> {
let request = self
.inner
.client
.post(&format!("/v1/table/{}/tags/update/", self.inner.identifier))
.post(&format!("/v1/table/{}/tags/update/", self.inner.name))
.json(&serde_json::json!({
"tag": tag,
"version": version
@@ -199,8 +196,6 @@ pub struct RemoteTable<S: HttpSend = Sender> {
#[allow(dead_code)]
client: RestfulLanceDbClient<S>,
name: String,
namespace: Vec<String>,
identifier: String,
server_version: ServerVersion,
version: RwLock<Option<u64>>,
@@ -210,15 +205,11 @@ impl<S: HttpSend> RemoteTable<S> {
pub fn new(
client: RestfulLanceDbClient<S>,
name: String,
namespace: Vec<String>,
identifier: String,
server_version: ServerVersion,
) -> Self {
Self {
client,
name,
namespace,
identifier,
server_version,
version: RwLock::new(None),
}
@@ -232,7 +223,7 @@ impl<S: HttpSend> RemoteTable<S> {
async fn describe_version(&self, version: Option<u64>) -> Result<TableDescription> {
let mut request = self
.client
.post(&format!("/v1/table/{}/describe/", self.identifier));
.post(&format!("/v1/table/{}/describe/", self.name));
let body = serde_json::json!({ "version": version });
request = request.json(&body);
@@ -343,7 +334,7 @@ impl<S: HttpSend> RemoteTable<S> {
) -> Result<reqwest::Response> {
if response.status() == StatusCode::NOT_FOUND {
return Err(Error::TableNotFound {
name: self.identifier.clone(),
name: self.name.clone(),
});
}
@@ -557,9 +548,7 @@ impl<S: HttpSend> RemoteTable<S> {
query: &AnyQuery,
options: &QueryExecutionOptions,
) -> Result<Vec<Pin<Box<dyn RecordBatchStream + Send>>>> {
let mut request = self
.client
.post(&format!("/v1/table/{}/query/", self.identifier));
let mut request = self.client.post(&format!("/v1/table/{}/query/", self.name));
if let Some(timeout) = options.timeout {
// Also send to server, so it can abort the query if it takes too long.
@@ -626,7 +615,7 @@ struct TableDescription {
impl<S: HttpSend> std::fmt::Display for RemoteTable<S> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "RemoteTable({})", self.identifier)
write!(f, "RemoteTable({})", self.name)
}
}
@@ -645,9 +634,7 @@ mod test_utils {
let client = client_with_handler(handler);
Self {
client,
name: name.clone(),
namespace: vec![],
identifier: name,
name,
server_version: version.map(ServerVersion).unwrap_or_default(),
version: RwLock::new(None),
}
@@ -663,14 +650,6 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
fn name(&self) -> &str {
&self.name
}
fn namespace(&self) -> &[String] {
&self.namespace
}
fn id(&self) -> &str {
&self.identifier
}
async fn version(&self) -> Result<u64> {
self.describe().await.map(|desc| desc.version)
}
@@ -699,7 +678,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
async fn restore(&self) -> Result<()> {
let mut request = self
.client
.post(&format!("/v1/table/{}/restore/", self.identifier));
.post(&format!("/v1/table/{}/restore/", self.name));
let version = self.current_version().await;
let body = serde_json::json!({ "version": version });
request = request.json(&body);
@@ -713,7 +692,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
async fn list_versions(&self) -> Result<Vec<Version>> {
let request = self
.client
.post(&format!("/v1/table/{}/version/list/", self.identifier));
.post(&format!("/v1/table/{}/version/list/", self.name));
let (request_id, response) = self.send(request, true).await?;
let response = self.check_table_response(&request_id, response).await?;
@@ -744,7 +723,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
async fn count_rows(&self, filter: Option<Filter>) -> Result<usize> {
let mut request = self
.client
.post(&format!("/v1/table/{}/count_rows/", self.identifier));
.post(&format!("/v1/table/{}/count_rows/", self.name));
let version = self.current_version().await;
@@ -780,7 +759,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
self.check_mutable().await?;
let mut request = self
.client
.post(&format!("/v1/table/{}/insert/", self.identifier))
.post(&format!("/v1/table/{}/insert/", self.name))
.header(CONTENT_TYPE, ARROW_STREAM_CONTENT_TYPE);
match add.mode {
@@ -852,7 +831,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
async fn explain_plan(&self, query: &AnyQuery, verbose: bool) -> Result<String> {
let base_request = self
.client
.post(&format!("/v1/table/{}/explain_plan/", self.identifier));
.post(&format!("/v1/table/{}/explain_plan/", self.name));
let query_bodies = self.prepare_query_bodies(query).await?;
let requests: Vec<reqwest::RequestBuilder> = query_bodies
@@ -901,7 +880,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
) -> Result<String> {
let request = self
.client
.post(&format!("/v1/table/{}/analyze_plan/", self.identifier));
.post(&format!("/v1/table/{}/analyze_plan/", self.name));
let query_bodies = self.prepare_query_bodies(query).await?;
let requests: Vec<reqwest::RequestBuilder> = query_bodies
@@ -940,7 +919,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
self.check_mutable().await?;
let request = self
.client
.post(&format!("/v1/table/{}/update/", self.identifier));
.post(&format!("/v1/table/{}/update/", self.name));
let mut updates = Vec::new();
for (column, expression) in update.columns {
@@ -979,7 +958,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
let body = serde_json::json!({ "predicate": predicate });
let request = self
.client
.post(&format!("/v1/table/{}/delete/", self.identifier))
.post(&format!("/v1/table/{}/delete/", self.name))
.json(&body);
let (request_id, response) = self.send(request, true).await?;
let response = self.check_table_response(&request_id, response).await?;
@@ -1001,7 +980,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
self.check_mutable().await?;
let request = self
.client
.post(&format!("/v1/table/{}/create_index/", self.identifier));
.post(&format!("/v1/table/{}/create_index/", self.name));
let column = match index.columns.len() {
0 => {
@@ -1142,7 +1121,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
let query = MergeInsertRequest::try_from(params)?;
let mut request = self
.client
.post(&format!("/v1/table/{}/merge_insert/", self.identifier))
.post(&format!("/v1/table/{}/merge_insert/", self.name))
.query(&query)
.header(CONTENT_TYPE, ARROW_STREAM_CONTENT_TYPE);
@@ -1214,7 +1193,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
let body = serde_json::json!({ "new_columns": body });
let request = self
.client
.post(&format!("/v1/table/{}/add_columns/", self.identifier))
.post(&format!("/v1/table/{}/add_columns/", self.name))
.json(&body);
let (request_id, response) = self.send(request, true).await?;
let response = self.check_table_response(&request_id, response).await?;
@@ -1267,7 +1246,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
let body = serde_json::json!({ "alterations": body });
let request = self
.client
.post(&format!("/v1/table/{}/alter_columns/", self.identifier))
.post(&format!("/v1/table/{}/alter_columns/", self.name))
.json(&body);
let (request_id, response) = self.send(request, true).await?;
let response = self.check_table_response(&request_id, response).await?;
@@ -1292,7 +1271,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
let body = serde_json::json!({ "columns": columns });
let request = self
.client
.post(&format!("/v1/table/{}/drop_columns/", self.identifier))
.post(&format!("/v1/table/{}/drop_columns/", self.name))
.json(&body);
let (request_id, response) = self.send(request, true).await?;
let response = self.check_table_response(&request_id, response).await?;
@@ -1316,7 +1295,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
// Make request to list the indices
let mut request = self
.client
.post(&format!("/v1/table/{}/index/list/", self.identifier));
.post(&format!("/v1/table/{}/index/list/", self.name));
let version = self.current_version().await;
let body = serde_json::json!({ "version": version });
request = request.json(&body);
@@ -1372,7 +1351,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
async fn index_stats(&self, index_name: &str) -> Result<Option<IndexStatistics>> {
let mut request = self.client.post(&format!(
"/v1/table/{}/index/{}/stats/",
self.identifier, index_name
self.name, index_name
));
let version = self.current_version().await;
let body = serde_json::json!({ "version": version });
@@ -1400,7 +1379,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
async fn drop_index(&self, index_name: &str) -> Result<()> {
let request = self.client.post(&format!(
"/v1/table/{}/index/{}/drop/",
self.identifier, index_name
self.name, index_name
));
let (request_id, response) = self.send(request, true).await?;
if response.status() == StatusCode::NOT_FOUND {
@@ -1428,9 +1407,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
}
async fn stats(&self) -> Result<TableStatistics> {
let request = self
.client
.post(&format!("/v1/table/{}/stats/", self.identifier));
let request = self.client.post(&format!("/v1/table/{}/stats/", self.name));
let (request_id, response) = self.send(request, true).await?;
let response = self.check_table_response(&request_id, response).await?;
let body = response.text().await.err_to_http(request_id.clone())?;
@@ -3105,174 +3082,4 @@ mod tests {
});
table
}
#[tokio::test]
async fn test_table_with_namespace_identifier() {
// Test that a table created under a namespace uses the full identifier in API calls
let table = Table::new_with_handler("ns1$ns2$table1", |request| {
assert_eq!(request.method(), "POST");
// All API calls should use the full identifier in the path
assert_eq!(request.url().path(), "/v1/table/ns1$ns2$table1/describe/");
http::Response::builder()
.status(200)
.body(r#"{"version": 1, "schema": { "fields": [] }}"#)
.unwrap()
});
// new_with_handler stores the raw string as both name and identifier, so name() returns the full identifier here
assert_eq!(table.name(), "ns1$ns2$table1");
// API operations should work correctly
let version = table.version().await.unwrap();
assert_eq!(version, 1);
}
#[tokio::test]
async fn test_query_with_namespace() {
let table = Table::new_with_handler("analytics$events", |request| {
match request.url().path() {
"/v1/table/analytics$events/query/" => {
assert_eq!(request.method(), "POST");
// Return an Arrow IPC file containing a single three-row batch
let data = RecordBatch::try_new(
Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)])),
vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
)
.unwrap();
let body = write_ipc_file(&data);
http::Response::builder()
.status(200)
.header("Content-Type", ARROW_FILE_CONTENT_TYPE)
.body(body)
.unwrap()
}
_ => {
panic!("Unexpected path: {}", request.url().path());
}
}
});
let results = table.query().execute().await.unwrap();
let batches = results.try_collect::<Vec<_>>().await.unwrap();
assert_eq!(batches.len(), 1);
assert_eq!(batches[0].num_rows(), 3);
}
#[tokio::test]
async fn test_add_data_with_namespace() {
let data = RecordBatch::try_new(
Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])),
vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
)
.unwrap();
let (sender, receiver) = std::sync::mpsc::channel();
let table = Table::new_with_handler("prod$metrics", move |mut request| {
if request.url().path() == "/v1/table/prod$metrics/insert/" {
assert_eq!(request.method(), "POST");
assert_eq!(
request.headers().get("Content-Type").unwrap(),
ARROW_STREAM_CONTENT_TYPE
);
let mut body_out = reqwest::Body::from(Vec::new());
std::mem::swap(request.body_mut().as_mut().unwrap(), &mut body_out);
sender.send(body_out).unwrap();
http::Response::builder()
.status(200)
.body(r#"{"version": 2}"#)
.unwrap()
} else {
panic!("Unexpected request path: {}", request.url().path());
}
});
let result = table
.add(RecordBatchIterator::new([Ok(data.clone())], data.schema()))
.execute()
.await
.unwrap();
assert_eq!(result.version, 2);
let body = receiver.recv().unwrap();
let body = collect_body(body).await;
let expected_body = write_ipc_stream(&data);
assert_eq!(&body, &expected_body);
}
#[tokio::test]
async fn test_create_index_with_namespace() {
let table = Table::new_with_handler("dev$users", |request| {
match request.url().path() {
"/v1/table/dev$users/create_index/" => {
assert_eq!(request.method(), "POST");
assert_eq!(
request.headers().get("Content-Type").unwrap(),
JSON_CONTENT_TYPE
);
// Verify the request body contains the column name
if let Some(body) = request.body().unwrap().as_bytes() {
let body = std::str::from_utf8(body).unwrap();
let value: serde_json::Value = serde_json::from_str(body).unwrap();
assert_eq!(value["column"], "embedding");
assert_eq!(value["index_type"], "IVF_PQ");
}
http::Response::builder().status(200).body("").unwrap()
}
"/v1/table/dev$users/describe/" => {
// Needed for schema check in Auto index type
http::Response::builder()
.status(200)
.body(r#"{"version": 1, "schema": {"fields": [{"name": "embedding", "type": {"type": "list", "item": {"type": "float32"}}, "nullable": false}]}}"#)
.unwrap()
}
_ => {
panic!("Unexpected path: {}", request.url().path());
}
}
});
table
.create_index(&["embedding"], Index::IvfPq(IvfPqIndexBuilder::default()))
.execute()
.await
.unwrap();
}
#[tokio::test]
async fn test_drop_columns_with_namespace() {
let table = Table::new_with_handler("test$schema_ops", |request| {
assert_eq!(request.method(), "POST");
assert_eq!(
request.url().path(),
"/v1/table/test$schema_ops/drop_columns/"
);
assert_eq!(
request.headers().get("Content-Type").unwrap(),
JSON_CONTENT_TYPE
);
if let Some(body) = request.body().unwrap().as_bytes() {
let body = std::str::from_utf8(body).unwrap();
let value: serde_json::Value = serde_json::from_str(body).unwrap();
let columns = value["columns"].as_array().unwrap();
assert_eq!(columns.len(), 2);
assert_eq!(columns[0], "old_col1");
assert_eq!(columns[1], "old_col2");
}
http::Response::builder()
.status(200)
.body(r#"{"version": 5}"#)
.unwrap()
});
let result = table.drop_columns(&["old_col1", "old_col2"]).await.unwrap();
assert_eq!(result.version, 5);
}
}
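Taken together, the hunks for this file collapse RemoteTable from a name-plus-namespace-plus-identifier shape back to a single name that is used directly in every REST path. A rough before/after sketch of just the affected fields (struct names here are illustrative; the client, server-version, and version fields shown in the hunks are omitted):

// Side with namespace support (the fields being removed in this comparison):
#[allow(dead_code)]
struct RemoteTableWithNamespace {
    name: String,           // base table name, e.g. "table1"
    namespace: Vec<String>, // e.g. ["ns1", "ns2"]; empty means the root namespace
    identifier: String,     // "$"-joined form used in URLs, e.g. "ns1$ns2$table1"
}

// Side without namespace support: the name doubles as the URL identifier,
// so paths such as /v1/table/{}/query/ are formatted with `self.name`.
#[allow(dead_code)]
struct RemoteTableNameOnly {
    name: String,
}

Every `self.identifier` to `self.name` replacement in the hunks above follows directly from that second shape.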

View File

@@ -32,7 +32,7 @@ use lance::index::vector::VectorIndexParams;
use lance::io::WrappingObjectStore;
use lance_datafusion::exec::{analyze_plan as lance_analyze_plan, execute_plan};
use lance_datafusion::utils::StreamingWriteSource;
use lance_index::scalar::{BuiltinIndexType, ScalarIndexParams};
use lance_index::scalar::{ScalarIndexParams, ScalarIndexType};
use lance_index::vector::hnsw::builder::HnswBuildParams;
use lance_index::vector::ivf::IvfBuildParams;
use lance_index::vector::pq::PQBuildParams;
@@ -509,10 +509,6 @@ pub trait BaseTable: std::fmt::Display + std::fmt::Debug + Send + Sync {
fn as_any(&self) -> &dyn std::any::Any;
/// Get the name of the table.
fn name(&self) -> &str;
/// Get the namespace of the table.
fn namespace(&self) -> &[String];
/// Get the id of the table
fn id(&self) -> &str;
/// Get the arrow [Schema] of the table.
async fn schema(&self) -> Result<SchemaRef>;
/// Count the number of rows in this table.
@@ -1778,9 +1774,7 @@ impl NativeTable {
);
Ok(Box::new(lance_idx_params))
} else if supported_btree_data_type(field.data_type()) {
Ok(Box::new(ScalarIndexParams::for_builtin(
BuiltinIndexType::BTree,
)))
Ok(Box::new(ScalarIndexParams::new(ScalarIndexType::BTree)))
} else {
return Err(Error::InvalidInput {
message: format!(
@@ -1793,21 +1787,15 @@ impl NativeTable {
}
Index::BTree(_) => {
Self::validate_index_type(field, "BTree", supported_btree_data_type)?;
Ok(Box::new(ScalarIndexParams::for_builtin(
BuiltinIndexType::BTree,
)))
Ok(Box::new(ScalarIndexParams::new(ScalarIndexType::BTree)))
}
Index::Bitmap(_) => {
Self::validate_index_type(field, "Bitmap", supported_bitmap_data_type)?;
Ok(Box::new(ScalarIndexParams::for_builtin(
BuiltinIndexType::Bitmap,
)))
Ok(Box::new(ScalarIndexParams::new(ScalarIndexType::Bitmap)))
}
Index::LabelList(_) => {
Self::validate_index_type(field, "LabelList", supported_label_list_data_type)?;
Ok(Box::new(ScalarIndexParams::for_builtin(
BuiltinIndexType::LabelList,
)))
Ok(Box::new(ScalarIndexParams::new(ScalarIndexType::LabelList)))
}
Index::FTS(fts_opts) => {
Self::validate_index_type(field, "FTS", supported_fts_data_type)?;
@@ -2019,16 +2007,6 @@ impl BaseTable for NativeTable {
self.name.as_str()
}
fn namespace(&self) -> &[String] {
// Native tables don't support namespaces yet; return an empty slice to indicate the root namespace
&[]
}
fn id(&self) -> &str {
// For native tables the id is the same as the name, since namespaces are not supported
self.name.as_str()
}
async fn version(&self) -> Result<u64> {
Ok(self.dataset.get().await?.version().version)
}
@@ -3272,7 +3250,6 @@ mod tests {
fn wrap(
&self,
original: Arc<dyn object_store::ObjectStore>,
_storage_options: Option<&std::collections::HashMap<String, String>>,
) -> Arc<dyn object_store::ObjectStore> {
self.called.store(true, Ordering::Relaxed);
original

View File

@@ -130,7 +130,7 @@ async fn test_minio_lifecycle() -> Result<()> {
let data = RecordBatchIterator::new(vec![Ok(data.clone())], data.schema());
table.add(data).execute().await?;
db.drop_table("test_table", &[]).await?;
db.drop_table("test_table").await?;
Ok(())
}