mirror of
https://github.com/lancedb/lancedb.git
synced 2025-12-23 05:19:58 +00:00
feat: add table stats API (#2363)
* Add a new "table stats" API to expose basic table and fragment statistics with local and remote table implementations ### Questions * This is using `calculate_data_stats` to determine total bytes in the table. This seems like a potentially expensive operation - are there any concerns about performance for large datasets? ### Notes * bytes_on_disk seems to be stored at the column level but there does not seem to be a way to easily calculate total bytes per fragment. This may need to be added in lance before we can support fragment size (bytes) statistics. <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **New Features** - Added a method to retrieve comprehensive table statistics, including total rows, index counts, storage size, and detailed fragment size metrics such as minimum, maximum, mean, and percentiles. - Enabled fetching of table statistics from remote sources through asynchronous requests. - Extended table interfaces across Python, Rust, and Node.js to support synchronous and asynchronous retrieval of table statistics. - **Tests** - Introduced tests to verify the accuracy of the new table statistics feature for both populated and empty tables. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
This commit is contained in:
8
Cargo.lock
generated
8
Cargo.lock
generated
@@ -4148,7 +4148,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lancedb"
|
||||
version = "0.19.0-beta.11"
|
||||
version = "0.19.1-beta.0"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"arrow-array",
|
||||
@@ -4235,7 +4235,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lancedb-node"
|
||||
version = "0.19.0-beta.11"
|
||||
version = "0.19.1-beta.0"
|
||||
dependencies = [
|
||||
"arrow-array",
|
||||
"arrow-ipc",
|
||||
@@ -4260,7 +4260,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lancedb-nodejs"
|
||||
version = "0.19.0-beta.11"
|
||||
version = "0.19.1-beta.0"
|
||||
dependencies = [
|
||||
"arrow-array",
|
||||
"arrow-ipc",
|
||||
@@ -4279,7 +4279,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lancedb-python"
|
||||
version = "0.22.0-beta.11"
|
||||
version = "0.22.1-beta.0"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"env_logger",
|
||||
|
||||
@@ -615,6 +615,22 @@ of the given query
|
||||
|
||||
***
|
||||
|
||||
### stats()
|
||||
|
||||
```ts
|
||||
abstract stats(): Promise<TableStatistics>
|
||||
```
|
||||
|
||||
Returns table and fragment statistics
|
||||
|
||||
#### Returns
|
||||
|
||||
`Promise`<[`TableStatistics`](../interfaces/TableStatistics.md)>
|
||||
|
||||
The table and fragment statistics
|
||||
|
||||
***
|
||||
|
||||
### tags()
|
||||
|
||||
```ts
|
||||
|
||||
@@ -42,6 +42,8 @@
|
||||
- [ConnectionOptions](interfaces/ConnectionOptions.md)
|
||||
- [CreateTableOptions](interfaces/CreateTableOptions.md)
|
||||
- [ExecutableQuery](interfaces/ExecutableQuery.md)
|
||||
- [FragmentStatistics](interfaces/FragmentStatistics.md)
|
||||
- [FragmentSummaryStats](interfaces/FragmentSummaryStats.md)
|
||||
- [FtsOptions](interfaces/FtsOptions.md)
|
||||
- [FullTextQuery](interfaces/FullTextQuery.md)
|
||||
- [FullTextSearchOptions](interfaces/FullTextSearchOptions.md)
|
||||
@@ -59,6 +61,7 @@
|
||||
- [RemovalStats](interfaces/RemovalStats.md)
|
||||
- [RetryConfig](interfaces/RetryConfig.md)
|
||||
- [TableNamesOptions](interfaces/TableNamesOptions.md)
|
||||
- [TableStatistics](interfaces/TableStatistics.md)
|
||||
- [TimeoutConfig](interfaces/TimeoutConfig.md)
|
||||
- [UpdateOptions](interfaces/UpdateOptions.md)
|
||||
- [Version](interfaces/Version.md)
|
||||
|
||||
37
docs/src/js/interfaces/FragmentStatistics.md
Normal file
37
docs/src/js/interfaces/FragmentStatistics.md
Normal file
@@ -0,0 +1,37 @@
|
||||
[**@lancedb/lancedb**](../README.md) • **Docs**
|
||||
|
||||
***
|
||||
|
||||
[@lancedb/lancedb](../globals.md) / FragmentStatistics
|
||||
|
||||
# Interface: FragmentStatistics
|
||||
|
||||
## Properties
|
||||
|
||||
### lengths
|
||||
|
||||
```ts
|
||||
lengths: FragmentSummaryStats;
|
||||
```
|
||||
|
||||
Statistics on the number of rows in the table fragments
|
||||
|
||||
***
|
||||
|
||||
### numFragments
|
||||
|
||||
```ts
|
||||
numFragments: number;
|
||||
```
|
||||
|
||||
The number of fragments in the table
|
||||
|
||||
***
|
||||
|
||||
### numSmallFragments
|
||||
|
||||
```ts
|
||||
numSmallFragments: number;
|
||||
```
|
||||
|
||||
The number of uncompacted fragments in the table
|
||||
77
docs/src/js/interfaces/FragmentSummaryStats.md
Normal file
77
docs/src/js/interfaces/FragmentSummaryStats.md
Normal file
@@ -0,0 +1,77 @@
|
||||
[**@lancedb/lancedb**](../README.md) • **Docs**
|
||||
|
||||
***
|
||||
|
||||
[@lancedb/lancedb](../globals.md) / FragmentSummaryStats
|
||||
|
||||
# Interface: FragmentSummaryStats
|
||||
|
||||
## Properties
|
||||
|
||||
### max
|
||||
|
||||
```ts
|
||||
max: number;
|
||||
```
|
||||
|
||||
The number of rows in the fragment with the most rows
|
||||
|
||||
***
|
||||
|
||||
### mean
|
||||
|
||||
```ts
|
||||
mean: number;
|
||||
```
|
||||
|
||||
The mean number of rows in the fragments
|
||||
|
||||
***
|
||||
|
||||
### min
|
||||
|
||||
```ts
|
||||
min: number;
|
||||
```
|
||||
|
||||
The number of rows in the fragment with the fewest rows
|
||||
|
||||
***
|
||||
|
||||
### p25
|
||||
|
||||
```ts
|
||||
p25: number;
|
||||
```
|
||||
|
||||
The 25th percentile of number of rows in the fragments
|
||||
|
||||
***
|
||||
|
||||
### p50
|
||||
|
||||
```ts
|
||||
p50: number;
|
||||
```
|
||||
|
||||
The 50th percentile of number of rows in the fragments
|
||||
|
||||
***
|
||||
|
||||
### p75
|
||||
|
||||
```ts
|
||||
p75: number;
|
||||
```
|
||||
|
||||
The 75th percentile of number of rows in the fragments
|
||||
|
||||
***
|
||||
|
||||
### p99
|
||||
|
||||
```ts
|
||||
p99: number;
|
||||
```
|
||||
|
||||
The 99th percentile of number of rows in the fragments
|
||||
47
docs/src/js/interfaces/TableStatistics.md
Normal file
47
docs/src/js/interfaces/TableStatistics.md
Normal file
@@ -0,0 +1,47 @@
|
||||
[**@lancedb/lancedb**](../README.md) • **Docs**
|
||||
|
||||
***
|
||||
|
||||
[@lancedb/lancedb](../globals.md) / TableStatistics
|
||||
|
||||
# Interface: TableStatistics
|
||||
|
||||
## Properties
|
||||
|
||||
### fragmentStats
|
||||
|
||||
```ts
|
||||
fragmentStats: FragmentStatistics;
|
||||
```
|
||||
|
||||
Statistics on table fragments
|
||||
|
||||
***
|
||||
|
||||
### numIndices
|
||||
|
||||
```ts
|
||||
numIndices: number;
|
||||
```
|
||||
|
||||
The number of indices in the table
|
||||
|
||||
***
|
||||
|
||||
### numRows
|
||||
|
||||
```ts
|
||||
numRows: number;
|
||||
```
|
||||
|
||||
The number of rows in the table
|
||||
|
||||
***
|
||||
|
||||
### totalBytes
|
||||
|
||||
```ts
|
||||
totalBytes: number;
|
||||
```
|
||||
|
||||
The total number of bytes in the table
|
||||
@@ -71,6 +71,29 @@ describe.each([arrow15, arrow16, arrow17, arrow18])(
|
||||
await expect(table.countRows()).resolves.toBe(3);
|
||||
});
|
||||
|
||||
it("should show table stats", async () => {
|
||||
await table.add([{ id: 1 }, { id: 2 }]);
|
||||
await table.add([{ id: 1 }]);
|
||||
await expect(table.stats()).resolves.toEqual({
|
||||
fragmentStats: {
|
||||
lengths: {
|
||||
max: 2,
|
||||
mean: 1,
|
||||
min: 1,
|
||||
p25: 1,
|
||||
p50: 2,
|
||||
p75: 2,
|
||||
p99: 2,
|
||||
},
|
||||
numFragments: 2,
|
||||
numSmallFragments: 2,
|
||||
},
|
||||
numIndices: 0,
|
||||
numRows: 3,
|
||||
totalBytes: 24,
|
||||
});
|
||||
});
|
||||
|
||||
it("should overwrite data if asked", async () => {
|
||||
await table.add([{ id: 1 }, { id: 2 }]);
|
||||
await table.add([{ id: 1 }], { mode: "overwrite" });
|
||||
|
||||
@@ -23,6 +23,9 @@ export {
|
||||
OptimizeStats,
|
||||
CompactionStats,
|
||||
RemovalStats,
|
||||
TableStatistics,
|
||||
FragmentStatistics,
|
||||
FragmentSummaryStats,
|
||||
Tags,
|
||||
TagContents,
|
||||
} from "./native.js";
|
||||
|
||||
@@ -20,6 +20,7 @@ import {
|
||||
IndexConfig,
|
||||
IndexStatistics,
|
||||
OptimizeStats,
|
||||
TableStatistics,
|
||||
Tags,
|
||||
Table as _NativeTable,
|
||||
} from "./native";
|
||||
@@ -482,6 +483,13 @@ export abstract class Table {
|
||||
* Use {@link Table.listIndices} to find the names of the indices.
|
||||
*/
|
||||
abstract indexStats(name: string): Promise<IndexStatistics | undefined>;
|
||||
|
||||
/** Returns table and fragment statistics
|
||||
*
|
||||
* @returns {TableStatistics} The table and fragment statistics
|
||||
*
|
||||
*/
|
||||
abstract stats(): Promise<TableStatistics>;
|
||||
}
|
||||
|
||||
export class LocalTable extends Table {
|
||||
@@ -775,6 +783,11 @@ export class LocalTable extends Table {
|
||||
}
|
||||
return stats;
|
||||
}
|
||||
|
||||
async stats(): Promise<TableStatistics> {
|
||||
return await this.inner.stats();
|
||||
}
|
||||
|
||||
mergeInsert(on: string | string[]): MergeInsertBuilder {
|
||||
on = Array.isArray(on) ? on : [on];
|
||||
return new MergeInsertBuilder(this.inner.mergeInsert(on), this.schema());
|
||||
|
||||
@@ -157,6 +157,12 @@ impl Table {
|
||||
.default_error()
|
||||
}
|
||||
|
||||
#[napi(catch_unwind)]
|
||||
pub async fn stats(&self) -> Result<TableStatistics> {
|
||||
let stats = self.inner_ref()?.stats().await.default_error()?;
|
||||
Ok(stats.into())
|
||||
}
|
||||
|
||||
#[napi(catch_unwind)]
|
||||
pub async fn update(
|
||||
&self,
|
||||
@@ -555,6 +561,80 @@ impl From<lancedb::index::IndexStatistics> for IndexStatistics {
|
||||
}
|
||||
}
|
||||
|
||||
#[napi(object)]
|
||||
pub struct TableStatistics {
|
||||
/// The total number of bytes in the table
|
||||
pub total_bytes: i64,
|
||||
|
||||
/// The number of rows in the table
|
||||
pub num_rows: i64,
|
||||
|
||||
/// The number of indices in the table
|
||||
pub num_indices: i64,
|
||||
|
||||
/// Statistics on table fragments
|
||||
pub fragment_stats: FragmentStatistics,
|
||||
}
|
||||
|
||||
#[napi(object)]
|
||||
pub struct FragmentStatistics {
|
||||
/// The number of fragments in the table
|
||||
pub num_fragments: i64,
|
||||
|
||||
/// The number of uncompacted fragments in the table
|
||||
pub num_small_fragments: i64,
|
||||
|
||||
/// Statistics on the number of rows in the table fragments
|
||||
pub lengths: FragmentSummaryStats,
|
||||
}
|
||||
|
||||
#[napi(object)]
|
||||
pub struct FragmentSummaryStats {
|
||||
/// The number of rows in the fragment with the fewest rows
|
||||
pub min: i64,
|
||||
|
||||
/// The number of rows in the fragment with the most rows
|
||||
pub max: i64,
|
||||
|
||||
/// The mean number of rows in the fragments
|
||||
pub mean: i64,
|
||||
|
||||
/// The 25th percentile of number of rows in the fragments
|
||||
pub p25: i64,
|
||||
|
||||
/// The 50th percentile of number of rows in the fragments
|
||||
pub p50: i64,
|
||||
|
||||
/// The 75th percentile of number of rows in the fragments
|
||||
pub p75: i64,
|
||||
|
||||
/// The 99th percentile of number of rows in the fragments
|
||||
pub p99: i64,
|
||||
}
|
||||
|
||||
impl From<lancedb::table::TableStatistics> for TableStatistics {
|
||||
fn from(v: lancedb::table::TableStatistics) -> Self {
|
||||
Self {
|
||||
total_bytes: v.total_bytes as i64,
|
||||
num_rows: v.num_rows as i64,
|
||||
num_indices: v.num_indices as i64,
|
||||
fragment_stats: FragmentStatistics {
|
||||
num_fragments: v.fragment_stats.num_fragments as i64,
|
||||
num_small_fragments: v.fragment_stats.num_small_fragments as i64,
|
||||
lengths: FragmentSummaryStats {
|
||||
min: v.fragment_stats.lengths.min as i64,
|
||||
max: v.fragment_stats.lengths.max as i64,
|
||||
mean: v.fragment_stats.lengths.mean as i64,
|
||||
p25: v.fragment_stats.lengths.p25 as i64,
|
||||
p50: v.fragment_stats.lengths.p50 as i64,
|
||||
p75: v.fragment_stats.lengths.p75 as i64,
|
||||
p99: v.fragment_stats.lengths.p99 as i64,
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[napi(object)]
|
||||
pub struct Version {
|
||||
pub version: i64,
|
||||
|
||||
@@ -578,6 +578,9 @@ class RemoteTable(Table):
|
||||
):
|
||||
return LOOP.run(self._table.wait_for_index(index_names, timeout))
|
||||
|
||||
def stats(self):
|
||||
return LOOP.run(self._table.stats())
|
||||
|
||||
def uses_v2_manifest_paths(self) -> bool:
|
||||
raise NotImplementedError(
|
||||
"uses_v2_manifest_paths() is not supported on the LanceDB Cloud"
|
||||
|
||||
@@ -739,6 +739,13 @@ class Table(ABC):
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
@abstractmethod
|
||||
def stats(self) -> TableStatistics:
|
||||
"""
|
||||
Retrieve table and fragment statistics.
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
@abstractmethod
|
||||
def create_scalar_index(
|
||||
self,
|
||||
@@ -1876,6 +1883,9 @@ class LanceTable(Table):
|
||||
) -> None:
|
||||
return LOOP.run(self._table.wait_for_index(index_names, timeout))
|
||||
|
||||
def stats(self) -> TableStatistics:
|
||||
return LOOP.run(self._table.stats())
|
||||
|
||||
def create_scalar_index(
|
||||
self,
|
||||
column: str,
|
||||
@@ -3170,6 +3180,12 @@ class AsyncTable:
|
||||
"""
|
||||
await self._inner.wait_for_index(index_names, timeout)
|
||||
|
||||
async def stats(self) -> TableStatistics:
|
||||
"""
|
||||
Retrieve table and fragment statistics.
|
||||
"""
|
||||
return await self._inner.stats()
|
||||
|
||||
async def add(
|
||||
self,
|
||||
data: DATA,
|
||||
@@ -4068,6 +4084,82 @@ class IndexStatistics:
|
||||
return getattr(self, key)
|
||||
|
||||
|
||||
@dataclass
|
||||
class TableStatistics:
|
||||
"""
|
||||
Statistics about a table and fragments.
|
||||
|
||||
Attributes
|
||||
----------
|
||||
total_bytes: int
|
||||
The total number of bytes in the table.
|
||||
num_rows: int
|
||||
The total number of rows in the table.
|
||||
num_indices: int
|
||||
The total number of indices in the table.
|
||||
fragment_stats: FragmentStatistics
|
||||
Statistics about fragments in the table.
|
||||
"""
|
||||
|
||||
total_bytes: int
|
||||
num_rows: int
|
||||
num_indices: int
|
||||
fragment_stats: FragmentStatistics
|
||||
|
||||
|
||||
@dataclass
|
||||
class FragmentStatistics:
|
||||
"""
|
||||
Statistics about fragments.
|
||||
|
||||
Attributes
|
||||
----------
|
||||
num_fragments: int
|
||||
The total number of fragments in the table.
|
||||
num_small_fragments: int
|
||||
The total number of small fragments in the table.
|
||||
Small fragments have low row counts and may need to be compacted.
|
||||
lengths: FragmentSummaryStats
|
||||
Statistics about the number of rows in the table fragments.
|
||||
"""
|
||||
|
||||
num_fragments: int
|
||||
num_small_fragments: int
|
||||
lengths: FragmentSummaryStats
|
||||
|
||||
|
||||
@dataclass
|
||||
class FragmentSummaryStats:
|
||||
"""
|
||||
Statistics about fragments sizes
|
||||
|
||||
Attributes
|
||||
----------
|
||||
min: int
|
||||
The number of rows in the fragment with the fewest rows.
|
||||
max: int
|
||||
The number of rows in the fragment with the most rows.
|
||||
mean: int
|
||||
The mean number of rows in the fragments.
|
||||
p25: int
|
||||
The 25th percentile of number of rows in the fragments.
|
||||
p50: int
|
||||
The 50th percentile of number of rows in the fragments.
|
||||
p75: int
|
||||
The 75th percentile of number of rows in the fragments.
|
||||
p99: int
|
||||
The 99th percentile of number of rows in the fragments.
|
||||
"""
|
||||
|
||||
min: int
|
||||
max: int
|
||||
mean: int
|
||||
p25: int
|
||||
p50: int
|
||||
p75: int
|
||||
p99: int
|
||||
|
||||
|
||||
class Tags:
|
||||
"""
|
||||
Table tag manager.
|
||||
|
||||
@@ -389,6 +389,50 @@ def test_table_wait_for_index_timeout():
|
||||
table.wait_for_index(["id_idx"], timedelta(seconds=1))
|
||||
|
||||
|
||||
def test_stats():
|
||||
stats = {
|
||||
"total_bytes": 38,
|
||||
"num_rows": 2,
|
||||
"num_indices": 0,
|
||||
"fragment_stats": {
|
||||
"num_fragments": 1,
|
||||
"num_small_fragments": 1,
|
||||
"lengths": {
|
||||
"min": 2,
|
||||
"max": 2,
|
||||
"mean": 2,
|
||||
"p25": 2,
|
||||
"p50": 2,
|
||||
"p75": 2,
|
||||
"p99": 2,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
def handler(request):
|
||||
if request.path == "/v1/table/test/create/?mode=create":
|
||||
request.send_response(200)
|
||||
request.send_header("Content-Type", "application/json")
|
||||
request.end_headers()
|
||||
request.wfile.write(b"{}")
|
||||
elif request.path == "/v1/table/test/stats/":
|
||||
request.send_response(200)
|
||||
request.send_header("Content-Type", "application/json")
|
||||
request.end_headers()
|
||||
payload = json.dumps(stats)
|
||||
request.wfile.write(payload.encode())
|
||||
else:
|
||||
print(request.path)
|
||||
request.send_response(404)
|
||||
request.end_headers()
|
||||
|
||||
with mock_lancedb_connection(handler) as db:
|
||||
table = db.create_table("test", [{"id": 1}])
|
||||
res = table.stats()
|
||||
print(f"{res=}")
|
||||
assert res == stats
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def query_test_table(query_handler, *, server_version=Version("0.1.0")):
|
||||
def handler(request):
|
||||
|
||||
@@ -1695,3 +1695,31 @@ def test_replace_field_metadata(tmp_path):
|
||||
schema = table.schema
|
||||
field = schema[0].metadata
|
||||
assert field == {b"foo": b"bar"}
|
||||
|
||||
|
||||
def test_stats(mem_db: DBConnection):
|
||||
table = mem_db.create_table(
|
||||
"my_table",
|
||||
data=[{"text": "foo", "id": 0}, {"text": "bar", "id": 1}],
|
||||
)
|
||||
assert len(table) == 2
|
||||
stats = table.stats()
|
||||
print(f"{stats=}")
|
||||
assert stats == {
|
||||
"total_bytes": 38,
|
||||
"num_rows": 2,
|
||||
"num_indices": 0,
|
||||
"fragment_stats": {
|
||||
"num_fragments": 1,
|
||||
"num_small_fragments": 1,
|
||||
"lengths": {
|
||||
"min": 2,
|
||||
"max": 2,
|
||||
"mean": 2,
|
||||
"p25": 2,
|
||||
"p50": 2,
|
||||
"p75": 2,
|
||||
"p99": 2,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
@@ -279,6 +279,40 @@ impl Table {
|
||||
})
|
||||
}
|
||||
|
||||
pub fn stats(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
|
||||
let inner = self_.inner_ref()?.clone();
|
||||
future_into_py(self_.py(), async move {
|
||||
let stats = inner.stats().await.infer_error()?;
|
||||
Python::with_gil(|py| {
|
||||
let dict = PyDict::new(py);
|
||||
dict.set_item("total_bytes", stats.total_bytes)?;
|
||||
dict.set_item("num_rows", stats.num_rows)?;
|
||||
dict.set_item("num_indices", stats.num_indices)?;
|
||||
|
||||
let fragment_stats = PyDict::new(py);
|
||||
fragment_stats.set_item("num_fragments", stats.fragment_stats.num_fragments)?;
|
||||
fragment_stats.set_item(
|
||||
"num_small_fragments",
|
||||
stats.fragment_stats.num_small_fragments,
|
||||
)?;
|
||||
|
||||
let fragment_lengths = PyDict::new(py);
|
||||
fragment_lengths.set_item("min", stats.fragment_stats.lengths.min)?;
|
||||
fragment_lengths.set_item("max", stats.fragment_stats.lengths.max)?;
|
||||
fragment_lengths.set_item("mean", stats.fragment_stats.lengths.mean)?;
|
||||
fragment_lengths.set_item("p25", stats.fragment_stats.lengths.p25)?;
|
||||
fragment_lengths.set_item("p50", stats.fragment_stats.lengths.p50)?;
|
||||
fragment_lengths.set_item("p75", stats.fragment_stats.lengths.p75)?;
|
||||
fragment_lengths.set_item("p99", stats.fragment_stats.lengths.p99)?;
|
||||
|
||||
fragment_stats.set_item("lengths", fragment_lengths)?;
|
||||
dict.set_item("fragment_stats", fragment_stats)?;
|
||||
|
||||
Ok(Some(dict.unbind()))
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
pub fn __repr__(&self) -> String {
|
||||
match &self.inner {
|
||||
None => format!("ClosedTable({})", self.name),
|
||||
|
||||
@@ -5,7 +5,7 @@ use crate::index::Index;
|
||||
use crate::index::IndexStatistics;
|
||||
use crate::query::{QueryFilter, QueryRequest, Select, VectorQueryRequest};
|
||||
use crate::table::Tags;
|
||||
use crate::table::{AddDataMode, AnyQuery, Filter};
|
||||
use crate::table::{AddDataMode, AnyQuery, Filter, TableStatistics};
|
||||
use crate::utils::{supported_btree_data_type, supported_vector_data_type};
|
||||
use crate::{DistanceType, Error, Table};
|
||||
use arrow_array::{RecordBatch, RecordBatchIterator, RecordBatchReader};
|
||||
@@ -1242,6 +1242,20 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
||||
fn dataset_uri(&self) -> &str {
|
||||
"NOT_SUPPORTED"
|
||||
}
|
||||
|
||||
async fn stats(&self) -> Result<TableStatistics> {
|
||||
let request = self.client.post(&format!("/v1/table/{}/stats/", self.name));
|
||||
let (request_id, response) = self.send(request, true).await?;
|
||||
let response = self.check_table_response(&request_id, response).await?;
|
||||
let body = response.text().await.err_to_http(request_id.clone())?;
|
||||
|
||||
let stats = serde_json::from_str(&body).map_err(|e| Error::Http {
|
||||
source: format!("Failed to parse table statistics: {}", e).into(),
|
||||
request_id,
|
||||
status_code: None,
|
||||
})?;
|
||||
Ok(stats)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
|
||||
@@ -80,10 +80,13 @@ pub mod merge;
|
||||
|
||||
use crate::index::waiter::wait_for_index;
|
||||
pub use chrono::Duration;
|
||||
use futures::future::join_all;
|
||||
pub use lance::dataset::optimize::CompactionOptions;
|
||||
pub use lance::dataset::refs::{TagContents, Tags as LanceTags};
|
||||
pub use lance::dataset::scanner::DatasetRecordBatchStream;
|
||||
use lance::dataset::statistics::DatasetStatisticsExt;
|
||||
pub use lance_index::optimize::OptimizeOptions;
|
||||
use serde_with::skip_serializing_none;
|
||||
|
||||
/// Defines the type of column
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
@@ -523,6 +526,8 @@ pub trait BaseTable: std::fmt::Display + std::fmt::Debug + Send + Sync {
|
||||
index_names: &[&str],
|
||||
timeout: std::time::Duration,
|
||||
) -> Result<()>;
|
||||
/// Get statistics on the table
|
||||
async fn stats(&self) -> Result<TableStatistics>;
|
||||
}
|
||||
|
||||
/// A Table is a collection of strong typed Rows.
|
||||
@@ -1241,6 +1246,11 @@ impl Table {
|
||||
.unwrap();
|
||||
Ok(Arc::new(repartitioned))
|
||||
}
|
||||
|
||||
/// Retrieve statistics on the table
|
||||
pub async fn stats(&self) -> Result<TableStatistics> {
|
||||
self.inner.stats().await
|
||||
}
|
||||
}
|
||||
|
||||
pub struct NativeTags {
|
||||
@@ -2568,6 +2578,108 @@ impl BaseTable for NativeTable {
|
||||
) -> Result<()> {
|
||||
wait_for_index(self, index_names, timeout).await
|
||||
}
|
||||
|
||||
async fn stats(&self) -> Result<TableStatistics> {
|
||||
let num_rows = self.count_rows(None).await?;
|
||||
let num_indices = self.list_indices().await?.len();
|
||||
let ds = self.dataset.get().await?;
|
||||
let ds_clone = (*ds).clone();
|
||||
let ds_stats = Arc::new(ds_clone).calculate_data_stats().await?;
|
||||
let total_bytes = ds_stats.fields.iter().map(|f| f.bytes_on_disk).sum::<u64>() as usize;
|
||||
|
||||
let frags = ds.get_fragments();
|
||||
let mut sorted_sizes = join_all(
|
||||
frags
|
||||
.iter()
|
||||
.map(|frag| async move { frag.physical_rows().await.unwrap_or(0) }),
|
||||
)
|
||||
.await;
|
||||
sorted_sizes.sort();
|
||||
|
||||
let small_frag_threshold = 100000;
|
||||
let num_fragments = sorted_sizes.len();
|
||||
let num_small_fragments = sorted_sizes
|
||||
.iter()
|
||||
.filter(|&&size| size < small_frag_threshold)
|
||||
.count();
|
||||
|
||||
let p25 = *sorted_sizes.get(num_fragments / 4).unwrap_or(&0);
|
||||
let p50 = *sorted_sizes.get(num_fragments / 2).unwrap_or(&0);
|
||||
let p75 = *sorted_sizes.get(num_fragments * 3 / 4).unwrap_or(&0);
|
||||
let p99 = *sorted_sizes.get(num_fragments * 99 / 100).unwrap_or(&0);
|
||||
let min = sorted_sizes.first().copied().unwrap_or(0);
|
||||
let max = sorted_sizes.last().copied().unwrap_or(0);
|
||||
let mean = if num_fragments == 0 {
|
||||
0
|
||||
} else {
|
||||
sorted_sizes.iter().copied().sum::<usize>() / num_fragments
|
||||
};
|
||||
|
||||
let frag_stats = FragmentStatistics {
|
||||
num_fragments,
|
||||
num_small_fragments,
|
||||
lengths: FragmentSummaryStats {
|
||||
min,
|
||||
max,
|
||||
mean,
|
||||
p25,
|
||||
p50,
|
||||
p75,
|
||||
p99,
|
||||
},
|
||||
};
|
||||
let stats = TableStatistics {
|
||||
total_bytes,
|
||||
num_rows,
|
||||
num_indices,
|
||||
fragment_stats: frag_stats,
|
||||
};
|
||||
Ok(stats)
|
||||
}
|
||||
}
|
||||
|
||||
#[skip_serializing_none]
|
||||
#[derive(Debug, Deserialize, PartialEq)]
|
||||
pub struct TableStatistics {
|
||||
/// The total number of bytes in the table
|
||||
pub total_bytes: usize,
|
||||
|
||||
/// The number of rows in the table
|
||||
pub num_rows: usize,
|
||||
|
||||
/// The number of indices in the table
|
||||
pub num_indices: usize,
|
||||
|
||||
/// Statistics on table fragments
|
||||
pub fragment_stats: FragmentStatistics,
|
||||
}
|
||||
|
||||
#[skip_serializing_none]
|
||||
#[derive(Debug, Deserialize, PartialEq)]
|
||||
pub struct FragmentStatistics {
|
||||
/// The number of fragments in the table
|
||||
pub num_fragments: usize,
|
||||
|
||||
/// The number of uncompacted fragments in the table
|
||||
pub num_small_fragments: usize,
|
||||
|
||||
/// Statistics on the number of rows in the table fragments
|
||||
pub lengths: FragmentSummaryStats,
|
||||
// todo: add size statistics
|
||||
// /// Statistics on the number of bytes in the table fragments
|
||||
// sizes: FragmentStats,
|
||||
}
|
||||
|
||||
#[skip_serializing_none]
|
||||
#[derive(Debug, Deserialize, PartialEq)]
|
||||
pub struct FragmentSummaryStats {
|
||||
pub min: usize,
|
||||
pub max: usize,
|
||||
pub mean: usize,
|
||||
pub p25: usize,
|
||||
pub p50: usize,
|
||||
pub p75: usize,
|
||||
pub p99: usize,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -3945,4 +4057,108 @@ mod tests {
|
||||
Some(&"test_field_val1".to_string())
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
pub async fn test_stats() {
|
||||
let tmp_dir = tempdir().unwrap();
|
||||
let uri = tmp_dir.path().to_str().unwrap();
|
||||
|
||||
let conn = ConnectBuilder::new(uri).execute().await.unwrap();
|
||||
|
||||
let schema = Arc::new(Schema::new(vec![
|
||||
Field::new("id", DataType::Int32, false),
|
||||
Field::new("foo", DataType::Int32, true),
|
||||
]));
|
||||
let batch = RecordBatch::try_new(
|
||||
schema.clone(),
|
||||
vec![
|
||||
Arc::new(Int32Array::from_iter_values(0..100)),
|
||||
Arc::new(Int32Array::from_iter_values(0..100)),
|
||||
],
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let table = conn
|
||||
.create_table(
|
||||
"test_stats",
|
||||
RecordBatchIterator::new(vec![Ok(batch.clone())], batch.schema()),
|
||||
)
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
for _ in 0..10 {
|
||||
let batch = RecordBatch::try_new(
|
||||
schema.clone(),
|
||||
vec![
|
||||
Arc::new(Int32Array::from_iter_values(0..15)),
|
||||
Arc::new(Int32Array::from_iter_values(0..15)),
|
||||
],
|
||||
)
|
||||
.unwrap();
|
||||
table
|
||||
.add(RecordBatchIterator::new(
|
||||
vec![Ok(batch.clone())],
|
||||
batch.schema(),
|
||||
))
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
let empty_table = conn
|
||||
.create_table(
|
||||
"test_stats_empty",
|
||||
RecordBatchIterator::new(vec![], batch.schema()),
|
||||
)
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let res = table.stats().await.unwrap();
|
||||
println!("{:#?}", res);
|
||||
assert_eq!(
|
||||
res,
|
||||
TableStatistics {
|
||||
num_rows: 250,
|
||||
num_indices: 0,
|
||||
total_bytes: 2000,
|
||||
fragment_stats: FragmentStatistics {
|
||||
num_fragments: 11,
|
||||
num_small_fragments: 11,
|
||||
lengths: FragmentSummaryStats {
|
||||
min: 15,
|
||||
max: 100,
|
||||
mean: 22,
|
||||
p25: 15,
|
||||
p50: 15,
|
||||
p75: 15,
|
||||
p99: 100,
|
||||
},
|
||||
},
|
||||
}
|
||||
);
|
||||
let res = empty_table.stats().await.unwrap();
|
||||
println!("{:#?}", res);
|
||||
assert_eq!(
|
||||
res,
|
||||
TableStatistics {
|
||||
num_rows: 0,
|
||||
num_indices: 0,
|
||||
total_bytes: 0,
|
||||
fragment_stats: FragmentStatistics {
|
||||
num_fragments: 0,
|
||||
num_small_fragments: 0,
|
||||
lengths: FragmentSummaryStats {
|
||||
min: 0,
|
||||
max: 0,
|
||||
mean: 0,
|
||||
p25: 0,
|
||||
p50: 0,
|
||||
p75: 0,
|
||||
p99: 0,
|
||||
},
|
||||
},
|
||||
}
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user