mirror of
https://github.com/lancedb/lancedb.git
synced 2026-06-23 14:10:39 +00:00
Compare commits
1 Commits
codex/upda
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
41ac32a344 |
@@ -9,16 +9,28 @@
|
||||
//!
|
||||
//! Blob tables require Lance file format >= 2.2 and stable row ids at create.
|
||||
|
||||
use arrow_schema::{Field, Schema};
|
||||
use lance::dataset::WriteParams;
|
||||
use std::sync::Arc;
|
||||
|
||||
use arrow_array::builder::LargeBinaryBuilder;
|
||||
use arrow_array::{Array, LargeBinaryArray, RecordBatch, StructArray, UInt8Array, UInt64Array};
|
||||
use arrow_schema::{DataType, Field, Schema};
|
||||
use lance::dataset::{Dataset, WriteParams};
|
||||
use lance_arrow::FieldExt;
|
||||
use lance_core::datatypes::parse_field_path;
|
||||
use lance_encoding::version::LanceFileVersion;
|
||||
|
||||
use crate::error::{Error, Result};
|
||||
|
||||
pub use lance::dataset::BlobFile;
|
||||
|
||||
/// Creates an Arrow field for a Lance blob v2 column.
|
||||
///
|
||||
/// `Struct<data, uri>` with the `lance.blob.v2` marker. Same layout Lance
|
||||
/// expects on write.
|
||||
///
|
||||
/// A blob column may be top-level or nested inside a struct or list. Nested
|
||||
/// blobs are addressed by a dotted path (e.g. `info.blob`) in the read APIs.
|
||||
///
|
||||
/// ```
|
||||
/// use arrow_schema::{DataType, Field, Schema};
|
||||
///
|
||||
@@ -27,15 +39,71 @@ use lance_encoding::version::LanceFileVersion;
|
||||
/// lancedb::blob("image", true),
|
||||
/// ]);
|
||||
/// ```
|
||||
///
|
||||
/// Blob tables use Lance file format >= 2.2 and stable row ids at create.
|
||||
pub fn blob(name: impl AsRef<str>, nullable: bool) -> Field {
|
||||
lance::blob::blob_field(name.as_ref(), nullable)
|
||||
}
|
||||
|
||||
/// Returns true if `schema` declares any blob v2 column.
|
||||
/// Returns true if `field` is a blob v2 column.
|
||||
///
|
||||
/// ```
|
||||
/// let field = lancedb::blob("image", true);
|
||||
/// assert!(lancedb::blob::is_blob(&field));
|
||||
/// ```
|
||||
pub fn is_blob(field: &Field) -> bool {
|
||||
field.is_blob_v2()
|
||||
}
|
||||
|
||||
/// Returns true if `field`, or any field nested under it, is a blob v2 column.
|
||||
fn field_tree_has_blob_v2(field: &Field) -> bool {
|
||||
if field.is_blob_v2() {
|
||||
return true;
|
||||
}
|
||||
match field.data_type() {
|
||||
DataType::Struct(children) => children.iter().any(|c| field_tree_has_blob_v2(c)),
|
||||
DataType::List(child) | DataType::LargeList(child) | DataType::FixedSizeList(child, _) => {
|
||||
field_tree_has_blob_v2(child)
|
||||
}
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
/// Collects the dotted paths of blob v2 columns under `field`, into `paths`.
|
||||
fn collect_blob_paths(field: &Field, prefix: &str, paths: &mut Vec<String>) {
|
||||
let path = if prefix.is_empty() {
|
||||
field.name().clone()
|
||||
} else {
|
||||
format!("{prefix}.{}", field.name())
|
||||
};
|
||||
if field.is_blob_v2() {
|
||||
paths.push(path);
|
||||
return;
|
||||
}
|
||||
match field.data_type() {
|
||||
DataType::Struct(children) => {
|
||||
for child in children {
|
||||
collect_blob_paths(child, &path, paths);
|
||||
}
|
||||
}
|
||||
DataType::List(child) | DataType::LargeList(child) | DataType::FixedSizeList(child, _) => {
|
||||
collect_blob_paths(child, &path, paths)
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns true if `schema` declares any blob v2 column, including nested ones.
|
||||
pub(crate) fn has_blob_columns(schema: &Schema) -> bool {
|
||||
schema.fields().iter().any(|field| field.is_blob_v2())
|
||||
schema.fields().iter().any(|f| field_tree_has_blob_v2(f))
|
||||
}
|
||||
|
||||
/// Blob v2 column paths in `schema`, declaration order preserved. Nested blobs
|
||||
/// are dotted paths (e.g. `info.blob`).
|
||||
pub(crate) fn blob_column_names(schema: &Schema) -> Vec<String> {
|
||||
let mut paths = Vec::new();
|
||||
for field in schema.fields() {
|
||||
collect_blob_paths(field, "", &mut paths);
|
||||
}
|
||||
paths
|
||||
}
|
||||
|
||||
/// Bumps storage format to at least [`LanceFileVersion::V2_2`] for blob schemas.
|
||||
@@ -53,6 +121,206 @@ pub(crate) fn ensure_blob_storage_version(schema: &Schema, params: &mut WritePar
|
||||
}
|
||||
}
|
||||
|
||||
/// Validate that `column` exists and is a blob v2 column.
|
||||
///
|
||||
/// Legacy v1 columns (`lance-encoding:blob`) error with a migration hint.
|
||||
pub(crate) fn ensure_blob_v2_column(
|
||||
schema: &lance_core::datatypes::Schema,
|
||||
column: &str,
|
||||
) -> Result<()> {
|
||||
match schema.field(column) {
|
||||
Some(field) if field.is_blob_v2() => Ok(()),
|
||||
Some(field) if field.is_blob() => Err(Error::InvalidInput {
|
||||
message: format!(
|
||||
"column '{column}' is a legacy blob column; blob APIs require blob v2 columns \
|
||||
(ARROW:extension:name = \"lance.blob.v2\")"
|
||||
),
|
||||
}),
|
||||
Some(_) => Err(Error::InvalidInput {
|
||||
message: format!("column '{column}' is not a blob column"),
|
||||
}),
|
||||
None => Err(Error::InvalidInput {
|
||||
message: format!("no column named '{column}' in this table"),
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the leaf descriptor `StructArray` for `column` in a descriptor batch.
|
||||
fn leaf_descriptor_struct<'a>(batch: &'a RecordBatch, column: &str) -> Result<&'a StructArray> {
|
||||
let path = parse_field_path(column).map_err(|e| Error::InvalidInput {
|
||||
message: format!("invalid blob column path '{column}': {e}"),
|
||||
})?;
|
||||
let not_struct = || Error::Runtime {
|
||||
message: format!("blob column '{column}' did not read back as a descriptor struct"),
|
||||
};
|
||||
let mut current = batch
|
||||
.column_by_name(&path[0])
|
||||
.and_then(|c| c.as_any().downcast_ref::<StructArray>())
|
||||
.ok_or_else(not_struct)?;
|
||||
for segment in &path[1..] {
|
||||
current = current
|
||||
.column_by_name(segment)
|
||||
.and_then(|c| c.as_any().downcast_ref::<StructArray>())
|
||||
.ok_or_else(not_struct)?;
|
||||
}
|
||||
Ok(current)
|
||||
}
|
||||
|
||||
/// Null rows in `row_ids`, from a descriptor take.
|
||||
///
|
||||
/// Lance `read_blobs` / `take_blobs` skip null rows (`kind == 0 && position == 0 && size == 0`).
|
||||
/// TODO(lance): aligned read API would drop this pass.
|
||||
async fn blob_null_mask(
|
||||
dataset: &Arc<Dataset>,
|
||||
column: &str,
|
||||
row_ids: &[u64],
|
||||
) -> Result<Vec<bool>> {
|
||||
let projection = dataset.schema().project(&[column])?;
|
||||
let descriptors = dataset.take_builder(row_ids, projection)?.execute().await?;
|
||||
if descriptors.num_rows() != row_ids.len() {
|
||||
return Err(Error::InvalidInput {
|
||||
message: format!(
|
||||
"blob take for column '{column}' requested {} row ids but only {} exist in the \
|
||||
table; pass row ids collected from this table",
|
||||
row_ids.len(),
|
||||
descriptors.num_rows()
|
||||
),
|
||||
});
|
||||
}
|
||||
let descriptor_struct = leaf_descriptor_struct(&descriptors, column)?;
|
||||
let child = |name: &str| {
|
||||
descriptor_struct
|
||||
.column_by_name(name)
|
||||
.ok_or_else(|| Error::Runtime {
|
||||
message: format!("blob descriptor for '{column}' is missing the '{name}' field"),
|
||||
})
|
||||
};
|
||||
let kinds = child("kind")?
|
||||
.as_any()
|
||||
.downcast_ref::<UInt8Array>()
|
||||
.ok_or_else(|| Error::Runtime {
|
||||
message: format!("blob descriptor 'kind' for '{column}' is not a UInt8 array"),
|
||||
})?;
|
||||
let positions = child("position")?
|
||||
.as_any()
|
||||
.downcast_ref::<UInt64Array>()
|
||||
.ok_or_else(|| Error::Runtime {
|
||||
message: format!("blob descriptor 'position' for '{column}' is not a UInt64 array"),
|
||||
})?;
|
||||
let sizes = child("size")?
|
||||
.as_any()
|
||||
.downcast_ref::<UInt64Array>()
|
||||
.ok_or_else(|| Error::Runtime {
|
||||
message: format!("blob descriptor 'size' for '{column}' is not a UInt64 array"),
|
||||
})?;
|
||||
|
||||
// Match Lance `collect_blob_entries_v2` skip condition (`BlobKind::Inline` == 0).
|
||||
Ok((0..descriptor_struct.len())
|
||||
.map(|i| {
|
||||
descriptor_struct.is_null(i)
|
||||
|| kinds.is_null(i)
|
||||
|| (kinds.value(i) == 0 && positions.value(i) == 0 && sizes.value(i) == 0)
|
||||
})
|
||||
.collect())
|
||||
}
|
||||
|
||||
/// Returns the entries of `row_ids` whose flag in `null_mask` is `false`.
///
/// Relative order is preserved and duplicate ids are kept. The two slices are
/// expected to have the same length: `zip` would otherwise silently ignore the
/// tail of the longer one, so the invariant is checked in debug builds.
fn non_null_row_ids(row_ids: &[u64], null_mask: &[bool]) -> Vec<u64> {
    debug_assert_eq!(
        row_ids.len(),
        null_mask.len(),
        "null mask must have one entry per row id"
    );
    row_ids
        .iter()
        .zip(null_mask)
        .filter(|&(_, &is_null)| !is_null)
        .map(|(&row_id, _)| row_id)
        .collect()
}
|
||||
|
||||
/// Materialize blob bytes for `row_ids` (same length and order, nulls preserved).
|
||||
pub(crate) async fn take_blobs_aligned(
|
||||
dataset: &Arc<Dataset>,
|
||||
column: &str,
|
||||
row_ids: &[u64],
|
||||
) -> Result<LargeBinaryArray> {
|
||||
ensure_blob_v2_column(dataset.schema(), column)?;
|
||||
if row_ids.is_empty() {
|
||||
return Ok(LargeBinaryBuilder::new().finish());
|
||||
}
|
||||
|
||||
let null_mask = blob_null_mask(dataset, column, row_ids).await?;
|
||||
let non_null_row_ids = non_null_row_ids(row_ids, &null_mask);
|
||||
let non_null_count = non_null_row_ids.len();
|
||||
let payloads = if non_null_count == 0 {
|
||||
Vec::new()
|
||||
} else {
|
||||
dataset
|
||||
.read_blobs(column)?
|
||||
.with_row_ids(non_null_row_ids)
|
||||
.preserve_order(true)
|
||||
.execute()
|
||||
.await?
|
||||
};
|
||||
|
||||
if payloads.len() != non_null_count {
|
||||
return Err(Error::Runtime {
|
||||
message: format!(
|
||||
"blob read for column '{column}' returned {} payloads for {} non-null rows",
|
||||
payloads.len(),
|
||||
non_null_count
|
||||
),
|
||||
});
|
||||
}
|
||||
|
||||
let mut builder = LargeBinaryBuilder::new();
|
||||
let mut payload_idx = 0;
|
||||
for is_null in &null_mask {
|
||||
if *is_null {
|
||||
builder.append_null();
|
||||
} else {
|
||||
builder.append_value(payloads[payload_idx].data.as_ref());
|
||||
payload_idx += 1;
|
||||
}
|
||||
}
|
||||
Ok(builder.finish())
|
||||
}
|
||||
|
||||
/// Open lazy [`BlobFile`] handles for `row_ids` (same length and order, nulls as `None`).
|
||||
pub(crate) async fn take_blob_files_aligned(
|
||||
dataset: &Arc<Dataset>,
|
||||
column: &str,
|
||||
row_ids: &[u64],
|
||||
) -> Result<Vec<Option<BlobFile>>> {
|
||||
ensure_blob_v2_column(dataset.schema(), column)?;
|
||||
if row_ids.is_empty() {
|
||||
return Ok(Vec::new());
|
||||
}
|
||||
|
||||
let null_mask = blob_null_mask(dataset, column, row_ids).await?;
|
||||
let non_null_row_ids = non_null_row_ids(row_ids, &null_mask);
|
||||
let handles = if non_null_row_ids.is_empty() {
|
||||
Vec::new()
|
||||
} else {
|
||||
dataset.take_blobs(&non_null_row_ids, column).await?
|
||||
};
|
||||
if handles.len() != non_null_row_ids.len() {
|
||||
return Err(Error::Runtime {
|
||||
message: format!(
|
||||
"blob take for column '{column}' returned {} handles for {} non-null rows",
|
||||
handles.len(),
|
||||
non_null_row_ids.len()
|
||||
),
|
||||
});
|
||||
}
|
||||
|
||||
let mut handles = handles.into_iter();
|
||||
Ok(null_mask
|
||||
.iter()
|
||||
.map(|is_null| {
|
||||
if *is_null {
|
||||
None
|
||||
} else {
|
||||
Some(handles.next().unwrap())
|
||||
}
|
||||
})
|
||||
.collect())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
@@ -116,6 +384,47 @@ mod tests {
|
||||
assert_eq!(params.data_storage_version.unwrap(), LanceFileVersion::V2_3);
|
||||
}
|
||||
|
||||
#[test]
fn legacy_v1_blob_column_is_rejected_with_migration_hint() {
    // A v1 blob here is a plain LargeBinary field tagged `lance-encoding:blob`.
    let mut metadata = std::collections::HashMap::new();
    metadata.insert("lance-encoding:blob".to_string(), "true".to_string());
    let legacy = Field::new("image", DataType::LargeBinary, true).with_metadata(metadata);

    let arrow_schema = Schema::new(vec![legacy]);
    let lance_schema = lance_core::datatypes::Schema::try_from(&arrow_schema).unwrap();

    let err = ensure_blob_v2_column(&lance_schema, "image").unwrap_err();
    assert!(matches!(err, Error::InvalidInput { .. }));
    let rendered = err.to_string();
    assert!(rendered.contains("legacy blob column"));
    assert!(rendered.contains("lance.blob.v2"));
}
|
||||
|
||||
#[test]
fn non_blob_and_unknown_columns_are_rejected_by_name() {
    let id_only = Schema::new(vec![Field::new("id", DataType::Int64, false)]);
    let lance_schema = lance_core::datatypes::Schema::try_from(&id_only).unwrap();

    // An existing column that is not a blob.
    let not_blob = ensure_blob_v2_column(&lance_schema, "id").unwrap_err();
    assert!(not_blob.to_string().contains("'id' is not a blob column"));

    // A column that is not in the schema at all.
    let unknown = ensure_blob_v2_column(&lance_schema, "missing").unwrap_err();
    assert!(unknown.to_string().contains("no column named 'missing'"));
}
|
||||
|
||||
#[test]
fn blob_column_names_includes_nested_path() {
    // Schema: `id: Int64`, `info: Struct<name: Utf8, blob: blob v2>`.
    let info_members = vec![Field::new("name", DataType::Utf8, false), blob("blob", true)];
    let info = Field::new("info", DataType::Struct(info_members.into()), true);
    let schema = Schema::new(vec![Field::new("id", DataType::Int64, false), info]);

    let names = blob_column_names(&schema);
    assert_eq!(names, vec!["info.blob"]);
}
|
||||
|
||||
#[test]
|
||||
fn storage_version_noop_without_blob_columns() {
|
||||
let schema = Schema::new(vec![Field::new("id", DataType::Int64, false)]);
|
||||
|
||||
@@ -189,7 +189,7 @@ use std::{fmt::Display, str::FromStr};
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
pub use blob::blob;
|
||||
pub use blob::{blob, is_blob};
|
||||
pub use connection::{ConnectNamespaceBuilder, Connection};
|
||||
pub use error::{Error, Result};
|
||||
use lance_index::vector::ApproxMode as LanceApproxMode;
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
|
||||
//! LanceDB Table APIs
|
||||
|
||||
use arrow_array::{RecordBatch, RecordBatchReader};
|
||||
use arrow_array::{LargeBinaryArray, RecordBatch, RecordBatchReader};
|
||||
use arrow_schema::{Schema, SchemaRef};
|
||||
use async_trait::async_trait;
|
||||
use datafusion_execution::TaskContext;
|
||||
@@ -12,6 +12,7 @@ use datafusion_physical_plan::ExecutionPlan;
|
||||
use datafusion_physical_plan::display::DisplayableExecutionPlan;
|
||||
use futures::StreamExt;
|
||||
use futures::stream::FuturesUnordered;
|
||||
use lance::dataset::BlobFile;
|
||||
pub use lance::dataset::ColumnAlteration;
|
||||
pub use lance::dataset::NewColumnTransform;
|
||||
pub use lance::dataset::ReadParams;
|
||||
@@ -587,6 +588,28 @@ pub trait BaseTable: std::fmt::Display + std::fmt::Debug + Send + Sync {
|
||||
async fn close_lsm_writers(&self) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
/// Names of the blob v2 columns in this table, in declaration order.
|
||||
async fn blob_columns(&self) -> Result<Vec<String>> {
|
||||
Err(Error::NotSupported {
|
||||
message: "blob_columns is not supported on this table type".into(),
|
||||
})
|
||||
}
|
||||
/// Materialize blob bytes for the given row ids. See [`Table::fetch_blobs`].
|
||||
async fn fetch_blobs(&self, _column: &str, _row_ids: &[u64]) -> Result<LargeBinaryArray> {
|
||||
Err(Error::NotSupported {
|
||||
message: "fetch_blobs is not supported on this table type".into(),
|
||||
})
|
||||
}
|
||||
/// Open lazy blob handles for the given row ids. See [`Table::fetch_blob_files`].
|
||||
async fn fetch_blob_files(
|
||||
&self,
|
||||
_column: &str,
|
||||
_row_ids: &[u64],
|
||||
) -> Result<Vec<Option<BlobFile>>> {
|
||||
Err(Error::NotSupported {
|
||||
message: "fetch_blob_files is not supported on this table type".into(),
|
||||
})
|
||||
}
|
||||
/// Gets the table tag manager.
|
||||
async fn tags(&self) -> Result<Box<dyn Tags + '_>>;
|
||||
/// Optimize the dataset.
|
||||
@@ -927,6 +950,76 @@ impl Table {
|
||||
self.inner.count_rows(filter.map(Filter::Sql)).await
|
||||
}
|
||||
|
||||
/// Names of the blob v2 columns in this table, in declaration order.
|
||||
///
|
||||
/// Nested blobs use dotted paths (e.g. `info.blob`). Returns
|
||||
/// [`Error::NotSupported`] on table types without blob support.
|
||||
pub async fn blob_columns(&self) -> Result<Vec<String>> {
|
||||
self.inner.blob_columns().await
|
||||
}
|
||||
|
||||
/// Materialize blob bytes for the given row ids.
|
||||
///
|
||||
/// Output matches `row_ids` in length and order. Null and zero-length rows
|
||||
/// are null. Prefer [`Self::fetch_blob_files`] for large selections.
|
||||
///
|
||||
/// ```
|
||||
/// use arrow_array::UInt64Array;
|
||||
/// use futures::TryStreamExt;
|
||||
/// use lancedb::query::{ExecutableQuery, QueryBase};
|
||||
///
|
||||
/// # use lancedb::Table;
|
||||
/// # async fn materialize(table: &Table) -> Result<(), Box<dyn std::error::Error>> {
|
||||
/// let mut stream = table.query().with_row_id().limit(10).execute().await?;
|
||||
/// while let Some(batch) = stream.try_next().await? {
|
||||
/// let row_ids = batch
|
||||
/// .column_by_name("_rowid")
|
||||
/// .unwrap()
|
||||
/// .as_any()
|
||||
/// .downcast_ref::<UInt64Array>()
|
||||
/// .unwrap();
|
||||
/// let images = table.fetch_blobs("image", row_ids.values()).await?;
|
||||
/// let _ = images;
|
||||
/// }
|
||||
/// # Ok(())
|
||||
/// # }
|
||||
/// ```
|
||||
///
|
||||
/// Returns [`Error::InvalidInput`] when the column does not exist or is
|
||||
/// not a blob v2 column, and [`Error::NotSupported`] on table types
|
||||
/// without blob support.
|
||||
pub async fn fetch_blobs(
|
||||
&self,
|
||||
column: impl AsRef<str>,
|
||||
row_ids: &[u64],
|
||||
) -> Result<LargeBinaryArray> {
|
||||
self.inner.fetch_blobs(column.as_ref(), row_ids).await
|
||||
}
|
||||
|
||||
/// Open lazy [`BlobFile`] handles for the given row ids.
|
||||
///
|
||||
/// Same length and order as `row_ids`. Null rows are `None`. Bytes are not
|
||||
/// read from disk until a call to [`BlobFile::read`].
|
||||
///
|
||||
/// ```
|
||||
/// # use lancedb::Table;
|
||||
/// # async fn lazy_read(table: &Table, row_ids: &[u64]) -> Result<(), Box<dyn std::error::Error>> {
|
||||
/// let handles = table.fetch_blob_files("image", row_ids).await?;
|
||||
/// if let Some(Some(first)) = handles.first() {
|
||||
/// let bytes = first.read().await?;
|
||||
/// println!("first blob is {} bytes", bytes.len());
|
||||
/// }
|
||||
/// # Ok(())
|
||||
/// # }
|
||||
/// ```
|
||||
pub async fn fetch_blob_files(
|
||||
&self,
|
||||
column: impl AsRef<str>,
|
||||
row_ids: &[u64],
|
||||
) -> Result<Vec<Option<BlobFile>>> {
|
||||
self.inner.fetch_blob_files(column.as_ref(), row_ids).await
|
||||
}
|
||||
|
||||
/// Insert new records into this Table
|
||||
///
|
||||
/// # Arguments
|
||||
@@ -2761,6 +2854,25 @@ impl BaseTable for NativeTable {
|
||||
merge::lsm::close_lsm_writers(self).await
|
||||
}
|
||||
|
||||
async fn blob_columns(&self) -> Result<Vec<String>> {
|
||||
let schema = self.schema().await?;
|
||||
Ok(crate::blob::blob_column_names(schema.as_ref()))
|
||||
}
|
||||
|
||||
async fn fetch_blobs(&self, column: &str, row_ids: &[u64]) -> Result<LargeBinaryArray> {
|
||||
let dataset = self.dataset.get().await?;
|
||||
crate::blob::take_blobs_aligned(&dataset, column, row_ids).await
|
||||
}
|
||||
|
||||
async fn fetch_blob_files(
|
||||
&self,
|
||||
column: &str,
|
||||
row_ids: &[u64],
|
||||
) -> Result<Vec<Option<BlobFile>>> {
|
||||
let dataset = self.dataset.get().await?;
|
||||
crate::blob::take_blob_files_aligned(&dataset, column, row_ids).await
|
||||
}
|
||||
|
||||
/// Delete rows from the table
|
||||
async fn delete(&self, predicate: Predicate<'_>) -> Result<DeleteResult> {
|
||||
let result = delete::execute_delete(self, predicate).await?;
|
||||
|
||||
@@ -1,17 +1,22 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
|
||||
|
||||
//! Integration tests for blob v2 columns.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use arrow_array::{Array, BinaryArray, Int64Array, LargeBinaryArray, RecordBatch, StructArray};
|
||||
use arrow_schema::{DataType, Field, Schema};
|
||||
use arrow_array::{
|
||||
Array, ArrayRef, BinaryArray, Int64Array, LargeBinaryArray, RecordBatch, StringArray,
|
||||
StructArray, UInt64Array,
|
||||
};
|
||||
use arrow_schema::{DataType, Field, Fields, Schema};
|
||||
use futures::TryStreamExt;
|
||||
use lance_encoding::version::LanceFileVersion;
|
||||
use lancedb::{
|
||||
Connection, Result, Table, blob::blob, connect,
|
||||
database::listing::OPT_NEW_TABLE_ENABLE_STABLE_ROW_IDS, query::ExecutableQuery,
|
||||
Connection, Error, Result, Table,
|
||||
blob::blob,
|
||||
connect, connect_namespace,
|
||||
database::listing::OPT_NEW_TABLE_ENABLE_STABLE_ROW_IDS,
|
||||
query::{ExecutableQuery, QueryBase},
|
||||
table::{AddDataMode, CompactionOptions, OptimizeAction},
|
||||
};
|
||||
use tempfile::tempdir;
|
||||
|
||||
@@ -91,7 +96,7 @@ async fn query_image_struct(table: &Table) -> StructArray {
|
||||
.expect("image column present")
|
||||
.as_any()
|
||||
.downcast_ref::<StructArray>()
|
||||
.expect("blob column reads back as a descriptor struct")
|
||||
.expect("image column is a descriptor struct")
|
||||
.clone()
|
||||
}
|
||||
|
||||
@@ -119,10 +124,7 @@ async fn explicit_stable_row_id_setting_wins_over_blob_default() -> Result<()> {
|
||||
.execute()
|
||||
.await?;
|
||||
|
||||
assert!(
|
||||
storage_format_version(&table).await >= LanceFileVersion::V2_2,
|
||||
"format bump still applies; the schema cannot be written below 2.2"
|
||||
);
|
||||
assert!(storage_format_version(&table).await >= LanceFileVersion::V2_2);
|
||||
assert!(!uses_stable_row_ids(&table).await);
|
||||
Ok(())
|
||||
}
|
||||
@@ -144,7 +146,6 @@ async fn creating_with_blob_data_bumps_format() -> Result<()> {
|
||||
let tmp = tempdir().unwrap();
|
||||
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
|
||||
|
||||
// Batch already declares the blob field (pre-built struct).
|
||||
let blob_field = blob("image", true);
|
||||
let DataType::Struct(children) = blob_field.data_type().clone() else {
|
||||
unreachable!("blob field is a struct")
|
||||
@@ -153,7 +154,7 @@ async fn creating_with_blob_data_bumps_format() -> Result<()> {
|
||||
children,
|
||||
vec![
|
||||
Arc::new(LargeBinaryArray::from_iter_values([b"payload".as_slice()])),
|
||||
Arc::new(arrow_array::StringArray::from(vec![None::<&str>])),
|
||||
Arc::new(StringArray::from(vec![None::<&str>])),
|
||||
],
|
||||
None,
|
||||
);
|
||||
@@ -184,7 +185,6 @@ async fn add_coerces_large_binary_into_blob_column() -> Result<()> {
|
||||
assert_eq!(table.count_rows(None).await?, 2);
|
||||
let image = query_image_struct(&table).await;
|
||||
assert_eq!(image.len(), 2);
|
||||
// Table schema still has the blob marker after append.
|
||||
let schema = table.schema().await?;
|
||||
let field = schema.field_with_name("image").unwrap();
|
||||
assert_eq!(
|
||||
@@ -257,12 +257,12 @@ async fn add_rejects_uncoercible_blob_input() -> Result<()> {
|
||||
])),
|
||||
vec![
|
||||
Arc::new(Int64Array::from(vec![1])),
|
||||
Arc::new(arrow_array::StringArray::from(vec!["not bytes"])),
|
||||
Arc::new(StringArray::from(vec!["not bytes"])),
|
||||
],
|
||||
)
|
||||
.unwrap();
|
||||
let err = table.add(batch).execute().await.unwrap_err();
|
||||
assert!(err.to_string().contains("image"), "got: {err}");
|
||||
assert!(err.to_string().contains("image"));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -288,9 +288,7 @@ async fn namespace_create_applies_blob_defaults() -> Result<()> {
|
||||
let tmp = tempdir().unwrap();
|
||||
let mut properties = std::collections::HashMap::new();
|
||||
properties.insert("root".to_string(), tmp.path().to_str().unwrap().to_string());
|
||||
let db = lancedb::connect_namespace("dir", properties)
|
||||
.execute()
|
||||
.await?;
|
||||
let db = connect_namespace("dir", properties).execute().await?;
|
||||
let table = db
|
||||
.create_empty_table("t", blob_table_schema())
|
||||
.execute()
|
||||
@@ -301,17 +299,14 @@ async fn namespace_create_applies_blob_defaults() -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Overwrite takes the input schema as-is (same as cast skip). Raw binary
|
||||
// overwrite drops the blob marker unless the input declares blob v2.
|
||||
// Overwrite takes the input schema as-is. A raw-binary overwrite drops the blob
|
||||
// marker; re-declaring blob v2 in the input restores it.
|
||||
#[tokio::test]
|
||||
async fn overwrite_replaces_blob_schema_with_input_schema() -> Result<()> {
|
||||
use lancedb::table::AddDataMode;
|
||||
|
||||
let tmp = tempdir().unwrap();
|
||||
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
|
||||
let table = create_inline_blob_table(&db, "t", &[1], &[Some(b"blob".as_slice())]).await?;
|
||||
|
||||
// Raw binary overwrite. Plain LargeBinary replaces the blob declaration.
|
||||
let raw_schema = Arc::new(Schema::new(vec![
|
||||
Field::new("id", DataType::Int64, false),
|
||||
Field::new("image", DataType::LargeBinary, true),
|
||||
@@ -336,11 +331,9 @@ async fn overwrite_replaces_blob_schema_with_input_schema() -> Result<()> {
|
||||
.field_with_name("image")
|
||||
.unwrap()
|
||||
.metadata()
|
||||
.contains_key("ARROW:extension:name"),
|
||||
"raw binary overwrite leaves a plain binary column"
|
||||
.contains_key("ARROW:extension:name")
|
||||
);
|
||||
|
||||
// Overwrite with a declared blob struct keeps the blob column.
|
||||
let blob_field = blob("image", true);
|
||||
let DataType::Struct(children) = blob_field.data_type().clone() else {
|
||||
unreachable!("blob field is a struct")
|
||||
@@ -349,7 +342,7 @@ async fn overwrite_replaces_blob_schema_with_input_schema() -> Result<()> {
|
||||
children,
|
||||
vec![
|
||||
Arc::new(LargeBinaryArray::from_iter_values([b"declared".as_slice()])),
|
||||
Arc::new(arrow_array::StringArray::from(vec![None::<&str>])),
|
||||
Arc::new(StringArray::from(vec![None::<&str>])),
|
||||
],
|
||||
None,
|
||||
);
|
||||
@@ -378,3 +371,579 @@ async fn overwrite_replaces_blob_schema_with_input_schema() -> Result<()> {
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn collect_row_ids(table: &Table) -> Result<Vec<u64>> {
|
||||
let batches = table
|
||||
.query()
|
||||
.with_row_id()
|
||||
.execute()
|
||||
.await?
|
||||
.try_collect::<Vec<_>>()
|
||||
.await?;
|
||||
let batch = arrow_select::concat::concat_batches(&batches[0].schema(), &batches).unwrap();
|
||||
Ok(batch
|
||||
.column_by_name("_rowid")
|
||||
.unwrap()
|
||||
.as_any()
|
||||
.downcast_ref::<UInt64Array>()
|
||||
.unwrap()
|
||||
.values()
|
||||
.to_vec())
|
||||
}
|
||||
|
||||
async fn collect_id_rowid(table: &Table) -> Result<Vec<(i64, u64)>> {
|
||||
let batches = table
|
||||
.query()
|
||||
.with_row_id()
|
||||
.execute()
|
||||
.await?
|
||||
.try_collect::<Vec<_>>()
|
||||
.await?;
|
||||
let batch = arrow_select::concat::concat_batches(&batches[0].schema(), &batches).unwrap();
|
||||
let ids = batch
|
||||
.column_by_name("id")
|
||||
.unwrap()
|
||||
.as_any()
|
||||
.downcast_ref::<Int64Array>()
|
||||
.unwrap();
|
||||
let row_ids = batch
|
||||
.column_by_name("_rowid")
|
||||
.unwrap()
|
||||
.as_any()
|
||||
.downcast_ref::<UInt64Array>()
|
||||
.unwrap();
|
||||
Ok(ids
|
||||
.values()
|
||||
.iter()
|
||||
.copied()
|
||||
.zip(row_ids.values().iter().copied())
|
||||
.collect())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn fetch_blobs_round_trips_bytes() -> Result<()> {
|
||||
let tmp = tempdir().unwrap();
|
||||
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
|
||||
let payload: &[u8] = b"blob-round-trip-payload";
|
||||
let table = create_inline_blob_table(&db, "t", &[1], &[Some(payload)]).await?;
|
||||
|
||||
let ids = collect_row_ids(&table).await?;
|
||||
let bytes = table.fetch_blobs("image", &ids).await?;
|
||||
assert_eq!(bytes.len(), 1);
|
||||
assert_eq!(bytes.value(0), payload);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn fetch_blobs_round_trips_nested_blob_column() -> Result<()> {
|
||||
let tmp = tempdir().unwrap();
|
||||
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
|
||||
|
||||
let blob_field = blob("blob", true);
|
||||
let DataType::Struct(blob_children) = blob_field.data_type().clone() else {
|
||||
unreachable!("blob field is a struct")
|
||||
};
|
||||
let blob_array = StructArray::new(
|
||||
blob_children,
|
||||
vec![
|
||||
Arc::new(LargeBinaryArray::from_iter_values([
|
||||
b"hello".as_slice(),
|
||||
b"world".as_slice(),
|
||||
])) as ArrayRef,
|
||||
Arc::new(StringArray::from(vec![None::<&str>, None::<&str>])) as ArrayRef,
|
||||
],
|
||||
None,
|
||||
);
|
||||
let info_fields: Fields = vec![Field::new("name", DataType::Utf8, false), blob_field].into();
|
||||
let info_array = StructArray::new(
|
||||
info_fields.clone(),
|
||||
vec![
|
||||
Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
|
||||
Arc::new(blob_array) as ArrayRef,
|
||||
],
|
||||
None,
|
||||
);
|
||||
let schema = Arc::new(Schema::new(vec![Field::new(
|
||||
"info",
|
||||
DataType::Struct(info_fields),
|
||||
true,
|
||||
)]));
|
||||
let batch = RecordBatch::try_new(schema, vec![Arc::new(info_array) as ArrayRef]).unwrap();
|
||||
let table = db.create_table("t", batch).execute().await?;
|
||||
|
||||
assert!(storage_format_version(&table).await >= LanceFileVersion::V2_2);
|
||||
assert!(uses_stable_row_ids(&table).await);
|
||||
|
||||
let ids = collect_row_ids(&table).await?;
|
||||
let bytes = table.fetch_blobs("info.blob", &ids).await?;
|
||||
assert_eq!(bytes.len(), 2);
|
||||
let values: std::collections::HashSet<&[u8]> =
|
||||
(0..bytes.len()).map(|i| bytes.value(i)).collect();
|
||||
assert!(values.contains(b"hello".as_slice()));
|
||||
assert!(values.contains(b"world".as_slice()));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn blob_columns_lists_nested_dotted_paths() -> Result<()> {
|
||||
let tmp = tempdir().unwrap();
|
||||
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
|
||||
let blob_field = blob("blob", true);
|
||||
let info = Field::new(
|
||||
"info",
|
||||
DataType::Struct(vec![Field::new("name", DataType::Utf8, false), blob_field].into()),
|
||||
true,
|
||||
);
|
||||
let schema = Arc::new(Schema::new(vec![
|
||||
blob("thumbnail", true),
|
||||
Field::new("id", DataType::Int64, false),
|
||||
info,
|
||||
]));
|
||||
let table = db.create_empty_table("t", schema).execute().await?;
|
||||
assert_eq!(table.blob_columns().await?, vec!["thumbnail", "info.blob"]);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn blob_columns_lists_blob_fields_in_order() -> Result<()> {
|
||||
let tmp = tempdir().unwrap();
|
||||
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
|
||||
let schema = Arc::new(Schema::new(vec![
|
||||
blob("thumbnail", true),
|
||||
Field::new("id", DataType::Int64, false),
|
||||
blob("image", true),
|
||||
]));
|
||||
let table = db.create_empty_table("t", schema).execute().await?;
|
||||
assert_eq!(table.blob_columns().await?, vec!["thumbnail", "image"]);
|
||||
|
||||
let plain = db
|
||||
.create_empty_table(
|
||||
"plain",
|
||||
Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)])),
|
||||
)
|
||||
.execute()
|
||||
.await?;
|
||||
assert!(plain.blob_columns().await?.is_empty());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn fetch_blobs_preserves_null_alignment() -> Result<()> {
|
||||
let tmp = tempdir().unwrap();
|
||||
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
|
||||
let table = create_inline_blob_table(
|
||||
&db,
|
||||
"t",
|
||||
&[1, 2, 3, 4],
|
||||
&[Some(b"a".as_slice()), None, Some(b"c"), None],
|
||||
)
|
||||
.await?;
|
||||
|
||||
let pairs = collect_id_rowid(&table).await?;
|
||||
let ids: Vec<u64> = pairs.iter().map(|(_, rowid)| *rowid).collect();
|
||||
let bytes = table.fetch_blobs("image", &ids).await?;
|
||||
assert_eq!(bytes.len(), ids.len());
|
||||
for (i, (id, _)) in pairs.iter().enumerate() {
|
||||
match id {
|
||||
1 => assert_eq!(bytes.value(i), b"a"),
|
||||
2 | 4 => assert!(bytes.is_null(i)),
|
||||
3 => assert_eq!(bytes.value(i), b"c"),
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn fetch_blobs_all_null_column_returns_all_nulls() -> Result<()> {
|
||||
let tmp = tempdir().unwrap();
|
||||
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
|
||||
let table = create_inline_blob_table(&db, "t", &[1, 2], &[None, None]).await?;
|
||||
|
||||
let ids = collect_row_ids(&table).await?;
|
||||
let bytes = table.fetch_blobs("image", &ids).await?;
|
||||
assert_eq!(bytes.len(), 2);
|
||||
assert_eq!(bytes.null_count(), 2);
|
||||
|
||||
let files = table.fetch_blob_files("image", &ids).await?;
|
||||
assert_eq!(files.len(), 2);
|
||||
assert!(files.iter().all(Option::is_none));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn fetch_blobs_aligns_with_reordered_and_duplicate_ids() -> Result<()> {
|
||||
let tmp = tempdir().unwrap();
|
||||
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
|
||||
let table = create_inline_blob_table(
|
||||
&db,
|
||||
"t",
|
||||
&[1, 2, 3],
|
||||
&[Some(b"one".as_slice()), Some(b"two"), Some(b"three")],
|
||||
)
|
||||
.await?;
|
||||
|
||||
let pairs = collect_id_rowid(&table).await?;
|
||||
let by_id = |want: i64| pairs.iter().find(|(id, _)| *id == want).unwrap().1;
|
||||
let request = vec![by_id(3), by_id(1), by_id(3), by_id(2)];
|
||||
let bytes = table.fetch_blobs("image", &request).await?;
|
||||
assert_eq!(bytes.len(), 4);
|
||||
assert_eq!(bytes.value(0), b"three");
|
||||
assert_eq!(bytes.value(1), b"one");
|
||||
assert_eq!(bytes.value(2), b"three");
|
||||
assert_eq!(bytes.value(3), b"two");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn fetch_blobs_empty_ids_returns_empty() -> Result<()> {
|
||||
let tmp = tempdir().unwrap();
|
||||
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
|
||||
let table = create_inline_blob_table(&db, "t", &[1], &[Some(b"x".as_slice())]).await?;
|
||||
|
||||
assert_eq!(table.fetch_blobs("image", &[]).await?.len(), 0);
|
||||
assert!(table.fetch_blob_files("image", &[]).await?.is_empty());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn fetch_blobs_out_of_range_id_errors_without_panic() -> Result<()> {
|
||||
let tmp = tempdir().unwrap();
|
||||
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
|
||||
let table = create_inline_blob_table(&db, "t", &[1], &[Some(b"x".as_slice())]).await?;
|
||||
|
||||
let err = table.fetch_blobs("image", &[u64::MAX]).await.unwrap_err();
|
||||
assert!(err.to_string().contains("row ids"));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn fetch_blobs_rejects_non_blob_column() -> Result<()> {
|
||||
let tmp = tempdir().unwrap();
|
||||
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
|
||||
let table = create_inline_blob_table(&db, "t", &[1], &[Some(b"x".as_slice())]).await?;
|
||||
|
||||
let err = table.fetch_blobs("id", &[0]).await.unwrap_err();
|
||||
assert!(matches!(err, Error::InvalidInput { .. }));
|
||||
assert!(err.to_string().contains("'id' is not a blob column"));
|
||||
|
||||
let err = table.fetch_blob_files("id", &[0]).await.unwrap_err();
|
||||
assert!(err.to_string().contains("'id' is not a blob column"));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn fetch_blobs_rejects_unknown_column() -> Result<()> {
|
||||
let tmp = tempdir().unwrap();
|
||||
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
|
||||
let table = create_inline_blob_table(&db, "t", &[1], &[Some(b"x".as_slice())]).await?;
|
||||
|
||||
let err = table.fetch_blobs("missing", &[0]).await.unwrap_err();
|
||||
assert!(err.to_string().contains("no column named 'missing'"));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn fetch_blobs_rejects_legacy_v1_blob_column() -> Result<()> {
|
||||
let tmp = tempdir().unwrap();
|
||||
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
|
||||
let legacy = Field::new("image", DataType::LargeBinary, true).with_metadata(
|
||||
std::collections::HashMap::from([("lance-encoding:blob".to_string(), "true".to_string())]),
|
||||
);
|
||||
let schema = Arc::new(Schema::new(vec![
|
||||
Field::new("id", DataType::Int64, false),
|
||||
legacy,
|
||||
]));
|
||||
let table = db.create_empty_table("t", schema).execute().await?;
|
||||
|
||||
let err = table.fetch_blobs("image", &[0]).await.unwrap_err();
|
||||
assert!(err.to_string().contains("legacy blob column"));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn fetch_blob_files_reads_lazily_and_aligns_nulls() -> Result<()> {
|
||||
let tmp = tempdir().unwrap();
|
||||
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
|
||||
let table =
|
||||
create_inline_blob_table(&db, "t", &[1, 2], &[Some(b"lazy-bytes".as_slice()), None])
|
||||
.await?;
|
||||
|
||||
let pairs = collect_id_rowid(&table).await?;
|
||||
let ids: Vec<u64> = pairs.iter().map(|(_, rowid)| *rowid).collect();
|
||||
let files = table.fetch_blob_files("image", &ids).await?;
|
||||
assert_eq!(files.len(), 2);
|
||||
for ((id, _), file) in pairs.iter().zip(&files) {
|
||||
match id {
|
||||
1 => {
|
||||
let handle = file.as_ref().unwrap();
|
||||
assert_eq!(handle.read().await.unwrap().as_ref(), b"lazy-bytes");
|
||||
}
|
||||
2 => assert!(file.is_none()),
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn fetch_blobs_reads_multiple_blob_columns_independently() -> Result<()> {
|
||||
let tmp = tempdir().unwrap();
|
||||
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
|
||||
let schema = Arc::new(Schema::new(vec![
|
||||
Field::new("id", DataType::Int64, false),
|
||||
blob("image", true),
|
||||
blob("thumbnail", true),
|
||||
]));
|
||||
let table = db.create_empty_table("t", schema).execute().await?;
|
||||
let batch = RecordBatch::try_new(
|
||||
Arc::new(Schema::new(vec![
|
||||
Field::new("id", DataType::Int64, false),
|
||||
Field::new("image", DataType::LargeBinary, true),
|
||||
Field::new("thumbnail", DataType::LargeBinary, true),
|
||||
])),
|
||||
vec![
|
||||
Arc::new(Int64Array::from(vec![1, 2])),
|
||||
Arc::new(LargeBinaryArray::from_iter(vec![
|
||||
Some(b"image-1".as_slice()),
|
||||
None,
|
||||
])),
|
||||
Arc::new(LargeBinaryArray::from_iter(vec![
|
||||
None,
|
||||
Some(b"thumb-2".as_slice()),
|
||||
])),
|
||||
],
|
||||
)
|
||||
.unwrap();
|
||||
table.add(batch).execute().await?;
|
||||
|
||||
let pairs = collect_id_rowid(&table).await?;
|
||||
let ids: Vec<u64> = pairs.iter().map(|(_, rowid)| *rowid).collect();
|
||||
let images = table.fetch_blobs("image", &ids).await?;
|
||||
let thumbs = table.fetch_blobs("thumbnail", &ids).await?;
|
||||
for (i, (id, _)) in pairs.iter().enumerate() {
|
||||
match id {
|
||||
1 => {
|
||||
assert_eq!(images.value(i), b"image-1");
|
||||
assert!(thumbs.is_null(i));
|
||||
}
|
||||
2 => {
|
||||
assert!(images.is_null(i));
|
||||
assert_eq!(thumbs.value(i), b"thumb-2");
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn fetch_blobs_spans_fragments() -> Result<()> {
|
||||
let tmp = tempdir().unwrap();
|
||||
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
|
||||
let table = create_inline_blob_table(&db, "t", &[1], &[Some(b"frag-one".as_slice())]).await?;
|
||||
table
|
||||
.add(binary_input_batch(&[2], &[Some(b"frag-two".as_slice())]))
|
||||
.execute()
|
||||
.await?;
|
||||
|
||||
let pairs = collect_id_rowid(&table).await?;
|
||||
let ids: Vec<u64> = pairs.iter().map(|(_, rowid)| *rowid).collect();
|
||||
let bytes = table.fetch_blobs("image", &ids).await?;
|
||||
for (i, (id, _)) in pairs.iter().enumerate() {
|
||||
match id {
|
||||
1 => assert_eq!(bytes.value(i), b"frag-one"),
|
||||
2 => assert_eq!(bytes.value(i), b"frag-two"),
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn fetch_blobs_packed_payload_round_trip() -> Result<()> {
|
||||
let tmp = tempdir().unwrap();
|
||||
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
|
||||
let big = vec![0xAB_u8; 100 * 1024];
|
||||
let small = b"small".to_vec();
|
||||
let table = create_inline_blob_table(
|
||||
&db,
|
||||
"t",
|
||||
&[1, 2],
|
||||
&[Some(big.as_slice()), Some(small.as_slice())],
|
||||
)
|
||||
.await?;
|
||||
|
||||
let pairs = collect_id_rowid(&table).await?;
|
||||
let ids: Vec<u64> = pairs.iter().map(|(_, rowid)| *rowid).collect();
|
||||
let bytes = table.fetch_blobs("image", &ids).await?;
|
||||
for (i, (id, _)) in pairs.iter().enumerate() {
|
||||
match id {
|
||||
1 => assert_eq!(bytes.value(i), big.as_slice()),
|
||||
2 => assert_eq!(bytes.value(i), small.as_slice()),
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn fetch_blobs_after_delete() -> Result<()> {
|
||||
let tmp = tempdir().unwrap();
|
||||
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
|
||||
let table = create_inline_blob_table(
|
||||
&db,
|
||||
"t",
|
||||
&[1, 2, 3],
|
||||
&[Some(b"one".as_slice()), Some(b"two"), Some(b"three")],
|
||||
)
|
||||
.await?;
|
||||
|
||||
table.delete("id = 2").await?;
|
||||
let pairs = collect_id_rowid(&table).await?;
|
||||
assert_eq!(pairs.len(), 2);
|
||||
let ids: Vec<u64> = pairs.iter().map(|(_, rowid)| *rowid).collect();
|
||||
let bytes = table.fetch_blobs("image", &ids).await?;
|
||||
for (i, (id, _)) in pairs.iter().enumerate() {
|
||||
match id {
|
||||
1 => assert_eq!(bytes.value(i), b"one"),
|
||||
3 => assert_eq!(bytes.value(i), b"three"),
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn fetch_blobs_with_precompaction_row_ids_survives_compaction() -> Result<()> {
|
||||
let tmp = tempdir().unwrap();
|
||||
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
|
||||
let table = create_inline_blob_table(&db, "t", &[1], &[Some(b"frag-one".as_slice())]).await?;
|
||||
table
|
||||
.add(binary_input_batch(&[2], &[Some(b"frag-two".as_slice())]))
|
||||
.execute()
|
||||
.await?;
|
||||
|
||||
let pairs_before = collect_id_rowid(&table).await?;
|
||||
let ids_before: Vec<u64> = pairs_before.iter().map(|(_, rowid)| *rowid).collect();
|
||||
|
||||
table
|
||||
.optimize(OptimizeAction::Compact {
|
||||
options: CompactionOptions::default(),
|
||||
remap_options: None,
|
||||
})
|
||||
.await?;
|
||||
|
||||
let bytes_after = table.fetch_blobs("image", &ids_before).await?;
|
||||
assert_eq!(bytes_after.len(), 2);
|
||||
for (i, (id, _)) in pairs_before.iter().enumerate() {
|
||||
match id {
|
||||
1 => assert_eq!(bytes_after.value(i), b"frag-one"),
|
||||
2 => assert_eq!(bytes_after.value(i), b"frag-two"),
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn zero_length_blob_reads_back_as_null() -> Result<()> {
|
||||
let tmp = tempdir().unwrap();
|
||||
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
|
||||
let table = create_inline_blob_table(&db, "t", &[1], &[Some(b"".as_slice())]).await?;
|
||||
|
||||
let ids = collect_row_ids(&table).await?;
|
||||
let bytes = table.fetch_blobs("image", &ids).await?;
|
||||
assert_eq!(bytes.len(), 1);
|
||||
assert!(bytes.is_null(0));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Payload size (64 KiB) used for every blob built by `dedicated_blob_bytes`.
// NOTE(review): presumably chosen to exceed the inline-storage threshold so
// the blob is stored on the "dedicated" path — confirm against Lance.
const DEDICATED_BLOB_LEN: usize = 64 * 1024;

/// Logical ids in deliberately shuffled request order; id 6 appears twice.
const SCRAMBLED_LOGICAL_IDS: [i64; 7] = [6, 3, 1, 4, 6, 2, 5];

/// Builds a `DEDICATED_BLOB_LEN`-byte payload in which every byte is `tag`.
fn dedicated_blob_bytes(tag: u8) -> Vec<u8> {
    std::iter::repeat(tag).take(DEDICATED_BLOB_LEN).collect()
}
|
||||
|
||||
async fn multi_fragment_dedicated_blob_table(db: &Connection) -> Result<Table> {
|
||||
let rows: [(i64, Option<u8>); 6] = [
|
||||
(1, Some(1)),
|
||||
(2, Some(2)),
|
||||
(3, None),
|
||||
(4, Some(4)),
|
||||
(5, None),
|
||||
(6, Some(6)),
|
||||
];
|
||||
let mut table: Option<Table> = None;
|
||||
for (logical_id, blob_tag) in rows {
|
||||
let bytes = blob_tag.map(dedicated_blob_bytes);
|
||||
let image = [bytes.as_deref()];
|
||||
table = Some(match table {
|
||||
None => create_inline_blob_table(db, "t", &[logical_id], &image).await?,
|
||||
Some(t) => {
|
||||
t.add(binary_input_batch(&[logical_id], &image))
|
||||
.execute()
|
||||
.await?;
|
||||
t
|
||||
}
|
||||
});
|
||||
}
|
||||
Ok(table.unwrap())
|
||||
}
|
||||
|
||||
async fn row_ids_for_logical(table: &Table, logical_ids: &[i64]) -> Result<Vec<u64>> {
|
||||
let id_rowid = collect_id_rowid(table).await?;
|
||||
Ok(logical_ids
|
||||
.iter()
|
||||
.map(|logical_id| {
|
||||
id_rowid
|
||||
.iter()
|
||||
.find(|(id, _)| id == logical_id)
|
||||
.map(|(_, row_id)| *row_id)
|
||||
.unwrap()
|
||||
})
|
||||
.collect())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn fetch_blobs_aligns_across_fragments_with_nulls_and_dups() -> Result<()> {
|
||||
let tmp = tempdir().unwrap();
|
||||
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
|
||||
let table = multi_fragment_dedicated_blob_table(&db).await?;
|
||||
let row_ids = row_ids_for_logical(&table, &SCRAMBLED_LOGICAL_IDS).await?;
|
||||
|
||||
let bytes = table.fetch_blobs("image", &row_ids).await?;
|
||||
assert_eq!(bytes.len(), SCRAMBLED_LOGICAL_IDS.len());
|
||||
for (slot, logical_id) in SCRAMBLED_LOGICAL_IDS.iter().enumerate() {
|
||||
match logical_id {
|
||||
3 | 5 => assert!(bytes.is_null(slot)),
|
||||
id => assert_eq!(
|
||||
bytes.value(slot),
|
||||
dedicated_blob_bytes(*id as u8).as_slice()
|
||||
),
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn fetch_blob_files_aligns_across_fragments_with_nulls_and_dups() -> Result<()> {
|
||||
let tmp = tempdir().unwrap();
|
||||
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
|
||||
let table = multi_fragment_dedicated_blob_table(&db).await?;
|
||||
let row_ids = row_ids_for_logical(&table, &SCRAMBLED_LOGICAL_IDS).await?;
|
||||
|
||||
let files = table.fetch_blob_files("image", &row_ids).await?;
|
||||
assert_eq!(files.len(), SCRAMBLED_LOGICAL_IDS.len());
|
||||
for (slot, logical_id) in SCRAMBLED_LOGICAL_IDS.iter().enumerate() {
|
||||
match logical_id {
|
||||
3 | 5 => assert!(files[slot].is_none()),
|
||||
id => {
|
||||
let payload = files[slot].as_ref().unwrap().read().await?;
|
||||
assert_eq!(payload.as_ref(), dedicated_blob_bytes(*id as u8).as_slice());
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user