From 743f6dfb9be416748aed8f43f4ca294f71d9e849 Mon Sep 17 00:00:00 2001
From: Christian Schwarz
Date: Tue, 23 Jan 2024 20:14:32 +0100
Subject: [PATCH] fix(attachment_service): corrupted attachments.json when
 parallel requests (#6450)

The pagebench integration PR (#6214) issues attachment requests in
parallel. We observed corrupted attachments.json from time to time,
especially in the test cases with high tenant counts.

The atomic overwrite added in #6444 exposed the root cause cleanly: the
`.commit()` calls of two request handlers could interleave or be
reordered.

See also: https://github.com/neondatabase/neon/pull/6444#issuecomment-1906392259

This PR makes changes to the `persistence` module to fix the above race:

- an mpsc queue for `PendingWrite`s
- one writer task that performs the writes in mpsc queue order
- request handlers that need to do writes do so using the new
  `mutating_transaction` function

`mutating_transaction`, while holding the lock, applies the
modifications, serializes the post-modification state, and pushes that
as a `PendingWrite` into the mpsc queue. It then releases the lock and
`await`s the completion of the write.

The writer task executes the `PendingWrite`s in queue order. Once a
write has been executed, it wakes the tokio task that submitted it.
---
 Cargo.lock                                    |   1 +
 control_plane/attachment_service/Cargo.toml   |   1 +
 control_plane/attachment_service/src/main.rs  |   2 +-
 .../attachment_service/src/persistence.rs     | 153 ++++++++++--------
 4 files changed, 92 insertions(+), 65 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 37dba60aee..02a437ccf9 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -286,6 +286,7 @@ dependencies = [
  "pageserver_client",
  "postgres_backend",
  "postgres_connection",
+ "scopeguard",
  "serde",
  "serde_json",
  "thiserror",
diff --git a/control_plane/attachment_service/Cargo.toml b/control_plane/attachment_service/Cargo.toml
index 2e2286dbab..743dd806c4 100644
--- a/control_plane/attachment_service/Cargo.toml
+++ b/control_plane/attachment_service/Cargo.toml
@@ -14,6 +14,7 @@ hyper.workspace = true
 pageserver_api.workspace = true
 pageserver_client.workspace = true
 postgres_connection.workspace = true
+scopeguard.workspace = true
 serde.workspace = true
 serde_json.workspace = true
 thiserror.workspace = true
diff --git a/control_plane/attachment_service/src/main.rs b/control_plane/attachment_service/src/main.rs
index ee2a22ee53..38e51b9a9e 100644
--- a/control_plane/attachment_service/src/main.rs
+++ b/control_plane/attachment_service/src/main.rs
@@ -66,7 +66,7 @@ async fn main() -> anyhow::Result<()> {
         jwt_token: args.jwt_token,
     };
 
-    let persistence = Arc::new(Persistence::new(&args.path).await);
+    let persistence = Arc::new(Persistence::spawn(&args.path).await);
 
     let service = Service::spawn(config, persistence).await?;
 
diff --git a/control_plane/attachment_service/src/persistence.rs b/control_plane/attachment_service/src/persistence.rs
index b9c79ff916..4a3fc8c779 100644
--- a/control_plane/attachment_service/src/persistence.rs
+++ b/control_plane/attachment_service/src/persistence.rs
@@ -1,6 +1,5 @@
 use std::{collections::HashMap, str::FromStr};
 
-use anyhow::Context;
 use camino::{Utf8Path, Utf8PathBuf};
 use control_plane::{
     attachment_service::{NodeAvailability, NodeSchedulingPolicy},
 };
 use pageserver_api::{
 };
 use postgres_connection::parse_host_port;
 use serde::{Deserialize, Serialize};
+use tracing::info;
 use utils::{
     generation::Generation,
     id::{NodeId, TenantId},
 };
 
@@ -21,50 +21,28 @@ use crate::{node::Node, PlacementPolicy};
 
 /// Placeholder for storage. This will be replaced with a database client.
 pub struct Persistence {
-    state: std::sync::Mutex<PersistentState>,
+    inner: std::sync::Mutex<Inner>,
+}
+
+struct Inner {
+    state: PersistentState,
+    write_queue_tx: tokio::sync::mpsc::UnboundedSender<PendingWrite>,
 }
 
-// Top level state available to all HTTP handlers
 #[derive(Serialize, Deserialize)]
 struct PersistentState {
     tenants: HashMap<TenantShardId, TenantShardPersistence>,
-
-    #[serde(skip)]
-    path: Utf8PathBuf,
 }
 
-/// A convenience for serializing the state inside a sync lock, and then
-/// writing it to disk outside of the lock. This will go away when switching
-/// to a database backend.
 struct PendingWrite {
     bytes: Vec<u8>,
-    path: Utf8PathBuf,
-}
-
-impl PendingWrite {
-    async fn commit(self) -> anyhow::Result<()> {
-        tokio::task::spawn_blocking(move || {
-            let tmp_path = utils::crashsafe::path_with_suffix_extension(&self.path, "___new");
-            utils::crashsafe::overwrite(&self.path, &tmp_path, &self.bytes)
-        })
-        .await
-        .context("spawn_blocking")?
-        .context("write file")
-    }
+    done_tx: tokio::sync::oneshot::Sender<()>,
 }
 
 impl PersistentState {
-    fn save(&self) -> PendingWrite {
-        PendingWrite {
-            bytes: serde_json::to_vec(self).expect("Serialization error"),
-            path: self.path.clone(),
-        }
-    }
-
     async fn load(path: &Utf8Path) -> anyhow::Result<Self> {
         let bytes = tokio::fs::read(path).await?;
         let mut decoded = serde_json::from_slice::<PersistentState>(&bytes)?;
-        decoded.path = path.to_owned();
 
         for (tenant_id, tenant) in &mut decoded.tenants {
             // Backward compat: an old attachments.json from before PR #6251, replace
@@ -93,7 +71,6 @@ impl PersistentState {
                 tracing::info!("Will create state file at {}", path);
                 Self {
                     tenants: HashMap::new(),
-                    path: path.to_owned(),
                 }
             }
             Err(e) => {
@@ -104,13 +81,74 @@ impl PersistentState {
 }
 
 impl Persistence {
-    pub async fn new(path: &Utf8Path) -> Self {
+    pub async fn spawn(path: &Utf8Path) -> Self {
+        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
         let state = PersistentState::load_or_new(path).await;
+        tokio::spawn(Self::writer_task(rx, path.to_owned()));
         Self {
-            state: std::sync::Mutex::new(state),
+            inner: std::sync::Mutex::new(Inner {
+                state,
+                write_queue_tx: tx,
+            }),
         }
     }
 
+    async fn writer_task(
+        mut rx: tokio::sync::mpsc::UnboundedReceiver<PendingWrite>,
+        path: Utf8PathBuf,
+    ) {
+        scopeguard::defer! {
+            info!("persistence writer task exiting");
+        };
+        loop {
+            match rx.recv().await {
+                Some(write) => {
+                    tokio::task::spawn_blocking({
+                        let path = path.clone();
+                        move || {
+                            let tmp_path =
+                                utils::crashsafe::path_with_suffix_extension(&path, "___new");
+                            utils::crashsafe::overwrite(&path, &tmp_path, &write.bytes)
+                        }
+                    })
+                    .await
+                    .expect("spawn_blocking")
+                    .expect("write file");
+                    let _ = write.done_tx.send(()); // receiver may lose interest any time
+                }
+                None => {
+                    return;
+                }
+            }
+        }
+    }
+
+    /// Perform a modification on our [`PersistentState`].
+    /// Return a future that completes once our modification has been persisted.
+    /// The output of the future is the return value of the `txn` closure.
+    async fn mutating_transaction<F, R>(&self, txn: F) -> R
+    where
+        F: FnOnce(&mut PersistentState) -> R,
+    {
+        let (ret, done_rx) = {
+            let mut inner = self.inner.lock().unwrap();
+            let ret = txn(&mut inner.state);
+            let (done_tx, done_rx) = tokio::sync::oneshot::channel();
+            let write = PendingWrite {
+                bytes: serde_json::to_vec(&inner.state).expect("Serialization error"),
+                done_tx,
+            };
+            inner
+                .write_queue_tx
+                .send(write)
+                .expect("writer task always outlives self");
+            (ret, done_rx)
+        };
+        // the writer task can go away once we start .await'ing
+        let _: () = done_rx.await.expect("writer task dead, check logs");
+        ret
+    }
+
     /// When registering a node, persist it so that on next start we will be able to
     /// iterate over known nodes to synchronize their tenant shard states with our observed state.
     pub(crate) async fn insert_node(&self, _node: &Node) -> anyhow::Result<()> {
@@ -154,8 +192,8 @@ impl Persistence {
 
     /// At startup, we populate our map of tenant shards from persistent storage.
     pub(crate) async fn list_tenant_shards(&self) -> anyhow::Result<Vec<TenantShardPersistence>> {
-        let locked = self.state.lock().unwrap();
-        Ok(locked.tenants.values().cloned().collect())
+        let inner = self.inner.lock().unwrap();
+        Ok(inner.state.tenants.values().cloned().collect())
     }
 
     /// Tenants must be persisted before we schedule them for the first time. This enables us
@@ -164,8 +202,7 @@ impl Persistence {
         &self,
         shards: Vec<TenantShardPersistence>,
     ) -> anyhow::Result<()> {
-        let write = {
-            let mut locked = self.state.lock().unwrap();
+        self.mutating_transaction(|locked| {
             for shard in shards {
                 let tenant_shard_id = TenantShardId {
                     tenant_id: TenantId::from_str(shard.tenant_id.as_str())?,
@@ -175,12 +212,9 @@ impl Persistence {
                 locked.tenants.insert(tenant_shard_id, shard);
             }
 
-            locked.save()
-        };
-
-        write.commit().await?;
-
-        Ok(())
+            Ok(())
+        })
+        .await
     }
 
     /// Reconciler calls this immediately before attaching to a new pageserver, to acquire a unique, monotonically
@@ -191,8 +225,7 @@
     pub(crate) async fn increment_generation(
         &self,
         tenant_shard_id: TenantShardId,
        node_id: NodeId,
     ) -> anyhow::Result<Generation> {
-        let (write, gen) = {
-            let mut locked = self.state.lock().unwrap();
+        self.mutating_transaction(|locked| {
             let Some(shard) = locked.tenants.get_mut(&tenant_shard_id) else {
                 anyhow::bail!("Tried to increment generation of unknown shard");
             };
@@ -201,45 +234,37 @@
             shard.generation_pageserver = Some(node_id);
             let gen = Generation::new(shard.generation);
-            (locked.save(), gen)
-        };
-
-        write.commit().await?;
-        Ok(gen)
+            Ok(gen)
+        })
+        .await
     }
 
     pub(crate) async fn detach(&self, tenant_shard_id: TenantShardId) -> anyhow::Result<()> {
-        let write = {
-            let mut locked = self.state.lock().unwrap();
+        self.mutating_transaction(|locked| {
             let Some(shard) = locked.tenants.get_mut(&tenant_shard_id) else {
                 anyhow::bail!("Tried to increment generation of unknown shard");
             };
             shard.generation_pageserver = None;
-            locked.save()
-        };
-        write.commit().await?;
-        Ok(())
+            Ok(())
+        })
+        .await
     }
 
     pub(crate) async fn re_attach(
         &self,
         node_id: NodeId,
     ) -> anyhow::Result<HashMap<TenantShardId, Generation>> {
-        let (write, result) = {
+        self.mutating_transaction(|locked| {
             let mut result = HashMap::new();
-            let mut locked = self.state.lock().unwrap();
             for (tenant_shard_id, shard) in locked.tenants.iter_mut() {
                 if shard.generation_pageserver == Some(node_id) {
                     shard.generation += 1;
                     result.insert(*tenant_shard_id, Generation::new(shard.generation));
                 }
             }
-
-            (locked.save(), result)
-        };
-
-        write.commit().await?;
-        Ok(result)
+            Ok(result)
+        })
+        .await
     }
 
     // TODO: when we start shard splitting, we must durably mark the
tenant so that
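
For readers who want the ordering mechanism in isolation: below is a minimal, self-contained sketch (not part of the patch) of the lock-then-enqueue pattern that `mutating_transaction` relies on. Names like `State` and `counter`, and the `println!` standing in for the crash-safe file overwrite, are illustrative assumptions rather than the attachment_service code.

```rust
use std::sync::{Arc, Mutex};

use tokio::sync::{mpsc, oneshot};

#[derive(Clone, Default)]
struct State {
    counter: u64, // stand-in for the real persistent state
}

struct PendingWrite {
    snapshot: State,
    done_tx: oneshot::Sender<()>,
}

struct Persistence {
    inner: Mutex<Inner>,
}

struct Inner {
    state: State,
    write_queue_tx: mpsc::UnboundedSender<PendingWrite>,
}

impl Persistence {
    fn spawn() -> Arc<Self> {
        let (tx, mut rx) = mpsc::unbounded_channel::<PendingWrite>();
        // Single writer task: drains the queue one item at a time, so
        // writes happen strictly in queue order.
        tokio::spawn(async move {
            while let Some(write) = rx.recv().await {
                // A real implementation would do the crash-safe file
                // overwrite here; printing keeps the sketch self-contained.
                println!("persisted counter={}", write.snapshot.counter);
                let _ = write.done_tx.send(()); // requester may have gone away
            }
        });
        Arc::new(Self {
            inner: Mutex::new(Inner {
                state: State::default(),
                write_queue_tx: tx,
            }),
        })
    }

    async fn mutating_transaction<F, R>(&self, txn: F) -> R
    where
        F: FnOnce(&mut State) -> R,
    {
        let (ret, done_rx) = {
            // Mutate and enqueue the post-mutation snapshot *while holding
            // the lock*: queue order therefore equals mutation order.
            let mut inner = self.inner.lock().unwrap();
            let ret = txn(&mut inner.state);
            let (done_tx, done_rx) = oneshot::channel();
            let write = PendingWrite {
                snapshot: inner.state.clone(),
                done_tx,
            };
            inner.write_queue_tx.send(write).expect("writer task alive");
            (ret, done_rx)
        }; // lock released here, before we await
        done_rx.await.expect("writer task died");
        ret
    }
}

#[tokio::main]
async fn main() {
    let p = Persistence::spawn();
    // Two concurrent transactions: the writer still persists their
    // snapshots in mutation order (counter=1, then counter=2).
    let (a, b) = tokio::join!(
        p.mutating_transaction(|s| {
            s.counter += 1;
            s.counter
        }),
        p.mutating_transaction(|s| {
            s.counter += 1;
            s.counter
        }),
    );
    println!("transactions returned {a} and {b}");
}
```

The key design point is that the snapshot is serialized and enqueued before the mutex is released, so the single consumer necessarily observes writes in mutation order, while the oneshot ack lets each handler await durability without holding a sync lock across an `.await`.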