feat: port create_table to the async python API and the remote rust API (#1031)

I've also started `ASYNC_MIGRATION.MD` to keep track of the breaking
changes between the sync and the async Python APIs.
This commit is contained in:
Weston Pace
2024-02-29 13:29:29 -08:00
committed by GitHub
parent 085066d2a8
commit 2a02d1394b
29 changed files with 1406 additions and 53 deletions

View File

@@ -12,19 +12,33 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::time::Duration;
use std::{sync::Arc, time::Duration};
use lancedb::connection::Connection as LanceConnection;
use pyo3::{pyclass, pyfunction, pymethods, PyAny, PyRef, PyResult, Python};
use arrow::{datatypes::Schema, ffi_stream::ArrowArrayStreamReader, pyarrow::FromPyArrow};
use lancedb::connection::{Connection as LanceConnection, CreateTableMode};
use pyo3::{
exceptions::PyValueError, pyclass, pyfunction, pymethods, PyAny, PyRef, PyResult, Python,
};
use pyo3_asyncio::tokio::future_into_py;
use crate::error::PythonErrorExt;
use crate::{error::PythonErrorExt, table::Table};
/// Python-visible wrapper (exposed through `#[pyclass]`) around a LanceDB
/// connection.
#[pyclass]
pub struct Connection {
// The wrapped Rust connection; the methods on this class clone it before
// moving it into the futures they hand back to Python.
inner: LanceConnection,
}
impl Connection {
    /// Translates the mode string received from Python into a
    /// [`CreateTableMode`].
    ///
    /// Recognised values are `"create"`, `"overwrite"` and `"exist_ok"`.
    /// For `"exist_ok"` the callback handed to `CreateTableMode::exist_ok`
    /// returns the builder it is given unchanged.
    ///
    /// # Errors
    ///
    /// Any other string yields a `PyValueError` naming the rejected mode.
    fn parse_create_mode_str(mode: &str) -> PyResult<CreateTableMode> {
        let parsed = match mode {
            "create" => CreateTableMode::Create,
            "overwrite" => CreateTableMode::Overwrite,
            "exist_ok" => CreateTableMode::exist_ok(|builder| builder),
            _ => return Err(PyValueError::new_err(format!("Invalid mode {}", mode))),
        };
        Ok(parsed)
    }
}
#[pymethods]
impl Connection {
pub fn table_names(self_: PyRef<'_, Self>) -> PyResult<&PyAny> {
@@ -33,6 +47,51 @@ impl Connection {
inner.table_names().await.infer_error()
})
}
/// Creates a table named `name` from the Arrow data in `data`.
///
/// `mode` is validated by `parse_create_mode_str` and `data` is converted
/// into an `ArrowArrayStreamReader` before any asynchronous work is
/// scheduled, so failures of either step are returned directly from this
/// call. The table creation itself runs inside the future passed to
/// `future_into_py`, which produces a [`Table`] on success.
///
/// # Errors
///
/// Fails if `mode` is not a recognised mode string or if `data` cannot be
/// read through `from_pyarrow`. Errors from the creation itself are mapped
/// with `infer_error` inside the future.
pub fn create_table<'a>(
    self_: PyRef<'a, Self>,
    name: String,
    mode: &str,
    data: &PyAny,
) -> PyResult<&'a PyAny> {
    let connection = self_.inner.clone();
    let create_mode = Self::parse_create_mode_str(mode)?;
    let reader = Box::new(ArrowArrayStreamReader::from_pyarrow(data)?);
    future_into_py(self_.py(), async move {
        let request = connection.create_table(name, reader).mode(create_mode);
        let created = request.execute().await.infer_error()?;
        Ok(Table::new(created))
    })
}
/// Creates a table named `name` that has the given `schema` and no
/// initial data.
///
/// `mode` is validated by `parse_create_mode_str` and `schema` is
/// converted into an Arrow `Schema` before any asynchronous work is
/// scheduled, so failures of either step are returned directly from this
/// call. The table creation itself runs inside the future passed to
/// `future_into_py`, which produces a [`Table`] on success.
///
/// # Errors
///
/// Fails if `mode` is not a recognised mode string or if `schema` cannot
/// be converted through `from_pyarrow`. Errors from the creation itself
/// are mapped with `infer_error` inside the future.
pub fn create_empty_table<'a>(
    self_: PyRef<'a, Self>,
    name: String,
    mode: &str,
    schema: &PyAny,
) -> PyResult<&'a PyAny> {
    let connection = self_.inner.clone();
    let create_mode = Self::parse_create_mode_str(mode)?;
    let arrow_schema = Schema::from_pyarrow(schema)?;
    future_into_py(self_.py(), async move {
        let request = connection
            .create_empty_table(name, Arc::new(arrow_schema))
            .mode(create_mode);
        let created = request.execute().await.infer_error()?;
        Ok(Table::new(created))
    })
}
}
#[pyfunction]

View File

@@ -45,6 +45,7 @@ impl<T> PythonErrorExt<T> for std::result::Result<T, LanceError> {
LanceError::Lance { .. } => self.runtime_error(),
LanceError::Runtime { .. } => self.runtime_error(),
LanceError::Http { .. } => self.runtime_error(),
LanceError::Arrow { .. } => self.runtime_error(),
},
}
}

View File

@@ -17,7 +17,8 @@ use env_logger::Env;
use pyo3::{pymodule, types::PyModule, wrap_pyfunction, PyResult, Python};
pub mod connection;
pub(crate) mod error;
pub mod error;
pub mod table;
#[pymodule]
pub fn _lancedb(_py: Python, m: &PyModule) -> PyResult<()> {

34
python/src/table.rs Normal file
View File

@@ -0,0 +1,34 @@
use std::sync::Arc;
use arrow::pyarrow::ToPyArrow;
use lancedb::table::Table as LanceTable;
use pyo3::{pyclass, pymethods, PyAny, PyRef, PyResult, Python};
use pyo3_asyncio::tokio::future_into_py;
use crate::error::PythonErrorExt;
/// Python-visible wrapper (exposed through `#[pyclass]`) around a LanceDB
/// table.
#[pyclass]
pub struct Table {
// Shared handle to the underlying table implementation, held as a trait
// object.
inner: Arc<dyn LanceTable>,
}
impl Table {
pub(crate) fn new(inner: Arc<dyn LanceTable>) -> Self {
Self { inner }
}
}
#[pymethods]
impl Table {
pub fn name(&self) -> String {
self.inner.name().to_string()
}
pub fn schema(self_: PyRef<'_, Self>) -> PyResult<&PyAny> {
let inner = self_.inner.clone();
future_into_py(self_.py(), async move {
let schema = inner.schema().await.infer_error()?;
Python::with_gil(|py| schema.to_pyarrow(py))
})
}
}