diff --git a/pageserver/benches/bench_walredo.rs b/pageserver/benches/bench_walredo.rs index ba41866935..4837626086 100644 --- a/pageserver/benches/bench_walredo.rs +++ b/pageserver/benches/bench_walredo.rs @@ -13,6 +13,7 @@ use bytes::{Buf, Bytes}; use pageserver::{ config::PageServerConf, repository::Key, walrecord::NeonWalRecord, walredo::PostgresRedoManager, }; +use pageserver_api::shard::TenantShardId; use utils::{id::TenantId, lsn::Lsn}; use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion}; @@ -26,9 +27,9 @@ fn redo_scenarios(c: &mut Criterion) { let conf = PageServerConf::dummy_conf(repo_dir.path().to_path_buf()); let conf = Box::leak(Box::new(conf)); - let tenant_id = TenantId::generate(); + let tenant_shard_id = TenantShardId::unsharded(TenantId::generate()); - let manager = PostgresRedoManager::new(conf, tenant_id); + let manager = PostgresRedoManager::new(conf, tenant_shard_id); let manager = Arc::new(manager); diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index e50987c84b..1660de8923 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -595,10 +595,9 @@ impl Tenant { mode: SpawnMode, ctx: &RequestContext, ) -> anyhow::Result> { - // TODO(sharding): make WalRedoManager shard-aware let wal_redo_manager = Arc::new(WalRedoManager::from(PostgresRedoManager::new( conf, - tenant_shard_id.tenant_id, + tenant_shard_id, ))); let TenantSharedResources { @@ -1145,10 +1144,9 @@ impl Tenant { tenant_shard_id: TenantShardId, reason: String, ) -> Arc { - // TODO(sharding): make WalRedoManager shard-aware let wal_redo_manager = Arc::new(WalRedoManager::from(PostgresRedoManager::new( conf, - tenant_shard_id.tenant_id, + tenant_shard_id, ))); Arc::new(Tenant::new( TenantState::Broken { diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index 94e95fd3b3..6918698f29 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -22,6 +22,7 @@ use anyhow::Context; use byteorder::{ByteOrder, LittleEndian}; use bytes::{BufMut, Bytes, BytesMut}; use nix::poll::*; +use pageserver_api::shard::TenantShardId; use serde::Serialize; use std::collections::VecDeque; use std::io; @@ -35,14 +36,11 @@ use std::sync::{Arc, Mutex, MutexGuard, RwLock}; use std::time::Duration; use std::time::Instant; use tracing::*; -use utils::{bin_ser::BeSer, id::TenantId, lsn::Lsn, nonblock::set_nonblock}; +use utils::{bin_ser::BeSer, lsn::Lsn, nonblock::set_nonblock}; #[cfg(feature = "testing")] use std::sync::atomic::{AtomicUsize, Ordering}; -#[cfg(feature = "testing")] -use pageserver_api::shard::TenantShardId; - use crate::config::PageServerConf; use crate::metrics::{ WalRedoKillCause, WAL_REDO_BYTES_HISTOGRAM, WAL_REDO_PROCESS_COUNTERS, @@ -92,7 +90,7 @@ struct ProcessOutput { /// records. /// pub struct PostgresRedoManager { - tenant_id: TenantId, + tenant_shard_id: TenantShardId, conf: &'static PageServerConf, last_redo_at: std::sync::Mutex>, redo_process: RwLock>>, @@ -186,10 +184,13 @@ impl PostgresRedoManager { /// /// Create a new PostgresRedoManager. /// - pub fn new(conf: &'static PageServerConf, tenant_id: TenantId) -> PostgresRedoManager { + pub fn new( + conf: &'static PageServerConf, + tenant_shard_id: TenantShardId, + ) -> PostgresRedoManager { // The actual process is launched lazily, on first request. PostgresRedoManager { - tenant_id, + tenant_shard_id, conf, last_redo_at: std::sync::Mutex::default(), redo_process: RwLock::new(None), @@ -244,8 +245,12 @@ impl PostgresRedoManager { let timer = WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM.start_timer(); let proc = Arc::new( - WalRedoProcess::launch(self.conf, self.tenant_id, pg_version) - .context("launch walredo process")?, + WalRedoProcess::launch( + self.conf, + self.tenant_shard_id, + pg_version, + ) + .context("launch walredo process")?, ); timer.observe_duration(); *proc_guard = Some(Arc::clone(&proc)); @@ -638,7 +643,7 @@ impl CloseFileDescriptors for C { struct WalRedoProcess { #[allow(dead_code)] conf: &'static PageServerConf, - tenant_id: TenantId, + tenant_shard_id: TenantShardId, // Some() on construction, only becomes None on Drop. child: Option, stdout: Mutex, @@ -652,10 +657,10 @@ impl WalRedoProcess { // // Start postgres binary in special WAL redo mode. // - #[instrument(skip_all,fields(tenant_id=%tenant_id, pg_version=pg_version))] + #[instrument(skip_all,fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), pg_version=pg_version))] fn launch( conf: &'static PageServerConf, - tenant_id: TenantId, + tenant_shard_id: TenantShardId, pg_version: u32, ) -> anyhow::Result { let pg_bin_dir_path = conf.pg_bin_dir(pg_version).context("pg_bin_dir")?; // TODO these should be infallible. @@ -680,7 +685,7 @@ impl WalRedoProcess { // as close-on-exec by default, but that's not enough, since we use // libraries that directly call libc open without setting that flag. .close_fds() - .spawn_no_leak_child(tenant_id) + .spawn_no_leak_child(tenant_shard_id) .context("spawn process")?; WAL_REDO_PROCESS_COUNTERS.started.inc(); let mut child = scopeguard::guard(child, |child| { @@ -741,12 +746,12 @@ impl WalRedoProcess { error!(error=?e, "failed to read from walredo stderr"); } } - }.instrument(tracing::info_span!(parent: None, "wal-redo-postgres-stderr", pid = child.id(), tenant_id = %tenant_id, %pg_version)) + }.instrument(tracing::info_span!(parent: None, "wal-redo-postgres-stderr", pid = child.id(), tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %pg_version)) ); Ok(Self { conf, - tenant_id, + tenant_shard_id, child: Some(child), stdin: Mutex::new(ProcessInput { stdin, @@ -772,7 +777,7 @@ impl WalRedoProcess { // Apply given WAL records ('records') over an old page image. Returns // new page image. // - #[instrument(skip_all, fields(tenant_id=%self.tenant_id, pid=%self.id()))] + #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), pid=%self.id()))] fn apply_wal_records( &self, tag: BufferTag, @@ -966,11 +971,7 @@ impl WalRedoProcess { // these files will be collected to an allure report let filename = format!("walredo-{millis}-{}-{seq}.walredo", writebuf.len()); - // TODO(sharding): update this call when WalRedoProcess gets a TenantShardId. - let path = self - .conf - .tenant_path(&TenantShardId::unsharded(self.tenant_id)) - .join(&filename); + let path = self.conf.tenant_path(&self.tenant_shard_id).join(&filename); let res = std::fs::OpenOptions::new() .write(true) @@ -1004,7 +1005,7 @@ impl Drop for WalRedoProcess { /// Wrapper type around `std::process::Child` which guarantees that the child /// will be killed and waited-for by this process before being dropped. struct NoLeakChild { - tenant_id: TenantId, + tenant_id: TenantShardId, child: Option, } @@ -1023,7 +1024,7 @@ impl DerefMut for NoLeakChild { } impl NoLeakChild { - fn spawn(tenant_id: TenantId, command: &mut Command) -> io::Result { + fn spawn(tenant_id: TenantShardId, command: &mut Command) -> io::Result { let child = command.spawn()?; Ok(NoLeakChild { tenant_id, @@ -1078,7 +1079,7 @@ impl Drop for NoLeakChild { Some(child) => child, None => return, }; - let tenant_id = self.tenant_id; + let tenant_shard_id = self.tenant_id; // Offload the kill+wait of the child process into the background. // If someone stops the runtime, we'll leak the child process. // We can ignore that case because we only stop the runtime on pageserver exit. @@ -1086,7 +1087,11 @@ impl Drop for NoLeakChild { tokio::task::spawn_blocking(move || { // Intentionally don't inherit the tracing context from whoever is dropping us. // This thread here is going to outlive of our dropper. - let span = tracing::info_span!("walredo", %tenant_id); + let span = tracing::info_span!( + "walredo", + tenant_id = %tenant_shard_id.tenant_id, + shard_id = %tenant_shard_id.shard_slug() + ); let _entered = span.enter(); Self::kill_and_wait_impl(child, WalRedoKillCause::NoLeakChildDrop); }) @@ -1096,11 +1101,11 @@ impl Drop for NoLeakChild { } trait NoLeakChildCommandExt { - fn spawn_no_leak_child(&mut self, tenant_id: TenantId) -> io::Result; + fn spawn_no_leak_child(&mut self, tenant_id: TenantShardId) -> io::Result; } impl NoLeakChildCommandExt for Command { - fn spawn_no_leak_child(&mut self, tenant_id: TenantId) -> io::Result { + fn spawn_no_leak_child(&mut self, tenant_id: TenantShardId) -> io::Result { NoLeakChild::spawn(tenant_id, self) } } @@ -1155,6 +1160,7 @@ mod tests { use crate::repository::Key; use crate::{config::PageServerConf, walrecord::NeonWalRecord}; use bytes::Bytes; + use pageserver_api::shard::TenantShardId; use std::str::FromStr; use utils::{id::TenantId, lsn::Lsn}; @@ -1264,9 +1270,9 @@ mod tests { let repo_dir = camino_tempfile::tempdir()?; let conf = PageServerConf::dummy_conf(repo_dir.path().to_path_buf()); let conf = Box::leak(Box::new(conf)); - let tenant_id = TenantId::generate(); + let tenant_shard_id = TenantShardId::unsharded(TenantId::generate()); - let manager = PostgresRedoManager::new(conf, tenant_id); + let manager = PostgresRedoManager::new(conf, tenant_shard_id); Ok(RedoHarness { _repo_dir: repo_dir,