Extract common remote storage operations into GenericRemoteStorage (#2373)

Kirill Bulatov
2022-09-02 11:58:28 +03:00
committed by GitHub
parent 47bd307cb8
commit 8a7333438a
4 changed files with 128 additions and 143 deletions
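
The pageserver's storage sync code and the safekeeper's WAL backup each carried a near-identical helper that matched on the GenericRemoteStorage variant (Local or S3) before delegating to the concrete RemoteStorage implementation. This commit moves that dispatch into GenericRemoteStorage itself as upload_storage_object and download_storage_object, so call sites no longer need the RemoteStorage trait in scope. A minimal upload sketch, assuming a configured `storage: GenericRemoteStorage` and a hypothetical `local_path`:

// Hedged sketch, not taken from this diff: upload a local file via the new helper.
let file = tokio::fs::File::open(&local_path).await?;
let size = file.metadata().await?.len() as usize;
storage.upload_storage_object(file, size, &local_path).await?;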

View File

@@ -164,6 +164,102 @@ impl GenericRemoteStorage {
_ => None,
}
}
+ /// Takes the storage object's contents and size and uploads them to the remote storage,
+ /// mapping `from_path` to the corresponding remote object id in the storage.
+ ///
+ /// The storage object does not have to be present at `from_path`;
+ /// this path is used only for the remote object id conversion.
+ pub async fn upload_storage_object(
+ &self,
+ from: impl tokio::io::AsyncRead + Unpin + Send + Sync + 'static,
+ from_size_bytes: usize,
+ from_path: &Path,
+ ) -> anyhow::Result<()> {
+ async fn do_upload_storage_object<P, S>(
+ storage: &S,
+ from: impl tokio::io::AsyncRead + Unpin + Send + Sync + 'static,
+ from_size_bytes: usize,
+ from_path: &Path,
+ ) -> anyhow::Result<()>
+ where
+ P: std::fmt::Debug + Send + Sync + 'static,
+ S: RemoteStorage<RemoteObjectId = P> + Send + Sync + 'static,
+ {
+ let target_storage_path = storage.remote_object_id(from_path).with_context(|| {
+ format!(
+ "Failed to get the storage path for source local path '{}'",
+ from_path.display()
+ )
+ })?;
+ storage
+ .upload(from, from_size_bytes, &target_storage_path, None)
+ .await
+ .with_context(|| {
+ format!(
+ "Failed to upload from '{}' to storage path '{:?}'",
+ from_path.display(),
+ target_storage_path
+ )
+ })
+ }
+ match self {
+ GenericRemoteStorage::Local(storage) => {
+ do_upload_storage_object(storage, from, from_size_bytes, from_path).await
+ }
+ GenericRemoteStorage::S3(storage) => {
+ do_upload_storage_object(storage, from, from_size_bytes, from_path).await
+ }
+ }
+ }
+ /// Downloads the storage object into the `to_path` provided.
+ /// A `byte_range` can be specified to download only part of the file, if needed.
+ pub async fn download_storage_object(
+ &self,
+ byte_range: Option<(u64, Option<u64>)>,
+ to_path: &Path,
+ ) -> Result<Download, DownloadError> {
+ async fn do_download_storage_object<P, S>(
+ storage: &S,
+ byte_range: Option<(u64, Option<u64>)>,
+ to_path: &Path,
+ ) -> Result<Download, DownloadError>
+ where
+ P: std::fmt::Debug + Send + Sync + 'static,
+ S: RemoteStorage<RemoteObjectId = P> + Send + Sync + 'static,
+ {
+ let remote_object_path = storage
+ .remote_object_id(to_path)
+ .with_context(|| {
+ format!(
+ "Failed to get the storage path for target local path '{}'",
+ to_path.display()
+ )
+ })
+ .map_err(DownloadError::BadInput)?;
+ match byte_range {
+ Some((start, end)) => {
+ storage
+ .download_byte_range(&remote_object_path, start, end)
+ .await
+ }
+ None => storage.download(&remote_object_path).await,
+ }
+ }
+ match self {
+ GenericRemoteStorage::Local(storage) => {
+ do_download_storage_object(storage, byte_range, to_path).await
+ }
+ GenericRemoteStorage::S3(storage) => {
+ do_download_storage_object(storage, byte_range, to_path).await
+ }
+ }
+ }
}
/// Extra set of key-value pairs that contain arbitrary metadata about the storage entry.
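
For illustration (not part of the diff), the matching download side, with the same assumed `storage` and `local_path` and a hypothetical `offset`:

// Hedged sketch: fetch the whole object, or only the bytes from `offset` onward,
// mirroring the byte-range form the safekeeper uses below.
let whole = storage.download_storage_object(None, &local_path).await?;
let tail = storage
    .download_storage_object(Some((offset, None)), &local_path)
    .await?;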

View File

@@ -10,7 +10,7 @@ use std::{
use anyhow::Context;
use futures::stream::{FuturesUnordered, StreamExt};
use remote_storage::{
- path_with_suffix_extension, Download, DownloadError, GenericRemoteStorage, RemoteStorage,
+ path_with_suffix_extension, DownloadError, GenericRemoteStorage, RemoteStorage,
};
use tokio::{
fs,
@@ -143,7 +143,9 @@ async fn download_index_part(
let index_part_path = metadata_path(conf, sync_id.timeline_id, sync_id.tenant_id)
.with_file_name(IndexPart::FILE_NAME)
.with_extension(IndexPart::FILE_EXTENSION);
- let mut index_part_download = download_storage_object(storage, &index_part_path).await?;
+ let mut index_part_download = storage
+ .download_storage_object(None, &index_part_path)
+ .await?;
let mut index_part_bytes = Vec::new();
io::copy(
@@ -262,7 +264,7 @@ pub(super) async fn download_timeline_layers<'a>(
)
})?;
- let mut layer_download = download_storage_object(storage, &layer_destination_path)
+ let mut layer_download = storage.download_storage_object(None, &layer_destination_path)
.await
.with_context(|| {
format!(
@@ -365,37 +367,6 @@ pub(super) async fn download_timeline_layers<'a>(
}
}
- async fn download_storage_object(
- storage: &GenericRemoteStorage,
- to_path: &Path,
- ) -> Result<Download, DownloadError> {
- async fn do_download_storage_object<P, S>(
- storage: &S,
- to_path: &Path,
- ) -> Result<Download, DownloadError>
- where
- P: std::fmt::Debug + Send + Sync + 'static,
- S: RemoteStorage<RemoteObjectId = P> + Send + Sync + 'static,
- {
- let remote_object_path = storage
- .remote_object_id(to_path)
- .with_context(|| {
- format!(
- "Failed to get the storage path for target local path '{}'",
- to_path.display()
- )
- })
- .map_err(DownloadError::BadInput)?;
- storage.download(&remote_object_path).await
- }
- match storage {
- GenericRemoteStorage::Local(storage) => do_download_storage_object(storage, to_path).await,
- GenericRemoteStorage::S3(storage) => do_download_storage_object(storage, to_path).await,
- }
- }
async fn get_timeline_sync_ids(
storage: &GenericRemoteStorage,
tenant_path: &Path,

View File

@@ -1,14 +1,11 @@
//! Timeline synchronization logic to compress and upload to the remote storage all new timeline files from the checkpoints.
- use std::{
- fmt::Debug,
- path::{Path, PathBuf},
- };
+ use std::{fmt::Debug, path::PathBuf};
use anyhow::Context;
use futures::stream::{FuturesUnordered, StreamExt};
use once_cell::sync::Lazy;
- use remote_storage::{GenericRemoteStorage, RemoteStorage};
+ use remote_storage::GenericRemoteStorage;
use tokio::fs;
use tracing::{debug, error, info, warn};
@@ -47,7 +44,8 @@ pub(super) async fn upload_index_part(
let index_part_path = metadata_path(conf, sync_id.timeline_id, sync_id.tenant_id)
.with_file_name(IndexPart::FILE_NAME)
.with_extension(IndexPart::FILE_EXTENSION);
- upload_storage_object(storage, index_part_bytes, index_part_size, &index_part_path)
+ storage
+ .upload_storage_object(index_part_bytes, index_part_size, &index_part_path)
.await
.with_context(|| format!("Failed to upload index part for '{sync_id}'"))
}
@@ -131,7 +129,8 @@ pub(super) async fn upload_timeline_layers<'a>(
.map_err(UploadError::Other)?
.len() as usize;
- match upload_storage_object(storage, source_file, source_size, &source_path)
+ match storage
+ .upload_storage_object(source_file, source_size, &source_path)
.await
.with_context(|| format!("Failed to upload layer file for {sync_id}"))
{
@@ -193,51 +192,6 @@ pub(super) async fn upload_timeline_layers<'a>(
}
}
- async fn upload_storage_object(
- storage: &GenericRemoteStorage,
- from: impl tokio::io::AsyncRead + Unpin + Send + Sync + 'static,
- from_size_bytes: usize,
- from_path: &Path,
- ) -> anyhow::Result<()> {
- async fn do_upload_storage_object<P, S>(
- storage: &S,
- from: impl tokio::io::AsyncRead + Unpin + Send + Sync + 'static,
- from_size_bytes: usize,
- from_path: &Path,
- ) -> anyhow::Result<()>
- where
- P: std::fmt::Debug + Send + Sync + 'static,
- S: RemoteStorage<RemoteObjectId = P> + Send + Sync + 'static,
- {
- let target_storage_path = storage.remote_object_id(from_path).with_context(|| {
- format!(
- "Failed to get the storage path for source local path '{}'",
- from_path.display()
- )
- })?;
- storage
- .upload(from, from_size_bytes, &target_storage_path, None)
- .await
- .with_context(|| {
- format!(
- "Failed to upload from '{}' to storage path '{:?}'",
- from_path.display(),
- target_storage_path
- )
- })
- }
- match storage {
- GenericRemoteStorage::Local(storage) => {
- do_upload_storage_object(storage, from, from_size_bytes, from_path).await
- }
- GenericRemoteStorage::S3(storage) => {
- do_upload_storage_object(storage, from, from_size_bytes, from_path).await
- }
- }
- }
enum UploadError {
MissingLocalFile(PathBuf, anyhow::Error),
Other(anyhow::Error),

View File

@@ -13,7 +13,7 @@ use std::time::Duration;
use postgres_ffi::v14::xlog_utils::{XLogFileName, XLogSegNo, XLogSegNoOffsetToRecPtr};
use postgres_ffi::PG_TLI;
- use remote_storage::{GenericRemoteStorage, RemoteStorage};
+ use remote_storage::GenericRemoteStorage;
use tokio::fs::File;
use tokio::runtime::Builder;
@@ -419,73 +419,37 @@ static REMOTE_STORAGE: OnceCell<Option<GenericRemoteStorage>> = OnceCell::new();
async fn backup_object(source_file: &Path, size: usize) -> Result<()> {
let storage = REMOTE_STORAGE.get().expect("failed to get remote storage");
- let file = File::open(&source_file).await?;
+ let file = tokio::io::BufReader::new(File::open(&source_file).await.with_context(|| {
+ format!(
+ "Failed to open file {} for wal backup",
+ source_file.display()
+ )
+ })?);
- // Storage is initialized by launcher at this point.
- match storage.as_ref().unwrap() {
- GenericRemoteStorage::Local(local_storage) => {
- let destination = local_storage.remote_object_id(source_file)?;
- debug!(
- "local upload about to start from {} to {}",
- source_file.display(),
- destination.display()
- );
- local_storage.upload(file, size, &destination, None).await
- }
- GenericRemoteStorage::S3(s3_storage) => {
- let s3key = s3_storage.remote_object_id(source_file)?;
- debug!(
- "S3 upload about to start from {} to {:?}",
- source_file.display(),
- s3key
- );
- s3_storage.upload(file, size, &s3key, None).await
- }
- }?;
- Ok(())
+ storage
+ .as_ref()
+ .expect("Storage should be initialized by launcher at this point.")
+ .upload_storage_object(file, size, source_file)
+ .await
}
pub async fn read_object(
file_path: PathBuf,
offset: u64,
) -> anyhow::Result<Pin<Box<dyn tokio::io::AsyncRead>>> {
- let download = match REMOTE_STORAGE
+ let download = REMOTE_STORAGE
.get()
.context("Failed to get remote storage")?
.as_ref()
.context("No remote storage configured")?
- {
- GenericRemoteStorage::Local(local_storage) => {
- let source = local_storage.remote_object_id(&file_path)?;
- info!(
- "local download about to start from {} at offset {}",
- source.display(),
- offset
- );
- local_storage
- .download_byte_range(&source, offset, None)
- .await
- }
- GenericRemoteStorage::S3(s3_storage) => {
- let s3key = s3_storage.remote_object_id(&file_path)?;
- info!(
- "S3 download about to start from {:?} at offset {}",
- s3key, offset
- );
- s3_storage.download_byte_range(&s3key, offset, None).await
- }
- }
- .with_context(|| {
- format!(
- "Failed to open WAL segment download stream for local storage path {}",
- file_path.display()
- )
- })?;
+ .download_storage_object(Some((offset, None)), &file_path)
+ .await
+ .with_context(|| {
+ format!(
+ "Failed to open WAL segment download stream for local storage path {}",
+ file_path.display()
+ )
+ })?;
Ok(download.download_stream)
}
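
For context, a hedged sketch of consuming the stream read_object returns (the segment path and offset are hypothetical); Pin<Box<dyn AsyncRead>> is Unpin, so AsyncReadExt methods apply directly:

use tokio::io::AsyncReadExt;

// Hypothetical caller: stream a WAL segment starting at `offset`.
let mut reader = read_object(segment_path, offset).await?;
let mut buf = Vec::new();
reader.read_to_end(&mut buf).await?;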