safekeeper: consider partial uploads when pulling timeline (#8628)

## Problem
The control file contains the id of the safekeeper that uploaded it.
Previously, when sending a snapshot of the control file to another sk,
it would eventually be gc-ed by the receiving sk. This is incorrect
because the original sk might still need it later.

## Summary of Changes
When sending a snapshot and the control file contains an uploaded
segment:
* Create a copy of the segment in s3 with the destination sk in the
  object name
* Tweak the streamed control file to point to the object created in the
  previous step

Note that the snapshot endpoint now has to know the id of the requestor,
so the api has been extended to include the node id of the destination
sk.

Closes https://github.com/neondatabase/neon/issues/8542
This commit is contained in:
Vlad Lazar
2024-08-15 09:02:33 +01:00
committed by GitHub
parent 168913bdf0
commit fef77b0cc9
8 changed files with 327 additions and 42 deletions

View File

@@ -164,6 +164,30 @@ impl Deref for FileStorage {
}
}
impl TimelinePersistentState {
    /// Serializes this state into a freshly allocated control file image.
    ///
    /// Layout, with every integer written little-endian: `SK_MAGIC`, a format
    /// version, the serialized state, and finally a CRC32C computed over all
    /// of the preceding bytes.
    ///
    /// When `eviction_state` is `EvictionState::Present` the state is first
    /// downgraded with `downgrade_v9_to_v8` and tagged as version 8;
    /// otherwise it is written as-is and tagged with `SK_FORMAT_VERSION`.
    pub(crate) fn write_to_buf(&self) -> Result<Vec<u8>> {
        // temp hack for forward compatibility
        const PREV_FORMAT_VERSION: u32 = 8;

        let mut out: Vec<u8> = Vec::new();
        WriteBytesExt::write_u32::<LittleEndian>(&mut out, SK_MAGIC)?;

        if self.eviction_state != EvictionState::Present {
            // Not present locally: emit the current format version.
            WriteBytesExt::write_u32::<LittleEndian>(&mut out, SK_FORMAT_VERSION)?;
            self.ser_into(&mut out)?;
        } else {
            // Present: emit the previous format version.
            let downgraded = downgrade_v9_to_v8(self);
            WriteBytesExt::write_u32::<LittleEndian>(&mut out, PREV_FORMAT_VERSION)?;
            downgraded.ser_into(&mut out)?;
        }

        // The checksum covers everything written so far and is appended last.
        let crc = crc32c::crc32c(&out);
        out.extend_from_slice(&crc.to_le_bytes());
        Ok(out)
    }
}
#[async_trait::async_trait]
impl Storage for FileStorage {
/// Persists state durably to the underlying storage.
@@ -180,24 +204,8 @@ impl Storage for FileStorage {
&control_partial_path
)
})?;
let mut buf: Vec<u8> = Vec::new();
WriteBytesExt::write_u32::<LittleEndian>(&mut buf, SK_MAGIC)?;
if s.eviction_state == EvictionState::Present {
// temp hack for forward compatibility
const PREV_FORMAT_VERSION: u32 = 8;
let prev = downgrade_v9_to_v8(s);
WriteBytesExt::write_u32::<LittleEndian>(&mut buf, PREV_FORMAT_VERSION)?;
prev.ser_into(&mut buf)?;
} else {
// otherwise, we write the current format version
WriteBytesExt::write_u32::<LittleEndian>(&mut buf, SK_FORMAT_VERSION)?;
s.ser_into(&mut buf)?;
}
// calculate checksum before resize
let checksum = crc32c::crc32c(&buf);
buf.extend_from_slice(&checksum.to_le_bytes());
let buf: Vec<u8> = s.write_to_buf()?;
control_partial.write_all(&buf).await.with_context(|| {
format!(

View File

@@ -10,7 +10,7 @@
use reqwest::{IntoUrl, Method, StatusCode};
use utils::{
http::error::HttpErrorBody,
id::{TenantId, TimelineId},
id::{NodeId, TenantId, TimelineId},
logging::SecretString,
};
@@ -97,10 +97,11 @@ impl Client {
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
stream_to: NodeId,
) -> Result<reqwest::Response> {
let uri = format!(
"{}/v1/tenant/{}/timeline/{}/snapshot",
self.mgmt_api_endpoint, tenant_id, timeline_id
"{}/v1/tenant/{}/timeline/{}/snapshot/{}",
self.mgmt_api_endpoint, tenant_id, timeline_id, stream_to.0
);
self.get(&uri).await
}

View File

@@ -205,6 +205,7 @@ async fn timeline_pull_handler(mut request: Request<Body>) -> Result<Response<Bo
/// Stream tar archive with all timeline data.
async fn timeline_snapshot_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
let destination = parse_request_param(&request, "destination_id")?;
let ttid = TenantTimelineId::new(
parse_request_param(&request, "tenant_id")?,
parse_request_param(&request, "timeline_id")?,
@@ -225,7 +226,13 @@ async fn timeline_snapshot_handler(request: Request<Body>) -> Result<Response<Bo
// so create the chan and write to it in another task.
let (tx, rx) = mpsc::channel(1);
task::spawn(pull_timeline::stream_snapshot(tli, tx));
let conf = get_conf(&request);
task::spawn(pull_timeline::stream_snapshot(
tli,
conf.my_id,
destination,
tx,
));
let rx_stream = ReceiverStream::new(rx);
let body = Body::wrap_stream(rx_stream);
@@ -565,7 +572,7 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError>
request_span(r, tenant_delete_handler)
})
.get(
"/v1/tenant/:tenant_id/timeline/:timeline_id/snapshot",
"/v1/tenant/:tenant_id/timeline/:timeline_id/snapshot/:destination_id",
|r| request_span(r, timeline_snapshot_handler),
)
.post("/v1/pull_timeline", |r| {

View File

@@ -11,13 +11,8 @@ use std::{
io::{self, ErrorKind},
sync::Arc,
};
use tokio::{
fs::{File, OpenOptions},
io::AsyncWrite,
sync::mpsc,
task,
};
use tokio_tar::{Archive, Builder};
use tokio::{fs::OpenOptions, io::AsyncWrite, sync::mpsc, task};
use tokio_tar::{Archive, Builder, Header};
use tokio_util::{
io::{CopyToBytes, SinkWriter},
sync::PollSender,
@@ -32,13 +27,15 @@ use crate::{
routes::TimelineStatus,
},
safekeeper::Term,
state::TimelinePersistentState,
timeline::{get_tenant_dir, get_timeline_dir, Timeline, TimelineError, WalResidentTimeline},
wal_backup,
wal_storage::{self, open_wal_file, Storage},
GlobalTimelines, SafeKeeperConf,
};
use utils::{
crashsafe::{durable_rename, fsync_async_opt},
id::{TenantId, TenantTimelineId, TimelineId},
id::{NodeId, TenantId, TenantTimelineId, TimelineId},
logging::SecretString,
lsn::Lsn,
pausable_failpoint,
@@ -46,8 +43,13 @@ use utils::{
/// Stream tar archive of timeline to tx.
#[instrument(name = "snapshot", skip_all, fields(ttid = %tli.ttid))]
pub async fn stream_snapshot(tli: WalResidentTimeline, tx: mpsc::Sender<Result<Bytes>>) {
if let Err(e) = stream_snapshot_guts(tli, tx.clone()).await {
pub async fn stream_snapshot(
tli: WalResidentTimeline,
source: NodeId,
destination: NodeId,
tx: mpsc::Sender<Result<Bytes>>,
) {
if let Err(e) = stream_snapshot_guts(tli, source, destination, tx.clone()).await {
// Error type/contents don't matter as they can't reach the client
// (hyper likely doesn't do anything with it), but http stream will be
// prematurely terminated. It would be nice to try to send the error in
@@ -81,6 +83,8 @@ impl Drop for SnapshotContext {
pub async fn stream_snapshot_guts(
tli: WalResidentTimeline,
source: NodeId,
destination: NodeId,
tx: mpsc::Sender<Result<Bytes>>,
) -> Result<()> {
// tokio-tar wants Write implementor, but we have mpsc tx <Result<Bytes>>;
@@ -104,7 +108,7 @@ pub async fn stream_snapshot_guts(
// which is also likely suboptimal.
let mut ar = Builder::new_non_terminated(pinned_writer);
let bctx = tli.start_snapshot(&mut ar).await?;
let bctx = tli.start_snapshot(&mut ar, source, destination).await?;
pausable_failpoint!("sk-snapshot-after-list-pausable");
let tli_dir = tli.get_timeline_dir();
@@ -158,13 +162,43 @@ impl WalResidentTimeline {
async fn start_snapshot<W: AsyncWrite + Unpin + Send>(
&self,
ar: &mut tokio_tar::Builder<W>,
source: NodeId,
destination: NodeId,
) -> Result<SnapshotContext> {
let mut shared_state = self.write_shared_state().await;
let wal_seg_size = shared_state.get_wal_seg_size();
let cf_path = self.get_timeline_dir().join(CONTROL_FILE_NAME);
let mut cf = File::open(cf_path).await?;
ar.append_file(CONTROL_FILE_NAME, &mut cf).await?;
let mut control_store = TimelinePersistentState::clone(shared_state.sk.state());
// Modify the partial segment of the in-memory copy for the control file to
// point to the destination safekeeper.
let replace = control_store
.partial_backup
.replace_uploaded_segment(source, destination)?;
if let Some(replace) = replace {
// The deserialized control file has an uploaded partial. We upload a copy
// of it to object storage for the destination safekeeper and send an updated
// control file in the snapshot.
tracing::info!(
"Replacing uploaded partial segment in in-mem control file: {replace:?}"
);
let remote_timeline_path = wal_backup::remote_timeline_path(&self.tli.ttid)?;
wal_backup::copy_partial_segment(
&replace.previous.remote_path(&remote_timeline_path),
&replace.current.remote_path(&remote_timeline_path),
)
.await?;
}
let buf = control_store
.write_to_buf()
.with_context(|| "failed to serialize control store")?;
let mut header = Header::new_gnu();
header.set_size(buf.len().try_into().expect("never breaches u64"));
ar.append_data(&mut header, CONTROL_FILE_NAME, buf.as_slice())
.await
.with_context(|| "failed to append to archive")?;
// We need to stream since the oldest segment someone (s3 or pageserver)
// still needs. This duplicates calc_horizon_lsn logic.
@@ -342,7 +376,7 @@ async fn pull_timeline(
let client = Client::new(host.clone(), sk_auth_token.clone());
// Request stream with basebackup archive.
let bb_resp = client
.snapshot(status.tenant_id, status.timeline_id)
.snapshot(status.tenant_id, status.timeline_id, conf.my_id)
.await?;
// Make Stream of Bytes from it...

View File

@@ -483,6 +483,16 @@ pub(crate) async fn backup_partial_segment(
.await
}
/// Copies the object at `source` to `destination` within the configured
/// remote storage.
///
/// The cancellation token handed to the storage layer is created here and is
/// not exposed to the caller, so this function itself never cancels the copy.
pub(crate) async fn copy_partial_segment(
    source: &RemotePath,
    destination: &RemotePath,
) -> Result<()> {
    let remote = get_configured_remote_storage();
    let never_cancelled = CancellationToken::new();
    let copied = remote
        .copy_object(source, destination, &never_cancelled)
        .await;
    copied
}
pub async fn read_object(
file_path: &RemotePath,
offset: u64,

View File

@@ -17,14 +17,13 @@
//! file. Code updates state in the control file before doing any S3 operations.
//! This way control file stores information about all potentially existing
//! remote partial segments and can clean them up after uploading a newer version.
use camino::Utf8PathBuf;
use postgres_ffi::{XLogFileName, XLogSegNo, PG_TLI};
use remote_storage::RemotePath;
use serde::{Deserialize, Serialize};
use tracing::{debug, error, info, instrument, warn};
use utils::lsn::Lsn;
use utils::{id::NodeId, lsn::Lsn};
use crate::{
metrics::{MISC_OPERATION_SECONDS, PARTIAL_BACKUP_UPLOADED_BYTES, PARTIAL_BACKUP_UPLOADS},
@@ -82,6 +81,12 @@ pub struct State {
pub segments: Vec<PartialRemoteSegment>,
}
/// Describes a rename of the Uploaded partial segment, as returned by
/// `State::replace_uploaded_segment`.
#[derive(Debug)]
pub(crate) struct ReplaceUploadedSegment {
/// Copy of the segment taken before its name was changed.
pub(crate) previous: PartialRemoteSegment,
/// Copy of the segment taken after its name was changed.
pub(crate) current: PartialRemoteSegment,
}
impl State {
/// Find an Uploaded segment. There should be only one Uploaded segment at a time.
pub(crate) fn uploaded_segment(&self) -> Option<PartialRemoteSegment> {
@@ -90,6 +95,54 @@ impl State {
.find(|seg| seg.status == UploadStatus::Uploaded)
.cloned()
}
/// Replace the name of the Uploaded segment (if one exists) in order to match
/// it with the `destination` safekeeper.
///
/// Only the trailing `_sk{source}.partial` suffix of the name is rewritten, to
/// `_sk{destination}.partial`; the rest of the name is kept verbatim.
///
/// Returns `Ok(None)` when no segment is in the Uploaded state, otherwise a
/// description of the change holding the segment before and after the rename.
///
/// # Errors
///
/// Fails without modifying the state when the Uploaded segment's name does
/// not end in `_sk{source}.partial`.
pub(crate) fn replace_uploaded_segment(
    &mut self,
    source: NodeId,
    destination: NodeId,
) -> anyhow::Result<Option<ReplaceUploadedSegment>> {
    let current = match self
        .segments
        .iter_mut()
        .find(|seg| seg.status == UploadStatus::Uploaded)
    {
        Some(seg) => seg,
        None => return Ok(None),
    };

    // Sanity check that the partial segment we are replacing belongs to the
    // `source` SK. The same suffix drives both the check and the rewrite, so
    // a name that passes the check is always actually renamed, and nothing
    // but the suffix can be touched.
    let source_suffix = format!("_sk{}.partial", source.0);
    let new_name = match current.name.strip_suffix(source_suffix.as_str()) {
        Some(stem) => format!("{stem}_sk{}.partial", destination.0),
        None => anyhow::bail!(
            "Partial segment name ({}) doesn't match self node id ({})",
            current.name,
            source
        ),
    };

    // Snapshot the segment before mutating it so the caller sees both names.
    let previous = current.clone();
    current.name = new_name;

    Ok(Some(ReplaceUploadedSegment {
        previous,
        current: current.clone(),
    }))
}
}
struct PartialBackup {

View File

@@ -67,6 +67,7 @@ from fixtures.pageserver.utils import (
from fixtures.pg_version import PgVersion
from fixtures.port_distributor import PortDistributor
from fixtures.remote_storage import (
LocalFsStorage,
MockS3Server,
RemoteStorage,
RemoteStorageKind,
@@ -4425,14 +4426,32 @@ class Safekeeper(LogUtils):
def timeline_dir(self, tenant_id, timeline_id) -> Path:
return self.data_dir / str(tenant_id) / str(timeline_id)
def list_uploaded_segments(self, tenant_id: TenantId, timeline_id: TimelineId):
    """
    Get the names of the files stored for the given timeline in the
    safekeeper directory of the local_fs remote storage, leaving out any
    name that contains ".metadata" or ".___temp".

    Asserts that the env's safekeeper remote storage is a LocalFsStorage.
    """
    remote_tline_dir = (
        self.env.repo_dir
        / "local_fs_remote_storage"
        / "safekeeper"
        / str(tenant_id)
        / str(timeline_id)
    )
    assert isinstance(self.env.safekeepers_remote_storage, LocalFsStorage)

    def is_segment(name: str) -> bool:
        # Keep only names that carry neither of the excluded markers.
        return not (".metadata" in name or ".___temp" in name)

    return self._list_segments_in_dir(remote_tline_dir, is_segment)
def list_segments(self, tenant_id, timeline_id) -> List[str]:
"""
Get list of segment names of the given timeline.
"""
tli_dir = self.timeline_dir(tenant_id, timeline_id)
return self._list_segments_in_dir(
tli_dir, lambda name: not name.startswith("safekeeper.control")
)
def _list_segments_in_dir(self, path: Path, keep_filter: Callable[[str], bool]) -> list[str]:
segments = []
for _, _, filenames in os.walk(tli_dir):
segments.extend([f for f in filenames if not f.startswith("safekeeper.control")])
for _, _, filenames in os.walk(path):
segments.extend([f for f in filenames if keep_filter(f)])
segments.sort()
return segments

View File

@@ -49,7 +49,13 @@ from fixtures.remote_storage import (
)
from fixtures.safekeeper.http import SafekeeperHttpClient
from fixtures.safekeeper.utils import are_walreceivers_absent
from fixtures.utils import PropagatingThread, get_dir_size, query_scalar, start_in_background
from fixtures.utils import (
PropagatingThread,
get_dir_size,
query_scalar,
start_in_background,
wait_until,
)
def wait_lsn_force_checkpoint(
@@ -63,6 +69,18 @@ def wait_lsn_force_checkpoint(
lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
log.info(f"pg_current_wal_flush_lsn is {lsn}, waiting for it on pageserver")
wait_lsn_force_checkpoint_at(lsn, tenant_id, timeline_id, ps, pageserver_conn_options)
def wait_lsn_force_checkpoint_at(
lsn: Lsn,
tenant_id: TenantId,
timeline_id: TimelineId,
ps: NeonPageserver,
pageserver_conn_options=None,
):
pageserver_conn_options = pageserver_conn_options or {}
auth_token = None
if "password" in pageserver_conn_options:
auth_token = pageserver_conn_options["password"]
@@ -2304,3 +2322,138 @@ def test_s3_eviction(
)
assert event_metrics_seen
def test_pull_timeline_partial_segment_integrity(neon_env_builder: NeonEnvBuilder):
"""
Verify that pulling timeline from a SK with an uploaded partial segment
does not lead to consistency issues:
1. Start 3 SKs - only use two
2. Ingest a bit of WAL
3. Wait for partial to be uploaded
4. Pull timeline to the third SK
5. Wait for source SK to evict timeline
6. Replace source with destination SK and start compute
7. Go back to initial compute SK config and validate that
source SK can unevict the timeline (S3 state is consistent)
"""
neon_env_builder.auth_enabled = True
neon_env_builder.num_safekeepers = 3
neon_env_builder.enable_safekeeper_remote_storage(default_remote_storage())
# Offloading with WAL deletion is enabled; the 500ms timeouts/intervals are
# presumably chosen so partial upload and eviction happen quickly in the test.
neon_env_builder.safekeeper_extra_opts = [
"--enable-offload",
"--delete-offloaded-wal",
"--partial-backup-timeout",
"500ms",
"--control-file-save-interval",
"500ms",
"--eviction-min-resident=500ms",
]
env = neon_env_builder.init_start(initial_tenant_conf={"checkpoint_timeout": "100ms"})
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
# Source is the first safekeeper, destination is the third one.
(src_sk, dst_sk) = (env.safekeepers[0], env.safekeepers[2])
log.info("use only first 2 safekeepers, 3rd will be seeded")
endpoint = env.endpoints.create("main")
endpoint.active_safekeepers = [1, 2]
endpoint.start()
endpoint.safe_psql("create table t(key int, value text)")
endpoint.safe_psql("insert into t select generate_series(1, 180000), 'papaya'")
endpoint.stop()
# Polled through wait_until: returns the name of a partial segment carrying
# "sk1" in its name (other than one for the first segment), and raises while
# no such segment is listed. The last matching name in the listing wins.
def source_partial_segment_uploaded():
first_segment_name = "000000010000000000000001"
segs = src_sk.list_uploaded_segments(tenant_id, timeline_id)
candidate_seg = None
for seg in segs:
if "partial" in seg and "sk1" in seg and not seg.startswith(first_segment_name):
candidate_seg = seg
if candidate_seg is not None:
# The term might change, causing the segment to be gc-ed shortly after,
# so give it a bit of time to make sure it's stable.
time.sleep(2)
segs = src_sk.list_uploaded_segments(tenant_id, timeline_id)
assert candidate_seg in segs
return candidate_seg
raise Exception("Partial segment not uploaded yet")
source_partial_segment = wait_until(15, 1, source_partial_segment_uploaded)
log.info(
f"Uploaded segments before pull are {src_sk.list_uploaded_segments(tenant_id, timeline_id)}"
)
log.info(f"Tracking source partial segment: {source_partial_segment}")
src_flush_lsn = src_sk.get_flush_lsn(tenant_id, timeline_id)
log.info(f"flush_lsn on src before pull_timeline: {src_flush_lsn}")
pageserver_conn_options = {"password": env.auth_keys.generate_tenant_token(tenant_id)}
wait_lsn_force_checkpoint_at(
src_flush_lsn, tenant_id, timeline_id, env.pageserver, pageserver_conn_options
)
# Seed the third safekeeper from the source one.
dst_sk.pull_timeline([src_sk], tenant_id, timeline_id)
# Polled through wait_until: raises until the source SK's metric reports at
# least one completed "evict" event.
def evicted():
evictions = src_sk.http_client().get_metric_value(
"safekeeper_eviction_events_completed_total", {"kind": "evict"}
)
if evictions is None or evictions == 0:
raise Exception("Eviction did not happen on source safekeeper yet")
wait_until(30, 1, evicted)
# Compute now talks to safekeepers 2 and 3, i.e. the destination replaces the source.
endpoint.start(safekeepers=[2, 3])
# Polled through wait_until: returns the name of a partial segment carrying
# "sk3" in its name, and raises while none is listed.
def new_partial_segment_uploaded():
segs = src_sk.list_uploaded_segments(tenant_id, timeline_id)
for seg in segs:
if "partial" in seg and "sk3" in seg:
return seg
raise Exception("Partial segment not uploaded yet")
log.info(
f"Uploaded segments before post-pull ingest are {src_sk.list_uploaded_segments(tenant_id, timeline_id)}"
)
endpoint.safe_psql("insert into t select generate_series(1, 1000), 'pear'")
wait_until(15, 1, new_partial_segment_uploaded)
log.info(
f"Uploaded segments after post-pull ingest are {src_sk.list_uploaded_segments(tenant_id, timeline_id)}"
)
# Allow for some gc iterations to happen and assert that the original
# uploaded partial segment remains in place.
time.sleep(5)
segs = src_sk.list_uploaded_segments(tenant_id, timeline_id)
assert source_partial_segment in segs
log.info(
f"Uploaded segments at the end are {src_sk.list_uploaded_segments(tenant_id, timeline_id)}"
)
# Restart the endpoint in order to check that the source safekeeper
# can unevict the timeline
endpoint.stop()
endpoint.start(safekeepers=[1, 2])
# Polled through wait_until: raises until the source SK's metric reports at
# least one completed "restore" event.
def unevicted():
unevictions = src_sk.http_client().get_metric_value(
"safekeeper_eviction_events_completed_total", {"kind": "restore"}
)
if unevictions is None or unevictions == 0:
raise Exception("Uneviction did not happen on source safekeeper yet")
wait_until(10, 1, unevicted)