Mirror of https://github.com/neondatabase/neon.git
switch to per-tenant attach/detach
Download operations for all timelines of one tenant are now grouped together: when attach is invoked, the pageserver downloads all of them and registers them in a single apply_sync_status_update call, so branches can be used safely with attach/detach.
Committed by: Dmitry Rodionov
Parent: ae116ff0a9
Commit: 4c54e4b37d
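In effect, attach and detach now operate at tenant granularity rather than timeline granularity. The sketch below illustrates the flow described in the commit message in plain Rust; the type and helper names (TenantId, remote_timelines_of, apply_sync_status_updates, and so on) are hypothetical stand-ins for illustration only, not the pageserver's actual API:

```rust
// Minimal sketch of a per-tenant attach: download every timeline of the tenant
// first, then register them all in one call so ancestor branches and their
// children become visible together.
use std::collections::HashMap;

#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
struct TenantId(u32);
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
struct TimelineId(u32);

#[derive(Debug)]
enum TimelineSyncStatusUpdate {
    Downloaded,
}

// Hypothetical helpers standing in for the remote index / storage sync layer.
fn remote_timelines_of(_tenant: TenantId) -> Vec<TimelineId> {
    vec![TimelineId(1), TimelineId(2)] // e.g. a parent branch and its child
}

fn download_timeline_layers(_tenant: TenantId, _timeline: TimelineId) {
    // fetch layer files and timeline metadata from remote storage
}

fn apply_sync_status_updates(
    _tenant: TenantId,
    updates: HashMap<TimelineId, TimelineSyncStatusUpdate>,
) {
    // Register every downloaded timeline in the repository in a single call,
    // so a branch is never visible without the ancestors it depends on.
    println!("registering {} timelines at once", updates.len());
}

fn attach_tenant(tenant: TenantId) {
    // Per-tenant attach: download *all* timelines, then register them together.
    let mut updates = HashMap::new();
    for timeline in remote_timelines_of(tenant) {
        download_timeline_layers(tenant, timeline);
        updates.insert(timeline, TimelineSyncStatusUpdate::Downloaded);
    }
    apply_sync_status_updates(tenant, updates);
}

fn main() {
    attach_tenant(TenantId(42));
}
```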
@@ -42,13 +42,19 @@ pub const DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS: u32 = 10;
/// https://aws.amazon.com/premiumsupport/knowledge-center/s3-request-limit-avoid-throttling/
pub const DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT: usize = 100;

pub trait RemoteObjectName {
    // Needed to retrieve the last component of a RemoteObjectId,
    // in other words the file name.
    fn object_name(&self) -> Option<&str>;
}

/// Storage (potentially remote) API to manage its state.
/// This storage tries to be unaware of any layered repository context,
/// providing basic CRUD operations for storage files.
#[async_trait::async_trait]
pub trait RemoteStorage: Send + Sync {
    /// A way to uniquely reference a file in the remote storage.
    type RemoteObjectId;
    type RemoteObjectId: RemoteObjectName;

    /// Attempts to derive the storage path out of the local path, if the latter is correct.
    fn remote_object_id(&self, local_path: &Path) -> anyhow::Result<Self::RemoteObjectId>;

@@ -59,6 +65,12 @@ pub trait RemoteStorage: Send + Sync {
    /// Lists all items the storage has right now.
    async fn list(&self) -> anyhow::Result<Vec<Self::RemoteObjectId>>;

    /// Lists all top-level subdirectories for a given prefix.
    async fn list_prefixes(
        &self,
        prefix: Option<Self::RemoteObjectId>,
    ) -> anyhow::Result<Vec<Self::RemoteObjectId>>;

    /// Streams the local file contents into the remote storage entry.
    async fn upload(
        &self,
@@ -5,6 +5,7 @@
|
||||
//! volume is mounted to the local FS.
|
||||
|
||||
use std::{
|
||||
borrow::Cow,
|
||||
future::Future,
|
||||
path::{Path, PathBuf},
|
||||
pin::Pin,
|
||||
@@ -17,10 +18,16 @@ use tokio::{
|
||||
};
|
||||
use tracing::*;
|
||||
|
||||
use crate::{path_with_suffix_extension, Download, DownloadError};
|
||||
use crate::{path_with_suffix_extension, Download, DownloadError, RemoteObjectName};
|
||||
|
||||
use super::{strip_path_prefix, RemoteStorage, StorageMetadata};
|
||||
|
||||
impl RemoteObjectName for PathBuf {
|
||||
fn object_name(&self) -> Option<&str> {
|
||||
self.file_stem().and_then(|n| n.to_str())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct LocalFs {
|
||||
working_directory: PathBuf,
|
||||
storage_root: PathBuf,
|
||||
@@ -101,7 +108,18 @@ impl RemoteStorage for LocalFs {
|
||||
}
|
||||
|
||||
async fn list(&self) -> anyhow::Result<Vec<Self::RemoteObjectId>> {
|
||||
get_all_files(&self.storage_root).await
|
||||
get_all_files(&self.storage_root, true).await
|
||||
}
|
||||
|
||||
async fn list_prefixes(
|
||||
&self,
|
||||
prefix: Option<Self::RemoteObjectId>,
|
||||
) -> anyhow::Result<Vec<Self::RemoteObjectId>> {
|
||||
let path = match prefix {
|
||||
Some(prefix) => Cow::Owned(self.storage_root.join(prefix)),
|
||||
None => Cow::Borrowed(&self.storage_root),
|
||||
};
|
||||
get_all_files(path.as_ref(), false).await
|
||||
}
|
||||
|
||||
async fn upload(
|
||||
@@ -299,6 +317,7 @@ fn storage_metadata_path(original_path: &Path) -> PathBuf {
|
||||
|
||||
fn get_all_files<'a, P>(
|
||||
directory_path: P,
|
||||
recursive: bool,
|
||||
) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<PathBuf>>> + Send + Sync + 'a>>
|
||||
where
|
||||
P: AsRef<Path> + Send + Sync + 'a,
|
||||
@@ -315,7 +334,11 @@ where
|
||||
if file_type.is_symlink() {
|
||||
debug!("{:?} us a symlink, skipping", entry_path)
|
||||
} else if file_type.is_dir() {
|
||||
paths.extend(get_all_files(entry_path).await?.into_iter())
|
||||
if recursive {
|
||||
paths.extend(get_all_files(entry_path, true).await?.into_iter())
|
||||
} else {
|
||||
paths.push(dir_entry.path())
|
||||
}
|
||||
} else {
|
||||
paths.push(dir_entry.path());
|
||||
}
|
||||
|
||||
@@ -19,7 +19,9 @@ use tokio::{io, sync::Semaphore};
|
||||
use tokio_util::io::ReaderStream;
|
||||
use tracing::debug;
|
||||
|
||||
use crate::{strip_path_prefix, Download, DownloadError, RemoteStorage, S3Config};
|
||||
use crate::{
|
||||
strip_path_prefix, Download, DownloadError, RemoteObjectName, RemoteStorage, S3Config,
|
||||
};
|
||||
|
||||
use super::StorageMetadata;
|
||||
|
||||
@@ -117,6 +119,24 @@ impl S3ObjectKey {
|
||||
}
|
||||
}
|
||||
|
||||
impl RemoteObjectName for S3ObjectKey {
    /// Turn a/b/c or a/b/c/ into c
    fn object_name(&self) -> Option<&str> {
        // corner case
        if &self.0 == "/" {
            return None;
        }

        if self.0.ends_with(S3_PREFIX_SEPARATOR) {
            self.0.rsplit(S3_PREFIX_SEPARATOR).nth(1)
        } else {
            self.0
                .rsplit_once(S3_PREFIX_SEPARATOR)
                .map(|(_, last)| last)
        }
    }
}
|
||||
|
||||
/// AWS S3 storage.
|
||||
pub struct S3Bucket {
|
||||
workdir: PathBuf,
|
||||
@@ -283,6 +303,77 @@ impl RemoteStorage for S3Bucket {
|
||||
Ok(document_keys)
|
||||
}
|
||||
|
||||
/// Note: it won't include empty "directories"
|
||||
async fn list_prefixes(
|
||||
&self,
|
||||
prefix: Option<Self::RemoteObjectId>,
|
||||
) -> anyhow::Result<Vec<Self::RemoteObjectId>> {
|
||||
let list_prefix = match prefix {
|
||||
Some(prefix) => {
|
||||
let mut prefix_in_bucket = self.prefix_in_bucket.clone().unwrap_or_default();
|
||||
// if there is no trailing / in default prefix and
|
||||
// supplied prefix does not start with "/" insert it
|
||||
if !(prefix_in_bucket.ends_with(S3_PREFIX_SEPARATOR)
|
||||
|| prefix.0.starts_with(S3_PREFIX_SEPARATOR))
|
||||
{
|
||||
prefix_in_bucket.push(S3_PREFIX_SEPARATOR);
|
||||
}
|
||||
|
||||
prefix_in_bucket.push_str(&prefix.0);
|
||||
// required to end with a separator
|
||||
// otherwise request will return only the entry of a prefix
|
||||
if !prefix_in_bucket.ends_with(S3_PREFIX_SEPARATOR) {
|
||||
prefix_in_bucket.push(S3_PREFIX_SEPARATOR);
|
||||
}
|
||||
Some(prefix_in_bucket)
|
||||
}
|
||||
None => self.prefix_in_bucket.clone(),
|
||||
};
|
||||
|
||||
let mut document_keys = Vec::new();
|
||||
|
||||
let mut continuation_token = None;
|
||||
loop {
|
||||
let _guard = self
|
||||
.concurrency_limiter
|
||||
.acquire()
|
||||
.await
|
||||
.context("Concurrency limiter semaphore got closed during S3 list")?;
|
||||
|
||||
metrics::inc_list_objects();
|
||||
|
||||
let fetch_response = self
|
||||
.client
|
||||
.list_objects_v2(ListObjectsV2Request {
|
||||
bucket: self.bucket_name.clone(),
|
||||
prefix: list_prefix.clone(),
|
||||
continuation_token,
|
||||
delimiter: Some(S3_PREFIX_SEPARATOR.to_string()),
|
||||
..ListObjectsV2Request::default()
|
||||
})
|
||||
.await
|
||||
.map_err(|e| {
|
||||
metrics::inc_list_objects_fail();
|
||||
e
|
||||
})?;
|
||||
|
||||
document_keys.extend(
|
||||
fetch_response
|
||||
.common_prefixes
|
||||
.unwrap_or_default()
|
||||
.into_iter()
|
||||
.filter_map(|o| Some(S3ObjectKey(o.prefix?))),
|
||||
);
|
||||
|
||||
match fetch_response.continuation_token {
|
||||
Some(new_token) => continuation_token = Some(new_token),
|
||||
None => break,
|
||||
}
|
||||
}
|
||||
|
||||
Ok(document_keys)
|
||||
}
|
||||
|
||||
async fn upload(
|
||||
&self,
|
||||
from: impl io::AsyncRead + Unpin + Send + Sync + 'static,
|
||||
@@ -378,6 +469,25 @@ mod tests {
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn object_name() {
|
||||
let k = S3ObjectKey("a/b/c".to_owned());
|
||||
assert_eq!(k.object_name(), Some("c"));
|
||||
|
||||
let k = S3ObjectKey("a/b/c/".to_owned());
|
||||
assert_eq!(k.object_name(), Some("c"));
|
||||
|
||||
let k = S3ObjectKey("a/".to_owned());
|
||||
assert_eq!(k.object_name(), Some("a"));
|
||||
|
||||
// XXX is it impossible to have an empty key?
|
||||
let k = S3ObjectKey("".to_owned());
|
||||
assert_eq!(k.object_name(), None);
|
||||
|
||||
let k = S3ObjectKey("/".to_owned());
|
||||
assert_eq!(k.object_name(), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn download_destination() -> anyhow::Result<()> {
|
||||
let workdir = tempdir()?.path().to_owned();
|
||||
|
||||
@@ -170,7 +170,6 @@ paths:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Error"
|
||||
|
||||
/v1/tenant/{tenant_id}/timeline/{timeline_id}/attach:
|
||||
parameters:
|
||||
- name: tenant_id
|
||||
@@ -186,12 +185,27 @@ paths:
|
||||
type: string
|
||||
format: hex
|
||||
post:
|
||||
description: Attach remote timeline
|
||||
description: Deprecated
|
||||
responses:
|
||||
"200":
|
||||
description: Timeline attaching scheduled
|
||||
"410":
|
||||
description: GONE
|
||||
|
||||
|
||||
/v1/tenant/{tenant_id}/attach:
|
||||
parameters:
|
||||
- name: tenant_id
|
||||
in: path
|
||||
required: true
|
||||
schema:
|
||||
type: string
|
||||
format: hex
|
||||
post:
|
||||
description: Deprecated
|
||||
responses:
|
||||
"202":
|
||||
description: Tenant attaching scheduled
|
||||
"400":
|
||||
description: Error when no tenant id found in path or no timeline id
|
||||
description: Error when no tenant id found in path parameters
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
@@ -215,7 +229,7 @@ paths:
|
||||
schema:
|
||||
$ref: "#/components/schemas/NotFoundError"
|
||||
"409":
|
||||
description: Timeline download is already in progress
|
||||
description: Tenant download is already in progress
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
@@ -227,7 +241,6 @@ paths:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Error"
|
||||
|
||||
|
||||
/v1/tenant/{tenant_id}/timeline/{timeline_id}/detach:
|
||||
parameters:
|
||||
- name: tenant_id
|
||||
@@ -243,12 +256,26 @@ paths:
|
||||
type: string
|
||||
format: hex
|
||||
post:
|
||||
description: Detach local timeline
|
||||
description: Deprecated
|
||||
responses:
|
||||
"410":
|
||||
description: GONE
|
||||
|
||||
/v1/tenant/{tenant_id}/detach:
|
||||
parameters:
|
||||
- name: tenant_id
|
||||
in: path
|
||||
required: true
|
||||
schema:
|
||||
type: string
|
||||
format: hex
|
||||
post:
|
||||
description: Detach local tenant
|
||||
responses:
|
||||
"200":
|
||||
description: Timeline detached
|
||||
description: Tenant detached
|
||||
"400":
|
||||
description: Error when no tenant id found in path or no timeline id
|
||||
description: Error when no tenant id found in path parameters
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
|
||||
@@ -209,9 +209,9 @@ async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body
|
||||
.await;
|
||||
|
||||
if local_timeline_info.is_none() && remote_timeline_info.is_none() {
|
||||
return Err(ApiError::NotFound(
|
||||
"Timeline is not found neither locally nor remotely".to_string(),
|
||||
));
|
||||
return Err(ApiError::NotFound(format!(
|
||||
"Timeline {tenant_id}/{timeline_id} is not found neither locally nor remotely"
|
||||
)));
|
||||
}
|
||||
|
||||
let timeline_info = TimelineInfo {
|
||||
@@ -241,119 +241,130 @@ async fn wal_receiver_get_handler(request: Request<Body>) -> Result<Response<Bod
|
||||
json_response(StatusCode::OK, &wal_receiver_entry)
|
||||
}
|
||||
|
||||
async fn timeline_attach_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
async fn timeline_attach_handler(_: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
json_response(StatusCode::GONE, ())
|
||||
}
|
||||
|
||||
// TODO makes sense to provide tenant config right away the same way as it handled in tenant_create
|
||||
async fn tenant_attach_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
let tenant_id: ZTenantId = parse_request_param(&request, "tenant_id")?;
|
||||
check_permission(&request, Some(tenant_id))?;
|
||||
|
||||
let timeline_id: ZTimelineId = parse_request_param(&request, "timeline_id")?;
|
||||
info!(
|
||||
"Handling timeline {} attach for tenant: {}",
|
||||
timeline_id, tenant_id,
|
||||
);
|
||||
info!("Handling tenant attach {}", tenant_id,);
|
||||
|
||||
tokio::task::spawn_blocking(move || {
|
||||
if tenant_mgr::get_local_timeline_with_load(tenant_id, timeline_id).is_ok() {
|
||||
// TODO: maybe answer with 304 Not Modified here?
|
||||
anyhow::bail!("Timeline is already present locally")
|
||||
if tenant_mgr::get_tenant_state(tenant_id).is_some() {
|
||||
anyhow::bail!("Tenant is already present locally")
|
||||
};
|
||||
Ok(())
|
||||
})
|
||||
.await
|
||||
.map_err(ApiError::from_err)??;
|
||||
|
||||
let sync_id = ZTenantTimelineId {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
};
|
||||
let state = get_state(&request);
|
||||
let remote_index = &state.remote_index;
|
||||
|
||||
let mut index_accessor = remote_index.write().await;
|
||||
if let Some(remote_timeline) = index_accessor.timeline_entry_mut(&sync_id) {
|
||||
if remote_timeline.awaits_download {
|
||||
if let Some(tenant_entry) = index_accessor.tenant_entry_mut(&tenant_id) {
|
||||
if tenant_entry.has_in_progress_downloads() {
|
||||
return Err(ApiError::Conflict(
|
||||
"Timeline download is already in progress".to_string(),
|
||||
"Tenant download is already in progress".to_string(),
|
||||
));
|
||||
}
|
||||
|
||||
remote_timeline.awaits_download = true;
|
||||
storage_sync::schedule_layer_download(tenant_id, timeline_id);
|
||||
return json_response(StatusCode::ACCEPTED, ());
|
||||
} else {
|
||||
// no timeline in the index, release the lock before the potentially lengthy download operation
|
||||
drop(index_accessor);
|
||||
}
|
||||
|
||||
let new_timeline = match try_download_index_part_data(state, sync_id).await {
|
||||
Ok(Some(mut new_timeline)) => {
|
||||
tokio::fs::create_dir_all(state.conf.timeline_path(&timeline_id, &tenant_id))
|
||||
.await
|
||||
.context("Failed to create new timeline directory")?;
|
||||
new_timeline.awaits_download = true;
|
||||
new_timeline
|
||||
for (timeline_id, remote_timeline) in tenant_entry.iter_mut() {
|
||||
storage_sync::schedule_layer_download(tenant_id, *timeline_id);
|
||||
remote_timeline.awaits_download = true;
|
||||
}
|
||||
Ok(None) => return Err(ApiError::NotFound("Unknown remote timeline".to_string())),
|
||||
return json_response(StatusCode::ACCEPTED, ());
|
||||
}
|
||||
// no tenant in the index, release the lock before the potentially lengthy download operation
|
||||
drop(index_accessor);
|
||||
|
||||
// download index parts for every tenant timeline
|
||||
let remote_timelines = match try_download_tenant_index(state, tenant_id).await {
|
||||
Ok(Some(remote_timelines)) => remote_timelines,
|
||||
Ok(None) => return Err(ApiError::NotFound("Unknown remote tenant".to_string())),
|
||||
Err(e) => {
|
||||
error!("Failed to retrieve remote timeline data: {:?}", e);
|
||||
error!("Failed to retrieve remote tenant data: {:?}", e);
|
||||
return Err(ApiError::NotFound(
|
||||
"Failed to retrieve remote timeline".to_string(),
|
||||
"Failed to retrieve remote tenant".to_string(),
|
||||
));
|
||||
}
|
||||
};
|
||||
|
||||
// recheck that download is not in progress because
|
||||
// we've released the lock to avoid holding it during the download
|
||||
let mut index_accessor = remote_index.write().await;
|
||||
match index_accessor.timeline_entry_mut(&sync_id) {
|
||||
Some(remote_timeline) => {
|
||||
if remote_timeline.awaits_download {
|
||||
let tenant_entry = match index_accessor.tenant_entry_mut(&tenant_id) {
|
||||
Some(tenant_entry) => {
|
||||
if tenant_entry.has_in_progress_downloads() {
|
||||
return Err(ApiError::Conflict(
|
||||
"Timeline download is already in progress".to_string(),
|
||||
"Tenant download is already in progress".to_string(),
|
||||
));
|
||||
}
|
||||
remote_timeline.awaits_download = true;
|
||||
tenant_entry
|
||||
}
|
||||
None => index_accessor.add_timeline_entry(sync_id, new_timeline),
|
||||
None => index_accessor.add_tenant_entry(tenant_id),
|
||||
};
|
||||
|
||||
// populate remote index with the data from index part and create directories on the local filesystem
|
||||
for (timeline_id, mut remote_timeline) in remote_timelines {
|
||||
tokio::fs::create_dir_all(state.conf.timeline_path(&timeline_id, &tenant_id))
|
||||
.await
|
||||
.context("Failed to create new timeline directory")?;
|
||||
|
||||
remote_timeline.awaits_download = true;
|
||||
tenant_entry.insert(timeline_id, remote_timeline);
|
||||
// schedule actual download
|
||||
storage_sync::schedule_layer_download(tenant_id, timeline_id);
|
||||
}
|
||||
storage_sync::schedule_layer_download(tenant_id, timeline_id);
|
||||
|
||||
json_response(StatusCode::ACCEPTED, ())
|
||||
}
|
||||
|
||||
async fn try_download_index_part_data(
|
||||
async fn try_download_tenant_index(
|
||||
state: &State,
|
||||
sync_id: ZTenantTimelineId,
|
||||
) -> anyhow::Result<Option<RemoteTimeline>> {
|
||||
let index_part = match state.remote_storage.as_ref() {
|
||||
tenant_id: ZTenantId,
|
||||
) -> anyhow::Result<Option<Vec<(ZTimelineId, RemoteTimeline)>>> {
|
||||
let index_parts = match state.remote_storage.as_ref() {
|
||||
Some(GenericRemoteStorage::Local(local_storage)) => {
|
||||
storage_sync::download_index_part(state.conf, local_storage, sync_id).await
|
||||
storage_sync::download_tenant_index_parts(state.conf, local_storage, tenant_id).await
|
||||
}
|
||||
// FIXME here s3 storage contains its own limits, that are separate from sync storage thread ones
|
||||
// because it is a different instance. We can move this limit to some global static
|
||||
// or use one instance everywhere.
|
||||
Some(GenericRemoteStorage::S3(s3_storage)) => {
|
||||
storage_sync::download_index_part(state.conf, s3_storage, sync_id).await
|
||||
storage_sync::download_tenant_index_parts(state.conf, s3_storage, tenant_id).await
|
||||
}
|
||||
None => return Ok(None),
|
||||
}
|
||||
.with_context(|| format!("Failed to download index part for timeline {sync_id}"))?;
|
||||
.with_context(|| format!("Failed to download index parts for tenant {tenant_id}"))?;
|
||||
|
||||
let timeline_path = state
|
||||
.conf
|
||||
.timeline_path(&sync_id.timeline_id, &sync_id.tenant_id);
|
||||
RemoteTimeline::from_index_part(&timeline_path, index_part)
|
||||
.map(Some)
|
||||
.with_context(|| {
|
||||
format!("Failed to convert index part into remote timeline for timeline {sync_id}")
|
||||
})
|
||||
let mut remote_timelines = Vec::with_capacity(index_parts.len());
|
||||
for (timeline_id, index_part) in index_parts {
|
||||
let timeline_path = state.conf.timeline_path(&timeline_id, &tenant_id);
|
||||
let remote_timeline = RemoteTimeline::from_index_part(&timeline_path, index_part)
|
||||
.with_context(|| {
|
||||
format!("Failed to convert index part into remote timeline for timeline {tenant_id}/{timeline_id}")
|
||||
})?;
|
||||
remote_timelines.push((timeline_id, remote_timeline));
|
||||
}
|
||||
Ok(Some(remote_timelines))
|
||||
}
|
||||
|
||||
async fn timeline_detach_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
async fn timeline_detach_handler(_: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
json_response(StatusCode::GONE, ())
|
||||
}
|
||||
|
||||
async fn tenant_detach_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
let tenant_id: ZTenantId = parse_request_param(&request, "tenant_id")?;
|
||||
check_permission(&request, Some(tenant_id))?;
|
||||
|
||||
let timeline_id: ZTimelineId = parse_request_param(&request, "timeline_id")?;
|
||||
|
||||
tokio::task::spawn_blocking(move || {
|
||||
let _enter =
|
||||
info_span!("timeline_detach_handler", tenant = %tenant_id, timeline = %timeline_id)
|
||||
.entered();
|
||||
let _enter = info_span!("tenant_detach_handler", tenant = %tenant_id).entered();
|
||||
let state = get_state(&request);
|
||||
tenant_mgr::detach_timeline(state.conf, tenant_id, timeline_id)
|
||||
tenant_mgr::detach_tenant(state.conf, tenant_id)
|
||||
})
|
||||
.await
|
||||
.map_err(ApiError::from_err)??;
|
||||
@@ -523,6 +534,8 @@ pub fn make_router(
|
||||
.put("/v1/tenant/config", tenant_config_handler)
|
||||
.get("/v1/tenant/:tenant_id/timeline", timeline_list_handler)
|
||||
.post("/v1/tenant/:tenant_id/timeline", timeline_create_handler)
|
||||
.post("/v1/tenant/:tenant_id/attach", tenant_attach_handler)
|
||||
.post("/v1/tenant/:tenant_id/detach", tenant_detach_handler)
|
||||
.get(
|
||||
"/v1/tenant/:tenant_id/timeline/:timeline_id",
|
||||
timeline_detail_handler,
|
||||
|
||||
@@ -331,19 +331,19 @@ impl Repository for LayeredRepository {
|
||||
/// metrics collection.
|
||||
fn gc_iteration(
|
||||
&self,
|
||||
target_timelineid: Option<ZTimelineId>,
|
||||
target_timeline_id: Option<ZTimelineId>,
|
||||
horizon: u64,
|
||||
pitr: Duration,
|
||||
checkpoint_before_gc: bool,
|
||||
) -> Result<GcResult> {
|
||||
let timeline_str = target_timelineid
|
||||
let timeline_str = target_timeline_id
|
||||
.map(|x| x.to_string())
|
||||
.unwrap_or_else(|| "-".to_string());
|
||||
|
||||
STORAGE_TIME
|
||||
.with_label_values(&["gc", &self.tenant_id.to_string(), &timeline_str])
|
||||
.observe_closure_duration(|| {
|
||||
self.gc_iteration_internal(target_timelineid, horizon, pitr, checkpoint_before_gc)
|
||||
self.gc_iteration_internal(target_timeline_id, horizon, pitr, checkpoint_before_gc)
|
||||
})
|
||||
}
|
||||
|
||||
@@ -410,28 +410,6 @@ impl Repository for LayeredRepository {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn detach_timeline(&self, timeline_id: ZTimelineId) -> anyhow::Result<()> {
|
||||
let mut timelines = self.timelines.lock().unwrap();
|
||||
// check that there are no child timelines, because detach will remove files, which would break child branches
|
||||
// FIXME this can still be violated because we do not guarantee
|
||||
// that all ancestors are downloaded/attached to the same pageserver
|
||||
let num_children = timelines
|
||||
.iter()
|
||||
.filter(|(_, entry)| entry.ancestor_timeline_id() == Some(timeline_id))
|
||||
.count();
|
||||
|
||||
ensure!(
|
||||
num_children == 0,
|
||||
"Cannot detach timeline which has child timelines"
|
||||
);
|
||||
|
||||
ensure!(
|
||||
timelines.remove(&timeline_id).is_some(),
|
||||
"Cannot detach timeline {timeline_id} that is not available locally"
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn apply_timeline_remote_sync_status_update(
|
||||
&self,
|
||||
timeline_id: ZTimelineId,
|
||||
@@ -839,13 +817,13 @@ impl LayeredRepository {
|
||||
// we do.
|
||||
fn gc_iteration_internal(
|
||||
&self,
|
||||
target_timelineid: Option<ZTimelineId>,
|
||||
target_timeline_id: Option<ZTimelineId>,
|
||||
horizon: u64,
|
||||
pitr: Duration,
|
||||
checkpoint_before_gc: bool,
|
||||
) -> Result<GcResult> {
|
||||
let _span_guard =
|
||||
info_span!("gc iteration", tenant = %self.tenant_id, timeline = ?target_timelineid)
|
||||
info_span!("gc iteration", tenant = %self.tenant_id, timeline = ?target_timeline_id)
|
||||
.entered();
|
||||
let mut totals: GcResult = Default::default();
|
||||
let now = Instant::now();
|
||||
@@ -859,6 +837,12 @@ impl LayeredRepository {
|
||||
let mut timeline_ids = Vec::new();
|
||||
let mut timelines = self.timelines.lock().unwrap();
|
||||
|
||||
if let Some(target_timeline_id) = target_timeline_id.as_ref() {
|
||||
if timelines.get(target_timeline_id).is_none() {
|
||||
bail!("gc target timeline does not exist")
|
||||
}
|
||||
};
|
||||
|
||||
for (timeline_id, timeline_entry) in timelines.iter() {
|
||||
timeline_ids.push(*timeline_id);
|
||||
|
||||
@@ -867,7 +851,7 @@ impl LayeredRepository {
|
||||
// Somewhat related: https://github.com/zenithdb/zenith/issues/999
|
||||
if let Some(ancestor_timeline_id) = &timeline_entry.ancestor_timeline_id() {
|
||||
// If target_timeline is specified, we only need to know branchpoints of its children
|
||||
if let Some(timelineid) = target_timelineid {
|
||||
if let Some(timelineid) = target_timeline_id {
|
||||
if ancestor_timeline_id == &timelineid {
|
||||
all_branchpoints
|
||||
.insert((*ancestor_timeline_id, timeline_entry.ancestor_lsn()));
|
||||
@@ -882,7 +866,7 @@ impl LayeredRepository {
|
||||
|
||||
// Ok, we now know all the branch points.
|
||||
// Perform GC for each timeline.
|
||||
for timelineid in timeline_ids.into_iter() {
|
||||
for timeline_id in timeline_ids.into_iter() {
|
||||
if thread_mgr::is_shutdown_requested() {
|
||||
// We were requested to shut down. Stop and return with the progress we
|
||||
// made.
|
||||
@@ -891,12 +875,12 @@ impl LayeredRepository {
|
||||
|
||||
// Timeline is known to be local and loaded.
|
||||
let timeline = self
|
||||
.get_timeline_load_internal(timelineid, &mut *timelines)?
|
||||
.get_timeline_load_internal(timeline_id, &mut *timelines)?
|
||||
.expect("checked above that timeline is local and loaded");
|
||||
|
||||
// If target_timeline is specified, only GC it
|
||||
if let Some(target_timelineid) = target_timelineid {
|
||||
if timelineid != target_timelineid {
|
||||
if let Some(target_timelineid) = target_timeline_id {
|
||||
if timeline_id != target_timelineid {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
@@ -905,8 +889,8 @@ impl LayeredRepository {
|
||||
drop(timelines);
|
||||
let branchpoints: Vec<Lsn> = all_branchpoints
|
||||
.range((
|
||||
Included((timelineid, Lsn(0))),
|
||||
Included((timelineid, Lsn(u64::MAX))),
|
||||
Included((timeline_id, Lsn(0))),
|
||||
Included((timeline_id, Lsn(u64::MAX))),
|
||||
))
|
||||
.map(|&x| x.1)
|
||||
.collect();
|
||||
@@ -916,7 +900,7 @@ impl LayeredRepository {
|
||||
// used in tests, so we want as deterministic results as possible.
|
||||
if checkpoint_before_gc {
|
||||
timeline.checkpoint(CheckpointConfig::Forced)?;
|
||||
info!("timeline {} checkpoint_before_gc done", timelineid);
|
||||
info!("timeline {} checkpoint_before_gc done", timeline_id);
|
||||
}
|
||||
timeline.update_gc_info(branchpoints, cutoff, pitr);
|
||||
let result = timeline.gc()?;
|
||||
|
||||
@@ -260,9 +260,6 @@ pub trait Repository: Send + Sync {
|
||||
/// api's 'compact' command.
|
||||
fn compaction_iteration(&self) -> Result<()>;
|
||||
|
||||
/// detaches timeline-related in-memory data.
|
||||
fn detach_timeline(&self, timeline_id: ZTimelineId) -> Result<()>;
|
||||
|
||||
// Allows to retrieve remote timeline index from the repo. Used in walreceiver to grab remote consistent lsn.
|
||||
fn get_remote_index(&self) -> &RemoteIndex;
|
||||
}
|
||||
@@ -537,7 +534,7 @@ pub mod repo_harness {
|
||||
TenantConfOpt::from(self.tenant_conf),
|
||||
walredo_mgr,
|
||||
self.tenant_id,
|
||||
RemoteIndex::empty(),
|
||||
RemoteIndex::default(),
|
||||
false,
|
||||
);
|
||||
// populate repo with locally available timelines
|
||||
|
||||
@@ -192,6 +192,8 @@ use metrics::{
|
||||
use utils::zid::{ZTenantId, ZTenantTimelineId, ZTimelineId};
|
||||
|
||||
pub use self::download::download_index_part;
|
||||
pub use self::download::download_tenant_index_parts;
|
||||
pub use self::download::try_download_index_parts;
|
||||
pub use self::download::TEMP_DOWNLOAD_EXTENSION;
|
||||
|
||||
lazy_static! {
|
||||
@@ -301,7 +303,7 @@ pub fn start_local_timeline_sync(
|
||||
}
|
||||
Ok(SyncStartupData {
|
||||
local_timeline_init_statuses,
|
||||
remote_index: RemoteIndex::empty(),
|
||||
remote_index: RemoteIndex::default(),
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -835,7 +837,7 @@ where
|
||||
.build()
|
||||
.context("Failed to create storage sync runtime")?;
|
||||
|
||||
let applicable_index_parts = runtime.block_on(try_fetch_index_parts(
|
||||
let applicable_index_parts = runtime.block_on(try_download_index_parts(
|
||||
conf,
|
||||
&storage,
|
||||
local_timeline_files.keys().copied().collect(),
|
||||
@@ -918,16 +920,59 @@ fn storage_sync_loop<P, S>(
|
||||
});
|
||||
|
||||
match loop_step {
|
||||
ControlFlow::Continue(new_timeline_states) => {
|
||||
if new_timeline_states.is_empty() {
|
||||
debug!("Sync loop step completed, no new timeline states");
|
||||
ControlFlow::Continue(updated_tenants) => {
|
||||
if updated_tenants.is_empty() {
|
||||
debug!("Sync loop step completed, no new tenant states");
|
||||
} else {
|
||||
info!(
|
||||
"Sync loop step completed, {} new timeline state update(s)",
|
||||
new_timeline_states.len()
|
||||
"Sync loop step completed, {} new tenant state update(s)",
|
||||
updated_tenants.len()
|
||||
);
|
||||
// Batch timeline download registration to ensure that the external registration code won't block any running tasks before.
|
||||
apply_timeline_sync_status_updates(conf, &index, new_timeline_states);
|
||||
let index_accessor = runtime.block_on(index.write());
|
||||
for tenant_id in updated_tenants {
|
||||
let tenant_entry = match index_accessor.tenant_entry(&tenant_id) {
|
||||
Some(tenant_entry) => tenant_entry,
|
||||
None => {
|
||||
error!(
|
||||
"cannot find tenant in remote index for timeline sync update"
|
||||
);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
if tenant_entry.has_in_progress_downloads() {
|
||||
info!("Tenant {tenant_id} has pending timeline downloads, skipping repository registration");
|
||||
continue;
|
||||
} else {
|
||||
info!(
|
||||
"Tenant {tenant_id} download completed. Registering in repository"
|
||||
);
|
||||
// Here we assume that if the tenant has no in-progress downloads, then the last
// completed timeline download is what triggered this sync status update. So we look
// at the index for the available timelines and register them all at once in the
// repository, submitted as a single operation so it can apply them to its
// internal timeline map in one go.
let sync_status_updates: HashMap<
|
||||
ZTimelineId,
|
||||
TimelineSyncStatusUpdate,
|
||||
> = tenant_entry
|
||||
.keys()
|
||||
.copied()
|
||||
.map(|timeline_id| {
|
||||
(timeline_id, TimelineSyncStatusUpdate::Downloaded)
|
||||
})
|
||||
.collect();
|
||||
|
||||
// Batch timeline download registration to ensure that the external registration code won't block any running tasks before.
|
||||
apply_timeline_sync_status_updates(
|
||||
conf,
|
||||
&index,
|
||||
tenant_id,
|
||||
sync_status_updates,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
ControlFlow::Break(()) => {
|
||||
@@ -945,7 +990,7 @@ async fn process_batches<P, S>(
|
||||
index: &RemoteIndex,
|
||||
batched_tasks: HashMap<ZTenantTimelineId, SyncTaskBatch>,
|
||||
sync_queue: &SyncQueue,
|
||||
) -> HashMap<ZTenantId, HashMap<ZTimelineId, TimelineSyncStatusUpdate>>
|
||||
) -> HashSet<ZTenantId>
|
||||
where
|
||||
P: Debug + Send + Sync + 'static,
|
||||
S: RemoteStorage<RemoteObjectId = P> + Send + Sync + 'static,
|
||||
@@ -970,18 +1015,13 @@ where
|
||||
})
|
||||
.collect::<FuturesUnordered<_>>();
|
||||
|
||||
let mut new_timeline_states: HashMap<
|
||||
ZTenantId,
|
||||
HashMap<ZTimelineId, TimelineSyncStatusUpdate>,
|
||||
> = HashMap::new();
|
||||
let mut new_timeline_states = HashSet::new();
|
||||
|
||||
// we purposely ignore actual state update, because we're waiting for last timeline download to happen
|
||||
while let Some((sync_id, state_update)) = sync_results.next().await {
|
||||
debug!("Finished storage sync task for sync id {sync_id}");
|
||||
if let Some(state_update) = state_update {
|
||||
new_timeline_states
|
||||
.entry(sync_id.tenant_id)
|
||||
.or_default()
|
||||
.insert(sync_id.timeline_id, state_update);
|
||||
if state_update.is_some() {
|
||||
new_timeline_states.insert(sync_id.tenant_id);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1458,35 +1498,6 @@ async fn validate_task_retries<T>(
|
||||
ControlFlow::Continue(sync_data)
|
||||
}
|
||||
|
||||
async fn try_fetch_index_parts<P, S>(
|
||||
conf: &'static PageServerConf,
|
||||
storage: &S,
|
||||
keys: HashSet<ZTenantTimelineId>,
|
||||
) -> HashMap<ZTenantTimelineId, IndexPart>
|
||||
where
|
||||
P: Debug + Send + Sync + 'static,
|
||||
S: RemoteStorage<RemoteObjectId = P> + Send + Sync + 'static,
|
||||
{
|
||||
let mut index_parts = HashMap::with_capacity(keys.len());
|
||||
|
||||
let mut part_downloads = keys
|
||||
.into_iter()
|
||||
.map(|id| async move { (id, download_index_part(conf, storage, id).await) })
|
||||
.collect::<FuturesUnordered<_>>();
|
||||
|
||||
while let Some((id, part_upload_result)) = part_downloads.next().await {
|
||||
match part_upload_result {
|
||||
Ok(index_part) => {
|
||||
debug!("Successfully fetched index part for {id}");
|
||||
index_parts.insert(id, index_part);
|
||||
}
|
||||
Err(e) => warn!("Failed to fetch index part for {id}: {e}"),
|
||||
}
|
||||
}
|
||||
|
||||
index_parts
|
||||
}
|
||||
|
||||
fn schedule_first_sync_tasks(
|
||||
index: &mut RemoteTimelineIndex,
|
||||
sync_queue: &SyncQueue,
|
||||
|
||||
@@ -1,10 +1,14 @@
|
||||
//! Timeline synchronization logic to fetch the layer files from remote storage into pageserver's local directory.
|
||||
|
||||
use std::{collections::HashSet, fmt::Debug, path::Path};
|
||||
use std::{
|
||||
collections::{HashMap, HashSet},
|
||||
fmt::Debug,
|
||||
path::Path,
|
||||
};
|
||||
|
||||
use anyhow::Context;
|
||||
use futures::stream::{FuturesUnordered, StreamExt};
|
||||
use remote_storage::{path_with_suffix_extension, RemoteStorage};
|
||||
use remote_storage::{path_with_suffix_extension, RemoteObjectName, RemoteStorage};
|
||||
use tokio::{
|
||||
fs,
|
||||
io::{self, AsyncWriteExt},
|
||||
@@ -14,7 +18,7 @@ use tracing::{debug, error, info, warn};
|
||||
use crate::{
|
||||
config::PageServerConf, layered_repository::metadata::metadata_path, storage_sync::SyncTask,
|
||||
};
|
||||
use utils::zid::ZTenantTimelineId;
|
||||
use utils::zid::{ZTenantId, ZTenantTimelineId, ZTimelineId};
|
||||
|
||||
use super::{
|
||||
index::{IndexPart, RemoteTimeline},
|
||||
@@ -23,6 +27,105 @@ use super::{
|
||||
|
||||
pub const TEMP_DOWNLOAD_EXTENSION: &str = "temp_download";
|
||||
|
||||
/// FIXME: Needs cleanup. Currently it swallows errors. Here we need to ensure that
/// we successfully downloaded all metadata parts for one tenant.
/// "Successful" includes the absence of an index_part in the remote, because that is a valid
/// situation when a timeline was just created and the pageserver restarted before the upload
/// of the index part completed. But currently the RemoteStorage interface does not provide
/// this knowledge, because it uses anyhow::Error as its error type. So this needs a refactoring.
///
/// In other words, we need to yield only complete sets of tenant timelines.
/// A failure for one timeline of a tenant should exclude the whole tenant from the returned hashmap.
/// So there are two requirements: keep everything in one FuturesUnordered to allow higher
/// concurrency, and mark tenants as failed independently. That requires some bookkeeping.
pub async fn try_download_index_parts<P, S>(
|
||||
conf: &'static PageServerConf,
|
||||
storage: &S,
|
||||
keys: HashSet<ZTenantTimelineId>,
|
||||
) -> HashMap<ZTenantId, HashMap<ZTimelineId, IndexPart>>
|
||||
where
|
||||
P: Debug + Send + Sync + 'static,
|
||||
S: RemoteStorage<RemoteObjectId = P> + Send + Sync + 'static,
|
||||
{
|
||||
let mut index_parts: HashMap<ZTenantId, HashMap<ZTimelineId, IndexPart>> = HashMap::new();
|
||||
|
||||
let mut part_downloads = keys
|
||||
.into_iter()
|
||||
.map(|id| async move { (id, download_index_part(conf, storage, id).await) })
|
||||
.collect::<FuturesUnordered<_>>();
|
||||
|
||||
while let Some((id, part_upload_result)) = part_downloads.next().await {
|
||||
match part_upload_result {
|
||||
Ok(index_part) => {
|
||||
debug!("Successfully fetched index part for {id}");
|
||||
index_parts
|
||||
.entry(id.tenant_id)
|
||||
.or_default()
|
||||
.insert(id.timeline_id, index_part);
|
||||
}
|
||||
Err(e) => error!("Failed to fetch index part for {id}: {e}"),
|
||||
}
|
||||
}
|
||||
|
||||
index_parts
|
||||
}
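A minimal sketch of the bookkeeping the FIXME above asks for: keep all downloads in a single FuturesUnordered for concurrency, track failures per tenant, and drop incomplete tenants from the result. It assumes the futures and anyhow crates; the identifiers are illustrative stand-ins, not the real pageserver types:

```rust
use std::collections::{HashMap, HashSet};

use futures::stream::{FuturesUnordered, StreamExt};

type TenantId = u64;
type TimelineId = u64;
type IndexPart = String; // placeholder payload

async fn fetch_index_part(id: (TenantId, TimelineId)) -> anyhow::Result<IndexPart> {
    Ok(format!("index part for {id:?}")) // stand-in for a remote download
}

async fn download_complete_tenants(
    keys: HashSet<(TenantId, TimelineId)>,
) -> HashMap<TenantId, HashMap<TimelineId, IndexPart>> {
    // One stream for every download, so all of them run concurrently.
    let mut downloads: FuturesUnordered<_> = keys
        .into_iter()
        .map(|id| async move { (id, fetch_index_part(id).await) })
        .collect();

    let mut parts: HashMap<TenantId, HashMap<TimelineId, IndexPart>> = HashMap::new();
    let mut failed_tenants: HashSet<TenantId> = HashSet::new();

    while let Some(((tenant, timeline), result)) = downloads.next().await {
        match result {
            Ok(part) => {
                parts.entry(tenant).or_default().insert(timeline, part);
            }
            // Mark the tenant as failed independently; keep draining the stream.
            Err(_) => {
                failed_tenants.insert(tenant);
            }
        }
    }

    // Yield only complete sets: a single failed timeline excludes the whole tenant.
    for tenant in failed_tenants {
        parts.remove(&tenant);
    }
    parts
}
```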
|
||||
|
||||
pub async fn download_tenant_index_parts<P, S>(
|
||||
conf: &'static PageServerConf,
|
||||
storage: &S,
|
||||
tenant_id: ZTenantId,
|
||||
) -> anyhow::Result<HashMap<ZTimelineId, IndexPart>>
|
||||
where
|
||||
P: RemoteObjectName + Debug + Send + Sync + 'static,
|
||||
S: RemoteStorage<RemoteObjectId = P> + Send + Sync + 'static,
|
||||
{
|
||||
let tenant_path = conf.timelines_path(&tenant_id);
|
||||
let tenant_storage_path = storage.remote_object_id(&tenant_path).with_context(|| {
|
||||
format!(
|
||||
"Failed to get tenant storage path for local path '{}'",
|
||||
tenant_path.display()
|
||||
)
|
||||
})?;
|
||||
let timelines = storage
|
||||
.list_prefixes(Some(tenant_storage_path))
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Failed to list tenant storage path to get remote timelines to download: {}",
|
||||
tenant_id
|
||||
)
|
||||
})?;
|
||||
|
||||
let mut sync_ids = HashSet::new();
|
||||
|
||||
for timeline_remote_storage_key in timelines {
|
||||
let object_name = timeline_remote_storage_key.object_name().ok_or_else(|| {
|
||||
anyhow::anyhow!("failed to get timeline id for remote tenant {tenant_id}")
|
||||
})?;
|
||||
|
||||
let timeline_id: ZTimelineId = object_name
|
||||
.parse()
|
||||
.with_context(|| {
|
||||
format!("failed to parse object name into timeline id for tenant {tenant_id} '{object_name}'")
|
||||
})?;
|
||||
|
||||
sync_ids.insert(ZTenantTimelineId {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
});
|
||||
}
|
||||
|
||||
let index_parts = try_download_index_parts(conf, storage, sync_ids)
|
||||
.await
|
||||
.remove(&tenant_id)
|
||||
.ok_or(anyhow::anyhow!(
|
||||
"Missing tenant index parts. This is a bug."
|
||||
))?;
|
||||
|
||||
Ok(index_parts)
|
||||
}
|
||||
|
||||
/// Retrieves index data from the remote storage for a given timeline.
|
||||
pub async fn download_index_part<P, S>(
|
||||
conf: &'static PageServerConf,
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
//! Able to restore itself from the storage index parts, that are located in every timeline's remote directory and contain all data about
|
||||
//! remote timeline layers and its metadata.
|
||||
|
||||
use std::ops::{Deref, DerefMut};
|
||||
use std::{
|
||||
collections::{HashMap, HashSet},
|
||||
path::{Path, PathBuf},
|
||||
@@ -14,7 +15,10 @@ use serde_with::{serde_as, DisplayFromStr};
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
use crate::{config::PageServerConf, layered_repository::metadata::TimelineMetadata};
|
||||
use utils::{lsn::Lsn, zid::ZTenantTimelineId};
|
||||
use utils::{
|
||||
lsn::Lsn,
|
||||
zid::{ZTenantId, ZTenantTimelineId, ZTimelineId},
|
||||
};
|
||||
|
||||
/// A part of the filesystem path, that needs a root to become a path again.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
|
||||
@@ -41,38 +45,68 @@ impl RelativePath {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub struct TenantEntry(HashMap<ZTimelineId, RemoteTimeline>);
|
||||
|
||||
impl TenantEntry {
|
||||
pub fn has_in_progress_downloads(&self) -> bool {
|
||||
self.values()
|
||||
.any(|remote_timeline| remote_timeline.awaits_download)
|
||||
}
|
||||
}
|
||||
|
||||
impl Deref for TenantEntry {
|
||||
type Target = HashMap<ZTimelineId, RemoteTimeline>;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl DerefMut for TenantEntry {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
&mut self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl From<HashMap<ZTimelineId, RemoteTimeline>> for TenantEntry {
|
||||
fn from(inner: HashMap<ZTimelineId, RemoteTimeline>) -> Self {
|
||||
Self(inner)
|
||||
}
|
||||
}
|
||||
|
||||
/// An index to track tenant files that exist on the remote storage.
|
||||
#[derive(Debug, Clone)]
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub struct RemoteTimelineIndex {
|
||||
timeline_entries: HashMap<ZTenantTimelineId, RemoteTimeline>,
|
||||
entries: HashMap<ZTenantId, TenantEntry>,
|
||||
}
|
||||
|
||||
/// A wrapper to synchronize the access to the index, should be created and used before dealing with any [`RemoteTimelineIndex`].
|
||||
#[derive(Default)]
|
||||
pub struct RemoteIndex(Arc<RwLock<RemoteTimelineIndex>>);
|
||||
|
||||
impl RemoteIndex {
|
||||
pub fn empty() -> Self {
|
||||
Self(Arc::new(RwLock::new(RemoteTimelineIndex {
|
||||
timeline_entries: HashMap::new(),
|
||||
})))
|
||||
}
|
||||
|
||||
pub fn from_parts(
|
||||
conf: &'static PageServerConf,
|
||||
index_parts: HashMap<ZTenantTimelineId, IndexPart>,
|
||||
index_parts: HashMap<ZTenantId, HashMap<ZTimelineId, IndexPart>>,
|
||||
) -> anyhow::Result<Self> {
|
||||
let mut timeline_entries = HashMap::new();
|
||||
let mut entries: HashMap<ZTenantId, TenantEntry> = HashMap::new();
|
||||
|
||||
for (sync_id, index_part) in index_parts {
|
||||
let timeline_path = conf.timeline_path(&sync_id.timeline_id, &sync_id.tenant_id);
|
||||
let remote_timeline = RemoteTimeline::from_index_part(&timeline_path, index_part)
|
||||
.context("Failed to restore remote timeline data from index part")?;
|
||||
timeline_entries.insert(sync_id, remote_timeline);
|
||||
for (tenant_id, timelines) in index_parts {
|
||||
for (timeline_id, index_part) in timelines {
|
||||
let timeline_path = conf.timeline_path(&timeline_id, &tenant_id);
|
||||
let remote_timeline =
|
||||
RemoteTimeline::from_index_part(&timeline_path, index_part)
|
||||
.context("Failed to restore remote timeline data from index part")?;
|
||||
|
||||
entries
|
||||
.entry(tenant_id)
|
||||
.or_default()
|
||||
.insert(timeline_id, remote_timeline);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(Self(Arc::new(RwLock::new(RemoteTimelineIndex {
|
||||
timeline_entries,
|
||||
}))))
|
||||
Ok(Self(Arc::new(RwLock::new(RemoteTimelineIndex { entries }))))
|
||||
}
|
||||
|
||||
pub async fn read(&self) -> tokio::sync::RwLockReadGuard<'_, RemoteTimelineIndex> {
|
||||
@@ -91,20 +125,50 @@ impl Clone for RemoteIndex {
|
||||
}
|
||||
|
||||
impl RemoteTimelineIndex {
|
||||
pub fn timeline_entry(&self, id: &ZTenantTimelineId) -> Option<&RemoteTimeline> {
|
||||
self.timeline_entries.get(id)
|
||||
pub fn timeline_entry(
|
||||
&self,
|
||||
ZTenantTimelineId {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
}: &ZTenantTimelineId,
|
||||
) -> Option<&RemoteTimeline> {
|
||||
self.entries.get(tenant_id)?.get(timeline_id)
|
||||
}
|
||||
|
||||
pub fn timeline_entry_mut(&mut self, id: &ZTenantTimelineId) -> Option<&mut RemoteTimeline> {
|
||||
self.timeline_entries.get_mut(id)
|
||||
pub fn timeline_entry_mut(
|
||||
&mut self,
|
||||
ZTenantTimelineId {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
}: &ZTenantTimelineId,
|
||||
) -> Option<&mut RemoteTimeline> {
|
||||
self.entries.get_mut(tenant_id)?.get_mut(timeline_id)
|
||||
}
|
||||
|
||||
pub fn add_timeline_entry(&mut self, id: ZTenantTimelineId, entry: RemoteTimeline) {
|
||||
self.timeline_entries.insert(id, entry);
|
||||
pub fn add_timeline_entry(
|
||||
&mut self,
|
||||
ZTenantTimelineId {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
}: ZTenantTimelineId,
|
||||
entry: RemoteTimeline,
|
||||
) {
|
||||
self.entries
|
||||
.entry(tenant_id)
|
||||
.or_default()
|
||||
.insert(timeline_id, entry);
|
||||
}
|
||||
|
||||
pub fn all_sync_ids(&self) -> impl Iterator<Item = ZTenantTimelineId> + '_ {
|
||||
self.timeline_entries.keys().copied()
|
||||
pub fn tenant_entry(&self, tenant_id: &ZTenantId) -> Option<&TenantEntry> {
|
||||
self.entries.get(tenant_id)
|
||||
}
|
||||
|
||||
pub fn tenant_entry_mut(&mut self, tenant_id: &ZTenantId) -> Option<&mut TenantEntry> {
|
||||
self.entries.get_mut(tenant_id)
|
||||
}
|
||||
|
||||
pub fn add_tenant_entry(&mut self, tenant_id: ZTenantId) -> &mut TenantEntry {
|
||||
self.entries.entry(tenant_id).or_default()
|
||||
}
|
||||
|
||||
pub fn set_awaits_download(
|
||||
|
||||
@@ -165,14 +165,14 @@ pub fn init_tenant_mgr(conf: &'static PageServerConf) -> anyhow::Result<RemoteIn
|
||||
}
|
||||
|
||||
pub enum LocalTimelineUpdate {
|
||||
Detach(ZTenantTimelineId),
|
||||
Detach(ZTenantTimelineId, std::sync::mpsc::Sender<()>),
|
||||
Attach(ZTenantTimelineId, Arc<DatadirTimelineImpl>),
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for LocalTimelineUpdate {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match self {
|
||||
Self::Detach(ttid) => f.debug_tuple("Remove").field(ttid).finish(),
|
||||
Self::Detach(ttid, _) => f.debug_tuple("Remove").field(ttid).finish(),
|
||||
Self::Attach(ttid, _) => f.debug_tuple("Add").field(ttid).finish(),
|
||||
}
|
||||
}
|
||||
@@ -182,32 +182,31 @@ impl std::fmt::Debug for LocalTimelineUpdate {
|
||||
pub fn apply_timeline_sync_status_updates(
|
||||
conf: &'static PageServerConf,
|
||||
remote_index: &RemoteIndex,
|
||||
sync_status_updates: HashMap<ZTenantId, HashMap<ZTimelineId, TimelineSyncStatusUpdate>>,
|
||||
tenant_id: ZTenantId,
|
||||
sync_status_updates: HashMap<ZTimelineId, TimelineSyncStatusUpdate>,
|
||||
) {
|
||||
if sync_status_updates.is_empty() {
|
||||
debug!("no sync status updates to apply");
|
||||
return;
|
||||
}
|
||||
info!(
|
||||
"Applying sync status updates for {} timelines",
|
||||
"Applying sync status updates for tenant {tenant_id} {} timelines",
|
||||
sync_status_updates.len()
|
||||
);
|
||||
debug!("Sync status updates: {sync_status_updates:?}");
|
||||
|
||||
for (tenant_id, status_updates) in sync_status_updates {
|
||||
let repo = match load_local_repo(conf, tenant_id, remote_index) {
|
||||
Ok(repo) => repo,
|
||||
Err(e) => {
|
||||
error!("Failed to load repo for tenant {tenant_id} Error: {e:?}",);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
match apply_timeline_remote_sync_status_updates(&repo, status_updates) {
|
||||
Ok(()) => info!("successfully applied sync status updates for tenant {tenant_id}"),
|
||||
Err(e) => error!(
|
||||
"Failed to apply timeline sync timeline status updates for tenant {tenant_id}: {e:?}"
|
||||
),
|
||||
let repo = match load_local_repo(conf, tenant_id, remote_index) {
|
||||
Ok(repo) => repo,
|
||||
Err(e) => {
|
||||
error!("Failed to load repo for tenant {tenant_id} Error: {e:?}");
|
||||
return;
|
||||
}
|
||||
};
|
||||
match apply_timeline_remote_sync_status_updates(&repo, sync_status_updates) {
|
||||
Ok(()) => info!("successfully applied sync status updates for tenant {tenant_id}"),
|
||||
Err(e) => error!(
|
||||
"Failed to apply timeline sync timeline status updates for tenant {tenant_id}: {e:?}"
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -387,29 +386,49 @@ pub fn get_local_timeline_with_load(
|
||||
}
|
||||
}
|
||||
|
||||
pub fn detach_timeline(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_id: ZTenantId,
|
||||
timeline_id: ZTimelineId,
|
||||
) -> anyhow::Result<()> {
|
||||
// shutdown the timeline threads (this shuts down the walreceiver)
|
||||
thread_mgr::shutdown_threads(None, Some(tenant_id), Some(timeline_id));
|
||||
pub fn detach_tenant(conf: &'static PageServerConf, tenant_id: ZTenantId) -> anyhow::Result<()> {
|
||||
set_tenant_state(tenant_id, TenantState::Stopping)?;
|
||||
// shutdown the tenant and timeline threads: gc, compaction, page service threads)
|
||||
thread_mgr::shutdown_threads(None, Some(tenant_id), None);
|
||||
|
||||
match tenants_state::write_tenants().get_mut(&tenant_id) {
|
||||
Some(tenant) => {
|
||||
tenant
|
||||
.repo
|
||||
.detach_timeline(timeline_id)
|
||||
.context("Failed to detach inmem tenant timeline")?;
|
||||
tenant.local_timelines.remove(&timeline_id);
|
||||
// FIXME should we protect somehow from starting new threads/walreceivers when tenant is in stopping state?
|
||||
// send stop signal to wal receiver and collect join handles while holding the lock
|
||||
let walreceiver_join_handles = {
|
||||
let tenants = tenants_state::write_tenants();
|
||||
let tenant = tenants.get(&tenant_id).context("tenant not found")?;
|
||||
let mut walreceiver_join_handles = Vec::with_capacity(tenant.local_timelines.len());
|
||||
for timeline_id in tenant.local_timelines.keys() {
|
||||
let (sender, receiver) = std::sync::mpsc::channel::<()>();
|
||||
tenants_state::try_send_timeline_update(LocalTimelineUpdate::Detach(
|
||||
ZTenantTimelineId::new(tenant_id, timeline_id),
|
||||
ZTenantTimelineId::new(tenant_id, *timeline_id),
|
||||
sender,
|
||||
));
|
||||
walreceiver_join_handles.push((*timeline_id, receiver));
|
||||
}
|
||||
None => bail!("Tenant {tenant_id} not found in local tenant state"),
|
||||
// drop the tenants lock
|
||||
walreceiver_join_handles
|
||||
};
|
||||
|
||||
// wait for wal receivers to stop without holding the lock, because walreceiver
|
||||
// will attempt to change tenant state which is protected by the same global tenants lock.
|
||||
// TODO do we need a timeout here? how to handle it?
|
||||
// recv_timeout is broken: https://github.com/rust-lang/rust/issues/94518#issuecomment-1057440631
|
||||
// need to use crossbeam-channel
|
||||
for (timeline_id, join_handle) in walreceiver_join_handles {
|
||||
info!("waiting for wal receiver to shutdown timeline_id {timeline_id}");
|
||||
join_handle.recv().context("failed to join walreceiver")?;
|
||||
info!("wal receiver shutdown confirmed timeline_id {timeline_id}");
|
||||
}
|
||||
|
||||
let local_timeline_directory = conf.timeline_path(&timeline_id, &tenant_id);
|
||||
tenants_state::write_tenants().remove(&tenant_id);
|
||||
|
||||
// If removal fails there will be no way to successfully retry detach,
// because the tenant no longer exists in the in-memory map. It also has to be removed from
// the map before we remove files, because it holds references to the repository, which in
// turn references ephemeral files that are deleted on drop. If we kept these references,
// the code would attempt to remove files that no longer exist. This can be fixed by adding
// a shutdown mechanism for the repository that cleans up temporary data and drops any
// references to ephemeral files.
let local_timeline_directory = conf.tenant_path(&tenant_id);
|
||||
std::fs::remove_dir_all(&local_timeline_directory).with_context(|| {
|
||||
format!(
|
||||
"Failed to remove local timeline directory '{}'",
|
||||
|
||||
@@ -202,7 +202,7 @@ pub fn create_repo(
|
||||
// anymore, but I think that could still happen.
|
||||
let wal_redo_manager = Arc::new(crate::walredo::DummyRedoManager {});
|
||||
|
||||
(wal_redo_manager as _, RemoteIndex::empty())
|
||||
(wal_redo_manager as _, RemoteIndex::default())
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -264,7 +264,7 @@ async fn wal_receiver_main_thread_loop_step<'a>(
|
||||
info!("Processing timeline update: {update:?}");
|
||||
match update {
|
||||
// Timeline got detached, stop all related tasks and remove public timeline data.
|
||||
LocalTimelineUpdate::Detach(id) => {
|
||||
LocalTimelineUpdate::Detach(id, join_sender) => {
|
||||
match local_timeline_wal_receivers.get_mut(&id.tenant_id) {
|
||||
Some(wal_receivers) => {
|
||||
if let hash_map::Entry::Occupied(o) = wal_receivers.entry(id.timeline_id) {
|
||||
@@ -280,6 +280,11 @@ async fn wal_receiver_main_thread_loop_step<'a>(
|
||||
};
|
||||
{
|
||||
WAL_RECEIVER_ENTRIES.write().await.remove(&id);
|
||||
if let Err(e) = join_sender.send(()) {
|
||||
warn!("cannot send wal_receiver shutdown confirmation {e}")
|
||||
} else {
|
||||
info!("confirm walreceiver shutdown for {id}");
|
||||
}
|
||||
}
|
||||
}
|
||||
// Timeline got attached, retrieve all necessary information to start its broker loop and maintain this loop endlessly.
|
||||
|
||||
@@ -105,16 +105,3 @@ def test_ancestor_branch(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
branch2_cur.execute('SELECT count(*) FROM foo')
|
||||
assert branch2_cur.fetchone() == (300000, )
|
||||
|
||||
|
||||
def test_ancestor_branch_detach(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
|
||||
parent_timeline_id = env.neon_cli.create_branch("test_ancestor_branch_detach_parent", "empty")
|
||||
|
||||
env.neon_cli.create_branch("test_ancestor_branch_detach_branch1",
|
||||
"test_ancestor_branch_detach_parent")
|
||||
|
||||
ps_http = env.pageserver.http_client()
|
||||
with pytest.raises(NeonPageserverApiException, match="Failed to detach inmem tenant timeline"):
|
||||
ps_http.timeline_detach(env.initial_tenant, parent_timeline_id)
|
||||
|
||||
test_runner/batch_others/test_detach.py (new file, 49 lines)
@@ -0,0 +1,49 @@
|
||||
from threading import Thread
|
||||
from uuid import uuid4
|
||||
import psycopg2
|
||||
import pytest
|
||||
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder
|
||||
|
||||
|
||||
def test_detach_smoke(neon_env_builder: NeonEnvBuilder):
|
||||
env = neon_env_builder.init_start()
|
||||
pageserver_http = env.pageserver.http_client()
|
||||
|
||||
tenant_id, timeline_id = env.neon_cli.create_tenant()
|
||||
pg = env.postgres.create_start('main', tenant_id=tenant_id)
|
||||
# we rely upon autocommit after each statement
|
||||
pg.safe_psql_many(queries=[
|
||||
'CREATE TABLE t(key int primary key, value text)',
|
||||
'INSERT INTO t SELECT generate_series(1,100000), \'payload\'',
|
||||
])
|
||||
|
||||
# gc for a nonexistent timeline should fail before it even starts
|
||||
with pytest.raises(expected_exception=psycopg2.DatabaseError,
|
||||
match='gc target timeline does not exist'):
|
||||
env.pageserver.safe_psql(f'do_gc {tenant_id.hex} {uuid4().hex} 0')
|
||||
|
||||
gc_thread = Thread(
|
||||
target=lambda: env.pageserver.safe_psql(f'do_gc {tenant_id.hex} {timeline_id.hex} 0'), )
|
||||
gc_thread.start()
|
||||
|
||||
last_error = None
|
||||
for i in range(3):
|
||||
try:
|
||||
pageserver_http.tenant_detach(tenant_id)
|
||||
except Exception as e:
|
||||
last_error = e
|
||||
log.error(f"try {i} error detaching tenant: {e}")
|
||||
continue
|
||||
else:
|
||||
break
|
||||
# else is called if the loop finished without reaching "break"
|
||||
else:
|
||||
pytest.fail(f"could not detach timeline: {last_error}")
|
||||
|
||||
gc_thread.join(timeout=10)
|
||||
|
||||
with pytest.raises(expected_exception=psycopg2.DatabaseError,
|
||||
match=f'Tenant {tenant_id.hex} not found'):
|
||||
env.pageserver.safe_psql(f'do_gc {tenant_id.hex} {timeline_id.hex} 0')
@@ -24,7 +24,7 @@ def check_tenant(env: NeonEnv, pageserver_http: NeonPageserverHttpClient):
assert res_2[0] == (5000050000, )

pg.stop()
pageserver_http.timeline_detach(tenant_id, timeline_id)
pageserver_http.tenant_detach(tenant_id)


@pytest.mark.parametrize('num_timelines,num_safekeepers', [(3, 1)])

@@ -91,14 +91,14 @@ def test_remote_storage_backup_and_restore(neon_env_builder: NeonEnvBuilder, sto
# Introduce failpoint in download
env.pageserver.safe_psql(f"failpoints remote-storage-download-pre-rename=return")

client.timeline_attach(UUID(tenant_id), UUID(timeline_id))
client.tenant_attach(UUID(tenant_id))

# is there a better way to assert that fafilpoint triggered?
# is there a better way to assert that failpoint triggered?
time.sleep(10)

# assert cannot attach timeline that is scheduled for download
with pytest.raises(Exception, match="Timeline download is already in progress"):
client.timeline_attach(UUID(tenant_id), UUID(timeline_id))
with pytest.raises(Exception, match="Conflict: Tenant download is already in progress"):
client.tenant_attach(UUID(tenant_id))

detail = client.timeline_detail(UUID(tenant_id), UUID(timeline_id))
log.info("Timeline detail with active failpoint: %s", detail)
@@ -109,7 +109,7 @@ def test_remote_storage_backup_and_restore(neon_env_builder: NeonEnvBuilder, sto
env.pageserver.stop()
env.pageserver.start()

client.timeline_attach(UUID(tenant_id), UUID(timeline_id))
client.tenant_attach(UUID(tenant_id))

log.info("waiting for timeline redownload")
wait_until(number_of_iterations=10,

@@ -3,14 +3,13 @@ import os
import pathlib
import subprocess
import threading
import typing
from uuid import UUID
from fixtures.log_helper import log
from typing import Optional
from typing import Any, Dict, Optional, Tuple
import signal
import pytest

from fixtures.neon_fixtures import PgProtocol, PortDistributor, Postgres, NeonEnvBuilder, Etcd, NeonPageserverHttpClient, assert_local, wait_until, wait_for_last_record_lsn, wait_for_upload, neon_binpath, pg_distrib_dir, base_dir
from fixtures.neon_fixtures import NeonEnv, PortDistributor, Postgres, NeonEnvBuilder, Etcd, NeonPageserverHttpClient, assert_local, wait_until, wait_for_last_record_lsn, wait_for_upload, neon_binpath, pg_distrib_dir, base_dir
from fixtures.utils import lsn_from_hex, subprocess_capture


@@ -101,6 +100,102 @@ def load(pg: Postgres, stop_event: threading.Event, load_ok_event: threading.Eve
log.info('load thread stopped')


def populate_branch(pg: Postgres, create_table: bool,
expected_sum: Optional[int]) -> Tuple[UUID, int]:
# insert some data
with pg_cur(pg) as cur:
cur.execute("SHOW neon.timeline_id")
timeline_id = UUID(cur.fetchone()[0])
log.info("timeline to relocate %s", timeline_id.hex)

# we rely upon autocommit after each statement
# as waiting for acceptors happens there
if create_table:
cur.execute("CREATE TABLE t(key int, value text)")
cur.execute("INSERT INTO t SELECT generate_series(1,1000), 'some payload'")
if expected_sum is not None:
cur.execute("SELECT sum(key) FROM t")
assert cur.fetchone() == (expected_sum, )
cur.execute("SELECT pg_current_wal_flush_lsn()")

current_lsn = lsn_from_hex(cur.fetchone()[0])
return timeline_id, current_lsn
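# Sanity check of the expected_sum values passed to populate_branch further
# below (plain arithmetic): the main branch inserts keys 1..1000, and the
# second branch inherits those rows before inserting another 1..1000.
assert sum(range(1, 1001)) == 500500      # expected_sum for the main branch
assert 2 * 500500 == 1001000              # expected_sum for the second branch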


def ensure_checkpoint(
pageserver_cur,
pageserver_http: NeonPageserverHttpClient,
tenant_id: UUID,
timeline_id: UUID,
current_lsn: int,
):
# run checkpoint manually to be sure that data landed in remote storage
pageserver_cur.execute(f"checkpoint {tenant_id.hex} {timeline_id.hex}")

# wait until pageserver successfully uploaded a checkpoint to remote storage
wait_for_upload(pageserver_http, tenant_id, timeline_id, current_lsn)


def check_timeline_attached(
new_pageserver_http_client: NeonPageserverHttpClient,
tenant_id: UUID,
timeline_id: UUID,
old_timeline_detail: Dict[str, Any],
old_current_lsn: int,
):
# new pageserver should be in sync (modulo wal tail or vacuum activity) with the old one because there was no new writes since checkpoint
new_timeline_detail = wait_until(
number_of_iterations=5,
interval=1,
func=lambda: assert_local(new_pageserver_http_client, tenant_id, timeline_id))

# when load is active these checks can break because lsns are not static
# so lets check with some margin
assert_abs_margin_ratio(lsn_from_hex(new_timeline_detail['local']['disk_consistent_lsn']),
lsn_from_hex(old_timeline_detail['local']['disk_consistent_lsn']),
0.03)

assert_abs_margin_ratio(lsn_from_hex(new_timeline_detail['local']['disk_consistent_lsn']),
old_current_lsn,
0.03)
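# The margin checks above can be read roughly as in the sketch below
# (illustrative; the real assert_abs_margin_ratio comes from the test fixtures
# and may differ in details):
def assert_abs_margin_ratio_sketch(a: int, b: int, margin_ratio: float = 0.03):
    assert abs(a - b) <= b * margin_ratio, f"{a} and {b} differ by more than {margin_ratio:.0%}"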


def switch_pg_to_new_pageserver(env: NeonEnv,
pg: Postgres,
new_pageserver_port: int,
tenant_id: UUID,
timeline_id: UUID) -> pathlib.Path:
pg.stop()

pg_config_file_path = pathlib.Path(pg.config_file_path())
pg_config_file_path.open('a').write(
f"\nneon.pageserver_connstring = 'postgresql://no_user:@localhost:{new_pageserver_port}'")

pg.start()

timeline_to_detach_local_path = env.repo_dir / 'tenants' / tenant_id.hex / 'timelines' / timeline_id.hex
files_before_detach = os.listdir(timeline_to_detach_local_path)
assert 'metadata' in files_before_detach, f'Regular timeline {timeline_to_detach_local_path} should have the metadata file,\
but got: {files_before_detach}'
assert len(files_before_detach) >= 2, f'Regular timeline {timeline_to_detach_local_path} should have at least one layer file,\
but got {files_before_detach}'

return timeline_to_detach_local_path
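# After the restart above, the compute should be pointing at the new pageserver;
# a small verification sketch (assumes the neon.pageserver_connstring GUC set in
# postgresql.conf can be read back via SHOW):
def assert_pageserver_connstring(pg: Postgres, expected_port: int):
    with pg_cur(pg) as cur:
        cur.execute("SHOW neon.pageserver_connstring")
        connstring = cur.fetchone()[0]
        assert str(expected_port) in connstring, f"unexpected connstring: {connstring}"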


def post_migration_check(pg: Postgres, sum_before_migration: int, old_local_path: pathlib.Path):
with pg_cur(pg) as cur:
# check that data is still there
cur.execute("SELECT sum(key) FROM t")
assert cur.fetchone() == (sum_before_migration, )
# check that we can write new data
cur.execute("INSERT INTO t SELECT generate_series(1001,2000), 'some payload'")
cur.execute("SELECT sum(key) FROM t")
assert cur.fetchone() == (sum_before_migration + 1500500, )

assert not os.path.exists(old_local_path), f'After detach, local timeline dir {old_local_path} should be removed'
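# The 1500500 constant above is just the sum of the newly inserted keys
# (plain arithmetic): sum(1001..2000) = sum(1..2000) - sum(1..1000)
#                                     = 2001000 - 500500 = 1500500
assert sum(range(1001, 2001)) == 1500500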


@pytest.mark.parametrize(
'method',
[
@@ -126,61 +221,73 @@ def test_tenant_relocation(neon_env_builder: NeonEnvBuilder,
# create folder for remote storage mock
remote_storage_mock_path = env.repo_dir / 'local_fs_remote_storage'

tenant, _ = env.neon_cli.create_tenant(UUID("74ee8b079a0e437eb0afea7d26a07209"))
log.info("tenant to relocate %s", tenant)
# we use two branches to check that they are both relocated
# first branch is used for load, compute for second one is used to
# check that data is not lost

# attach does not download ancestor branches (should it?), just use root branch for now
env.neon_cli.create_root_branch('test_tenant_relocation', tenant_id=tenant)
tenant_id, initial_timeline_id = env.neon_cli.create_tenant(UUID("74ee8b079a0e437eb0afea7d26a07209"))
log.info("tenant to relocate %s initial_timeline_id %s", tenant_id, initial_timeline_id)

tenant_pg = env.postgres.create_start(branch_name='test_tenant_relocation',
node_name='test_tenant_relocation',
tenant_id=tenant)
env.neon_cli.create_branch("test_tenant_relocation_main", tenant_id=tenant_id)
pg_main = env.postgres.create_start(branch_name='test_tenant_relocation_main',
tenant_id=tenant_id)

# insert some data
with closing(tenant_pg.connect()) as conn:
with conn.cursor() as cur:
# save timeline for later gc call
cur.execute("SHOW neon.timeline_id")
timeline = UUID(cur.fetchone()[0])
log.info("timeline to relocate %s", timeline.hex)
timeline_id_main, current_lsn_main = populate_branch(pg_main, create_table=True, expected_sum=500500)

# we rely upon autocommit after each statement
# as waiting for acceptors happens there
cur.execute("CREATE TABLE t(key int primary key, value text)")
cur.execute("INSERT INTO t SELECT generate_series(1,1000), 'some payload'")
cur.execute("SELECT sum(key) FROM t")
assert cur.fetchone() == (500500, )
cur.execute("SELECT pg_current_wal_flush_lsn()")
env.neon_cli.create_branch(
new_branch_name="test_tenant_relocation_second",
ancestor_branch_name="test_tenant_relocation_main",
tenant_id=tenant_id,
)
pg_second = env.postgres.create_start(branch_name='test_tenant_relocation_second',
tenant_id=tenant_id)

current_lsn = lsn_from_hex(cur.fetchone()[0])
# do not select sum for second branch, this select will wait until wal reaches pageserver
# try to check another case when pageserver didnt receive that wal and needs to get it from safekeeper
timeline_id_second, current_lsn_second = populate_branch(pg_second, create_table=False, expected_sum=1001000)

pageserver_http = env.pageserver.http_client()

# wait until pageserver receives that data
wait_for_last_record_lsn(pageserver_http, tenant, timeline, current_lsn)
timeline_detail = assert_local(pageserver_http, tenant, timeline)
wait_for_last_record_lsn(pageserver_http, tenant_id, timeline_id_main, current_lsn_main)
timeline_detail_main = assert_local(pageserver_http, tenant_id, timeline_id_main)

wait_for_last_record_lsn(pageserver_http, tenant_id, timeline_id_second, current_lsn_second)
timeline_detail_second = assert_local(pageserver_http, tenant_id, timeline_id_second)

if with_load == 'with_load':
# create load table
with pg_cur(tenant_pg) as cur:
with pg_cur(pg_main) as cur:
cur.execute("CREATE TABLE load(value text)")

load_stop_event = threading.Event()
load_ok_event = threading.Event()
load_thread = threading.Thread(
target=load,
args=(tenant_pg, load_stop_event, load_ok_event),
args=(pg_main, load_stop_event, load_ok_event),
daemon=True, # To make sure the child dies when the parent errors
)
load_thread.start()

# run checkpoint manually to be sure that data landed in remote storage
with closing(env.pageserver.connect()) as psconn:
with psconn.cursor() as pscur:
pscur.execute(f"checkpoint {tenant.hex} {timeline.hex}")
# this requirement introduces a problem
# if user creates a branch during migration
# it wont appear on the new pageserver
with pg_cur(env.pageserver) as cur:
ensure_checkpoint(
cur,
pageserver_http=pageserver_http,
tenant_id=tenant_id,
timeline_id=timeline_id_main,
current_lsn=current_lsn_main,
)

# wait until pageserver successfully uploaded a checkpoint to remote storage
wait_for_upload(pageserver_http, tenant, timeline, current_lsn)
ensure_checkpoint(
cur,
pageserver_http=pageserver_http,
tenant_id=tenant_id,
timeline_id=timeline_id_second,
current_lsn=current_lsn_second,
)

log.info("inititalizing new pageserver")
# bootstrap second pageserver
@@ -207,7 +314,7 @@ def test_tenant_relocation(neon_env_builder: NeonEnvBuilder,
"python",
os.path.join(base_dir, "scripts/export_import_between_pageservers.py"),
"--tenant-id",
tenant.hex,
tenant_id.hex,
"--from-host",
"localhost",
"--from-http-port",
@@ -228,22 +335,23 @@ def test_tenant_relocation(neon_env_builder: NeonEnvBuilder,
subprocess_capture(str(env.repo_dir), cmd, check=True)
elif method == "minor":
# call to attach timeline to new pageserver
new_pageserver_http.timeline_attach(tenant, timeline)
new_pageserver_http.tenant_attach(tenant_id)

# new pageserver should be in sync (modulo wal tail or vacuum activity) with the old one because there was no new writes since checkpoint
new_timeline_detail = wait_until(
number_of_iterations=5,
interval=1,
func=lambda: assert_local(new_pageserver_http, tenant, timeline))
check_timeline_attached(
new_pageserver_http,
tenant_id,
timeline_id_main,
timeline_detail_main,
current_lsn_main,
)

# when load is active these checks can break because lsns are not static
# so lets check with some margin
assert_abs_margin_ratio(
lsn_from_hex(new_timeline_detail['local']['disk_consistent_lsn']),
lsn_from_hex(timeline_detail['local']['disk_consistent_lsn']),
0.03)

tenant_pg.stop()
check_timeline_attached(
new_pageserver_http,
tenant_id,
timeline_id_second,
timeline_detail_second,
current_lsn_second,
)

# rewrite neon cli config to use new pageserver for basebackup to start new compute
cli_config_lines = (env.repo_dir / 'config').read_text().splitlines()
@@ -251,33 +359,29 @@ def test_tenant_relocation(neon_env_builder: NeonEnvBuilder,
cli_config_lines[-1] = f"listen_pg_addr = 'localhost:{new_pageserver_pg_port}'"
(env.repo_dir / 'config').write_text('\n'.join(cli_config_lines))

tenant_pg_config_file_path = pathlib.Path(tenant_pg.config_file_path())
tenant_pg_config_file_path.open('a').write(
f"\nneon.pageserver_connstring = 'postgresql://no_user:@localhost:{new_pageserver_pg_port}'"
old_local_path_main = switch_pg_to_new_pageserver(
env,
pg_main,
new_pageserver_pg_port,
tenant_id,
timeline_id_main,
)

tenant_pg.start()

timeline_to_detach_local_path = env.repo_dir / 'tenants' / tenant.hex / 'timelines' / timeline.hex
files_before_detach = os.listdir(timeline_to_detach_local_path)
assert 'metadata' in files_before_detach, f'Regular timeline {timeline_to_detach_local_path} should have the metadata file,\
but got: {files_before_detach}'
assert len(files_before_detach) > 2, f'Regular timeline {timeline_to_detach_local_path} should have at least one layer file,\
but got {files_before_detach}'
old_local_path_second = switch_pg_to_new_pageserver(
env,
pg_second,
new_pageserver_pg_port,
tenant_id,
timeline_id_second,
)

# detach tenant from old pageserver before we check
# that all the data is there to be sure that old pageserver
# is no longer involved, and if it is, we will see the errors
pageserver_http.timeline_detach(tenant, timeline)
pageserver_http.tenant_detach(tenant_id)

with pg_cur(tenant_pg) as cur:
# check that data is still there
cur.execute("SELECT sum(key) FROM t")
assert cur.fetchone() == (500500, )
# check that we can write new data
cur.execute("INSERT INTO t SELECT generate_series(1001,2000), 'some payload'")
cur.execute("SELECT sum(key) FROM t")
assert cur.fetchone() == (2001000, )
post_migration_check(pg_main, 500500, old_local_path_main)
post_migration_check(pg_second, 1001000, old_local_path_second)

if with_load == 'with_load':
assert load_ok_event.wait(3)
@@ -286,8 +390,6 @@ def test_tenant_relocation(neon_env_builder: NeonEnvBuilder,
load_thread.join(timeout=10)
log.info('load thread stopped')

assert not os.path.exists(timeline_to_detach_local_path), f'After detach, local timeline dir {timeline_to_detach_local_path} should be removed'

# bring old pageserver back for clean shutdown via neon cli
# new pageserver will be shut down by the context manager
cli_config_lines = (env.repo_dir / 'config').read_text().splitlines()

@@ -795,16 +795,12 @@ class NeonPageserverHttpClient(requests.Session):
def check_status(self):
self.get(f"http://localhost:{self.port}/v1/status").raise_for_status()

def timeline_attach(self, tenant_id: uuid.UUID, timeline_id: uuid.UUID):
res = self.post(
f"http://localhost:{self.port}/v1/tenant/{tenant_id.hex}/timeline/{timeline_id.hex}/attach",
)
def tenant_attach(self, tenant_id: uuid.UUID):
res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id.hex}/attach")
self.verbose_error(res)

def timeline_detach(self, tenant_id: uuid.UUID, timeline_id: uuid.UUID):
res = self.post(
f"http://localhost:{self.port}/v1/tenant/{tenant_id.hex}/timeline/{timeline_id.hex}/detach",
)
def tenant_detach(self, tenant_id: uuid.UUID):
res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id.hex}/detach")
self.verbose_error(res)
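# Minimal usage sketch of the per-tenant attach/detach endpoints above,
# mirroring the relocation test (new_pageserver_http and pageserver_http are
# illustrative handles obtained via the pageservers' http_client(); attach on
# the new node first, wait for the download, then detach from the old one):
new_pageserver_http.tenant_attach(tenant_id)
wait_until(number_of_iterations=10,
           interval=1,
           func=lambda: assert_local(new_pageserver_http, tenant_id, timeline_id))
pageserver_http.tenant_detach(tenant_id)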

def timeline_create(