Better error granularity for table operations (#113)

This commit is contained in:
gsilvestrin
2023-06-01 09:04:42 -07:00
committed by GitHub
parent 53f3882d6e
commit 6d8cf52e01
5 changed files with 91 additions and 44 deletions

1
Cargo.lock generated
View File

@@ -3366,6 +3366,7 @@ dependencies = [
"lance",
"object_store",
"rand",
"snafu",
"tempfile",
"tokio",
]

View File

@@ -13,7 +13,7 @@ arrow-array = "37.0"
arrow-data = "37.0"
arrow-schema = "37.0"
object_store = "0.5.6"
snafu = "0.7.4"
lance = "0.4.17"
tokio = { version = "1.23", features = ["rt-multi-thread"] }

View File

@@ -17,8 +17,9 @@ use std::path::Path;
use arrow_array::RecordBatchReader;
use lance::io::object_store::ObjectStore;
use snafu::prelude::*;
use crate::error::Result;
use crate::error::{CreateDirSnafu, Result};
use crate::table::Table;
pub struct Database {
@@ -43,10 +44,7 @@ impl Database {
pub async fn connect(uri: &str) -> Result<Database> {
let object_store = ObjectStore::new(uri).await?;
if object_store.is_local() {
let path = Path::new(uri);
if !path.try_exists()? {
create_dir_all(&path)?;
}
Self::try_create_dir(uri).context(CreateDirSnafu { path: uri })?;
}
Ok(Database {
uri: uri.to_string(),
@@ -54,6 +52,15 @@ impl Database {
})
}
/// Try to create a local directory to store the lancedb dataset
fn try_create_dir(path: &str) -> core::result::Result<(), std::io::Error> {
let path = Path::new(path);
if !path.try_exists()? {
create_dir_all(&path)?;
}
Ok(())
}
/// Get the names of all tables in the database.
///
/// # Returns

View File

@@ -12,44 +12,50 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#[derive(Debug)]
pub enum Error {
IO(String),
Lance(String),
}
use snafu::Snafu;
impl std::fmt::Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let (catalog, message) = match self {
Self::IO(s) => ("I/O", s.as_str()),
Self::Lance(s) => ("Lance", s.as_str()),
};
write!(f, "LanceDBError({catalog}): {message}")
}
#[derive(Debug, Snafu)]
#[snafu(visibility(pub(crate)))]
pub enum Error {
#[snafu(display("LanceDBError: Invalid table name: {name}"))]
InvalidTableName { name: String },
#[snafu(display("LanceDBError: Table '{name}' was not found"))]
TableNotFound { name: String },
#[snafu(display("LanceDBError: Table '{name}' already exists"))]
TableAlreadyExists { name: String },
#[snafu(display("LanceDBError: Unable to created lance dataset at {path}: {source}"))]
CreateDir {
path: String,
source: std::io::Error,
},
#[snafu(display("LanceDBError: {message}"))]
Store { message: String },
#[snafu(display("LanceDBError: {message}"))]
Lance { message: String },
}
pub type Result<T> = std::result::Result<T, Error>;
impl From<std::io::Error> for Error {
fn from(e: std::io::Error) -> Self {
Self::IO(e.to_string())
}
}
impl From<lance::Error> for Error {
fn from(e: lance::Error) -> Self {
Self::Lance(e.to_string())
Self::Lance {
message: e.to_string(),
}
}
}
impl From<object_store::Error> for Error {
fn from(e: object_store::Error) -> Self {
Self::IO(e.to_string())
Self::Store {
message: e.to_string(),
}
}
}
impl From<object_store::path::Error> for Error {
fn from(e: object_store::path::Error) -> Self {
Self::IO(e.to_string())
Self::Store {
message: e.to_string(),
}
}
}

View File

@@ -18,8 +18,9 @@ use std::sync::Arc;
use arrow_array::{Float32Array, RecordBatchReader};
use lance::dataset::{Dataset, WriteMode, WriteParams};
use lance::index::IndexType;
use snafu::prelude::*;
use crate::error::{Error, Result};
use crate::error::{Error, InvalidTableNameSnafu, Result};
use crate::index::vector::VectorIndexBuilder;
use crate::query::Query;
@@ -27,6 +28,7 @@ pub const VECTOR_COLUMN_NAME: &str = "vector";
pub const LANCE_FILE_EXTENSION: &str = "lance";
/// A table in a LanceDB database.
#[derive(Debug)]
pub struct Table {
name: String,
uri: String,
@@ -57,9 +59,16 @@ impl Table {
let uri = table_uri
.as_path()
.to_str()
.ok_or(Error::IO(format!("Invalid table name: {}", name)))?;
.context(InvalidTableNameSnafu { name })?;
let dataset = Dataset::open(&uri).await?;
let dataset = Dataset::open(&uri).await.map_err(|e| match e {
lance::Error::DatasetNotFound { .. } => Error::TableNotFound {
name: name.to_string(),
},
e => Error::Lance {
message: e.to_string(),
},
})?;
Ok(Table {
name: name.to_string(),
uri: uri.to_string(),
@@ -88,14 +97,22 @@ impl Table {
let uri = table_uri
.as_path()
.to_str()
.ok_or(Error::IO(format!("Invalid table name: {}", name)))?
.context(InvalidTableNameSnafu { name })?
.to_string();
let dataset =
Arc::new(Dataset::write(&mut batches, &uri, Some(WriteParams::default())).await?);
let dataset = Dataset::write(&mut batches, &uri, Some(WriteParams::default()))
.await
.map_err(|e| match e {
lance::Error::DatasetAlreadyExists { .. } => Error::TableAlreadyExists {
name: name.to_string(),
},
e => Error::Lance {
message: e.to_string(),
},
})?;
Ok(Table {
name: name.to_string(),
uri,
dataset,
dataset: Arc::new(dataset),
})
}
@@ -178,15 +195,6 @@ mod tests {
use super::*;
use crate::index::vector::IvfPQIndexBuilder;
#[tokio::test]
async fn test_new_table_not_exists() {
let tmp_dir = tempdir().unwrap();
let uri = tmp_dir.path().to_str().unwrap();
let table = Table::open(&uri, "test").await;
assert!(table.is_err());
}
#[tokio::test]
async fn test_open() {
let tmp_dir = tempdir().unwrap();
@@ -203,6 +211,14 @@ mod tests {
assert_eq!(table.name, "test")
}
#[tokio::test]
async fn test_open_not_found() {
let tmp_dir = tempdir().unwrap();
let uri = tmp_dir.path().to_str().unwrap();
let table = Table::open(uri, "test").await;
assert!(matches!(table.unwrap_err(), Error::TableNotFound { .. }));
}
#[test]
fn test_object_store_path() {
use std::path::Path as StdPath;
@@ -211,6 +227,23 @@ mod tests {
assert_eq!(c.to_str().unwrap(), "s3://bucket/path/to/file/subfile");
}
#[tokio::test]
async fn test_create_already_exists() {
let tmp_dir = tempdir().unwrap();
let uri = tmp_dir.path().to_str().unwrap();
let batches: Box<dyn RecordBatchReader> = Box::new(make_test_batches());
let schema = batches.schema().clone();
Table::create(&uri, "test", batches).await.unwrap();
let batches: Box<dyn RecordBatchReader> = Box::new(make_test_batches());
let result = Table::create(&uri, "test", batches).await;
assert!(matches!(
result.unwrap_err(),
Error::TableAlreadyExists { .. }
));
}
#[tokio::test]
async fn test_add() {
let tmp_dir = tempdir().unwrap();