diff --git a/control_plane/src/safekeeper.rs b/control_plane/src/safekeeper.rs
index 34b2f3000a..64a89124d2 100644
--- a/control_plane/src/safekeeper.rs
+++ b/control_plane/src/safekeeper.rs
@@ -12,13 +12,8 @@ use nix::unistd::Pid;
 use postgres::Config;
 use reqwest::blocking::{Client, RequestBuilder, Response};
 use reqwest::{IntoUrl, Method};
-use safekeeper_api::models::TimelineCreateRequest;
 use thiserror::Error;
-use utils::{
-    connstring::connection_address,
-    http::error::HttpErrorBody,
-    id::{NodeId, TenantId, TimelineId},
-};
+use utils::{connstring::connection_address, http::error::HttpErrorBody, id::NodeId};
 
 use crate::local_env::{LocalEnv, SafekeeperConf};
 use crate::storage::PageServerNode;
@@ -281,24 +276,4 @@ impl SafekeeperNode {
             .error_from_body()?;
         Ok(())
     }
-
-    pub fn timeline_create(
-        &self,
-        tenant_id: TenantId,
-        timeline_id: TimelineId,
-        peer_ids: Vec<NodeId>,
-    ) -> Result<()> {
-        Ok(self
-            .http_request(
-                Method::POST,
-                format!("{}/tenant/{}/timeline", self.http_base_url, tenant_id),
-            )
-            .json(&TimelineCreateRequest {
-                timeline_id,
-                peer_ids,
-            })
-            .send()?
-            .error_from_body()?
-            .json()?)
-    }
 }
diff --git a/libs/safekeeper_api/src/models.rs b/libs/safekeeper_api/src/models.rs
index e13ea50eaf..4119650b99 100644
--- a/libs/safekeeper_api/src/models.rs
+++ b/libs/safekeeper_api/src/models.rs
@@ -1,8 +1,21 @@
 use serde::{Deserialize, Serialize};
-use utils::id::{NodeId, TimelineId};
+use utils::{
+    id::{NodeId, TenantId, TimelineId},
+    lsn::Lsn,
+};
 
 #[derive(Serialize, Deserialize)]
 pub struct TimelineCreateRequest {
+    #[serde(with = "serde_with::rust::display_fromstr")]
+    pub tenant_id: TenantId,
+    #[serde(with = "serde_with::rust::display_fromstr")]
     pub timeline_id: TimelineId,
-    pub peer_ids: Vec<NodeId>,
+    pub peer_ids: Option<Vec<NodeId>>,
+    pub pg_version: u32,
+    pub system_id: Option<u64>,
+    pub wal_seg_size: Option<u32>,
+    #[serde(with = "serde_with::rust::display_fromstr")]
+    pub commit_lsn: Lsn,
+    // If not passed, it is assigned to the beginning of commit_lsn segment.
+    pub local_start_lsn: Option<Lsn>,
 }
diff --git a/libs/utils/src/lsn.rs b/libs/utils/src/lsn.rs
index 1090f4c679..289cec12a8 100644
--- a/libs/utils/src/lsn.rs
+++ b/libs/utils/src/lsn.rs
@@ -66,6 +66,11 @@ impl Lsn {
         (self.0 % seg_sz as u64) as usize
     }
 
+    /// Compute LSN of the segment start.
+    pub fn segment_lsn(self, seg_sz: usize) -> Lsn {
+        Lsn(self.0 - (self.0 % seg_sz as u64))
+    }
+
     /// Compute the segment number
     pub fn segment_number(self, seg_sz: usize) -> u64 {
         self.0 / seg_sz as u64
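The new Lsn::segment_lsn helper above computes the start of the WAL segment containing an LSN, which is exactly the documented default for local_start_lsn in TimelineCreateRequest when the field is omitted. A minimal sketch of that rounding (illustrative only, not part of the patch; default_local_start_lsn is a made-up name):

// Illustrative only: round commit_lsn down to its segment boundary, the
// documented default for local_start_lsn when the field is not passed.
use utils::lsn::Lsn;

fn default_local_start_lsn(commit_lsn: Lsn, wal_seg_size: usize) -> Lsn {
    commit_lsn.segment_lsn(wal_seg_size)
}

// With 16 MiB segments: Lsn(0x2800000).segment_lsn(0x1000000) == Lsn(0x2000000)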
diff --git a/pgxn/neon/walproposer.c b/pgxn/neon/walproposer.c
index fc0b660a64..ff37be2de1 100644
--- a/pgxn/neon/walproposer.c
+++ b/pgxn/neon/walproposer.c
@@ -1471,12 +1471,6 @@ SendProposerElected(Safekeeper *sk)
 	 */
 	th = &sk->voteResponse.termHistory;
 
-	/*
-	 * If any WAL is present on the sk, it must be authorized by some term.
-	 * OTOH, without any WAL there are no term swiches in the log.
-	 */
-	Assert((th->n_entries == 0) ==
-		   (sk->voteResponse.flushLsn == InvalidXLogRecPtr));
 	/* We must start somewhere. */
 	Assert(propTermHistory.n_entries >= 1);
diff --git a/safekeeper/src/http/routes.rs b/safekeeper/src/http/routes.rs
index 43c0a17f84..6efd09c7e2 100644
--- a/safekeeper/src/http/routes.rs
+++ b/safekeeper/src/http/routes.rs
@@ -1,8 +1,8 @@
-use anyhow::anyhow;
 use hyper::{Body, Request, Response, StatusCode, Uri};
 
 use anyhow::Context;
 use once_cell::sync::Lazy;
+use postgres_ffi::WAL_SEGMENT_SIZE;
 use serde::Serialize;
 use serde::Serializer;
 use std::collections::{HashMap, HashSet};
@@ -10,6 +10,7 @@ use std::fmt::Display;
 use std::sync::Arc;
 use tokio::task::JoinError;
 
+use crate::safekeeper::ServerInfo;
 use crate::safekeeper::Term;
 use crate::safekeeper::TermHistory;
@@ -77,6 +78,7 @@ struct TimelineStatus {
     #[serde(serialize_with = "display_serialize")]
     timeline_id: TimelineId,
     acceptor_state: AcceptorStateStatus,
+    pg_info: ServerInfo,
     #[serde(serialize_with = "display_serialize")]
     flush_lsn: Lsn,
     #[serde(serialize_with = "display_serialize")]
@@ -121,6 +123,7 @@ async fn timeline_status_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
@@ impl<'pg> ReceiveWalConn<'pg> {
                 system_id: greeting.system_id,
                 wal_seg_size: greeting.wal_seg_size,
             };
-            GlobalTimelines::create(spg.ttid, server_info)?
+            GlobalTimelines::create(spg.ttid, server_info, Lsn::INVALID, Lsn::INVALID)?
         }
         _ => bail!("unexpected message {:?} instead of greeting", next_msg),
     };
diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs
index 7869aa8b3a..7b11aaf92a 100644
--- a/safekeeper/src/safekeeper.rs
+++ b/safekeeper/src/safekeeper.rs
@@ -222,6 +222,8 @@ impl SafeKeeperState {
         ttid: &TenantTimelineId,
         server_info: ServerInfo,
         peers: Vec<NodeId>,
+        commit_lsn: Lsn,
+        local_start_lsn: Lsn,
     ) -> SafeKeeperState {
         SafeKeeperState {
             tenant_id: ttid.tenant_id,
@@ -233,10 +235,10 @@ impl SafeKeeperState {
             server: server_info,
             proposer_uuid: [0; 16],
             timeline_start_lsn: Lsn(0),
-            local_start_lsn: Lsn(0),
-            commit_lsn: Lsn(0),
-            backup_lsn: Lsn::INVALID,
-            peer_horizon_lsn: Lsn(0),
+            local_start_lsn,
+            commit_lsn,
+            backup_lsn: local_start_lsn,
+            peer_horizon_lsn: local_start_lsn,
             remote_consistent_lsn: Lsn(0),
             peers: Peers(peers.iter().map(|p| (*p, PeerInfo::new())).collect()),
         }
@@ -252,6 +254,8 @@ impl SafeKeeperState {
                 wal_seg_size: 0,
             },
             vec![],
+            Lsn::INVALID,
+            Lsn::INVALID,
         )
     }
 }
@@ -740,7 +744,8 @@ where
                 "setting timeline_start_lsn to {:?}",
                 state.timeline_start_lsn
            );
-
+        }
+        if state.local_start_lsn == Lsn(0) {
             state.local_start_lsn = msg.start_streaming_at;
             info!("setting local_start_lsn to {:?}", state.local_start_lsn);
         }
diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs
index dc7503af65..3fb77bf582 100644
--- a/safekeeper/src/timeline.rs
+++ b/safekeeper/src/timeline.rs
@@ -107,6 +107,14 @@ impl SharedState {
             bail!(TimelineError::UninitialinzedPgVersion(*ttid));
         }
 
+        if state.commit_lsn < state.local_start_lsn {
+            bail!(
+                "commit_lsn {} is less than local_start_lsn {}",
+                state.commit_lsn,
+                state.local_start_lsn
+            );
+        }
+
         // We don't want to write anything to disk, because we may have existing timeline there.
         // These functions should not change anything on disk.
         let control_store = control_file::FileStorage::create_new(ttid, conf, state)?;
@@ -286,7 +294,7 @@ pub struct Timeline {
     /// Sending here asks for wal backup launcher attention (start/stop
     /// offloading). Sending ttid instead of concrete command allows to do
     /// sending without timeline lock.
-    wal_backup_launcher_tx: Sender<TenantTimelineId>,
+    pub wal_backup_launcher_tx: Sender<TenantTimelineId>,
 
     /// Used to broadcast commit_lsn updates to all background jobs.
     commit_lsn_watch_tx: watch::Sender<Lsn>,
@@ -339,10 +347,12 @@ impl Timeline {
         ttid: TenantTimelineId,
         wal_backup_launcher_tx: Sender<TenantTimelineId>,
         server_info: ServerInfo,
+        commit_lsn: Lsn,
+        local_start_lsn: Lsn,
     ) -> Result<Timeline> {
         let (commit_lsn_watch_tx, commit_lsn_watch_rx) = watch::channel(Lsn::INVALID);
         let (cancellation_tx, cancellation_rx) = watch::channel(false);
-        let state = SafeKeeperState::new(&ttid, server_info, vec![]);
+        let state = SafeKeeperState::new(&ttid, server_info, vec![], commit_lsn, local_start_lsn);
 
         Ok(Timeline {
             ttid,
@@ -381,6 +391,7 @@ impl Timeline {
         match || -> Result<()> {
             shared_state.sk.persist()?;
             // TODO: add more initialization steps here
+            shared_state.update_status(self.ttid);
             Ok(())
         }() {
             Ok(_) => Ok(()),
diff --git a/safekeeper/src/timelines_global_map.rs b/safekeeper/src/timelines_global_map.rs
index cf99a243d7..a5d373a1da 100644
--- a/safekeeper/src/timelines_global_map.rs
+++ b/safekeeper/src/timelines_global_map.rs
@@ -15,6 +15,7 @@ use std::sync::{Arc, Mutex, MutexGuard};
 use tokio::sync::mpsc::Sender;
 use tracing::*;
 use utils::id::{TenantId, TenantTimelineId, TimelineId};
+use utils::lsn::Lsn;
 
 struct GlobalTimelinesState {
     timelines: HashMap<TenantTimelineId, Arc<Timeline>>,
@@ -153,7 +154,12 @@ impl GlobalTimelines {
     /// Create a new timeline with the given id. If the timeline already exists, returns
     /// an existing timeline.
-    pub fn create(ttid: TenantTimelineId, server_info: ServerInfo) -> Result<Arc<Timeline>> {
+    pub fn create(
+        ttid: TenantTimelineId,
+        server_info: ServerInfo,
+        commit_lsn: Lsn,
+        local_start_lsn: Lsn,
+    ) -> Result<Arc<Timeline>> {
         let (conf, wal_backup_launcher_tx) = {
             let state = TIMELINES_STATE.lock().unwrap();
             if let Ok(timeline) = state.get(&ttid) {
@@ -170,6 +176,8 @@ impl GlobalTimelines {
             ttid,
             wal_backup_launcher_tx,
             server_info,
+            commit_lsn,
+            local_start_lsn,
         )?);
 
         // Take a lock and finish the initialization holding this mutex. No other threads
@@ -190,6 +198,9 @@ impl GlobalTimelines {
             Ok(_) => {
                 // We are done with bootstrap, release the lock, return the timeline.
                 drop(shared_state);
+                timeline
+                    .wal_backup_launcher_tx
+                    .blocking_send(timeline.ttid)?;
                 Ok(timeline)
             }
             Err(e) => {
diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index 5df0f5cc50..0d6b6f4cd7 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -2339,6 +2339,7 @@ class Safekeeper:
 @dataclass
 class SafekeeperTimelineStatus:
     acceptor_epoch: int
+    pg_version: int
     flush_lsn: Lsn
     timeline_start_lsn: Lsn
     backup_lsn: Lsn
@@ -2367,6 +2368,18 @@ class SafekeeperHttpClient(requests.Session):
     def check_status(self):
         self.get(f"http://localhost:{self.port}/v1/status").raise_for_status()
 
+    def timeline_create(
+        self, tenant_id: TenantId, timeline_id: TimelineId, pg_version: int, commit_lsn: Lsn
+    ):
+        body = {
+            "tenant_id": str(tenant_id),
+            "timeline_id": str(timeline_id),
+            "pg_version": pg_version,
+            "commit_lsn": str(commit_lsn),
+        }
+        res = self.post(f"http://localhost:{self.port}/v1/tenant/timeline", json=body)
+        res.raise_for_status()
+
     def timeline_status(
         self, tenant_id: TenantId, timeline_id: TimelineId
     ) -> SafekeeperTimelineStatus:
@@ -2375,6 +2388,7 @@ class SafekeeperHttpClient(requests.Session):
         resj = res.json()
         return SafekeeperTimelineStatus(
             acceptor_epoch=resj["acceptor_state"]["epoch"],
+            pg_version=resj["pg_info"]["pg_version"],
             flush_lsn=Lsn(resj["flush_lsn"]),
             timeline_start_lsn=Lsn(resj["timeline_start_lsn"]),
             backup_lsn=Lsn(resj["backup_lsn"]),
diff --git a/test_runner/regress/test_wal_acceptor.py b/test_runner/regress/test_wal_acceptor.py
index 1f9a0157fc..9c8e66e0e2 100644
--- a/test_runner/regress/test_wal_acceptor.py
+++ b/test_runner/regress/test_wal_acceptor.py
@@ -1,6 +1,7 @@
 import os
 import pathlib
 import random
+import shutil
 import signal
 import subprocess
 import sys
@@ -8,6 +9,7 @@ import threading
 import time
 from contextlib import closing
 from dataclasses import dataclass, field
+from functools import partial
 from pathlib import Path
 from typing import Any, List, Optional
 
@@ -371,51 +373,48 @@ def test_wal_removal(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
     )
 
     # wait till first segment is removed on all safekeepers
+    wait(
+        lambda first_segments=first_segments: all(not os.path.exists(p) for p in first_segments),
+        "first segment get removed",
+    )
+
+
+# Wait for something, defined as f() returning True, raising an error if this
+# doesn't happen within timeout seconds.
+def wait(f, desc, timeout=30):
     started_at = time.time()
     while True:
-        if all(not os.path.exists(p) for p in first_segments):
+        if f():
             break
         elapsed = time.time() - started_at
-        if elapsed > 20:
-            raise RuntimeError(f"timed out waiting {elapsed:.0f}s for first segment get removed")
+        if elapsed > timeout:
+            raise RuntimeError(f"timed out waiting {elapsed:.0f}s for {desc}")
         time.sleep(0.5)
 
 
-def wait_segment_offload(tenant_id, timeline_id, live_sk, seg_end: Lsn):
-    started_at = time.time()
-    http_cli = live_sk.http_client()
-    while True:
-        tli_status = http_cli.timeline_status(tenant_id, timeline_id)
-        log.info(f"live sk status is {tli_status}")
-
-        if tli_status.backup_lsn >= seg_end:
-            break
-        elapsed = time.time() - started_at
-        if elapsed > 30:
-            raise RuntimeError(
-                f"timed out waiting {elapsed:.0f}s for segment ending at {seg_end} get offloaded"
-            )
-        time.sleep(0.5)
-
-
-def wait_wal_trim(tenant_id, timeline_id, sk, target_size_mb):
-    started_at = time.time()
+def is_segment_offloaded(
+    sk: Safekeeper, tenant_id: TenantId, timeline_id: TimelineId, seg_end: Lsn
+):
     http_cli = sk.http_client()
-    while True:
-        tli_status = http_cli.timeline_status(tenant_id, timeline_id)
-        sk_wal_size = get_dir_size(os.path.join(sk.data_dir(), str(tenant_id), str(timeline_id)))
-        sk_wal_size_mb = sk_wal_size / 1024 / 1024
-        log.info(f"Safekeeper id={sk.id} wal_size={sk_wal_size_mb:.2f}MB status={tli_status}")
+    tli_status = http_cli.timeline_status(tenant_id, timeline_id)
+    log.info(f"sk status is {tli_status}")
+    return tli_status.backup_lsn >= seg_end
 
-        if sk_wal_size_mb <= target_size_mb:
-            break
-        elapsed = time.time() - started_at
-        if elapsed > 20:
-            raise RuntimeError(
-                f"timed out waiting {elapsed:.0f}s for sk_id={sk.id} to trim WAL to {target_size_mb:.2f}MB, current size is {sk_wal_size_mb:.2f}MB"
-            )
-        time.sleep(0.5)
 
+def is_flush_lsn_caught_up(sk: Safekeeper, tenant_id: TenantId, timeline_id: TimelineId, lsn: Lsn):
+    http_cli = sk.http_client()
+    tli_status = http_cli.timeline_status(tenant_id, timeline_id)
+    log.info(f"sk status is {tli_status}")
+    return tli_status.flush_lsn >= lsn
+
+
+def is_wal_trimmed(sk: Safekeeper, tenant_id: TenantId, timeline_id: TimelineId, target_size_mb):
+    http_cli = sk.http_client()
+    tli_status = http_cli.timeline_status(tenant_id, timeline_id)
+    sk_wal_size = get_dir_size(os.path.join(sk.data_dir(), str(tenant_id), str(timeline_id)))
+    sk_wal_size_mb = sk_wal_size / 1024 / 1024
+    log.info(f"Safekeeper id={sk.id} wal_size={sk_wal_size_mb:.2f}MB status={tli_status}")
+    return sk_wal_size_mb <= target_size_mb
 
 
 @pytest.mark.parametrize("remote_storage_kind", available_remote_storages())
@@ -451,7 +450,10 @@ def test_wal_backup(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Remot
             cur.execute("insert into t select generate_series(1,250000), 'payload'")
 
     live_sk = [sk for sk in env.safekeepers if sk != victim][0]
 
-    wait_segment_offload(tenant_id, timeline_id, live_sk, seg_end)
+    wait(
+        partial(is_segment_offloaded, live_sk, tenant_id, timeline_id, seg_end),
+        f"segment ending at {seg_end} get offloaded",
+    )
 
     victim.start()
@@ -463,7 +465,11 @@ def test_wal_backup(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Remot
     with closing(pg.connect()) as conn:
         with conn.cursor() as cur:
            cur.execute("insert into t select generate_series(1,250000), 'payload'")
-    wait_segment_offload(tenant_id, timeline_id, env.safekeepers[1], Lsn("0/5000000"))
+    seg_end = Lsn("0/5000000")
+    wait(
+        partial(is_segment_offloaded, env.safekeepers[1], tenant_id, timeline_id, seg_end),
+        f"segment ending at {seg_end} get offloaded",
+    )
 
 
 @pytest.mark.parametrize("remote_storage_kind", available_remote_storages())
@@ -494,38 +500,72 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Re
             cur.execute("insert into t values (1, 'payload')")
             expected_sum += 1
 
-            offloaded_seg_end = [Lsn("0/3000000")]
-            for seg_end in offloaded_seg_end:
-                # roughly fills two segments
-                cur.execute("insert into t select generate_series(1,500000), 'payload'")
-                expected_sum += 500000 * 500001 // 2
+            offloaded_seg_end = Lsn("0/3000000")
+            # roughly fills two segments
+            cur.execute("insert into t select generate_series(1,500000), 'payload'")
+            expected_sum += 500000 * 500001 // 2
 
-                assert query_scalar(cur, "select sum(key) from t") == expected_sum
+            assert query_scalar(cur, "select sum(key) from t") == expected_sum
 
-                for sk in env.safekeepers:
-                    wait_segment_offload(tenant_id, timeline_id, sk, seg_end)
+            for sk in env.safekeepers:
+                wait(
+                    partial(is_segment_offloaded, sk, tenant_id, timeline_id, offloaded_seg_end),
+                    f"segment ending at {offloaded_seg_end} get offloaded",
+                )
 
             # advance remote_consistent_lsn to trigger WAL trimming
             # this LSN should be less than commit_lsn, so timeline will be active=true in safekeepers, to push etcd updates
             env.safekeepers[0].http_client().record_safekeeper_info(
-                tenant_id, timeline_id, {"remote_consistent_lsn": str(offloaded_seg_end[-1])}
+                tenant_id, timeline_id, {"remote_consistent_lsn": str(offloaded_seg_end)}
             )
 
+            last_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
+
             for sk in env.safekeepers:
                 # require WAL to be trimmed, so no more than one segment is left on disk
-                wait_wal_trim(tenant_id, timeline_id, sk, 16 * 1.5)
-
-            last_lsn = query_scalar(cur, "SELECT pg_current_wal_flush_lsn()")
+                target_size_mb = 16 * 1.5
+                wait(
+                    partial(is_wal_trimmed, sk, tenant_id, timeline_id, target_size_mb),
+                    f"sk_id={sk.id} to trim WAL to {target_size_mb:.2f}MB",
+                )
+                # wait till everyone puts data up to last_lsn on disk, we are
+                # going to recreate state on safekeepers claiming they have data till last_lsn.
+                wait(
+                    partial(is_flush_lsn_caught_up, sk, tenant_id, timeline_id, last_lsn),
+                    f"sk_id={sk.id} to flush {last_lsn}",
+                )
 
     ps_cli = env.pageserver.http_client()
-    pageserver_lsn = ps_cli.timeline_detail(tenant_id, timeline_id)["local"]["last_record_lsn"]
-    lag = Lsn(last_lsn) - Lsn(pageserver_lsn)
+    pageserver_lsn = Lsn(ps_cli.timeline_detail(tenant_id, timeline_id)["local"]["last_record_lsn"])
+    lag = last_lsn - pageserver_lsn
     log.info(
         f"Pageserver last_record_lsn={pageserver_lsn}; flush_lsn={last_lsn}; lag before replay is {lag / 1024}kb"
     )
 
     pg.stop_and_destroy()
 
+    # Also delete and manually create timeline on safekeepers -- this tests
+    # scenario of manual recovery on different set of safekeepers.
+
+    # save the last (partial) file to put it back after recreation; others will be fetched from s3
+    sk = env.safekeepers[0]
+    tli_dir = Path(sk.data_dir()) / str(tenant_id) / str(timeline_id)
+    f_partial = Path([f for f in os.listdir(tli_dir) if f.endswith(".partial")][0])
+    f_partial_path = tli_dir / f_partial
+    f_partial_saved = Path(sk.data_dir()) / f_partial.name
+    f_partial_path.rename(f_partial_saved)
+
+    pg_version = sk.http_client().timeline_status(tenant_id, timeline_id).pg_version
+
+    for sk in env.safekeepers:
+        cli = sk.http_client()
+        cli.timeline_delete_force(tenant_id, timeline_id)
+        cli.timeline_create(tenant_id, timeline_id, pg_version, last_lsn)
+        f_partial_path = (
+            Path(sk.data_dir()) / str(tenant_id) / str(timeline_id) / f_partial_saved.name
+        )
+        shutil.copy(f_partial_saved, f_partial_path)
+
     # recreate timeline on pageserver from scratch
     ps_cli.timeline_delete(tenant_id, timeline_id)
     ps_cli.timeline_create(tenant_id, timeline_id)
@@ -539,10 +579,12 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Re
         if elapsed > wait_lsn_timeout:
             raise RuntimeError("Timed out waiting for WAL redo")
 
-        pageserver_lsn = env.pageserver.http_client().timeline_detail(tenant_id, timeline_id)[
-            "local"
-        ]["last_record_lsn"]
-        lag = Lsn(last_lsn) - Lsn(pageserver_lsn)
+        pageserver_lsn = Lsn(
+            env.pageserver.http_client().timeline_detail(tenant_id, timeline_id)["local"][
+                "last_record_lsn"
+            ]
+        )
+        lag = last_lsn - pageserver_lsn
 
         if time.time() > last_debug_print + 10 or lag <= 0:
             last_debug_print = time.time()
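For reference, a sketch of the JSON body that the new SafekeeperHttpClient.timeline_create sends to POST /v1/tenant/timeline, mirrored on the Rust side with serde_json. This is illustrative only; example_create_body is a made-up helper, and the optional fields are simply omitted, matching the Python fixture above.

// Illustrative sketch of the request body accepted by the new
// POST /v1/tenant/timeline endpoint, mirroring SafekeeperHttpClient.timeline_create.
fn example_create_body(
    tenant_id: &str,
    timeline_id: &str,
    pg_version: u32,
    commit_lsn: &str,
) -> serde_json::Value {
    serde_json::json!({
        "tenant_id": tenant_id,     // hex id, parsed via display_fromstr
        "timeline_id": timeline_id, // hex id, parsed via display_fromstr
        "pg_version": pg_version,   // taken from timeline_status().pg_info in the test
        "commit_lsn": commit_lsn,   // e.g. "0/3000000"
        // peer_ids, system_id, wal_seg_size and local_start_lsn are optional;
        // an omitted local_start_lsn defaults to the start of commit_lsn's segment.
    })
}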