safekeeper: test pull_timeline with WAL gc.

Do pull_timeline while WAL is being removed. To this end
- extract pausable_failpoint to utils, sprinkle pull_timeline with it
- add 'checkpoint' sk http endpoint to force WAL removal.

After fixing checking for pull file status code test fails so far which is
expected.
This commit is contained in:
Arseny Sher
2024-05-20 16:21:57 +03:00
committed by Arseny Sher
parent 43f9a16e46
commit 3797566c36
19 changed files with 241 additions and 50 deletions

View File

@@ -9,6 +9,33 @@ use serde::{Deserialize, Serialize};
use tokio_util::sync::CancellationToken;
use tracing::*;
/// Declare a failpoint that can use the `pause` failpoint action.
/// We don't want to block the executor thread, hence, spawn_blocking + await.
#[macro_export]
macro_rules! pausable_failpoint {
($name:literal) => {
if cfg!(feature = "testing") {
tokio::task::spawn_blocking({
let current = tracing::Span::current();
move || {
let _entered = current.entered();
tracing::info!("at failpoint {}", $name);
fail::fail_point!($name);
}
})
.await
.expect("spawn_blocking");
}
};
($name:literal, $cond:expr) => {
if cfg!(feature = "testing") {
if $cond {
pausable_failpoint!($name)
}
}
};
}
/// use with fail::cfg("$name", "return(2000)")
///
/// The effect is similar to a "sleep(2000)" action, i.e. we sleep for the

View File

@@ -42,6 +42,7 @@ use utils::completion;
use utils::crashsafe::path_with_suffix_extension;
use utils::failpoint_support;
use utils::fs_ext;
use utils::pausable_failpoint;
use utils::sync::gate::Gate;
use utils::sync::gate::GateGuard;
use utils::timeout::timeout_cancellable;
@@ -122,32 +123,6 @@ use utils::{
lsn::{Lsn, RecordLsn},
};
/// Declare a failpoint that can use the `pause` failpoint action.
/// We don't want to block the executor thread, hence, spawn_blocking + await.
macro_rules! pausable_failpoint {
($name:literal) => {
if cfg!(feature = "testing") {
tokio::task::spawn_blocking({
let current = tracing::Span::current();
move || {
let _entered = current.entered();
tracing::info!("at failpoint {}", $name);
fail::fail_point!($name);
}
})
.await
.expect("spawn_blocking");
}
};
($name:literal, $cond:expr) => {
if cfg!(feature = "testing") {
if $cond {
pausable_failpoint!($name)
}
}
};
}
pub mod blob_io;
pub mod block_io;
pub mod vectored_blob_io;

View File

