mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-24 00:20:37 +00:00
Add size metric and test
This commit is contained in:
@@ -1,4 +1,5 @@
|
||||
use std::fs;
|
||||
use std::io::Read;
|
||||
use std::os::unix::fs::PermissionsExt;
|
||||
use std::path::Path;
|
||||
use std::process::{Command, Stdio};
|
||||
@@ -133,6 +134,50 @@ impl TryFrom<ComputeSpec> for ParsedSpec {
|
||||
}
|
||||
}
|
||||
|
||||
/// Wrapper for a reader that counts bytes and reports metrics.
|
||||
///
|
||||
/// HACK The interface of this struct is a little funny, mostly because we want
|
||||
/// to use it as input for tar::Archive::new(reader), which for some reason
|
||||
/// takes ownership of the reader instead of just &mut. So we can't access
|
||||
/// the reader to read the byte count because we lose ownership. Instead we
|
||||
/// pass the ComputeNode inside the struct and update metrics on Drop.
|
||||
struct ByteCounter<'a, R: Read> {
|
||||
inner: R,
|
||||
byte_count: usize,
|
||||
compute_node: &'a ComputeNode,
|
||||
}
|
||||
|
||||
impl<'a, R: Read> ByteCounter<'a, R> {
|
||||
fn new(reader: R, compute_node: &'a ComputeNode) -> Self {
|
||||
Self {
|
||||
inner: reader,
|
||||
byte_count: 0,
|
||||
compute_node,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<R: Read> Read for ByteCounter<'_, R> {
|
||||
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
|
||||
let result = self.inner.read(buf);
|
||||
if let Ok(n_bytes) = result {
|
||||
self.byte_count += n_bytes
|
||||
}
|
||||
result
|
||||
}
|
||||
}
|
||||
|
||||
impl<R: Read> Drop for ByteCounter<'_, R> {
|
||||
fn drop(&mut self) {
|
||||
self.compute_node
|
||||
.state
|
||||
.lock()
|
||||
.unwrap()
|
||||
.metrics
|
||||
.basebackup_bytes = self.byte_count as u64;
|
||||
}
|
||||
}
|
||||
|
||||
impl ComputeNode {
|
||||
pub fn set_status(&self, status: ComputeStatus) {
|
||||
let mut state = self.state.lock().unwrap();
|
||||
@@ -179,13 +224,14 @@ impl ComputeNode {
|
||||
_ => format!("basebackup {} {} {}", spec.tenant_id, spec.timeline_id, lsn),
|
||||
};
|
||||
let copyreader = client.copy_out(basebackup_cmd.as_str())?;
|
||||
let read_counter = ByteCounter::new(copyreader, self);
|
||||
|
||||
// Read the archive directly from the `CopyOutReader`
|
||||
//
|
||||
// Set `ignore_zeros` so that unpack() reads all the Copy data and
|
||||
// doesn't stop at the end-of-archive marker. Otherwise, if the server
|
||||
// sends an Error after finishing the tarball, we will not notice it.
|
||||
let mut ar = tar::Archive::new(copyreader);
|
||||
let mut ar = tar::Archive::new(read_counter);
|
||||
ar.set_ignore_zeros(true);
|
||||
ar.unpack(&self.pgdata)?;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user