client: fold create_view into create_materialized_view; return job handles

- create_materialized_view now takes either query= or source+select (folds in
  the old create_view builder) and returns a MaterializedView handle whose
  .wait() blocks on initial population. create_view is removed -- it was
  misnamed (it built a *materialized* view, while CREATE VIEW means the plain
  non-materialized view the engine also supports).
- MaterializedView.refresh() and the remote Table.refresh_column() now return a
  JobHandle directly, so tbl.refresh_column("c").wait() needs no db.job(...)
  wrapper. db.job(id) is narrowed to reconnect-by-id (stored id / SQL / REST).
- rename View/AsyncView -> MaterializedView/AsyncMaterializedView (+ exports).
- tighten the replace path: only a not-found error on the pre-drop is benign;
  real failures (perms/server) now surface instead of being swallowed.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
Wyatt Alt
2026-06-16 05:42:44 -07:00
committed by Jack Ye
parent 5810974b37
commit 3d92106394
4 changed files with 161 additions and 105 deletions

View File

@@ -17,7 +17,15 @@ from .db import AsyncConnection, DBConnection, LanceDBConnection
from .remote import ClientConfig
from .remote.db import RemoteDBConnection
from .expr import Expr, col, lit, func
from .udf import udf, table_udf, Udf, JobHandle, View, AsyncJobHandle, AsyncView
from .udf import (
udf,
table_udf,
Udf,
JobHandle,
MaterializedView,
AsyncJobHandle,
AsyncMaterializedView,
)
from .schema import vector
from .table import AsyncTable, Table
from ._lancedb import Session
@@ -453,9 +461,9 @@ __all__ = [
"table_udf",
"Udf",
"JobHandle",
"View",
"MaterializedView",
"AsyncJobHandle",
"AsyncView",
"AsyncMaterializedView",
"connect",
"connect_async",
"connect_namespace",

View File

@@ -65,6 +65,7 @@ if TYPE_CHECKING:
from .common import DATA, URI
from .embeddings import EmbeddingFunctionConfig
from ._lancedb import Session
from .udf import MaterializedView, AsyncMaterializedView
from .namespace_utils import (
_normalize_create_namespace_mode,
@@ -631,18 +632,28 @@ class DBConnection(EnforceOverrides):
def create_materialized_view(
self,
name: str,
query: str,
source=None,
select=None,
*,
query: Optional[str] = None,
where: Optional[str] = None,
auto_refresh: bool = False,
with_no_data: bool = False,
replace: bool = False,
partition_by: Optional[str] = None,
) -> Optional[str]:
"""Create a materialized view (CREATE MATERIALIZED VIEW).
) -> "MaterializedView":
"""Create a materialized view (CREATE MATERIALIZED VIEW); returns a
`MaterializedView` handle (``.wait()`` blocks until it is populated).
`query` is the view's SELECT statement, e.g.
"SELECT id, embed(body) AS vec FROM articles WHERE id > 1".
Returns the initial-population job id, or None when
with_no_data=True.
Two ways to specify the view body:
- ergonomic: pass ``source`` (a table name or table) and ``select``
items -- column names, expression strings ("embed(body)"),
(alias, expression) tuples, or ``@udf`` / ``@table_udf`` objects.
The SELECT is assembled and parsed server-side (one parser, shared
with SQL).
- raw: pass ``query=`` with a full SELECT, e.g.
"SELECT id, embed(body) AS vec FROM articles WHERE id > 1".
`partition_by` partitions the view's (single) table function on a source
column. If that column has an IVF vector index the server partitions by
@@ -650,7 +661,20 @@ class DBConnection(EnforceOverrides):
value. (Geneva's `partition_by` and `partition_by_indexed_column` unify
here -- the engine picks the strategy from the column.)
"""
return LOOP.run(
from .udf import build_view_query, MaterializedView
if query is None:
if source is None or select is None:
raise ValueError(
"create_materialized_view needs either query= or both "
"source and select"
)
query = build_view_query(source, select)
if where:
query += f" WHERE {where}"
if replace:
self._drop_view_if_exists(name)
job_id = LOOP.run(
self._conn.create_materialized_view(
name,
query,
@@ -659,49 +683,24 @@ class DBConnection(EnforceOverrides):
partition_by=partition_by,
)
)
return MaterializedView(self, name, job_id=job_id)
def create_view(
self,
name: str,
source,
select,
*,
where: Optional[str] = None,
auto_refresh: bool = False,
replace: bool = False,
partition_by: Optional[str] = None,
):
"""Create a materialized view from a source and select items, and
return a `View` handle.
`source` is a table name or table; `select` items are column names,
expression strings ("embed(body)"), (alias, expression) tuples, or
``@udf`` / ``@table_udf`` objects. Sugar over create_materialized_view:
it assembles the SELECT, which the server parses (one parser, shared
with SQL).
`partition_by` partitions the view's table function on a source column;
the server partitions by index clusters if that column is IVF-indexed,
else by distinct value (see create_materialized_view).
"""
from .udf import build_view_query, View
query = build_view_query(source, select)
if where:
query += f" WHERE {where}"
if replace:
try:
self.drop_materialized_view(name)
except Exception:
pass
self.create_materialized_view(
name, query, auto_refresh=auto_refresh, partition_by=partition_by
)
return View(self, name)
def _drop_view_if_exists(self, name: str) -> None:
    """Drop the materialized view ``name``, tolerating only "not there".

    Backs ``replace=True`` ("drop if present"). The drop is always
    attempted; an exception is suppressed only when its message, compared
    case-insensitively, contains "not found" or "does not exist". Any
    other failure is re-raised unchanged so it surfaces here instead of
    being masked by a later create failure.
    """
    benign_markers = ("not found", "does not exist")
    try:
        self.drop_materialized_view(name)
    except Exception as err:
        text = str(err).lower()
        if any(marker in text for marker in benign_markers):
            # The view was simply absent: nothing to replace.
            return
        raise
def job(self, job_id: str):
"""A `JobHandle` for polling/cancelling an inflight job by id (e.g.
``db.job(tbl.refresh_column("c")).wait()``)."""
"""A `JobHandle` for reconnecting to an inflight job by id -- e.g. an
id you stored, or one returned from the SQL / REST surface. Submit
methods (`refresh_column`, `MaterializedView.refresh`) already return a
handle directly, so you do not need this to wait on a fresh submission."""
from .udf import JobHandle
return JobHandle(self, job_id)
@@ -2043,54 +2042,54 @@ class AsyncConnection(object):
async def create_materialized_view(
self,
name: str,
query: str,
source=None,
select=None,
*,
query: Optional[str] = None,
where: Optional[str] = None,
auto_refresh: bool = False,
with_no_data: bool = False,
replace: bool = False,
partition_by: Optional[str] = None,
) -> Optional[str]:
"""Create a materialized view; returns the initial-population
job id, or None when with_no_data=True. `partition_by` partitions the
view's table function on a source column (index-cluster if the column is
IVF-indexed, else distinct-value); see the sync method."""
return await self._inner.create_materialized_view(
) -> "AsyncMaterializedView":
"""Create a materialized view; returns an `AsyncMaterializedView`
handle (``.wait()`` blocks until populated). Pass either ``query=`` (a
full SELECT) or ``source`` + ``select`` items; `partition_by`
partitions the view's table function on a source column (index-cluster
if the column is IVF-indexed, else distinct-value). See the sync
method for the select grammar."""
from .udf import build_view_query, AsyncMaterializedView
if query is None:
if source is None or select is None:
raise ValueError(
"create_materialized_view needs either query= or both "
"source and select"
)
query = build_view_query(source, select)
if where:
query += f" WHERE {where}"
if replace:
try:
await self.drop_materialized_view(name)
except Exception as e:
msg = str(e).lower()
if "not found" not in msg and "does not exist" not in msg:
raise
job_id = await self._inner.create_materialized_view(
name,
query,
auto_refresh=auto_refresh,
with_no_data=with_no_data,
partition_by=partition_by,
)
async def create_view(
self,
name: str,
source,
select,
*,
where: Optional[str] = None,
auto_refresh: bool = False,
replace: bool = False,
partition_by: Optional[str] = None,
):
"""Create a materialized view from a source + select items; returns
an `AsyncView`. See the sync `create_view` for the select grammar."""
from .udf import build_view_query, AsyncView
query = build_view_query(source, select)
if where:
query += f" WHERE {where}"
if replace:
try:
await self.drop_materialized_view(name)
except Exception:
pass
await self.create_materialized_view(
name, query, auto_refresh=auto_refresh, partition_by=partition_by
)
return AsyncView(self, name)
return AsyncMaterializedView(self, name, job_id=job_id)
def job(self, job_id: str):
"""An `AsyncJobHandle` for polling/cancelling an inflight job by id."""
"""An `AsyncJobHandle` for reconnecting to an inflight job by id (a
stored id, or one from the SQL / REST surface). Submit methods already
return a handle, so this is only needed to re-attach to an existing
job."""
from .udf import AsyncJobHandle
return AsyncJobHandle(self, job_id)

View File

@@ -13,10 +13,14 @@ from typing import (
Iterable,
List,
Optional,
TYPE_CHECKING,
Union,
Literal,
overload,
)
if TYPE_CHECKING:
from ..udf import JobHandle
import warnings
from lancedb import __version__
@@ -906,22 +910,25 @@ class RemoteTable(Table):
max_workers: Optional[int] = None,
batch_size: Optional[int] = None,
priority: Optional[str] = None,
) -> str:
) -> "JobHandle":
"""Trigger recompute of computed columns (REFRESH COLUMN).
The expression is resolved server-side from each column's stored
binding; columns bound to the same struct-returning function
refresh together. Returns the refresh job id. Server-backed
feature (LanceDB Enterprise / Cloud).
refresh together. Returns a `JobHandle` to wait on, poll, or cancel
(``tbl.refresh_column("c").wait()``). Server-backed feature
(LanceDB Enterprise / Cloud).
num_workers / max_workers / batch_size / priority are per-refresh
scheduling knobs (how to run THIS refresh) and override any default
the function carries. `priority` is a Kueue tier
(training | interactive | backfill).
"""
from ..udf import JobHandle
if isinstance(columns, str):
columns = [columns]
return LOOP.run(
job_id = LOOP.run(
self._table.refresh_column(
list(columns),
where=where,
@@ -931,6 +938,19 @@ class RemoteTable(Table):
priority=priority,
)
)
return JobHandle(self._job_conn(), job_id)
def _job_conn(self):
    """Return the client connection used to poll jobs this table spawns.

    On first use the connection is built by deserializing
    ``self._serialized_connection_state()`` and stored on the instance as
    ``_job_conn_cache``; later calls return that stored object.
    """
    # NOTE(review): the cache is intended not to be pickled, so a
    # forked/unpickled table rebuilds it on next use -- that depends on the
    # class's pickling hooks, which are not visible here; confirm.
    from lancedb import deserialize_conn

    cached = getattr(self, "_job_conn_cache", None)
    if cached is not None:
        return cached
    fresh = deserialize_conn(self._serialized_connection_state())
    self._job_conn_cache = fresh
    return fresh
def load_columns(
self,

View File

@@ -19,8 +19,8 @@ Register and use them through the existing connection/table API:
db.create_function(embed) # CREATE FUNCTION (once)
tbl = db.open_table("docs")
tbl.add_columns(computed={"vec": embed("text")}) # bind embed(text) -> vec
db.job(tbl.refresh_column("vec")).wait() # materialize
view = db.create_view("chunks", tbl, ["id", chunk_fn])
tbl.refresh_column("vec").wait() # materialize (returns a JobHandle)
view = db.create_materialized_view("chunks", tbl, ["id", chunk_fn])
`embed("text")` applies the registered function to the `text` column and yields
the expression `embed(text)`; the function itself stays decoupled from any
@@ -490,22 +490,38 @@ def _job_id_matches(handle_id: str, listed_id: str) -> bool:
return len(prefix) >= 4 and prefix in listed_id
class View:
"""A reference to a materialized view (name + connection). View
operations are server-backed connection calls bound to the name."""
class MaterializedView:
"""A reference to a materialized view (name + connection). Operations are
server-backed connection calls bound to the name.
def __init__(self, conn, name: str):
``create_materialized_view`` returns one of these; ``job_id`` is the
initial-population job (None when the view was created with no data), so
``db.create_materialized_view(...).wait()`` blocks until it is populated.
"""
def __init__(self, conn, name: str, job_id: "str | None" = None):
self.conn = conn
self.name = name
#: initial-population job id from create, or None (with_no_data).
self.job_id = job_id
def refresh(self, full: bool = False):
"""Refresh the materialized view; returns the refresh job id.
def wait(self, timeout: float = 3600.0, poll: float = 2.0) -> str:
    """Block until the initial-population job from create has finished.

    When ``job_id`` is None (the view was created with no data) there is
    nothing to wait for and "finished" is returned immediately. Otherwise
    a `JobHandle` for ``job_id`` is built on this view's connection and
    its ``wait`` result is returned; ``timeout`` and ``poll`` are
    forwarded to it unchanged.
    """
    if self.job_id is not None:
        handle = JobHandle(self.conn, self.job_id)
        return handle.wait(timeout=timeout, poll=poll)
    return "finished"
