Reimplement explicit timeline creation on safekeepers.

With the ability to pass commit_lsn. This allows performing project WAL recovery
through a different set of safekeepers than the original (or under a different
ttid) by
1) moving WAL files to s3 under the proper ttid;
2) explicitly creating the timeline on safekeepers, setting commit_lsn to the
latest point;
3) putting the latest .partial file into the timeline directory on safekeepers,
if desired (a rough sketch of this flow follows below).
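
For illustration only, a sketch of that flow in Python against the HTTP API added in this commit. The helper name, its parameters, and the assumption that step 1 (uploading the WAL to s3 under the target ttid) already happened elsewhere are made up here; the endpoint path and request fields match the diff and the new SafekeeperHttpClient.timeline_create test helper below.

import shutil
from pathlib import Path

import requests


def recover_timeline(sk_http_base_urls, timeline_dirs, tenant_id, timeline_id,
                     pg_version, commit_lsn, partial_file=None):
    """Hypothetical helper: recreate a timeline on a (possibly different) set of
    safekeepers after its WAL has already been uploaded to s3 under this ttid."""
    for base_url, tli_dir in zip(sk_http_base_urls, timeline_dirs):
        # Step 2: explicitly create the timeline, setting commit_lsn to the
        # latest recovered position.
        body = {
            "tenant_id": str(tenant_id),
            "timeline_id": str(timeline_id),
            "pg_version": pg_version,
            "commit_lsn": str(commit_lsn),
        }
        requests.post(f"{base_url}/v1/tenant/timeline", json=body).raise_for_status()
        # Step 3 (optional): put the saved latest .partial segment back into the
        # timeline directory so the WAL tail past the last full segment is local.
        if partial_file is not None:
            shutil.copy(partial_file, Path(tli_dir) / Path(partial_file).name)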

Extend test_s3_wal_replay to exercise this behaviour.

Also extend the timeline_status endpoint to return postgres information.
Arseny Sher
2022-10-12 18:14:02 +04:00
parent 14c623b254
commit 9fe4548e13
12 changed files with 195 additions and 102 deletions

View File

@@ -12,13 +12,8 @@ use nix::unistd::Pid;
use postgres::Config;
use reqwest::blocking::{Client, RequestBuilder, Response};
use reqwest::{IntoUrl, Method};
use safekeeper_api::models::TimelineCreateRequest;
use thiserror::Error;
use utils::{
connstring::connection_address,
http::error::HttpErrorBody,
id::{NodeId, TenantId, TimelineId},
};
use utils::{connstring::connection_address, http::error::HttpErrorBody, id::NodeId};
use crate::local_env::{LocalEnv, SafekeeperConf};
use crate::storage::PageServerNode;
@@ -281,24 +276,4 @@ impl SafekeeperNode {
.error_from_body()?;
Ok(())
}
pub fn timeline_create(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
peer_ids: Vec<NodeId>,
) -> Result<()> {
Ok(self
.http_request(
Method::POST,
format!("{}/tenant/{}/timeline", self.http_base_url, tenant_id),
)
.json(&TimelineCreateRequest {
timeline_id,
peer_ids,
})
.send()?
.error_from_body()?
.json()?)
}
}

View File

@@ -1,8 +1,21 @@
use serde::{Deserialize, Serialize};
use utils::id::{NodeId, TimelineId};
use utils::{
id::{NodeId, TenantId, TimelineId},
lsn::Lsn,
};
#[derive(Serialize, Deserialize)]
pub struct TimelineCreateRequest {
#[serde(with = "serde_with::rust::display_fromstr")]
pub tenant_id: TenantId,
#[serde(with = "serde_with::rust::display_fromstr")]
pub timeline_id: TimelineId,
pub peer_ids: Vec<NodeId>,
pub peer_ids: Option<Vec<NodeId>>,
pub pg_version: u32,
pub system_id: Option<u64>,
pub wal_seg_size: Option<u32>,
#[serde(with = "serde_with::rust::display_fromstr")]
pub commit_lsn: Lsn,
// If not passed, it is assigned to the beginning of commit_lsn segment.
pub local_start_lsn: Option<Lsn>,
}

View File

@@ -66,6 +66,11 @@ impl Lsn {
(self.0 % seg_sz as u64) as usize
}
/// Compute LSN of the segment start.
pub fn segment_lsn(self, seg_sz: usize) -> Lsn {
Lsn(self.0 - (self.0 % seg_sz as u64))
}
/// Compute the segment number
pub fn segment_number(self, seg_sz: usize) -> u64 {
self.0 / seg_sz as u64

View File

@@ -1471,12 +1471,6 @@ SendProposerElected(Safekeeper *sk)
*/
th = &sk->voteResponse.termHistory;
/*
* If any WAL is present on the sk, it must be authorized by some term.
* OTOH, without any WAL there are no term swiches in the log.
*/
Assert((th->n_entries == 0) ==
(sk->voteResponse.flushLsn == InvalidXLogRecPtr));
/* We must start somewhere. */
Assert(propTermHistory.n_entries >= 1);

View File

@@ -1,8 +1,8 @@
use anyhow::anyhow;
use hyper::{Body, Request, Response, StatusCode, Uri};
use anyhow::Context;
use once_cell::sync::Lazy;
use postgres_ffi::WAL_SEGMENT_SIZE;
use serde::Serialize;
use serde::Serializer;
use std::collections::{HashMap, HashSet};
@@ -10,6 +10,7 @@ use std::fmt::Display;
use std::sync::Arc;
use tokio::task::JoinError;
use crate::safekeeper::ServerInfo;
use crate::safekeeper::Term;
use crate::safekeeper::TermHistory;
@@ -77,6 +78,7 @@ struct TimelineStatus {
#[serde(serialize_with = "display_serialize")]
timeline_id: TimelineId,
acceptor_state: AcceptorStateStatus,
pg_info: ServerInfo,
#[serde(serialize_with = "display_serialize")]
flush_lsn: Lsn,
#[serde(serialize_with = "display_serialize")]
@@ -121,6 +123,7 @@ async fn timeline_status_handler(request: Request<Body>) -> Result<Response<Body
tenant_id: ttid.tenant_id,
timeline_id: ttid.timeline_id,
acceptor_state: acc_state,
pg_info: state.server,
flush_lsn,
timeline_start_lsn: state.timeline_start_lsn,
local_start_lsn: state.local_start_lsn,
@@ -136,12 +139,29 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<
let request_data: TimelineCreateRequest = json_request(&mut request).await?;
let ttid = TenantTimelineId {
tenant_id: parse_request_param(&request, "tenant_id")?,
tenant_id: request_data.tenant_id,
timeline_id: request_data.timeline_id,
};
check_permission(&request, Some(ttid.tenant_id))?;
Err(ApiError::BadRequest(anyhow!("not implemented")))
let server_info = ServerInfo {
pg_version: request_data.pg_version,
system_id: request_data.system_id.unwrap_or(0),
wal_seg_size: request_data.wal_seg_size.unwrap_or(WAL_SEGMENT_SIZE as u32),
};
let local_start_lsn = request_data.local_start_lsn.unwrap_or_else(|| {
request_data
.commit_lsn
.segment_lsn(server_info.wal_seg_size as usize)
});
tokio::task::spawn_blocking(move || {
GlobalTimelines::create(ttid, server_info, request_data.commit_lsn, local_start_lsn)
})
.await
.map_err(|e| ApiError::InternalServerError(e.into()))?
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::OK, ())
}
/// Deactivates the timeline and removes its data directory.
@@ -241,7 +261,7 @@ pub fn make_router(
.data(auth)
.get("/v1/status", status_handler)
// Will be used in the future instead of implicit timeline creation
.post("/v1/tenant/:tenant_id/timeline", timeline_create_handler)
.post("/v1/tenant/timeline", timeline_create_handler)
.get(
"/v1/tenant/:tenant_id/timeline/:timeline_id",
timeline_status_handler,

View File

@@ -104,6 +104,8 @@ fn prepare_safekeeper(ttid: TenantTimelineId, pg_version: u32) -> Result<Arc<Tim
wal_seg_size: WAL_SEGMENT_SIZE as u32,
system_id: 0,
},
Lsn::INVALID,
Lsn::INVALID,
)
}

View File

@@ -6,6 +6,7 @@ use anyhow::{anyhow, bail, Result};
use bytes::BytesMut;
use tracing::*;
use utils::lsn::Lsn;
use crate::safekeeper::ServerInfo;
use crate::timeline::Timeline;
@@ -79,7 +80,7 @@ impl<'pg> ReceiveWalConn<'pg> {
system_id: greeting.system_id,
wal_seg_size: greeting.wal_seg_size,
};
GlobalTimelines::create(spg.ttid, server_info)?
GlobalTimelines::create(spg.ttid, server_info, Lsn::INVALID, Lsn::INVALID)?
}
_ => bail!("unexpected message {:?} instead of greeting", next_msg),
};

View File

@@ -222,6 +222,8 @@ impl SafeKeeperState {
ttid: &TenantTimelineId,
server_info: ServerInfo,
peers: Vec<NodeId>,
commit_lsn: Lsn,
local_start_lsn: Lsn,
) -> SafeKeeperState {
SafeKeeperState {
tenant_id: ttid.tenant_id,
@@ -233,10 +235,10 @@ impl SafeKeeperState {
server: server_info,
proposer_uuid: [0; 16],
timeline_start_lsn: Lsn(0),
local_start_lsn: Lsn(0),
commit_lsn: Lsn(0),
backup_lsn: Lsn::INVALID,
peer_horizon_lsn: Lsn(0),
local_start_lsn,
commit_lsn,
backup_lsn: local_start_lsn,
peer_horizon_lsn: local_start_lsn,
remote_consistent_lsn: Lsn(0),
peers: Peers(peers.iter().map(|p| (*p, PeerInfo::new())).collect()),
}
@@ -252,6 +254,8 @@ impl SafeKeeperState {
wal_seg_size: 0,
},
vec![],
Lsn::INVALID,
Lsn::INVALID,
)
}
}
@@ -740,7 +744,8 @@ where
"setting timeline_start_lsn to {:?}",
state.timeline_start_lsn
);
}
if state.local_start_lsn == Lsn(0) {
state.local_start_lsn = msg.start_streaming_at;
info!("setting local_start_lsn to {:?}", state.local_start_lsn);
}

View File

@@ -107,6 +107,14 @@ impl SharedState {
bail!(TimelineError::UninitialinzedPgVersion(*ttid));
}
if state.commit_lsn < state.local_start_lsn {
bail!(
"commit_lsn {} is higher than local_start_lsn {}",
state.commit_lsn,
state.local_start_lsn
);
}
// We don't want to write anything to disk, because we may have existing timeline there.
// These functions should not change anything on disk.
let control_store = control_file::FileStorage::create_new(ttid, conf, state)?;
@@ -286,7 +294,7 @@ pub struct Timeline {
/// Sending here asks for wal backup launcher attention (start/stop
/// offloading). Sending ttid instead of concrete command allows to do
/// sending without timeline lock.
wal_backup_launcher_tx: Sender<TenantTimelineId>,
pub wal_backup_launcher_tx: Sender<TenantTimelineId>,
/// Used to broadcast commit_lsn updates to all background jobs.
commit_lsn_watch_tx: watch::Sender<Lsn>,
@@ -339,10 +347,12 @@ impl Timeline {
ttid: TenantTimelineId,
wal_backup_launcher_tx: Sender<TenantTimelineId>,
server_info: ServerInfo,
commit_lsn: Lsn,
local_start_lsn: Lsn,
) -> Result<Timeline> {
let (commit_lsn_watch_tx, commit_lsn_watch_rx) = watch::channel(Lsn::INVALID);
let (cancellation_tx, cancellation_rx) = watch::channel(false);
let state = SafeKeeperState::new(&ttid, server_info, vec![]);
let state = SafeKeeperState::new(&ttid, server_info, vec![], commit_lsn, local_start_lsn);
Ok(Timeline {
ttid,
@@ -381,6 +391,7 @@ impl Timeline {
match || -> Result<()> {
shared_state.sk.persist()?;
// TODO: add more initialization steps here
shared_state.update_status(self.ttid);
Ok(())
}() {
Ok(_) => Ok(()),

View File

@@ -15,6 +15,7 @@ use std::sync::{Arc, Mutex, MutexGuard};
use tokio::sync::mpsc::Sender;
use tracing::*;
use utils::id::{TenantId, TenantTimelineId, TimelineId};
use utils::lsn::Lsn;
struct GlobalTimelinesState {
timelines: HashMap<TenantTimelineId, Arc<Timeline>>,
@@ -153,7 +154,12 @@ impl GlobalTimelines {
/// Create a new timeline with the given id. If the timeline already exists, returns
/// an existing timeline.
pub fn create(ttid: TenantTimelineId, server_info: ServerInfo) -> Result<Arc<Timeline>> {
pub fn create(
ttid: TenantTimelineId,
server_info: ServerInfo,
commit_lsn: Lsn,
local_start_lsn: Lsn,
) -> Result<Arc<Timeline>> {
let (conf, wal_backup_launcher_tx) = {
let state = TIMELINES_STATE.lock().unwrap();
if let Ok(timeline) = state.get(&ttid) {
@@ -170,6 +176,8 @@ impl GlobalTimelines {
ttid,
wal_backup_launcher_tx,
server_info,
commit_lsn,
local_start_lsn,
)?);
// Take a lock and finish the initialization holding this mutex. No other threads
@@ -190,6 +198,9 @@ impl GlobalTimelines {
Ok(_) => {
// We are done with bootstrap, release the lock, return the timeline.
drop(shared_state);
timeline
.wal_backup_launcher_tx
.blocking_send(timeline.ttid)?;
Ok(timeline)
}
Err(e) => {

View File

@@ -2339,6 +2339,7 @@ class Safekeeper:
@dataclass
class SafekeeperTimelineStatus:
acceptor_epoch: int
pg_version: int
flush_lsn: Lsn
timeline_start_lsn: Lsn
backup_lsn: Lsn
@@ -2367,6 +2368,18 @@ class SafekeeperHttpClient(requests.Session):
def check_status(self):
self.get(f"http://localhost:{self.port}/v1/status").raise_for_status()
def timeline_create(
self, tenant_id: TenantId, timeline_id: TimelineId, pg_version: int, commit_lsn: Lsn
):
body = {
"tenant_id": str(tenant_id),
"timeline_id": str(timeline_id),
"pg_version": pg_version,
"commit_lsn": str(commit_lsn),
}
res = self.post(f"http://localhost:{self.port}/v1/tenant/timeline", json=body)
res.raise_for_status()
def timeline_status(
self, tenant_id: TenantId, timeline_id: TimelineId
) -> SafekeeperTimelineStatus:
@@ -2375,6 +2388,7 @@ class SafekeeperHttpClient(requests.Session):
resj = res.json()
return SafekeeperTimelineStatus(
acceptor_epoch=resj["acceptor_state"]["epoch"],
pg_version=resj["pg_info"]["pg_version"],
flush_lsn=Lsn(resj["flush_lsn"]),
timeline_start_lsn=Lsn(resj["timeline_start_lsn"]),
backup_lsn=Lsn(resj["backup_lsn"]),

View File

@@ -1,6 +1,7 @@
import os
import pathlib
import random
import shutil
import signal
import subprocess
import sys
@@ -8,6 +9,7 @@ import threading
import time
from contextlib import closing
from dataclasses import dataclass, field
from functools import partial
from pathlib import Path
from typing import Any, List, Optional
@@ -371,51 +373,48 @@ def test_wal_removal(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
)
# wait till first segment is removed on all safekeepers
wait(
lambda first_segments=first_segments: all(not os.path.exists(p) for p in first_segments),
"first segment get removed",
)
# Wait for something, defined as f() returning True, raising error if this
# doesn't happen without timeout seconds.
def wait(f, desc, timeout=30):
started_at = time.time()
while True:
if all(not os.path.exists(p) for p in first_segments):
if f():
break
elapsed = time.time() - started_at
if elapsed > 20:
raise RuntimeError(f"timed out waiting {elapsed:.0f}s for first segment get removed")
if elapsed > timeout:
raise RuntimeError(f"timed out waiting {elapsed:.0f}s for {desc}")
time.sleep(0.5)
def wait_segment_offload(tenant_id, timeline_id, live_sk, seg_end: Lsn):
started_at = time.time()
http_cli = live_sk.http_client()
while True:
tli_status = http_cli.timeline_status(tenant_id, timeline_id)
log.info(f"live sk status is {tli_status}")
if tli_status.backup_lsn >= seg_end:
break
elapsed = time.time() - started_at
if elapsed > 30:
raise RuntimeError(
f"timed out waiting {elapsed:.0f}s for segment ending at {seg_end} get offloaded"
)
time.sleep(0.5)
def wait_wal_trim(tenant_id, timeline_id, sk, target_size_mb):
started_at = time.time()
def is_segment_offloaded(
sk: Safekeeper, tenant_id: TenantId, timeline_id: TimelineId, seg_end: Lsn
):
http_cli = sk.http_client()
while True:
tli_status = http_cli.timeline_status(tenant_id, timeline_id)
sk_wal_size = get_dir_size(os.path.join(sk.data_dir(), str(tenant_id), str(timeline_id)))
sk_wal_size_mb = sk_wal_size / 1024 / 1024
log.info(f"Safekeeper id={sk.id} wal_size={sk_wal_size_mb:.2f}MB status={tli_status}")
tli_status = http_cli.timeline_status(tenant_id, timeline_id)
log.info(f"sk status is {tli_status}")
return tli_status.backup_lsn >= seg_end
if sk_wal_size_mb <= target_size_mb:
break
elapsed = time.time() - started_at
if elapsed > 20:
raise RuntimeError(
f"timed out waiting {elapsed:.0f}s for sk_id={sk.id} to trim WAL to {target_size_mb:.2f}MB, current size is {sk_wal_size_mb:.2f}MB"
)
time.sleep(0.5)
def is_flush_lsn_caught_up(sk: Safekeeper, tenant_id: TenantId, timeline_id: TimelineId, lsn: Lsn):
http_cli = sk.http_client()
tli_status = http_cli.timeline_status(tenant_id, timeline_id)
log.info(f"sk status is {tli_status}")
return tli_status.flush_lsn >= lsn
def is_wal_trimmed(sk: Safekeeper, tenant_id: TenantId, timeline_id: TimelineId, target_size_mb):
http_cli = sk.http_client()
tli_status = http_cli.timeline_status(tenant_id, timeline_id)
sk_wal_size = get_dir_size(os.path.join(sk.data_dir(), str(tenant_id), str(timeline_id)))
sk_wal_size_mb = sk_wal_size / 1024 / 1024
log.info(f"Safekeeper id={sk.id} wal_size={sk_wal_size_mb:.2f}MB status={tli_status}")
return sk_wal_size_mb <= target_size_mb
@pytest.mark.parametrize("remote_storage_kind", available_remote_storages())
@@ -451,7 +450,10 @@ def test_wal_backup(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Remot
cur.execute("insert into t select generate_series(1,250000), 'payload'")
live_sk = [sk for sk in env.safekeepers if sk != victim][0]
wait_segment_offload(tenant_id, timeline_id, live_sk, seg_end)
wait(
partial(is_segment_offloaded, live_sk, tenant_id, timeline_id, seg_end),
f"segment ending at {seg_end} get offloaded",
)
victim.start()
@@ -463,7 +465,11 @@ def test_wal_backup(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Remot
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
cur.execute("insert into t select generate_series(1,250000), 'payload'")
wait_segment_offload(tenant_id, timeline_id, env.safekeepers[1], Lsn("0/5000000"))
seg_end = Lsn("0/5000000")
wait(
partial(is_segment_offloaded, env.safekeepers[1], tenant_id, timeline_id, seg_end),
f"segment ending at {seg_end} get offloaded",
)
@pytest.mark.parametrize("remote_storage_kind", available_remote_storages())
@@ -494,38 +500,72 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Re
cur.execute("insert into t values (1, 'payload')")
expected_sum += 1
offloaded_seg_end = [Lsn("0/3000000")]
for seg_end in offloaded_seg_end:
# roughly fills two segments
cur.execute("insert into t select generate_series(1,500000), 'payload'")
expected_sum += 500000 * 500001 // 2
offloaded_seg_end = Lsn("0/3000000")
# roughly fills two segments
cur.execute("insert into t select generate_series(1,500000), 'payload'")
expected_sum += 500000 * 500001 // 2
assert query_scalar(cur, "select sum(key) from t") == expected_sum
assert query_scalar(cur, "select sum(key) from t") == expected_sum
for sk in env.safekeepers:
wait_segment_offload(tenant_id, timeline_id, sk, seg_end)
for sk in env.safekeepers:
wait(
partial(is_segment_offloaded, sk, tenant_id, timeline_id, offloaded_seg_end),
f"segment ending at {offloaded_seg_end} get offloaded",
)
# advance remote_consistent_lsn to trigger WAL trimming
# this LSN should be less than commit_lsn, so timeline will be active=true in safekeepers, to push etcd updates
env.safekeepers[0].http_client().record_safekeeper_info(
tenant_id, timeline_id, {"remote_consistent_lsn": str(offloaded_seg_end[-1])}
tenant_id, timeline_id, {"remote_consistent_lsn": str(offloaded_seg_end)}
)
last_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
for sk in env.safekeepers:
# require WAL to be trimmed, so no more than one segment is left on disk
wait_wal_trim(tenant_id, timeline_id, sk, 16 * 1.5)
last_lsn = query_scalar(cur, "SELECT pg_current_wal_flush_lsn()")
target_size_mb = 16 * 1.5
wait(
partial(is_wal_trimmed, sk, tenant_id, timeline_id, target_size_mb),
f"sk_id={sk.id} to trim WAL to {target_size_mb:.2f}MB",
)
# wait till everyone puts data up to last_lsn on disk, we are
# going to recreate state on safekeepers claiming they have data till last_lsn.
wait(
partial(is_flush_lsn_caught_up, sk, tenant_id, timeline_id, last_lsn),
f"sk_id={sk.id} to flush {last_lsn}",
)
ps_cli = env.pageserver.http_client()
pageserver_lsn = ps_cli.timeline_detail(tenant_id, timeline_id)["local"]["last_record_lsn"]
lag = Lsn(last_lsn) - Lsn(pageserver_lsn)
pageserver_lsn = Lsn(ps_cli.timeline_detail(tenant_id, timeline_id)["local"]["last_record_lsn"])
lag = last_lsn - pageserver_lsn
log.info(
f"Pageserver last_record_lsn={pageserver_lsn}; flush_lsn={last_lsn}; lag before replay is {lag / 1024}kb"
)
pg.stop_and_destroy()
# Also delete and manually create timeline on safekeepers -- this tests
# scenario of manual recovery on different set of safekeepers.
# save the last (partial) file to put it back after recreation; others will be fetched from s3
sk = env.safekeepers[0]
tli_dir = Path(sk.data_dir()) / str(tenant_id) / str(timeline_id)
f_partial = Path([f for f in os.listdir(tli_dir) if f.endswith(".partial")][0])
f_partial_path = tli_dir / f_partial
f_partial_saved = Path(sk.data_dir()) / f_partial.name
f_partial_path.rename(f_partial_saved)
pg_version = sk.http_client().timeline_status(tenant_id, timeline_id).pg_version
for sk in env.safekeepers:
cli = sk.http_client()
cli.timeline_delete_force(tenant_id, timeline_id)
cli.timeline_create(tenant_id, timeline_id, pg_version, last_lsn)
f_partial_path = (
Path(sk.data_dir()) / str(tenant_id) / str(timeline_id) / f_partial_saved.name
)
shutil.copy(f_partial_saved, f_partial_path)
# recreate timeline on pageserver from scratch
ps_cli.timeline_delete(tenant_id, timeline_id)
ps_cli.timeline_create(tenant_id, timeline_id)
@@ -539,10 +579,12 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Re
if elapsed > wait_lsn_timeout:
raise RuntimeError("Timed out waiting for WAL redo")
pageserver_lsn = env.pageserver.http_client().timeline_detail(tenant_id, timeline_id)[
"local"
]["last_record_lsn"]
lag = Lsn(last_lsn) - Lsn(pageserver_lsn)
pageserver_lsn = Lsn(
env.pageserver.http_client().timeline_detail(tenant_id, timeline_id)["local"][
"last_record_lsn"
]
)
lag = last_lsn - pageserver_lsn
if time.time() > last_debug_print + 10 or lag <= 0:
last_debug_print = time.time()