diff --git a/Cargo.lock b/Cargo.lock index 8b3fb86713..a041619ece 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4072,6 +4072,7 @@ dependencies = [ "regex", "regex-automata 0.2.0", "snafu", + "tempfile", "tokio", "tokio-util", ] diff --git a/src/index/Cargo.toml b/src/index/Cargo.toml index 0835da45d0..4c0dc82b02 100644 --- a/src/index/Cargo.toml +++ b/src/index/Cargo.toml @@ -25,5 +25,6 @@ snafu.workspace = true [dev-dependencies] rand.workspace = true +tempfile.workspace = true tokio-util.workspace = true tokio.workspace = true diff --git a/src/index/src/inverted_index/create/sort/intermediate_rw.rs b/src/index/src/inverted_index/create/sort/intermediate_rw.rs index 754a219155..3a4f15c0de 100644 --- a/src/index/src/inverted_index/create/sort/intermediate_rw.rs +++ b/src/index/src/inverted_index/create/sort/intermediate_rw.rs @@ -49,10 +49,14 @@ impl IntermediateWriter { let value_stream = stream::iter(values.into_iter().map(Ok)); let frame_write = FramedWrite::new(&mut self.writer, encoder); - value_stream.forward(frame_write).await?; + // `forward()` will flush and close the writer when the stream ends + if let Err(e) = value_stream.forward(frame_write).await { + self.writer.flush().await.context(FlushSnafu)?; + self.writer.close().await.context(CloseSnafu)?; + return Err(e); + } - self.writer.flush().await.context(FlushSnafu)?; - self.writer.close().await.context(CloseSnafu) + Ok(()) } } @@ -85,24 +89,32 @@ impl IntermediateReader { #[cfg(test)] mod tests { - use futures::io::Cursor; + use std::io::{Seek, SeekFrom}; + + use futures::io::{AllowStdIo, Cursor}; + use tempfile::tempfile; use super::*; use crate::inverted_index::error::Error; #[tokio::test] async fn test_intermediate_read_write_basic() { - let mut buf = vec![]; + let file_r = tempfile().unwrap(); + let file_w = file_r.try_clone().unwrap(); + let mut buf_r = AllowStdIo::new(file_r); + let buf_w = AllowStdIo::new(file_w); let values = BTreeMap::from_iter([ (Bytes::from("a"), BitVec::from_slice(&[0b10101010])), (Bytes::from("b"), BitVec::from_slice(&[0b01010101])), ]); - let writer = IntermediateWriter::new(&mut buf); + let writer = IntermediateWriter::new(buf_w); writer.write_all(values.clone()).await.unwrap(); + // reset the handle + buf_r.seek(SeekFrom::Start(0)).unwrap(); - let reader = IntermediateReader::new(Cursor::new(buf)); + let reader = IntermediateReader::new(buf_r); let mut stream = reader.into_stream().await.unwrap(); let a = stream.next().await.unwrap().unwrap();