Implement IO metrics in VirtualFile (#1112)

* Implement IO metrics in VirtualFile

* Do not group virtual file close statistics by tenantid/timelineid

* Add comments concenring close metrics
This commit is contained in:
Konstantin Knizhnik
2022-01-13 17:36:53 +03:00
committed by GitHub
parent 772d853dcf
commit bc6db2c10e
2 changed files with 95 additions and 16 deletions

View File

@@ -41,6 +41,7 @@ use crate::repository::{
TimelineWriter, ZenithWalRecord,
};
use crate::tenant_mgr;
use crate::virtual_file::VirtualFile;
use crate::walreceiver;
use crate::walreceiver::IS_WAL_RECEIVER;
use crate::walredo::WalRedoManager;
@@ -514,10 +515,10 @@ impl LayeredRepository {
let _enter = info_span!("saving metadata").entered();
let path = metadata_path(conf, timelineid, tenantid);
// use OpenOptions to ensure file presence is consistent with first_save
let mut file = OpenOptions::new()
.write(true)
.create_new(first_save)
.open(&path)?;
let mut file = VirtualFile::open_with_options(
&path,
OpenOptions::new().write(true).create_new(first_save),
)?;
let metadata_bytes = data.to_bytes().context("Failed to get metadata bytes")?;

View File

@@ -10,15 +10,46 @@
//! This is similar to PostgreSQL's virtual file descriptor facility in
//! src/backend/storage/file/fd.c
//!
use lazy_static::lazy_static;
use std::fs::{File, OpenOptions};
use std::io::{Error, ErrorKind, Read, Seek, SeekFrom, Write};
use std::os::unix::fs::FileExt;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{RwLock, RwLockWriteGuard};
use zenith_metrics::{register_histogram_vec, register_int_gauge_vec, HistogramVec, IntGaugeVec};
use once_cell::sync::OnceCell;
// Metrics collected on disk IO operations
const STORAGE_IO_TIME_BUCKETS: &[f64] = &[
0.000001, // 1 usec
0.00001, // 10 usec
0.0001, // 100 usec
0.001, // 1 msec
0.01, // 10 msec
0.1, // 100 msec
1.0, // 1 sec
];
lazy_static! {
static ref STORAGE_IO_TIME: HistogramVec = register_histogram_vec!(
"pageserver_io_time",
"Time spent in IO operations",
&["operation", "tenant_id", "timeline_id"],
STORAGE_IO_TIME_BUCKETS.into()
)
.expect("failed to define a metric");
}
lazy_static! {
static ref STORAGE_IO_SIZE: IntGaugeVec = register_int_gauge_vec!(
"pageserver_io_size",
"Amount of bytes",
&["operation", "tenant_id", "timeline_id"]
)
.expect("failed to define a metric");
}
///
/// A virtual file descriptor. You can use this just like std::fs::File, but internally
/// the underlying file is closed if the system is low on file descriptors,
@@ -51,6 +82,10 @@ pub struct VirtualFile {
/// storing it here.
pub path: PathBuf,
open_options: OpenOptions,
/// For metrics
tenantid: String,
timelineid: String,
}
#[derive(PartialEq, Clone, Copy)]
@@ -145,7 +180,13 @@ impl OpenFiles {
// old file.
//
if let Some(old_file) = slot_guard.file.take() {
drop(old_file);
// We do not have information about tenantid/timelineid of evicted file.
// It is possible to store path together with file or use filepath crate,
// but as far as close() is not expected to be fast, it is not so critical to gather
// precise per-tenant statistic here.
STORAGE_IO_TIME
.with_label_values(&["close", "-", "-"])
.observe_closure_duration(|| drop(old_file));
}
// Prepare the slot for reuse and return it
@@ -185,9 +226,20 @@ impl VirtualFile {
path: &Path,
open_options: &OpenOptions,
) -> Result<VirtualFile, std::io::Error> {
let parts = path.to_str().unwrap().split('/').collect::<Vec<&str>>();
let tenantid;
let timelineid;
if parts.len() > 5 && parts[parts.len() - 5] == "tenants" {
tenantid = parts[parts.len() - 4].to_string();
timelineid = parts[parts.len() - 2].to_string();
} else {
tenantid = "*".to_string();
timelineid = "*".to_string();
}
let (handle, mut slot_guard) = get_open_files().find_victim_slot();
let file = open_options.open(path)?;
let file = STORAGE_IO_TIME
.with_label_values(&["open", &tenantid, &timelineid])
.observe_closure_duration(|| open_options.open(path))?;
// Strip all options other than read and write.
//
@@ -204,6 +256,8 @@ impl VirtualFile {
pos: 0,
path: path.to_path_buf(),
open_options: reopen_options,
tenantid,
timelineid,
};
slot_guard.file.replace(file);
@@ -213,13 +267,13 @@ impl VirtualFile {
/// Call File::sync_all() on the underlying File.
pub fn sync_all(&self) -> Result<(), Error> {
self.with_file(|file| file.sync_all())?
self.with_file("fsync", |file| file.sync_all())?
}
/// Helper function that looks up the underlying File for this VirtualFile,
/// opening it and evicting some other File if necessary. It calls 'func'
/// with the physical File.
fn with_file<F, R>(&self, mut func: F) -> Result<R, Error>
fn with_file<F, R>(&self, op: &str, mut func: F) -> Result<R, Error>
where
F: FnMut(&File) -> R,
{
@@ -242,7 +296,9 @@ impl VirtualFile {
if let Some(file) = &slot_guard.file {
// Found a cached file descriptor.
slot.recently_used.store(true, Ordering::Relaxed);
return Ok(func(file));
return Ok(STORAGE_IO_TIME
.with_label_values(&[op, &self.tenantid, &self.timelineid])
.observe_closure_duration(|| func(file)));
}
}
}
@@ -267,7 +323,9 @@ impl VirtualFile {
let (handle, mut slot_guard) = open_files.find_victim_slot();
// Open the physical file
let file = self.open_options.open(&self.path)?;
let file = STORAGE_IO_TIME
.with_label_values(&["open", &self.tenantid, &self.timelineid])
.observe_closure_duration(|| self.open_options.open(&self.path))?;
// Perform the requested operation on it
//
@@ -276,7 +334,9 @@ impl VirtualFile {
// library RwLock doesn't allow downgrading without releasing the lock,
// and that doesn't seem worth the trouble. (parking_lot RwLock would
// allow it)
let result = func(&file);
let result = STORAGE_IO_TIME
.with_label_values(&[op, &self.tenantid, &self.timelineid])
.observe_closure_duration(|| func(&file));
// Store the File in the slot and update the handle in the VirtualFile
// to point to it.
@@ -299,7 +359,13 @@ impl Drop for VirtualFile {
let mut slot_guard = slot.inner.write().unwrap();
if slot_guard.tag == handle.tag {
slot.recently_used.store(false, Ordering::Relaxed);
slot_guard.file.take();
// Unlike files evicted by replacement algorithm, here
// we group close time by tenantid/timelineid.
// At allows to compare number/time of "normal" file closes
// with file eviction.
STORAGE_IO_TIME
.with_label_values(&["close", &self.tenantid, &self.timelineid])
.observe_closure_duration(|| slot_guard.file.take());
}
}
}
@@ -335,7 +401,7 @@ impl Seek for VirtualFile {
self.pos = offset;
}
SeekFrom::End(offset) => {
self.pos = self.with_file(|mut file| file.seek(SeekFrom::End(offset)))??
self.pos = self.with_file("seek", |mut file| file.seek(SeekFrom::End(offset)))??
}
SeekFrom::Current(offset) => {
let pos = self.pos as i128 + offset as i128;
@@ -357,11 +423,23 @@ impl Seek for VirtualFile {
impl FileExt for VirtualFile {
fn read_at(&self, buf: &mut [u8], offset: u64) -> Result<usize, Error> {
self.with_file(|file| file.read_at(buf, offset))?
let result = self.with_file("read", |file| file.read_at(buf, offset))?;
if let Ok(size) = result {
STORAGE_IO_SIZE
.with_label_values(&["read", &self.tenantid, &self.timelineid])
.add(size as i64);
}
result
}
fn write_at(&self, buf: &[u8], offset: u64) -> Result<usize, Error> {
self.with_file(|file| file.write_at(buf, offset))?
let result = self.with_file("write", |file| file.write_at(buf, offset))?;
if let Ok(size) = result {
STORAGE_IO_SIZE
.with_label_values(&["write", &self.tenantid, &self.timelineid])
.add(size as i64);
}
result
}
}