Compare commits

..

2 Commits

Author SHA1 Message Date
Christian Schwarz
3b359c9e51 improve instructions 2023-10-12 10:31:52 +02:00
Christian Schwarz
cb19f95872 WIP: DR from S3 object versioning 2023-10-11 18:10:47 +02:00
13 changed files with 608 additions and 191 deletions

25
Cargo.lock generated
View File

@@ -570,9 +570,9 @@ dependencies = [
[[package]]
name = "aws-smithy-types"
version = "0.56.0"
version = "0.56.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eed0a94eefd845a2a78677f1b72f02fa75802d38f7f59be675add140279aa8bf"
checksum = "d90dbc8da2f6be461fa3c1906b20af8f79d14968fe47f2b7d29d086f62a51728"
dependencies = [
"base64-simd",
"itoa",
@@ -2420,6 +2420,16 @@ dependencies = [
"winapi",
]
[[package]]
name = "nu-ansi-term"
version = "0.46.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84"
dependencies = [
"overload",
"winapi",
]
[[package]]
name = "num-bigint"
version = "0.4.3"
@@ -2652,6 +2662,12 @@ version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4030760ffd992bef45b0ae3f10ce1aba99e33464c90d14dd7c039884963ddc7a"
[[package]]
name = "overload"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
[[package]]
name = "pagectl"
version = "0.1.0"
@@ -3748,9 +3764,11 @@ dependencies = [
"aws-config",
"aws-sdk-s3",
"aws-smithy-http",
"aws-smithy-types",
"aws-types",
"bincode",
"bytes",
"camino",
"chrono",
"clap",
"crc32c",
@@ -3758,9 +3776,11 @@ dependencies = [
"futures-util",
"hex",
"histogram",
"humantime",
"itertools",
"pageserver",
"rand",
"remote_storage",
"reqwest",
"serde",
"serde_json",
@@ -4942,6 +4962,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30a651bc37f915e81f087d86e62a18eec5f79550c7faff886f7090b4ea757c77"
dependencies = [
"matchers",
"nu-ansi-term",
"once_cell",
"regex",
"serde",

View File

@@ -153,7 +153,7 @@ impl FlushOp {
#[derive(Clone, Debug)]
pub struct DeletionQueueClient {
tx: tokio::sync::mpsc::UnboundedSender<ListWriterQueueMessage>,
tx: tokio::sync::mpsc::Sender<ListWriterQueueMessage>,
executor_tx: tokio::sync::mpsc::Sender<DeleterMessage>,
lsn_table: Arc<std::sync::RwLock<VisibleLsnUpdates>>,
@@ -416,7 +416,7 @@ pub enum DeletionQueueError {
impl DeletionQueueClient {
pub(crate) fn broken() -> Self {
// Channels whose receivers are immediately dropped.
let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
let (tx, _rx) = tokio::sync::mpsc::channel(1);
let (executor_tx, _executor_rx) = tokio::sync::mpsc::channel(1);
Self {
tx,
@@ -428,12 +428,12 @@ impl DeletionQueueClient {
/// This is cancel-safe. If you drop the future before it completes, the message
/// is not pushed, although in the context of the deletion queue it doesn't matter: once
/// we decide to do a deletion the decision is always final.
fn do_push<T>(
async fn do_push<T>(
&self,
queue: &tokio::sync::mpsc::UnboundedSender<T>,
queue: &tokio::sync::mpsc::Sender<T>,
msg: T,
) -> Result<(), DeletionQueueError> {
match queue.send(msg) {
match queue.send(msg).await {
Ok(_) => Ok(()),
Err(e) => {
// This shouldn't happen, we should shut down all tenants before
@@ -445,7 +445,7 @@ impl DeletionQueueClient {
}
}
pub(crate) fn recover(
pub(crate) async fn recover(
&self,
attached_tenants: HashMap<TenantId, Generation>,
) -> Result<(), DeletionQueueError> {
@@ -453,6 +453,7 @@ impl DeletionQueueClient {
&self.tx,
ListWriterQueueMessage::Recover(RecoverOp { attached_tenants }),
)
.await
}
/// When a Timeline wishes to update the remote_consistent_lsn that it exposes to the outside
@@ -525,21 +526,6 @@ impl DeletionQueueClient {
return self.flush_immediate().await;
}
self.push_layers_sync(tenant_id, timeline_id, current_generation, layers)
}
/// When a Tenant has a generation, push_layers is always synchronous because
/// the ListValidator channel is an unbounded channel.
///
/// This can be merged into push_layers when we remove the Generation-less mode
/// support (`<https://github.com/neondatabase/neon/issues/5395>`)
pub(crate) fn push_layers_sync(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
current_generation: Generation,
layers: Vec<(LayerFileName, Generation)>,
) -> Result<(), DeletionQueueError> {
metrics::DELETION_QUEUE
.keys_submitted
.inc_by(layers.len() as u64);
@@ -553,16 +539,17 @@ impl DeletionQueueClient {
objects: Vec::new(),
}),
)
.await
}
/// This is cancel-safe. If you drop the future the flush may still happen in the background.
async fn do_flush<T>(
&self,
queue: &tokio::sync::mpsc::UnboundedSender<T>,
queue: &tokio::sync::mpsc::Sender<T>,
msg: T,
rx: tokio::sync::oneshot::Receiver<()>,
) -> Result<(), DeletionQueueError> {
self.do_push(queue, msg)?;
self.do_push(queue, msg).await?;
if rx.await.is_err() {
// This shouldn't happen if tenants are shut down before deletion queue. If we
// encounter a bug like this, then a flusher will incorrectly believe it has flushed
@@ -583,18 +570,6 @@ impl DeletionQueueClient {
.await
}
/// Issue a flush without waiting for it to complete. This is useful on advisory flushes where
/// the caller wants to avoid the risk of waiting for lots of enqueued work, such as on tenant
/// detach where flushing is nice but not necessary.
///
/// This function provides no guarantees of work being done.
pub fn flush_advisory(&self) {
let (flush_op, _) = FlushOp::new();
// Transmit the flush message, ignoring any result (such as a closed channel during shutdown).
drop(self.tx.send(ListWriterQueueMessage::FlushExecute(flush_op)));
}
// Wait until all previous deletions are executed
pub(crate) async fn flush_execute(&self) -> Result<(), DeletionQueueError> {
debug!("flush_execute: flushing to deletion lists...");
@@ -611,7 +586,9 @@ impl DeletionQueueClient {
// Flush any immediate-mode deletions (the above backend flush will only flush
// the executor if deletions had flowed through the backend)
debug!("flush_execute: flushing execution...");
self.flush_immediate().await?;
let (flush_op, rx) = FlushOp::new();
self.do_flush(&self.executor_tx, DeleterMessage::Flush(flush_op), rx)
.await?;
debug!("flush_execute: finished flushing execution...");
Ok(())
}
@@ -666,10 +643,8 @@ impl DeletionQueue {
where
C: ControlPlaneGenerationsApi + Send + Sync,
{
// Unbounded channel: enables non-async functions to submit deletions. The actual length is
// constrained by how promptly the ListWriter wakes up and drains it, which should be frequent
// enough to avoid this taking pathologically large amount of memory.
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
// Deep channel: it consumes deletions from all timelines and we do not want to block them
let (tx, rx) = tokio::sync::mpsc::channel(16384);
// Shallow channel: it carries DeletionLists which each contain up to thousands of deletions
let (backend_tx, backend_rx) = tokio::sync::mpsc::channel(16);
@@ -982,7 +957,7 @@ mod test {
// Basic test that the deletion queue processes the deletions we pass into it
let ctx = setup("deletion_queue_smoke").expect("Failed test setup");
let client = ctx.deletion_queue.new_client();
client.recover(HashMap::new())?;
client.recover(HashMap::new()).await?;
let layer_file_name_1: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap();
let tenant_id = ctx.harness.tenant_id;
@@ -1050,7 +1025,7 @@ mod test {
async fn deletion_queue_validation() -> anyhow::Result<()> {
let ctx = setup("deletion_queue_validation").expect("Failed test setup");
let client = ctx.deletion_queue.new_client();
client.recover(HashMap::new())?;
client.recover(HashMap::new()).await?;
// Generation that the control plane thinks is current
let latest_generation = Generation::new(0xdeadbeef);
@@ -1107,7 +1082,7 @@ mod test {
// Basic test that the deletion queue processes the deletions we pass into it
let mut ctx = setup("deletion_queue_recovery").expect("Failed test setup");
let client = ctx.deletion_queue.new_client();
client.recover(HashMap::new())?;
client.recover(HashMap::new()).await?;
let tenant_id = ctx.harness.tenant_id;
@@ -1170,7 +1145,9 @@ mod test {
drop(client);
ctx.restart().await;
let client = ctx.deletion_queue.new_client();
client.recover(HashMap::from([(tenant_id, now_generation)]))?;
client
.recover(HashMap::from([(tenant_id, now_generation)]))
.await?;
info!("Flush-executing");
client.flush_execute().await?;
@@ -1196,7 +1173,7 @@ pub(crate) mod mock {
};
pub struct ConsumerState {
rx: tokio::sync::mpsc::UnboundedReceiver<ListWriterQueueMessage>,
rx: tokio::sync::mpsc::Receiver<ListWriterQueueMessage>,
executor_rx: tokio::sync::mpsc::Receiver<DeleterMessage>,
}
@@ -1273,7 +1250,7 @@ pub(crate) mod mock {
}
pub struct MockDeletionQueue {
tx: tokio::sync::mpsc::UnboundedSender<ListWriterQueueMessage>,
tx: tokio::sync::mpsc::Sender<ListWriterQueueMessage>,
executor_tx: tokio::sync::mpsc::Sender<DeleterMessage>,
executed: Arc<AtomicUsize>,
remote_storage: Option<GenericRemoteStorage>,
@@ -1283,7 +1260,7 @@ pub(crate) mod mock {
impl MockDeletionQueue {
pub fn new(remote_storage: Option<GenericRemoteStorage>) -> Self {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
let (tx, rx) = tokio::sync::mpsc::channel(16384);
let (executor_tx, executor_rx) = tokio::sync::mpsc::channel(16384);
let executed = Arc::new(AtomicUsize::new(0));

View File

@@ -12,6 +12,7 @@ use remote_storage::MAX_KEYS_PER_DELETE;
use std::time::Duration;
use tokio_util::sync::CancellationToken;
use tracing::info;
use tracing::warn;
use crate::metrics;
@@ -87,10 +88,7 @@ impl Deleter {
self.accumulator.clear();
}
Err(e) => {
// The RemoteStorage interface doesn't discriminate between
// real errors and 503/429 responses, so we log at INFO level
// to avoid propagating spurious error-severity logs.
info!("DeleteObjects request failed: {e:#}, will retry");
warn!("DeleteObjects request failed: {e:#}, will retry");
metrics::DELETION_QUEUE
.remote_errors
.with_label_values(&["execute"])

View File

@@ -85,7 +85,7 @@ pub(super) struct ListWriter {
conf: &'static PageServerConf,
// Incoming frontend requests to delete some keys
rx: tokio::sync::mpsc::UnboundedReceiver<ListWriterQueueMessage>,
rx: tokio::sync::mpsc::Receiver<ListWriterQueueMessage>,
// Outbound requests to the backend to execute deletion lists we have composed.
tx: tokio::sync::mpsc::Sender<ValidatorQueueMessage>,
@@ -111,7 +111,7 @@ impl ListWriter {
pub(super) fn new(
conf: &'static PageServerConf,
rx: tokio::sync::mpsc::UnboundedReceiver<ListWriterQueueMessage>,
rx: tokio::sync::mpsc::Receiver<ListWriterQueueMessage>,
tx: tokio::sync::mpsc::Sender<ValidatorQueueMessage>,
cancel: CancellationToken,
) -> Self {

View File

@@ -575,14 +575,9 @@ async fn tenant_detach_handler(
let state = get_state(&request);
let conf = state.conf;
mgr::detach_tenant(
conf,
tenant_id,
detach_ignored.unwrap_or(false),
&state.deletion_queue_client,
)
.instrument(info_span!("tenant_detach", %tenant_id))
.await?;
mgr::detach_tenant(conf, tenant_id, detach_ignored.unwrap_or(false))
.instrument(info_span!("tenant_detach", %tenant_id))
.await?;
json_response(StatusCode::OK, ())
}
@@ -1039,7 +1034,7 @@ async fn put_tenant_location_config_handler(
// The `Detached` state is special, it doesn't upsert a tenant, it removes
// its local disk content and drops it from memory.
if let LocationConfigMode::Detached = request_data.config.mode {
mgr::detach_tenant(conf, tenant_id, true, &state.deletion_queue_client)
mgr::detach_tenant(conf, tenant_id, true)
.instrument(info_span!("tenant_detach", %tenant_id))
.await?;
return json_response(StatusCode::OK, ());

View File

@@ -45,7 +45,6 @@ use std::sync::{Mutex, RwLock};
use std::time::{Duration, Instant};
use self::config::AttachedLocationConfig;
use self::config::AttachmentMode;
use self::config::LocationConf;
use self::config::TenantConf;
use self::delete::DeleteTenantFlow;
@@ -2077,15 +2076,6 @@ impl Tenant {
}
}
}
pub(crate) fn get_attach_mode(&self) -> AttachmentMode {
self.tenant_conf
.read()
.unwrap()
.location
.attach_mode
.clone()
}
}
/// Given a Vec of timelines and their ancestors (timeline_id, ancestor_id),

View File

@@ -24,7 +24,7 @@ use crate::control_plane_client::{
};
use crate::deletion_queue::DeletionQueueClient;
use crate::task_mgr::{self, TaskKind};
use crate::tenant::config::{AttachmentMode, LocationConf, LocationMode, TenantConfOpt};
use crate::tenant::config::{LocationConf, LocationMode, TenantConfOpt};
use crate::tenant::delete::DeleteTenantFlow;
use crate::tenant::{
create_tenant_files, AttachedTenantConf, CreateTenantFilesMode, Tenant, TenantState,
@@ -206,7 +206,8 @@ async fn init_load_generations(
if resources.remote_storage.is_some() {
resources
.deletion_queue_client
.recover(generations.clone())?;
.recover(generations.clone())
.await?;
}
Ok(Some(generations))
@@ -694,18 +695,6 @@ pub(crate) async fn upsert_location(
if let Some(tenant) = shutdown_tenant {
let (_guard, progress) = utils::completion::channel();
match tenant.get_attach_mode() {
AttachmentMode::Single | AttachmentMode::Multi => {
// Before we leave our state as the presumed holder of the latest generation,
// flush any outstanding deletions to reduce the risk of leaking objects.
deletion_queue_client.flush_advisory()
}
AttachmentMode::Stale => {
// If we're stale there's not point trying to flush deletions
}
};
info!("Shutting down attached tenant");
match tenant.shutdown(progress, false).await {
Ok(()) => {}
@@ -860,16 +849,8 @@ pub async fn detach_tenant(
conf: &'static PageServerConf,
tenant_id: TenantId,
detach_ignored: bool,
deletion_queue_client: &DeletionQueueClient,
) -> Result<(), TenantStateError> {
let tmp_path = detach_tenant0(
conf,
&TENANTS,
tenant_id,
detach_ignored,
deletion_queue_client,
)
.await?;
let tmp_path = detach_tenant0(conf, &TENANTS, tenant_id, detach_ignored).await?;
// Although we are cleaning up the tenant, this task is not meant to be bound by the lifetime of the tenant in memory.
// After a tenant is detached, there are no more task_mgr tasks for that tenant_id.
let task_tenant_id = None;
@@ -894,7 +875,6 @@ async fn detach_tenant0(
tenants: &tokio::sync::RwLock<TenantsMap>,
tenant_id: TenantId,
detach_ignored: bool,
deletion_queue_client: &DeletionQueueClient,
) -> Result<Utf8PathBuf, TenantStateError> {
let tenant_dir_rename_operation = |tenant_id_to_clean| async move {
let local_tenant_directory = conf.tenant_path(&tenant_id_to_clean);
@@ -906,10 +886,6 @@ async fn detach_tenant0(
let removal_result =
remove_tenant_from_memory(tenants, tenant_id, tenant_dir_rename_operation(tenant_id)).await;
// Flush pending deletions, so that they have a good chance of passing validation
// before this tenant is potentially re-attached elsewhere.
deletion_queue_client.flush_advisory();
// Ignored tenants are not present in memory and will bail the removal from memory operation.
// Before returning the error, check for ignored tenant removal case — we only need to clean its local files then.
if detach_ignored && matches!(removal_result, Err(TenantStateError::NotFound(_))) {

View File

@@ -47,7 +47,6 @@ enum Payload {
const MAX_RESPONSE_SIZE: usize = 10 * 1024 * 1024; // 10 MiB
const MAX_REQUEST_SIZE: u64 = 10 * 1024 * 1024; // 10 MiB
const HTTP_CONNECTION_TIMEOUT: tokio::time::Duration = tokio::time::Duration::from_secs(15);
static RAW_TEXT_OUTPUT: HeaderName = HeaderName::from_static("neon-raw-text-output");
static ARRAY_MODE: HeaderName = HeaderName::from_static("neon-array-mode");
@@ -190,44 +189,27 @@ pub async fn handle(
conn_pool: Arc<GlobalConnPool>,
session_id: uuid::Uuid,
) -> Result<Response<Body>, ApiError> {
let result = tokio::time::timeout(
HTTP_CONNECTION_TIMEOUT,
handle_inner(request, sni_hostname, conn_pool, session_id),
)
.await;
let result = handle_inner(request, sni_hostname, conn_pool, session_id).await;
let mut response = match result {
Ok(r) => match r {
Ok(r) => r,
Err(e) => {
let message = format!("{:?}", e);
let code = e.downcast_ref::<tokio_postgres::Error>().and_then(|e| {
e.code()
.map(|s| serde_json::to_value(s.code()).unwrap_or_default())
});
let code = match code {
Some(c) => c,
Ok(r) => r,
Err(e) => {
let message = format!("{:?}", e);
let code = match e.downcast_ref::<tokio_postgres::Error>() {
Some(e) => match e.code() {
Some(e) => serde_json::to_value(e.code()).unwrap(),
None => Value::Null,
};
error!(
?code,
"sql-over-http per-client task finished with an error: {e:#}"
);
// TODO: this shouldn't always be bad request.
json_response(
StatusCode::BAD_REQUEST,
json!({ "message": message, "code": code }),
)?
}
},
Err(_) => {
let message = format!(
"HTTP-Connection timed out, execution time exeeded {} seconds",
HTTP_CONNECTION_TIMEOUT.as_secs()
},
None => Value::Null,
};
error!(
?code,
"sql-over-http per-client task finished with an error: {e:#}"
);
error!(message);
// TODO: this shouldn't always be bad request.
json_response(
StatusCode::GATEWAY_TIMEOUT,
json!({ "message": message, "code": StatusCode::GATEWAY_TIMEOUT.as_u16() }),
StatusCode::BAD_REQUEST,
json!({ "message": message, "code": code }),
)?
}
};

View File

@@ -7,7 +7,6 @@ use crate::{
compute::{self, PostgresConnection},
config::{ProxyConfig, TlsConfig},
console::{self, errors::WakeComputeError, messages::MetricsAuxInfo, Api},
http::StatusCode,
metrics::{Ids, USAGE_METRICS},
protocol2::WithClientIp,
stream::{PqStream, Stream},
@@ -76,15 +75,6 @@ static NUM_CONNECTION_FAILURES: Lazy<IntCounterVec> = Lazy::new(|| {
.unwrap()
});
static NUM_WAKEUP_FAILURES: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"proxy_connection_failures_breakdown",
"Number of wake-up failures (per kind).",
&["retry", "kind"],
)
.unwrap()
});
static NUM_BYTES_PROXIED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"proxy_io_bytes_per_client",
@@ -407,46 +397,6 @@ impl ConnectMechanism for TcpMechanism<'_> {
}
}
const fn bool_to_str(x: bool) -> &'static str {
if x {
"true"
} else {
"false"
}
}
fn report_error(e: &WakeComputeError, retry: bool) {
use crate::console::errors::ApiError;
let retry = bool_to_str(retry);
let kind = match e {
WakeComputeError::BadComputeAddress(_) => "bad_compute_address",
WakeComputeError::ApiError(ApiError::Transport(_)) => "api_transport_error",
WakeComputeError::ApiError(ApiError::Console {
status: StatusCode::LOCKED,
ref text,
}) if text.contains("written data quota exceeded")
|| text.contains("the limit for current plan reached") =>
{
"quota_exceeded"
}
WakeComputeError::ApiError(ApiError::Console {
status: StatusCode::LOCKED,
..
}) => "api_console_locked",
WakeComputeError::ApiError(ApiError::Console {
status: StatusCode::BAD_REQUEST,
..
}) => "api_console_bad_request",
WakeComputeError::ApiError(ApiError::Console { status, .. })
if status.is_server_error() =>
{
"api_console_other_server_error"
}
WakeComputeError::ApiError(ApiError::Console { .. }) => "api_console_other_error",
};
NUM_WAKEUP_FAILURES.with_label_values(&[retry, kind]).inc();
}
/// Try to connect to the compute node, retrying if necessary.
/// This function might update `node_info`, so we take it by `&mut`.
#[tracing::instrument(skip_all)]
@@ -490,12 +440,10 @@ where
match handle_try_wake(wake_res, num_retries) {
Err(e) => {
error!(error = ?e, num_retries, retriable = false, "couldn't wake compute node");
report_error(&e, false);
return Err(e.into());
}
// failed to wake up but we can continue to retry
Ok(ControlFlow::Continue(e)) => {
report_error(&e, true);
warn!(error = ?e, num_retries, retriable = true, "couldn't wake compute node");
}
// successfully woke up a compute node and can break the wakeup loop

View File

@@ -33,9 +33,13 @@ reqwest = { workspace = true, default-features = false, features = ["rustls-tls"
aws-config = { workspace = true, default-features = false, features = ["rustls", "credentials-sso"] }
pageserver = { path = "../pageserver" }
remote_storage = { path = "../libs/remote_storage" }
tracing.workspace = true
tracing-subscriber.workspace = true
tracing-subscriber = { version = "0.3.17", features = ["ansi"] }
clap.workspace = true
tracing-appender = "0.2"
histogram = "0.7"
humantime.workspace = true
camino.workspace = true
aws-smithy-types = "0.56.1"

View File

@@ -4,7 +4,6 @@ pub mod delete_batch_producer;
pub mod metadata_stream;
mod s3_deletion;
pub mod scan_metadata;
use std::env;
use std::fmt::Display;
use std::time::Duration;

View File

@@ -16,6 +16,8 @@ use tracing::{info, warn};
use clap::{Parser, Subcommand, ValueEnum};
mod restore_tenant_from_object_versioning;
#[derive(Parser)]
#[command(author, version, about, long_about = None)]
#[command(arg_required_else_help(true))]
@@ -59,6 +61,9 @@ enum Command {
skip_validation: bool,
},
ScanMetadata {},
RestoreTenantFromObjectVersioningMostRecentIndexPart(
restore_tenant_from_object_versioning::Command,
),
}
async fn tidy(
@@ -247,5 +252,14 @@ async fn main() -> anyhow::Result<()> {
}
}
},
Command::RestoreTenantFromObjectVersioningMostRecentIndexPart(arg) => {
match restore_tenant_from_object_versioning::doit(arg).await {
Err(e) => {
tracing::error!("Failed: {e:?}");
Err(e)
}
Ok(()) => Ok(()),
}
}
}
}

View File

@@ -0,0 +1,513 @@
//! Restore pageserver state from S3 object versioning.
//!
//! This sub-cmmand allows restoring a tenant's pageserver S3 state from S3 object versioning.
//!
//! # Instructions
//!
//! - Run command
//! ```
//! SSO_ACCOUNT_ID=... REGION=... \
//! BUCKET=neon-{prod,staging}-storage-... \
//! cargo run -p s3_scrubber \
//! restore-tenant-from-object-versioning-most-recent-index-part \
//! TENANT_TO_RESTORE \
//! ./restore
//! timeline-list TIMELINE_TO_RESTORE TIMELINE_TO_RESTORE ...
//! ```
//! - `./restore` now contains the timeline state referenced by the latest `index_part.json`s of the
//! specified timelines in the `timeline-list`` argument
//! - Use `cargo neon` to start a pageserver
//! - `rm -rf .neon`
//! - `cargo neon init`
//! - `sed -i 's/\(.*control_plane_api.*\)/#\1/' .neon/config`
//! - `sed -i 's/\(.*control_plane_api.*\)/#\1/' .neon/pageserver_1/pageserver.toml`
//! - configure the pageserver remote storage config to point to the restore directory.
//! Use your text editor to edit the TOML file: `.neon/pageserver_1/pageserver.toml`.
//! ```
//! [remote_storage]
//! local_path = "/path/to/restore/pageserver/v1"
//! ````
//! - `cargo neon start`
//! - make sure attaching the tenant works
//! - `curl -X POST localhost:9898/v1/tenant/TENANT_TO_RESTORE/attach`
//! - check `curl -X GET localhost:9898/v1/tenant/TENANT_TO_RESTORE | jq`
//! - for each timeline $timeline_id to restore:
//! - `cargo neon mappings map --branch-name restore-$timeline_id --tenant-id TENANT_TO_RESTORE --timeline-id $timeline_id`
//! - `cargo neon endpoint create --tenant-id TENANT_TO_RESTORE --branch-name restore-$timeline_id ep-restore-$timeline_id`
//! - `cargo neon endpoint start --tenant-id TENANT_TO_RESTORE ep-restore-$timeline_id`
//! - it prints a connection string, looking like `postgresql://cloud_admin@127.0.0.1:PORT/DB`
//! - dump database contents using postgres tools
//! - determine PG version `$restore_pg_version` using
//! ```
//! curl -s -X GET localhost:9898/v1/tenant/TENANT_TO_RESTORE/timeline/$timeline_id | jq .pg_version
//! ```
//! - pg_dumpall
//! ```
//! ./pg_install/$restore_pg_version/bin/pg_dumpall -d THE_CONNECTION_STRING/postgres > ./restore/pg_dumpall.out
//! ```
//! - pg_dump a specific database
//! ```
//! ./pg_install/v15/bin/pg_dump -d 'THE_CONNECTION_STRING/THEDBTODUMP' > ./restore/pg_dump_THEDBTODUMP.out
//! ```
//! - `cargo neon endpoint stop --tenant-id TENANT_TO_RESTORE restore-$timeline_id`
//!
//! - Use the pg_dump files to restore the database into a new Neon project.
//!
//! # Limitations & Future Work
//!
//! Just restoring Pageserver S3 state restores a consistent state at an LSN that is NOT THE LAST COMMIT LSN.
//! The reason is that Pageserver uploads layers to S3 with implementation-specific delays that are optimized for day-to-day operation.
//!
//! If we still had the Safekeeper WAL, we could restore the Safekeeper S3 state in a similar way.
//! In that case, we wouldn't need the `pg_dump` step.
//! We would simply attach the tenant to safekeepers and pageservers.
//! When attaching to Safekeeper, we would need to tell it that PS remote_consistent_lsn is the restore-point-LSN,
//! i.e., the LSNs in the restored index_part.json's in Pageserver S3 state.
//! Pageserver attach would pick up the restored state from S3, and the Safekeeper & Pageserver would
//! resume normal operation as if the clock had been wound back to restore-point-LSN.
use std::{
collections::{HashMap, HashSet},
num::{NonZeroU32, NonZeroUsize},
sync::Arc,
};
use anyhow::Context;
use aws_sdk_s3::operation::list_object_versions::ListObjectVersionsOutput;
use aws_types::region::Region;
use camino::Utf8PathBuf;
use pageserver::tenant::{
remote_timeline_client::index::LayerFileMetadata, TENANTS_SEGMENT_NAME,
TENANT_DELETED_MARKER_FILE_NAME, TIMELINES_SEGMENT_NAME,
};
use remote_storage::{GenericRemoteStorage, RemotePath, RemoteStorageConfig, RemoteStorageKind};
use s3_scrubber::{init_logging, init_s3_client, BucketConfig};
use tokio::io::AsyncReadExt;
use tracing::{debug, info, info_span, Instrument};
use utils::id::{TenantId, TimelineId};
#[derive(Debug, Clone, clap::Subcommand)]
enum ResurrectTimelines {
TimelineList { timeline_ids: Vec<TimelineId> },
// AllTimelinesDeletedAfter { timestamp: humantime::Timestamp },
}
#[derive(clap::Args)]
pub(crate) struct Command {
tenant_id: TenantId,
dest_dir: Utf8PathBuf,
#[clap(short, long)]
dry_run: bool,
#[clap(subcommand)]
timelines: ResurrectTimelines,
}
pub(crate) async fn doit(args: Command) -> anyhow::Result<()> {
let _logging_guard = {
let log_prefix = format!("restore_tenant_from_object_versioning_{}", args.tenant_id);
let dry_suffix = if args.dry_run { "__dry" } else { "" };
let file_name = {
format!(
"{}_{}{}.log",
log_prefix,
chrono::Utc::now().format("%Y_%m_%d__%H_%M_%S"),
dry_suffix,
)
};
init_logging(&file_name)
};
let restore_dst = if tokio::fs::try_exists(&args.dest_dir).await? {
anyhow::bail!("destination directory already exists: {}", args.dest_dir,);
} else {
GenericRemoteStorage::from_config(&RemoteStorageConfig {
max_concurrent_syncs: NonZeroUsize::new(100).unwrap(),
max_sync_errors: NonZeroU32::new(1).unwrap(), // ???? would want so specify 0
storage: RemoteStorageKind::LocalFs(args.dest_dir.clone()),
})
.context("instantiate restore destination")?
};
let bucket_config = BucketConfig::from_env()?;
let bucket_region = Region::new(bucket_config.region);
let delimiter = "/".to_string();
let s3_client = Arc::new(init_s3_client(bucket_config.sso_account_id, bucket_region));
let tenant_root = [
"pageserver",
"v1",
TENANTS_SEGMENT_NAME,
&args.tenant_id.to_string(),
]
.join(&delimiter);
let tenant_delete_marker = [&tenant_root, TENANT_DELETED_MARKER_FILE_NAME].join(&delimiter);
// - Ensure the prefix is empty when ignoring existence of versions.
// - Ensure the tenant delete marker key is part of `DeleteMarkers`. If it isn't, the tenant hasn't finished deletion yet and we should let pageservers complete it first.
// - Restore each index_part.json based on the version in DeleteMarkers as well as the layers it references. For the layers, also use the version in DeleteMarkers and ensure it is the latest.
// - Remove the deleted_at mark for the specified timelines.
//
// Notes:
// - The restore will happen in-place because it's hard to change tenant/timeline ids.
// - The restore could be interrupted mid-way.
// - Hence, separate plan-making and plan-execution.
async {
info!("send request");
let res = s3_client
.list_objects_v2()
.bucket(&bucket_config.bucket)
.prefix(&tenant_root)
.send()
.await?;
info!(response=?res, "got response");
if res.key_count() > 0 {
anyhow::bail!("tenant prefix is not empty in S3");
}
if res.is_truncated() {
unimplemented!("can this even happen")
}
Ok(())
}
.instrument(info_span!("ensure prefix empty"))
.await
.context("ensure prefix is empty")?;
async {
info!("send request");
let res = s3_client
.list_object_versions()
.bucket(&bucket_config.bucket)
.prefix(&tenant_delete_marker)
.send()
.await?;
debug!(response=?res, "got response");
if res.is_truncated() {
unimplemented!("can this even happen")
}
let markers = res
.delete_markers()
.context("expected delete marker in response")?;
if markers.len() != 1 {
anyhow::bail!("expected exactly one delete marker because we create and delete the marker exactly once, got {}", markers.len());
}
if !markers[0].is_latest() {
anyhow::bail!("expected delete marker to have IsLatest set: {:?}", markers[0]);
}
Ok(())
}
.instrument(info_span!(
"ensure tenant delete marker exists in DeleteMarkers",
tenant_delete_marker,
))
.await
.context("ensure tenant delete marker exists in DeleteMarkers")?;
let timelines = match args.timelines {
ResurrectTimelines::TimelineList { timeline_ids } => timeline_ids,
};
// Fetch all the information we need to execute the restore.
let version_responses_by_timeline = async {
let mut out: HashMap<TimelineId, Vec<Arc<ListObjectVersionsOutput>>> = Default::default();
for tl in &timelines {
async {
let timeline_prefix = [tenant_root.as_str(), TIMELINES_SEGMENT_NAME , &tl.to_string()].join(&delimiter);
let mut next_key_marker = None;
let mut next_version_id_marker = None;
loop {
info!("sending request");
let res: ListObjectVersionsOutput = s3_client.list_object_versions()
.bucket(&bucket_config.bucket)
.prefix(&timeline_prefix)
.set_key_marker(next_key_marker.take())
.set_version_id_marker(next_version_id_marker.take())
.send()
.await?;
let res = Arc::new(res);
out.entry(*tl).or_default().push(Arc::clone(&res));
info!("got response");
match res.versions() {
Some(versions) => {
for version in versions {
info!("version: {:?}", version);
}
}
None => {
info!("no versions");
}
}
match res.delete_markers() {
Some(markers) => {
for marker in markers {
info!("delete marker: {:?}", marker);
}
}
None => {
info!("no delete markers");
}
}
if !res.is_truncated() {
break;
}
next_key_marker = res.next_key_marker().map(|s| s.to_string());
next_version_id_marker = res.next_version_id_marker().map(|s| s.to_string());
if let (None, None) = (&next_key_marker, &next_version_id_marker) {
anyhow::bail!("s3 returned is_truncated=true but neither next_key_marker nor next_version_id_marker are set");
}
}
Ok(())
}.instrument(info_span!("timeline", timeline_id=%tl)).await?;
}
anyhow::Ok(out)
}.instrument(info_span!("list all object versions and delete markers")).await?;
#[derive(Debug)]
struct LatestVersion {
key: String,
last_modified: aws_smithy_types::DateTime,
version_id: String,
}
let find_latest_version_based_on_delete_marker_last_modified = |tl: &TimelineId, key: &str| {
let restore_version_delete_marker = {
let mut candidates = Vec::new();
for res in &version_responses_by_timeline[tl] {
let Some(markers) = res.delete_markers() else {
continue;
};
for marker in markers {
if !marker.is_latest() {
continue;
}
if marker.key().unwrap() != key {
continue;
}
candidates.push(LatestVersion {
key: marker.key().unwrap().to_owned(),
last_modified: marker.last_modified().unwrap().clone(),
version_id: marker.version_id().unwrap().to_owned(),
});
}
}
info!(?candidates, "marker candidates");
if candidates.len() != 1 {
anyhow::bail!("expected exactly one IsLatest, got {}", candidates.len());
}
candidates.pop().unwrap()
};
info!(?restore_version_delete_marker, "found marker");
// There's no way to get the latest version from the delete marker.
// But, we observe (can't find written guarantee) that the Delete Marker's "Last Modified" is >= the latest version.
// So, find latest version based on that.
let restore_version = {
let mut candidates = Vec::new();
for res in &version_responses_by_timeline[tl] {
let Some(versions) = res.versions() else {
continue;
};
for version in versions {
if version.key().unwrap() != restore_version_delete_marker.key {
continue;
}
candidates.push(LatestVersion {
key: version.key().unwrap().to_owned(),
last_modified: version.last_modified().unwrap().clone(),
version_id: version.version_id().unwrap().to_owned(),
});
}
}
candidates.sort_by_key(|v| v.last_modified.clone());
info!(?candidates, "version candidates");
if candidates.is_empty() {
anyhow::bail!(
"expected at least one version matching the delete marker's key, got none"
);
}
{
let mut uniq = HashSet::new();
for v in &candidates {
if !uniq.insert(v.last_modified.clone()) {
anyhow::bail!("last_modified timestamps are not unique, don't know which version to pick");
}
}
}
candidates.pop().unwrap() // we sorted ascending, so, pop() is the latest
};
anyhow::Ok(restore_version)
};
let latest_index_part_versions: HashMap<TimelineId, LatestVersion> = {
let span = info_span!("find index part version");
let _enter = span.enter();
// The latest index part for a deleted tenant is always a DeletedMarker
let mut out = HashMap::new();
for tl in &timelines {
let span = info_span!("timeline", timeline_id=%tl);
let _enter = span.enter();
let restore_version = find_latest_version_based_on_delete_marker_last_modified(
tl,
// TODO: support generation numbers
&[
&tenant_root,
TIMELINES_SEGMENT_NAME,
&tl.to_string(),
pageserver::tenant::IndexPart::FILE_NAME,
]
.join(&delimiter),
)?;
out.insert(*tl, restore_version);
}
out
};
let index_part_contents: HashMap<TimelineId, pageserver::tenant::IndexPart> = async {
let mut out = HashMap::new();
for tl in &timelines {
async {
let v = &latest_index_part_versions[tl];
let mut body_buf = Vec::new();
info!("send request");
let res = s3_client
.get_object()
.bucket(&bucket_config.bucket)
.key(&v.key)
.version_id(&v.version_id)
.send()
.await?;
info!(?res, "got response header");
res.body
.into_async_read()
.read_to_end(&mut body_buf)
.await?;
let body_buf = String::from_utf8(body_buf)?;
info!(body_buf, "received response body");
let mut index_part: pageserver::tenant::IndexPart =
serde_json::from_str(&body_buf)?;
info!(?index_part, "parsed index part");
let deleted_at = index_part.deleted_at.take();
info!(
?deleted_at,
"removing deleted_at field from index part, previous value logged here"
);
let updated_buf = serde_json::to_vec(&index_part)?;
let updated_buf_len = updated_buf.len();
info!("uploading modified index part to restore_dst");
restore_dst
.upload(
std::io::Cursor::new(updated_buf),
updated_buf_len,
&RemotePath::from_string(&v.key).unwrap(),
None,
)
.await
.context("upload modified index part to restore_dst")?;
out.insert(*tl, index_part);
anyhow::Ok(())
}
.instrument(info_span!("timeline", timeline_id=%tl))
.await?;
}
anyhow::Ok(out)
}
.instrument(info_span!("get index part contents"))
.await
.context("get index part contents")?;
async {
for (tl, index_part) in &index_part_contents {
async {
for (layer_file_name, layer_md) in &index_part.layer_metadata {
let layer_md: LayerFileMetadata = layer_md.into();
async {
// TODO: support generations
let layer_file_key = [
&tenant_root,
TIMELINES_SEGMENT_NAME,
&tl.to_string(),
&layer_file_name.file_name(),
]
.join(&delimiter);
// The latest index parts naturally reference the latest layers.
// So, a deleted tenant's latest layers are the ones in DeleteMarkers.
//
// If we want to support restoring from not-latest index part, this will require more work.
// The idea is to
// 1. every index_part.json that we upload contains a strongly monotonically increasing sequence number
// 2. every image layer that we upload is S3-metadata-tagged with the sequence number of the IndexPart
// in which it first appeared.
// This allows to recover the correct layer object version, even if we have a bug that overwrites layers.
let restore_version =
find_latest_version_based_on_delete_marker_last_modified(
tl,
&layer_file_key,
)?;
// TODO: teach RemoteStorage copy operation so we can use s3_client.copy_object()
async {
let res = s3_client
.get_object()
.bucket(&bucket_config.bucket)
.key(&restore_version.key)
.version_id(&restore_version.version_id)
.send()
.await
.context("get object header")?;
// TODO: instead of file_size(), do actual data integrity checking.
restore_dst
.upload(
res.body.into_async_read(),
layer_md.file_size().try_into().unwrap(),
&RemotePath::from_string(&restore_version.key).unwrap(),
None,
)
.await
.context("download-body-and-upload")?;
anyhow::Ok(())
}
.instrument(info_span!("copy", layer_file_name=%layer_file_name))
.await?;
anyhow::Ok(())
}
.instrument(info_span!("layer", layer_file_name=%layer_file_name))
.await?;
}
anyhow::Ok(())
}
.instrument(info_span!("timeline", timeline_id=%tl))
.await?;
}
anyhow::Ok(())
}
.instrument(info_span!("download layer files into restore_dst"))
.await
.context("download layers into restore dst")?;
Ok(())
}