fix(attachment_service): corrupted attachments.json under parallel requests (#6450)

The pagebench integration PR (#6214) issues attachment requests in
parallel. We observed corrupted attachments.json from time to time,
especially in test cases with high tenant counts.

The atomic overwrite added in #6444 exposed the root cause cleanly:
the `.commit()` calls of two request handlers could interleave or be
reordered, so an older serialization of the state could overwrite a
newer one on disk.
See also:
https://github.com/neondatabase/neon/pull/6444#issuecomment-1906392259
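
For illustration, here is a minimal, self-contained model of the old
scheme (stand-in types; the real code serializes `PersistentState` and
writes through `utils::crashsafe::overwrite`):

```rust
use std::sync::{Arc, Mutex};

// Model of the pre-fix design: mutate and serialize under a short-lived
// lock, then write the serialized snapshot to disk outside the lock.
struct Persistence {
    state: Mutex<u64>, // stands in for PersistentState
}

struct PendingWrite {
    bytes: Vec<u8>,
}

impl PendingWrite {
    async fn commit(self) {
        // Stands in for the crash-safe overwrite of attachments.json.
        tokio::fs::write("attachments.json", &self.bytes)
            .await
            .expect("write file");
    }
}

async fn handler(p: Arc<Persistence>) {
    let write = {
        let mut state = p.state.lock().unwrap();
        *state += 1;
        PendingWrite {
            bytes: state.to_string().into_bytes(),
        }
    };
    // The lock is released here: if two handlers race, their commit()
    // calls can complete in either order, so an older serialization of
    // the state may overwrite a newer one on disk.
    write.commit().await;
}
```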

This PR changes the `persistence` module to fix the above race:
- an mpsc queue of `PendingWrite`s,
- a single writer task that performs the writes in queue order,
- request handlers that need to write do so through the new
  `mutating_transaction` function.

`mutating_transaction`, while holding the lock, applies the modifications,
serializes the post-modification state, and pushes the result as a
`PendingWrite` into the mpsc queue.
It then releases the lock and `await`s the completion of the write.
The writer task executes the `PendingWrite`s in queue order.
Once a write has been executed, it wakes the tokio task that issued it.
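
In sketch form, the new scheme looks like this (a self-contained
miniature with stand-in types; the real code serializes
`PersistentState` with `serde_json` and writes via
`utils::crashsafe::overwrite` inside `spawn_blocking`):

```rust
use std::sync::Mutex;
use tokio::sync::{mpsc, oneshot};

struct PendingWrite {
    bytes: Vec<u8>,
    done_tx: oneshot::Sender<()>,
}

struct Inner {
    state: u64, // stands in for PersistentState
    write_queue_tx: mpsc::UnboundedSender<PendingWrite>,
}

struct Persistence {
    inner: Mutex<Inner>,
}

impl Persistence {
    fn spawn() -> Self {
        let (tx, mut rx) = mpsc::unbounded_channel::<PendingWrite>();
        // Single writer task: it alone touches the file, strictly in
        // queue order, so writes can no longer interleave or reorder.
        tokio::spawn(async move {
            while let Some(write) = rx.recv().await {
                tokio::fs::write("attachments.json", &write.bytes)
                    .await
                    .expect("write file");
                let _ = write.done_tx.send(()); // caller may have gone away
            }
        });
        Self {
            inner: Mutex::new(Inner {
                state: 0,
                write_queue_tx: tx,
            }),
        }
    }

    async fn mutating_transaction<F, R>(&self, txn: F) -> R
    where
        F: FnOnce(&mut u64) -> R,
    {
        let (ret, done_rx) = {
            // Mutate + serialize + enqueue happen atomically under the
            // lock, so queue order equals modification order.
            let mut inner = self.inner.lock().unwrap();
            let ret = txn(&mut inner.state);
            let (done_tx, done_rx) = oneshot::channel();
            let write = PendingWrite {
                bytes: inner.state.to_string().into_bytes(),
                done_tx,
            };
            inner.write_queue_tx.send(write).expect("writer task alive");
            (ret, done_rx)
        }; // std mutex guard dropped before the .await below
        done_rx.await.expect("writer task died");
        ret
    }
}
```

Because the mutation, the serialization, and the enqueue all happen under
the same lock, queue order always matches modification order, and the
single writer turns that queue order into on-disk order.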

Author:    Christian Schwarz
Committed: 2024-01-23 20:14:32 +01:00 (via GitHub)
Parent:    faf275d4a2
Commit:    743f6dfb9b

4 changed files with 92 additions and 65 deletions

Cargo.lock (generated)
@@ -286,6 +286,7 @@ dependencies = [
  "pageserver_client",
  "postgres_backend",
  "postgres_connection",
+ "scopeguard",
  "serde",
  "serde_json",
  "thiserror",

control_plane/attachment_service/Cargo.toml

@@ -14,6 +14,7 @@ hyper.workspace = true
 pageserver_api.workspace = true
 pageserver_client.workspace = true
 postgres_connection.workspace = true
+scopeguard.workspace = true
 serde.workspace = true
 serde_json.workspace = true
 thiserror.workspace = true

control_plane/attachment_service/src/main.rs

@@ -66,7 +66,7 @@ async fn main() -> anyhow::Result<()> {
         jwt_token: args.jwt_token,
     };
 
-    let persistence = Arc::new(Persistence::new(&args.path).await);
+    let persistence = Arc::new(Persistence::spawn(&args.path).await);
 
     let service = Service::spawn(config, persistence).await?;

control_plane/attachment_service/src/persistence.rs

@@ -1,6 +1,5 @@
 use std::{collections::HashMap, str::FromStr};
 
-use anyhow::Context;
 use camino::{Utf8Path, Utf8PathBuf};
 use control_plane::{
     attachment_service::{NodeAvailability, NodeSchedulingPolicy},
@@ -12,6 +11,7 @@ use pageserver_api::{
 };
 use postgres_connection::parse_host_port;
 use serde::{Deserialize, Serialize};
+use tracing::info;
 use utils::{
     generation::Generation,
     id::{NodeId, TenantId},
@@ -21,50 +21,28 @@ use crate::{node::Node, PlacementPolicy};
 /// Placeholder for storage. This will be replaced with a database client.
 pub struct Persistence {
-    state: std::sync::Mutex<PersistentState>,
+    inner: std::sync::Mutex<Inner>,
+}
+
+struct Inner {
+    state: PersistentState,
+    write_queue_tx: tokio::sync::mpsc::UnboundedSender<PendingWrite>,
 }
 
 // Top level state available to all HTTP handlers
 #[derive(Serialize, Deserialize)]
 struct PersistentState {
     tenants: HashMap<TenantShardId, TenantShardPersistence>,
-
-    #[serde(skip)]
-    path: Utf8PathBuf,
 }
 
-/// A convenience for serializing the state inside a sync lock, and then
-/// writing it to disk outside of the lock. This will go away when switching
-/// to a database backend.
 struct PendingWrite {
     bytes: Vec<u8>,
-    path: Utf8PathBuf,
-}
-
-impl PendingWrite {
-    async fn commit(self) -> anyhow::Result<()> {
-        tokio::task::spawn_blocking(move || {
-            let tmp_path = utils::crashsafe::path_with_suffix_extension(&self.path, "___new");
-            utils::crashsafe::overwrite(&self.path, &tmp_path, &self.bytes)
-        })
-        .await
-        .context("spawn_blocking")?
-        .context("write file")
-    }
+    done_tx: tokio::sync::oneshot::Sender<()>,
 }
 
 impl PersistentState {
-    fn save(&self) -> PendingWrite {
-        PendingWrite {
-            bytes: serde_json::to_vec(self).expect("Serialization error"),
-            path: self.path.clone(),
-        }
-    }
-
     async fn load(path: &Utf8Path) -> anyhow::Result<Self> {
         let bytes = tokio::fs::read(path).await?;
         let mut decoded = serde_json::from_slice::<Self>(&bytes)?;
-        decoded.path = path.to_owned();
-
         for (tenant_id, tenant) in &mut decoded.tenants {
             // Backward compat: an old attachments.json from before PR #6251, replace
@@ -93,7 +71,6 @@ impl PersistentState {
                 tracing::info!("Will create state file at {}", path);
                 Self {
                     tenants: HashMap::new(),
-                    path: path.to_owned(),
                 }
             }
             Err(e) => {
@@ -104,13 +81,74 @@ impl PersistentState {
 }
 
 impl Persistence {
-    pub async fn new(path: &Utf8Path) -> Self {
+    pub async fn spawn(path: &Utf8Path) -> Self {
+        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
         let state = PersistentState::load_or_new(path).await;
+        tokio::spawn(Self::writer_task(rx, path.to_owned()));
         Self {
-            state: std::sync::Mutex::new(state),
+            inner: std::sync::Mutex::new(Inner {
+                state,
+                write_queue_tx: tx,
+            }),
         }
     }
+
+    async fn writer_task(
+        mut rx: tokio::sync::mpsc::UnboundedReceiver<PendingWrite>,
+        path: Utf8PathBuf,
+    ) {
+        scopeguard::defer! {
+            info!("persistence writer task exiting");
+        };
+        loop {
+            match rx.recv().await {
+                Some(write) => {
+                    tokio::task::spawn_blocking({
+                        let path = path.clone();
+                        move || {
+                            let tmp_path =
+                                utils::crashsafe::path_with_suffix_extension(&path, "___new");
+                            utils::crashsafe::overwrite(&path, &tmp_path, &write.bytes)
+                        }
+                    })
+                    .await
+                    .expect("spawn_blocking")
+                    .expect("write file");
+                    let _ = write.done_tx.send(()); // receiver may lose interest any time
+                }
+                None => {
+                    return;
+                }
+            }
+        }
+    }
+
+    /// Perform a modification on our [`PersistentState`].
+    /// Return a future that completes once our modification has been persisted.
+    /// The output of the future is the return value of the `txn` closure.
+    async fn mutating_transaction<F, R>(&self, txn: F) -> R
+    where
+        F: FnOnce(&mut PersistentState) -> R,
+    {
+        let (ret, done_rx) = {
+            let mut inner = self.inner.lock().unwrap();
+            let ret = txn(&mut inner.state);
+            let (done_tx, done_rx) = tokio::sync::oneshot::channel();
+            let write = PendingWrite {
+                bytes: serde_json::to_vec(&inner.state).expect("Serialization error"),
+                done_tx,
+            };
+            inner
+                .write_queue_tx
+                .send(write)
+                .expect("writer task always outlives self");
+            (ret, done_rx)
+        };
+        // the write task can go away once we start .await'ing
+        let _: () = done_rx.await.expect("writer task dead, check logs");
+        ret
+    }
 
     /// When registering a node, persist it so that on next start we will be able to
     /// iterate over known nodes to synchronize their tenant shard states with our observed state.
     pub(crate) async fn insert_node(&self, _node: &Node) -> anyhow::Result<()> {
@@ -154,8 +192,8 @@ impl Persistence {
     /// At startup, we populate our map of tenant shards from persistent storage.
     pub(crate) async fn list_tenant_shards(&self) -> anyhow::Result<Vec<TenantShardPersistence>> {
-        let locked = self.state.lock().unwrap();
-        Ok(locked.tenants.values().cloned().collect())
+        let inner = self.inner.lock().unwrap();
+        Ok(inner.state.tenants.values().cloned().collect())
     }
 
     /// Tenants must be persisted before we schedule them for the first time. This enables us
@@ -164,8 +202,7 @@ impl Persistence {
         &self,
         shards: Vec<TenantShardPersistence>,
     ) -> anyhow::Result<()> {
-        let write = {
-            let mut locked = self.state.lock().unwrap();
+        self.mutating_transaction(|locked| {
             for shard in shards {
                 let tenant_shard_id = TenantShardId {
                     tenant_id: TenantId::from_str(shard.tenant_id.as_str())?,
@@ -175,12 +212,9 @@ impl Persistence {
                 locked.tenants.insert(tenant_shard_id, shard);
             }
-            locked.save()
-        };
-        write.commit().await?;
-        Ok(())
+            Ok(())
+        })
+        .await
     }
 
     /// Reconciler calls this immediately before attaching to a new pageserver, to acquire a unique, monotonically
@@ -191,8 +225,7 @@ impl Persistence {
         tenant_shard_id: TenantShardId,
         node_id: NodeId,
     ) -> anyhow::Result<Generation> {
-        let (write, gen) = {
-            let mut locked = self.state.lock().unwrap();
+        self.mutating_transaction(|locked| {
             let Some(shard) = locked.tenants.get_mut(&tenant_shard_id) else {
                 anyhow::bail!("Tried to increment generation of unknown shard");
             };
@@ -201,45 +234,37 @@ impl Persistence {
             shard.generation_pageserver = Some(node_id);
             let gen = Generation::new(shard.generation);
-            (locked.save(), gen)
-        };
-        write.commit().await?;
-        Ok(gen)
+            Ok(gen)
+        })
+        .await
     }
 
     pub(crate) async fn detach(&self, tenant_shard_id: TenantShardId) -> anyhow::Result<()> {
-        let write = {
-            let mut locked = self.state.lock().unwrap();
+        self.mutating_transaction(|locked| {
             let Some(shard) = locked.tenants.get_mut(&tenant_shard_id) else {
                 anyhow::bail!("Tried to increment generation of unknown shard");
             };
             shard.generation_pageserver = None;
-            locked.save()
-        };
-        write.commit().await?;
-        Ok(())
+            Ok(())
+        })
+        .await
     }
 
     pub(crate) async fn re_attach(
         &self,
         node_id: NodeId,
    ) -> anyhow::Result<HashMap<TenantShardId, Generation>> {
-        let (write, result) = {
+        self.mutating_transaction(|locked| {
             let mut result = HashMap::new();
-            let mut locked = self.state.lock().unwrap();
             for (tenant_shard_id, shard) in locked.tenants.iter_mut() {
                 if shard.generation_pageserver == Some(node_id) {
                     shard.generation += 1;
                     result.insert(*tenant_shard_id, Generation::new(shard.generation));
                 }
             }
-            (locked.save(), result)
-        };
-        write.commit().await?;
-        Ok(result)
+            Ok(result)
+        })
+        .await
     }
 
     // TODO: when we start shard splitting, we must durably mark the tenant so that