mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-23 16:30:39 +00:00
feat(puffin): finish return written bytes (#3082)
Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
This commit is contained in:
@@ -44,8 +44,8 @@ pub trait PuffinSyncWriter {
|
||||
/// Add a blob to the Puffin file
|
||||
fn add_blob<R: std::io::Read>(&mut self, blob: Blob<R>) -> Result<()>;
|
||||
|
||||
/// Finish writing the Puffin file
|
||||
fn finish(&mut self) -> Result<()>;
|
||||
/// Finish writing the Puffin file, returns the number of bytes written
|
||||
fn finish(&mut self) -> Result<usize>;
|
||||
}
|
||||
|
||||
/// The trait for writing Puffin files asynchronously
|
||||
@@ -57,6 +57,6 @@ pub trait PuffinAsyncWriter {
|
||||
/// Add a blob to the Puffin file
|
||||
async fn add_blob<R: futures::AsyncRead + Send>(&mut self, blob: Blob<R>) -> Result<()>;
|
||||
|
||||
/// Finish writing the Puffin file
|
||||
async fn finish(&mut self) -> Result<()>;
|
||||
/// Finish writing the Puffin file, returns the number of bytes written
|
||||
async fn finish(&mut self) -> Result<usize>;
|
||||
}
|
||||
|
||||
@@ -36,8 +36,8 @@ pub struct PuffinFileWriter<W> {
|
||||
/// The metadata of the blobs
|
||||
blob_metadata: Vec<BlobMetadata>,
|
||||
|
||||
/// The offset of the next blob
|
||||
next_blob_offset: u64,
|
||||
/// The number of bytes written
|
||||
written_bytes: u64,
|
||||
}
|
||||
|
||||
impl<W> PuffinFileWriter<W> {
|
||||
@@ -46,7 +46,7 @@ impl<W> PuffinFileWriter<W> {
|
||||
writer,
|
||||
properties: HashMap::new(),
|
||||
blob_metadata: Vec::new(),
|
||||
next_blob_offset: 0,
|
||||
written_bytes: 0,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -59,7 +59,7 @@ impl<W> PuffinFileWriter<W> {
|
||||
BlobMetadataBuilder::default()
|
||||
.blob_type(typ)
|
||||
.properties(properties)
|
||||
.offset(self.next_blob_offset as _)
|
||||
.offset(self.written_bytes as _)
|
||||
.length(size as _)
|
||||
.build()
|
||||
.expect("Required fields are not set")
|
||||
@@ -79,14 +79,16 @@ impl<W: io::Write> PuffinSyncWriter for PuffinFileWriter<W> {
|
||||
let blob_metadata = self.create_blob_metadata(blob.blob_type, blob.properties, size);
|
||||
self.blob_metadata.push(blob_metadata);
|
||||
|
||||
self.next_blob_offset += size;
|
||||
self.written_bytes += size;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn finish(&mut self) -> Result<()> {
|
||||
fn finish(&mut self) -> Result<usize> {
|
||||
self.write_header_if_needed_sync()?;
|
||||
self.write_footer_sync()?;
|
||||
self.writer.flush().context(FlushSnafu)
|
||||
self.writer.flush().context(FlushSnafu)?;
|
||||
|
||||
Ok(self.written_bytes as usize)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -106,23 +108,25 @@ impl<W: AsyncWrite + Unpin + Send> PuffinAsyncWriter for PuffinFileWriter<W> {
|
||||
let blob_metadata = self.create_blob_metadata(blob.blob_type, blob.properties, size);
|
||||
self.blob_metadata.push(blob_metadata);
|
||||
|
||||
self.next_blob_offset += size;
|
||||
self.written_bytes += size;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn finish(&mut self) -> Result<()> {
|
||||
async fn finish(&mut self) -> Result<usize> {
|
||||
self.write_header_if_needed_async().await?;
|
||||
self.write_footer_async().await?;
|
||||
self.writer.flush().await.context(FlushSnafu)?;
|
||||
self.writer.close().await.context(CloseSnafu)
|
||||
self.writer.close().await.context(CloseSnafu)?;
|
||||
|
||||
Ok(self.written_bytes as usize)
|
||||
}
|
||||
}
|
||||
|
||||
impl<W: io::Write> PuffinFileWriter<W> {
|
||||
fn write_header_if_needed_sync(&mut self) -> Result<()> {
|
||||
if self.next_blob_offset == 0 {
|
||||
if self.written_bytes == 0 {
|
||||
self.writer.write_all(&MAGIC).context(WriteSnafu)?;
|
||||
self.next_blob_offset += MAGIC.len() as u64;
|
||||
self.written_bytes += MAGIC.len() as u64;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -134,15 +138,17 @@ impl<W: io::Write> PuffinFileWriter<W> {
|
||||
)
|
||||
.into_footer_bytes()?;
|
||||
|
||||
self.writer.write_all(&bytes).context(WriteSnafu)
|
||||
self.writer.write_all(&bytes).context(WriteSnafu)?;
|
||||
self.written_bytes += bytes.len() as u64;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl<W: AsyncWrite + Unpin> PuffinFileWriter<W> {
|
||||
async fn write_header_if_needed_async(&mut self) -> Result<()> {
|
||||
if self.next_blob_offset == 0 {
|
||||
if self.written_bytes == 0 {
|
||||
self.writer.write_all(&MAGIC).await.context(WriteSnafu)?;
|
||||
self.next_blob_offset += MAGIC.len() as u64;
|
||||
self.written_bytes += MAGIC.len() as u64;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -154,6 +160,8 @@ impl<W: AsyncWrite + Unpin> PuffinFileWriter<W> {
|
||||
)
|
||||
.into_footer_bytes()?;
|
||||
|
||||
self.writer.write_all(&bytes).await.context(WriteSnafu)
|
||||
self.writer.write_all(&bytes).await.context(WriteSnafu)?;
|
||||
self.written_bytes += bytes.len() as u64;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -125,7 +125,8 @@ fn test_writer_reader_with_empty_sync() {
|
||||
"Test 1234".to_string(),
|
||||
)]));
|
||||
|
||||
writer.finish().unwrap();
|
||||
let written_bytes = writer.finish().unwrap();
|
||||
assert!(written_bytes > 0);
|
||||
|
||||
let mut buf = Cursor::new(buf.into_inner());
|
||||
let mut reader = PuffinFileReader::new(&mut buf);
|
||||
@@ -150,7 +151,8 @@ async fn test_writer_reader_empty_async() {
|
||||
"Test 1234".to_string(),
|
||||
)]));
|
||||
|
||||
writer.finish().await.unwrap();
|
||||
let written_bytes = writer.finish().await.unwrap();
|
||||
assert!(written_bytes > 0);
|
||||
|
||||
let mut buf = AsyncCursor::new(buf.into_inner());
|
||||
let mut reader = PuffinFileReader::new(&mut buf);
|
||||
@@ -194,7 +196,8 @@ fn test_writer_reader_sync() {
|
||||
"Test 1234".to_string(),
|
||||
)]));
|
||||
|
||||
writer.finish().unwrap();
|
||||
let written_bytes = writer.finish().unwrap();
|
||||
assert!(written_bytes > 0);
|
||||
|
||||
let mut buf = Cursor::new(buf.into_inner());
|
||||
let mut reader = PuffinFileReader::new(&mut buf);
|
||||
@@ -257,7 +260,8 @@ async fn test_writer_reader_async() {
|
||||
"Test 1234".to_string(),
|
||||
)]));
|
||||
|
||||
writer.finish().await.unwrap();
|
||||
let written_bytes = writer.finish().await.unwrap();
|
||||
assert!(written_bytes > 0);
|
||||
|
||||
let mut buf = AsyncCursor::new(buf.into_inner());
|
||||
let mut reader = PuffinFileReader::new(&mut buf);
|
||||
|
||||
Reference in New Issue
Block a user