Files
lancedb/python/python/lancedb/remote/db.py
Raphael Malikian 0ba70d96c3 fix: add missing stacklevel=2 to warnings.warn() and fix broken message concatenation (Fixes #3563) (#3564)
Fixes #3563

## Summary

- Add `stacklevel=2` to 10 `warnings.warn()` calls across 4 files
- Fix broken message concatenation in `table.py` where the second string
was incorrectly passed as the `category` parameter

## Problem

Multiple `warnings.warn()` calls in the `python/lancedb/` codebase were
missing the `stacklevel` parameter. Without `stacklevel=2`, warnings
point to library internals instead of the caller's code, making it
impossible for users to identify which of their function calls triggered
the warning.

Additionally, two calls in `table.py` (lines 3411 and 3420) had a more
serious bug: the deprecation message was split across two separate
string arguments, causing the second string to be passed as the
`category` parameter instead of being concatenated with the first
string. This would cause `TypeError` when the warning was triggered.

## Changes

| File | Fixes | Description |
|------|-------|-------------|
| `embeddings/colpali.py` | 1 | Add `stacklevel=2` to
`use_token_pooling` deprecation warning |
| `remote/db.py` | 3 | Add `stacklevel=2` to `request_thread_pool`,
`connection_timeout`, `read_timeout` deprecation warnings |
| `remote/table.py` | 3 | Add `stacklevel=2` to `cleanup_old_versions`,
`compact_files`, `optimize` no-op warnings |
| `table.py` | 3 | Fix broken message concatenation for
`data_storage_version` and `enable_v2_manifest_paths` deprecation
warnings + add `stacklevel=2` to `retrain` deprecation warning |

## Verification

```python
# All warnings.warn() calls now have stacklevel
python3 -c "import ast, os; ..."
# Result: All warnings.warn() calls now have stacklevel!
```

## Changelog

| Date | Change | Author |
|------|--------|--------|
| 2026-06-20 | Fix missing stacklevel=2 in 10 warnings.warn() calls +
fix broken message concatenation | rtmalikian |

### Files Changed
- `python/python/lancedb/embeddings/colpali.py` — Add stacklevel=2
- `python/python/lancedb/remote/db.py` — Add stacklevel=2 to 3
deprecation warnings
- `python/python/lancedb/remote/table.py` — Add stacklevel=2 to 3 no-op
warnings
- `python/python/lancedb/table.py` — Fix broken message concatenation +
add stacklevel=2

### Verification
- AST-based audit confirms all `warnings.warn()` calls now include
`stacklevel=2`
- Syntax check passes for all 4 modified files

---

**About the Author:** Raphael Malikian — Clinical AI Solutions
Architect. I specialise in building and fixing AI/ML systems for
healthcare, including vector databases, RAG pipelines, and clinical NLP.
If you need help with your project or think I can add value to your
organisation, feel free to reach out — I'd love to connect.

📧 rtmalikian@gmail.com
🔗 GitHub: https://github.com/rtmalikian
🔗 LinkedIn:
http://www.linkedin.com/in/raphael-t-malikian-mbbs-bsc-hons-71075436a

---

**Disclosure:** This code was developed with assistance from **Hermes
Agent** (Nous Research). All changes were reviewed, tested against the
actual codebase, and verified for correctness.

Signed-off-by: rtmalikian <rtmalikian@gmail.com>
2026-06-23 13:42:59 -07:00

703 lines
23 KiB
Python

# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright The LanceDB Authors
from datetime import timedelta
import json
import logging
from concurrent.futures import ThreadPoolExecutor
import sys
from typing import Any, Dict, Iterable, List, Optional, Union
from urllib.parse import urlparse
import warnings
if sys.version_info >= (3, 12):
from typing import override
else:
from overrides import override
# Remove this import to fix circular dependency
# from lancedb import connect_async
from lancedb.remote import ClientConfig, RetryConfig, TimeoutConfig, TlsConfig
import pyarrow as pa
from ..common import DATA
from ..db import DBConnection, LOOP
from ..embeddings import EmbeddingFunctionConfig
from lance_namespace import (
LanceNamespace,
CreateNamespaceResponse,
DescribeNamespaceResponse,
DropNamespaceResponse,
ListNamespacesResponse,
ListTablesResponse,
)
from ..pydantic import LanceModel
from ..table import Table
from ..util import validate_table_name
def _duration_seconds(value: Optional[timedelta]) -> Optional[float]:
return value.total_seconds() if value is not None else None
def _timeout_config_to_dict(
config: Optional[TimeoutConfig],
) -> Optional[dict[str, Any]]:
if config is None:
return None
return {
"timeout": _duration_seconds(config.timeout),
"connect_timeout": _duration_seconds(config.connect_timeout),
"read_timeout": _duration_seconds(config.read_timeout),
"pool_idle_timeout": _duration_seconds(config.pool_idle_timeout),
}
def _retry_config_to_dict(config: RetryConfig) -> dict[str, Any]:
return {
"retries": config.retries,
"connect_retries": config.connect_retries,
"read_retries": config.read_retries,
"backoff_factor": config.backoff_factor,
"backoff_jitter": config.backoff_jitter,
"statuses": config.statuses,
}
def _tls_config_to_dict(config: Optional[TlsConfig]) -> Optional[dict[str, Any]]:
if config is None:
return None
return {
"cert_file": config.cert_file,
"key_file": config.key_file,
"ssl_ca_cert": config.ssl_ca_cert,
"assert_hostname": config.assert_hostname,
}
def _client_config_to_dict(config: ClientConfig) -> dict[str, Any]:
if config.header_provider is not None:
raise ValueError(
"Cannot serialize a remote connection with a header_provider. "
"Use static api_key/extra_headers or provide a worker-side "
"connection factory instead."
)
return {
"user_agent": config.user_agent,
"retry_config": _retry_config_to_dict(config.retry_config),
"timeout_config": _timeout_config_to_dict(config.timeout_config),
"extra_headers": config.extra_headers,
"id_delimiter": config.id_delimiter,
"tls_config": _tls_config_to_dict(config.tls_config),
"header_provider": None,
"user_id": config.user_id,
}
class RemoteDBConnection(DBConnection):
"""A connection to a remote LanceDB database."""
def __init__(
self,
db_url: str,
api_key: str,
region: str,
host_override: Optional[str] = None,
request_thread_pool: Optional[ThreadPoolExecutor] = None,
client_config: Union[ClientConfig, Dict[str, Any], None] = None,
connection_timeout: Optional[float] = None,
read_timeout: Optional[float] = None,
storage_options: Optional[Dict[str, str]] = None,
read_consistency_interval: Optional[timedelta] = None,
):
"""Connect to a remote LanceDB database."""
if isinstance(client_config, dict):
client_config = ClientConfig(**client_config)
elif client_config is None:
client_config = ClientConfig()
# These are legacy options from the old Python-based client. We keep them
# here for backwards compatibility, but will remove them in a future release.
if request_thread_pool is not None:
warnings.warn(
"request_thread_pool is no longer used and will be removed in "
"a future release.",
DeprecationWarning,
stacklevel=2,
)
if connection_timeout is not None:
warnings.warn(
"connection_timeout is deprecated and will be removed in a future "
"release. Please use client_config.timeout_config.connect_timeout "
"instead.",
DeprecationWarning,
stacklevel=2,
)
client_config.timeout_config.connect_timeout = timedelta(
seconds=connection_timeout
)
if read_timeout is not None:
warnings.warn(
"read_timeout is deprecated and will be removed in a future release. "
"Please use client_config.timeout_config.read_timeout instead.",
DeprecationWarning,
stacklevel=2,
)
client_config.timeout_config.read_timeout = timedelta(seconds=read_timeout)
parsed = urlparse(db_url)
if parsed.scheme != "db":
raise ValueError(f"Invalid scheme: {parsed.scheme}, only accepts db://")
self.db_url = db_url
self.api_key = api_key
self.region = region
self.host_override = host_override
self.storage_options = storage_options
self.db_name = parsed.netloc
self.client_config = client_config
# Import connect_async here to avoid circular import
from lancedb import connect_async
self._conn = LOOP.run(
connect_async(
db_url,
api_key=api_key,
region=region,
host_override=host_override,
client_config=client_config,
storage_options=storage_options,
read_consistency_interval=read_consistency_interval,
)
)
def __repr__(self) -> str:
return f"RemoteConnect(name={self.db_name})"
@override
def serialize(self) -> str:
return json.dumps(
{
"connection_type": "remote",
"db_url": self.db_url,
"api_key": self.api_key,
"region": self.region,
"host_override": self.host_override,
"client_config": _client_config_to_dict(self.client_config),
"storage_options": self.storage_options,
}
)
@override
def list_namespaces(
self,
namespace_path: Optional[List[str]] = None,
page_token: Optional[str] = None,
limit: Optional[int] = None,
) -> ListNamespacesResponse:
"""List immediate child namespace names in the given namespace.
Parameters
----------
namespace_path: List[str], optional
The parent namespace to list namespaces in.
None or empty list represents root namespace.
page_token: str, optional
Token for pagination. Use the token from a previous response
to get the next page of results.
limit: int, optional
The maximum number of results to return.
Returns
-------
ListNamespacesResponse
Response containing namespace names and optional page_token for pagination.
"""
if namespace_path is None:
namespace_path = []
return LOOP.run(
self._conn.list_namespaces(
namespace_path=namespace_path, page_token=page_token, limit=limit
)
)
@override
def create_namespace(
self,
namespace_path: List[str],
mode: Optional[str] = None,
properties: Optional[Dict[str, str]] = None,
) -> CreateNamespaceResponse:
"""Create a new namespace.
Parameters
----------
namespace_path: List[str]
The namespace identifier to create.
mode: str, optional
Creation mode - "create" (fail if exists), "exist_ok" (skip if exists),
or "overwrite" (replace if exists). Case insensitive.
properties: Dict[str, str], optional
Properties to set on the namespace.
Returns
-------
CreateNamespaceResponse
Response containing the properties of the created namespace.
"""
return LOOP.run(
self._conn.create_namespace(
namespace_path=namespace_path, mode=mode, properties=properties
)
)
@override
def drop_namespace(
self,
namespace_path: List[str],
mode: Optional[str] = None,
behavior: Optional[str] = None,
) -> DropNamespaceResponse:
"""Drop a namespace.
Parameters
----------
namespace_path: List[str]
The namespace identifier to drop.
mode: str, optional
Whether to skip if not exists ("SKIP") or fail ("FAIL"). Case insensitive.
behavior: str, optional
Whether to restrict drop if not empty ("RESTRICT") or cascade ("CASCADE").
Case insensitive.
Returns
-------
DropNamespaceResponse
Response containing properties and transaction_id if applicable.
"""
return LOOP.run(
self._conn.drop_namespace(
namespace_path=namespace_path, mode=mode, behavior=behavior
)
)
@override
def describe_namespace(
self, namespace_path: List[str]
) -> DescribeNamespaceResponse:
"""Describe a namespace.
Parameters
----------
namespace_path: List[str]
The namespace identifier to describe.
Returns
-------
DescribeNamespaceResponse
Response containing the namespace properties.
"""
return LOOP.run(self._conn.describe_namespace(namespace_path=namespace_path))
@override
def list_tables(
self,
namespace_path: Optional[List[str]] = None,
page_token: Optional[str] = None,
limit: Optional[int] = None,
) -> ListTablesResponse:
"""List all tables in this database with pagination support.
Parameters
----------
namespace_path: List[str], optional
The namespace to list tables in.
None or empty list represents root namespace.
page_token: str, optional
Token for pagination. Use the token from a previous response
to get the next page of results.
limit: int, optional
The maximum number of results to return.
Returns
-------
ListTablesResponse
Response containing table names and optional page_token for pagination.
"""
if namespace_path is None:
namespace_path = []
return LOOP.run(
self._conn.list_tables(
namespace_path=namespace_path, page_token=page_token, limit=limit
)
)
@override
def table_names(
self,
page_token: Optional[str] = None,
limit: int = 10,
*,
namespace_path: Optional[List[str]] = None,
) -> Iterable[str]:
"""List the names of all tables in the database.
.. deprecated::
Use :meth:`list_tables` instead, which provides proper pagination support.
Parameters
----------
namespace_path: List[str], default []
The namespace to list tables in.
Empty list represents root namespace.
page_token: str
The last token to start the new page.
limit: int, default 10
The maximum number of tables to return for each page.
Returns
-------
An iterator of table names.
"""
import warnings
warnings.warn(
"table_names() is deprecated, use list_tables() instead",
DeprecationWarning,
stacklevel=2,
)
if namespace_path is None:
namespace_path = []
return LOOP.run(
self._conn.table_names(
namespace_path=namespace_path, start_after=page_token, limit=limit
)
)
@override
def open_table(
self,
name: str,
*,
namespace_path: Optional[List[str]] = None,
storage_options: Optional[Dict[str, str]] = None,
index_cache_size: Optional[int] = None,
branch: Optional[str] = None,
version: Optional[int] = None,
) -> Table:
"""Open a Lance Table in the database.
Parameters
----------
name: str
The name of the table.
namespace_path: List[str], optional
The namespace to open the table from.
None or empty list represents root namespace.
branch: str, optional
If provided, open a handle scoped to this branch instead of the
default branch. Reads and writes operate in the branch's context.
version: int, optional
If provided, open the table pinned to this version, producing a
read-only handle. Composes with ``branch``: when both are given,
opens that branch at the version; otherwise opens ``main`` at the
version. Call ``checkout_latest`` to return to a writable state.
Returns
-------
A LanceTable object representing the table.
"""
from .table import RemoteTable
if namespace_path is None:
namespace_path = []
if index_cache_size is not None:
logging.info(
"index_cache_size is ignored in LanceDb Cloud"
" (there is no local cache to configure)"
)
table = LOOP.run(self._conn.open_table(name, namespace_path=namespace_path))
tbl = RemoteTable(
table,
self.db_name,
connection_state=self.serialize,
namespace_path=namespace_path,
)
if branch is not None:
tbl = tbl.branches.checkout(branch, version)
elif version is not None:
tbl.checkout(version)
return tbl
def clone_table(
self,
target_table_name: str,
source_uri: str,
*,
target_namespace_path: Optional[List[str]] = None,
source_version: Optional[int] = None,
source_tag: Optional[str] = None,
is_shallow: bool = True,
) -> Table:
"""Clone a table from a source table.
Parameters
----------
target_table_name: str
The name of the target table to create.
source_uri: str
The URI of the source table to clone from.
target_namespace_path: List[str], optional
The namespace for the target table.
None or empty list represents root namespace.
source_version: int, optional
The version of the source table to clone.
source_tag: str, optional
The tag of the source table to clone.
is_shallow: bool, default True
Whether to perform a shallow clone (True) or deep clone (False).
Currently only shallow clone is supported.
Returns
-------
A RemoteTable object representing the cloned table.
"""
from .table import RemoteTable
if target_namespace_path is None:
target_namespace_path = []
table = LOOP.run(
self._conn.clone_table(
target_table_name,
source_uri,
target_namespace_path=target_namespace_path,
source_version=source_version,
source_tag=source_tag,
is_shallow=is_shallow,
)
)
return RemoteTable(
table,
self.db_name,
connection_state=self.serialize,
namespace_path=target_namespace_path,
)
@override
def create_table(
self,
name: str,
data: DATA = None,
schema: Optional[Union[pa.Schema, LanceModel]] = None,
on_bad_vectors: str = "error",
fill_value: float = 0.0,
mode: Optional[str] = None,
exist_ok: bool = False,
embedding_functions: Optional[List[EmbeddingFunctionConfig]] = None,
*,
namespace_path: Optional[List[str]] = None,
) -> Table:
"""Create a [Table][lancedb.table.Table] in the database.
Parameters
----------
name: str
The name of the table.
namespace_path: List[str], optional
The namespace to create the table in.
None or empty list represents root namespace.
data: The data to initialize the table, *optional*
User must provide at least one of `data` or `schema`.
Acceptable types are:
- dict or list-of-dict
- pandas.DataFrame
- pyarrow.Table or pyarrow.RecordBatch
schema: The schema of the table, *optional*
Acceptable types are:
- pyarrow.Schema
- [LanceModel][lancedb.pydantic.LanceModel]
mode: str, default "create"
The mode to use when creating the table.
Can be either "create", "overwrite", or "exist_ok".
exist_ok: bool, default False
If exist_ok is True, and mode is None or "create", mode will be changed
to "exist_ok".
on_bad_vectors: str, default "error"
What to do if any of the vectors are not the same size or contains NaNs.
One of "error", "drop", "fill".
fill_value: float
The value to use when filling vectors. Only used if on_bad_vectors="fill".
Returns
-------
LanceTable
A reference to the newly created table.
!!! note
The vector index won't be created by default.
To create the index, call the `create_index` method on the table.
Examples
--------
Can create with list of tuples or dictionaries:
>>> import lancedb
>>> db = lancedb.connect("db://...", api_key="...", # doctest: +SKIP
... region="...") # doctest: +SKIP
>>> data = [{"vector": [1.1, 1.2], "lat": 45.5, "long": -122.7},
... {"vector": [0.2, 1.8], "lat": 40.1, "long": -74.1}]
>>> db.create_table("my_table", data) # doctest: +SKIP
LanceTable(my_table)
You can also pass a pandas DataFrame:
>>> import pandas as pd
>>> data = pd.DataFrame({
... "vector": [[1.1, 1.2], [0.2, 1.8]],
... "lat": [45.5, 40.1],
... "long": [-122.7, -74.1]
... })
>>> db.create_table("table2", data) # doctest: +SKIP
LanceTable(table2)
>>> custom_schema = pa.schema([
... pa.field("vector", pa.list_(pa.float32(), 2)),
... pa.field("lat", pa.float32()),
... pa.field("long", pa.float32())
... ])
>>> db.create_table("table3", data, schema = custom_schema) # doctest: +SKIP
LanceTable(table3)
It is also possible to create an table from `[Iterable[pa.RecordBatch]]`:
>>> import pyarrow as pa
>>> def make_batches():
... for i in range(5):
... yield pa.RecordBatch.from_arrays(
... [
... pa.array([[3.1, 4.1], [5.9, 26.5]],
... pa.list_(pa.float32(), 2)),
... pa.array(["foo", "bar"]),
... pa.array([10.0, 20.0]),
... ],
... ["vector", "item", "price"],
... )
>>> schema=pa.schema([
... pa.field("vector", pa.list_(pa.float32(), 2)),
... pa.field("item", pa.utf8()),
... pa.field("price", pa.float32()),
... ])
>>> db.create_table("table4", make_batches(), schema=schema) # doctest: +SKIP
LanceTable(table4)
"""
if exist_ok:
if mode == "create":
mode = "exist_ok"
elif not mode:
mode = "exist_ok"
if namespace_path is None:
namespace_path = []
validate_table_name(name)
if embedding_functions is not None:
logging.warning(
"embedding_functions is not yet supported on LanceDB Cloud."
"Please vote https://github.com/lancedb/lancedb/issues/626 "
"for this feature."
)
from .table import RemoteTable
table = LOOP.run(
self._conn.create_table(
name,
data,
namespace_path=namespace_path,
mode=mode,
schema=schema,
on_bad_vectors=on_bad_vectors,
fill_value=fill_value,
)
)
return RemoteTable(
table,
self.db_name,
connection_state=self.serialize,
namespace_path=namespace_path,
)
@override
def drop_table(self, name: str, namespace_path: Optional[List[str]] = None):
"""Drop a table from the database.
Parameters
----------
name: str
The name of the table.
namespace_path: List[str], optional
The namespace to drop the table from.
None or empty list represents root namespace.
"""
if namespace_path is None:
namespace_path = []
LOOP.run(self._conn.drop_table(name, namespace_path=namespace_path))
@override
def rename_table(
self,
cur_name: str,
new_name: str,
cur_namespace_path: Optional[List[str]] = None,
new_namespace_path: Optional[List[str]] = None,
):
"""Rename a table in the database.
Parameters
----------
cur_name: str
The current name of the table.
new_name: str
The new name of the table.
"""
if cur_namespace_path is None:
cur_namespace_path = []
if new_namespace_path is None:
new_namespace_path = []
LOOP.run(
self._conn.rename_table(
cur_name,
new_name,
cur_namespace_path=cur_namespace_path,
new_namespace_path=new_namespace_path,
)
)
@override
def namespace_client(self) -> LanceNamespace:
"""Get the equivalent namespace client for this connection.
Returns a RestNamespace with the same URI and authentication headers.
Returns
-------
LanceNamespace
The namespace client for this connection.
"""
return LOOP.run(self._conn.namespace_client())
async def close(self):
"""Close the connection to the database."""
self._conn.close()