diff --git a/python/python/lancedb/__init__.py b/python/python/lancedb/__init__.py index 79f2245a8..c34196e1d 100644 --- a/python/python/lancedb/__init__.py +++ b/python/python/lancedb/__init__.py @@ -26,6 +26,7 @@ from .udf import ( AsyncJobHandle, AsyncMaterializedView, ) +from .lineage import Lineage, Node, Edge, FunctionRef from .schema import vector from .table import AsyncTable, Table from ._lancedb import Session @@ -464,6 +465,10 @@ __all__ = [ "MaterializedView", "AsyncJobHandle", "AsyncMaterializedView", + "Lineage", + "Node", + "Edge", + "FunctionRef", "connect", "connect_async", "connect_namespace", diff --git a/python/python/lancedb/db.py b/python/python/lancedb/db.py index e0fae6c01..fe967b303 100644 --- a/python/python/lancedb/db.py +++ b/python/python/lancedb/db.py @@ -705,6 +705,24 @@ class DBConnection(EnforceOverrides): return JobHandle(self, job_id) + def lineage( + self, + table: str, + column: Optional[str] = None, + *, + direction: Optional[str] = None, + depth: Optional[int] = None, + ): + """Derived-compute lineage of a table/view, or one of its columns: + upstream sources, downstream dependents, and the function version + + location that produced each derived column (with a drift flag). Returns + a `Lineage`. `direction` is "upstream" | "downstream" | "both" (server + default both); `depth` limits column-hops (transitive when omitted).""" + from .lineage import Lineage + + raw = LOOP.run(self._conn.table_lineage(table, column, direction, depth)) + return Lineage.from_json(raw) + def _refresh_materialized_view( self, name: str, @@ -2097,6 +2115,21 @@ class AsyncConnection(object): return AsyncJobHandle(self, job_id) + async def lineage( + self, + table: str, + column: Optional[str] = None, + *, + direction: Optional[str] = None, + depth: Optional[int] = None, + ): + """Derived-compute lineage of a table/view (or column). See the sync + `Connection.lineage`. Returns a `Lineage`.""" + from .lineage import Lineage + + raw = await self._inner.table_lineage(table, column, direction, depth) + return Lineage.from_json(raw) + async def _refresh_materialized_view( self, name: str, diff --git a/python/python/lancedb/lineage.py b/python/python/lancedb/lineage.py new file mode 100644 index 000000000..eae76c88c --- /dev/null +++ b/python/python/lancedb/lineage.py @@ -0,0 +1,177 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright The LanceDB Authors +"""Client-side model of derived-compute lineage. + +`Connection.lineage()` / `Table.lineage()` / `MaterializedView.lineage()` return +a `Lineage`: the graph of what a column or materialized view derives from +(upstream), what derives from it (downstream), and -- for each derived column -- +the function that produced it, the version it was produced with, and whether +that is stale relative to the function the registry now holds. + +The server returns this as JSON (the wire contract); these classes deserialize +it. Nothing here talks to the server. +""" + +from __future__ import annotations + +import json +from dataclasses import dataclass, field +from typing import List, Optional, Union + + +@dataclass +class FunctionRef: + """The function that produced a derived column, with version + location.""" + + name: str + #: Version that produced the data (stamped at compute time), if known. + as_computed_version: Optional[str] = None + #: Version the registry currently holds for this function name. + current_version: Optional[str] = None + #: True when the column was produced by an older function than the registry + #: now holds -- i.e. silently stale; re-refresh to catch up. + stale_vs_current: bool = False + language: Optional[str] = None + docker_image: Optional[str] = None + env_digest: Optional[str] = None + code_uri: Optional[str] = None + + @classmethod + def _from(cls, d: dict) -> "FunctionRef": + return cls( + name=d["name"], + as_computed_version=d.get("as_computed_version"), + current_version=d.get("current_version"), + stale_vs_current=d.get("stale_vs_current", False), + language=d.get("language"), + docker_image=d.get("docker_image"), + env_digest=d.get("env_digest"), + code_uri=d.get("code_uri"), + ) + + +@dataclass +class Node: + """A lineage node: a table, view, column, or function.""" + + kind: str # "table" | "view" | "column" | "function" + id: str # "table", "table.column", or "fn:name@version" + table: Optional[str] = None + function: Optional[FunctionRef] = None + + @classmethod + def _from(cls, d: dict) -> "Node": + fn = d.get("function") + return cls( + kind=d["kind"], + id=d["id"], + table=d.get("table"), + function=FunctionRef._from(fn) if fn else None, + ) + + +@dataclass +class Edge: + """`downstream` depends on `upstream`, produced by `via` (a function name, + or None for a passthrough).""" + + downstream: str + upstream: str + via: Optional[str] = None + + @classmethod + def _from(cls, d: dict) -> "Edge": + return cls(downstream=d["downstream"], upstream=d["upstream"], via=d.get("via")) + + +@dataclass +class Lineage: + """A derived-compute lineage graph (nodes + labeled edges).""" + + target: str + nodes: List[Node] = field(default_factory=list) + edges: List[Edge] = field(default_factory=list) + + @classmethod + def from_json(cls, raw: Union[str, bytes, dict]) -> "Lineage": + d = json.loads(raw) if isinstance(raw, (str, bytes)) else raw + return cls( + target=d.get("target", ""), + nodes=[Node._from(n) for n in d.get("nodes", [])], + edges=[Edge._from(e) for e in d.get("edges", [])], + ) + + def functions(self) -> List[FunctionRef]: + """The function nodes in the graph.""" + return [n.function for n in self.nodes if n.function is not None] + + def stale(self) -> List[FunctionRef]: + """Functions whose as-computed version is behind the current registry + version -- the columns they produced are silently out of date.""" + return [f for f in self.functions() if f.stale_vs_current] + + def to_dict(self) -> dict: + def prune(d: dict) -> dict: + return {k: v for k, v in d.items() if v is not None} + + return { + "target": self.target, + "nodes": [ + prune( + { + "kind": n.kind, + "id": n.id, + "table": n.table, + "function": prune(vars(n.function)) if n.function else None, + } + ) + for n in self.nodes + ], + "edges": [prune(vars(e)) for e in self.edges], + } + + def to_graphviz(self) -> str: + """Graphviz DOT for the lineage DAG: columns/tables as nodes, function + names on edges, drift edges dashed + red.""" + stale_names = {f.name for f in self.stale()} + out = [ + "digraph lineage {", + " rankdir=LR;", + ' node [fontname="monospace"];', + ] + for n in self.nodes: + if n.kind == "function": + continue + shape = "ellipse" if n.kind in ("table", "view") else "box" + out.append(f' "{n.id}" [shape={shape}];') + for e in self.edges: + attrs = "" + if e.via: + if e.via in stale_names: + attrs = f' [label="{e.via}" color=red style=dashed]' + else: + attrs = f' [label="{e.via}"]' + out.append(f' "{e.upstream}" -> "{e.downstream}"{attrs};') + out.append("}") + return "\n".join(out) + + def _repr_html_(self) -> str: + warn = "" + drift = self.stale() + if drift: + names = ", ".join(sorted({f.name for f in drift})) + warn = ( + f'

stale vs current: {names} ' + "(re-refresh to catch up)

" + ) + rows = "".join( + f"{e.downstream}" + f"← {e.via or ''}" + f"{e.upstream}" + for e in self.edges + ) + return ( + f"lineage: {self.target}{warn}" + "" + f"{rows}
derivedviafrom
" + ) diff --git a/python/python/lancedb/remote/table.py b/python/python/lancedb/remote/table.py index 9c26afd5a..b99f8960a 100644 --- a/python/python/lancedb/remote/table.py +++ b/python/python/lancedb/remote/table.py @@ -940,6 +940,15 @@ class RemoteTable(Table): ) return JobHandle(self._job_conn(), job_id) + def lineage(self, column=None, *, direction=None, depth=None): + """Derived-compute lineage of this table, or one of its columns: + upstream sources, downstream dependents, and the function version + + location that produced each derived column (with a drift flag). Returns + a `Lineage`. See `Connection.lineage`.""" + return self._job_conn().lineage( + self._name, column, direction=direction, depth=depth + ) + def _job_conn(self): """A client connection for polling jobs this table spawns. Built lazily from the table's serialized connection state and cached (not pickled -- diff --git a/python/python/lancedb/udf.py b/python/python/lancedb/udf.py index 968aa0561..00a35f6b9 100644 --- a/python/python/lancedb/udf.py +++ b/python/python/lancedb/udf.py @@ -557,6 +557,12 @@ class MaterializedView: """Search the materialized view (vector / FTS / hybrid).""" return self._table().search(*args, **kwargs) + def lineage(self, column=None, *, direction=None, depth=None): + """Lineage of the materialized view (or one of its columns). Delegates + to the backing table; the server already includes the view's sources + and downstream dependents. Returns a `Lineage`.""" + return self._table().lineage(column, direction=direction, depth=depth) + _PROGRESS = re.compile(r"(\d+)/(\d+)") @@ -660,6 +666,12 @@ class AsyncMaterializedView: async def drop(self) -> None: await self.conn.drop_materialized_view(self.name) + async def lineage(self, column=None, *, direction=None, depth=None): + """Lineage of the materialized view (or column). Returns a `Lineage`.""" + return await self.conn.lineage( + self.name, column, direction=direction, depth=depth + ) + class AsyncJobHandle: """Async reference to an inflight server-side job, with polling helpers."""