pageserver: make walredo shard-aware

This does not have a functional impact, but enables all
the logging in this code to include the shard_id
label.
This commit is contained in:
John Spray
2023-12-29 13:43:56 +00:00
parent 73a944205b
commit a2e083ebe0
3 changed files with 40 additions and 35 deletions

View File

@@ -13,6 +13,7 @@ use bytes::{Buf, Bytes};
use pageserver::{
config::PageServerConf, repository::Key, walrecord::NeonWalRecord, walredo::PostgresRedoManager,
};
use pageserver_api::shard::TenantShardId;
use utils::{id::TenantId, lsn::Lsn};
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
@@ -26,9 +27,9 @@ fn redo_scenarios(c: &mut Criterion) {
let conf = PageServerConf::dummy_conf(repo_dir.path().to_path_buf());
let conf = Box::leak(Box::new(conf));
let tenant_id = TenantId::generate();
let tenant_shard_id = TenantShardId::unsharded(TenantId::generate());
let manager = PostgresRedoManager::new(conf, tenant_id);
let manager = PostgresRedoManager::new(conf, tenant_shard_id);
let manager = Arc::new(manager);

View File

@@ -595,10 +595,9 @@ impl Tenant {
mode: SpawnMode,
ctx: &RequestContext,
) -> anyhow::Result<Arc<Tenant>> {
// TODO(sharding): make WalRedoManager shard-aware
let wal_redo_manager = Arc::new(WalRedoManager::from(PostgresRedoManager::new(
conf,
tenant_shard_id.tenant_id,
tenant_shard_id,
)));
let TenantSharedResources {
@@ -1145,10 +1144,9 @@ impl Tenant {
tenant_shard_id: TenantShardId,
reason: String,
) -> Arc<Tenant> {
// TODO(sharding): make WalRedoManager shard-aware
let wal_redo_manager = Arc::new(WalRedoManager::from(PostgresRedoManager::new(
conf,
tenant_shard_id.tenant_id,
tenant_shard_id,
)));
Arc::new(Tenant::new(
TenantState::Broken {

View File

@@ -22,6 +22,7 @@ use anyhow::Context;
use byteorder::{ByteOrder, LittleEndian};
use bytes::{BufMut, Bytes, BytesMut};
use nix::poll::*;
use pageserver_api::shard::TenantShardId;
use serde::Serialize;
use std::collections::VecDeque;
use std::io;
@@ -35,14 +36,11 @@ use std::sync::{Arc, Mutex, MutexGuard, RwLock};
use std::time::Duration;
use std::time::Instant;
use tracing::*;
use utils::{bin_ser::BeSer, id::TenantId, lsn::Lsn, nonblock::set_nonblock};
use utils::{bin_ser::BeSer, lsn::Lsn, nonblock::set_nonblock};
#[cfg(feature = "testing")]
use std::sync::atomic::{AtomicUsize, Ordering};
#[cfg(feature = "testing")]
use pageserver_api::shard::TenantShardId;
use crate::config::PageServerConf;
use crate::metrics::{
WalRedoKillCause, WAL_REDO_BYTES_HISTOGRAM, WAL_REDO_PROCESS_COUNTERS,
@@ -92,7 +90,7 @@ struct ProcessOutput {
/// records.
///
pub struct PostgresRedoManager {
tenant_id: TenantId,
tenant_shard_id: TenantShardId,
conf: &'static PageServerConf,
last_redo_at: std::sync::Mutex<Option<Instant>>,
redo_process: RwLock<Option<Arc<WalRedoProcess>>>,
@@ -186,10 +184,13 @@ impl PostgresRedoManager {
///
/// Create a new PostgresRedoManager.
///
pub fn new(conf: &'static PageServerConf, tenant_id: TenantId) -> PostgresRedoManager {
pub fn new(
conf: &'static PageServerConf,
tenant_shard_id: TenantShardId,
) -> PostgresRedoManager {
// The actual process is launched lazily, on first request.
PostgresRedoManager {
tenant_id,
tenant_shard_id,
conf,
last_redo_at: std::sync::Mutex::default(),
redo_process: RwLock::new(None),
@@ -244,8 +245,12 @@ impl PostgresRedoManager {
let timer =
WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM.start_timer();
let proc = Arc::new(
WalRedoProcess::launch(self.conf, self.tenant_id, pg_version)
.context("launch walredo process")?,
WalRedoProcess::launch(
self.conf,
self.tenant_shard_id,
pg_version,
)
.context("launch walredo process")?,
);
timer.observe_duration();
*proc_guard = Some(Arc::clone(&proc));
@@ -638,7 +643,7 @@ impl<C: CommandExt> CloseFileDescriptors for C {
struct WalRedoProcess {
#[allow(dead_code)]
conf: &'static PageServerConf,
tenant_id: TenantId,
tenant_shard_id: TenantShardId,
// Some() on construction, only becomes None on Drop.
child: Option<NoLeakChild>,
stdout: Mutex<ProcessOutput>,
@@ -652,10 +657,10 @@ impl WalRedoProcess {
//
// Start postgres binary in special WAL redo mode.
//
#[instrument(skip_all,fields(tenant_id=%tenant_id, pg_version=pg_version))]
#[instrument(skip_all,fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), pg_version=pg_version))]
fn launch(
conf: &'static PageServerConf,
tenant_id: TenantId,
tenant_shard_id: TenantShardId,
pg_version: u32,
) -> anyhow::Result<Self> {
let pg_bin_dir_path = conf.pg_bin_dir(pg_version).context("pg_bin_dir")?; // TODO these should be infallible.
@@ -680,7 +685,7 @@ impl WalRedoProcess {
// as close-on-exec by default, but that's not enough, since we use
// libraries that directly call libc open without setting that flag.
.close_fds()
.spawn_no_leak_child(tenant_id)
.spawn_no_leak_child(tenant_shard_id)
.context("spawn process")?;
WAL_REDO_PROCESS_COUNTERS.started.inc();
let mut child = scopeguard::guard(child, |child| {
@@ -741,12 +746,12 @@ impl WalRedoProcess {
error!(error=?e, "failed to read from walredo stderr");
}
}
}.instrument(tracing::info_span!(parent: None, "wal-redo-postgres-stderr", pid = child.id(), tenant_id = %tenant_id, %pg_version))
}.instrument(tracing::info_span!(parent: None, "wal-redo-postgres-stderr", pid = child.id(), tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %pg_version))
);
Ok(Self {
conf,
tenant_id,
tenant_shard_id,
child: Some(child),
stdin: Mutex::new(ProcessInput {
stdin,
@@ -772,7 +777,7 @@ impl WalRedoProcess {
// Apply given WAL records ('records') over an old page image. Returns
// new page image.
//
#[instrument(skip_all, fields(tenant_id=%self.tenant_id, pid=%self.id()))]
#[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), pid=%self.id()))]
fn apply_wal_records(
&self,
tag: BufferTag,
@@ -966,11 +971,7 @@ impl WalRedoProcess {
// these files will be collected to an allure report
let filename = format!("walredo-{millis}-{}-{seq}.walredo", writebuf.len());
// TODO(sharding): update this call when WalRedoProcess gets a TenantShardId.
let path = self
.conf
.tenant_path(&TenantShardId::unsharded(self.tenant_id))
.join(&filename);
let path = self.conf.tenant_path(&self.tenant_shard_id).join(&filename);
let res = std::fs::OpenOptions::new()
.write(true)
@@ -1004,7 +1005,7 @@ impl Drop for WalRedoProcess {
/// Wrapper type around `std::process::Child` which guarantees that the child
/// will be killed and waited-for by this process before being dropped.
struct NoLeakChild {
tenant_id: TenantId,
tenant_id: TenantShardId,
child: Option<Child>,
}
@@ -1023,7 +1024,7 @@ impl DerefMut for NoLeakChild {
}
impl NoLeakChild {
fn spawn(tenant_id: TenantId, command: &mut Command) -> io::Result<Self> {
fn spawn(tenant_id: TenantShardId, command: &mut Command) -> io::Result<Self> {
let child = command.spawn()?;
Ok(NoLeakChild {
tenant_id,
@@ -1078,7 +1079,7 @@ impl Drop for NoLeakChild {
Some(child) => child,
None => return,
};
let tenant_id = self.tenant_id;
let tenant_shard_id = self.tenant_id;
// Offload the kill+wait of the child process into the background.
// If someone stops the runtime, we'll leak the child process.
// We can ignore that case because we only stop the runtime on pageserver exit.
@@ -1086,7 +1087,11 @@ impl Drop for NoLeakChild {
tokio::task::spawn_blocking(move || {
// Intentionally don't inherit the tracing context from whoever is dropping us.
// This thread here is going to outlive of our dropper.
let span = tracing::info_span!("walredo", %tenant_id);
let span = tracing::info_span!(
"walredo",
tenant_id = %tenant_shard_id.tenant_id,
shard_id = %tenant_shard_id.shard_slug()
);
let _entered = span.enter();
Self::kill_and_wait_impl(child, WalRedoKillCause::NoLeakChildDrop);
})
@@ -1096,11 +1101,11 @@ impl Drop for NoLeakChild {
}
trait NoLeakChildCommandExt {
fn spawn_no_leak_child(&mut self, tenant_id: TenantId) -> io::Result<NoLeakChild>;
fn spawn_no_leak_child(&mut self, tenant_id: TenantShardId) -> io::Result<NoLeakChild>;
}
impl NoLeakChildCommandExt for Command {
fn spawn_no_leak_child(&mut self, tenant_id: TenantId) -> io::Result<NoLeakChild> {
fn spawn_no_leak_child(&mut self, tenant_id: TenantShardId) -> io::Result<NoLeakChild> {
NoLeakChild::spawn(tenant_id, self)
}
}
@@ -1155,6 +1160,7 @@ mod tests {
use crate::repository::Key;
use crate::{config::PageServerConf, walrecord::NeonWalRecord};
use bytes::Bytes;
use pageserver_api::shard::TenantShardId;
use std::str::FromStr;
use utils::{id::TenantId, lsn::Lsn};
@@ -1264,9 +1270,9 @@ mod tests {
let repo_dir = camino_tempfile::tempdir()?;
let conf = PageServerConf::dummy_conf(repo_dir.path().to_path_buf());
let conf = Box::leak(Box::new(conf));
let tenant_id = TenantId::generate();
let tenant_shard_id = TenantShardId::unsharded(TenantId::generate());
let manager = PostgresRedoManager::new(conf, tenant_id);
let manager = PostgresRedoManager::new(conf, tenant_shard_id);
Ok(RedoHarness {
_repo_dir: repo_dir,