From 3d921063940215e5f56f8be0817d375d3b81849a Mon Sep 17 00:00:00 2001 From: Wyatt Alt Date: Tue, 16 Jun 2026 05:42:44 -0700 Subject: [PATCH] client: fold create_view into create_materialized_view; return job handles - create_materialized_view now takes either query= or source+select (folds in the old create_view builder) and returns a MaterializedView handle whose .wait() blocks on initial population. create_view is removed -- it was misnamed (it built a *materialized* view, while CREATE VIEW means the plain non-materialized view the engine also supports). - MaterializedView.refresh() and the remote Table.refresh_column() now return a JobHandle directly, so tbl.refresh_column("c").wait() needs no db.job(...) wrapper. db.job(id) is narrowed to reconnect-by-id (stored id / SQL / REST). - rename View/AsyncView -> MaterializedView/AsyncMaterializedView (+ exports). - tighten the replace path: only a not-found error on the pre-drop is benign; real failures (permission or server errors) now surface instead of being swallowed. 
Co-Authored-By: Claude Opus 4.8 (1M context) --- python/python/lancedb/__init__.py | 14 ++- python/python/lancedb/db.py | 167 +++++++++++++------------- python/python/lancedb/remote/table.py | 28 ++++- python/python/lancedb/udf.py | 57 ++++++--- 4 files changed, 161 insertions(+), 105 deletions(-) diff --git a/python/python/lancedb/__init__.py b/python/python/lancedb/__init__.py index 2a3b67c93..79f2245a8 100644 --- a/python/python/lancedb/__init__.py +++ b/python/python/lancedb/__init__.py @@ -17,7 +17,15 @@ from .db import AsyncConnection, DBConnection, LanceDBConnection from .remote import ClientConfig from .remote.db import RemoteDBConnection from .expr import Expr, col, lit, func -from .udf import udf, table_udf, Udf, JobHandle, View, AsyncJobHandle, AsyncView +from .udf import ( + udf, + table_udf, + Udf, + JobHandle, + MaterializedView, + AsyncJobHandle, + AsyncMaterializedView, +) from .schema import vector from .table import AsyncTable, Table from ._lancedb import Session @@ -453,9 +461,9 @@ __all__ = [ "table_udf", "Udf", "JobHandle", - "View", + "MaterializedView", "AsyncJobHandle", - "AsyncView", + "AsyncMaterializedView", "connect", "connect_async", "connect_namespace", diff --git a/python/python/lancedb/db.py b/python/python/lancedb/db.py index 070842670..a46851e9c 100644 --- a/python/python/lancedb/db.py +++ b/python/python/lancedb/db.py @@ -65,6 +65,7 @@ if TYPE_CHECKING: from .common import DATA, URI from .embeddings import EmbeddingFunctionConfig from ._lancedb import Session + from .udf import MaterializedView, AsyncMaterializedView from .namespace_utils import ( _normalize_create_namespace_mode, @@ -631,18 +632,28 @@ class DBConnection(EnforceOverrides): def create_materialized_view( self, name: str, - query: str, + source=None, + select=None, *, + query: Optional[str] = None, + where: Optional[str] = None, auto_refresh: bool = False, with_no_data: bool = False, + replace: bool = False, partition_by: Optional[str] = None, - ) -> Optional[str]: - 
"""Create a materialized view (CREATE MATERIALIZED VIEW). + ) -> "MaterializedView": + """Create a materialized view (CREATE MATERIALIZED VIEW); returns a + `MaterializedView` handle (``.wait()`` blocks until it is populated). - `query` is the view's SELECT statement, e.g. - "SELECT id, embed(body) AS vec FROM articles WHERE id > 1". - Returns the initial-population job id, or None when - with_no_data=True. + Two ways to specify the view body: + + - ergonomic: pass ``source`` (a table name or table) and ``select`` + items -- column names, expression strings ("embed(body)"), + (alias, expression) tuples, or ``@udf`` / ``@table_udf`` objects. + The SELECT is assembled and parsed server-side (one parser, shared + with SQL). + - raw: pass ``query=`` with a full SELECT, e.g. + "SELECT id, embed(body) AS vec FROM articles WHERE id > 1". `partition_by` partitions the view's (single) table function on a source column. If that column has an IVF vector index the server partitions by @@ -650,7 +661,20 @@ class DBConnection(EnforceOverrides): value. (Geneva's `partition_by` and `partition_by_indexed_column` unify here -- the engine picks the strategy from the column.) 
""" - return LOOP.run( + from .udf import build_view_query, MaterializedView + + if query is None: + if source is None or select is None: + raise ValueError( + "create_materialized_view needs either query= or both " + "source and select" + ) + query = build_view_query(source, select) + if where: + query += f" WHERE {where}" + if replace: + self._drop_view_if_exists(name) + job_id = LOOP.run( self._conn.create_materialized_view( name, query, @@ -659,49 +683,24 @@ class DBConnection(EnforceOverrides): partition_by=partition_by, ) ) + return MaterializedView(self, name, job_id=job_id) - def create_view( - self, - name: str, - source, - select, - *, - where: Optional[str] = None, - auto_refresh: bool = False, - replace: bool = False, - partition_by: Optional[str] = None, - ): - """Create a materialized view from a source and select items, and - return a `View` handle. - - `source` is a table name or table; `select` items are column names, - expression strings ("embed(body)"), (alias, expression) tuples, or - ``@udf`` / ``@table_udf`` objects. Sugar over create_materialized_view: - it assembles the SELECT, which the server parses (one parser, shared - with SQL). - - `partition_by` partitions the view's table function on a source column; - the server partitions by index clusters if that column is IVF-indexed, - else by distinct value (see create_materialized_view). - """ - from .udf import build_view_query, View - - query = build_view_query(source, select) - if where: - query += f" WHERE {where}" - if replace: - try: - self.drop_materialized_view(name) - except Exception: - pass - self.create_materialized_view( - name, query, auto_refresh=auto_refresh, partition_by=partition_by - ) - return View(self, name) + def _drop_view_if_exists(self, name: str) -> None: + # `replace=True` is "drop if present"; only a not-found error is + # benign here. Anything else (perms, server fault) must surface rather + # than be masked by a later create failure. 
+ try: + self.drop_materialized_view(name) + except Exception as e: + msg = str(e).lower() + if "not found" not in msg and "does not exist" not in msg: + raise def job(self, job_id: str): - """A `JobHandle` for polling/cancelling an inflight job by id (e.g. - ``db.job(tbl.refresh_column("c")).wait()``).""" + """A `JobHandle` for reconnecting to an inflight job by id -- e.g. an + id you stored, or one returned from the SQL / REST surface. Submit + methods (`refresh_column`, `MaterializedView.refresh`) already return a + handle directly, so you do not need this to wait on a fresh submission.""" from .udf import JobHandle return JobHandle(self, job_id) @@ -2043,54 +2042,54 @@ class AsyncConnection(object): async def create_materialized_view( self, name: str, - query: str, + source=None, + select=None, *, + query: Optional[str] = None, + where: Optional[str] = None, auto_refresh: bool = False, with_no_data: bool = False, + replace: bool = False, partition_by: Optional[str] = None, - ) -> Optional[str]: - """Create a materialized view; returns the initial-population - job id, or None when with_no_data=True. `partition_by` partitions the - view's table function on a source column (index-cluster if the column is - IVF-indexed, else distinct-value); see the sync method.""" - return await self._inner.create_materialized_view( + ) -> "AsyncMaterializedView": + """Create a materialized view; returns an `AsyncMaterializedView` + handle (``.wait()`` blocks until populated). Pass either ``query=`` (a + full SELECT) or ``source`` + ``select`` items; `partition_by` + partitions the view's table function on a source column (index-cluster + if the column is IVF-indexed, else distinct-value). 
See the sync + method for the select grammar.""" + from .udf import build_view_query, AsyncMaterializedView + + if query is None: + if source is None or select is None: + raise ValueError( + "create_materialized_view needs either query= or both " + "source and select" + ) + query = build_view_query(source, select) + if where: + query += f" WHERE {where}" + if replace: + try: + await self.drop_materialized_view(name) + except Exception as e: + msg = str(e).lower() + if "not found" not in msg and "does not exist" not in msg: + raise + job_id = await self._inner.create_materialized_view( name, query, auto_refresh=auto_refresh, with_no_data=with_no_data, partition_by=partition_by, ) - - async def create_view( - self, - name: str, - source, - select, - *, - where: Optional[str] = None, - auto_refresh: bool = False, - replace: bool = False, - partition_by: Optional[str] = None, - ): - """Create a materialized view from a source + select items; returns - an `AsyncView`. See the sync `create_view` for the select grammar.""" - from .udf import build_view_query, AsyncView - - query = build_view_query(source, select) - if where: - query += f" WHERE {where}" - if replace: - try: - await self.drop_materialized_view(name) - except Exception: - pass - await self.create_materialized_view( - name, query, auto_refresh=auto_refresh, partition_by=partition_by - ) - return AsyncView(self, name) + return AsyncMaterializedView(self, name, job_id=job_id) def job(self, job_id: str): - """An `AsyncJobHandle` for polling/cancelling an inflight job by id.""" + """An `AsyncJobHandle` for reconnecting to an inflight job by id (a + stored id, or one from the SQL / REST surface). 
Submit methods already + return a handle, so this is only needed to re-attach to an existing + job.""" from .udf import AsyncJobHandle return AsyncJobHandle(self, job_id) diff --git a/python/python/lancedb/remote/table.py b/python/python/lancedb/remote/table.py index 9c909d341..9c26afd5a 100644 --- a/python/python/lancedb/remote/table.py +++ b/python/python/lancedb/remote/table.py @@ -13,10 +13,14 @@ from typing import ( Iterable, List, Optional, + TYPE_CHECKING, Union, Literal, overload, ) + +if TYPE_CHECKING: + from ..udf import JobHandle import warnings from lancedb import __version__ @@ -906,22 +910,25 @@ class RemoteTable(Table): max_workers: Optional[int] = None, batch_size: Optional[int] = None, priority: Optional[str] = None, - ) -> str: + ) -> "JobHandle": """Trigger recompute of computed columns (REFRESH COLUMN). The expression is resolved server-side from each column's stored binding; columns bound to the same struct-returning function - refresh together. Returns the refresh job id. Server-backed - feature (LanceDB Enterprise / Cloud). + refresh together. Returns a `JobHandle` to wait on, poll, or cancel + (``tbl.refresh_column("c").wait()``). Server-backed feature + (LanceDB Enterprise / Cloud). num_workers / max_workers / batch_size / priority are per-refresh scheduling knobs (how to run THIS refresh) and override any default the function carries. `priority` is a Kueue tier (training | interactive | backfill). """ + from ..udf import JobHandle + if isinstance(columns, str): columns = [columns] - return LOOP.run( + job_id = LOOP.run( self._table.refresh_column( list(columns), where=where, @@ -931,6 +938,19 @@ class RemoteTable(Table): priority=priority, ) ) + return JobHandle(self._job_conn(), job_id) + + def _job_conn(self): + """A client connection for polling jobs this table spawns. 
Built lazily + from the table's serialized connection state and cached (not pickled -- + a forked/unpickled table rebuilds it on next use).""" + from lancedb import deserialize_conn + + conn = getattr(self, "_job_conn_cache", None) + if conn is None: + conn = deserialize_conn(self._serialized_connection_state()) + self._job_conn_cache = conn + return conn def load_columns( self, diff --git a/python/python/lancedb/udf.py b/python/python/lancedb/udf.py index f392df293..2da5b97d6 100644 --- a/python/python/lancedb/udf.py +++ b/python/python/lancedb/udf.py @@ -19,8 +19,8 @@ Register and use them through the existing connection/table API: db.create_function(embed) # CREATE FUNCTION (once) tbl = db.open_table("docs") tbl.add_columns(computed={"vec": embed("text")}) # bind embed(text) -> vec - db.job(tbl.refresh_column("vec")).wait() # materialize - view = db.create_view("chunks", tbl, ["id", chunk_fn]) + tbl.refresh_column("vec").wait() # materialize (returns a JobHandle) + view = db.create_materialized_view("chunks", tbl, ["id", chunk_fn]) `embed("text")` applies the registered function to the `text` column and yields the expression `embed(text)`; the function itself stays decoupled from any @@ -490,22 +490,38 @@ def _job_id_matches(handle_id: str, listed_id: str) -> bool: return len(prefix) >= 4 and prefix in listed_id -class View: - """A reference to a materialized view (name + connection). View - operations are server-backed connection calls bound to the name.""" +class MaterializedView: + """A reference to a materialized view (name + connection). Operations are + server-backed connection calls bound to the name. - def __init__(self, conn, name: str): + ``create_materialized_view`` returns one of these; ``job_id`` is the + initial-population job (None when the view was created with no data), so + ``db.create_materialized_view(...).wait()`` blocks until it is populated. 
+ """ + + def __init__(self, conn, name: str, job_id: "str | None" = None): self.conn = conn self.name = name + #: initial-population job id from create, or None (with_no_data). + self.job_id = job_id - def refresh(self, full: bool = False): - """Refresh the materialized view; returns the refresh job id. + def wait(self, timeout: float = 3600.0, poll: float = 2.0) -> str: + """Block until the initial-population job (from create) finishes. + A no-op when the view was created with no data.""" + if self.job_id is None: + return "finished" + return JobHandle(self.conn, self.job_id).wait(timeout=timeout, poll=poll) + + def refresh(self, full: bool = False) -> "JobHandle": + """Refresh the materialized view; returns a `JobHandle` to wait on, + poll, or cancel (``view.refresh().wait()``). ``full=True`` forces a full rebuild (recompute and replace every row) instead of the default incremental refresh. A full rebuild preserves the view's indexes -- they are reindexed by the distributed indexer. """ - return self.conn.refresh_materialized_view(self.name, full=full) + job_id = self.conn.refresh_materialized_view(self.name, full=full) + return JobHandle(self.conn, job_id) def explain_refresh(self, full: bool = False): """Plan a refresh without running it (EXPLAIN REFRESH).""" @@ -607,20 +623,33 @@ class JobHandle: self.conn.cancel_job(job.job_id if job is not None else self.id) -class AsyncView: +class AsyncMaterializedView: """Async reference to a materialized view (name + async connection).""" - def __init__(self, conn, name: str): + def __init__(self, conn, name: str, job_id: "str | None" = None): self.conn = conn self.name = name + #: initial-population job id from create, or None (with_no_data). + self.job_id = job_id - async def refresh(self, full: bool = False): - """Refresh the materialized view; returns the refresh job id. + async def wait(self, timeout: float = 3600.0, poll: float = 2.0) -> str: + """Block until the initial-population job (from create) finishes. 
+ A no-op when the view was created with no data.""" + if self.job_id is None: + return "finished" + return await AsyncJobHandle(self.conn, self.job_id).wait( + timeout=timeout, poll=poll + ) + + async def refresh(self, full: bool = False) -> "AsyncJobHandle": + """Refresh the materialized view; returns an `AsyncJobHandle` to wait + on, poll, or cancel. ``full=True`` forces a full rebuild instead of an incremental refresh (indexes are preserved and reindexed by the distributed indexer). """ - return await self.conn.refresh_materialized_view(self.name, full=full) + job_id = await self.conn.refresh_materialized_view(self.name, full=full) + return AsyncJobHandle(self.conn, job_id) async def explain_refresh(self, full: bool = False): return await self.conn.explain_refresh_materialized_view(self.name, full=full)