S3 WAL offloading staging review.
- Uncomment the accidentally commented `self.keep_alive.abort()` line: because of it the keep-alive task never finished, which blocked the launcher.
- Rework initialization once more to stop the offloader from trying to back up segment 0. All required LSNs are now initialized in handle_elected, where we learn the start LSN for the first time.
- Fix the blind attempt to pass remote storage params to the safekeeper service file.
@@ -6,7 +6,7 @@ After=network.target auditd.service
 Type=simple
 User=safekeeper
 Environment=RUST_BACKTRACE=1 ZENITH_REPO_DIR=/storage/safekeeper/data LD_LIBRARY_PATH=/usr/local/lib
-ExecStart=/usr/local/bin/safekeeper -l {{ inventory_hostname }}.local:6500 --listen-http {{ inventory_hostname }}.local:7676 -p {{ first_pageserver }}:6400 -D /storage/safekeeper/data --broker-endpoints={{ etcd_endpoints }} --remote_storage='{bucket_name={{bucket_name}}, bucket_region={{bucket_region}}, prefix_in_bucket=wal}'
+ExecStart=/usr/local/bin/safekeeper -l {{ inventory_hostname }}.local:6500 --listen-http {{ inventory_hostname }}.local:7676 -p {{ first_pageserver }}:6400 -D /storage/safekeeper/data --broker-endpoints={{ etcd_endpoints }} --remote-storage='{bucket_name="{{bucket_name}}", bucket_region="{{bucket_region}}", prefix_in_bucket="wal"}'
 ExecReload=/bin/kill -HUP $MAINPID
 KillMode=mixed
 KillSignal=SIGINT

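Side note on the --remote-storage value: with the quotes added, the braces contain what is syntactically a TOML inline table. Purely as an illustration (this is not the safekeeper's actual config parser, and the bucket values below are made up), such a value could be parsed in Rust with the toml crate:

// Illustrative sketch only, not the safekeeper's real parsing code.
fn parse_remote_storage(raw: &str) -> Result<toml::Value, toml::de::Error> {
    // Wrap the inline table in a key so it forms a complete TOML document.
    let doc: toml::Value = format!("remote_storage = {}", raw).parse()?;
    Ok(doc["remote_storage"].clone())
}

fn main() {
    // Example with the template placeholders substituted; names are made up.
    let raw = r#"{bucket_name="my-wal-bucket", bucket_region="us-east-2", prefix_in_bucket="wal"}"#;
    let cfg = parse_remote_storage(raw).expect("valid inline table");
    // Prints Some("my-wal-bucket").
    println!("{:?}", cfg.get("bucket_name").and_then(|v| v.as_str()));
    // The old, unquoted form (bucket_name=my-wal-bucket) is not valid TOML,
    // which is why the fixed unit file quotes the values.
}
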
@@ -218,7 +218,7 @@ impl ZTenantTimelineId {

 impl fmt::Display for ZTenantTimelineId {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        write!(f, "{}-{}", self.tenant_id, self.timeline_id)
+        write!(f, "{}/{}", self.tenant_id, self.timeline_id)
     }
 }

@@ -83,7 +83,7 @@ impl ElectionLeader {
     }

     pub async fn give_up(self) {
-        // self.keep_alive.abort();
+        self.keep_alive.abort();
         // TODO: it'll be wise to resign here but it'll happen after lease expiration anyway
         // should we await for keep alive termination?
         let _ = self.keep_alive.await;

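The uncommented abort() matters because give_up() awaits the keep-alive JoinHandle right after; without aborting, a task that never finishes on its own would block the caller, which is what stalled the launcher. A minimal tokio sketch of that behavior (plain tokio, not the etcd election code itself):

use std::time::Duration;

#[tokio::main]
async fn main() {
    // Stand-in for the keep-alive loop: a task that never finishes on its own.
    let keep_alive = tokio::spawn(async {
        loop {
            tokio::time::sleep(Duration::from_secs(1)).await;
        }
    });

    // Without abort(), awaiting this handle would hang forever.
    keep_alive.abort();
    let res = keep_alive.await;
    // Awaiting an aborted task returns promptly with a cancellation error.
    assert!(res.unwrap_err().is_cancelled());
    println!("keep-alive task cancelled cleanly");
}
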
@@ -731,24 +731,36 @@ where
    {
        let mut state = self.state.clone();

-        // Remeber point where WAL begins globally, if not yet.
+        // Here we learn initial LSN for the first time, set fields
+        // interested in that.

        if state.timeline_start_lsn == Lsn(0) {
            // Remember point where WAL begins globally.
            state.timeline_start_lsn = msg.timeline_start_lsn;
            info!(
                "setting timeline_start_lsn to {:?}",
                state.timeline_start_lsn
            );
        }

        // Remember point where WAL begins locally, if not yet. (I doubt the
        // second condition is ever possible)
        if state.local_start_lsn == Lsn(0) || state.local_start_lsn >= msg.start_streaming_at {
            state.local_start_lsn = msg.start_streaming_at;
            info!("setting local_start_lsn to {:?}", state.local_start_lsn);
        }
        // Initializing commit_lsn before acking first flushed record is
        // important to let find_end_of_wal skip the whole in the beginning
        // of the first segment.
        //
        // NB: on new clusters, this happens at the same time as
        // timeline_start_lsn initialization, it is taken outside to provide
        // upgrade.
        self.global_commit_lsn = max(self.global_commit_lsn, state.timeline_start_lsn);
        self.inmem.commit_lsn = max(self.inmem.commit_lsn, state.timeline_start_lsn);
        self.metrics.commit_lsn.set(self.inmem.commit_lsn.0 as f64);

        // Initalizing backup_lsn is useful to avoid making backup think it should upload 0 segment.
        self.inmem.backup_lsn = max(self.inmem.backup_lsn, state.timeline_start_lsn);

        state.acceptor_state.term_history = msg.term_history.clone();
-        self.state.persist(&state)?;
+        self.persist_control_file(state)?;
    }

    info!("start receiving WAL since {:?}", msg.start_streaming_at);

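The gist of the initialization fix: as soon as timeline_start_lsn is learned in handle_elected, commit_lsn and backup_lsn are raised to at least that point, so the offloader never computes a backup position inside the nonexistent segment 0. A minimal standalone sketch (the Lsn newtype and the state struct below are simplified placeholders, not the real safekeeper types):

use std::cmp::max;

// Simplified stand-in for the safekeeper's Lsn newtype.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)]
struct Lsn(u64);

// Only the fields relevant to the sketch.
struct Inmem {
    commit_lsn: Lsn,
    backup_lsn: Lsn,
}

fn init_on_elected(inmem: &mut Inmem, timeline_start_lsn: Lsn) {
    // Positions are only ever raised, up to the point where WAL actually
    // begins for this timeline.
    inmem.commit_lsn = max(inmem.commit_lsn, timeline_start_lsn);
    // With backup_lsn >= timeline_start_lsn the offloader has no reason to
    // try uploading segment 0 (the hole before the real start of the WAL).
    inmem.backup_lsn = max(inmem.backup_lsn, timeline_start_lsn);
}

fn main() {
    let mut inmem = Inmem { commit_lsn: Lsn(0), backup_lsn: Lsn(0) };
    init_on_elected(&mut inmem, Lsn(0x0169_B188));
    assert_eq!(inmem.backup_lsn, Lsn(0x0169_B188));
    println!("commit={:?} backup={:?}", inmem.commit_lsn, inmem.backup_lsn);
}
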
@@ -764,14 +776,6 @@ where
        self.inmem.commit_lsn = commit_lsn;
        self.metrics.commit_lsn.set(self.inmem.commit_lsn.0 as f64);

-        // We got our first commit_lsn, which means we should sync
-        // everything to disk, to initialize the state.
-        if self.state.commit_lsn == Lsn::INVALID && commit_lsn != Lsn::INVALID {
-            self.inmem.backup_lsn = self.inmem.commit_lsn; // initialize backup_lsn
-            self.wal_store.flush_wal()?;
-            self.persist_control_file()?;
-        }
-
        // If new commit_lsn reached epoch switch, force sync of control
        // file: walproposer in sync mode is very interested when this
        // happens. Note: this is for sync-safekeepers mode only, as
@@ -780,15 +784,14 @@
        // that we receive new epoch_start_lsn, and we still need to sync
        // control file in this case.
        if commit_lsn == self.epoch_start_lsn && self.state.commit_lsn != commit_lsn {
-            self.persist_control_file()?;
+            self.persist_control_file(self.state.clone())?;
        }

        Ok(())
    }

-    /// Persist in-memory state to the disk.
-    fn persist_control_file(&mut self) -> Result<()> {
-        let mut state = self.state.clone();
+    /// Persist in-memory state to the disk, taking other data from state.
+    fn persist_control_file(&mut self, mut state: SafeKeeperState) -> Result<()> {
        state.commit_lsn = self.inmem.commit_lsn;
        state.backup_lsn = self.inmem.backup_lsn;
        state.peer_horizon_lsn = self.inmem.peer_horizon_lsn;

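The refactor changes persist_control_file to take the state snapshot from the caller and overlay the volatile in-memory LSNs on top of it before writing, so handle_elected can persist its modified copy in a single call. A simplified sketch of the pattern (types and the "write" below are placeholders, not the real control-file code):

#[derive(Clone, Debug)]
struct SafeKeeperState {
    commit_lsn: u64,
    backup_lsn: u64,
    peer_horizon_lsn: u64,
}

struct Inmem {
    commit_lsn: u64,
    backup_lsn: u64,
    peer_horizon_lsn: u64,
}

struct SafeKeeper {
    state: SafeKeeperState, // last durably persisted state
    inmem: Inmem,           // volatile positions, updated on every append
}

impl SafeKeeper {
    // The caller supplies the snapshot to persist; in-memory LSNs are
    // overlaid on top of it before the (stand-in) durable write.
    fn persist_control_file(&mut self, mut state: SafeKeeperState) -> std::io::Result<()> {
        state.commit_lsn = self.inmem.commit_lsn;
        state.backup_lsn = self.inmem.backup_lsn;
        state.peer_horizon_lsn = self.inmem.peer_horizon_lsn;
        self.state = state; // stand-in for writing the control file to disk
        Ok(())
    }
}

fn main() -> std::io::Result<()> {
    let mut sk = SafeKeeper {
        state: SafeKeeperState { commit_lsn: 0, backup_lsn: 0, peer_horizon_lsn: 0 },
        inmem: Inmem { commit_lsn: 42, backup_lsn: 42, peer_horizon_lsn: 40 },
    };
    // Callers that just want to flush in-memory LSNs pass a clone of the
    // current state; handle_elected instead passes its modified copy.
    sk.persist_control_file(sk.state.clone())?;
    println!("{:?}", sk.state);
    Ok(())
}
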
@@ -823,13 +826,6 @@ where
        // do the job
        if !msg.wal_data.is_empty() {
            self.wal_store.write_wal(msg.h.begin_lsn, &msg.wal_data)?;
-
-            // If this was the first record we ever received, initialize
-            // commit_lsn to help find_end_of_wal skip the hole in the
-            // beginning.
-            if self.global_commit_lsn == Lsn(0) {
-                self.global_commit_lsn = msg.h.begin_lsn;
-            }
        }

        // flush wal to the disk, if required
@@ -852,7 +848,7 @@ where
        if self.state.peer_horizon_lsn + (self.state.server.wal_seg_size as u64)
            < self.inmem.peer_horizon_lsn
        {
-            self.persist_control_file()?;
+            self.persist_control_file(self.state.clone())?;
        }

        trace!(
@@ -920,7 +916,7 @@ where
            self.inmem.peer_horizon_lsn = new_peer_horizon_lsn;
        }
        if sync_control_file {
-            self.persist_control_file()?;
+            self.persist_control_file(self.state.clone())?;
        }
        Ok(())
    }

@@ -71,7 +71,7 @@ async fn wal_backup_launcher_main_loop(
    mut wal_backup_launcher_rx: Receiver<ZTenantTimelineId>,
) {
    info!(
-        "wal backup launcher started, remote config {:?}",
+        "WAL backup launcher: started, remote config {:?}",
        conf.remote_storage
    );

@@ -95,7 +95,7 @@ async fn wal_backup_launcher_main_loop(
        if is_wal_backup_required != tasks.contains_key(&zttid) {
            if is_wal_backup_required {
                // need to start the task
-                info!("starting wal backup task for {}", zttid);
+                info!("starting WAL backup task for {}", zttid);

                // TODO: decide who should offload in launcher itself by simply checking current state
                let election_name = broker::get_campaign_name(
@@ -115,7 +115,7 @@ async fn wal_backup_launcher_main_loop(

                let handle = tokio::spawn(
                    backup_task_main(zttid, timeline_dir, shutdown_rx, election)
-                        .instrument(info_span!("WAL backup", zttid = %zttid)),
+                        .instrument(info_span!("WAL backup task", zttid = %zttid)),
                );

                tasks.insert(
@@ -127,7 +127,7 @@ async fn wal_backup_launcher_main_loop(
                );
            } else {
                // need to stop the task
-                info!("stopping wal backup task for {}", zttid);
+                info!("stopping WAL backup task for {}", zttid);

                let wb_handle = tasks.remove(&zttid).unwrap();
                // Tell the task to shutdown. Error means task exited earlier, that's ok.

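The launcher hunks follow a start/stop pattern: one instrumented task per timeline, tracked in a map keyed by zttid, spawned when backup becomes required and shut down when it no longer is. A rough, self-contained sketch of that pattern (the names and the handle struct are illustrative, not the module's real API):

use std::collections::HashMap;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tracing::{info, info_span, Instrument};

// Placeholder key type standing in for ZTenantTimelineId.
type Zttid = String;

struct TaskHandle {
    shutdown_tx: mpsc::Sender<()>,
    handle: JoinHandle<()>,
}

async fn backup_task_main(zttid: Zttid, mut shutdown_rx: mpsc::Receiver<()>) {
    // Stand-in body: the real task uploads WAL segments until told to stop.
    let _ = shutdown_rx.recv().await;
    info!("backup task for {} exiting", zttid);
}

async fn set_backup_required(tasks: &mut HashMap<Zttid, TaskHandle>, zttid: Zttid, required: bool) {
    if required == tasks.contains_key(&zttid) {
        return; // already in the desired state
    }
    if required {
        info!("starting WAL backup task for {}", zttid);
        let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
        let handle = tokio::spawn(
            backup_task_main(zttid.clone(), shutdown_rx)
                .instrument(info_span!("WAL backup task", zttid = %zttid)),
        );
        tasks.insert(zttid, TaskHandle { shutdown_tx, handle });
    } else {
        info!("stopping WAL backup task for {}", zttid);
        let wb = tasks.remove(&zttid).unwrap();
        // Send error means the task exited earlier; that's fine.
        let _ = wb.shutdown_tx.send(()).await;
        let _ = wb.handle.await;
    }
}

#[tokio::main]
async fn main() {
    let mut tasks = HashMap::new();
    set_backup_required(&mut tasks, "tenant-x/timeline-y".to_string(), true).await;
    set_backup_required(&mut tasks, "tenant-x/timeline-y".to_string(), false).await;
}
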
@@ -236,20 +236,19 @@ impl WalBackupTask {
            }

            let commit_lsn = *self.commit_lsn_watch_rx.borrow();
-            assert!(
-                commit_lsn >= backup_lsn,
-                "backup lsn should never pass commit lsn"
-            );

+            // Note that backup_lsn can be higher than commit_lsn if we
+            // don't have much local WAL and others already uploaded
+            // segments we don't even have.
            if backup_lsn.segment_number(self.wal_seg_size)
-                == commit_lsn.segment_number(self.wal_seg_size)
+                >= commit_lsn.segment_number(self.wal_seg_size)
            {
                continue; /* nothing to do, common case as we wake up on every commit_lsn bump */
            }
            // Perhaps peers advanced the position, check shmem value.
            backup_lsn = self.timeline.get_wal_backup_lsn();
            if backup_lsn.segment_number(self.wal_seg_size)
-                == commit_lsn.segment_number(self.wal_seg_size)
+                >= commit_lsn.segment_number(self.wal_seg_size)
            {
                continue;
            }

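The switch from == to >= encodes the new comment's observation: peers may already have uploaded segments past our local position, so backup_lsn being ahead of commit_lsn is legal and simply means there is nothing to upload. A tiny sketch of the segment arithmetic (segment numbering modeled here as integer division over a plain u64 LSN, a simplification of the real Lsn type):

// Simplified: a WAL position's segment number is lsn / wal_seg_size.
fn segment_number(lsn: u64, wal_seg_size: u64) -> u64 {
    lsn / wal_seg_size
}

// Nothing to upload while the backup position is in the same segment as
// commit_lsn, or already ahead of it (e.g. peers uploaded those segments).
fn nothing_to_back_up(backup_lsn: u64, commit_lsn: u64, wal_seg_size: u64) -> bool {
    segment_number(backup_lsn, wal_seg_size) >= segment_number(commit_lsn, wal_seg_size)
}

fn main() {
    let seg = 16 * 1024 * 1024; // 16 MiB WAL segments
    // Backup ahead of commit: with `==` this would have fallen through and
    // tried to upload; with `>=` we correctly skip.
    assert!(nothing_to_back_up(3 * seg + 10, 2 * seg + 5, seg));
    // Commit a segment ahead of backup: there is a full segment to upload.
    assert!(!nothing_to_back_up(2 * seg, 3 * seg + 1, seg));
    println!("segment comparison ok");
}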