feat: upgrade lance to 0.15.0 (#1477)

Changelog: https://github.com/lancedb/lance/releases/tag/v0.15.0

* Fixes #1466
* Closes #1475
* Fixes #1446
This commit is contained in:
Will Jones
2024-07-26 09:13:49 -07:00
committed by GitHub
parent 513926960d
commit 9555efacf9
10 changed files with 114 additions and 243 deletions

View File

@@ -20,29 +20,29 @@ keywords = ["lancedb", "lance", "database", "vector", "search"]
categories = ["database-implementations"]
[workspace.dependencies]
lance = { "version" = "=0.14.1", "features" = ["dynamodb"] }
lance-index = { "version" = "=0.14.1" }
lance-linalg = { "version" = "=0.14.1" }
lance-testing = { "version" = "=0.14.1" }
lance-datafusion = { "version" = "=0.14.1" }
lance = { "version" = "=0.15.0", "features" = ["dynamodb"] }
lance-index = { "version" = "=0.15.0" }
lance-linalg = { "version" = "=0.15.0" }
lance-testing = { "version" = "=0.15.0" }
lance-datafusion = { "version" = "=0.15.0" }
# Note that this one does not include pyarrow
arrow = { version = "51.0", optional = false }
arrow-array = "51.0"
arrow-data = "51.0"
arrow-ipc = "51.0"
arrow-ord = "51.0"
arrow-schema = "51.0"
arrow-arith = "51.0"
arrow-cast = "51.0"
arrow = { version = "52.1", optional = false }
arrow-array = "52.1"
arrow-data = "52.1"
arrow-ipc = "52.1"
arrow-ord = "52.1"
arrow-schema = "52.1"
arrow-arith = "52.1"
arrow-cast = "52.1"
async-trait = "0"
chrono = "0.4.35"
datafusion-physical-plan = "37.1"
datafusion-physical-plan = "40.0"
half = { "version" = "=2.4.1", default-features = false, features = [
"num-traits",
] }
futures = "0"
log = "0.4"
object_store = "0.9.0"
object_store = "0.10.1"
pin-project = "1.0.7"
snafu = "0.7.4"
url = "2"

View File

@@ -14,11 +14,13 @@ name = "_lancedb"
crate-type = ["cdylib"]
[dependencies]
arrow = { version = "51.0.0", features = ["pyarrow"] }
arrow = { version = "52.1", features = ["pyarrow"] }
lancedb = { path = "../rust/lancedb" }
env_logger = "0.10"
pyo3 = { version = "0.20", features = ["extension-module", "abi3-py38"] }
pyo3-asyncio = { version = "0.20", features = ["attributes", "tokio-runtime"] }
pyo3 = { version = "0.21", features = ["extension-module", "abi3-py38", "gil-refs"] }
# Using this fork for now: https://github.com/awestlake87/pyo3-asyncio/issues/119
# pyo3-asyncio = { version = "0.20", features = ["attributes", "tokio-runtime"] }
pyo3-asyncio-0-21 = { version = "0.21.0", features = ["attributes", "tokio-runtime"] }
# Prevent dynamic linking of lzma, which comes from datafusion
lzma-sys = { version = "*", features = ["static"] }

View File

@@ -3,7 +3,7 @@ name = "lancedb"
# version in Cargo.toml
dependencies = [
"deprecation",
"pylance==0.14.1",
"pylance==0.15.0",
"ratelimiter~=1.0",
"requests>=2.31.0",
"retry>=0.9.2",

View File

@@ -428,9 +428,9 @@ class LanceQueryBuilder(ABC):
>>> query = [100, 100]
>>> plan = table.search(query).explain_plan(True)
>>> print(plan) # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
Projection: fields=[vector, _distance]
ProjectionExec: expr=[vector@0 as vector, _distance@2 as _distance]
FilterExec: _distance@2 IS NOT NULL
SortExec: TopK(fetch=10), expr=[_distance@2 ASC NULLS LAST]
SortExec: TopK(fetch=10), expr=[_distance@2 ASC NULLS LAST], preserve_partitioning=[false]
KNNVectorDistance: metric=l2
LanceScan: uri=..., projection=[vector], row_id=true, row_addr=false, ordered=false
@@ -1214,9 +1214,9 @@ class AsyncQueryBase(object):
... plan = await table.query().nearest_to([1, 2]).explain_plan(True)
... print(plan)
>>> asyncio.run(doctest_example()) # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
Projection: fields=[vector, _distance]
ProjectionExec: expr=[vector@0 as vector, _distance@2 as _distance]
FilterExec: _distance@2 IS NOT NULL
SortExec: TopK(fetch=10), expr=[_distance@2 ASC NULLS LAST]
SortExec: TopK(fetch=10), expr=[_distance@2 ASC NULLS LAST], preserve_partitioning=[false]
KNNVectorDistance: metric=l2
LanceScan: uri=..., projection=[vector], row_id=true, row_addr=false, ordered=false

View File

@@ -9,8 +9,8 @@ use arrow::{
};
use futures::stream::StreamExt;
use lancedb::arrow::SendableRecordBatchStream;
use pyo3::{pyclass, pymethods, PyAny, PyObject, PyRef, PyResult, Python};
use pyo3_asyncio::tokio::future_into_py;
use pyo3::{pyclass, pymethods, Bound, PyAny, PyObject, PyRef, PyResult, Python};
use pyo3_asyncio_0_21::tokio::future_into_py;
use crate::error::PythonErrorExt;
@@ -36,7 +36,7 @@ impl RecordBatchStream {
(*self.schema).clone().into_pyarrow(py)
}
pub fn next(self_: PyRef<'_, Self>) -> PyResult<&PyAny> {
pub fn next(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.inner.clone();
future_into_py(self_.py(), async move {
let inner_next = inner.lock().await.next().await;

View File

@@ -18,9 +18,9 @@ use arrow::{datatypes::Schema, ffi_stream::ArrowArrayStreamReader, pyarrow::From
use lancedb::connection::{Connection as LanceConnection, CreateTableMode};
use pyo3::{
exceptions::{PyRuntimeError, PyValueError},
pyclass, pyfunction, pymethods, PyAny, PyRef, PyResult, Python,
pyclass, pyfunction, pymethods, Bound, PyAny, PyRef, PyResult, Python,
};
use pyo3_asyncio::tokio::future_into_py;
use pyo3_asyncio_0_21::tokio::future_into_py;
use crate::{error::PythonErrorExt, table::Table};
@@ -73,7 +73,7 @@ impl Connection {
self_: PyRef<'_, Self>,
start_after: Option<String>,
limit: Option<u32>,
) -> PyResult<&PyAny> {
) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.get_inner()?.clone();
let mut op = inner.table_names();
if let Some(start_after) = start_after {
@@ -89,15 +89,15 @@ impl Connection {
self_: PyRef<'a, Self>,
name: String,
mode: &str,
data: &PyAny,
data: Bound<'_, PyAny>,
storage_options: Option<HashMap<String, String>>,
use_legacy_format: Option<bool>,
) -> PyResult<&'a PyAny> {
) -> PyResult<Bound<'a, PyAny>> {
let inner = self_.get_inner()?.clone();
let mode = Self::parse_create_mode_str(mode)?;
let batches = ArrowArrayStreamReader::from_pyarrow(data)?;
let batches = ArrowArrayStreamReader::from_pyarrow_bound(&data)?;
let mut builder = inner.create_table(name, batches).mode(mode);
if let Some(storage_options) = storage_options {
@@ -118,15 +118,15 @@ impl Connection {
self_: PyRef<'a, Self>,
name: String,
mode: &str,
schema: &PyAny,
schema: Bound<'_, PyAny>,
storage_options: Option<HashMap<String, String>>,
use_legacy_format: Option<bool>,
) -> PyResult<&'a PyAny> {
) -> PyResult<Bound<'a, PyAny>> {
let inner = self_.get_inner()?.clone();
let mode = Self::parse_create_mode_str(mode)?;
let schema = Schema::from_pyarrow(schema)?;
let schema = Schema::from_pyarrow_bound(&schema)?;
let mut builder = inner.create_empty_table(name, Arc::new(schema)).mode(mode);
@@ -150,7 +150,7 @@ impl Connection {
name: String,
storage_options: Option<HashMap<String, String>>,
index_cache_size: Option<u32>,
) -> PyResult<&PyAny> {
) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.get_inner()?.clone();
let mut builder = inner.open_table(name);
if let Some(storage_options) = storage_options {
@@ -165,14 +165,14 @@ impl Connection {
})
}
pub fn drop_table(self_: PyRef<'_, Self>, name: String) -> PyResult<&PyAny> {
// Drops the table called `name` on this connection.
// The actual drop runs inside the future handed to `future_into_py`; this
// function itself only prepares that future and returns the resulting object.
pub fn drop_table(self_: PyRef<'_, Self>, name: String) -> PyResult<Bound<'_, PyAny>> {
// Clone the inner connection so the `async move` block owns its own handle
// instead of borrowing from `self_`. `get_inner` failures propagate via `?`.
let inner = self_.get_inner()?.clone();
future_into_py(self_.py(), async move {
// `infer_error` converts the lancedb result into a Python-facing result.
inner.drop_table(name).await.infer_error()
})
}
pub fn drop_db(self_: PyRef<'_, Self>) -> PyResult<&PyAny> {
pub fn drop_db(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.get_inner()?.clone();
future_into_py(
self_.py(),
@@ -190,7 +190,7 @@ pub fn connect(
host_override: Option<String>,
read_consistency_interval: Option<f64>,
storage_options: Option<HashMap<String, String>>,
) -> PyResult<&PyAny> {
) -> PyResult<Bound<'_, PyAny>> {
future_into_py(py, async move {
let mut builder = lancedb::connect(&uri);
if let Some(api_key) = api_key {

View File

@@ -22,10 +22,11 @@ use lancedb::query::{
use pyo3::exceptions::PyRuntimeError;
use pyo3::pyclass;
use pyo3::pymethods;
use pyo3::Bound;
use pyo3::PyAny;
use pyo3::PyRef;
use pyo3::PyResult;
use pyo3_asyncio::tokio::future_into_py;
use pyo3_asyncio_0_21::tokio::future_into_py;
use crate::arrow::RecordBatchStream;
use crate::error::PythonErrorExt;
@@ -60,14 +61,17 @@ impl Query {
self.inner = self.inner.clone().limit(limit as usize);
}
pub fn nearest_to(&mut self, vector: &PyAny) -> PyResult<VectorQuery> {
let data: ArrayData = ArrayData::from_pyarrow(vector)?;
// Builds a `VectorQuery` from this query, searching around `vector`.
pub fn nearest_to(&mut self, vector: Bound<'_, PyAny>) -> PyResult<VectorQuery> {
// Import the Python object as Arrow array data; a failed conversion is
// returned to the caller via `?`.
// NOTE(review): presumably `vector` is a pyarrow array — confirm against callers.
let data: ArrayData = ArrayData::from_pyarrow_bound(&vector)?;
let array = make_array(data);
// The inner query is cloned before `nearest_to` is applied, so `self.inner`
// is not reassigned by this call.
let inner = self.inner.clone().nearest_to(array).infer_error()?;
Ok(VectorQuery { inner })
}
pub fn execute(self_: PyRef<'_, Self>, max_batch_length: Option<u32>) -> PyResult<&PyAny> {
pub fn execute(
self_: PyRef<'_, Self>,
max_batch_length: Option<u32>,
) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.inner.clone();
future_into_py(self_.py(), async move {
let mut opts = QueryExecutionOptions::default();
@@ -79,7 +83,7 @@ impl Query {
})
}
fn explain_plan(self_: PyRef<'_, Self>, verbose: bool) -> PyResult<&PyAny> {
fn explain_plan(self_: PyRef<'_, Self>, verbose: bool) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.inner.clone();
future_into_py(self_.py(), async move {
inner
@@ -139,7 +143,10 @@ impl VectorQuery {
self.inner = self.inner.clone().bypass_vector_index()
}
pub fn execute(self_: PyRef<'_, Self>, max_batch_length: Option<u32>) -> PyResult<&PyAny> {
pub fn execute(
self_: PyRef<'_, Self>,
max_batch_length: Option<u32>,
) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.inner.clone();
future_into_py(self_.py(), async move {
let mut opts = QueryExecutionOptions::default();
@@ -151,7 +158,7 @@ impl VectorQuery {
})
}
fn explain_plan(self_: PyRef<'_, Self>, verbose: bool) -> PyResult<&PyAny> {
fn explain_plan(self_: PyRef<'_, Self>, verbose: bool) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.inner.clone();
future_into_py(self_.py(), async move {
inner

View File

@@ -9,9 +9,9 @@ use pyo3::{
exceptions::{PyRuntimeError, PyValueError},
pyclass, pymethods,
types::{PyDict, PyString},
PyAny, PyRef, PyResult, Python,
Bound, PyAny, PyRef, PyResult, Python,
};
use pyo3_asyncio::tokio::future_into_py;
use pyo3_asyncio_0_21::tokio::future_into_py;
use crate::{
error::PythonErrorExt,
@@ -91,7 +91,7 @@ impl Table {
self.inner.take();
}
pub fn schema(self_: PyRef<'_, Self>) -> PyResult<&PyAny> {
pub fn schema(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.inner_ref()?.clone();
future_into_py(self_.py(), async move {
let schema = inner.schema().await.infer_error()?;
@@ -99,8 +99,12 @@ impl Table {
})
}
pub fn add<'a>(self_: PyRef<'a, Self>, data: &PyAny, mode: String) -> PyResult<&'a PyAny> {
let batches = ArrowArrayStreamReader::from_pyarrow(data)?;
pub fn add<'a>(
self_: PyRef<'a, Self>,
data: Bound<'_, PyAny>,
mode: String,
) -> PyResult<Bound<'a, PyAny>> {
let batches = ArrowArrayStreamReader::from_pyarrow_bound(&data)?;
let mut op = self_.inner_ref()?.add(batches);
if mode == "append" {
op = op.mode(AddDataMode::Append);
@@ -116,7 +120,7 @@ impl Table {
})
}
pub fn delete(self_: PyRef<'_, Self>, condition: String) -> PyResult<&PyAny> {
pub fn delete(self_: PyRef<'_, Self>, condition: String) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.inner_ref()?.clone();
future_into_py(self_.py(), async move {
inner.delete(&condition).await.infer_error()
@@ -127,7 +131,7 @@ impl Table {
self_: PyRef<'a, Self>,
updates: &PyDict,
r#where: Option<String>,
) -> PyResult<&'a PyAny> {
) -> PyResult<Bound<'a, PyAny>> {
let mut op = self_.inner_ref()?.update();
if let Some(only_if) = r#where {
op = op.only_if(only_if);
@@ -145,7 +149,10 @@ impl Table {
})
}
pub fn count_rows(self_: PyRef<'_, Self>, filter: Option<String>) -> PyResult<&PyAny> {
pub fn count_rows(
self_: PyRef<'_, Self>,
filter: Option<String>,
) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.inner_ref()?.clone();
future_into_py(self_.py(), async move {
inner.count_rows(filter).await.infer_error()
@@ -157,7 +164,7 @@ impl Table {
column: String,
index: Option<&Index>,
replace: Option<bool>,
) -> PyResult<&'a PyAny> {
) -> PyResult<Bound<'a, PyAny>> {
let index = if let Some(index) = index {
index.consume()?
} else {
@@ -174,7 +181,7 @@ impl Table {
})
}
pub fn list_indices(self_: PyRef<'_, Self>) -> PyResult<&PyAny> {
pub fn list_indices(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.inner_ref()?.clone();
future_into_py(self_.py(), async move {
Ok(inner
@@ -194,7 +201,7 @@ impl Table {
}
}
pub fn version(self_: PyRef<'_, Self>) -> PyResult<&PyAny> {
pub fn version(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.inner_ref()?.clone();
future_into_py(
self_.py(),
@@ -202,21 +209,21 @@ impl Table {
)
}
pub fn checkout(self_: PyRef<'_, Self>, version: u64) -> PyResult<&PyAny> {
// Checks out the given `version` of the table.
// The checkout itself runs inside the future passed to `future_into_py`.
pub fn checkout(self_: PyRef<'_, Self>, version: u64) -> PyResult<Bound<'_, PyAny>> {
// Clone the inner table handle so the `async move` block owns it;
// `inner_ref` failures propagate via `?`.
let inner = self_.inner_ref()?.clone();
future_into_py(self_.py(), async move {
inner.checkout(version).await.infer_error()
})
}
pub fn checkout_latest(self_: PyRef<'_, Self>) -> PyResult<&PyAny> {
// Checks out the latest version of the table.
// The work runs inside the future passed to `future_into_py`.
pub fn checkout_latest(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
// Clone the inner table handle so the `async move` block owns it;
// `inner_ref` failures propagate via `?`.
let inner = self_.inner_ref()?.clone();
future_into_py(self_.py(), async move {
inner.checkout_latest().await.infer_error()
})
}
pub fn restore(self_: PyRef<'_, Self>) -> PyResult<&PyAny> {
pub fn restore(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.inner_ref()?.clone();
future_into_py(
self_.py(),
@@ -228,7 +235,10 @@ impl Table {
Query::new(self.inner_ref().unwrap().query())
}
pub fn optimize(self_: PyRef<'_, Self>, cleanup_since_ms: Option<u64>) -> PyResult<&PyAny> {
pub fn optimize(
self_: PyRef<'_, Self>,
cleanup_since_ms: Option<u64>,
) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.inner_ref()?.clone();
let older_than = if let Some(ms) = cleanup_since_ms {
if ms > i64::MAX as u64 {

View File

@@ -57,14 +57,11 @@ tempfile = "3.5.0"
rand = { version = "0.8.3", features = ["small_rng"] }
uuid = { version = "1.7.0", features = ["v4"] }
walkdir = "2"
# For s3 integration tests (dev deps aren't allowed to be optional atm)
# We pin these because the content-length check breaks with localstack
# https://github.com/smithy-lang/smithy-rs/releases/tag/release-2024-05-21
aws-sdk-dynamodb = { version = "=1.23.0" }
aws-sdk-s3 = { version = "=1.23.0" }
aws-sdk-kms = { version = "=1.21.0" }
aws-sdk-dynamodb = { version = "1.38.0" }
aws-sdk-s3 = { version = "1.38.0" }
aws-sdk-kms = { version = "1.37" }
aws-config = { version = "1.0" }
aws-smithy-runtime = { version = "=1.3.1" }
aws-smithy-runtime = { version = "1.3" }
[features]
default = []

View File

@@ -14,26 +14,16 @@
//! A mirroring object store that mirrors writes to a secondary object store
use std::{
fmt::Formatter,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use std::{fmt::Formatter, sync::Arc};
use bytes::Bytes;
use futures::{stream::BoxStream, FutureExt, StreamExt};
use futures::{stream::BoxStream, TryFutureExt};
use lance::io::WrappingObjectStore;
use object_store::{
path::Path, Error, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore,
PutOptions, PutResult, Result,
path::Path, Error, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore,
PutMultipartOpts, PutOptions, PutPayload, PutResult, Result, UploadPart,
};
use async_trait::async_trait;
use tokio::{
io::{AsyncWrite, AsyncWriteExt},
task::JoinHandle,
};
#[derive(Debug)]
struct MirroringObjectStore {
@@ -72,19 +62,10 @@ impl PrimaryOnly for Path {
/// Note: this object store does not mirror writes to *.manifest files
#[async_trait]
impl ObjectStore for MirroringObjectStore {
async fn put(&self, location: &Path, bytes: Bytes) -> Result<PutResult> {
if location.primary_only() {
self.primary.put(location, bytes).await
} else {
self.secondary.put(location, bytes.clone()).await?;
self.primary.put(location, bytes).await
}
}
async fn put_opts(
&self,
location: &Path,
bytes: Bytes,
bytes: PutPayload,
options: PutOptions,
) -> Result<PutResult> {
if location.primary_only() {
@@ -97,32 +78,22 @@ impl ObjectStore for MirroringObjectStore {
}
}
async fn put_multipart(
async fn put_multipart_opts(
&self,
location: &Path,
) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
opts: PutMultipartOpts,
) -> Result<Box<dyn MultipartUpload>> {
if location.primary_only() {
return self.primary.put_multipart(location).await;
return self.primary.put_multipart_opts(location, opts).await;
}
let (id, stream) = self.secondary.put_multipart(location).await?;
let secondary = self
.secondary
.put_multipart_opts(location, opts.clone())
.await?;
let primary = self.primary.put_multipart_opts(location, opts).await?;
let mirroring_upload = MirroringUpload::new(
Pin::new(stream),
self.primary.clone(),
self.secondary.clone(),
location.clone(),
);
Ok((id, Box::new(mirroring_upload)))
}
async fn abort_multipart(&self, location: &Path, multipart_id: &MultipartId) -> Result<()> {
if location.primary_only() {
return self.primary.abort_multipart(location, multipart_id).await;
}
self.secondary.abort_multipart(location, multipart_id).await
Ok(Box::new(MirroringUpload { primary, secondary }))
}
// Reads are routed to primary only
@@ -170,144 +141,28 @@ impl ObjectStore for MirroringObjectStore {
}
}
struct MirroringUpload {
secondary_stream: Pin<Box<dyn AsyncWrite + Unpin + Send>>,
primary_store: Arc<dyn ObjectStore>,
secondary_store: Arc<dyn ObjectStore>,
location: Path,
state: MirroringUploadShutdown,
}
// The state goes from
// None
// -> (secondary)ShutingDown
// -> (secondary)ShutdownDone
// -> Uploading(to primary)
// -> Done
#[derive(Debug)]
enum MirroringUploadShutdown {
None,
ShutingDown,
ShutdownDone,
Uploading(Pin<Box<JoinHandle<()>>>),
Completed,
/// A multipart upload that forwards every operation to two underlying
/// uploads: one opened on the primary store and one on the secondary store.
struct MirroringUpload {
// Upload opened on the primary store.
primary: Box<dyn MultipartUpload>,
// Upload opened on the secondary (mirror) store.
secondary: Box<dyn MultipartUpload>,
}
impl MirroringUpload {
pub fn new(
secondary_stream: Pin<Box<dyn AsyncWrite + Unpin + Send>>,
primary_store: Arc<dyn ObjectStore>,
secondary_store: Arc<dyn ObjectStore>,
location: Path,
) -> Self {
Self {
secondary_stream,
primary_store,
secondary_store,
location,
state: MirroringUploadShutdown::None,
}
}
}
impl AsyncWrite for MirroringUpload {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, std::io::Error>> {
if !matches!(self.state, MirroringUploadShutdown::None) {
return Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::Other,
"already shutdown",
)));
}
// Write to secondary first
let mut_self = self.get_mut();
mut_self.secondary_stream.as_mut().poll_write(cx, buf)
#[async_trait]
impl MultipartUpload for MirroringUpload {
// Submits one part to both uploads and returns a single future covering both.
fn put_part(&mut self, data: PutPayload) -> UploadPart {
// Both part futures are created up front, primary first; the payload is
// cloned so that each upload receives its own handle to the data.
let put_primary = self.primary.put_part(data.clone());
let put_secondary = self.secondary.put_part(data);
// Chain the futures: the primary future is only awaited once the secondary
// one has resolved successfully, and a secondary error short-circuits the
// combined result.
Box::pin(put_secondary.and_then(|_| put_primary))
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
if !matches!(self.state, MirroringUploadShutdown::None) {
return Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::Other,
"already shutdown",
)));
}
let mut_self = self.get_mut();
mut_self.secondary_stream.as_mut().poll_flush(cx)
/// Finalizes both uploads, secondary first.
///
/// A failure while completing the secondary upload is returned immediately
/// and the primary upload is not completed. On success the primary upload's
/// `PutResult` is what the caller receives.
async fn complete(&mut self) -> Result<PutResult> {
    // The secondary's result is only needed for its success/failure.
    let _secondary_result = self.secondary.complete().await?;
    let primary_result = self.primary.complete().await;
    primary_result
}
fn poll_shutdown(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), std::io::Error>> {
let mut_self = self.get_mut();
loop {
// try to shutdown secondary first
match &mut mut_self.state {
MirroringUploadShutdown::None | MirroringUploadShutdown::ShutingDown => {
match mut_self.secondary_stream.as_mut().poll_shutdown(cx) {
Poll::Ready(Ok(())) => {
mut_self.state = MirroringUploadShutdown::ShutdownDone;
// don't return, no waker is setup
}
Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
Poll::Pending => {
mut_self.state = MirroringUploadShutdown::ShutingDown;
return Poll::Pending;
}
}
}
MirroringUploadShutdown::ShutdownDone => {
let primary_store = mut_self.primary_store.clone();
let secondary_store = mut_self.secondary_store.clone();
let location = mut_self.location.clone();
let upload_future =
Box::pin(tokio::runtime::Handle::current().spawn(async move {
let mut source =
secondary_store.get(&location).await.unwrap().into_stream();
let upload_stream = primary_store.put_multipart(&location).await;
let (_, mut stream) = upload_stream.unwrap();
while let Some(buf) = source.next().await {
let buf = buf.unwrap();
stream.write_all(&buf).await.unwrap();
}
stream.shutdown().await.unwrap();
}));
mut_self.state = MirroringUploadShutdown::Uploading(upload_future);
// don't return, no waker is setup
}
MirroringUploadShutdown::Uploading(ref mut join_handle) => {
match join_handle.poll_unpin(cx) {
Poll::Ready(Ok(())) => {
mut_self.state = MirroringUploadShutdown::Completed;
return Poll::Ready(Ok(()));
}
Poll::Ready(Err(e)) => {
mut_self.state = MirroringUploadShutdown::Completed;
return Poll::Ready(Err(e.into()));
}
Poll::Pending => {
return Poll::Pending;
}
}
}
MirroringUploadShutdown::Completed => {
return Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::Other,
"shutdown already completed",
)))
}
}
}
/// Aborts both underlying multipart uploads.
///
/// Both aborts are always attempted: a failure while aborting the secondary
/// upload must not prevent the primary upload from being aborted, otherwise
/// the primary's in-progress upload would be left behind.
///
/// # Errors
///
/// Returns the secondary upload's error if its abort failed; otherwise
/// returns the outcome of aborting the primary upload.
async fn abort(&mut self) -> Result<()> {
    let secondary_outcome = self.secondary.abort().await;
    // Run the primary abort unconditionally before inspecting any error.
    let primary_outcome = self.primary.abort().await;
    secondary_outcome.and(primary_outcome)
}
}