@@ -8,7 +8,7 @@ use tokio::sync::OwnedMutexGuard;
use tokio_util::sync::CancellationToken;
use tracing::{error, instrument, Instrument};
use utils::{backoff, completion, crashsafe, fs_ext, id::TimelineId};
use utils::{backoff, completion, crashsafe, fs_ext, id::TimelineId, pausable_failpoint};
use crate::{
config::PageServerConf,

View File

@@ -197,6 +197,7 @@ pub(crate) use upload::upload_initdb_dir;
use utils::backoff::{
self, exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS,
};
use utils::pausable_failpoint;
use std::collections::{HashMap, VecDeque};
use std::sync::atomic::{AtomicU32, Ordering};

View File

@@ -9,7 +9,7 @@ use std::time::SystemTime;
use tokio::fs::{self, File};
use tokio::io::AsyncSeekExt;
use tokio_util::sync::CancellationToken;
use utils::backoff;
use utils::{backoff, pausable_failpoint};
use super::Generation;
use crate::tenant::remote_timeline_client::{

View File

@@ -17,7 +17,7 @@ use crate::tenant::{Tenant, TenantState};
use rand::Rng;
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::{backoff, completion};
use utils::{backoff, completion, pausable_failpoint};
static CONCURRENT_BACKGROUND_TASKS: once_cell::sync::Lazy<tokio::sync::Semaphore> =
once_cell::sync::Lazy::new(|| {

View File

@@ -41,7 +41,7 @@ use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::{
bin_ser::BeSer,
fs_ext,
fs_ext, pausable_failpoint,
sync::gate::{Gate, GateGuard},
vec_map::VecMap,
};

View File

@@ -7,7 +7,7 @@ use anyhow::Context;
use pageserver_api::{models::TimelineState, shard::TenantShardId};
use tokio::sync::OwnedMutexGuard;
use tracing::{error, info, instrument, Instrument};
use utils::{crashsafe, fs_ext, id::TimelineId};
use utils::{crashsafe, fs_ext, id::TimelineId, pausable_failpoint};
use crate::{
config::PageServerConf,

View File

@@ -287,6 +287,26 @@ async fn timeline_files_handler(request: Request<Body>) -> Result<Response<Body>
.map_err(|e| ApiError::InternalServerError(e.into()))
}
/// Force persist control file and remove old WAL.
async fn timeline_checkpoint_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
check_permission(&request, None)?;
let ttid = TenantTimelineId::new(
parse_request_param(&request, "tenant_id")?,
parse_request_param(&request, "timeline_id")?,
);
let tli = GlobalTimelines::get(ttid)?;
tli.maybe_persist_control_file(true)
.await
.map_err(ApiError::InternalServerError)?;
tli.remove_old_wal()
.await
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::OK, ())
}
/// Deactivates the timeline and removes its data directory.
async fn timeline_delete_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
let ttid = TenantTimelineId::new(
@@ -553,6 +573,10 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError>
"/v1/tenant/:tenant_id/timeline/:timeline_id/control_file",
|r| request_span(r, patch_control_file_handler),
)
.post(
"/v1/tenant/:tenant_id/timeline/:timeline_id/checkpoint",
|r| request_span(r, timeline_checkpoint_handler),
)
// for tests
.post("/v1/record_safekeeper_info/:tenant_id/:timeline_id", |r| {
request_span(r, record_safekeeper_info)

View File

@@ -11,6 +11,7 @@ use tracing::info;
use utils::{
id::{TenantId, TenantTimelineId, TimelineId},
lsn::Lsn,
pausable_failpoint,
};
use crate::{
@@ -162,6 +163,8 @@ async fn pull_timeline(status: TimelineStatus, host: String) -> Result<Response>
filenames.remove(control_file_index);
filenames.insert(0, "safekeeper.control".to_string());
pausable_failpoint!("sk-pull-timeline-after-list-pausable");
info!(
"downloading {} files from safekeeper {}",
filenames.len(),
@@ -183,6 +186,13 @@ async fn pull_timeline(status: TimelineStatus, host: String) -> Result<Response>
let mut file = tokio::fs::File::create(&file_path).await?;
let mut response = client.get(&http_url).send().await?;
if response.status() != reqwest::StatusCode::OK {
bail!(
"pulling file {} failed: status is {}",
filename,
response.status()
);
}
while let Some(chunk) = response.chunk().await? {
file.write_all(&chunk).await?;
file.flush().await?;

View File

@@ -15,7 +15,7 @@ pub async fn task_main(_conf: SafeKeeperConf) -> anyhow::Result<()> {
for tli in &tlis {
let ttid = tli.ttid;
async {
if let Err(e) = tli.maybe_persist_control_file().await {
if let Err(e) = tli.maybe_persist_control_file(false).await {
warn!("failed to persist control file: {e}");
}
if let Err(e) = tli.remove_old_wal().await {

View File

@@ -827,9 +827,9 @@ where
/// Persist control file if there is something to save and enough time
/// passed after the last save.
pub async fn maybe_persist_inmem_control_file(&mut self) -> Result<bool> {
pub async fn maybe_persist_inmem_control_file(&mut self, force: bool) -> Result<bool> {
const CF_SAVE_INTERVAL: Duration = Duration::from_secs(300);
if self.state.pers.last_persist_at().elapsed() < CF_SAVE_INTERVAL {
if !force && self.state.pers.last_persist_at().elapsed() < CF_SAVE_INTERVAL {
return Ok(false);
}
let need_persist = self.state.inmem.commit_lsn > self.state.commit_lsn

View File

@@ -821,9 +821,9 @@ impl Timeline {
/// passed after the last save. This helps to keep remote_consistent_lsn up
/// to date so that storage nodes restart doesn't cause many pageserver ->
/// safekeeper reconnections.
pub async fn maybe_persist_control_file(self: &Arc<Self>) -> Result<()> {
pub async fn maybe_persist_control_file(self: &Arc<Self>, force: bool) -> Result<()> {
let mut guard = self.write_shared_state().await;
let changed = guard.sk.maybe_persist_inmem_control_file().await?;
let changed = guard.sk.maybe_persist_inmem_control_file(force).await?;
guard.skip_update = !changed;
Ok(())
}

View File

@@ -106,7 +106,7 @@ pub async fn main_task(
if !is_active {
// TODO: maybe use tokio::spawn?
if let Err(e) = tli.maybe_persist_control_file().await {
if let Err(e) = tli.maybe_persist_control_file(false).await {
warn!("control file save in update_status failed: {:?}", e);
}
}

View File

@@ -5,6 +5,8 @@ from typing import Any, Type, TypeVar, Union
T = TypeVar("T", bound="Id")
DEFAULT_WAL_SEG_SIZE = 16 * 1024 * 1024
@total_ordering
class Lsn:
@@ -67,6 +69,9 @@ class Lsn:
def as_int(self) -> int:
return self.lsn_int
def segment_lsn(self, seg_sz: int = DEFAULT_WAL_SEG_SIZE) -> "Lsn":
return Lsn(self.lsn_int - (self.lsn_int % seg_sz))
@dataclass(frozen=True)
class Key:

View File

@@ -3771,7 +3771,7 @@ class SafekeeperPort:
@dataclass
class Safekeeper:
class Safekeeper(LogUtils):
"""An object representing a running safekeeper daemon."""
env: NeonEnv
@@ -3779,6 +3779,13 @@ class Safekeeper:
id: int
running: bool = False
def __init__(self, env: NeonEnv, port: SafekeeperPort, id: int, running: bool = False):
self.env = env
self.port = port
self.id = id
self.running = running
self.logfile = Path(self.data_dir) / f"safekeeper-{id}.log"
def start(self, extra_opts: Optional[List[str]] = None) -> "Safekeeper":
assert self.running is False
self.env.neon_cli.safekeeper_start(self.id, extra_opts=extra_opts)
@@ -3839,11 +3846,38 @@ class Safekeeper:
port=self.port.http, auth_token=auth_token, is_testing_enabled=is_testing_enabled
)
def get_timeline_start_lsn(self, tenant_id: TenantId, timeline_id: TimelineId) -> Lsn:
timeline_status = self.http_client().timeline_status(tenant_id, timeline_id)
timeline_start_lsn = timeline_status.timeline_start_lsn
log.info(f"sk {self.id} timeline start LSN: {timeline_start_lsn}")
return timeline_start_lsn
def get_flush_lsn(self, tenant_id: TenantId, timeline_id: TimelineId) -> Lsn:
timeline_status = self.http_client().timeline_status(tenant_id, timeline_id)
flush_lsn = timeline_status.flush_lsn
log.info(f"sk {self.id} flush LSN: {flush_lsn}")
return flush_lsn
def pull_timeline(
self, srcs: list[Safekeeper], tenant_id: TenantId, timeline_id: TimelineId
) -> Dict[str, Any]:
"""
pull_timeline from srcs to self.
"""
src_https = [f"http://localhost:{sk.port.http}" for sk in srcs]
res = self.http_client().pull_timeline(
{"tenant_id": str(tenant_id), "timeline_id": str(timeline_id), "http_hosts": src_https}
)
src_ids = [sk.id for sk in srcs]
log.info(f"finished pulling timeline from {src_ids} to {self.id}")
return res
@property
def data_dir(self) -> str:
return os.path.join(self.env.repo_dir, "safekeepers", f"sk{self.id}")
def timeline_dir(self, tenant_id, timeline_id) -> str:
return os.path.join(self.data_dir(), str(tenant_id), str(timeline_id))
return os.path.join(self.data_dir, str(tenant_id), str(timeline_id))
def list_segments(self, tenant_id, timeline_id) -> List[str]:
"""
@@ -3856,6 +3890,35 @@ class Safekeeper:
segments.sort()
return segments
def checkpoint_up_to(self, tenant_id: TenantId, timeline_id: TimelineId, lsn: Lsn):
"""
Assuming pageserver(s) uploaded to s3 up to `lsn`,
1) wait for remote_consistent_lsn and wal_backup_lsn on safekeeper to reach it.
2) checkpoint timeline on safekeeper, which should remove WAL before this LSN.
"""
cli = self.http_client()
def are_lsns_advanced():
stat = cli.timeline_status(tenant_id, timeline_id)
log.info(
f"waiting for remote_consistent_lsn and backup_lsn on sk {self.id} to reach {lsn}, currently remote_consistent_lsn={stat.remote_consistent_lsn}, backup_lsn={stat.backup_lsn}"
)
assert stat.remote_consistent_lsn >= lsn and stat.backup_lsn >= lsn.segment_lsn()
# xxx: max wait is long because we might be waiting for reconnection from
# pageserver to this safekeeper
wait_until(30, 1, are_lsns_advanced)
cli.checkpoint(tenant_id, timeline_id)
def wait_until_paused(self, failpoint: str):
msg = f"at failpoint {failpoint}"
def paused():
log.info(f"waiting for hitting failpoint {failpoint}")
self.assert_log_contains(msg)
wait_until(20, 0.5, paused)
class S3Scrubber:
def __init__(self, env: NeonEnvBuilder, log_dir: Optional[Path] = None):

View File

@@ -177,6 +177,13 @@ class SafekeeperHttpClient(requests.Session):
)
res.raise_for_status()
def checkpoint(self, tenant_id: TenantId, timeline_id: TimelineId):
res = self.post(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/checkpoint",
json={},
)
res.raise_for_status()
# only_local doesn't remove segments in the remote storage.
def timeline_delete(
self, tenant_id: TenantId, timeline_id: TimelineId, only_local: bool = False

View File

@@ -560,3 +560,25 @@ def assert_pageserver_backups_equal(left: Path, right: Path, skip_files: Set[str
elapsed = time.time() - started_at
log.info(f"assert_pageserver_backups_equal completed in {elapsed}s")
class PropagatingThread(threading.Thread):
_target: Any
_args: Any
_kwargs: Any
"""
Simple Thread wrapper with join() propagating the possible exception in the thread.
"""
def run(self):
self.exc = None
try:
self.ret = self._target(*self._args, **self._kwargs)
except BaseException as e:
self.exc = e
def join(self, timeout=None):
super(PropagatingThread, self).join(timeout)
if self.exc:
raise self.exc
return self.ret

View File

@@ -23,7 +23,6 @@ from fixtures.log_helper import log
from fixtures.metrics import parse_metrics
from fixtures.neon_fixtures import (
Endpoint,
NeonEnv,
NeonEnvBuilder,
NeonPageserver,
PgBin,
@@ -48,7 +47,7 @@ from fixtures.remote_storage import (
)
from fixtures.safekeeper.http import SafekeeperHttpClient
from fixtures.safekeeper.utils import are_walreceivers_absent
from fixtures.utils import get_dir_size, query_scalar, start_in_background
from fixtures.utils import PropagatingThread, get_dir_size, query_scalar, start_in_background
def wait_lsn_force_checkpoint(
@@ -360,7 +359,7 @@ def test_wal_removal(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
# We will wait for first segment removal. Make sure they exist for starter.
first_segments = [
os.path.join(sk.data_dir(), str(tenant_id), str(timeline_id), "000000010000000000000001")
os.path.join(sk.data_dir, str(tenant_id), str(timeline_id), "000000010000000000000001")
for sk in env.safekeepers
]
assert all(os.path.exists(p) for p in first_segments)
@@ -445,7 +444,7 @@ def is_flush_lsn_caught_up(sk: Safekeeper, tenant_id: TenantId, timeline_id: Tim
def is_wal_trimmed(sk: Safekeeper, tenant_id: TenantId, timeline_id: TimelineId, target_size_mb):
http_cli = sk.http_client()
tli_status = http_cli.timeline_status(tenant_id, timeline_id)
sk_wal_size = get_dir_size(os.path.join(sk.data_dir(), str(tenant_id), str(timeline_id)))
sk_wal_size = get_dir_size(os.path.join(sk.data_dir, str(tenant_id), str(timeline_id)))
sk_wal_size_mb = sk_wal_size / 1024 / 1024
log.info(f"Safekeeper id={sk.id} wal_size={sk_wal_size_mb:.2f}MB status={tli_status}")
return sk_wal_size_mb <= target_size_mb
@@ -591,10 +590,10 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder):
# save the last (partial) file to put it back after recreation; others will be fetched from s3
sk = env.safekeepers[0]
tli_dir = Path(sk.data_dir()) / str(tenant_id) / str(timeline_id)
tli_dir = Path(sk.data_dir) / str(tenant_id) / str(timeline_id)
f_partial = Path([f for f in os.listdir(tli_dir) if f.endswith(".partial")][0])
f_partial_path = tli_dir / f_partial
f_partial_saved = Path(sk.data_dir()) / f_partial.name
f_partial_saved = Path(sk.data_dir) / f_partial.name
f_partial_path.rename(f_partial_saved)
pg_version = sk.http_client().timeline_status(tenant_id, timeline_id).pg_version
@@ -616,7 +615,7 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder):
cli = sk.http_client()
cli.timeline_create(tenant_id, timeline_id, pg_version, last_lsn)
f_partial_path = (
Path(sk.data_dir()) / str(tenant_id) / str(timeline_id) / f_partial_saved.name
Path(sk.data_dir) / str(tenant_id) / str(timeline_id) / f_partial_saved.name
)
shutil.copy(f_partial_saved, f_partial_path)
@@ -1631,7 +1630,7 @@ def test_delete_force(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
with conn.cursor() as cur:
cur.execute("CREATE TABLE t(key int primary key)")
sk = env.safekeepers[0]
sk_data_dir = Path(sk.data_dir())
sk_data_dir = Path(sk.data_dir)
if not auth_enabled:
sk_http = sk.http_client()
sk_http_other = sk_http
@@ -1724,9 +1723,6 @@ def test_delete_force(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
def test_pull_timeline(neon_env_builder: NeonEnvBuilder):
def safekeepers_guc(env: NeonEnv, sk_names: List[int]) -> str:
return ",".join([f"localhost:{sk.port.pg}" for sk in env.safekeepers if sk.id in sk_names])
def execute_payload(endpoint: Endpoint):
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
@@ -1812,6 +1808,67 @@ def test_pull_timeline(neon_env_builder: NeonEnvBuilder):
show_statuses(env.safekeepers, tenant_id, timeline_id)
# Test pull_timeline while concurrently gc'ing WAL on safekeeper:
# 1) Start pull_timeline, listing files to fetch.
# 2) Write segment, do gc.
# 3) Finish pull_timeline.
# 4) Do some write, verify integrity with timeline_digest.
# Expected to fail while holding off WAL gc plus fetching commit_lsn WAL
# segment is not implemented.
@pytest.mark.xfail
def test_pull_timeline_gc(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 3
neon_env_builder.enable_safekeeper_remote_storage(default_remote_storage())
env = neon_env_builder.init_start()
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
(src_sk, dst_sk) = (env.safekeepers[0], env.safekeepers[2])
log.info("use only first 2 safekeepers, 3rd will be seeded")
endpoint = env.endpoints.create("main")
endpoint.active_safekeepers = [1, 2]
endpoint.start()
endpoint.safe_psql("create table t(key int, value text)")
endpoint.safe_psql("insert into t select generate_series(1, 1000), 'pear'")
src_flush_lsn = src_sk.get_flush_lsn(tenant_id, timeline_id)
log.info(f"flush_lsn on src before pull_timeline: {src_flush_lsn}")
dst_http = dst_sk.http_client()
# run pull_timeline which will halt before downloading files
dst_http.configure_failpoints(("sk-pull-timeline-after-list-pausable", "pause"))
pt_handle = PropagatingThread(
target=dst_sk.pull_timeline, args=([src_sk], tenant_id, timeline_id)
)
pt_handle.start()
dst_sk.wait_until_paused("sk-pull-timeline-after-list-pausable")
# ensure segment exists
endpoint.safe_psql("insert into t select generate_series(1, 180000), 'papaya'")
lsn = last_flush_lsn_upload(env, endpoint, tenant_id, timeline_id)
assert lsn > Lsn("0/2000000")
# Checkpoint timeline beyond lsn.
src_sk.checkpoint_up_to(tenant_id, timeline_id, lsn)
first_segment_p = os.path.join(
src_sk.timeline_dir(tenant_id, timeline_id), "000000010000000000000001"
)
log.info(f"first segment exist={os.path.exists(first_segment_p)}")
dst_http.configure_failpoints(("sk-pull-timeline-after-list-pausable", "off"))
pt_handle.join()
timeline_start_lsn = src_sk.get_timeline_start_lsn(tenant_id, timeline_id)
dst_flush_lsn = dst_sk.get_flush_lsn(tenant_id, timeline_id)
log.info(f"flush_lsn on dst after pull_timeline: {dst_flush_lsn}")
assert dst_flush_lsn >= src_flush_lsn
digests = [
sk.http_client().timeline_digest(tenant_id, timeline_id, timeline_start_lsn, dst_flush_lsn)
for sk in [src_sk, dst_sk]
]
assert digests[0] == digests[1], f"digest on src is {digests[0]} but on dst is {digests[1]}"
# In this test we check for excessive START_REPLICATION and START_WAL_PUSH queries
# when compute is active, but there are no writes to the timeline. In that case
# pageserver should maintain a single connection to safekeeper and don't attempt