diff --git a/proxy/src/context/parquet.rs b/proxy/src/context/parquet.rs index 1e9e723938..e920d7be01 100644 --- a/proxy/src/context/parquet.rs +++ b/proxy/src/context/parquet.rs @@ -1,7 +1,7 @@ use std::{sync::Arc, time::SystemTime}; use anyhow::Context; -use bytes::BytesMut; +use bytes::{buf::Writer, BufMut, BytesMut}; use chrono::{Datelike, Timelike}; use futures::{Stream, StreamExt}; use parquet::{ @@ -192,8 +192,9 @@ async fn worker_inner( let mut rows = Vec::with_capacity(config.rows_per_group); let schema = rows.as_slice().schema()?; - let file = BytesWriter::default(); - let mut w = SerializedFileWriter::new(file, schema.clone(), config.propeties.clone())?; + let buffer = BytesMut::new(); + let w = buffer.writer(); + let mut w = SerializedFileWriter::new(w, schema.clone(), config.propeties.clone())?; let mut last_upload = time::Instant::now(); @@ -221,20 +222,23 @@ async fn worker_inner( } if !w.flushed_row_groups().is_empty() { - let _: BytesWriter = upload_parquet(w, len, &storage).await?; + let _: Writer = upload_parquet(w, len, &storage).await?; } Ok(()) } -async fn flush_rows( +async fn flush_rows( rows: Vec, - mut w: SerializedFileWriter, + mut w: SerializedFileWriter, ) -> anyhow::Result<( Vec, - SerializedFileWriter, + SerializedFileWriter, RowGroupMetaDataPtr, -)> { +)> +where + W: std::io::Write + Send + 'static, +{ let span = Span::current(); let (mut rows, w, rg_meta) = tokio::task::spawn_blocking(move || { let _enter = span.enter(); @@ -258,10 +262,10 @@ async fn flush_rows( } async fn upload_parquet( - w: SerializedFileWriter, + w: SerializedFileWriter>, len: i64, storage: &GenericRemoteStorage, -) -> anyhow::Result { +) -> anyhow::Result> { let len_uncompressed = w .flushed_row_groups() .iter() @@ -270,11 +274,12 @@ async fn upload_parquet( // I don't know how compute intensive this is, although it probably isn't much... better be safe than sorry. // finish method only available on the fork: https://github.com/apache/arrow-rs/issues/5253 - let (mut file, metadata) = tokio::task::spawn_blocking(move || w.finish()) + let (writer, metadata) = tokio::task::spawn_blocking(move || w.finish()) .await .unwrap()?; - let data = file.buf.split().freeze(); + let mut buffer = writer.into_inner(); + let data = buffer.split().freeze(); let compression = len as f64 / len_uncompressed as f64; let size = data.len(); @@ -315,24 +320,7 @@ async fn upload_parquet( .await .context("request_data_upload")?; - Ok(file) -} - -// why doesn't BytesMut impl io::Write? -#[derive(Default)] -struct BytesWriter { - buf: BytesMut, -} - -impl std::io::Write for BytesWriter { - fn write(&mut self, buf: &[u8]) -> std::io::Result { - self.buf.extend_from_slice(buf); - Ok(buf.len()) - } - - fn flush(&mut self) -> std::io::Result<()> { - Ok(()) - } + Ok(buffer.writer()) } #[cfg(test)]