From dd2a0ec48f6c3f404723c7c22456830c4d0f3737 Mon Sep 17 00:00:00 2001
From: Jack Ye
Date: Sat, 11 Apr 2026 14:19:59 -0700
Subject: [PATCH] feat: add serialize_to_json/from_serialized_json for DBConnection

Add serialization support to DBConnection classes so connections can be
reconstructed in remote workers without tracking namespace params separately.

- DBConnection.serialize_to_json() base method
- LanceDBConnection: serializes uri, storage_options, read_consistency_interval
- LanceNamespaceDBConnection: stores namespace_client_impl/properties,
  serializes all connection params including pushdown_operations
- from_serialized_json() factory with for_worker flag for worker_uri swap
- connect_namespace() now passes impl/properties to connection for serialization
- read_consistency_interval is compared against None when serializing, so a
  zero interval (timedelta(0), which is falsy) is written as 0.0 instead of
  null and survives the round trip through from_serialized_json()

Co-Authored-By: Claude Opus 4.6 (1M context)
---
 python/python/lancedb/__init__.py  | 59 ++++++++++++++++++++++++++++++
 python/python/lancedb/db.py        | 29 +++++++++++++++
 python/python/lancedb/namespace.py | 29 +++++++++++++++
 3 files changed, 117 insertions(+)

diff --git a/python/python/lancedb/__init__.py b/python/python/lancedb/__init__.py
index bb6c65d56..4ce6b7594 100644
--- a/python/python/lancedb/__init__.py
+++ b/python/python/lancedb/__init__.py
@@ -215,6 +215,65 @@ def connect(
     )
 
 
+WORKER_URI_KEY = "worker_uri"
+
+
+def from_serialized_json(
+    json_str: str,
+    *,
+    for_worker: bool = False,
+) -> DBConnection:
+    """Reconstruct a DBConnection from a JSON string.
+
+    The JSON string must have been produced by
+    :meth:`DBConnection.serialize_to_json`.
+
+    Parameters
+    ----------
+    json_str : str
+        JSON string produced by ``serialize_to_json()``.
+    for_worker : bool, default False
+        When ``True`` and the serialized connection contains a
+        ``worker_uri`` key in its namespace properties, the worker URI
+        replaces the primary ``uri`` before the connection is created.
+        This allows remote workers to connect via an internal endpoint.
+
+    Returns
+    -------
+    DBConnection
+        A new connection matching the serialized state.
+    """
+    import json
+
+    data = json.loads(json_str)
+    connection_type = data.get("connection_type")
+
+    rci_secs = data.get("read_consistency_interval_seconds")
+    rci = timedelta(seconds=rci_secs) if rci_secs is not None else None
+    storage_options = data.get("storage_options")
+
+    if connection_type == "namespace":
+        props = dict(data.get("namespace_client_properties") or {})
+        if for_worker and WORKER_URI_KEY in props:
+            worker_uri = props.pop(WORKER_URI_KEY)
+            props["uri"] = worker_uri
+        return connect_namespace(
+            namespace_client_impl=data["namespace_client_impl"],
+            namespace_client_properties=props,
+            read_consistency_interval=rci,
+            storage_options=storage_options,
+            namespace_client_pushdown_operations=data.get("pushdown_operations"),
+        )
+    elif connection_type == "local":
+        return LanceDBConnection(
+            data["uri"],
+            read_consistency_interval=rci,
+            storage_options=storage_options,
+        )
+    else:
+        raise ValueError(f"Unknown connection_type: {connection_type}")
+
+
 async def connect_async(
     uri: URI,
     *,
diff --git a/python/python/lancedb/db.py b/python/python/lancedb/db.py
index 869f1481f..2a9194cfd 100644
--- a/python/python/lancedb/db.py
+++ b/python/python/lancedb/db.py
@@ -529,6 +529,21 @@ class DBConnection(EnforceOverrides):
             "namespace_client is not supported for this connection type"
         )
 
+    def serialize_to_json(self) -> str:
+        """Serialize this connection to a JSON string for reconstruction.
+
+        The returned JSON can be passed to :func:`from_serialized_json`
+        to recreate an equivalent connection, e.g. in a remote worker.
+
+        Returns
+        -------
+        str
+            JSON string representation of this connection.
+        """
+        raise NotImplementedError(
+            "serialize_to_json is not supported for this connection type"
+        )
+
 
 class LanceDBConnection(DBConnection):
     """
@@ -652,6 +667,20 @@ class LanceDBConnection(DBConnection):
         val += ")"
         return val
 
+    @override
+    def serialize_to_json(self) -> str:
+        import json
+
+        rci = self.read_consistency_interval  # may be timedelta(0), which is falsy
+        return json.dumps({
+            "connection_type": "local",
+            "uri": self.uri,
+            "storage_options": self.storage_options,
+            "read_consistency_interval_seconds": (
+                rci.total_seconds() if rci is not None else None
+            ),
+        })
+
     async def _async_get_table_names(self, start_after: Optional[str], limit: int):
         conn = AsyncConnection(await lancedb_connect(self.uri))
         return await conn.table_names(start_after=start_after, limit=limit)
diff --git a/python/python/lancedb/namespace.py b/python/python/lancedb/namespace.py
index 55df0c82b..4fdf8dccc 100644
--- a/python/python/lancedb/namespace.py
+++ b/python/python/lancedb/namespace.py
@@ -381,6 +381,8 @@ class LanceNamespaceDBConnection(DBConnection):
         storage_options: Optional[Dict[str, str]] = None,
         session: Optional[Session] = None,
         namespace_client_pushdown_operations: Optional[List[str]] = None,
+        namespace_client_impl: Optional[str] = None,
+        namespace_client_properties: Optional[Dict[str, str]] = None,
     ):
         """
         Initialize a namespace-based LanceDB connection.
@@ -406,12 +408,37 @@ class LanceNamespaceDBConnection(DBConnection):
               namespace.create_table() instead of using
               declare_table + local write.
             Default is None (no pushdown, all operations run locally).
+        namespace_client_impl : Optional[str]
+            The namespace implementation name used to create this connection.
+            Stored for serialization purposes.
+        namespace_client_properties : Optional[Dict[str, str]]
+            The namespace properties used to create this connection.
+            Stored for serialization purposes.
         """
         self._namespace_client = namespace_client
         self.read_consistency_interval = read_consistency_interval
         self.storage_options = storage_options or {}
         self.session = session
         self._pushdown_operations = set(namespace_client_pushdown_operations or [])
+        self._namespace_client_impl = namespace_client_impl
+        self._namespace_client_properties = namespace_client_properties
+
+    @override
+    def serialize_to_json(self) -> str:
+        import json
+
+        return json.dumps({
+            "connection_type": "namespace",
+            "namespace_client_impl": self._namespace_client_impl,
+            "namespace_client_properties": self._namespace_client_properties,
+            "pushdown_operations": sorted(self._pushdown_operations),
+            "storage_options": self.storage_options or None,
+            "read_consistency_interval_seconds": (
+                self.read_consistency_interval.total_seconds()
+                if self.read_consistency_interval is not None  # timedelta(0) is falsy
+                else None
+            ),
+        })
 
     @override
     def table_names(
@@ -1472,6 +1499,8 @@ def connect_namespace(
         storage_options=storage_options,
         session=session,
         namespace_client_pushdown_operations=namespace_client_pushdown_operations,
+        namespace_client_impl=namespace_client_impl,
+        namespace_client_properties=namespace_client_properties,
     )
 
 