mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-07 13:32:57 +00:00
Hide remote timeline index access details
This commit is contained in:
committed by
Kirill Bulatov
parent
d56a0ee19a
commit
55de0b88f5
@@ -3,7 +3,6 @@ use std::sync::Arc;
|
||||
use anyhow::Result;
|
||||
use hyper::StatusCode;
|
||||
use hyper::{Body, Request, Response, Uri};
|
||||
use tokio::sync::RwLock;
|
||||
use tracing::*;
|
||||
use zenith_utils::auth::JwtAuth;
|
||||
use zenith_utils::http::endpoint::attach_openapi_ui;
|
||||
@@ -22,17 +21,14 @@ use zenith_utils::zid::{ZTenantTimelineId, ZTimelineId};
|
||||
use super::models::{
|
||||
StatusResponse, TenantCreateRequest, TenantCreateResponse, TimelineCreateRequest,
|
||||
};
|
||||
use crate::remote_storage::{schedule_timeline_download, RemoteTimelineIndex};
|
||||
use crate::timelines::{
|
||||
extract_remote_timeline_info, LocalTimelineInfo, RemoteTimelineInfo, TimelineInfo,
|
||||
};
|
||||
use crate::remote_storage::{schedule_timeline_download, RemoteIndex};
|
||||
use crate::timelines::{LocalTimelineInfo, RemoteTimelineInfo, TimelineInfo};
|
||||
use crate::{config::PageServerConf, tenant_mgr, timelines, ZTenantId};
|
||||
|
||||
#[derive(Debug)]
|
||||
struct State {
|
||||
conf: &'static PageServerConf,
|
||||
auth: Option<Arc<JwtAuth>>,
|
||||
remote_index: Arc<RwLock<RemoteTimelineIndex>>,
|
||||
remote_index: RemoteIndex,
|
||||
allowlist_routes: Vec<Uri>,
|
||||
}
|
||||
|
||||
@@ -40,7 +36,7 @@ impl State {
|
||||
fn new(
|
||||
conf: &'static PageServerConf,
|
||||
auth: Option<Arc<JwtAuth>>,
|
||||
remote_index: Arc<RwLock<RemoteTimelineIndex>>,
|
||||
remote_index: RemoteIndex,
|
||||
) -> Self {
|
||||
let allowlist_routes = ["/v1/status", "/v1/doc", "/swagger.yml"]
|
||||
.iter()
|
||||
@@ -113,14 +109,24 @@ async fn timeline_list_handler(request: Request<Body>) -> Result<Response<Body>,
|
||||
.await
|
||||
.map_err(ApiError::from_err)??;
|
||||
|
||||
let remote_index = get_state(&request).remote_index.read().await;
|
||||
let mut response_data = Vec::with_capacity(local_timeline_infos.len());
|
||||
for (timeline_id, local_timeline_info) in local_timeline_infos {
|
||||
response_data.push(TimelineInfo {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
local: Some(local_timeline_info),
|
||||
remote: extract_remote_timeline_info(tenant_id, timeline_id, &remote_index),
|
||||
remote: get_state(&request)
|
||||
.remote_index
|
||||
.read()
|
||||
.await
|
||||
.timeline_entry(&ZTenantTimelineId {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
})
|
||||
.map(|remote_entry| RemoteTimelineInfo {
|
||||
remote_consistent_lsn: remote_entry.disk_consistent_lsn(),
|
||||
awaits_download: remote_entry.get_awaits_download(),
|
||||
}),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -277,7 +283,7 @@ async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Bo
|
||||
check_permission(&request, None)?;
|
||||
|
||||
let request_data: TenantCreateRequest = json_request(&mut request).await?;
|
||||
let remote_index = Arc::clone(&get_state(&request).remote_index);
|
||||
let remote_index = get_state(&request).remote_index.clone();
|
||||
|
||||
let target_tenant_id = request_data
|
||||
.new_tenant_id
|
||||
@@ -308,7 +314,7 @@ async fn handler_404(_: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
pub fn make_router(
|
||||
conf: &'static PageServerConf,
|
||||
auth: Option<Arc<JwtAuth>>,
|
||||
remote_index: Arc<RwLock<RemoteTimelineIndex>>,
|
||||
remote_index: RemoteIndex,
|
||||
) -> RouterBuilder<hyper::Body, ApiError> {
|
||||
let spec = include_bytes!("openapi_spec.yml");
|
||||
let mut router = attach_openapi_ui(endpoint::make_router(), spec, "/swagger.yml", "/v1/doc");
|
||||
|
||||
@@ -35,7 +35,7 @@ use self::metadata::{metadata_path, TimelineMetadata, METADATA_FILE_NAME};
|
||||
use crate::config::PageServerConf;
|
||||
use crate::page_cache;
|
||||
use crate::relish::*;
|
||||
use crate::remote_storage::{schedule_timeline_checkpoint_upload, RemoteTimelineIndex};
|
||||
use crate::remote_storage::{schedule_timeline_checkpoint_upload, RemoteIndex};
|
||||
use crate::repository::{
|
||||
BlockNumber, GcResult, Repository, RepositoryTimeline, Timeline, TimelineSyncStatusUpdate,
|
||||
TimelineWriter, ZenithWalRecord,
|
||||
@@ -132,7 +132,7 @@ pub struct LayeredRepository {
|
||||
|
||||
// provides access to timeline data sitting in the remote storage
|
||||
// supposed to be used for retrieval of remote consistent lsn in walreceiver
|
||||
remote_index: Arc<tokio::sync::RwLock<RemoteTimelineIndex>>,
|
||||
remote_index: RemoteIndex,
|
||||
|
||||
/// Makes every timeline to backup their files to remote storage.
|
||||
upload_relishes: bool,
|
||||
@@ -355,8 +355,8 @@ impl Repository for LayeredRepository {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn get_remote_index(&self) -> &tokio::sync::RwLock<RemoteTimelineIndex> {
|
||||
self.remote_index.as_ref()
|
||||
fn get_remote_index(&self) -> &RemoteIndex {
|
||||
&self.remote_index
|
||||
}
|
||||
}
|
||||
|
||||
@@ -511,7 +511,7 @@ impl LayeredRepository {
|
||||
conf: &'static PageServerConf,
|
||||
walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
|
||||
tenantid: ZTenantId,
|
||||
remote_index: Arc<tokio::sync::RwLock<RemoteTimelineIndex>>,
|
||||
remote_index: RemoteIndex,
|
||||
upload_relishes: bool,
|
||||
) -> LayeredRepository {
|
||||
LayeredRepository {
|
||||
|
||||
@@ -89,15 +89,14 @@ use std::{
|
||||
collections::HashMap,
|
||||
ffi, fs,
|
||||
path::{Path, PathBuf},
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use anyhow::{bail, Context};
|
||||
use tokio::{io, sync::RwLock};
|
||||
use tokio::io;
|
||||
use tracing::{debug, error, info};
|
||||
use zenith_utils::zid::{ZTenantId, ZTenantTimelineId, ZTimelineId};
|
||||
|
||||
pub use self::storage_sync::index::{RemoteTimelineIndex, TimelineIndexEntry};
|
||||
pub use self::storage_sync::index::{RemoteIndex, TimelineIndexEntry};
|
||||
pub use self::storage_sync::{schedule_timeline_checkpoint_upload, schedule_timeline_download};
|
||||
use self::{local_fs::LocalFs, rust_s3::S3};
|
||||
use crate::layered_repository::ephemeral_file::is_ephemeral_file;
|
||||
@@ -120,7 +119,7 @@ type LocalTimelineInitStatuses = HashMap<ZTenantId, HashMap<ZTimelineId, LocalTi
|
||||
/// Successful initialization includes a case when sync loop is not started, in which case the startup data is returned still,
|
||||
/// to simplify the received code.
|
||||
pub struct SyncStartupData {
|
||||
pub remote_index: Arc<RwLock<RemoteTimelineIndex>>,
|
||||
pub remote_index: RemoteIndex,
|
||||
pub local_timeline_init_statuses: LocalTimelineInitStatuses,
|
||||
}
|
||||
|
||||
@@ -172,7 +171,7 @@ pub fn start_local_timeline_sync(
|
||||
}
|
||||
Ok(SyncStartupData {
|
||||
local_timeline_init_statuses,
|
||||
remote_index: Arc::new(RwLock::new(RemoteTimelineIndex::empty())),
|
||||
remote_index: RemoteIndex::empty(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -25,6 +25,7 @@
|
||||
//! * all never local state gets scheduled for upload, such timelines are "local" and fully operational
|
||||
//! * the rest of the remote timelines are reported to pageserver, but not downloaded before they are actually accessed in pageserver,
|
||||
//! it may schedule the download on such occasions.
|
||||
//! Then, the index is shared across pageserver under [`RemoteIndex`] guard to ensure proper synchronization.
|
||||
//!
|
||||
//! The synchronization unit is an archive: a set of timeline files (or relishes) and a special metadata file, all compressed into a blob.
|
||||
//! Currently, there's no way to process an archive partially, if the archive processing fails, it has to be started from zero next time again.
|
||||
@@ -80,10 +81,7 @@ use futures::stream::{FuturesUnordered, StreamExt};
|
||||
use lazy_static::lazy_static;
|
||||
use tokio::{
|
||||
runtime::Runtime,
|
||||
sync::{
|
||||
mpsc::{self, UnboundedReceiver},
|
||||
RwLock,
|
||||
},
|
||||
sync::mpsc::{self, UnboundedReceiver},
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
use tracing::*;
|
||||
@@ -92,8 +90,8 @@ use self::{
|
||||
compression::ArchiveHeader,
|
||||
download::{download_timeline, DownloadedTimeline},
|
||||
index::{
|
||||
ArchiveDescription, ArchiveId, RemoteTimeline, RemoteTimelineIndex, TimelineIndexEntry,
|
||||
TimelineIndexEntryInner,
|
||||
ArchiveDescription, ArchiveId, RemoteIndex, RemoteTimeline, RemoteTimelineIndex,
|
||||
TimelineIndexEntry, TimelineIndexEntryInner,
|
||||
},
|
||||
upload::upload_timeline_checkpoint,
|
||||
};
|
||||
@@ -392,13 +390,14 @@ pub(super) fn spawn_storage_sync_thread<
|
||||
None
|
||||
}
|
||||
});
|
||||
let mut remote_index =
|
||||
RemoteTimelineIndex::try_parse_descriptions_from_paths(conf, download_paths);
|
||||
let remote_index = RemoteIndex::try_parse_descriptions_from_paths(conf, download_paths);
|
||||
|
||||
let local_timeline_init_statuses =
|
||||
schedule_first_sync_tasks(&mut remote_index, local_timeline_files);
|
||||
let remote_index = Arc::new(RwLock::new(remote_index));
|
||||
let remote_index_cloned = Arc::clone(&remote_index);
|
||||
let local_timeline_init_statuses = schedule_first_sync_tasks(
|
||||
&mut runtime.block_on(remote_index.write()),
|
||||
local_timeline_files,
|
||||
);
|
||||
|
||||
let loop_index = remote_index.clone();
|
||||
thread_mgr::spawn(
|
||||
ThreadKind::StorageSync,
|
||||
None,
|
||||
@@ -410,7 +409,7 @@ pub(super) fn spawn_storage_sync_thread<
|
||||
runtime,
|
||||
conf,
|
||||
receiver,
|
||||
remote_index_cloned,
|
||||
loop_index,
|
||||
storage,
|
||||
max_concurrent_sync,
|
||||
max_sync_errors,
|
||||
@@ -438,14 +437,14 @@ fn storage_sync_loop<
|
||||
runtime: Runtime,
|
||||
conf: &'static PageServerConf,
|
||||
mut receiver: UnboundedReceiver<SyncTask>,
|
||||
index: Arc<RwLock<RemoteTimelineIndex>>,
|
||||
index: RemoteIndex,
|
||||
storage: S,
|
||||
max_concurrent_sync: NonZeroUsize,
|
||||
max_sync_errors: NonZeroU32,
|
||||
) {
|
||||
let remote_assets = Arc::new((storage, Arc::clone(&index)));
|
||||
let remote_assets = Arc::new((storage, index.clone()));
|
||||
loop {
|
||||
let index = Arc::clone(&index);
|
||||
let index = index.clone();
|
||||
let loop_step = runtime.block_on(async {
|
||||
tokio::select! {
|
||||
new_timeline_states = loop_step(
|
||||
@@ -480,7 +479,7 @@ async fn loop_step<
|
||||
>(
|
||||
conf: &'static PageServerConf,
|
||||
receiver: &mut UnboundedReceiver<SyncTask>,
|
||||
remote_assets: Arc<(S, Arc<RwLock<RemoteTimelineIndex>>)>,
|
||||
remote_assets: Arc<(S, RemoteIndex)>,
|
||||
max_concurrent_sync: NonZeroUsize,
|
||||
max_sync_errors: NonZeroU32,
|
||||
) -> HashMap<ZTenantId, HashMap<ZTimelineId, TimelineSyncStatusUpdate>> {
|
||||
@@ -560,7 +559,7 @@ async fn process_task<
|
||||
S: RemoteStorage<StoragePath = P> + Send + Sync + 'static,
|
||||
>(
|
||||
conf: &'static PageServerConf,
|
||||
remote_assets: Arc<(S, Arc<RwLock<RemoteTimelineIndex>>)>,
|
||||
remote_assets: Arc<(S, RemoteIndex)>,
|
||||
task: SyncTask,
|
||||
max_sync_errors: NonZeroU32,
|
||||
) -> Option<TimelineSyncStatusUpdate> {
|
||||
@@ -584,7 +583,7 @@ async fn process_task<
|
||||
tokio::time::sleep(Duration::from_secs_f64(seconds_to_wait)).await;
|
||||
}
|
||||
|
||||
let remote_index = Arc::clone(&remote_assets.1);
|
||||
let remote_index = &remote_assets.1;
|
||||
|
||||
let sync_start = Instant::now();
|
||||
let sync_name = task.kind.sync_name();
|
||||
@@ -592,7 +591,7 @@ async fn process_task<
|
||||
SyncKind::Download(download_data) => {
|
||||
let download_result = download_timeline(
|
||||
conf,
|
||||
remote_assets,
|
||||
remote_assets.clone(),
|
||||
task.sync_id,
|
||||
download_data,
|
||||
task.retries + 1,
|
||||
@@ -772,7 +771,7 @@ async fn fetch_full_index<
|
||||
P: Send + Sync + 'static,
|
||||
S: RemoteStorage<StoragePath = P> + Send + Sync + 'static,
|
||||
>(
|
||||
(storage, index): &(S, Arc<RwLock<RemoteTimelineIndex>>),
|
||||
(storage, index): &(S, RemoteIndex),
|
||||
timeline_dir: &Path,
|
||||
id: ZTenantTimelineId,
|
||||
) -> anyhow::Result<RemoteTimeline> {
|
||||
@@ -808,8 +807,9 @@ async fn fetch_full_index<
|
||||
}
|
||||
};
|
||||
drop(index_read); // tokio rw lock is not upgradeable
|
||||
let mut index_write = index.write().await;
|
||||
index_write
|
||||
index
|
||||
.write()
|
||||
.await
|
||||
.upgrade_timeline_entry(&id, full_index.clone())
|
||||
.context("cannot upgrade timeline entry in remote index")?;
|
||||
Ok(full_index)
|
||||
@@ -855,7 +855,7 @@ mod test_utils {
|
||||
#[track_caller]
|
||||
pub async fn ensure_correct_timeline_upload(
|
||||
harness: &RepoHarness,
|
||||
remote_assets: Arc<(LocalFs, Arc<RwLock<RemoteTimelineIndex>>)>,
|
||||
remote_assets: Arc<(LocalFs, RemoteIndex)>,
|
||||
timeline_id: ZTimelineId,
|
||||
new_upload: NewCheckpoint,
|
||||
) {
|
||||
@@ -872,7 +872,7 @@ mod test_utils {
|
||||
let (storage, index) = remote_assets.as_ref();
|
||||
assert_index_descriptions(
|
||||
index,
|
||||
RemoteTimelineIndex::try_parse_descriptions_from_paths(
|
||||
&RemoteIndex::try_parse_descriptions_from_paths(
|
||||
harness.conf,
|
||||
remote_assets
|
||||
.0
|
||||
@@ -914,7 +914,7 @@ mod test_utils {
|
||||
}
|
||||
|
||||
pub async fn expect_timeline(
|
||||
index: &Arc<RwLock<RemoteTimelineIndex>>,
|
||||
index: &RemoteIndex,
|
||||
sync_id: ZTenantTimelineId,
|
||||
) -> RemoteTimeline {
|
||||
if let Some(TimelineIndexEntryInner::Full(remote_timeline)) = index
|
||||
@@ -934,9 +934,11 @@ mod test_utils {
|
||||
|
||||
#[track_caller]
|
||||
pub async fn assert_index_descriptions(
|
||||
index: &Arc<RwLock<RemoteTimelineIndex>>,
|
||||
expected_index_with_descriptions: RemoteTimelineIndex,
|
||||
index: &RemoteIndex,
|
||||
expected_index_with_descriptions: &RemoteIndex,
|
||||
) {
|
||||
let expected_index_with_descriptions = expected_index_with_descriptions.read().await;
|
||||
|
||||
let index_read = index.read().await;
|
||||
let actual_sync_ids = index_read.all_sync_ids().collect::<BTreeSet<_>>();
|
||||
let expected_sync_ids = expected_index_with_descriptions
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
use std::{borrow::Cow, collections::BTreeSet, path::PathBuf, sync::Arc};
|
||||
|
||||
use anyhow::{ensure, Context};
|
||||
use tokio::{fs, sync::RwLock};
|
||||
use tokio::fs;
|
||||
use tracing::{debug, error, trace, warn};
|
||||
use zenith_utils::zid::ZTenantId;
|
||||
|
||||
@@ -20,8 +20,8 @@ use crate::{
|
||||
};
|
||||
|
||||
use super::{
|
||||
index::{ArchiveId, RemoteTimeline, RemoteTimelineIndex},
|
||||
TimelineDownload,
|
||||
index::{ArchiveId, RemoteTimeline},
|
||||
RemoteIndex, TimelineDownload,
|
||||
};
|
||||
|
||||
/// Timeline download result, with extra data, needed for downloading.
|
||||
@@ -47,7 +47,7 @@ pub(super) async fn download_timeline<
|
||||
S: RemoteStorage<StoragePath = P> + Send + Sync + 'static,
|
||||
>(
|
||||
conf: &'static PageServerConf,
|
||||
remote_assets: Arc<(S, Arc<RwLock<RemoteTimelineIndex>>)>,
|
||||
remote_assets: Arc<(S, RemoteIndex)>,
|
||||
sync_id: ZTenantTimelineId,
|
||||
mut download: TimelineDownload,
|
||||
retries: u32,
|
||||
@@ -167,7 +167,7 @@ async fn try_download_archive<
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
}: ZTenantTimelineId,
|
||||
remote_assets: Arc<(S, Arc<RwLock<RemoteTimelineIndex>>)>,
|
||||
remote_assets: Arc<(S, RemoteIndex)>,
|
||||
remote_timeline: &RemoteTimeline,
|
||||
archive_id: ArchiveId,
|
||||
files_to_skip: Arc<BTreeSet<PathBuf>>,
|
||||
@@ -255,16 +255,14 @@ mod tests {
|
||||
let repo_harness = RepoHarness::create("test_download_timeline")?;
|
||||
let sync_id = ZTenantTimelineId::new(repo_harness.tenant_id, TIMELINE_ID);
|
||||
let storage = LocalFs::new(tempdir()?.path().to_owned(), &repo_harness.conf.workdir)?;
|
||||
let index = Arc::new(RwLock::new(
|
||||
RemoteTimelineIndex::try_parse_descriptions_from_paths(
|
||||
repo_harness.conf,
|
||||
storage
|
||||
.list()
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|storage_path| storage.local_path(&storage_path).unwrap()),
|
||||
),
|
||||
));
|
||||
let index = RemoteIndex::try_parse_descriptions_from_paths(
|
||||
repo_harness.conf,
|
||||
storage
|
||||
.list()
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|storage_path| storage.local_path(&storage_path).unwrap()),
|
||||
);
|
||||
let remote_assets = Arc::new((storage, index));
|
||||
let storage = &remote_assets.0;
|
||||
let index = &remote_assets.1;
|
||||
@@ -314,7 +312,7 @@ mod tests {
|
||||
.await;
|
||||
assert_index_descriptions(
|
||||
index,
|
||||
RemoteTimelineIndex::try_parse_descriptions_from_paths(
|
||||
&RemoteIndex::try_parse_descriptions_from_paths(
|
||||
repo_harness.conf,
|
||||
remote_assets
|
||||
.0
|
||||
|
||||
@@ -7,10 +7,12 @@
|
||||
use std::{
|
||||
collections::{BTreeMap, BTreeSet, HashMap},
|
||||
path::{Path, PathBuf},
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use anyhow::{bail, ensure, Context};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::sync::RwLock;
|
||||
use tracing::*;
|
||||
use zenith_utils::{
|
||||
lsn::Lsn,
|
||||
@@ -55,11 +57,14 @@ pub struct RemoteTimelineIndex {
|
||||
timeline_entries: HashMap<ZTenantTimelineId, TimelineIndexEntry>,
|
||||
}
|
||||
|
||||
impl RemoteTimelineIndex {
|
||||
/// A wrapper to synchrnize access to the index, should be created and used before dealing with any [`RemoteTimelineIndex`].
|
||||
pub struct RemoteIndex(Arc<RwLock<RemoteTimelineIndex>>);
|
||||
|
||||
impl RemoteIndex {
|
||||
pub fn empty() -> Self {
|
||||
Self {
|
||||
Self(Arc::new(RwLock::new(RemoteTimelineIndex {
|
||||
timeline_entries: HashMap::new(),
|
||||
}
|
||||
})))
|
||||
}
|
||||
|
||||
/// Attempts to parse file paths (not checking the file contents) and find files
|
||||
@@ -69,7 +74,9 @@ impl RemoteTimelineIndex {
|
||||
conf: &'static PageServerConf,
|
||||
paths: impl Iterator<Item = P>,
|
||||
) -> Self {
|
||||
let mut index = Self::empty();
|
||||
let mut index = RemoteTimelineIndex {
|
||||
timeline_entries: HashMap::new(),
|
||||
};
|
||||
for path in paths {
|
||||
if let Err(e) = try_parse_index_entry(&mut index, conf, path.as_ref()) {
|
||||
debug!(
|
||||
@@ -79,9 +86,26 @@ impl RemoteTimelineIndex {
|
||||
);
|
||||
}
|
||||
}
|
||||
index
|
||||
|
||||
Self(Arc::new(RwLock::new(index)))
|
||||
}
|
||||
|
||||
pub async fn read(&self) -> tokio::sync::RwLockReadGuard<'_, RemoteTimelineIndex> {
|
||||
self.0.read().await
|
||||
}
|
||||
|
||||
pub async fn write(&self) -> tokio::sync::RwLockWriteGuard<'_, RemoteTimelineIndex> {
|
||||
self.0.write().await
|
||||
}
|
||||
}
|
||||
|
||||
impl Clone for RemoteIndex {
|
||||
fn clone(&self) -> Self {
|
||||
Self(Arc::clone(&self.0))
|
||||
}
|
||||
}
|
||||
|
||||
impl RemoteTimelineIndex {
|
||||
pub fn timeline_entry(&self, id: &ZTenantTimelineId) -> Option<&TimelineIndexEntry> {
|
||||
self.timeline_entries.get(id)
|
||||
}
|
||||
|
||||
@@ -2,7 +2,6 @@
|
||||
|
||||
use std::{borrow::Cow, collections::BTreeSet, path::PathBuf, sync::Arc};
|
||||
|
||||
use tokio::sync::RwLock;
|
||||
use tracing::{debug, error, warn};
|
||||
|
||||
use crate::{
|
||||
@@ -17,7 +16,7 @@ use crate::{
|
||||
},
|
||||
};
|
||||
|
||||
use super::{compression::ArchiveHeader, index::RemoteTimelineIndex, NewCheckpoint};
|
||||
use super::{compression::ArchiveHeader, NewCheckpoint, RemoteIndex};
|
||||
|
||||
/// Attempts to compress and upload given checkpoint files.
|
||||
/// No extra checks for overlapping files is made: download takes care of that, ensuring no non-metadata local timeline files are overwritten.
|
||||
@@ -29,7 +28,7 @@ pub(super) async fn upload_timeline_checkpoint<
|
||||
S: RemoteStorage<StoragePath = P> + Send + Sync + 'static,
|
||||
>(
|
||||
config: &'static PageServerConf,
|
||||
remote_assets: Arc<(S, Arc<RwLock<RemoteTimelineIndex>>)>,
|
||||
remote_assets: Arc<(S, RemoteIndex)>,
|
||||
sync_id: ZTenantTimelineId,
|
||||
new_checkpoint: NewCheckpoint,
|
||||
retries: u32,
|
||||
@@ -156,7 +155,7 @@ async fn try_upload_checkpoint<
|
||||
S: RemoteStorage<StoragePath = P> + Send + Sync + 'static,
|
||||
>(
|
||||
config: &'static PageServerConf,
|
||||
remote_assets: Arc<(S, Arc<RwLock<RemoteTimelineIndex>>)>,
|
||||
remote_assets: Arc<(S, RemoteIndex)>,
|
||||
sync_id: ZTenantTimelineId,
|
||||
new_checkpoint: &NewCheckpoint,
|
||||
files_to_skip: BTreeSet<PathBuf>,
|
||||
@@ -238,16 +237,14 @@ mod tests {
|
||||
let repo_harness = RepoHarness::create("reupload_timeline")?;
|
||||
let sync_id = ZTenantTimelineId::new(repo_harness.tenant_id, TIMELINE_ID);
|
||||
let storage = LocalFs::new(tempdir()?.path().to_owned(), &repo_harness.conf.workdir)?;
|
||||
let index = Arc::new(RwLock::new(
|
||||
RemoteTimelineIndex::try_parse_descriptions_from_paths(
|
||||
repo_harness.conf,
|
||||
storage
|
||||
.list()
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|storage_path| storage.local_path(&storage_path).unwrap()),
|
||||
),
|
||||
));
|
||||
let index = RemoteIndex::try_parse_descriptions_from_paths(
|
||||
repo_harness.conf,
|
||||
storage
|
||||
.list()
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|storage_path| storage.local_path(&storage_path).unwrap()),
|
||||
);
|
||||
let remote_assets = Arc::new((storage, index));
|
||||
let index = &remote_assets.1;
|
||||
|
||||
@@ -436,16 +433,14 @@ mod tests {
|
||||
let repo_harness = RepoHarness::create("reupload_timeline_rejected")?;
|
||||
let sync_id = ZTenantTimelineId::new(repo_harness.tenant_id, TIMELINE_ID);
|
||||
let storage = LocalFs::new(tempdir()?.path().to_owned(), &repo_harness.conf.workdir)?;
|
||||
let index = Arc::new(RwLock::new(
|
||||
RemoteTimelineIndex::try_parse_descriptions_from_paths(
|
||||
repo_harness.conf,
|
||||
storage
|
||||
.list()
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|storage_path| storage.local_path(&storage_path).unwrap()),
|
||||
),
|
||||
));
|
||||
let index = RemoteIndex::try_parse_descriptions_from_paths(
|
||||
repo_harness.conf,
|
||||
storage
|
||||
.list()
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|storage_path| storage.local_path(&storage_path).unwrap()),
|
||||
);
|
||||
let remote_assets = Arc::new((storage, index));
|
||||
let storage = &remote_assets.0;
|
||||
let index = &remote_assets.1;
|
||||
@@ -464,7 +459,7 @@ mod tests {
|
||||
first_checkpoint,
|
||||
)
|
||||
.await;
|
||||
let after_first_uploads = RemoteTimelineIndex::try_parse_descriptions_from_paths(
|
||||
let after_first_uploads = RemoteIndex::try_parse_descriptions_from_paths(
|
||||
repo_harness.conf,
|
||||
remote_assets
|
||||
.0
|
||||
@@ -495,7 +490,7 @@ mod tests {
|
||||
0,
|
||||
)
|
||||
.await;
|
||||
assert_index_descriptions(index, after_first_uploads.clone()).await;
|
||||
assert_index_descriptions(index, &after_first_uploads).await;
|
||||
|
||||
let checkpoint_with_uploaded_lsn = create_local_timeline(
|
||||
&repo_harness,
|
||||
@@ -511,7 +506,7 @@ mod tests {
|
||||
0,
|
||||
)
|
||||
.await;
|
||||
assert_index_descriptions(index, after_first_uploads.clone()).await;
|
||||
assert_index_descriptions(index, &after_first_uploads).await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use crate::layered_repository::metadata::TimelineMetadata;
|
||||
use crate::relish::*;
|
||||
use crate::remote_storage::RemoteTimelineIndex;
|
||||
use crate::remote_storage::RemoteIndex;
|
||||
use crate::walrecord::MultiXactMember;
|
||||
use crate::CheckpointConfig;
|
||||
use anyhow::Result;
|
||||
@@ -91,7 +91,7 @@ pub trait Repository: Send + Sync {
|
||||
fn detach_timeline(&self, timeline_id: ZTimelineId) -> Result<()>;
|
||||
|
||||
// Allows to retrieve remote timeline index from the repo. Used in walreceiver to grab remote consistent lsn.
|
||||
fn get_remote_index(&self) -> &tokio::sync::RwLock<RemoteTimelineIndex>;
|
||||
fn get_remote_index(&self) -> &RemoteIndex;
|
||||
}
|
||||
|
||||
/// A timeline, that belongs to the current repository.
|
||||
@@ -407,7 +407,7 @@ pub mod repo_harness {
|
||||
self.conf,
|
||||
walredo_mgr,
|
||||
self.tenant_id,
|
||||
Arc::new(tokio::sync::RwLock::new(RemoteTimelineIndex::empty())),
|
||||
RemoteIndex::empty(),
|
||||
false,
|
||||
));
|
||||
// populate repo with locally available timelines
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
|
||||
use crate::config::PageServerConf;
|
||||
use crate::layered_repository::LayeredRepository;
|
||||
use crate::remote_storage::RemoteTimelineIndex;
|
||||
use crate::remote_storage::RemoteIndex;
|
||||
use crate::repository::{Repository, Timeline, TimelineSyncStatusUpdate};
|
||||
use crate::thread_mgr;
|
||||
use crate::thread_mgr::ThreadKind;
|
||||
@@ -66,7 +66,7 @@ fn access_tenants() -> MutexGuard<'static, HashMap<ZTenantId, Tenant>> {
|
||||
pub fn load_local_repo(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_id: ZTenantId,
|
||||
remote_index: &Arc<tokio::sync::RwLock<RemoteTimelineIndex>>,
|
||||
remote_index: &RemoteIndex,
|
||||
) -> Arc<dyn Repository> {
|
||||
let mut m = access_tenants();
|
||||
let tenant = m.entry(tenant_id).or_insert_with(|| {
|
||||
@@ -78,7 +78,7 @@ pub fn load_local_repo(
|
||||
conf,
|
||||
Arc::new(walredo_mgr),
|
||||
tenant_id,
|
||||
Arc::clone(remote_index),
|
||||
remote_index.clone(),
|
||||
conf.remote_storage_config.is_some(),
|
||||
));
|
||||
Tenant {
|
||||
@@ -92,7 +92,7 @@ pub fn load_local_repo(
|
||||
/// Updates tenants' repositories, changing their timelines state in memory.
|
||||
pub fn apply_timeline_sync_status_updates(
|
||||
conf: &'static PageServerConf,
|
||||
remote_index: Arc<tokio::sync::RwLock<RemoteTimelineIndex>>,
|
||||
remote_index: RemoteIndex,
|
||||
sync_status_updates: HashMap<ZTenantId, HashMap<ZTimelineId, TimelineSyncStatusUpdate>>,
|
||||
) {
|
||||
if sync_status_updates.is_empty() {
|
||||
@@ -172,7 +172,7 @@ pub fn shutdown_all_tenants() {
|
||||
pub fn create_tenant_repository(
|
||||
conf: &'static PageServerConf,
|
||||
tenantid: ZTenantId,
|
||||
remote_index: Arc<tokio::sync::RwLock<RemoteTimelineIndex>>,
|
||||
remote_index: RemoteIndex,
|
||||
) -> Result<Option<ZTenantId>> {
|
||||
match access_tenants().entry(tenantid) {
|
||||
Entry::Occupied(_) => {
|
||||
|
||||
@@ -15,13 +15,13 @@ use std::{
|
||||
use tracing::*;
|
||||
|
||||
use zenith_utils::lsn::Lsn;
|
||||
use zenith_utils::zid::{ZTenantId, ZTenantTimelineId, ZTimelineId};
|
||||
use zenith_utils::zid::{ZTenantId, ZTimelineId};
|
||||
use zenith_utils::{crashsafe_dir, logging};
|
||||
|
||||
use crate::{
|
||||
config::PageServerConf,
|
||||
layered_repository::metadata::TimelineMetadata,
|
||||
remote_storage::RemoteTimelineIndex,
|
||||
remote_storage::RemoteIndex,
|
||||
repository::{LocalTimelineState, Repository},
|
||||
};
|
||||
use crate::{import_datadir, LOG_FILE_NAME};
|
||||
@@ -127,22 +127,6 @@ pub struct TimelineInfo {
|
||||
pub remote: Option<RemoteTimelineInfo>,
|
||||
}
|
||||
|
||||
pub fn extract_remote_timeline_info(
|
||||
tenant_id: ZTenantId,
|
||||
timeline_id: ZTimelineId,
|
||||
remote_index: &RemoteTimelineIndex,
|
||||
) -> Option<RemoteTimelineInfo> {
|
||||
remote_index
|
||||
.timeline_entry(&ZTenantTimelineId {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
})
|
||||
.map(|remote_entry| RemoteTimelineInfo {
|
||||
remote_consistent_lsn: remote_entry.disk_consistent_lsn(),
|
||||
awaits_download: remote_entry.get_awaits_download(),
|
||||
})
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub struct PointInTime {
|
||||
pub timeline_id: ZTimelineId,
|
||||
@@ -179,7 +163,7 @@ pub fn init_pageserver(
|
||||
pub enum CreateRepo {
|
||||
Real {
|
||||
wal_redo_manager: Arc<dyn WalRedoManager + Send + Sync>,
|
||||
remote_index: Arc<tokio::sync::RwLock<RemoteTimelineIndex>>,
|
||||
remote_index: RemoteIndex,
|
||||
},
|
||||
Dummy,
|
||||
}
|
||||
@@ -207,8 +191,7 @@ pub fn create_repo(
|
||||
// anymore, but I think that could still happen.
|
||||
let wal_redo_manager = Arc::new(crate::walredo::DummyRedoManager {});
|
||||
|
||||
let remote_index = Arc::new(tokio::sync::RwLock::new(RemoteTimelineIndex::empty()));
|
||||
(wal_redo_manager as _, remote_index)
|
||||
(wal_redo_manager as _, RemoteIndex::empty())
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
Reference in New Issue
Block a user