def refresh(self, full: bool = False) -> "JobHandle":
"""Refresh the materialized view; returns a `JobHandle` to wait on,
poll, or cancel (``view.refresh().wait()``).
``full=True`` forces a full rebuild (recompute and replace every row)
instead of the default incremental refresh. A full rebuild preserves
the view's indexes -- they are reindexed by the distributed indexer.
"""
return self.conn.refresh_materialized_view(self.name, full=full)
job_id = self.conn.refresh_materialized_view(self.name, full=full)
return JobHandle(self.conn, job_id)
def explain_refresh(self, full: bool = False):
"""Plan a refresh without running it (EXPLAIN REFRESH)."""
@@ -607,20 +623,33 @@ class JobHandle:
self.conn.cancel_job(job.job_id if job is not None else self.id)
class AsyncView:
class AsyncMaterializedView:
"""Async reference to a materialized view (name + async connection)."""
def __init__(self, conn, name: str):
def __init__(self, conn, name: str, job_id: "str | None" = None):
self.conn = conn
self.name = name
#: initial-population job id from create, or None (with_no_data).
self.job_id = job_id
async def refresh(self, full: bool = False):
"""Refresh the materialized view; returns the refresh job id.
async def wait(self, timeout: float = 3600.0, poll: float = 2.0) -> str:
    """Wait until the initial-population job from create has finished.

    When ``job_id`` is None (the view was created with no data) there is
    nothing to wait for and "finished" is returned immediately. Otherwise
    an `AsyncJobHandle` for ``job_id`` is built on this view's connection
    and its awaited ``wait`` result is returned; ``timeout`` and ``poll``
    are forwarded to it unchanged.
    """
    if self.job_id is not None:
        handle = AsyncJobHandle(self.conn, self.job_id)
        return await handle.wait(timeout=timeout, poll=poll)
    return "finished"
async def refresh(self, full: bool = False) -> "AsyncJobHandle":
"""Refresh the materialized view; returns an `AsyncJobHandle` to wait
on, poll, or cancel.
``full=True`` forces a full rebuild instead of an incremental refresh
(indexes are preserved and reindexed by the distributed indexer).
"""
return await self.conn.refresh_materialized_view(self.name, full=full)
job_id = await self.conn.refresh_materialized_view(self.name, full=full)
return AsyncJobHandle(self.conn, job_id)
async def explain_refresh(self, full: bool = False):
    """Ask the connection to explain a refresh of this view.

    Awaits ``conn.explain_refresh_materialized_view`` for ``self.name``,
    forwarding ``full``, and returns its result unchanged.
    """
    plan = await self.conn.explain_refresh_materialized_view(
        self.name, full=full
    )
    return plan