Compare commits

..

4 Commits

Author SHA1 Message Date
John Spray
7f3670a589 pageserver: don't log deletion S3 op failures as errors 2023-10-11 14:26:01 +01:00
Alex Chi Z
5158de70f3 proxy: breakdown wake up failure metrics (#4933)
## Problem

close https://github.com/neondatabase/neon/issues/4702

## Summary of changes

This PR adds a new metric for wake-up errors and breaks it down by the most
common reasons (mostly following the `could_retry` implementation).
2023-10-10 13:17:37 +01:00
khanova
aec9188d36 Added timeout for http requests (#5514)
# Problem
Proxy timeout for HTTP-requests

## Summary of changes
If an HTTP request takes longer than 15s, it is killed.

Resolves: https://github.com/neondatabase/neon/issues/4847
2023-10-10 13:39:38 +02:00
John Spray
acefee9a32 pageserver: flush deletion queue on detach (#5452)
## Problem

If a caller detaches a tenant and then attaches it again, pending
deletions from the old attachment might not have happened yet. This is
not a correctness problem, but it causes:
- Risk of leaking some objects in S3
- Some warnings from the deletion queue when pending LSN updates and
pending deletions don't pass validation.

## Summary of changes

- Deletion queue now uses UnboundedChannel so that the push interfaces
don't have to be async.
- This was pulled out of https://github.com/neondatabase/neon/pull/5397,
where it is also useful to be able to drive the queue from non-async
contexts.
- Why is it okay for this to be unbounded? The only way the
unbounded-ness of the channel can become a problem is if writing out
deletion lists can't keep up, but if the system were that overloaded
then the code generating deletions (GC, compaction) would also be
impacted.
- DeletionQueueClient gets a new `flush_advisory` function, which is
like flush_execute, but doesn't wait for completion: this is appropriate
for use in contexts where we would like to encourage the deletion queue
to flush, but don't need to block on it.
- This function is also expected to be useful in next steps for seamless
migration, where the option to flush to S3 while transitioning into
AttachedStale will also include flushing deletion queue, but we wouldn't
want to block on that flush.
- The tenant_detach code in mgr.rs invokes flush_advisory after stopping
the `Tenant` object.

---------

Co-authored-by: Arpad Müller <arpad-m@users.noreply.github.com>
2023-10-10 10:46:24 +01:00
13 changed files with 191 additions and 608 deletions

25
Cargo.lock generated
View File

@@ -570,9 +570,9 @@ dependencies = [
[[package]]
name = "aws-smithy-types"
version = "0.56.1"
version = "0.56.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d90dbc8da2f6be461fa3c1906b20af8f79d14968fe47f2b7d29d086f62a51728"
checksum = "eed0a94eefd845a2a78677f1b72f02fa75802d38f7f59be675add140279aa8bf"
dependencies = [
"base64-simd",
"itoa",
@@ -2420,16 +2420,6 @@ dependencies = [
"winapi",
]
[[package]]
name = "nu-ansi-term"
version = "0.46.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84"
dependencies = [
"overload",
"winapi",
]
[[package]]
name = "num-bigint"
version = "0.4.3"
@@ -2662,12 +2652,6 @@ version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4030760ffd992bef45b0ae3f10ce1aba99e33464c90d14dd7c039884963ddc7a"
[[package]]
name = "overload"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
[[package]]
name = "pagectl"
version = "0.1.0"
@@ -3764,11 +3748,9 @@ dependencies = [
"aws-config",
"aws-sdk-s3",
"aws-smithy-http",
"aws-smithy-types",
"aws-types",
"bincode",
"bytes",
"camino",
"chrono",
"clap",
"crc32c",
@@ -3776,11 +3758,9 @@ dependencies = [
"futures-util",
"hex",
"histogram",
"humantime",
"itertools",
"pageserver",
"rand",
"remote_storage",
"reqwest",
"serde",
"serde_json",
@@ -4962,7 +4942,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30a651bc37f915e81f087d86e62a18eec5f79550c7faff886f7090b4ea757c77"
dependencies = [
"matchers",
"nu-ansi-term",
"once_cell",
"regex",
"serde",

View File

@@ -153,7 +153,7 @@ impl FlushOp {
#[derive(Clone, Debug)]
pub struct DeletionQueueClient {
tx: tokio::sync::mpsc::Sender<ListWriterQueueMessage>,
tx: tokio::sync::mpsc::UnboundedSender<ListWriterQueueMessage>,
executor_tx: tokio::sync::mpsc::Sender<DeleterMessage>,
lsn_table: Arc<std::sync::RwLock<VisibleLsnUpdates>>,
@@ -416,7 +416,7 @@ pub enum DeletionQueueError {
impl DeletionQueueClient {
pub(crate) fn broken() -> Self {
// Channels whose receivers are immediately dropped.
let (tx, _rx) = tokio::sync::mpsc::channel(1);
let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
let (executor_tx, _executor_rx) = tokio::sync::mpsc::channel(1);
Self {
tx,
@@ -428,12 +428,12 @@ impl DeletionQueueClient {
/// This is cancel-safe. If you drop the future before it completes, the message
/// is not pushed, although in the context of the deletion queue it doesn't matter: once
/// we decide to do a deletion the decision is always final.
async fn do_push<T>(
fn do_push<T>(
&self,
queue: &tokio::sync::mpsc::Sender<T>,
queue: &tokio::sync::mpsc::UnboundedSender<T>,
msg: T,
) -> Result<(), DeletionQueueError> {
match queue.send(msg).await {
match queue.send(msg) {
Ok(_) => Ok(()),
Err(e) => {
// This shouldn't happen, we should shut down all tenants before
@@ -445,7 +445,7 @@ impl DeletionQueueClient {
}
}
pub(crate) async fn recover(
pub(crate) fn recover(
&self,
attached_tenants: HashMap<TenantId, Generation>,
) -> Result<(), DeletionQueueError> {
@@ -453,7 +453,6 @@ impl DeletionQueueClient {
&self.tx,
ListWriterQueueMessage::Recover(RecoverOp { attached_tenants }),
)
.await
}
/// When a Timeline wishes to update the remote_consistent_lsn that it exposes to the outside
@@ -526,6 +525,21 @@ impl DeletionQueueClient {
return self.flush_immediate().await;
}
self.push_layers_sync(tenant_id, timeline_id, current_generation, layers)
}
/// When a Tenant has a generation, push_layers is always synchronous because
/// the ListValidator channel is an unbounded channel.
///
/// This can be merged into push_layers when we remove the Generation-less mode
/// support (`<https://github.com/neondatabase/neon/issues/5395>`)
pub(crate) fn push_layers_sync(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
current_generation: Generation,
layers: Vec<(LayerFileName, Generation)>,
) -> Result<(), DeletionQueueError> {
metrics::DELETION_QUEUE
.keys_submitted
.inc_by(layers.len() as u64);
@@ -539,17 +553,16 @@ impl DeletionQueueClient {
objects: Vec::new(),
}),
)
.await
}
/// This is cancel-safe. If you drop the future the flush may still happen in the background.
async fn do_flush<T>(
&self,
queue: &tokio::sync::mpsc::Sender<T>,
queue: &tokio::sync::mpsc::UnboundedSender<T>,
msg: T,
rx: tokio::sync::oneshot::Receiver<()>,
) -> Result<(), DeletionQueueError> {
self.do_push(queue, msg).await?;
self.do_push(queue, msg)?;
if rx.await.is_err() {
// This shouldn't happen if tenants are shut down before deletion queue. If we
// encounter a bug like this, then a flusher will incorrectly believe it has flushed
@@ -570,6 +583,18 @@ impl DeletionQueueClient {
.await
}
/// Request a flush of the deletion queue and return immediately, without waiting for it.
///
/// Intended for advisory flushes, where the caller would rather not risk blocking behind a
/// large backlog of enqueued work. Tenant detach is one example: flushing there is nice to
/// have, but not required.
///
/// No guarantee is given that any work actually gets done.
pub fn flush_advisory(&self) {
    // Only the op itself is kept; the second half of the pair is discarded on the spot,
    // since nobody is going to wait for this flush.
    let (advisory_op, _) = FlushOp::new();
    // Best effort: a send error (such as a closed channel during shutdown) is deliberately ignored.
    let _ = self
        .tx
        .send(ListWriterQueueMessage::FlushExecute(advisory_op));
}
// Wait until all previous deletions are executed
pub(crate) async fn flush_execute(&self) -> Result<(), DeletionQueueError> {
debug!("flush_execute: flushing to deletion lists...");
@@ -586,9 +611,7 @@ impl DeletionQueueClient {
// Flush any immediate-mode deletions (the above backend flush will only flush
// the executor if deletions had flowed through the backend)
debug!("flush_execute: flushing execution...");
let (flush_op, rx) = FlushOp::new();
self.do_flush(&self.executor_tx, DeleterMessage::Flush(flush_op), rx)
.await?;
self.flush_immediate().await?;
debug!("flush_execute: finished flushing execution...");
Ok(())
}
@@ -643,8 +666,10 @@ impl DeletionQueue {
where
C: ControlPlaneGenerationsApi + Send + Sync,
{
// Deep channel: it consumes deletions from all timelines and we do not want to block them
let (tx, rx) = tokio::sync::mpsc::channel(16384);
// Unbounded channel: enables non-async functions to submit deletions. The actual length is
// constrained by how promptly the ListWriter wakes up and drains it, which should be frequent
// enough to avoid this taking a pathologically large amount of memory.
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
// Shallow channel: it carries DeletionLists which each contain up to thousands of deletions
let (backend_tx, backend_rx) = tokio::sync::mpsc::channel(16);
@@ -957,7 +982,7 @@ mod test {
// Basic test that the deletion queue processes the deletions we pass into it
let ctx = setup("deletion_queue_smoke").expect("Failed test setup");
let client = ctx.deletion_queue.new_client();
client.recover(HashMap::new()).await?;
client.recover(HashMap::new())?;
let layer_file_name_1: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap();
let tenant_id = ctx.harness.tenant_id;
@@ -1025,7 +1050,7 @@ mod test {
async fn deletion_queue_validation() -> anyhow::Result<()> {
let ctx = setup("deletion_queue_validation").expect("Failed test setup");
let client = ctx.deletion_queue.new_client();
client.recover(HashMap::new()).await?;
client.recover(HashMap::new())?;
// Generation that the control plane thinks is current
let latest_generation = Generation::new(0xdeadbeef);
@@ -1082,7 +1107,7 @@ mod test {
// Basic test that the deletion queue processes the deletions we pass into it
let mut ctx = setup("deletion_queue_recovery").expect("Failed test setup");
let client = ctx.deletion_queue.new_client();
client.recover(HashMap::new()).await?;
client.recover(HashMap::new())?;
let tenant_id = ctx.harness.tenant_id;
@@ -1145,9 +1170,7 @@ mod test {
drop(client);
ctx.restart().await;
let client = ctx.deletion_queue.new_client();
client
.recover(HashMap::from([(tenant_id, now_generation)]))
.await?;
client.recover(HashMap::from([(tenant_id, now_generation)]))?;
info!("Flush-executing");
client.flush_execute().await?;
@@ -1173,7 +1196,7 @@ pub(crate) mod mock {
};
pub struct ConsumerState {
rx: tokio::sync::mpsc::Receiver<ListWriterQueueMessage>,
rx: tokio::sync::mpsc::UnboundedReceiver<ListWriterQueueMessage>,
executor_rx: tokio::sync::mpsc::Receiver<DeleterMessage>,
}
@@ -1250,7 +1273,7 @@ pub(crate) mod mock {
}
pub struct MockDeletionQueue {
tx: tokio::sync::mpsc::Sender<ListWriterQueueMessage>,
tx: tokio::sync::mpsc::UnboundedSender<ListWriterQueueMessage>,
executor_tx: tokio::sync::mpsc::Sender<DeleterMessage>,
executed: Arc<AtomicUsize>,
remote_storage: Option<GenericRemoteStorage>,
@@ -1260,7 +1283,7 @@ pub(crate) mod mock {
impl MockDeletionQueue {
pub fn new(remote_storage: Option<GenericRemoteStorage>) -> Self {
let (tx, rx) = tokio::sync::mpsc::channel(16384);
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
let (executor_tx, executor_rx) = tokio::sync::mpsc::channel(16384);
let executed = Arc::new(AtomicUsize::new(0));

View File

@@ -12,7 +12,6 @@ use remote_storage::MAX_KEYS_PER_DELETE;
use std::time::Duration;
use tokio_util::sync::CancellationToken;
use tracing::info;
use tracing::warn;
use crate::metrics;
@@ -88,7 +87,10 @@ impl Deleter {
self.accumulator.clear();
}
Err(e) => {
warn!("DeleteObjects request failed: {e:#}, will retry");
// The RemoteStorage interface doesn't discriminate between
// real errors and 503/429 responses, so we log at INFO level
// to avoid propagating spurious error-severity logs.
info!("DeleteObjects request failed: {e:#}, will retry");
metrics::DELETION_QUEUE
.remote_errors
.with_label_values(&["execute"])

View File

@@ -85,7 +85,7 @@ pub(super) struct ListWriter {
conf: &'static PageServerConf,
// Incoming frontend requests to delete some keys
rx: tokio::sync::mpsc::Receiver<ListWriterQueueMessage>,
rx: tokio::sync::mpsc::UnboundedReceiver<ListWriterQueueMessage>,
// Outbound requests to the backend to execute deletion lists we have composed.
tx: tokio::sync::mpsc::Sender<ValidatorQueueMessage>,
@@ -111,7 +111,7 @@ impl ListWriter {
pub(super) fn new(
conf: &'static PageServerConf,
rx: tokio::sync::mpsc::Receiver<ListWriterQueueMessage>,
rx: tokio::sync::mpsc::UnboundedReceiver<ListWriterQueueMessage>,
tx: tokio::sync::mpsc::Sender<ValidatorQueueMessage>,
cancel: CancellationToken,
) -> Self {

View File

@@ -575,9 +575,14 @@ async fn tenant_detach_handler(
let state = get_state(&request);
let conf = state.conf;
mgr::detach_tenant(conf, tenant_id, detach_ignored.unwrap_or(false))
.instrument(info_span!("tenant_detach", %tenant_id))
.await?;
mgr::detach_tenant(
conf,
tenant_id,
detach_ignored.unwrap_or(false),
&state.deletion_queue_client,
)
.instrument(info_span!("tenant_detach", %tenant_id))
.await?;
json_response(StatusCode::OK, ())
}
@@ -1034,7 +1039,7 @@ async fn put_tenant_location_config_handler(
// The `Detached` state is special, it doesn't upsert a tenant, it removes
// its local disk content and drops it from memory.
if let LocationConfigMode::Detached = request_data.config.mode {
mgr::detach_tenant(conf, tenant_id, true)
mgr::detach_tenant(conf, tenant_id, true, &state.deletion_queue_client)
.instrument(info_span!("tenant_detach", %tenant_id))
.await?;
return json_response(StatusCode::OK, ());

View File

@@ -45,6 +45,7 @@ use std::sync::{Mutex, RwLock};
use std::time::{Duration, Instant};
use self::config::AttachedLocationConfig;
use self::config::AttachmentMode;
use self::config::LocationConf;
use self::config::TenantConf;
use self::delete::DeleteTenantFlow;
@@ -2076,6 +2077,15 @@ impl Tenant {
}
}
}
/// Returns a copy of the attachment mode stored in this tenant's location configuration.
///
/// Panics if the `tenant_conf` lock is poisoned.
pub(crate) fn get_attach_mode(&self) -> AttachmentMode {
    // The read guard is held only long enough to clone the mode out of the config.
    let conf_guard = self.tenant_conf.read().unwrap();
    conf_guard.location.attach_mode.clone()
}
}
/// Given a Vec of timelines and their ancestors (timeline_id, ancestor_id),

View File

@@ -24,7 +24,7 @@ use crate::control_plane_client::{
};
use crate::deletion_queue::DeletionQueueClient;
use crate::task_mgr::{self, TaskKind};
use crate::tenant::config::{LocationConf, LocationMode, TenantConfOpt};
use crate::tenant::config::{AttachmentMode, LocationConf, LocationMode, TenantConfOpt};
use crate::tenant::delete::DeleteTenantFlow;
use crate::tenant::{
create_tenant_files, AttachedTenantConf, CreateTenantFilesMode, Tenant, TenantState,
@@ -206,8 +206,7 @@ async fn init_load_generations(
if resources.remote_storage.is_some() {
resources
.deletion_queue_client
.recover(generations.clone())
.await?;
.recover(generations.clone())?;
}
Ok(Some(generations))
@@ -695,6 +694,18 @@ pub(crate) async fn upsert_location(
if let Some(tenant) = shutdown_tenant {
let (_guard, progress) = utils::completion::channel();
match tenant.get_attach_mode() {
AttachmentMode::Single | AttachmentMode::Multi => {
// Before we leave our state as the presumed holder of the latest generation,
// flush any outstanding deletions to reduce the risk of leaking objects.
deletion_queue_client.flush_advisory()
}
AttachmentMode::Stale => {
// If we're stale there's no point trying to flush deletions
}
};
info!("Shutting down attached tenant");
match tenant.shutdown(progress, false).await {
Ok(()) => {}
@@ -849,8 +860,16 @@ pub async fn detach_tenant(
conf: &'static PageServerConf,
tenant_id: TenantId,
detach_ignored: bool,
deletion_queue_client: &DeletionQueueClient,
) -> Result<(), TenantStateError> {
let tmp_path = detach_tenant0(conf, &TENANTS, tenant_id, detach_ignored).await?;
let tmp_path = detach_tenant0(
conf,
&TENANTS,
tenant_id,
detach_ignored,
deletion_queue_client,
)
.await?;
// Although we are cleaning up the tenant, this task is not meant to be bound by the lifetime of the tenant in memory.
// After a tenant is detached, there are no more task_mgr tasks for that tenant_id.
let task_tenant_id = None;
@@ -875,6 +894,7 @@ async fn detach_tenant0(
tenants: &tokio::sync::RwLock<TenantsMap>,
tenant_id: TenantId,
detach_ignored: bool,
deletion_queue_client: &DeletionQueueClient,
) -> Result<Utf8PathBuf, TenantStateError> {
let tenant_dir_rename_operation = |tenant_id_to_clean| async move {
let local_tenant_directory = conf.tenant_path(&tenant_id_to_clean);
@@ -886,6 +906,10 @@ async fn detach_tenant0(
let removal_result =
remove_tenant_from_memory(tenants, tenant_id, tenant_dir_rename_operation(tenant_id)).await;
// Flush pending deletions, so that they have a good chance of passing validation
// before this tenant is potentially re-attached elsewhere.
deletion_queue_client.flush_advisory();
// Ignored tenants are not present in memory and will bail the removal from memory operation.
// Before returning the error, check for ignored tenant removal case — we only need to clean its local files then.
if detach_ignored && matches!(removal_result, Err(TenantStateError::NotFound(_))) {

View File

@@ -47,6 +47,7 @@ enum Payload {
const MAX_RESPONSE_SIZE: usize = 10 * 1024 * 1024; // 10 MiB
const MAX_REQUEST_SIZE: u64 = 10 * 1024 * 1024; // 10 MiB
const HTTP_CONNECTION_TIMEOUT: tokio::time::Duration = tokio::time::Duration::from_secs(15);
static RAW_TEXT_OUTPUT: HeaderName = HeaderName::from_static("neon-raw-text-output");
static ARRAY_MODE: HeaderName = HeaderName::from_static("neon-array-mode");
@@ -189,27 +190,44 @@ pub async fn handle(
conn_pool: Arc<GlobalConnPool>,
session_id: uuid::Uuid,
) -> Result<Response<Body>, ApiError> {
let result = handle_inner(request, sni_hostname, conn_pool, session_id).await;
let result = tokio::time::timeout(
HTTP_CONNECTION_TIMEOUT,
handle_inner(request, sni_hostname, conn_pool, session_id),
)
.await;
let mut response = match result {
Ok(r) => r,
Err(e) => {
let message = format!("{:?}", e);
let code = match e.downcast_ref::<tokio_postgres::Error>() {
Some(e) => match e.code() {
Some(e) => serde_json::to_value(e.code()).unwrap(),
Ok(r) => match r {
Ok(r) => r,
Err(e) => {
let message = format!("{:?}", e);
let code = e.downcast_ref::<tokio_postgres::Error>().and_then(|e| {
e.code()
.map(|s| serde_json::to_value(s.code()).unwrap_or_default())
});
let code = match code {
Some(c) => c,
None => Value::Null,
},
None => Value::Null,
};
error!(
?code,
"sql-over-http per-client task finished with an error: {e:#}"
};
error!(
?code,
"sql-over-http per-client task finished with an error: {e:#}"
);
// TODO: this shouldn't always be bad request.
json_response(
StatusCode::BAD_REQUEST,
json!({ "message": message, "code": code }),
)?
}
},
Err(_) => {
let message = format!(
"HTTP-Connection timed out, execution time exeeded {} seconds",
HTTP_CONNECTION_TIMEOUT.as_secs()
);
// TODO: this shouldn't always be bad request.
error!(message);
json_response(
StatusCode::BAD_REQUEST,
json!({ "message": message, "code": code }),
StatusCode::GATEWAY_TIMEOUT,
json!({ "message": message, "code": StatusCode::GATEWAY_TIMEOUT.as_u16() }),
)?
}
};

View File

@@ -7,6 +7,7 @@ use crate::{
compute::{self, PostgresConnection},
config::{ProxyConfig, TlsConfig},
console::{self, errors::WakeComputeError, messages::MetricsAuxInfo, Api},
http::StatusCode,
metrics::{Ids, USAGE_METRICS},
protocol2::WithClientIp,
stream::{PqStream, Stream},
@@ -75,6 +76,15 @@ static NUM_CONNECTION_FAILURES: Lazy<IntCounterVec> = Lazy::new(|| {
.unwrap()
});
// Lazily registered counter family for compute wake-up failures.
//
// Each sample carries two labels, in this order:
// - "retry": "true" or "false", as passed by `report_error`
// - "kind": a short string classifying the failure, chosen by `report_error`
//
// NOTE(review): the exported metric name says "connection_failures_breakdown" while the help
// text talks about wake-up failures; the name is part of the exported interface, so it is
// left as is here.
static NUM_WAKEUP_FAILURES: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"proxy_connection_failures_breakdown",
"Number of wake-up failures (per kind).",
&["retry", "kind"],
)
// Registration failure is treated as fatal: the unwrap panics when the static is initialized.
.unwrap()
});
static NUM_BYTES_PROXIED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"proxy_io_bytes_per_client",
@@ -397,6 +407,46 @@ impl ConnectMechanism for TcpMechanism<'_> {
}
}
/// Renders a boolean as the static string `"true"` or `"false"`.
const fn bool_to_str(x: bool) -> &'static str {
    match x {
        true => "true",
        false => "false",
    }
}
/// Counts one wake-up failure in `NUM_WAKEUP_FAILURES`.
///
/// The sample is labelled with `retry` (rendered as `"true"`/`"false"`) and with a `kind`
/// string derived from the shape of `e`. For console API errors the kind depends on the
/// HTTP status and, for `LOCKED`, on whether the response text mentions a quota limit.
fn report_error(e: &WakeComputeError, retry: bool) {
    use crate::console::errors::ApiError;

    let kind = match e {
        WakeComputeError::BadComputeAddress(_) => "bad_compute_address",
        WakeComputeError::ApiError(api_error) => match api_error {
            ApiError::Transport(_) => "api_transport_error",
            // Arm order matters: the quota check must run before the generic LOCKED arm.
            ApiError::Console {
                status: StatusCode::LOCKED,
                text,
            } if text.contains("written data quota exceeded")
                || text.contains("the limit for current plan reached") =>
            {
                "quota_exceeded"
            }
            ApiError::Console {
                status: StatusCode::LOCKED,
                ..
            } => "api_console_locked",
            ApiError::Console {
                status: StatusCode::BAD_REQUEST,
                ..
            } => "api_console_bad_request",
            ApiError::Console { status, .. } if status.is_server_error() => {
                "api_console_other_server_error"
            }
            ApiError::Console { .. } => "api_console_other_error",
        },
    };

    NUM_WAKEUP_FAILURES
        .with_label_values(&[bool_to_str(retry), kind])
        .inc();
}
/// Try to connect to the compute node, retrying if necessary.
/// This function might update `node_info`, so we take it by `&mut`.
#[tracing::instrument(skip_all)]
@@ -440,10 +490,12 @@ where
match handle_try_wake(wake_res, num_retries) {
Err(e) => {
error!(error = ?e, num_retries, retriable = false, "couldn't wake compute node");
report_error(&e, false);
return Err(e.into());
}
// failed to wake up but we can continue to retry
Ok(ControlFlow::Continue(e)) => {
report_error(&e, true);
warn!(error = ?e, num_retries, retriable = true, "couldn't wake compute node");
}
// successfully woke up a compute node and can break the wakeup loop

View File

@@ -33,13 +33,9 @@ reqwest = { workspace = true, default-features = false, features = ["rustls-tls"
aws-config = { workspace = true, default-features = false, features = ["rustls", "credentials-sso"] }
pageserver = { path = "../pageserver" }
remote_storage = { path = "../libs/remote_storage" }
tracing.workspace = true
tracing-subscriber = { version = "0.3.17", features = ["ansi"] }
tracing-subscriber.workspace = true
clap.workspace = true
tracing-appender = "0.2"
histogram = "0.7"
humantime.workspace = true
camino.workspace = true
aws-smithy-types = "0.56.1"

View File

@@ -4,6 +4,7 @@ pub mod delete_batch_producer;
pub mod metadata_stream;
mod s3_deletion;
pub mod scan_metadata;
use std::env;
use std::fmt::Display;
use std::time::Duration;

View File

@@ -16,8 +16,6 @@ use tracing::{info, warn};
use clap::{Parser, Subcommand, ValueEnum};
mod restore_tenant_from_object_versioning;
#[derive(Parser)]
#[command(author, version, about, long_about = None)]
#[command(arg_required_else_help(true))]
@@ -61,9 +59,6 @@ enum Command {
skip_validation: bool,
},
ScanMetadata {},
RestoreTenantFromObjectVersioningMostRecentIndexPart(
restore_tenant_from_object_versioning::Command,
),
}
async fn tidy(
@@ -252,14 +247,5 @@ async fn main() -> anyhow::Result<()> {
}
}
},
Command::RestoreTenantFromObjectVersioningMostRecentIndexPart(arg) => {
match restore_tenant_from_object_versioning::doit(arg).await {
Err(e) => {
tracing::error!("Failed: {e:?}");
Err(e)
}
Ok(()) => Ok(()),
}
}
}
}

View File

@@ -1,513 +0,0 @@
//! Restore pageserver state from S3 object versioning.
//!
//! This sub-command allows restoring a tenant's pageserver S3 state from S3 object versioning.
//!
//! # Instructions
//!
//! - Run command
//! ```
//! SSO_ACCOUNT_ID=... REGION=... \
//! BUCKET=neon-{prod,staging}-storage-... \
//! cargo run -p s3_scrubber \
//! restore-tenant-from-object-versioning-most-recent-index-part \
//! TENANT_TO_RESTORE \
//! ./restore
//! timeline-list TIMELINE_TO_RESTORE TIMELINE_TO_RESTORE ...
//! ```
//! - `./restore` now contains the timeline state referenced by the latest `index_part.json`s of the
//! specified timelines in the `timeline-list` argument
//! - Use `cargo neon` to start a pageserver
//! - `rm -rf .neon`
//! - `cargo neon init`
//! - `sed -i 's/\(.*control_plane_api.*\)/#\1/' .neon/config`
//! - `sed -i 's/\(.*control_plane_api.*\)/#\1/' .neon/pageserver_1/pageserver.toml`
//! - configure the pageserver remote storage config to point to the restore directory.
//! Use your text editor to edit the TOML file: `.neon/pageserver_1/pageserver.toml`.
//! ```
//! [remote_storage]
//! local_path = "/path/to/restore/pageserver/v1"
//! ```
//! - `cargo neon start`
//! - make sure attaching the tenant works
//! - `curl -X POST localhost:9898/v1/tenant/TENANT_TO_RESTORE/attach`
//! - check `curl -X GET localhost:9898/v1/tenant/TENANT_TO_RESTORE | jq`
//! - for each timeline $timeline_id to restore:
//! - `cargo neon mappings map --branch-name restore-$timeline_id --tenant-id TENANT_TO_RESTORE --timeline-id $timeline_id`
//! - `cargo neon endpoint create --tenant-id TENANT_TO_RESTORE --branch-name restore-$timeline_id ep-restore-$timeline_id`
//! - `cargo neon endpoint start --tenant-id TENANT_TO_RESTORE ep-restore-$timeline_id`
//! - it prints a connection string, looking like `postgresql://cloud_admin@127.0.0.1:PORT/DB`
//! - dump database contents using postgres tools
//! - determine PG version `$restore_pg_version` using
//! ```
//! curl -s -X GET localhost:9898/v1/tenant/TENANT_TO_RESTORE/timeline/$timeline_id | jq .pg_version
//! ```
//! - pg_dumpall
//! ```
//! ./pg_install/$restore_pg_version/bin/pg_dumpall -d THE_CONNECTION_STRING/postgres > ./restore/pg_dumpall.out
//! ```
//! - pg_dump a specific database
//! ```
//! ./pg_install/v15/bin/pg_dump -d 'THE_CONNECTION_STRING/THEDBTODUMP' > ./restore/pg_dump_THEDBTODUMP.out
//! ```
//! - `cargo neon endpoint stop --tenant-id TENANT_TO_RESTORE restore-$timeline_id`
//!
//! - Use the pg_dump files to restore the database into a new Neon project.
//!
//! # Limitations & Future Work
//!
//! Just restoring Pageserver S3 state restores a consistent state at an LSN that is NOT THE LAST COMMIT LSN.
//! The reason is that Pageserver uploads layers to S3 with implementation-specific delays that are optimized for day-to-day operation.
//!
//! If we still had the Safekeeper WAL, we could restore the Safekeeper S3 state in a similar way.
//! In that case, we wouldn't need the `pg_dump` step.
//! We would simply attach the tenant to safekeepers and pageservers.
//! When attaching to Safekeeper, we would need to tell it that PS remote_consistent_lsn is the restore-point-LSN,
//! i.e., the LSNs in the restored index_part.json's in Pageserver S3 state.
//! Pageserver attach would pick up the restored state from S3, and the Safekeeper & Pageserver would
//! resume normal operation as if the clock had been wound back to restore-point-LSN.
use std::{
collections::{HashMap, HashSet},
num::{NonZeroU32, NonZeroUsize},
sync::Arc,
};
use anyhow::Context;
use aws_sdk_s3::operation::list_object_versions::ListObjectVersionsOutput;
use aws_types::region::Region;
use camino::Utf8PathBuf;
use pageserver::tenant::{
remote_timeline_client::index::LayerFileMetadata, TENANTS_SEGMENT_NAME,
TENANT_DELETED_MARKER_FILE_NAME, TIMELINES_SEGMENT_NAME,
};
use remote_storage::{GenericRemoteStorage, RemotePath, RemoteStorageConfig, RemoteStorageKind};
use s3_scrubber::{init_logging, init_s3_client, BucketConfig};
use tokio::io::AsyncReadExt;
use tracing::{debug, info, info_span, Instrument};
use utils::id::{TenantId, TimelineId};
// Selects which timelines of the tenant get restored.
//
// Plain `//` comments are used on purpose: this type derives `clap::Subcommand`, and `///`
// doc comments would be picked up by clap as CLI help text.
#[derive(Debug, Clone, clap::Subcommand)]
enum ResurrectTimelines {
// Restore exactly the timelines whose ids are listed on the command line.
TimelineList { timeline_ids: Vec<TimelineId> },
// AllTimelinesDeletedAfter { timestamp: humantime::Timestamp },
}
// Command-line arguments of the restore sub-command, consumed by `doit`.
//
// Plain `//` comments are used on purpose: this type derives `clap::Args`, and `///` doc
// comments would be picked up by clap as CLI help text.
#[derive(clap::Args)]
pub(crate) struct Command {
// Tenant whose S3 state is restored.
tenant_id: TenantId,
// Local directory that receives the restored state; `doit` bails out if it already exists.
dest_dir: Utf8PathBuf,
// Dry-run flag (`-d` / `--dry-run`). In the visible part of `doit` it only adds a `__dry`
// suffix to the log file name — TODO confirm what else it gates.
#[clap(short, long)]
dry_run: bool,
// Which timelines to restore.
#[clap(subcommand)]
timelines: ResurrectTimelines,
}
pub(crate) async fn doit(args: Command) -> anyhow::Result<()> {
let _logging_guard = {
let log_prefix = format!("restore_tenant_from_object_versioning_{}", args.tenant_id);
let dry_suffix = if args.dry_run { "__dry" } else { "" };
let file_name = {
format!(
"{}_{}{}.log",
log_prefix,
chrono::Utc::now().format("%Y_%m_%d__%H_%M_%S"),
dry_suffix,
)
};
init_logging(&file_name)
};
let restore_dst = if tokio::fs::try_exists(&args.dest_dir).await? {
anyhow::bail!("destination directory already exists: {}", args.dest_dir,);
} else {
GenericRemoteStorage::from_config(&RemoteStorageConfig {
max_concurrent_syncs: NonZeroUsize::new(100).unwrap(),
max_sync_errors: NonZeroU32::new(1).unwrap(), // ???? would want so specify 0
storage: RemoteStorageKind::LocalFs(args.dest_dir.clone()),
})
.context("instantiate restore destination")?
};
let bucket_config = BucketConfig::from_env()?;
let bucket_region = Region::new(bucket_config.region);
let delimiter = "/".to_string();
let s3_client = Arc::new(init_s3_client(bucket_config.sso_account_id, bucket_region));
let tenant_root = [
"pageserver",
"v1",
TENANTS_SEGMENT_NAME,
&args.tenant_id.to_string(),
]
.join(&delimiter);
let tenant_delete_marker = [&tenant_root, TENANT_DELETED_MARKER_FILE_NAME].join(&delimiter);
// - Ensure the prefix is empty when ignoring existence of versions.
// - Ensure the tenant delete marker key is part of `DeleteMarkers`. If it isn't, the tenant hasn't finished deletion yet and we should let pageservers complete it first.
// - Restore each index_part.json based on the version in DeleteMarkers as well as the layers it references. For the layers, also use the version in DeleteMarkers and ensure it is the latest.
// - Remove the deleted_at mark for the specified timelines.
//
// Notes:
// - The restore will happen in-place because it's hard to change tenant/timeline ids.
// - The restore could be interrupted mid-way.
// - Hence, separate plan-making and plan-execution.
async {
info!("send request");
let res = s3_client
.list_objects_v2()
.bucket(&bucket_config.bucket)
.prefix(&tenant_root)
.send()
.await?;
info!(response=?res, "got response");
if res.key_count() > 0 {
anyhow::bail!("tenant prefix is not empty in S3");
}
if res.is_truncated() {
unimplemented!("can this even happen")
}
Ok(())
}
.instrument(info_span!("ensure prefix empty"))
.await
.context("ensure prefix is empty")?;
async {
info!("send request");
let res = s3_client
.list_object_versions()
.bucket(&bucket_config.bucket)
.prefix(&tenant_delete_marker)
.send()
.await?;
debug!(response=?res, "got response");
if res.is_truncated() {
unimplemented!("can this even happen")
}
let markers = res
.delete_markers()
.context("expected delete marker in response")?;
if markers.len() != 1 {
anyhow::bail!("expected exactly one delete marker because we create and delete the marker exactly once, got {}", markers.len());
}
if !markers[0].is_latest() {
anyhow::bail!("expected delete marker to have IsLatest set: {:?}", markers[0]);
}
Ok(())
}
.instrument(info_span!(
"ensure tenant delete marker exists in DeleteMarkers",
tenant_delete_marker,
))
.await
.context("ensure tenant delete marker exists in DeleteMarkers")?;
let timelines = match args.timelines {
ResurrectTimelines::TimelineList { timeline_ids } => timeline_ids,
};
// Fetch all the information we need to execute the restore.
//
// For every requested timeline, page through ListObjectVersions under the
// timeline's prefix and keep each raw response page, keyed by timeline id.
let version_responses_by_timeline = async {
    let mut pages_by_timeline: HashMap<TimelineId, Vec<Arc<ListObjectVersionsOutput>>> =
        HashMap::new();
    for timeline_id in &timelines {
        async {
            let timeline_prefix = [
                tenant_root.as_str(),
                TIMELINES_SEGMENT_NAME,
                &timeline_id.to_string(),
            ]
            .join(&delimiter);
            // Continuation markers; both stay `None` for the first request.
            let mut key_marker: Option<String> = None;
            let mut version_id_marker: Option<String> = None;
            loop {
                info!("sending request");
                let page: ListObjectVersionsOutput = s3_client
                    .list_object_versions()
                    .bucket(&bucket_config.bucket)
                    .prefix(&timeline_prefix)
                    .set_key_marker(key_marker.take())
                    .set_version_id_marker(version_id_marker.take())
                    .send()
                    .await?;
                let page = Arc::new(page);
                pages_by_timeline
                    .entry(*timeline_id)
                    .or_default()
                    .push(Arc::clone(&page));
                info!("got response");

                // Log everything the page contains, for later inspection.
                if let Some(versions) = page.versions() {
                    for version in versions {
                        info!("version: {:?}", version);
                    }
                } else {
                    info!("no versions");
                }
                if let Some(markers) = page.delete_markers() {
                    for marker in markers {
                        info!("delete marker: {:?}", marker);
                    }
                } else {
                    info!("no delete markers");
                }

                if !page.is_truncated() {
                    break;
                }
                // More pages follow: carry over the continuation markers.
                key_marker = page.next_key_marker().map(str::to_owned);
                version_id_marker = page.next_version_id_marker().map(str::to_owned);
                if key_marker.is_none() && version_id_marker.is_none() {
                    anyhow::bail!("s3 returned is_truncated=true but neither next_key_marker nor next_version_id_marker are set");
                }
            }
            Ok(())
        }
        .instrument(info_span!("timeline", timeline_id=%timeline_id))
        .await?;
    }
    anyhow::Ok(pages_by_timeline)
}
.instrument(info_span!("list all object versions and delete markers"))
.await?;
/// Identifies one entry of a `ListObjectVersions` response: the fields are
/// copied from either a delete marker entry or an object version entry.
/// The `Debug` output of this struct is written to the logs.
#[derive(Debug)]
struct LatestVersion {
    /// Object key of the entry.
    key: String,
    /// `LastModified` timestamp reported for the entry.
    last_modified: aws_smithy_types::DateTime,
    /// S3 version id of the entry; used to fetch that exact version.
    version_id: String,
}
// Pick the object version to restore for `key` within timeline `tl`, using the
// previously fetched ListObjectVersions pages.
//
// Steps:
// 1. Require exactly one delete marker for `key` that has IsLatest set.
// 2. Among all object versions with the same key, return the one with the
//    greatest `last_modified`.
//
// Errors (instead of panicking) if the response entries lack a key, timestamp
// or version id, if step 1 does not find exactly one marker, if no version
// exists, or if two versions share the same `last_modified`.
//
// Panics if `tl` has no entry in `version_responses_by_timeline`.
let find_latest_version_based_on_delete_marker_last_modified =
    |tl: &TimelineId, key: &str| -> anyhow::Result<LatestVersion> {
        let responses = &version_responses_by_timeline[tl];

        let restore_version_delete_marker = {
            let mut candidates = Vec::new();
            for res in responses {
                let Some(markers) = res.delete_markers() else {
                    continue;
                };
                for marker in markers {
                    if !marker.is_latest() {
                        continue;
                    }
                    let marker_key = marker
                        .key()
                        .context("delete marker in ListObjectVersions response has no key")?;
                    if marker_key != key {
                        continue;
                    }
                    candidates.push(LatestVersion {
                        key: marker_key.to_owned(),
                        last_modified: marker
                            .last_modified()
                            .cloned()
                            .context("delete marker in ListObjectVersions response has no last_modified")?,
                        version_id: marker
                            .version_id()
                            .context("delete marker in ListObjectVersions response has no version_id")?
                            .to_owned(),
                    });
                }
            }
            info!(?candidates, "marker candidates");
            if candidates.len() != 1 {
                anyhow::bail!("expected exactly one IsLatest, got {}", candidates.len());
            }
            candidates
                .pop()
                .expect("exactly one candidate, length checked above")
        };
        info!(?restore_version_delete_marker, "found marker");

        // There's no way to get the latest version from the delete marker.
        // But, we observe (can't find written guarantee) that the Delete Marker's "Last Modified" is >= the latest version.
        // So, find latest version based on that.
        // NOTE(review): the marker's timestamp is only logged above; it is not
        // compared against the chosen version's timestamp.
        let restore_version = {
            let mut candidates = Vec::new();
            for res in responses {
                let Some(versions) = res.versions() else {
                    continue;
                };
                for version in versions {
                    let version_key = version
                        .key()
                        .context("object version in ListObjectVersions response has no key")?;
                    if version_key != restore_version_delete_marker.key {
                        continue;
                    }
                    candidates.push(LatestVersion {
                        key: version_key.to_owned(),
                        last_modified: version
                            .last_modified()
                            .cloned()
                            .context("object version in ListObjectVersions response has no last_modified")?,
                        version_id: version
                            .version_id()
                            .context("object version in ListObjectVersions response has no version_id")?
                            .to_owned(),
                    });
                }
            }
            // Ascending by timestamp, so the newest version ends up last.
            candidates.sort_by(|a, b| a.last_modified.cmp(&b.last_modified));
            info!(?candidates, "version candidates");
            if candidates.is_empty() {
                anyhow::bail!(
                    "expected at least one version matching the delete marker's key, got none"
                );
            }
            // Equal timestamps would make the choice of "latest" ambiguous.
            let mut seen = HashSet::new();
            if !candidates
                .iter()
                .all(|v| seen.insert(v.last_modified.clone()))
            {
                anyhow::bail!("last_modified timestamps are not unique, don't know which version to pick");
            }
            candidates
                .pop()
                .expect("candidates is non-empty, checked above") // sorted ascending, so pop() is the latest
        };
        Ok(restore_version)
    };
// For each timeline, resolve which object version of the index part file
// should be restored.
let latest_index_part_versions: HashMap<TimelineId, LatestVersion> = {
    let lookup_span = info_span!("find index part version");
    let _lookup_guard = lookup_span.enter();
    // The latest index part for a deleted tenant is always a DeletedMarker
    let mut versions_by_timeline = HashMap::new();
    for timeline_id in &timelines {
        let timeline_span = info_span!("timeline", timeline_id=%timeline_id);
        let _timeline_guard = timeline_span.enter();
        // TODO: support generation numbers
        let index_part_key = [
            &tenant_root,
            TIMELINES_SEGMENT_NAME,
            &timeline_id.to_string(),
            pageserver::tenant::IndexPart::FILE_NAME,
        ]
        .join(&delimiter);
        let index_part_version =
            find_latest_version_based_on_delete_marker_last_modified(timeline_id, &index_part_key)?;
        versions_by_timeline.insert(*timeline_id, index_part_version);
    }
    versions_by_timeline
};
// For each timeline: download the selected index part version, parse it, clear
// its `deleted_at` field, upload the modified document to `restore_dst` under
// the same key, and keep the parsed (modified) index part for the layer copy.
let index_part_contents: HashMap<TimelineId, pageserver::tenant::IndexPart> = async {
    let mut out = HashMap::new();
    for tl in &timelines {
        async {
            let v = &latest_index_part_versions[tl];
            let mut body_buf = Vec::new();
            info!("send request");
            let res = s3_client
                .get_object()
                .bucket(&bucket_config.bucket)
                .key(&v.key)
                .version_id(&v.version_id)
                .send()
                .await?;
            info!(?res, "got response header");
            res.body
                .into_async_read()
                .read_to_end(&mut body_buf)
                .await?;
            let body_buf = String::from_utf8(body_buf)?;
            info!(body_buf, "received response body");
            let mut index_part: pageserver::tenant::IndexPart =
                serde_json::from_str(&body_buf)?;
            info!(?index_part, "parsed index part");
            let deleted_at = index_part.deleted_at.take();
            info!(
                ?deleted_at,
                "removing deleted_at field from index part, previous value logged here"
            );
            let updated_buf = serde_json::to_vec(&index_part)?;
            let updated_buf_len = updated_buf.len();
            // Report an unusable key as an error instead of panicking.
            let dst_path = RemotePath::from_string(&v.key)
                .with_context(|| format!("convert index part key {:?} to RemotePath", v.key))?;
            info!("uploading modified index part to restore_dst");
            restore_dst
                .upload(
                    std::io::Cursor::new(updated_buf),
                    updated_buf_len,
                    &dst_path,
                    None,
                )
                .await
                .context("upload modified index part to restore_dst")?;
            out.insert(*tl, index_part);
            anyhow::Ok(())
        }
        .instrument(info_span!("timeline", timeline_id=%tl))
        .await?;
    }
    anyhow::Ok(out)
}
.instrument(info_span!("get index part contents"))
.await
.context("get index part contents")?;
// Copy every layer file listed in the restored index parts into `restore_dst`:
// resolve the object version to restore, stream its body from S3, and upload
// it under the same key. Conversion failures are reported as errors with
// context rather than panicking.
async {
    for (tl, index_part) in &index_part_contents {
        async {
            for (layer_file_name, layer_md) in &index_part.layer_metadata {
                let layer_md: LayerFileMetadata = layer_md.into();
                async {
                    // TODO: support generations
                    let layer_file_key = [
                        &tenant_root,
                        TIMELINES_SEGMENT_NAME,
                        &tl.to_string(),
                        &layer_file_name.file_name(),
                    ]
                    .join(&delimiter);
                    // The latest index parts naturally reference the latest layers.
                    // So, a deleted tenant's latest layers are the ones in DeleteMarkers.
                    //
                    // If we want to support restoring from not-latest index part, this will require more work.
                    // The idea is to
                    // 1. every index_part.json that we upload contains a strongly monotonically increasing sequence number
                    // 2. every image layer that we upload is S3-metadata-tagged with the sequence number of the IndexPart
                    //    in which it first appeared.
                    // This allows to recover the correct layer object version, even if we have a bug that overwrites layers.
                    let restore_version =
                        find_latest_version_based_on_delete_marker_last_modified(
                            tl,
                            &layer_file_key,
                        )?;
                    // TODO: teach RemoteStorage copy operation so we can use s3_client.copy_object()
                    async {
                        let res = s3_client
                            .get_object()
                            .bucket(&bucket_config.bucket)
                            .key(&restore_version.key)
                            .version_id(&restore_version.version_id)
                            .send()
                            .await
                            .context("get object header")?;
                        // The upload size comes from the index part's layer metadata.
                        let expected_size: usize = layer_md
                            .file_size()
                            .try_into()
                            .context("layer file size from index part does not fit into usize")?;
                        let dst_path = RemotePath::from_string(&restore_version.key)
                            .with_context(|| {
                                format!(
                                    "convert layer key {:?} to RemotePath",
                                    restore_version.key
                                )
                            })?;
                        // TODO: instead of file_size(), do actual data integrity checking.
                        restore_dst
                            .upload(
                                res.body.into_async_read(),
                                expected_size,
                                &dst_path,
                                None,
                            )
                            .await
                            .context("download-body-and-upload")?;
                        anyhow::Ok(())
                    }
                    .instrument(info_span!("copy", layer_file_name=%layer_file_name))
                    .await?;
                    anyhow::Ok(())
                }
                .instrument(info_span!("layer", layer_file_name=%layer_file_name))
                .await?;
            }
            anyhow::Ok(())
        }
        .instrument(info_span!("timeline", timeline_id=%tl))
        .await?;
    }
    anyhow::Ok(())
}
.instrument(info_span!("download layer files into restore_dst"))
.await
.context("download layers into restore dst")?;
Ok(())
}