fix: IntermediateWriter closes underlying writer twice (#3248)

* fix: IntermediateWriter closes underlying writer twice

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* close writer manually on error

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2024-01-26 18:03:50 +08:00
committed by GitHub
parent 14b233c486
commit 7da8f22cda
3 changed files with 21 additions and 7 deletions

1
Cargo.lock generated
View File

@@ -4072,6 +4072,7 @@ dependencies = [
"regex",
"regex-automata 0.2.0",
"snafu",
"tempfile",
"tokio",
"tokio-util",
]

View File

@@ -25,5 +25,6 @@ snafu.workspace = true
[dev-dependencies]
rand.workspace = true
tempfile.workspace = true
tokio-util.workspace = true
tokio.workspace = true

View File

@@ -49,10 +49,14 @@ impl<W: AsyncWrite + Unpin> IntermediateWriter<W> {
let value_stream = stream::iter(values.into_iter().map(Ok));
let frame_write = FramedWrite::new(&mut self.writer, encoder);
value_stream.forward(frame_write).await?;
// `forward()` will flush and close the writer when the stream ends
if let Err(e) = value_stream.forward(frame_write).await {
self.writer.flush().await.context(FlushSnafu)?;
self.writer.close().await.context(CloseSnafu)?;
return Err(e);
}
self.writer.flush().await.context(FlushSnafu)?;
self.writer.close().await.context(CloseSnafu)
Ok(())
}
}
@@ -85,24 +89,32 @@ impl<R: AsyncRead + Unpin + Send + 'static> IntermediateReader<R> {
#[cfg(test)]
mod tests {
use futures::io::Cursor;
use std::io::{Seek, SeekFrom};
use futures::io::{AllowStdIo, Cursor};
use tempfile::tempfile;
use super::*;
use crate::inverted_index::error::Error;
#[tokio::test]
async fn test_intermediate_read_write_basic() {
let mut buf = vec![];
let file_r = tempfile().unwrap();
let file_w = file_r.try_clone().unwrap();
let mut buf_r = AllowStdIo::new(file_r);
let buf_w = AllowStdIo::new(file_w);
let values = BTreeMap::from_iter([
(Bytes::from("a"), BitVec::from_slice(&[0b10101010])),
(Bytes::from("b"), BitVec::from_slice(&[0b01010101])),
]);
let writer = IntermediateWriter::new(&mut buf);
let writer = IntermediateWriter::new(buf_w);
writer.write_all(values.clone()).await.unwrap();
// reset the handle
buf_r.seek(SeekFrom::Start(0)).unwrap();
let reader = IntermediateReader::new(Cursor::new(buf));
let reader = IntermediateReader::new(buf_r);
let mut stream = reader.into_stream().await.unwrap();
let a = stream.next().await.unwrap().unwrap();