mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-14 08:52:56 +00:00
Clean up after self-review
This commit is contained in:
@@ -53,6 +53,7 @@ pub struct FileStorage {
|
||||
}
|
||||
|
||||
impl FileStorage {
|
||||
/// Initialize storage by loading state from disk.
|
||||
pub fn restore_new(zttid: &ZTenantTimelineId, conf: &SafeKeeperConf) -> Result<FileStorage> {
|
||||
let timeline_dir = conf.timeline_dir(zttid);
|
||||
let tenant_id = zttid.tenant_id.to_string();
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
//! This module implements Timeline lifecycle management and has all neccessary code
|
||||
//! to glue together SafeKeeper and all other background services.
|
||||
|
||||
use anyhow::{anyhow, bail, Result};
|
||||
use anyhow::{bail, Result};
|
||||
|
||||
use etcd_broker::subscription_value::SkTimelineInfo;
|
||||
|
||||
@@ -12,7 +12,7 @@ use tokio::sync::watch;
|
||||
use std::cmp::{max, min};
|
||||
|
||||
use parking_lot::{Mutex, MutexGuard};
|
||||
use std::io;
|
||||
|
||||
use std::path::PathBuf;
|
||||
|
||||
use tokio::sync::mpsc::Sender;
|
||||
@@ -121,15 +121,7 @@ impl SharedState {
|
||||
|
||||
/// Restore SharedState from control file. If file doesn't exist, bails out.
|
||||
fn restore(conf: &SafeKeeperConf, zttid: &ZTenantTimelineId) -> Result<Self> {
|
||||
let control_store = control_file::FileStorage::restore_new(zttid, conf).map_err(|e| {
|
||||
if let Some(e) = e.downcast_ref::<io::Error>() {
|
||||
if e.kind() == io::ErrorKind::NotFound {
|
||||
return anyhow!(TimelineError::NotFound(*zttid));
|
||||
}
|
||||
}
|
||||
e
|
||||
})?;
|
||||
|
||||
let control_store = control_file::FileStorage::restore_new(zttid, conf)?;
|
||||
if control_store.server.wal_seg_size == 0 {
|
||||
bail!(TimelineError::UninitializedWalSegSize(*zttid));
|
||||
}
|
||||
@@ -548,8 +540,7 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns commit_lsn watch channel. Channel should be obtained only after
|
||||
/// the timeline is loaded. Otherwise, the channel will have invalid LSN value.
|
||||
/// Returns commit_lsn watch channel.
|
||||
pub fn get_commit_lsn_watch_rx(&self) -> watch::Receiver<Lsn> {
|
||||
self.commit_lsn_watch_rx.clone()
|
||||
}
|
||||
|
||||
@@ -213,16 +213,16 @@ impl GlobalTimelines {
|
||||
/// or was corrupted and couldn't be loaded on startup. Returned timeline is always valid,
|
||||
/// i.e. loaded in memory and not cancelled.
|
||||
pub fn get(zttid: ZTenantTimelineId) -> Result<Arc<Timeline>> {
|
||||
let global_lock = TIMELINES_STATE.lock().unwrap();
|
||||
let res = TIMELINES_STATE.lock().unwrap().get(&zttid);
|
||||
|
||||
match global_lock.timelines.get(&zttid) {
|
||||
Some(result) => {
|
||||
if result.is_cancelled() {
|
||||
match res {
|
||||
Ok(tli) => {
|
||||
if tli.is_cancelled() {
|
||||
anyhow::bail!(TimelineError::Cancelled(zttid));
|
||||
}
|
||||
Ok(Arc::clone(result))
|
||||
Ok(tli)
|
||||
}
|
||||
None => anyhow::bail!(TimelineError::NotFound(zttid)),
|
||||
Err(e) => Err(e),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -204,7 +204,7 @@ async fn backup_task_main(
|
||||
info!("started");
|
||||
let res = GlobalTimelines::get(zttid);
|
||||
if let Err(e) = res {
|
||||
info!("backup error for timeline {}: {}", zttid, e);
|
||||
error!("backup error for timeline {}: {}", zttid, e);
|
||||
return;
|
||||
}
|
||||
let tli = res.unwrap();
|
||||
@@ -255,7 +255,7 @@ impl WalBackupTask {
|
||||
if retry_attempt == 0 {
|
||||
// wait for new WAL to arrive
|
||||
if let Err(e) = self.commit_lsn_watch_rx.changed().await {
|
||||
// can happen if timeline is deleted
|
||||
// should never happen, as we hold Arc to timeline.
|
||||
error!("commit_lsn watch shut down: {:?}", e);
|
||||
return;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user