feat: add serialize_to_json/from_serialized_json for DBConnection

Add serialization support to DBConnection classes so connections
can be reconstructed in remote workers without tracking namespace
params separately.

- DBConnection.serialize_to_json() base method
- LanceDBConnection: serializes uri, storage_options, read_consistency_interval
- LanceNamespaceDBConnection: stores namespace_client_impl/properties,
  serializes all connection params including pushdown_operations
- from_serialized_json() factory with for_worker flag for worker_uri swap
- connect_namespace() now passes impl/properties to connection for serialization

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Jack Ye
2026-04-11 14:19:59 -07:00
parent c6ae0de3ee
commit dd2a0ec48f
3 changed files with 117 additions and 0 deletions

View File

@@ -215,6 +215,65 @@ def connect(
)
WORKER_URI_KEY = "worker_uri"
def from_serialized_json(
json_str: str,
*,
for_worker: bool = False,
) -> DBConnection:
"""Reconstruct a DBConnection from a JSON string.
The JSON string must have been produced by
:meth:`DBConnection.serialize_to_json`.
Parameters
----------
json_str : str
JSON string produced by ``serialize_to_json()``.
for_worker : bool, default False
When ``True`` and the serialized connection contains a
``worker_uri`` key in its namespace properties, the worker URI
replaces the primary ``uri`` before the connection is created.
This allows remote workers to connect via an internal endpoint.
Returns
-------
DBConnection
A new connection matching the serialized state.
"""
import json
data = json.loads(json_str)
connection_type = data.get("connection_type")
rci_secs = data.get("read_consistency_interval_seconds")
rci = timedelta(seconds=rci_secs) if rci_secs is not None else None
storage_options = data.get("storage_options")
if connection_type == "namespace":
props = dict(data.get("namespace_client_properties") or {})
if for_worker and WORKER_URI_KEY in props:
worker_uri = props.pop(WORKER_URI_KEY)
props["uri"] = worker_uri
return connect_namespace(
namespace_client_impl=data["namespace_client_impl"],
namespace_client_properties=props,
read_consistency_interval=rci,
storage_options=storage_options,
namespace_client_pushdown_operations=data.get("pushdown_operations"),
)
elif connection_type == "local":
return LanceDBConnection(
data["uri"],
read_consistency_interval=rci,
storage_options=storage_options,
)
else:
raise ValueError(f"Unknown connection_type: {connection_type}")
async def connect_async(
uri: URI,
*,

View File

@@ -529,6 +529,21 @@ class DBConnection(EnforceOverrides):
"namespace_client is not supported for this connection type"
)
def serialize_to_json(self) -> str:
"""Serialize this connection to a JSON string for reconstruction.
The returned JSON can be passed to :func:`from_serialized_json`
to recreate an equivalent connection, e.g. in a remote worker.
Returns
-------
str
JSON string representation of this connection.
"""
raise NotImplementedError(
"serialize_to_json is not supported for this connection type"
)
class LanceDBConnection(DBConnection):
"""
@@ -652,6 +667,20 @@ class LanceDBConnection(DBConnection):
val += ")"
return val
@override
def serialize_to_json(self) -> str:
import json
rci = self.read_consistency_interval
return json.dumps({
"connection_type": "local",
"uri": self.uri,
"storage_options": self.storage_options,
"read_consistency_interval_seconds": (
rci.total_seconds() if rci else None
),
})
async def _async_get_table_names(self, start_after: Optional[str], limit: int):
conn = AsyncConnection(await lancedb_connect(self.uri))
return await conn.table_names(start_after=start_after, limit=limit)

View File

@@ -381,6 +381,8 @@ class LanceNamespaceDBConnection(DBConnection):
storage_options: Optional[Dict[str, str]] = None,
session: Optional[Session] = None,
namespace_client_pushdown_operations: Optional[List[str]] = None,
namespace_client_impl: Optional[str] = None,
namespace_client_properties: Optional[Dict[str, str]] = None,
):
"""
Initialize a namespace-based LanceDB connection.
@@ -406,12 +408,37 @@ class LanceNamespaceDBConnection(DBConnection):
namespace.create_table() instead of using declare_table + local write.
Default is None (no pushdown, all operations run locally).
namespace_client_impl : Optional[str]
The namespace implementation name used to create this connection.
Stored for serialization purposes.
namespace_client_properties : Optional[Dict[str, str]]
The namespace properties used to create this connection.
Stored for serialization purposes.
"""
self._namespace_client = namespace_client
self.read_consistency_interval = read_consistency_interval
self.storage_options = storage_options or {}
self.session = session
self._pushdown_operations = set(namespace_client_pushdown_operations or [])
self._namespace_client_impl = namespace_client_impl
self._namespace_client_properties = namespace_client_properties
@override
def serialize_to_json(self) -> str:
import json
return json.dumps({
"connection_type": "namespace",
"namespace_client_impl": self._namespace_client_impl,
"namespace_client_properties": self._namespace_client_properties,
"pushdown_operations": sorted(self._pushdown_operations),
"storage_options": self.storage_options or None,
"read_consistency_interval_seconds": (
self.read_consistency_interval.total_seconds()
if self.read_consistency_interval
else None
),
})
@override
def table_names(
@@ -1472,6 +1499,8 @@ def connect_namespace(
storage_options=storage_options,
session=session,
namespace_client_pushdown_operations=namespace_client_pushdown_operations,
namespace_client_impl=namespace_client_impl,
namespace_client_properties=namespace_client_properties,
)