Fix race condition after attaching tenant with branches. (#4170)

After tenant attach, there is a window where the child timeline is
loaded and accepts GetPage requests, but its parent is not yet loaded.
If a GetPage request needs to traverse to the parent, it must wait for
the parent timeline to become active, or it might miss some records on
the parent timeline.
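
The diff below adds a wait_to_become_active helper on Timeline for this.
A simplified, self-contained sketch of that state-watch loop, with the
real TimelineState and Timeline types stubbed out (the actual variants
carry extra data, and the real function is a method), looks roughly like
this:

```
use tokio::sync::watch;

#[derive(Clone, Copy, Debug)]
enum TimelineState {
    Loading,
    Active,
    Stopping,
    Broken,
}

/// Simplified version of the wait_to_become_active loop added in this PR:
/// watch the timeline state until it is Active, or give up if the timeline
/// can never become Active again.
async fn wait_to_become_active(
    mut state: watch::Receiver<TimelineState>,
) -> Result<(), TimelineState> {
    loop {
        let current = *state.borrow_and_update();
        match current {
            TimelineState::Active => return Ok(()),
            TimelineState::Loading => {
                // Not active yet: sleep until the next state change, then re-check.
                if state.changed().await.is_err() {
                    // The sender side was dropped, i.e. the timeline is gone.
                    return Err(current);
                }
            }
            // Stopping and Broken never transition back to Active.
            TimelineState::Stopping | TimelineState::Broken => return Err(current),
        }
    }
}
```

GetPage traversal calls this on the ancestor before following the
timeline chain to it, as the Timeline changes in the diff below show.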

It's also possible that the parent timeline is active, but hasn't yet
received all the WAL up to the branch point from the safekeeper. This
happens if a pageserver crashes soon after creating a timeline, so that
the WAL leading to the branch point has not yet been uploaded to remote
storage. After restart, that WAL is re-streamed and ingested from the
safekeeper, but that takes a while. Because of that, it's not enough to
check that the parent timeline is active; we also need to wait for the
WAL to arrive on the parent timeline, just like at the beginning of
GetPage handling. We should probably change create_timeline so that a
timeline can only be created after all the WAL up to the branch point
has been uploaded to remote storage, but that is not currently the case
and is out of scope for this PR (see GitHub issue #4218).
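
The second wait corresponds to the ancestor.wait_lsn(timeline.ancestor_lsn, ctx)
call in the diff below. As a minimal sketch of the idea, assuming the
ancestor's last-record LSN is published through a watch channel (the
wait_lsn_at_least helper and the bare u64 LSNs are simplifications for
illustration, not the pageserver's actual API):

```
use tokio::sync::watch;

/// Sketch of the "wait for WAL to arrive" step: block until the ancestor's
/// last-record LSN (published through a watch channel in this sketch) has
/// advanced to at least the branch point.
async fn wait_lsn_at_least(
    mut last_record_lsn: watch::Receiver<u64>,
    branch_lsn: u64,
) -> anyhow::Result<()> {
    while *last_record_lsn.borrow_and_update() < branch_lsn {
        // WAL ingestion bumps the watch after each ingested record; wake up
        // and re-check whenever that happens.
        last_record_lsn
            .changed()
            .await
            .map_err(|_| anyhow::anyhow!("timeline dropped while waiting for WAL"))?;
    }
    Ok(())
}
```

As noted in the NB comment in the diff, requiring
branch_lsn >= remote_consistent_lsn during branch creation would avoid
this re-streaming window entirely; that is the create_timeline change
tracked as issue #4218.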

@NanoBjorn encountered this while working on tenant migration. After
migrating a tenant with a parent and child branch, connecting to the
child branch failed with an error like:

```
FATAL:  "base/16385" is not a valid data directory
DETAIL:  File "base/16385/PG_VERSION" is missing.
```

This commit adds two tests that reproduce the bug, with slightly
different symptoms.
Author: Heikki Linnakangas
Date: 2023-05-13 10:44:11 +03:00 (committed by GitHub)
Parent: edcf4d61a4
Commit: 2855c73990
6 changed files with 364 additions and 7 deletions

View File

@@ -146,6 +146,10 @@ pub const XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED: u8 = (1 << 0) as u8;
pub const XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED: u8 = (1 << 1) as u8;
pub const XLH_DELETE_ALL_VISIBLE_CLEARED: u8 = (1 << 0) as u8;
// From replication/message.h
pub const XLOG_LOGICAL_MESSAGE: u8 = 0x00;
// From rmgrlist.h
pub const RM_XLOG_ID: u8 = 0;
pub const RM_XACT_ID: u8 = 1;
pub const RM_SMGR_ID: u8 = 2;
@@ -157,6 +161,7 @@ pub const RM_RELMAP_ID: u8 = 7;
pub const RM_STANDBY_ID: u8 = 8;
pub const RM_HEAP2_ID: u8 = 9;
pub const RM_HEAP_ID: u8 = 10;
pub const RM_LOGICALMSG_ID: u8 = 21;
// from xlogreader.h
pub const XLR_INFO_MASK: u8 = 0x0F;

View File

@@ -105,6 +105,9 @@ impl From<PageReconstructError> for ApiError {
PageReconstructError::Cancelled => {
ApiError::InternalServerError(anyhow::anyhow!("request was cancelled"))
}
PageReconstructError::AncestorStopping(_) => {
ApiError::InternalServerError(anyhow::Error::new(pre))
}
PageReconstructError::WalRedo(pre) => {
ApiError::InternalServerError(anyhow::Error::new(pre))
}

View File

@@ -3441,14 +3441,26 @@ mod tests {
.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, &ctx)
.await?;
// The branchpoints should contain all timelines, even ones marked
// as Broken.
{
let branchpoints = &tline.gc_info.read().unwrap().retain_lsns;
assert_eq!(branchpoints.len(), 1);
assert_eq!(branchpoints[0], Lsn(0x40));
}
// You can read the key from the child branch even though the parent is
// Broken, as long as you don't need to access data from the parent.
assert_eq!(
newtline.get(*TEST_KEY, Lsn(0x50), &ctx).await?,
TEST_IMG(&format!("foo at {}", Lsn(0x40)))
newtline.get(*TEST_KEY, Lsn(0x70), &ctx).await?,
TEST_IMG(&format!("foo at {}", Lsn(0x70)))
);
let branchpoints = &tline.gc_info.read().unwrap().retain_lsns;
assert_eq!(branchpoints.len(), 1);
assert_eq!(branchpoints[0], Lsn(0x40));
// This needs to traverse to the parent, and fails.
let err = newtline.get(*TEST_KEY, Lsn(0x50), &ctx).await.unwrap_err();
assert!(err
.to_string()
.contains("will not become active. Current state: Broken"));
Ok(())
}

View File

@@ -396,6 +396,9 @@ pub enum PageReconstructError {
/// The operation was cancelled
Cancelled,
/// The ancestor of this is being stopped
AncestorStopping(TimelineId),
/// An error happened replaying WAL records
#[error(transparent)]
WalRedo(#[from] crate::walredo::WalRedoError),
@@ -414,6 +417,9 @@ impl std::fmt::Debug for PageReconstructError {
)
}
Self::Cancelled => write!(f, "cancelled"),
Self::AncestorStopping(timeline_id) => {
write!(f, "ancestor timeline {timeline_id} is being stopped")
}
Self::WalRedo(err) => err.fmt(f),
}
}
@@ -432,6 +438,9 @@ impl std::fmt::Display for PageReconstructError {
)
}
Self::Cancelled => write!(f, "cancelled"),
Self::AncestorStopping(timeline_id) => {
write!(f, "ancestor timeline {timeline_id} is being stopped")
}
Self::WalRedo(err) => err.fmt(f),
}
}
@@ -934,6 +943,31 @@ impl Timeline {
self.state.subscribe()
}
pub async fn wait_to_become_active(
&self,
_ctx: &RequestContext, /* Prepare for use by cancellation */
) -> Result<(), TimelineState> {
let mut receiver = self.state.subscribe();
loop {
let current_state = *receiver.borrow_and_update();
match current_state {
TimelineState::Loading => {
receiver
.changed()
.await
.expect("holding a reference to self");
}
TimelineState::Active { .. } => {
return Ok(());
}
TimelineState::Broken { .. } | TimelineState::Stopping => {
// There's no chance the timeline can transition back into ::Active
return Err(current_state);
}
}
}
}
pub fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo {
let layer_map = self.layers.read().unwrap();
let mut in_memory_layers = Vec::with_capacity(layer_map.frozen_layers.len() + 1);
@@ -1853,12 +1887,25 @@ impl Timeline {
Ok(s) => s,
Err(CalculateLogicalSizeError::Cancelled) => {
// Don't make noise, this is a common task.
// In the unlikely case that there ihs another call to this function, we'll retry
// In the unlikely case that there is another call to this function, we'll retry
// because initial_logical_size is still None.
info!("initial size calculation cancelled, likely timeline delete / tenant detach");
return Ok(());
}
x @ Err(_) => x.context("Failed to calculate logical size")?,
Err(CalculateLogicalSizeError::Other(err)) => {
if let Some(e @ PageReconstructError::AncestorStopping(_)) =
err.root_cause().downcast_ref()
{
// This can happen if the timeline's parent timeline switches to
// Stopping state while we're still calculating the initial
// timeline size for the child, for example if the tenant is
// being detached or the pageserver is shut down. Like with
// CalculateLogicalSizeError::Cancelled, don't make noise.
info!("initial size calculation failed because the timeline or its ancestor is Stopping, likely because the tenant is being detached: {e:#}");
return Ok(());
}
return Err(err.context("Failed to calculate logical size"));
}
};
// we cannot query current_logical_size.current_size() to know the current
@@ -2240,6 +2287,46 @@ impl Timeline {
Ok(timeline) => timeline,
Err(e) => return Err(PageReconstructError::from(e)),
};
// It's possible that the ancestor timeline isn't active yet, or
// is active but hasn't yet caught up to the branch point. Wait
// for it.
//
// This cannot happen while the pageserver is running normally,
// because you cannot create a branch from a point that isn't
// present in the pageserver yet. However, we don't wait for the
// branch point to be uploaded to cloud storage before creating
// a branch. I.e., the branch LSN need not be remote consistent
// for the branching operation to succeed.
//
// Hence, if we try to load a tenant in such a state where
// 1. the existence of the branch was persisted (in IndexPart and/or locally)
// 2. but the ancestor state is behind branch_lsn because it was not yet persisted
// then we will need to wait for the ancestor timeline to
// re-stream WAL up to branch_lsn before we access it.
//
// How can a tenant get in such a state?
// - ungraceful pageserver process exit
// - detach+attach => this is a bug, https://github.com/neondatabase/neon/issues/4219
//
// NB: this could be avoided by requiring
// branch_lsn >= remote_consistent_lsn
// during branch creation.
match ancestor.wait_to_become_active(ctx).await {
Ok(()) => {}
Err(state) if state == TimelineState::Stopping => {
return Err(PageReconstructError::AncestorStopping(ancestor.timeline_id));
}
Err(state) => {
return Err(PageReconstructError::Other(anyhow::anyhow!(
"Timeline {} will not become active. Current state: {:?}",
ancestor.timeline_id,
&state,
)));
}
}
ancestor.wait_lsn(timeline.ancestor_lsn, ctx).await?;
timeline_owned = ancestor;
timeline = &*timeline_owned;
prev_lsn = Lsn(u64::MAX);

View File

@@ -305,6 +305,15 @@ impl<'a> WalIngest<'a> {
self.checkpoint_modified = true;
}
}
} else if decoded.xl_rmid == pg_constants::RM_LOGICALMSG_ID {
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
if info == pg_constants::XLOG_LOGICAL_MESSAGE {
// This is a convenient way to make the WAL ingestion pause at
// a particular point in the WAL. For more fine-grained control,
// we could peek into the message and only pause if it contains
// a particular string, for example, but this is enough for now.
utils::failpoint_sleep_millis_async!("wal-ingest-logical-message-sleep");
}
}
// Iterate through all the blocks that the record modifies, and

View File

@@ -1,5 +1,7 @@
import os
import shutil
import threading
import time
from contextlib import closing, contextmanager
from pathlib import Path
from typing import Any, Dict, Optional, Tuple
@@ -12,6 +14,8 @@ from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
PortDistributor,
RemoteStorageKind,
available_remote_storages,
)
from fixtures.pageserver.http import PageserverHttpClient
from fixtures.pageserver.utils import (
@@ -19,6 +23,7 @@ from fixtures.pageserver.utils import (
tenant_exists,
wait_for_last_record_lsn,
wait_for_upload,
wait_for_upload_queue_empty,
)
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import (
@@ -512,3 +517,239 @@ def test_tenant_relocation(
if line.startswith("listen_pg_addr"):
lines[i] = f"listen_pg_addr = 'localhost:{env.pageserver.service_port.pg}'"
(env.repo_dir / "config").write_text("\n".join(lines))
# Simulate hard crash of pageserver and re-attach a tenant with a branch
#
# This exercises a race condition after tenant attach, where the
# branch point on the ancestor timeline is greater than the ancestor's
# last-record LSN. We had a bug where GetPage incorrectly followed the
# timeline to the ancestor without waiting for the missing WAL to
# arrive.
@pytest.mark.parametrize("remote_storage_kind", available_remote_storages())
def test_emergency_relocate_with_branches_slow_replay(
neon_env_builder: NeonEnvBuilder,
remote_storage_kind: RemoteStorageKind,
):
neon_env_builder.enable_remote_storage(
remote_storage_kind=remote_storage_kind,
test_name="test_emergency_relocate_with_branches_slow_replay",
)
env = neon_env_builder.init_start()
env.pageserver.is_testing_enabled_or_skip()
pageserver_http = env.pageserver.http_client()
# Prepare for the test:
#
# - Main branch, with a table and two inserts to it.
# - A logical replication message between the inserts, so that we can conveniently
# pause the WAL ingestion between the two inserts.
# - Child branch, created after the inserts
tenant_id, timeline_id = env.neon_cli.create_tenant()
main_endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
with main_endpoint.cursor() as cur:
cur.execute("CREATE TABLE test_reattach (t text)")
cur.execute("INSERT INTO test_reattach VALUES ('before pause')")
cur.execute("SELECT pg_logical_emit_message(false, 'neon-test', 'between inserts')")
cur.execute("INSERT INTO test_reattach VALUES ('after pause')")
current_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
main_endpoint.stop()
child_timeline_id = env.neon_cli.create_branch(
"child", tenant_id=tenant_id, ancestor_start_lsn=current_lsn
)
# Wait for the index_part.json file of both branches to be uploaded to remote storage.
# This is a workaround for issue https://github.com/neondatabase/neon/issues/3865.
wait_for_upload_queue_empty(pageserver_http, tenant_id, timeline_id)
wait_for_upload_queue_empty(pageserver_http, tenant_id, child_timeline_id)
# Now kill the pageserver, remove the tenant directory, and restart. This simulates
# the scenario that a pageserver dies unexpectedly and cannot be recovered, so we relocate
# the tenant to a different pageserver. We reuse the same pageserver because it's
# simpler than initializing a new one from scratch, but the effect on the single tenant
# is the same.
env.pageserver.stop(immediate=True)
shutil.rmtree(Path(env.repo_dir) / "tenants" / str(tenant_id))
env.pageserver.start()
# This failpoint will pause the WAL ingestion on the main branch after
# the first insert.
pageserver_http.configure_failpoints([("wal-ingest-logical-message-sleep", "return(5000)")])
# Attach, then wait a few seconds to give the pageserver time to load the tenant,
# connect to the safekeepers, and stream and ingest the WAL up to the pause point.
before_attach_time = time.time()
pageserver_http.tenant_attach(tenant_id)
time.sleep(3)
# WAL ingestion on the main timeline should now be paused at the failpoint.
# Run a query on the child branch. The GetPage requests for this should recurse to the
# parent timeline, and wait for the WAL to be ingested there. Otherwise it won't see
# the second insert.
child_endpoint = env.endpoints.create_start("child", tenant_id=tenant_id)
with child_endpoint.cursor() as cur:
cur.execute("SELECT * FROM test_reattach")
assert cur.fetchall() == [("before pause",), ("after pause",)]
# Sanity check that the failpoint was reached
assert env.pageserver.log_contains('failpoint "wal-ingest-logical-message-sleep": sleep done')
assert time.time() - before_attach_time > 5
# Clean up
pageserver_http.configure_failpoints(("wal-ingest-logical-message-sleep", "off"))
# Simulate hard crash of pageserver and re-attach a tenant with a branch
#
# This exercises the same race condition as
# 'test_emergency_relocate_with_branches_slow_replay', but this test case
# is closer to the scenario where we originally found the issue.
#
# In this scenario, the incorrect GetPage result leads to *permanent
# damage* in the child timeline, because ingesting the WAL on the child
# timeline depended on an incorrect view of the parent. This test
# reproduces one such case; the symptom is an error when trying to
# connect to the child endpoint after re-attaching the tenant:
#
# FATAL: database "neondb" does not exist
#
# In the original case where we bumped into this, the error was slightly
# different:
#
# FATAL: "base/16385" is not a valid data directory
# DETAIL: File "base/16385/PG_VERSION" is missing.
#
### Detailed explanation of the original bug and why it led to that error:
#
# The WAL on the main and the child branches look like this:
#
# Main Child
# 1. CREATE DATABASE
# <child branch is created>
# 2. CREATE TABLE AS SELECT ... 3. CREATE TABLE AS SELECT ...
#
# None of these WAL records have been flushed to disk or uploaded to remote
# storage in the pageserver yet, when the tenant is detached.
#
# After detach and re-attach, a walreceiver is spawned on both timelines.
# They will connect to the safekeepers and start ingesting the WAL
# from their respective IndexParts' `disk_consistent_lsn` onward.
#
# The bug occurs if the child branch's walreceiver runs before the
# main's. It receives the SMGR_CREATE WAL record emitted by the
# CREATE TABLE statement (3.), and applies it, without seeing the
# effect of the CREATE DATABASE statement.
#
# To understand why that leads to a 'File "base/16385/PG_VERSION" is
# missing' error, let's look at what the handlers for the WAL records
# do:
#
# CREATE DATABASE WAL record is handled by ingest_xlog_dbase_create:
#
# ingest_xlog_dbase_create:
# put_relmap_file()
# // NOTE 'true': It means that there is a relmapper and PG_VERSION file
# 1: let r = dbdir.dbdirs.insert((spcnode, dbnode), true);
#
#
# CREATE TABLE emits an SMGR_CREATE record, which is handled by:
#
# ingest_xlog_smgr_create:
# put_rel_creation:
# ...
# let mut rel_dir = if dbdir.dbdirs.get(&(rel.spcnode, rel.dbnode)).is_none() {
# 2: // Didn't exist. Update dbdir
# dbdir.dbdirs.insert((rel.spcnode, rel.dbnode), false);
# let buf = DbDirectory::ser(&dbdir)?;
# self.put(DBDIR_KEY, Value::Image(buf.into()));
#
# // and create the RelDirectory
# RelDirectory::default()
# } else {
# 3: // reldir already exists, fetch it
# RelDirectory::des(&self.get(rel_dir_key, ctx).await?)?
# };
#
#
# In the correct ordering, the SMGR_CREATE record is applied after the
# CREATE DATABASE record. The CREATE DATABASE creates the entry in the
# 'dbdir', with the 'true' flag that indicates that PG_VERSION exists
# (1). The SMGR_CREATE handler calls put_rel_creation, which finds the
# dbdir entry that the CREATE DATABASE record created, and takes the
# "reldir already exists, fetch it" else-branch at the if statement (3).
#
# In the incorrect ordering, the child walreceiver applies the
# SMGR_CREATE record without seeing the effects of the CREATE
# DATABASE. In that case, put_rel_creation takes the "Didn't
# exist. Update dbdir" path (2), and inserts an entry in the
# DbDirectory with 'false' to indicate there is no PG_VERSION file.
#
@pytest.mark.parametrize("remote_storage_kind", available_remote_storages())
def test_emergency_relocate_with_branches_createdb(
neon_env_builder: NeonEnvBuilder,
remote_storage_kind: RemoteStorageKind,
):
neon_env_builder.enable_remote_storage(
remote_storage_kind=remote_storage_kind,
test_name="test_emergency_relocate_with_branches_createdb",
)
env = neon_env_builder.init_start()
pageserver_http = env.pageserver.http_client()
# create a new tenant
tenant_id, timeline_id = env.neon_cli.create_tenant()
main_endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
with main_endpoint.cursor() as cur:
cur.execute("SELECT pg_logical_emit_message(false, 'neon-test', 'between inserts')")
cur.execute("CREATE DATABASE neondb")
current_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
child_timeline_id = env.neon_cli.create_branch(
"child", tenant_id=tenant_id, ancestor_start_lsn=current_lsn
)
with main_endpoint.cursor(dbname="neondb") as cur:
cur.execute("CREATE TABLE test_migrate_one AS SELECT generate_series(1,100)")
main_endpoint.stop()
child_endpoint = env.endpoints.create_start("child", tenant_id=tenant_id)
with child_endpoint.cursor(dbname="neondb") as cur:
cur.execute("CREATE TABLE test_migrate_one AS SELECT generate_series(1,200)")
child_endpoint.stop()
# Wait for the index_part.json file of both branches to be uploaded to remote storage.
# This is a workaround for issue https://github.com/neondatabase/neon/issues/3865.
wait_for_upload_queue_empty(pageserver_http, tenant_id, timeline_id)
wait_for_upload_queue_empty(pageserver_http, tenant_id, child_timeline_id)
# Kill the pageserver, remove the tenant directory, and restart
env.pageserver.stop(immediate=True)
shutil.rmtree(Path(env.repo_dir) / "tenants" / str(tenant_id))
env.pageserver.start()
# Wait before ingesting the WAL for CREATE DATABASE on the main branch. The original
# bug reproduced easily even without this, as there is always some delay between
# loading the timeline and establishing the connection to the safekeeper to stream and
# ingest the WAL, but let's make this less dependent on accidental timing.
pageserver_http.configure_failpoints([("wal-ingest-logical-message-sleep", "return(5000)")])
before_attach_time = time.time()
pageserver_http.tenant_attach(tenant_id)
child_endpoint.start()
with child_endpoint.cursor(dbname="neondb") as cur:
assert query_scalar(cur, "SELECT count(*) FROM test_migrate_one") == 200
# Sanity check that the failpoint was reached
assert env.pageserver.log_contains('failpoint "wal-ingest-logical-message-sleep": sleep done')
assert time.time() - before_attach_time > 5
# Clean up
pageserver_http.configure_failpoints(("wal-ingest-logical-message-sleep", "off"))