From cdbdb04d933c44e42a4a6bd44dbed5f0e35b1141 Mon Sep 17 00:00:00 2001 From: tison Date: Tue, 16 Apr 2024 14:35:55 +0800 Subject: [PATCH] refactor: remove redundant try_flush invocations (#3706) * refactor: remove redundant try_flush invocations Signed-off-by: tison * fixup Signed-off-by: tison --------- Signed-off-by: tison --- src/common/datasource/src/buffered_writer.rs | 17 +++++++++-------- src/common/datasource/src/file_format.rs | 4 ---- .../datasource/src/file_format/parquet.rs | 5 +---- 3 files changed, 10 insertions(+), 16 deletions(-) diff --git a/src/common/datasource/src/buffered_writer.rs b/src/common/datasource/src/buffered_writer.rs index 852486ef9f..8ce34070cc 100644 --- a/src/common/datasource/src/buffered_writer.rs +++ b/src/common/datasource/src/buffered_writer.rs @@ -60,12 +60,6 @@ impl< .context(error::BufferedWriterClosedSnafu)?; let metadata = encoder.close().await?; - // Use `rows_written` to keep a track of if any rows have been written. - // If no row's been written, then we can simply close the underlying - // writer without flush so that no file will be actually created. - if self.rows_written != 0 { - self.bytes_written += self.try_flush(true).await?; - } // It's important to shut down! flushes all pending writes self.close_inner_writer().await?; Ok((metadata, self.bytes_written)) @@ -79,8 +73,15 @@ impl< Fut: Future>, > LazyBufferedWriter { - /// Closes the writer without flushing the buffer data. + /// Closes the writer and flushes the buffer data. pub async fn close_inner_writer(&mut self) -> Result<()> { + // Use `rows_written` to keep a track of if any rows have been written. + // If no row's been written, then we can simply close the underlying + // writer without flush so that no file will be actually created. + if self.rows_written != 0 { + self.bytes_written += self.try_flush(true).await?; + } + if let Some(writer) = &mut self.writer { writer.shutdown().await.context(error::AsyncWriteSnafu)?; } @@ -117,7 +118,7 @@ impl< Ok(()) } - pub async fn try_flush(&mut self, all: bool) -> Result { + async fn try_flush(&mut self, all: bool) -> Result { let mut bytes_written: u64 = 0; // Once buffered data size reaches threshold, split the data in chunks (typically 4MB) diff --git a/src/common/datasource/src/file_format.rs b/src/common/datasource/src/file_format.rs index 8774c946d5..6f80590d26 100644 --- a/src/common/datasource/src/file_format.rs +++ b/src/common/datasource/src/file_format.rs @@ -213,10 +213,6 @@ pub async fn stream_to_file T>( writer.write(&batch).await?; rows += batch.num_rows(); } - - // Flushes all pending writes - let _ = writer.try_flush(true).await?; writer.close_inner_writer().await?; - Ok(rows) } diff --git a/src/common/datasource/src/file_format/parquet.rs b/src/common/datasource/src/file_format/parquet.rs index c21bead9f7..651d5904c8 100644 --- a/src/common/datasource/src/file_format/parquet.rs +++ b/src/common/datasource/src/file_format/parquet.rs @@ -215,10 +215,7 @@ impl BufferedWriter { /// Write a record batch to stream writer. pub async fn write(&mut self, arrow_batch: &RecordBatch) -> error::Result<()> { - self.inner.write(arrow_batch).await?; - self.inner.try_flush(false).await?; - - Ok(()) + self.inner.write(arrow_batch).await } /// Close parquet writer.