storage_sync: track upload queue initialization state using enum & fix last_uploaded_consistent_lsn initialization for empty remote storage

As pointed out in b8488e70a9 (r1024319620)
the following is wrong for the case where the remote storage is empty:

    metadata = whatever the local-ONLY metadata is
    ...
    upload_queue.latest_metadata = Some(metadata.clone());
    upload_queue.last_uploaded_consistent_lsn = Some(metadata.disk_consistent_lsn());

The reason why it's wrong is that we return last_uploaded_consistent_lsn
to safekeepers. So, we'd be returning an Lsn that is not yet uploaded to
S3.
This commit is contained in:
Christian Schwarz
2022-11-16 17:23:54 -05:00
committed by Dmitry Rodionov
parent decef74503
commit 71bc45a21b
3 changed files with 114 additions and 80 deletions

View File

@@ -127,6 +127,7 @@ mod download;
pub mod index;
mod upload;
use anyhow::Context;
// re-export this
pub use download::is_temp_download_file;
pub use download::list_remote_timelines;
@@ -135,7 +136,7 @@ use std::collections::{HashMap, VecDeque};
use std::fmt::Debug;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, Mutex, MutexGuard};
use std::sync::{Arc, Mutex};
use anyhow::ensure;
use remote_storage::{DownloadError, GenericRemoteStorage};
@@ -189,10 +190,13 @@ pub struct RemoteTimelineClient {
storage_impl: GenericRemoteStorage,
}
enum UploadQueue {
Uninitialized,
Initialized(UploadQueueInitialized),
}
/// This keeps track of queued and in-progress tasks.
#[derive(Default)]
struct UploadQueue {
struct UploadQueueInitialized {
/// Counter to assign task IDs
task_counter: u64,
@@ -226,6 +230,66 @@ struct UploadQueue {
queued_operations: VecDeque<UploadOp>,
}
impl UploadQueue {
fn initialize_empty_remote(&mut self) -> anyhow::Result<&mut UploadQueueInitialized> {
match self {
UploadQueue::Uninitialized => (),
UploadQueue::Initialized(_) => anyhow::bail!("already initialized"),
}
info!("initializing upload queue for new timeline");
let state = UploadQueueInitialized::default();
*self = UploadQueue::Initialized(state);
Ok(self.initialized_mut().expect("we just set it"))
}
fn initialize_with_current_remote_index_part(
&mut self,
index_part: &IndexPart,
) -> anyhow::Result<&mut UploadQueueInitialized> {
match self {
UploadQueue::Uninitialized => (),
UploadQueue::Initialized(_) => anyhow::bail!("already initialized"),
}
let mut files = HashMap::new();
for path in &index_part.timeline_layers {
let layer_metadata = index_part
.layer_metadata
.get(path)
.map(LayerFileMetadata::from)
.unwrap_or(LayerFileMetadata::MISSING);
files.insert(path.clone(), layer_metadata);
}
let metadata = index_part.parse_metadata()?;
info!(
"initializing upload queue with disk_consistent_lsn: {}",
metadata.disk_consistent_lsn()
);
let state = UploadQueueInitialized {
latest_files: files,
latest_metadata: Some(metadata.clone()),
last_uploaded_consistent_lsn: Some(metadata.disk_consistent_lsn()),
..Default::default()
};
*self = UploadQueue::Initialized(state);
Ok(self.initialized_mut().expect("we just set it"))
}
fn initialized_mut(&mut self) -> Option<&mut UploadQueueInitialized> {
match self {
UploadQueue::Uninitialized => None,
UploadQueue::Initialized(x) => Some(x),
}
}
}
/// An in-progress upload or delete task.
#[derive(Debug)]
struct UploadTask {
@@ -273,61 +337,26 @@ impl RemoteTimelineClient {
///
/// 'index_part' is the current remote metadata file.
pub fn init_upload_queue(&self, index_part: &IndexPart) -> anyhow::Result<()> {
let mut files = HashMap::new();
for path in &index_part.timeline_layers {
let layer_metadata = index_part
.layer_metadata
.get(path)
.map(LayerFileMetadata::from)
.unwrap_or(LayerFileMetadata::MISSING);
files.insert(path.clone(), layer_metadata);
}
let metadata = index_part.parse_metadata()?;
info!(
"initializing upload queue with disk_consistent_lsn: {}",
metadata.disk_consistent_lsn()
);
let mut upload_queue = self.upload_queue.lock().unwrap();
if upload_queue.latest_metadata.is_some()
|| upload_queue.last_uploaded_consistent_lsn.is_some()
{
anyhow::bail!("attempted to initialize already initialized upload queue")
}
upload_queue.latest_files = files;
upload_queue.latest_metadata = Some(metadata.clone());
upload_queue.last_uploaded_consistent_lsn = Some(metadata.disk_consistent_lsn());
upload_queue.initialize_with_current_remote_index_part(index_part)?;
Ok(())
}
/// Like init_upload_queue, but the list of files is initially empty, and
/// metadata is passed as a TimelineMetadata. This is used when creating a
/// new timeline.
pub fn init_upload_queue_empty(&self, metadata: &TimelineMetadata) -> anyhow::Result<()> {
info!(
"initializing upload queue for new timeline with disk_consistent_lsn: {}",
metadata.disk_consistent_lsn()
);
pub fn init_upload_queue_empty(&self) -> anyhow::Result<()> {
info!("initializing upload queue for new timeline");
let mut upload_queue = self.upload_queue.lock().unwrap();
if upload_queue.latest_metadata.is_some()
|| upload_queue.last_uploaded_consistent_lsn.is_some()
{
anyhow::bail!("attempted to initialize already initialized upload queue")
}
upload_queue.latest_metadata = Some(metadata.clone());
upload_queue.last_uploaded_consistent_lsn = Some(metadata.disk_consistent_lsn());
upload_queue.initialize_empty_remote()?;
Ok(())
}
pub fn last_uploaded_consistent_lsn(&self) -> Option<Lsn> {
let upload_queue = self.upload_queue.lock().unwrap();
upload_queue.last_uploaded_consistent_lsn
let mut upload_queue = self.upload_queue.lock().unwrap();
upload_queue.initialized_mut()?.last_uploaded_consistent_lsn
}
//
@@ -371,7 +400,10 @@ impl RemoteTimelineClient {
// to fill in the missing details.
if layer_metadata.file_size().is_none() {
let new_metadata = LayerFileMetadata::new(downloaded_size);
let mut upload_queue = self.upload_queue.lock().unwrap();
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard
.initialized_mut()
.context("upload queue is not initialized")?;
if let Some(upgraded) = upload_queue.latest_files.get_mut(path) {
upgraded.merge(&new_metadata);
} else {
@@ -401,12 +433,10 @@ impl RemoteTimelineClient {
self: &Arc<Self>,
metadata: &TimelineMetadata,
) -> anyhow::Result<()> {
let mut upload_queue = self.upload_queue.lock().unwrap();
ensure!(
upload_queue.latest_metadata.is_some(),
"upload queue not initialized"
);
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard
.initialized_mut()
.context("upload queue is not initialized")?;
upload_queue.latest_metadata = Some(metadata.clone());
let disk_consistent_lsn = upload_queue
@@ -443,11 +473,11 @@ impl RemoteTimelineClient {
path: &Path,
layer_metadata: &LayerFileMetadata,
) -> anyhow::Result<()> {
let mut upload_queue = self.upload_queue.lock().unwrap();
ensure!(
upload_queue.last_uploaded_consistent_lsn.is_some(),
"upload queue not initialized"
);
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard
.initialized_mut()
.context("upload queue is not initialized")?;
// The file size can be missing for files that were created before we tracked that
// in the metadata, but it should be present for any new files we create.
ensure!(
@@ -484,11 +514,10 @@ impl RemoteTimelineClient {
/// The deletion won't actually be performed, until all preceding
/// upload operations have completed succesfully.
pub fn schedule_layer_file_deletion(self: &Arc<Self>, paths: &[PathBuf]) -> anyhow::Result<()> {
let mut upload_queue = self.upload_queue.lock().unwrap();
ensure!(
upload_queue.latest_metadata.is_some(),
"upload queue not initialized"
);
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard
.initialized_mut()
.context("upload queue is not initialized")?;
// Update the remote index file, removing the to-be-deleted files from the index,
// before deleting the actual files.
@@ -534,14 +563,11 @@ impl RemoteTimelineClient {
let barrier_op = UploadOp::Barrier(sender);
{
let mut upload_queue = self.upload_queue.lock().unwrap();
ensure!(
upload_queue.latest_metadata.is_some(),
"upload queue not initialized"
);
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard
.initialized_mut()
.context("upload queue is not initialized")?;
upload_queue.queued_operations.push_back(barrier_op);
// Launch the task immediately, if possible
self.launch_queued_tasks(upload_queue);
}
@@ -555,7 +581,7 @@ impl RemoteTimelineClient {
/// the ordering constraints.
///
/// The caller needs to already hold the `upload_queue` lock.
fn launch_queued_tasks(self: &Arc<Self>, mut upload_queue: MutexGuard<UploadQueue>) {
fn launch_queued_tasks(self: &Arc<Self>, upload_queue: &mut UploadQueueInitialized) {
while let Some(next_op) = upload_queue.queued_operations.front() {
// Can we run this task now?
let can_run_now = match next_op {
@@ -710,6 +736,9 @@ impl RemoteTimelineClient {
// The task has completed succesfully. Remove it from the in-progress list.
{
let mut upload_queue = self.upload_queue.lock().unwrap();
let mut upload_queue = upload_queue.initialized_mut().expect(
"callers are responsible for ensuring this is only called on initialized queue",
);
upload_queue.inprogress_tasks.remove(&task.task_id);
match task.op {
@@ -750,7 +779,7 @@ pub fn create_remote_timeline_client(
tenant_id,
timeline_id,
storage_impl: remote_storage,
upload_queue: Mutex::new(UploadQueue::default()),
upload_queue: Mutex::new(UploadQueue::Uninitialized),
})
}
@@ -867,15 +896,14 @@ mod tests {
tenant_id: harness.tenant_id,
timeline_id: TIMELINE_ID,
storage_impl,
upload_queue: Mutex::new(UploadQueue::default()),
upload_queue: Mutex::new(UploadQueue::Uninitialized),
});
let remote_timeline_dir =
remote_fs_dir.join(timeline_path.strip_prefix(&harness.conf.workdir)?);
println!("remote_timeline_dir: {}", remote_timeline_dir.display());
let metadata = dummy_metadata(Lsn(0x10));
client.init_upload_queue_empty(&metadata)?;
client.init_upload_queue_empty()?;
// Create a couple of dummy files, schedule upload for them
let content_foo = dummy_contents("foo");
@@ -894,7 +922,8 @@ mod tests {
// Check that they are started immediately, not queued
{
let upload_queue = client.upload_queue.lock().unwrap();
let mut guard = client.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut().unwrap();
assert!(upload_queue.queued_operations.is_empty());
assert!(upload_queue.inprogress_tasks.len() == 2);
assert!(upload_queue.num_inprogress_layer_uploads == 2);
@@ -904,14 +933,17 @@ mod tests {
let metadata = dummy_metadata(Lsn(0x20));
client.schedule_index_upload(&metadata)?;
{
let upload_queue = client.upload_queue.lock().unwrap();
let mut guard = client.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut().unwrap();
assert!(upload_queue.queued_operations.len() == 1);
}
// Wait for the uploads to finish
runtime.block_on(client.wait_completion())?;
{
let upload_queue = client.upload_queue.lock().unwrap();
let mut guard = client.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut().unwrap();
assert!(upload_queue.queued_operations.is_empty());
assert!(upload_queue.inprogress_tasks.is_empty());
}
@@ -931,7 +963,9 @@ mod tests {
)?;
client.schedule_layer_file_deletion(&[timeline_path.join("foo")])?;
{
let upload_queue = client.upload_queue.lock().unwrap();
let mut guard = client.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut().unwrap();
// Deletion schedules upload of the index file, and the file deletion itself
assert!(upload_queue.queued_operations.len() == 2);
assert!(upload_queue.inprogress_tasks.len() == 1);

View File

@@ -1865,7 +1865,7 @@ impl Tenant {
tenant_id,
new_timeline_id,
)?;
remote_client.init_upload_queue_empty(&new_metadata)?;
remote_client.init_upload_queue_empty()?;
Some(remote_client)
} else {
None

View File

@@ -1198,6 +1198,7 @@ impl Timeline {
};
let (local_only_filenames, up_to_date_metadata) = match index_part {
Ok(index_part) => {
remote_client.init_upload_queue(&index_part)?;
let (local_only_filenames, up_to_date_metadata) = self
.download_missing(
&index_part,
@@ -1207,12 +1208,11 @@ impl Timeline {
first_save,
)
.await?;
remote_client.init_upload_queue(&index_part)?;
(local_only_filenames, up_to_date_metadata)
}
Err(DownloadError::NotFound) => {
info!("no index file was found on the remote");
remote_client.init_upload_queue_empty(&local_metadata)?;
remote_client.init_upload_queue_empty()?;
(local_filenames, local_metadata)
}
Err(e) => return Err(anyhow::anyhow!(e)),