refactor: remove redundant try_flush invocations (#3706)

* refactor: remove redundant try_flush invocations

Signed-off-by: tison <wander4096@gmail.com>

* fixup

Signed-off-by: tison <wander4096@gmail.com>

---------

Signed-off-by: tison <wander4096@gmail.com>
This commit is contained in:
tison
2024-04-16 14:35:55 +08:00
committed by GitHub
parent 5af87baeb0
commit cdbdb04d93
3 changed files with 10 additions and 16 deletions

View File

@@ -60,12 +60,6 @@ impl<
.context(error::BufferedWriterClosedSnafu)?;
let metadata = encoder.close().await?;
// Use `rows_written` to keep a track of if any rows have been written.
// If no row's been written, then we can simply close the underlying
// writer without flush so that no file will be actually created.
if self.rows_written != 0 {
self.bytes_written += self.try_flush(true).await?;
}
// It's important to shut down! flushes all pending writes
self.close_inner_writer().await?;
Ok((metadata, self.bytes_written))
@@ -79,8 +73,15 @@ impl<
Fut: Future<Output = Result<T>>,
> LazyBufferedWriter<T, U, F>
{
/// Closes the writer without flushing the buffer data.
/// Closes the writer and flushes the buffer data.
pub async fn close_inner_writer(&mut self) -> Result<()> {
// Use `rows_written` to keep a track of if any rows have been written.
// If no row's been written, then we can simply close the underlying
// writer without flush so that no file will be actually created.
if self.rows_written != 0 {
self.bytes_written += self.try_flush(true).await?;
}
if let Some(writer) = &mut self.writer {
writer.shutdown().await.context(error::AsyncWriteSnafu)?;
}
@@ -117,7 +118,7 @@ impl<
Ok(())
}
pub async fn try_flush(&mut self, all: bool) -> Result<u64> {
async fn try_flush(&mut self, all: bool) -> Result<u64> {
let mut bytes_written: u64 = 0;
// Once buffered data size reaches threshold, split the data in chunks (typically 4MB)

View File

@@ -213,10 +213,6 @@ pub async fn stream_to_file<T: DfRecordBatchEncoder, U: Fn(SharedBuffer) -> T>(
writer.write(&batch).await?;
rows += batch.num_rows();
}
// Flushes all pending writes
let _ = writer.try_flush(true).await?;
writer.close_inner_writer().await?;
Ok(rows)
}

View File

@@ -215,10 +215,7 @@ impl BufferedWriter {
/// Write a record batch to stream writer.
pub async fn write(&mut self, arrow_batch: &RecordBatch) -> error::Result<()> {
self.inner.write(arrow_batch).await?;
self.inner.try_flush(false).await?;
Ok(())
self.inner.write(arrow_batch).await
}
/// Close parquet writer.