S3 WAL offloading staging review.

- Uncomment the accidentally commented `self.keep_alive.abort()` line; without
  it the keep-alive task never finished, which blocked the launcher.
- Rework initialization once more to stop the offloader from trying to back up
  segment 0. All required LSNs are now initialized in handle_elected, where we
  learn the start LSN for the first time (see the sketch after this list).
- Fix the blind attempt to pass remote storage params to the safekeeper
  service file.
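
As a rough illustration of the second point, the sketch below condenses what the
new handle_elected hunk does once the start LSN is learned. The standalone Lsn
type and the init_lsns helper are simplified stand-ins for illustration, not the
actual safekeeper code:

use std::cmp::max;

// Minimal stand-in for the real Lsn newtype.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)]
struct Lsn(u64);

// Sketch of the initialization now done in one place: remember where WAL
// begins, then bump commit_lsn and backup_lsn up to that point so that
// find_end_of_wal skips the hole in the first segment and the offloader
// never tries to upload segment 0.
fn init_lsns(
    timeline_start_lsn: &mut Lsn,
    commit_lsn: &mut Lsn,
    backup_lsn: &mut Lsn,
    msg_timeline_start_lsn: Lsn,
) {
    if *timeline_start_lsn == Lsn(0) {
        *timeline_start_lsn = msg_timeline_start_lsn;
    }
    // Kept outside the `if` so already-initialized timelines (upgrade path)
    // also get their commit_lsn/backup_lsn bumped.
    *commit_lsn = max(*commit_lsn, *timeline_start_lsn);
    *backup_lsn = max(*backup_lsn, *timeline_start_lsn);
}

fn main() {
    let (mut start, mut commit, mut backup) = (Lsn(0), Lsn(0), Lsn(0));
    init_lsns(&mut start, &mut commit, &mut backup, Lsn(0x0169_0408));
    assert_eq!(backup, Lsn(0x0169_0408)); // backup never starts at segment 0
}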
Arseny Sher
2022-05-27 13:09:17 +04:00
parent 0e1bd57c53
commit a42eba3cd7
5 changed files with 35 additions and 40 deletions

View File

@@ -6,7 +6,7 @@ After=network.target auditd.service
Type=simple
User=safekeeper
Environment=RUST_BACKTRACE=1 ZENITH_REPO_DIR=/storage/safekeeper/data LD_LIBRARY_PATH=/usr/local/lib
ExecStart=/usr/local/bin/safekeeper -l {{ inventory_hostname }}.local:6500 --listen-http {{ inventory_hostname }}.local:7676 -p {{ first_pageserver }}:6400 -D /storage/safekeeper/data --broker-endpoints={{ etcd_endpoints }} --remote_storage='{bucket_name={{bucket_name}}, bucket_region={{bucket_region}}, prefix_in_bucket=wal}'
ExecStart=/usr/local/bin/safekeeper -l {{ inventory_hostname }}.local:6500 --listen-http {{ inventory_hostname }}.local:7676 -p {{ first_pageserver }}:6400 -D /storage/safekeeper/data --broker-endpoints={{ etcd_endpoints }} --remote-storage='{bucket_name="{{bucket_name}}", bucket_region="{{bucket_region}}", prefix_in_bucket="wal"}'
ExecReload=/bin/kill -HUP $MAINPID
KillMode=mixed
KillSignal=SIGINT
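
The corrected --remote-storage value quotes each field, which matters if the
value is parsed as a TOML-style inline table (an assumption here; the parser is
not shown in this diff). A hypothetical parsing sketch, with struct and field
names mirroring the service file rather than the real safekeeper config type:

use serde::Deserialize;

// Illustrative config struct only.
#[derive(Debug, Deserialize)]
struct RemoteStorage {
    bucket_name: String,
    bucket_region: String,
    prefix_in_bucket: Option<String>,
}

fn parse_remote_storage(arg: &str) -> Result<RemoteStorage, toml::de::Error> {
    // Wrap the bare inline table in a key so it forms a valid TOML document.
    #[derive(Deserialize)]
    struct Wrapper {
        remote_storage: RemoteStorage,
    }
    let doc = format!("remote_storage = {arg}");
    Ok(toml::from_str::<Wrapper>(&doc)?.remote_storage)
}

fn main() {
    // With unquoted values (as before the fix) this would fail to parse.
    let arg = r#"{bucket_name="my-bucket", bucket_region="eu-west-1", prefix_in_bucket="wal"}"#;
    println!("{:?}", parse_remote_storage(arg).unwrap());
}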

View File

@@ -218,7 +218,7 @@ impl ZTenantTimelineId {
impl fmt::Display for ZTenantTimelineId {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}-{}", self.tenant_id, self.timeline_id)
write!(f, "{}/{}", self.tenant_id, self.timeline_id)
}
}
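
For context, the change above only switches the Display separator from "-" to
"/"; a standalone sketch of the same pattern (simplified string ids, not the
real ZTenantTimelineId type):

use std::fmt;

// Simplified stand-in: two opaque ids formatted as "tenant/timeline".
struct TenantTimelineId {
    tenant_id: String,
    timeline_id: String,
}

impl fmt::Display for TenantTimelineId {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "{}/{}", self.tenant_id, self.timeline_id)
    }
}

fn main() {
    let id = TenantTimelineId {
        tenant_id: "9e4c8f3a".into(),
        timeline_id: "7b21d6e0".into(),
    };
    assert_eq!(id.to_string(), "9e4c8f3a/7b21d6e0");
}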

View File

@@ -83,7 +83,7 @@ impl ElectionLeader {
}
pub async fn give_up(self) {
// self.keep_alive.abort();
self.keep_alive.abort();
// TODO: it'll be wise to resign here but it'll happen after lease expiration anyway
// should we await for keep alive termination?
let _ = self.keep_alive.await;
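
The abort-then-await pattern above is the standard way to stop a tokio task:
once aborted, awaiting the JoinHandle resolves (with a cancellation error unless
the task had already finished), so give_up no longer hangs. A minimal standalone
sketch of the same pattern:

use std::time::Duration;

#[tokio::main]
async fn main() {
    // A task that would otherwise run forever, like a lease keep-alive loop.
    let keep_alive = tokio::spawn(async {
        loop {
            tokio::time::sleep(Duration::from_secs(1)).await;
        }
    });

    // Without abort(), the await below would never return.
    keep_alive.abort();
    let res = keep_alive.await;
    assert!(res.unwrap_err().is_cancelled());
}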

View File

@@ -731,24 +731,36 @@ where
{
let mut state = self.state.clone();
// Remember point where WAL begins globally, if not yet.
// Here we learn the initial LSN for the first time and set the fields
// that depend on it.
if state.timeline_start_lsn == Lsn(0) {
// Remember point where WAL begins globally.
state.timeline_start_lsn = msg.timeline_start_lsn;
info!(
"setting timeline_start_lsn to {:?}",
state.timeline_start_lsn
);
}
// Remember point where WAL begins locally, if not yet. (I doubt the
// second condition is ever possible)
if state.local_start_lsn == Lsn(0) || state.local_start_lsn >= msg.start_streaming_at {
state.local_start_lsn = msg.start_streaming_at;
info!("setting local_start_lsn to {:?}", state.local_start_lsn);
}
// Initializing commit_lsn before acking the first flushed record is
// important to let find_end_of_wal skip the hole at the beginning
// of the first segment.
//
// NB: on new clusters this happens at the same time as
// timeline_start_lsn initialization; it is kept outside the if above
// to support upgrading existing installations.
self.global_commit_lsn = max(self.global_commit_lsn, state.timeline_start_lsn);
self.inmem.commit_lsn = max(self.inmem.commit_lsn, state.timeline_start_lsn);
self.metrics.commit_lsn.set(self.inmem.commit_lsn.0 as f64);
// Initializing backup_lsn is useful to avoid making the backup task think it should upload segment 0.
self.inmem.backup_lsn = max(self.inmem.backup_lsn, state.timeline_start_lsn);
state.acceptor_state.term_history = msg.term_history.clone();
self.state.persist(&state)?;
self.persist_control_file(state)?;
}
info!("start receiving WAL since {:?}", msg.start_streaming_at);
@@ -764,14 +776,6 @@ where
self.inmem.commit_lsn = commit_lsn;
self.metrics.commit_lsn.set(self.inmem.commit_lsn.0 as f64);
// We got our first commit_lsn, which means we should sync
// everything to disk, to initialize the state.
if self.state.commit_lsn == Lsn::INVALID && commit_lsn != Lsn::INVALID {
self.inmem.backup_lsn = self.inmem.commit_lsn; // initialize backup_lsn
self.wal_store.flush_wal()?;
self.persist_control_file()?;
}
// If new commit_lsn reached epoch switch, force sync of control
// file: walproposer in sync mode is very interested when this
// happens. Note: this is for sync-safekeepers mode only, as
@@ -780,15 +784,14 @@ where
// that we receive new epoch_start_lsn, and we still need to sync
// control file in this case.
if commit_lsn == self.epoch_start_lsn && self.state.commit_lsn != commit_lsn {
self.persist_control_file()?;
self.persist_control_file(self.state.clone())?;
}
Ok(())
}
/// Persist in-memory state to the disk.
fn persist_control_file(&mut self) -> Result<()> {
let mut state = self.state.clone();
/// Persist in-memory state to disk, taking the rest of the data from the passed state.
fn persist_control_file(&mut self, mut state: SafeKeeperState) -> Result<()> {
state.commit_lsn = self.inmem.commit_lsn;
state.backup_lsn = self.inmem.backup_lsn;
state.peer_horizon_lsn = self.inmem.peer_horizon_lsn;
@@ -823,13 +826,6 @@ where
// do the job
if !msg.wal_data.is_empty() {
self.wal_store.write_wal(msg.h.begin_lsn, &msg.wal_data)?;
// If this was the first record we ever received, initialize
// commit_lsn to help find_end_of_wal skip the hole in the
// beginning.
if self.global_commit_lsn == Lsn(0) {
self.global_commit_lsn = msg.h.begin_lsn;
}
}
// flush wal to the disk, if required
@@ -852,7 +848,7 @@ where
if self.state.peer_horizon_lsn + (self.state.server.wal_seg_size as u64)
< self.inmem.peer_horizon_lsn
{
self.persist_control_file()?;
self.persist_control_file(self.state.clone())?;
}
trace!(
@@ -920,7 +916,7 @@ where
self.inmem.peer_horizon_lsn = new_peer_horizon_lsn;
}
if sync_control_file {
self.persist_control_file()?;
self.persist_control_file(self.state.clone())?;
}
Ok(())
}

View File

@@ -71,7 +71,7 @@ async fn wal_backup_launcher_main_loop(
mut wal_backup_launcher_rx: Receiver<ZTenantTimelineId>,
) {
info!(
"wal backup launcher started, remote config {:?}",
"WAL backup launcher: started, remote config {:?}",
conf.remote_storage
);
@@ -95,7 +95,7 @@ async fn wal_backup_launcher_main_loop(
if is_wal_backup_required != tasks.contains_key(&zttid) {
if is_wal_backup_required {
// need to start the task
info!("starting wal backup task for {}", zttid);
info!("starting WAL backup task for {}", zttid);
// TODO: decide who should offload in launcher itself by simply checking current state
let election_name = broker::get_campaign_name(
@@ -115,7 +115,7 @@ async fn wal_backup_launcher_main_loop(
let handle = tokio::spawn(
backup_task_main(zttid, timeline_dir, shutdown_rx, election)
.instrument(info_span!("WAL backup", zttid = %zttid)),
.instrument(info_span!("WAL backup task", zttid = %zttid)),
);
tasks.insert(
@@ -127,7 +127,7 @@ async fn wal_backup_launcher_main_loop(
);
} else {
// need to stop the task
info!("stopping wal backup task for {}", zttid);
info!("stopping WAL backup task for {}", zttid);
let wb_handle = tasks.remove(&zttid).unwrap();
// Tell the task to shut down. An error means the task exited earlier, which is ok.
@@ -236,20 +236,19 @@ impl WalBackupTask {
}
let commit_lsn = *self.commit_lsn_watch_rx.borrow();
assert!(
commit_lsn >= backup_lsn,
"backup lsn should never pass commit lsn"
);
// Note that backup_lsn can be higher than commit_lsn if we
// don't have much local WAL and others already uploaded
// segments we don't even have.
if backup_lsn.segment_number(self.wal_seg_size)
== commit_lsn.segment_number(self.wal_seg_size)
>= commit_lsn.segment_number(self.wal_seg_size)
{
continue; /* nothing to do, common case as we wake up on every commit_lsn bump */
}
// Perhaps peers advanced the position, check shmem value.
backup_lsn = self.timeline.get_wal_backup_lsn();
if backup_lsn.segment_number(self.wal_seg_size)
== commit_lsn.segment_number(self.wal_seg_size)
>= commit_lsn.segment_number(self.wal_seg_size)
{
continue;
}
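
The == to >= change above reflects that, after the removed assert, backup_lsn
may legitimately be ahead of commit_lsn (peers may have uploaded segments this
safekeeper never had), and in that case there is still nothing to upload. A
small sketch of the check, with a hypothetical segment_number helper standing in
for the real Lsn method:

// Hypothetical helper mirroring Lsn::segment_number in spirit.
fn segment_number(lsn: u64, wal_seg_size: u64) -> u64 {
    lsn / wal_seg_size
}

// True when the offloader can skip this wakeup: everything up to commit_lsn's
// segment is already uploaded, including the case backup_lsn > commit_lsn.
fn nothing_to_backup(backup_lsn: u64, commit_lsn: u64, wal_seg_size: u64) -> bool {
    segment_number(backup_lsn, wal_seg_size) >= segment_number(commit_lsn, wal_seg_size)
}

fn main() {
    let seg = 16 * 1024 * 1024; // 16 MiB WAL segments
    assert!(nothing_to_backup(3 * seg + 10, 3 * seg + 500, seg)); // same segment
    assert!(nothing_to_backup(5 * seg, 3 * seg, seg)); // peers are ahead of us
    assert!(!nothing_to_backup(2 * seg, 3 * seg, seg)); // a full segment to upload
}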