This commit is contained in:
Bojan Serafimov
2023-06-12 18:52:34 -04:00
parent 45b71fecec
commit ddb98d6f77
2 changed files with 38 additions and 47 deletions

View File

@@ -1,5 +1,4 @@
use std::fs;
use std::io::Read;
use std::os::unix::fs::PermissionsExt;
use std::path::Path;
use std::process::{Command, Stdio};
@@ -16,6 +15,7 @@ use utils::lsn::Lsn;
use compute_api::responses::{ComputeMetrics, ComputeStatus};
use compute_api::spec::{ComputeMode, ComputeSpec};
use utils::measured_stream::MeasuredReader;
use crate::config;
use crate::pg_helpers::*;
@@ -134,50 +134,6 @@ impl TryFrom<ComputeSpec> for ParsedSpec {
}
}
/// Wrapper for a reader that counts bytes and reports metrics.
///
/// HACK The interface of this struct is a little funny, mostly because we want
/// to use it as input for tar::Archive::new(reader), which for some reason
/// takes ownership of the reader instead of just &mut. So we can't access
/// the reader to read the byte count because we lose ownership. Instead we
/// pass the ComputeNode inside the struct and update metrics on Drop.
struct ByteCounter<'a, R: Read> {
inner: R,
byte_count: usize,
compute_node: &'a ComputeNode,
}
impl<'a, R: Read> ByteCounter<'a, R> {
fn new(reader: R, compute_node: &'a ComputeNode) -> Self {
Self {
inner: reader,
byte_count: 0,
compute_node,
}
}
}
impl<R: Read> Read for ByteCounter<'_, R> {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
let result = self.inner.read(buf);
if let Ok(n_bytes) = result {
self.byte_count += n_bytes
}
result
}
}
impl<R: Read> Drop for ByteCounter<'_, R> {
fn drop(&mut self) {
self.compute_node
.state
.lock()
.unwrap()
.metrics
.basebackup_bytes = self.byte_count as u64;
}
}
impl ComputeNode {
pub fn set_status(&self, status: ComputeStatus) {
let mut state = self.state.lock().unwrap();
@@ -224,17 +180,20 @@ impl ComputeNode {
_ => format!("basebackup {} {} {}", spec.tenant_id, spec.timeline_id, lsn),
};
let copyreader = client.copy_out(basebackup_cmd.as_str())?;
let read_counter = ByteCounter::new(copyreader, self);
let mut measured_reader = MeasuredReader::new(copyreader);
// Read the archive directly from the `CopyOutReader`
//
// Set `ignore_zeros` so that unpack() reads all the Copy data and
// doesn't stop at the end-of-archive marker. Otherwise, if the server
// sends an Error after finishing the tarball, we will not notice it.
let mut ar = tar::Archive::new(read_counter);
let mut ar = tar::Archive::new(&mut measured_reader);
ar.set_ignore_zeros(true);
ar.unpack(&self.pgdata)?;
// Report metrics
self.state.lock().unwrap().metrics.basebackup_bytes =
measured_reader.get_byte_count() as u64;
self.state.lock().unwrap().metrics.basebackup_ms = Utc::now()
.signed_duration_since(start_time)
.to_std()

View File

@@ -1,4 +1,5 @@
use pin_project_lite::pin_project;
use std::io::Read;
use std::pin::Pin;
use std::{io, task};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
@@ -75,3 +76,34 @@ impl<S: AsyncWrite + Unpin, R, W: FnMut(usize)> AsyncWrite for MeasuredStream<S,
self.project().stream.poll_shutdown(context)
}
}
/// Wrapper for a reader that counts bytes read.
///
/// Similar to MeasuredStream but it's one way and it's sync
pub struct MeasuredReader<R: Read> {
inner: R,
byte_count: usize,
}
impl<R: Read> MeasuredReader<R> {
pub fn new(reader: R) -> Self {
Self {
inner: reader,
byte_count: 0,
}
}
pub fn get_byte_count(&self) -> usize {
self.byte_count
}
}
impl<R: Read> Read for MeasuredReader<R> {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
let result = self.inner.read(buf);
if let Ok(n_bytes) = result {
self.byte_count += n_bytes
}
result
}
}