Compare commits

..

3 Commits

Author SHA1 Message Date
Alex Chi Z
2025f8763f fix scrubber handle deleted timelines
Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-07-30 16:36:12 -04:00
Alex Chi Z
e635891dbb maybe fix test cases?
Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-07-30 15:39:03 -04:00
Alex Chi Z
59bb30669c fix(pageserver): do not delete index_part.json during timeline deletion
Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-07-30 15:39:03 -04:00
27 changed files with 160 additions and 1098 deletions

View File

@@ -32,12 +32,8 @@ use std::sync::{Arc, Condvar, Mutex, RwLock};
use std::time::{Duration, Instant};
use std::{env, fs};
use tokio::{spawn, sync::watch, task::JoinHandle, time};
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, debug, error, info, instrument, warn};
use url::Url;
use utils::backoff::{
DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS, exponential_backoff_duration,
};
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
use utils::measured_stream::MeasuredReader;
@@ -196,7 +192,6 @@ pub struct ComputeState {
pub startup_span: Option<tracing::span::Span>,
pub lfc_prewarm_state: LfcPrewarmState,
pub lfc_prewarm_token: CancellationToken,
pub lfc_offload_state: LfcOffloadState,
/// WAL flush LSN that is set after terminating Postgres and syncing safekeepers if
@@ -222,7 +217,6 @@ impl ComputeState {
lfc_offload_state: LfcOffloadState::default(),
terminate_flush_lsn: None,
promote_state: None,
lfc_prewarm_token: CancellationToken::new(),
}
}
@@ -1560,41 +1554,6 @@ impl ComputeNode {
Ok(lsn)
}
fn sync_safekeepers_with_retries(&self, storage_auth_token: Option<String>) -> Result<Lsn> {
let max_retries = 5;
let mut attempts = 0;
loop {
let result = self.sync_safekeepers(storage_auth_token.clone());
match &result {
Ok(_) => {
if attempts > 0 {
tracing::info!("sync_safekeepers succeeded after {attempts} retries");
}
return result;
}
Err(e) if attempts < max_retries => {
tracing::info!(
"sync_safekeepers failed, will retry (attempt {attempts}): {e:#}"
);
}
Err(err) => {
tracing::warn!(
"sync_safekeepers still failed after {attempts} retries, giving up: {err:?}"
);
return result;
}
}
// sleep and retry
let backoff = exponential_backoff_duration(
attempts,
DEFAULT_BASE_BACKOFF_SECONDS,
DEFAULT_MAX_BACKOFF_SECONDS,
);
std::thread::sleep(backoff);
attempts += 1;
}
}
/// Do all the preparations like PGDATA directory creation, configuration,
/// safekeepers sync, basebackup, etc.
#[instrument(skip_all)]
@@ -1630,7 +1589,7 @@ impl ComputeNode {
lsn
} else {
info!("starting safekeepers syncing");
self.sync_safekeepers_with_retries(pspec.storage_auth_token.clone())
self.sync_safekeepers(pspec.storage_auth_token.clone())
.with_context(|| "failed to sync safekeepers")?
};
info!("safekeepers synced at LSN {}", lsn);

View File

@@ -7,8 +7,7 @@ use http::StatusCode;
use reqwest::Client;
use std::mem::replace;
use std::sync::Arc;
use tokio::{io::AsyncReadExt, select, spawn};
use tokio_util::sync::CancellationToken;
use tokio::{io::AsyncReadExt, spawn};
use tracing::{error, info};
#[derive(serde::Serialize, Default)]
@@ -93,35 +92,34 @@ impl ComputeNode {
/// If there is a prewarm request ongoing, return `false`, `true` otherwise.
/// Has a failpoint "compute-prewarm"
pub fn prewarm_lfc(self: &Arc<Self>, from_endpoint: Option<String>) -> bool {
let token: CancellationToken;
{
let state = &mut self.state.lock().unwrap();
token = state.lfc_prewarm_token.clone();
if let LfcPrewarmState::Prewarming =
replace(&mut state.lfc_prewarm_state, LfcPrewarmState::Prewarming)
{
let state = &mut self.state.lock().unwrap().lfc_prewarm_state;
if let LfcPrewarmState::Prewarming = replace(state, LfcPrewarmState::Prewarming) {
return false;
}
}
crate::metrics::LFC_PREWARMS.inc();
let this = self.clone();
let cloned = self.clone();
spawn(async move {
let prewarm_state = match this.prewarm_impl(from_endpoint, token).await {
Ok(state) => state,
let state = match cloned.prewarm_impl(from_endpoint).await {
Ok(true) => LfcPrewarmState::Completed,
Ok(false) => {
info!(
"skipping LFC prewarm because LFC state is not found in endpoint storage"
);
LfcPrewarmState::Skipped
}
Err(err) => {
crate::metrics::LFC_PREWARM_ERRORS.inc();
error!(%err, "could not prewarm LFC");
let error = format!("{err:#}");
LfcPrewarmState::Failed { error }
LfcPrewarmState::Failed {
error: format!("{err:#}"),
}
}
};
let state = &mut this.state.lock().unwrap();
if let LfcPrewarmState::Cancelled = prewarm_state {
state.lfc_prewarm_token = CancellationToken::new();
}
state.lfc_prewarm_state = prewarm_state;
cloned.state.lock().unwrap().lfc_prewarm_state = state;
});
true
}
@@ -134,70 +132,47 @@ impl ComputeNode {
/// Request LFC state from endpoint storage and load corresponding pages into Postgres.
/// Returns a result with `false` if the LFC state is not found in endpoint storage.
async fn prewarm_impl(
&self,
from_endpoint: Option<String>,
token: CancellationToken,
) -> Result<LfcPrewarmState> {
let EndpointStoragePair {
url,
token: storage_token,
} = self.endpoint_storage_pair(from_endpoint)?;
async fn prewarm_impl(&self, from_endpoint: Option<String>) -> Result<bool> {
let EndpointStoragePair { url, token } = self.endpoint_storage_pair(from_endpoint)?;
#[cfg(feature = "testing")]
fail::fail_point!("compute-prewarm", |_| bail!("compute-prewarm failpoint"));
fail::fail_point!("compute-prewarm", |_| {
bail!("prewarm configured to fail because of a failpoint")
});
info!(%url, "requesting LFC state from endpoint storage");
let request = Client::new().get(&url).bearer_auth(storage_token);
let response = select! {
_ = token.cancelled() => return Ok(LfcPrewarmState::Cancelled),
response = request.send() => response
}
.context("querying endpoint storage")?;
match response.status() {
let request = Client::new().get(&url).bearer_auth(token);
let res = request.send().await.context("querying endpoint storage")?;
match res.status() {
StatusCode::OK => (),
StatusCode::NOT_FOUND => return Ok(LfcPrewarmState::Skipped),
StatusCode::NOT_FOUND => {
return Ok(false);
}
status => bail!("{status} querying endpoint storage"),
}
let mut uncompressed = Vec::new();
let lfc_state = select! {
_ = token.cancelled() => return Ok(LfcPrewarmState::Cancelled),
lfc_state = response.bytes() => lfc_state
}
.context("getting request body from endpoint storage")?;
let mut decoder = ZstdDecoder::new(lfc_state.iter().as_slice());
select! {
_ = token.cancelled() => return Ok(LfcPrewarmState::Cancelled),
read = decoder.read_to_end(&mut uncompressed) => read
}
.context("decoding LFC state")?;
let uncompressed_len = uncompressed.len();
info!(%url, "downloaded LFC state, uncompressed size {uncompressed_len}");
// Client connection and prewarm info querying are fast and therefore don't need
// cancellation
let client = ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
let lfc_state = res
.bytes()
.await
.context("connecting to postgres")?;
let pg_token = client.cancel_token();
.context("getting request body from endpoint storage")?;
ZstdDecoder::new(lfc_state.iter().as_slice())
.read_to_end(&mut uncompressed)
.await
.context("decoding LFC state")?;
let uncompressed_len = uncompressed.len();
let params: Vec<&(dyn postgres_types::ToSql + Sync)> = vec![&uncompressed];
select! {
res = client.query_one("select neon.prewarm_local_cache($1)", &params) => res,
_ = token.cancelled() => {
pg_token.cancel_query(postgres::NoTls).await
.context("cancelling neon.prewarm_local_cache()")?;
return Ok(LfcPrewarmState::Cancelled)
}
}
.context("loading LFC state into postgres")
.map(|_| ())?;
info!(%url, "downloaded LFC state, uncompressed size {uncompressed_len}, loading into Postgres");
Ok(LfcPrewarmState::Completed)
ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
.await
.context("connecting to postgres")?
.query_one("select neon.prewarm_local_cache($1)", &[&uncompressed])
.await
.context("loading LFC state into postgres")
.map(|_| ())?;
Ok(true)
}
/// If offload request is ongoing, return false, true otherwise
@@ -225,20 +200,20 @@ impl ComputeNode {
async fn offload_lfc_with_state_update(&self) {
crate::metrics::LFC_OFFLOADS.inc();
let state = match self.offload_lfc_impl().await {
Ok(state) => state,
Err(err) => {
crate::metrics::LFC_OFFLOAD_ERRORS.inc();
error!(%err, "could not offload LFC");
let error = format!("{err:#}");
LfcOffloadState::Failed { error }
}
let Err(err) = self.offload_lfc_impl().await else {
self.state.lock().unwrap().lfc_offload_state = LfcOffloadState::Completed;
return;
};
self.state.lock().unwrap().lfc_offload_state = state;
crate::metrics::LFC_OFFLOAD_ERRORS.inc();
error!(%err, "could not offload LFC state to endpoint storage");
self.state.lock().unwrap().lfc_offload_state = LfcOffloadState::Failed {
error: format!("{err:#}"),
};
}
async fn offload_lfc_impl(&self) -> Result<LfcOffloadState> {
async fn offload_lfc_impl(&self) -> Result<()> {
let EndpointStoragePair { url, token } = self.endpoint_storage_pair(None)?;
info!(%url, "requesting LFC state from Postgres");
@@ -253,7 +228,7 @@ impl ComputeNode {
.context("deserializing LFC state")?;
let Some(state) = state else {
info!(%url, "empty LFC state, not exporting");
return Ok(LfcOffloadState::Skipped);
return Ok(());
};
let mut compressed = Vec::new();
@@ -267,7 +242,7 @@ impl ComputeNode {
let request = Client::new().put(url).bearer_auth(token).body(compressed);
match request.send().await {
Ok(res) if res.status() == StatusCode::OK => Ok(LfcOffloadState::Completed),
Ok(res) if res.status() == StatusCode::OK => Ok(()),
Ok(res) => bail!(
"Request to endpoint storage failed with status: {}",
res.status()
@@ -275,8 +250,4 @@ impl ComputeNode {
Err(err) => Err(err).context("writing to endpoint storage"),
}
}
pub fn cancel_prewarm(self: &Arc<Self>) {
self.state.lock().unwrap().lfc_prewarm_token.cancel();
}
}

View File

@@ -139,15 +139,6 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/LfcPrewarmState"
delete:
tags:
- Prewarm
summary: Cancel ongoing LFC prewarm
description: ""
operationId: cancelLfcPrewarm
responses:
202:
description: Prewarm cancelled
/lfc/offload:
post:
@@ -645,7 +636,7 @@ components:
properties:
status:
description: LFC offload status
enum: [not_offloaded, offloading, completed, skipped, failed]
enum: [not_offloaded, offloading, completed, failed]
type: string
error:
description: LFC offload error, if any

View File

@@ -46,8 +46,3 @@ pub(in crate::http) async fn offload(compute: Compute) -> Response {
)
}
}
pub(in crate::http) async fn cancel_prewarm(compute: Compute) -> StatusCode {
compute.cancel_prewarm();
StatusCode::ACCEPTED
}

View File

@@ -99,12 +99,7 @@ impl From<&Server> for Router<Arc<ComputeNode>> {
);
let authenticated_router = Router::<Arc<ComputeNode>>::new()
.route(
"/lfc/prewarm",
get(lfc::prewarm_state)
.post(lfc::prewarm)
.delete(lfc::cancel_prewarm),
)
.route("/lfc/prewarm", get(lfc::prewarm_state).post(lfc::prewarm))
.route("/lfc/offload", get(lfc::offload_state).post(lfc::offload))
.route("/promote", post(promote::promote))
.route("/check_writability", post(check_writability::is_writable))

View File

@@ -1,246 +0,0 @@
# Node deletion API improvement
Created on 2025-07-07
Implemented on _TBD_
## Summary
This RFC describes improvements to the storage controller API for gracefully deleting pageserver
nodes.
## Motivation
The basic node deletion API introduced in [#8226](https://github.com/neondatabase/neon/issues/8333)
has several limitations:
- Deleted nodes can re-add themselves if they restart (e.g., a flaky node that keeps restarting and
we cannot reach via SSH to stop the pageserver). This issue has been resolved by tombstone
mechanism in [#12036](https://github.com/neondatabase/neon/issues/12036)
- Process of node deletion is not graceful, i.e. it just imitates a node failure
In this context, "graceful" node deletion means that users do not experience any disruption or
negative effects, provided the system remains in a healthy state (i.e., the remaining pageservers
can handle the workload and all requirements are met). To achieve this, the system must perform
live migration of all tenant shards from the node being deleted while the node is still running
and continue processing all incoming requests. The node is removed only after all tenant shards
have been safely migrated.
Although live migrations can be achieved with the drain functionality, it leads to incorrect shard
placement, such as not matching availability zones. This results in unnecessary work to optimize
the placement that was just recently performed.
If we delete a node before its tenant shards are fully moved, the new node won't have all the
needed data (e.g. heatmaps) ready. This means user requests to the new node will be much slower at
first. If there are many tenant shards, this slowdown affects a huge amount of users.
Graceful node deletion is more complicated and can introduce new issues. It takes longer because
live migration of each tenant shard can last several minutes. Using non-blocking accessors may
also cause deletion to wait if other processes are holding inner state lock. It also gets trickier
because we need to handle other requests, like drain and fill, at the same time.
## Impacted components (e.g. pageserver, safekeeper, console, etc)
- storage controller
- pageserver (indirectly)
## Proposed implementation
### Tombstones
To resolve the problem of deleted nodes re-adding themselves, a tombstone mechanism was introduced
as part of the node stored information. Each node has a separate `NodeLifecycle` field with two
possible states: `Active` and `Deleted`. When node deletion completes, the database row is not
deleted but instead has its `NodeLifecycle` column switched to `Deleted`. Nodes with `Deleted`
lifecycle are treated as if the row is absent for most handlers, with several exceptions: reattach
and register functionality must be aware of tombstones. Additionally, new debug handlers are
available for listing and deleting tombstones via the `/debug/v1/tombstone` path.
### Gracefulness
The problem of making node deletion graceful is complex and involves several challenges:
- **Cancellable**: The operation must be cancellable to allow administrators to abort the process
if needed, e.g. if run by mistake.
- **Non-blocking**: We don't want to block deployment operations like draining/filling on the node
deletion process. We need clear policies for handling concurrent operations: what happens when a
drain/fill request arrives while deletion is in progress, and what happens when a delete request
arrives while drain/fill is in progress.
- **Persistent**: If the storage controller restarts during this long-running operation, we must
preserve progress and automatically resume the deletion process after the storage controller
restarts.
- **Migrated correctly**: We cannot simply use the existing drain mechanism for nodes scheduled
for deletion, as this would move shards to irrelevant locations. The drain process expects the
node to return, so it only moves shards to backup locations, not to their preferred AZs. It also
leaves secondary locations unmoved. This could result in unnecessary load on the storage
controller and inefficient resource utilization.
- **Force option**: Administrators need the ability to force immediate, non-graceful deletion when
time constraints or emergency situations require it, bypassing the normal graceful migration
process.
See below for a detailed breakdown of the proposed changes and mechanisms.
#### Node lifecycle
New `NodeLifecycle` enum and a matching database field with these values:
- `Active`: The normal state. All operations are allowed.
- `ScheduledForDeletion`: The node is marked to be deleted soon. Deletion may be in progress or
will happen later, but the node will eventually be removed. All operations are allowed.
- `Deleted`: The node is fully deleted. No operations are allowed, and the node cannot be brought
back. The only action left is to remove its record from the database. Any attempt to register a
node in this state will fail.
This state persists across storage controller restarts.
**State transition**
```
+--------------------+
+---| Active |<---------------------+
| +--------------------+ |
| ^ |
| start_node_delete | cancel_node_delete |
v | |
+----------------------------------+ |
| ScheduledForDeletion | |
+----------------------------------+ |
| |
| node_register |
| |
| delete_node (at the finish) |
| |
v |
+---------+ tombstone_delete +----------+
| Deleted |-------------------------------->| no row |
+---------+ +----------+
```
#### NodeSchedulingPolicy::Deleting
A `Deleting` variant to the `NodeSchedulingPolicy` enum. This means the deletion function is
running for the node right now. Only one node can have the `Deleting` policy at a time.
The `NodeSchedulingPolicy::Deleting` state is persisted in the database. However, after a storage
controller restart, any node previously marked as `Deleting` will have its scheduling policy reset
to `Pause`. The policy will only transition back to `Deleting` when the deletion operation is
actively started again, as triggered by the node's `NodeLifecycle::ScheduledForDeletion` state.
`NodeSchedulingPolicy` transition details:
1. When `node_delete` begins, set the policy to `NodeSchedulingPolicy::Deleting`.
2. If `node_delete` is cancelled (for example, due to a concurrent drain operation), revert the
policy to its previous value. The policy is persisted in storcon DB.
3. After `node_delete` completes, the final value of the scheduling policy is irrelevant, since
`NodeLifecycle::Deleted` prevents any further access to this field.
The deletion process cannot be initiated for nodes currently undergoing deployment-related
operations (`Draining`, `Filling`, or `PauseForRestart` policies). Deletion will only be triggered
once the node transitions to either the `Active` or `Pause` state.
#### OperationTracker
A replacement for `Option<OperationHandler> ongoing_operation`, the `OperationTracker` is a
dedicated service state object responsible for managing all long-running node operations (drain,
fill, delete) with robust concurrency control.
Key responsibilities:
- Orchestrates the execution of operations
- Supports cancellation of currently running operations
- Enforces operation constraints, e.g. allowing only single drain/fill operation at a time
- Persists deletion state, enabling recovery of pending deletions across restarts
- Ensures thread safety across concurrent requests
#### Attached tenant shard processing
When deleting a node, handle each attached tenant shard as follows:
1. Pick the best node to become the new attached (the candidate).
2. If the candidate already has this shard as a secondary:
- Create a new secondary for the shard on another suitable node.
Otherwise:
- Create a secondary for the shard on the candidate node.
3. Wait until all secondaries are ready and pre-warmed.
4. Promote the candidate's secondary to attached.
5. Remove the secondary from the node being deleted.
This process safely moves all attached shards before deleting the node.
#### Secondary tenant shard processing
When deleting a node, handle each secondary tenant shard as follows:
1. Choose the best node to become the new secondary.
2. Create a secondary for the shard on that node.
3. Wait until the new secondary is ready.
4. Remove the secondary from the node being deleted.
This ensures all secondary shards are safely moved before deleting the node.
### Reliability, failure modes and corner cases
In case of a storage controller failure and following restart, the system behavior depends on the
`NodeLifecycle` state:
- If `NodeLifecycle` is `Active`: No action is taken for this node.
- If `NodeLifecycle` is `Deleted`: The node will not be re-added.
- If `NodeLifecycle` is `ScheduledForDeletion`: A deletion background task will be launched for
this node.
In case of a pageserver node failure during deletion, the behavior depends on the `force` flag:
- If `force` is set: The node deletion will proceed regardless of the node's availability.
- If `force` is not set: The deletion will be retried a limited number of times. If the node
remains unavailable, the deletion process will pause and automatically resume when the node
becomes healthy again.
### Operations concurrency
The following sections describe the behavior when different types of requests arrive at the storage
controller and how they interact with ongoing operations.
#### Delete request
Handler: `PUT /control/v1/node/:node_id/delete`
1. If node lifecycle is `NodeLifecycle::ScheduledForDeletion`:
- Return `200 OK`: there is already an ongoing deletion request for this node
2. Update & persist lifecycle to `NodeLifecycle::ScheduledForDeletion`
3. Persist current scheduling policy
4. If there is no active operation (drain/fill/delete):
- Run deletion process for this node
#### Cancel delete request
Handler: `DELETE /control/v1/node/:node_id/delete`
1. If node lifecycle is not `NodeLifecycle::ScheduledForDeletion`:
- Return `404 Not Found`: there is no current deletion request for this node
2. If the active operation is deleting this node, cancel it
3. Update & persist lifecycle to `NodeLifecycle::Active`
4. Restore the last scheduling policy from persistence
#### Drain/fill request
1. If there are already ongoing drain/fill processes:
- Return `409 Conflict`: queueing of drain/fill processes is not supported
2. If there is an ongoing delete process:
- Cancel it and wait until it is cancelled
3. Run the drain/fill process
4. After the drain/fill process is cancelled or finished:
- Try to find another candidate to delete and run the deletion process for that node
#### Drain/fill cancel request
1. If the active operation is not the related process:
- Return `400 Bad Request`: cancellation request is incorrect, operations are not the same
2. Cancel the active operation
3. Try to find another candidate to delete and run the deletion process for that node
## Definition of Done
- [x] Fix flaky node scenario and introduce related debug handlers
- [ ] Node deletion intent is persistent - a node will be eventually deleted after a deletion
request regardless of draining/filling requests and restarts
- [ ] Node deletion can be graceful - deletion completes only after moving all tenant shards to
recommended locations
- [ ] Deploying does not break due to long deletions - drain/fill operations override deletion
process and deletion resumes after drain/fill completes
- [ ] `force` flag is implemented and provides fast, failure-tolerant node removal (e.g., when a
pageserver node does not respond)
- [ ] Legacy delete handler code is removed from storage_controller, test_runner, and storcon_cli

View File

@@ -68,15 +68,11 @@ pub enum LfcPrewarmState {
/// We tried to fetch the corresponding LFC state from the endpoint storage,
/// but received `Not Found 404`. This should normally happen only during the
/// first endpoint start after creation with `autoprewarm: true`.
/// This may also happen if LFC is turned off or not initialized
///
/// During the orchestrated prewarm via API, when a caller explicitly
/// provides the LFC state key to prewarm from, it's the caller responsibility
/// to handle this status as an error state in this case.
Skipped,
/// LFC prewarm was cancelled. Some pages in LFC cache may be prewarmed if query
/// has started working before cancellation
Cancelled,
}
impl Display for LfcPrewarmState {
@@ -87,7 +83,6 @@ impl Display for LfcPrewarmState {
LfcPrewarmState::Completed => f.write_str("Completed"),
LfcPrewarmState::Skipped => f.write_str("Skipped"),
LfcPrewarmState::Failed { error } => write!(f, "Error({error})"),
LfcPrewarmState::Cancelled => f.write_str("Cancelled"),
}
}
}
@@ -102,7 +97,6 @@ pub enum LfcOffloadState {
Failed {
error: String,
},
Skipped,
}
#[derive(Serialize, Debug, Clone, PartialEq)]

View File

@@ -1992,11 +1992,25 @@ impl RemoteTimelineClient {
)))?
});
debug!("enqueuing index part deletion");
self.deletion_queue_client
.push_immediate([latest_index].to_vec())
.await
.map_err(|_| DeleteTimelineError::Cancelled)?;
// Skip deleting the index part.json for now. Isolated pageserver will cause attach issues if we
// delete index part here.
//
// - Pageserver 1 attaches the tenant with generation N and creates a timeline A.
// - Pageserver 1 gets isolated from the network. Storcon attaches the tenant to pageserver 2 with generation N+1.
// - Pageserver 2 delete timeline A, now the timeline directory is empty.
// - Pageserver 1 rejoins the network, ingests the new data from safekeeper, and uploads the index_part.json
// with the old generation N.
// - Now we are left with a timeline directory with index_part.json with generation N, but no layers
// except the newly-uploaded one from the isolated pageserver 1.
//
// As a solution, we will keep the tombstone index_part.json (with `deleted_at` set) so that we don't
// run into the issue above.
// debug!("enqueuing index part deletion");
// self.deletion_queue_client
// .push_immediate([latest_index].to_vec())
// .await
// .map_err(|_| DeleteTimelineError::Cancelled)?;
// Timeline deletion is rare and we have probably emitted a reasonably number of objects: wait
// for a flush to a persistent deletion list so that we may be sure deletion will occur.

View File

@@ -458,7 +458,7 @@ pub(crate) enum LocalProxyConnError {
impl ReportableError for HttpConnError {
fn get_error_kind(&self) -> ErrorKind {
match self {
HttpConnError::ConnectError(e) => e.get_error_kind(),
HttpConnError::ConnectError(_) => ErrorKind::Compute,
HttpConnError::ConnectionClosedAbruptly(_) => ErrorKind::Compute,
HttpConnError::PostgresConnectionError(p) => match p.as_db_error() {
// user provided a wrong database name

View File

@@ -612,25 +612,19 @@ pub async fn handle_request(
}
}
let max_term = statuses
.iter()
.map(|(status, _)| status.acceptor_state.term)
.max()
.unwrap();
// Find the most advanced safekeeper
let (status, i) = statuses
.into_iter()
.max_by_key(|(status, _)| {
(
status.acceptor_state.epoch,
status.flush_lsn,
/* BEGIN_HADRON */
// We need to pull from the SK with the highest term.
// This is because another compute may come online and vote the same highest term again on the other two SKs.
// Then, there will be 2 computes running on the same term.
status.acceptor_state.term,
/* END_HADRON */
status.flush_lsn,
status.commit_lsn,
)
})
@@ -640,22 +634,6 @@ pub async fn handle_request(
assert!(status.tenant_id == request.tenant_id);
assert!(status.timeline_id == request.timeline_id);
// TODO(diko): This is hadron only check to make sure that we pull the timeline
// from the safekeeper with the highest term during timeline restore.
// We could avoid returning the error by calling bump_term after pull_timeline.
// However, this is not a big deal because we retry the pull_timeline requests.
// The check should be removed together with removing custom hadron logic for
// safekeeper restore.
if wait_for_peer_timeline_status && status.acceptor_state.term != max_term {
return Err(ApiError::PreconditionFailed(
format!(
"choosen safekeeper {} has term {}, but the most advanced term is {}",
safekeeper_host, status.acceptor_state.term, max_term
)
.into(),
));
}
match pull_timeline(
status,
safekeeper_host,

View File

@@ -195,14 +195,12 @@ impl StateSK {
to: Configuration,
) -> Result<TimelineMembershipSwitchResponse> {
let result = self.state_mut().membership_switch(to).await?;
let flush_lsn = self.flush_lsn();
let last_log_term = self.state().acceptor_state.get_last_log_term(flush_lsn);
Ok(TimelineMembershipSwitchResponse {
previous_conf: result.previous_conf,
current_conf: result.current_conf,
last_log_term,
flush_lsn,
last_log_term: self.state().acceptor_state.term,
flush_lsn: self.flush_lsn(),
})
}

View File

@@ -24,12 +24,12 @@ use pageserver_api::controller_api::{
};
use pageserver_api::models::{SafekeeperInfo, SafekeepersInfo, TimelineInfo};
use safekeeper_api::PgVersionId;
use safekeeper_api::Term;
use safekeeper_api::membership::{self, MemberSet, SafekeeperGeneration};
use safekeeper_api::models::{
PullTimelineRequest, TimelineLocateResponse, TimelineMembershipSwitchRequest,
TimelineMembershipSwitchResponse,
};
use safekeeper_api::{INITIAL_TERM, Term};
use safekeeper_client::mgmt_api;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
@@ -1298,7 +1298,13 @@ impl Service {
)
.await?;
let sync_position = Self::get_sync_position(&results)?;
let mut sync_position = (INITIAL_TERM, Lsn::INVALID);
for res in results.into_iter().flatten() {
let sk_position = (res.last_log_term, res.flush_lsn);
if sync_position < sk_position {
sync_position = sk_position;
}
}
tracing::info!(
%generation,
@@ -1592,36 +1598,4 @@ impl Service {
Ok(())
}
/// Get membership switch responses from all safekeepers and return the sync position.
///
/// Sync position is a position equal or greater than the commit position.
/// It is guaranteed that all WAL entries with (last_log_term, flush_lsn)
/// greater than the sync position are not committed (= not on a quorum).
///
/// Returns error if there is no quorum of successful responses.
fn get_sync_position(
responses: &[mgmt_api::Result<TimelineMembershipSwitchResponse>],
) -> Result<(Term, Lsn), ApiError> {
let quorum_size = responses.len() / 2 + 1;
let mut wal_positions = responses
.iter()
.flatten()
.map(|res| (res.last_log_term, res.flush_lsn))
.collect::<Vec<_>>();
// Should be already checked if the responses are from tenant_timeline_set_membership_quorum.
if wal_positions.len() < quorum_size {
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"not enough successful responses to get sync position: {}/{}",
wal_positions.len(),
quorum_size,
)));
}
wal_positions.sort();
Ok(wal_positions[quorum_size - 1])
}
}

View File

@@ -142,6 +142,12 @@ impl TenantRefAccumulator {
.or_default()
.insert(this_shard_idx);
// TODO: change this to "is X days ago?"
if index_part.deleted_at.is_some() {
tracing::info!(%ttid, "The timeline is already deleted, skipping");
return;
}
let mut ancestor_refs = Vec::new();
for (layer_name, layer_metadata) in &index_part.layer_metadata {
if layer_metadata.shard != this_shard_idx {

View File

@@ -78,26 +78,20 @@ class EndpointHttpClient(requests.Session):
json: dict[str, str] = res.json()
return json
def prewarm_lfc(self, from_endpoint_id: str | None = None) -> dict[str, str]:
def prewarm_lfc(self, from_endpoint_id: str | None = None):
"""
Prewarm LFC cache from given endpoint and wait till it finishes or errors
"""
params = {"from_endpoint": from_endpoint_id} if from_endpoint_id else dict()
self.post(self.prewarm_url, params=params).raise_for_status()
return self.prewarm_lfc_wait()
self.prewarm_lfc_wait()
def cancel_prewarm_lfc(self):
"""
Cancel LFC prewarm if any is ongoing
"""
self.delete(self.prewarm_url).raise_for_status()
def prewarm_lfc_wait(self) -> dict[str, str]:
def prewarm_lfc_wait(self):
"""
Wait till LFC prewarm returns with error or success.
If prewarm was not requested before calling this function, it will error
"""
statuses = "failed", "completed", "skipped", "cancelled"
statuses = "failed", "completed", "skipped"
def prewarmed():
json = self.prewarm_lfc_status()
@@ -107,7 +101,6 @@ class EndpointHttpClient(requests.Session):
wait_until(prewarmed, timeout=60)
res = self.prewarm_lfc_status()
assert res["status"] != "failed", res
return res
def offload_lfc_status(self) -> dict[str, str]:
res = self.get(self.offload_url)
@@ -115,31 +108,29 @@ class EndpointHttpClient(requests.Session):
json: dict[str, str] = res.json()
return json
def offload_lfc(self) -> dict[str, str]:
def offload_lfc(self):
"""
Offload LFC cache to endpoint storage and wait till offload finishes or errors
"""
self.post(self.offload_url).raise_for_status()
return self.offload_lfc_wait()
self.offload_lfc_wait()
def offload_lfc_wait(self) -> dict[str, str]:
def offload_lfc_wait(self):
"""
Wait till LFC offload returns with error or success.
If offload was not requested before calling this function, it will error
"""
statuses = "failed", "completed", "skipped"
def offloaded():
json = self.offload_lfc_status()
status, err = json["status"], json.get("error")
assert status in statuses, f"{status}, {err=}"
assert status in ["failed", "completed"], f"{status}, {err=}"
wait_until(offloaded, timeout=60)
res = self.offload_lfc_status()
assert res["status"] != "failed", res
return res
def promote(self, promote_spec: dict[str, Any], disconnect: bool = False) -> dict[str, str]:
def promote(self, promote_spec: dict[str, Any], disconnect: bool = False):
url = f"http://localhost:{self.external_port}/promote"
if disconnect:
try: # send first request to start promote and disconnect

View File

@@ -79,7 +79,6 @@ class NeonAPI:
elif resp.status_code == 423 and resp.json()["message"] in {
"endpoint is in some transitive state, could not suspend",
"project already has running conflicting operations, scheduling of new ones is prohibited",
"snapshot is in transition",
}:
retry = True
self.retries4xx += 1
@@ -106,7 +105,6 @@ class NeonAPI:
branch_name: str | None = None,
branch_role_name: str | None = None,
branch_database_name: str | None = None,
project_settings: dict[str, Any] | None = None,
) -> dict[str, Any]:
data: dict[str, Any] = {
"project": {
@@ -123,8 +121,6 @@ class NeonAPI:
data["project"]["branch"]["role_name"] = branch_role_name
if branch_database_name:
data["project"]["branch"]["database_name"] = branch_database_name
if project_settings:
data["project"]["settings"] = project_settings
resp = self.__request(
"POST",
@@ -359,63 +355,6 @@ class NeonAPI:
return cast("dict[str, Any]", resp.json())
def create_snapshot(
self,
project_id: str,
branch_id: str,
lsn: str | None = None,
timestamp: str | None = None,
name: str | None = None,
expires_at: str | None = None,
) -> dict[str, Any]:
params: dict[str, Any] = {
"lsn": lsn,
"timestamp": timestamp,
"name": name,
"expires_at": expires_at,
}
params = {key: value for key, value in params.items() if value is not None}
resp = self.__request(
"POST",
f"/projects/{project_id}/branches/{branch_id}/snapshot",
params=params,
json={},
headers={
"Accept": "application/json",
},
)
return cast("dict[str, Any]", resp.json())
def delete_snapshot(self, project_id: str, snapshot_id: str) -> dict[str, Any]:
resp = self.__request("DELETE", f"/projects/{project_id}/snapshots/{snapshot_id}")
return cast("dict[str, Any]", resp.json())
def restore_snapshot(
self,
project_id: str,
snapshot_id: str,
target_branch_id: str,
name: str | None = None,
finalize_restore: bool = True,
) -> dict[str, Any]:
data: dict[str, Any] = {
"target_branch_id": target_branch_id,
"finalize_restore": finalize_restore,
}
if name is not None:
data["name"] = name
log.info("Restore snapshot data: %s", data)
resp = self.__request(
"POST",
f"/projects/{project_id}/snapshots/{snapshot_id}/restore",
json=data,
headers={
"Accept": "application/json",
"Content-Type": "application/json",
},
)
return cast("dict[str, Any]", resp.json())
def delete_endpoint(self, project_id: str, endpoint_id: str) -> dict[str, Any]:
resp = self.__request("DELETE", f"/projects/{project_id}/endpoints/{endpoint_id}")
return cast("dict[str,Any]", resp.json())
@@ -457,14 +396,6 @@ class NeonAPI:
return cast("dict[str, Any]", resp.json())
def get_branch_endpoints(self, project_id: str, branch_id: str) -> dict[str, Any]:
resp = self.__request(
"GET",
f"/projects/{project_id}/branches/{branch_id}/endpoints",
headers={"Accept": "application/json", "Content-Type": "application/json"},
)
return cast("dict[str, Any]", resp.json())
def get_endpoints(self, project_id: str) -> dict[str, Any]:
resp = self.__request(
"GET",

View File

@@ -262,6 +262,7 @@ class PgProtocol:
# pooler does not support statement_timeout
# Check if the hostname contains the string 'pooler'
hostname = result.get("host", "")
log.info(f"Hostname: {hostname}")
options = result.get("options", "")
if "statement_timeout" not in options and "pooler" not in hostname:
options = f"-cstatement_timeout=120s {options}"
@@ -2313,7 +2314,6 @@ class NeonStorageController(MetricsGetter, LogUtils):
timeline_id: TimelineId,
new_sk_set: list[int],
):
log.info(f"migrate_safekeepers({tenant_id}, {timeline_id}, {new_sk_set})")
response = self.request(
"POST",
f"{self.api}/v1/tenant/{tenant_id}/timeline/{timeline_id}/safekeeper_migrate",

View File

@@ -11,7 +11,6 @@ import time
from datetime import UTC, datetime, timedelta
from typing import TYPE_CHECKING, Any
import psycopg2
import pytest
from fixtures.log_helper import log
@@ -23,29 +22,6 @@ if TYPE_CHECKING:
from fixtures.pg_version import PgVersion
class NeonSnapshot:
"""
A snapshot of the Neon Branch
Gets the output of the API call af a snapshot creation
"""
def __init__(self, project: NeonProject, snapshot: dict[str, Any]):
self.project: NeonProject = project
snapshot = snapshot["snapshot"]
self.id: str = snapshot["id"]
self.name: str = snapshot["name"]
self.created_at: datetime = datetime.fromisoformat(snapshot["created_at"])
self.source_branch: NeonBranch = project.branches[snapshot["source_branch_id"]]
project.snapshots[self.id] = self
self.restored: bool = False
def __str__(self) -> str:
return f"id: {self.id}, name: {self.name}, created_at: {self.created_at}"
def delete(self) -> None:
self.project.delete_snapshot(self.id)
class NeonEndpoint:
"""
Neon Endpoint
@@ -91,21 +67,9 @@ class NeonBranch:
is_reset defines if the branch is a reset one i.e. created as a result of the reset API Call
"""
def __init__(
self,
project,
branch: dict[str, Any],
is_reset=False,
primary_branch: NeonBranch | None = None,
):
def __init__(self, project, branch: dict[str, Any], is_reset=False):
self.id: str = branch["branch"]["id"]
self.desc = branch
self.name: str | None = None
if "name" in branch["branch"]:
self.name = branch["branch"]["name"]
self.restored_from: str | None = None
if "restored_from" in branch["branch"]:
self.restored_from = branch["branch"]["restored_from"]
self.project: NeonProject = project
self.neon_api: NeonAPI = project.neon_api
self.project_id: str = branch["branch"]["project_id"]
@@ -146,36 +110,13 @@ class NeonBranch:
"PGPASSWORD": self.connection_parameters["password"],
"PGSSLMODE": "require",
}
self.replicas: dict[str, NeonBranch] = {}
self.primary_branch: NeonBranch | None = primary_branch
if primary_branch:
if not self.connection_parameters:
raise ValueError(
"connection_parameters is required when primary_branch is specified"
)
self.project.replicas[self.id] = self
primary_branch.replicas[self.id] = self
with psycopg2.connect(primary_branch.connstr()) as conn:
with conn.cursor() as cur:
cur.execute(f"CREATE PUBLICATION {self.id} FOR ALL TABLES")
conn.commit()
with psycopg2.connect(self.connstr()) as conn:
with conn.cursor() as cur:
cur.execute(
f"CREATE SUBSCRIPTION {self.id} CONNECTION '{primary_branch.connstr()}' PUBLICATION {self.id}"
)
conn.commit()
def __str__(self):
"""
Prints the branch's information with all the predecessors
Prints the branch's name with all the predecessors
(r) means the branch is a reset one
"""
name = f"({self.name})" if self.name and self.name != self.id else ""
restored_from = f"(restored_from: {self.restored_from})" if self.restored_from else ""
ancestor = (
f" <- {self.primary_branch}" if self.primary_branch else f", parent: {self.parent}"
)
return f"{self.id}{name}{restored_from}{ancestor}"
return f"{self.id}{'(r)' if self.id in self.project.reset_branches else ''}, parent: {self.parent}"
def random_time(self) -> datetime:
min_time = max(
@@ -187,10 +128,8 @@ class NeonBranch:
log.info("min_time: %s, max_time: %s", min_time, max_time)
return (min_time + (max_time - min_time) * random.random()).replace(microsecond=0)
def create_child_branch(
self, parent_timestamp: datetime | None = None, primary_branch: NeonBranch | None = None
) -> NeonBranch | None:
return self.project.create_branch(self.id, parent_timestamp, primary_branch=primary_branch)
def create_child_branch(self, parent_timestamp: datetime | None = None) -> NeonBranch | None:
return self.project.create_branch(self.id, parent_timestamp)
def create_ro_endpoint(self) -> NeonEndpoint | None:
if not self.project.check_limit_endpoints():
@@ -213,9 +152,6 @@ class NeonBranch:
self.project.terminate_benchmark(self.id)
def reset_to_parent(self) -> None:
"""
Resets the branch to the parent branch
"""
for ep in self.project.endpoints.values():
if ep.type == "read_only":
ep.terminate_benchmark()
@@ -281,19 +217,6 @@ class NeonBranch:
ep.start_benchmark()
return res
def create_logical_replica(self) -> NeonBranch | None:
if self.primary_branch is not None:
raise RuntimeError("The primary branch cannot be a logical replica")
if self.id in self.project.reset_branches:
raise RuntimeError("Reset branch cannot be a primary branch")
replica = self.create_child_branch(primary_branch=self)
return replica
def connstr(self):
if self.connection_parameters is None:
raise RuntimeError("Connection parameters are not defined")
return " ".join([f"{key}={value}" for key, value in self.connection_parameters.items()])
class NeonProject:
"""
@@ -305,9 +228,7 @@ class NeonProject:
self.neon_api = neon_api
self.pg_bin = pg_bin
proj = self.neon_api.create_project(
pg_version,
f"Automatic random API test GITHUB_RUN_ID={os.getenv('GITHUB_RUN_ID')}",
project_settings={"enable_logical_replication": True},
pg_version, f"Automatic random API test GITHUB_RUN_ID={os.getenv('GITHUB_RUN_ID')}"
)
self.id: str = proj["project"]["id"]
self.name: str = proj["project"]["name"]
@@ -319,7 +240,6 @@ class NeonProject:
# Leaf branches are the branches, which do not have children
self.leaf_branches: dict[str, NeonBranch] = {}
self.branches: dict[str, NeonBranch] = {}
self.branch_num: int = 0
self.reset_branches: set[str] = set()
self.main_branch: NeonBranch = NeonBranch(self, proj)
self.main_branch.connection_parameters = self.connection_parameters
@@ -333,9 +253,6 @@ class NeonProject:
self.limits: dict[str, Any] = self.get_limits()["limits"]
self.read_only_endpoints_total: int = 0
self.min_time: datetime = datetime.now(UTC)
self.snapshots: dict[str, NeonSnapshot] = {}
self.snapshot_num: int = 0
self.replicas: dict[str, NeonBranch] = {}
def get_limits(self) -> dict[str, Any]:
return self.neon_api.get_project_limits(self.id)
@@ -363,11 +280,7 @@ class NeonProject:
return False
def create_branch(
self,
parent_id: str | None = None,
parent_timestamp: datetime | None = None,
is_reset: bool = False,
primary_branch: NeonBranch | None = None,
self, parent_id: str | None = None, parent_timestamp: datetime | None = None
) -> NeonBranch | None:
self.wait()
if not self.check_limit_branches():
@@ -380,14 +293,14 @@ class NeonProject:
branch_def = self.neon_api.create_branch(
self.id, parent_id=parent_id, parent_timestamp=parent_timestamp_str
)
new_branch = NeonBranch(self, branch_def, is_reset, primary_branch)
new_branch = NeonBranch(self, branch_def)
self.wait()
return new_branch
def delete_branch(self, branch_id: str) -> None:
parent = self.branches[branch_id].parent
if not parent or branch_id == self.main_branch.id:
raise RuntimeError("Cannot delete the main branch or a branch restored from a snapshot")
raise RuntimeError("Cannot delete the main branch")
if branch_id not in self.leaf_branches and branch_id not in self.reset_branches:
raise RuntimeError(f"The branch {branch_id}, probably, has ancestors")
if branch_id not in self.branches:
@@ -400,18 +313,7 @@ class NeonProject:
if branch_id not in self.reset_branches:
self.terminate_benchmark(branch_id)
self.neon_api.delete_branch(self.id, branch_id)
primary_branch = self.branches[branch_id].primary_branch
if primary_branch is not None:
with psycopg2.connect(primary_branch.connstr()) as conn:
with conn.cursor() as cur:
cur.execute(f"DROP PUBLICATION {branch_id}")
conn.commit()
parent.replicas.pop(branch_id)
self.replicas.pop(branch_id)
else:
for replica in self.branches[branch_id].replicas.values():
replica.delete()
if len(parent.children) == 1 and parent.parent is not None:
if len(parent.children) == 1 and parent.id != self.main_branch.id:
self.leaf_branches[parent.id] = parent
parent.children.pop(branch_id)
if branch_id in self.leaf_branches:
@@ -431,26 +333,6 @@ class NeonProject:
log.info("No leaf branches found")
return target
def get_random_parent_branch(self) -> NeonBranch:
return self.branches[
random.choice(
list(set(self.branches.keys()) - self.reset_branches - set(self.replicas.keys()))
)
]
def gen_branch_name(self) -> str:
self.branch_num += 1
return f"branch{self.branch_num}"
def get_random_snapshot(self) -> NeonSnapshot | None:
snapshot: NeonSnapshot | None = None
avail_snapshots = [sn for sn in self.snapshots.values() if not sn.restored]
if avail_snapshots:
snapshot = random.choice(avail_snapshots)
else:
log.info("No snapshots found")
return snapshot
def delete_endpoint(self, endpoint_id: str) -> None:
self.terminate_benchmark(endpoint_id)
self.neon_api.delete_endpoint(self.id, endpoint_id)
@@ -527,116 +409,6 @@ class NeonProject:
self.restore_num += 1
return f"restore{self.restore_num}"
def gen_snapshot_name(self) -> str:
self.snapshot_num += 1
return f"snapshot{self.snapshot_num}"
def create_snapshot(
self,
lsn: str | None = None,
timestamp: datetime | None = None,
) -> NeonSnapshot:
"""
Create a new Neon snapshot for the current project
Two optional arguments: lsn and timestamp are mutually exclusive
they instruct to create a snapshot with the specific lns or timestamp
"""
snapshot_name = self.gen_snapshot_name()
with psycopg2.connect(self.connection_uri) as conn:
with conn.cursor() as cur:
# We will check the value we set now after the snapshot restored to verify consistency
cur.execute(
f"INSERT INTO sanity_check (name, value) VALUES "
f"('snapsot_name', '{snapshot_name}') ON CONFLICT (name) DO UPDATE SET value = EXCLUDED.value"
)
conn.commit()
snapshot = NeonSnapshot(
self,
self.neon_api.create_snapshot(
self.id,
self.main_branch.id,
lsn,
timestamp.isoformat().replace("+00:00", "Z") if timestamp else None,
snapshot_name,
),
)
self.wait()
# Now we taint the value after the snapshot was taken
cur.execute("UPDATE sanity_check SET value = 'tainted' || value")
conn.commit()
return snapshot
def delete_snapshot(self, snapshot_id: str) -> None:
"""
Deletes the snapshot with the given id
"""
self.wait()
self.neon_api.delete_snapshot(self.id, snapshot_id)
self.snapshots.pop(snapshot_id)
self.wait()
def restore_snapshot(self, snapshot_id: str) -> NeonBranch | None:
"""
Creates a new Neon branch for the current project, then restores the snapshot
with the given id
"""
target_branch = self.get_random_parent_branch().create_child_branch()
if not target_branch:
return None
self.snapshots[snapshot_id].restored = True
new_branch_def: dict[str, Any] = self.neon_api.restore_snapshot(
self.id,
snapshot_id,
target_branch.id,
self.gen_branch_name(),
)
self.wait()
new_branch_def = self.neon_api.get_branch_details(self.id, new_branch_def["branch"]["id"])
# The restored branch will lose the parent afterward, but it has it during the restoration.
# So, we delete parent_id
new_branch_def["branch"].pop("parent_id")
new_branch = NeonBranch(self, new_branch_def)
log.info("Restored snapshot to the branch: %s", new_branch)
target_branch_def = self.neon_api.get_branch_details(self.id, target_branch.id)
if "name" in target_branch_def["branch"]:
target_branch.name = target_branch_def["branch"]["name"]
if new_branch.connection_parameters is None:
if not new_branch.endpoints:
for ep in self.neon_api.get_branch_endpoints(self.id, new_branch.id)["endpoints"]:
if ep["id"] not in self.endpoints:
NeonEndpoint(self, ep)
new_branch.connection_parameters = self.connection_parameters.copy()
for ep in new_branch.endpoints.values():
if ep.type == "read_write":
new_branch.connection_parameters["host"] = ep.host
break
new_branch.connect_env = {
"PGHOST": new_branch.connection_parameters["host"],
"PGUSER": new_branch.connection_parameters["role"],
"PGDATABASE": new_branch.connection_parameters["database"],
"PGPASSWORD": new_branch.connection_parameters["password"],
"PGSSLMODE": "require",
}
with psycopg2.connect(
host=new_branch.connection_parameters["host"],
port=5432,
user=new_branch.connection_parameters["role"],
password=new_branch.connection_parameters["password"],
database=new_branch.connection_parameters["database"],
) as conn:
with conn.cursor() as cur:
cur.execute("SELECT value FROM sanity_check WHERE name = 'snapsot_name'")
snapshot_name = None
if row := cur.fetchone():
snapshot_name = row[0]
# We verify here that the value we select from the table matches with the snapshot name
# To ensure consistency
assert snapshot_name == self.snapshots[snapshot_id].name
self.wait()
target_branch.start_benchmark()
new_branch.start_benchmark()
return new_branch
@pytest.fixture()
def setup_class(
@@ -666,7 +438,9 @@ def do_action(project: NeonProject, action: str) -> bool:
if action == "new_branch" or action == "new_branch_random_time":
use_random_time: bool = action == "new_branch_random_time"
log.info("Trying to create a new branch %s", "random time" if use_random_time else "")
parent = project.get_random_parent_branch()
parent = project.branches[
random.choice(list(set(project.branches.keys()) - project.reset_branches))
]
child = parent.create_child_branch(parent.random_time() if use_random_time else None)
if child is None:
return False
@@ -705,31 +479,6 @@ def do_action(project: NeonProject, action: str) -> bool:
return False
log.info("Reset to parent %s", target)
target.reset_to_parent()
elif action == "create_snapshot":
snapshot = project.create_snapshot()
if snapshot is None:
return False
log.info("Created snapshot %s", snapshot)
elif action == "restore_snapshot":
if (snapshot_to_restore := project.get_random_snapshot()) is None:
return False
log.info("Restoring snapshot %s", snapshot_to_restore)
if project.restore_snapshot(snapshot_to_restore.id) is None:
return False
elif action == "delete_snapshot":
snapshot_to_delete = project.get_random_snapshot()
if snapshot_to_delete is None:
return False
snapshot_to_delete.delete()
log.info("Deleted snapshot %s", snapshot_to_delete)
elif action == "create_logical_replica":
primary: NeonBranch | None = project.get_random_parent_branch()
if primary is None:
return False
replica: NeonBranch | None = primary.create_logical_replica()
if replica is None:
return False
log.info("Created logical replica %s", replica)
else:
raise ValueError(f"The action {action} is unknown")
return True
@@ -763,28 +512,12 @@ def test_api_random(
("delete_branch", 1.2),
("restore_random_time", 0.9),
("reset_to_parent", 0.3),
("create_snapshot", 0.2),
("restore_snapshot", 0.1),
("delete_snapshot", 0.1),
)
if num_ops_env := os.getenv("NUM_OPERATIONS"):
num_operations = int(num_ops_env)
else:
num_operations = 250
pg_bin.run(["pgbench", "-i", "-I", "dtGvp", "-s100"], env=project.main_branch.connect_env)
# Create a table for sanity check
# We are going to leve some control values there to check, e.g., after restoring a snapshot
pg_bin.run(
[
"psql",
"-c",
"CREATE TABLE IF NOT EXISTS sanity_check (name VARCHAR NOT NULL PRIMARY KEY, value VARCHAR)",
],
env=project.main_branch.connect_env,
)
# To not go to the past where pgbench tables do not exist
time.sleep(1)
project.min_time = datetime.now(UTC)
# To not go to the past where pgbench tables do not exist
time.sleep(1)
project.min_time = datetime.now(UTC)

View File

@@ -1,6 +1,6 @@
import random
import threading
from enum import StrEnum
from threading import Thread
from time import sleep
from typing import Any
@@ -47,23 +47,19 @@ def offload_lfc(method: PrewarmMethod, client: EndpointHttpClient, cur: Cursor)
# With autoprewarm, we need to be sure LFC was offloaded after all writes
# finish, so we sleep. Otherwise we'll have less prewarmed pages than we want
sleep(AUTOOFFLOAD_INTERVAL_SECS)
offload_res = client.offload_lfc_wait()
log.info(offload_res)
return offload_res
client.offload_lfc_wait()
return
if method == PrewarmMethod.COMPUTE_CTL:
status = client.prewarm_lfc_status()
assert status["status"] == "not_prewarmed"
assert "error" not in status
offload_res = client.offload_lfc()
log.info(offload_res)
client.offload_lfc()
assert client.prewarm_lfc_status()["status"] == "not_prewarmed"
parsed = prom_parse(client)
desired = {OFFLOAD_LABEL: 1, PREWARM_LABEL: 0, OFFLOAD_ERR_LABEL: 0, PREWARM_ERR_LABEL: 0}
assert parsed == desired, f"{parsed=} != {desired=}"
return offload_res
return
raise AssertionError(f"{method} not in PrewarmMethod")
@@ -72,30 +68,21 @@ def prewarm_endpoint(
method: PrewarmMethod, client: EndpointHttpClient, cur: Cursor, lfc_state: str | None
):
if method == PrewarmMethod.AUTOPREWARM:
prewarm_res = client.prewarm_lfc_wait()
log.info(prewarm_res)
client.prewarm_lfc_wait()
elif method == PrewarmMethod.COMPUTE_CTL:
prewarm_res = client.prewarm_lfc()
log.info(prewarm_res)
return prewarm_res
client.prewarm_lfc()
elif method == PrewarmMethod.POSTGRES:
cur.execute("select neon.prewarm_local_cache(%s)", (lfc_state,))
def check_prewarmed_contains(
def check_prewarmed(
method: PrewarmMethod, client: EndpointHttpClient, desired_status: dict[str, str | int]
):
if method == PrewarmMethod.AUTOPREWARM:
prewarm_status = client.prewarm_lfc_status()
for k in desired_status:
assert desired_status[k] == prewarm_status[k]
assert client.prewarm_lfc_status() == desired_status
assert prom_parse(client)[PREWARM_LABEL] == 1
elif method == PrewarmMethod.COMPUTE_CTL:
prewarm_status = client.prewarm_lfc_status()
for k in desired_status:
assert desired_status[k] == prewarm_status[k]
assert client.prewarm_lfc_status() == desired_status
desired = {OFFLOAD_LABEL: 0, PREWARM_LABEL: 1, PREWARM_ERR_LABEL: 0, OFFLOAD_ERR_LABEL: 0}
assert prom_parse(client) == desired
@@ -162,6 +149,9 @@ def test_lfc_prewarm(neon_simple_env: NeonEnv, method: PrewarmMethod):
log.info(f"Used LFC size: {lfc_used_pages}")
pg_cur.execute("select * from neon.get_prewarm_info()")
total, prewarmed, skipped, _ = pg_cur.fetchall()[0]
log.info(f"Prewarm info: {total=} {prewarmed=} {skipped=}")
progress = (prewarmed + skipped) * 100 // total
log.info(f"Prewarm progress: {progress}%")
assert lfc_used_pages > 10000
assert total > 0
assert prewarmed > 0
@@ -171,54 +161,7 @@ def test_lfc_prewarm(neon_simple_env: NeonEnv, method: PrewarmMethod):
assert lfc_cur.fetchall()[0][0] == n_records * (n_records + 1) / 2
desired = {"status": "completed", "total": total, "prewarmed": prewarmed, "skipped": skipped}
check_prewarmed_contains(method, client, desired)
@pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping")
def test_lfc_prewarm_cancel(neon_simple_env: NeonEnv):
"""
Test we can cancel LFC prewarm and prewarm successfully after
"""
env = neon_simple_env
n_records = 1000000
cfg = [
"autovacuum = off",
"shared_buffers=1MB",
"neon.max_file_cache_size=1GB",
"neon.file_cache_size_limit=1GB",
"neon.file_cache_prewarm_limit=1000",
]
endpoint = env.endpoints.create_start(branch_name="main", config_lines=cfg)
pg_conn = endpoint.connect()
pg_cur = pg_conn.cursor()
pg_cur.execute("create schema neon; create extension neon with schema neon")
pg_cur.execute("create database lfc")
lfc_conn = endpoint.connect(dbname="lfc")
lfc_cur = lfc_conn.cursor()
log.info(f"Inserting {n_records} rows")
lfc_cur.execute("create table t(pk integer primary key, payload text default repeat('?', 128))")
lfc_cur.execute(f"insert into t (pk) values (generate_series(1,{n_records}))")
log.info(f"Inserted {n_records} rows")
client = endpoint.http_client()
method = PrewarmMethod.COMPUTE_CTL
offload_lfc(method, client, pg_cur)
endpoint.stop()
endpoint.start()
thread = Thread(target=lambda: prewarm_endpoint(method, client, pg_cur, None))
thread.start()
# wait 2 seconds to ensure we cancel prewarm SQL query
sleep(2)
client.cancel_prewarm_lfc()
thread.join()
assert client.prewarm_lfc_status()["status"] == "cancelled"
prewarm_endpoint(method, client, pg_cur, None)
assert client.prewarm_lfc_status()["status"] == "completed"
check_prewarmed(method, client, desired)
@pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping")
@@ -235,8 +178,9 @@ def test_lfc_prewarm_empty(neon_simple_env: NeonEnv):
cur = conn.cursor()
cur.execute("create schema neon; create extension neon with schema neon")
method = PrewarmMethod.COMPUTE_CTL
assert offload_lfc(method, client, cur)["status"] == "skipped"
assert prewarm_endpoint(method, client, cur, None)["status"] == "skipped"
offload_lfc(method, client, cur)
prewarm_endpoint(method, client, cur, None)
assert client.prewarm_lfc_status()["status"] == "skipped"
# autoprewarm isn't needed as we prewarm manually
@@ -307,11 +251,11 @@ def test_lfc_prewarm_under_workload(neon_simple_env: NeonEnv, method: PrewarmMet
workload_threads = []
for _ in range(n_threads):
t = Thread(target=workload)
t = threading.Thread(target=workload)
workload_threads.append(t)
t.start()
prewarm_thread = Thread(target=prewarm)
prewarm_thread = threading.Thread(target=prewarm)
prewarm_thread.start()
def prewarmed():

View File

@@ -621,6 +621,7 @@ def test_timeline_deletion_with_files_stuck_in_upload_queue(
path
for path in remote_timeline_path.iterdir()
if not (path.name.endswith("initdb.tar.zst"))
and not (path.name.startswith("index_part.json"))
]
assert len(filtered) == 0

View File

@@ -286,177 +286,3 @@ def test_sk_generation_aware_tombstones(neon_env_builder: NeonEnvBuilder):
assert re.match(r".*Timeline .* deleted.*", exc.value.response.text)
# The timeline should remain deleted.
expect_deleted(second_sk)
def test_safekeeper_migration_stale_timeline(neon_env_builder: NeonEnvBuilder):
"""
Test that safekeeper migration handles stale timeline correctly by migrating to
a safekeeper with a stale timeline.
1. Check that we are waiting for the stale timeline to catch up with the commit lsn.
The migration might fail if there is no compute to advance the WAL.
2. Check that we rely on last_log_term (and not the current term) when waiting for the
sync_position on step 7.
3. Check that migration succeeds if the compute is running.
"""
neon_env_builder.num_safekeepers = 2
neon_env_builder.storage_controller_config = {
"timelines_onto_safekeepers": True,
"timeline_safekeeper_count": 1,
}
env = neon_env_builder.init_start()
env.pageserver.allowed_errors.extend(PAGESERVER_ALLOWED_ERRORS)
env.storage_controller.allowed_errors.append(".*not enough successful .* to reach quorum.*")
mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline)
active_sk = env.get_safekeeper(mconf["sk_set"][0])
other_sk = [sk for sk in env.safekeepers if sk.id != active_sk.id][0]
ep = env.endpoints.create("main", tenant_id=env.initial_tenant)
ep.start(safekeeper_generation=1, safekeepers=[active_sk.id])
ep.safe_psql("CREATE TABLE t(a int)")
ep.safe_psql("INSERT INTO t VALUES (0)")
# Pull the timeline to other_sk, so other_sk now has a "stale" timeline on it.
other_sk.pull_timeline([active_sk], env.initial_tenant, env.initial_timeline)
# Advance the WAL on active_sk.
ep.safe_psql("INSERT INTO t VALUES (1)")
# The test is more tricky if we have the same last_log_term but different term/flush_lsn.
# Stop the active_sk during the endpoint shutdown because otherwise compute_ctl runs
# sync_safekeepers and advances last_log_term on active_sk.
active_sk.stop()
ep.stop(mode="immediate")
active_sk.start()
active_sk_status = active_sk.http_client().timeline_status(
env.initial_tenant, env.initial_timeline
)
other_sk_status = other_sk.http_client().timeline_status(
env.initial_tenant, env.initial_timeline
)
# other_sk should have the same last_log_term, but a stale flush_lsn.
assert active_sk_status.last_log_term == other_sk_status.last_log_term
assert active_sk_status.flush_lsn > other_sk_status.flush_lsn
commit_lsn = active_sk_status.flush_lsn
# Bump the term on other_sk to make it higher than active_sk.
# This is to make sure we don't use current term instead of last_log_term in the algorithm.
other_sk.http_client().term_bump(
env.initial_tenant, env.initial_timeline, active_sk_status.term + 100
)
# TODO(diko): now it fails because the timeline on other_sk is stale and there is no compute
# to catch up it with active_sk. It might be fixed in https://databricks.atlassian.net/browse/LKB-946
# if we delete stale timelines before starting the migration.
# But the rest of the test is still valid: we should not lose committed WAL after the migration.
with pytest.raises(
StorageControllerApiException, match="not enough successful .* to reach quorum"
):
env.storage_controller.migrate_safekeepers(
env.initial_tenant, env.initial_timeline, [other_sk.id]
)
mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline)
assert mconf["new_sk_set"] == [other_sk.id]
assert mconf["sk_set"] == [active_sk.id]
assert mconf["generation"] == 2
# Start the endpoint, so it advances the WAL on other_sk.
ep.start(safekeeper_generation=2, safekeepers=[active_sk.id, other_sk.id])
# Now the migration should succeed.
env.storage_controller.migrate_safekeepers(
env.initial_tenant, env.initial_timeline, [other_sk.id]
)
# Check that we didn't lose committed WAL.
assert (
other_sk.http_client().timeline_status(env.initial_tenant, env.initial_timeline).flush_lsn
>= commit_lsn
)
assert ep.safe_psql("SELECT * FROM t") == [(0,), (1,)]
def test_pull_from_most_advanced_sk(neon_env_builder: NeonEnvBuilder):
"""
Test that we pull the timeline from the most advanced safekeeper during the
migration and do not lose committed WAL.
"""
neon_env_builder.num_safekeepers = 4
neon_env_builder.storage_controller_config = {
"timelines_onto_safekeepers": True,
"timeline_safekeeper_count": 3,
}
env = neon_env_builder.init_start()
env.pageserver.allowed_errors.extend(PAGESERVER_ALLOWED_ERRORS)
mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline)
sk_set = mconf["sk_set"]
assert len(sk_set) == 3
other_sk = [sk.id for sk in env.safekeepers if sk.id not in sk_set][0]
ep = env.endpoints.create("main", tenant_id=env.initial_tenant)
ep.start(safekeeper_generation=1, safekeepers=sk_set)
ep.safe_psql("CREATE TABLE t(a int)")
ep.safe_psql("INSERT INTO t VALUES (0)")
# Stop one sk, so we have a lagging WAL on it.
env.get_safekeeper(sk_set[0]).stop()
# Advance the WAL on the other sks.
ep.safe_psql("INSERT INTO t VALUES (1)")
# Stop other sks to make sure compute_ctl doesn't advance the last_log_term on them during shutdown.
for sk_id in sk_set[1:]:
env.get_safekeeper(sk_id).stop()
ep.stop(mode="immediate")
for sk_id in sk_set:
env.get_safekeeper(sk_id).start()
# Bump the term on the lagging sk to make sure we don't use it to choose the most advanced sk.
env.get_safekeeper(sk_set[0]).http_client().term_bump(
env.initial_tenant, env.initial_timeline, 100
)
def get_commit_lsn(sk_set: list[int]):
flush_lsns = []
last_log_terms = []
for sk_id in sk_set:
sk = env.get_safekeeper(sk_id)
status = sk.http_client().timeline_status(env.initial_tenant, env.initial_timeline)
flush_lsns.append(status.flush_lsn)
last_log_terms.append(status.last_log_term)
# In this test we assume that all sks have the same last_log_term.
assert len(set(last_log_terms)) == 1
flush_lsns.sort(reverse=True)
commit_lsn = flush_lsns[len(sk_set) // 2]
log.info(f"sk_set: {sk_set}, flush_lsns: {flush_lsns}, commit_lsn: {commit_lsn}")
return commit_lsn
commit_lsn_before_migration = get_commit_lsn(sk_set)
# Make two migrations, so the lagging sk stays in the sk_set, but other sks are replaced.
new_sk_set1 = [sk_set[0], sk_set[1], other_sk] # remove sk_set[2], add other_sk
new_sk_set2 = [sk_set[0], other_sk, sk_set[2]] # remove sk_set[1], add sk_set[2] back
env.storage_controller.migrate_safekeepers(
env.initial_tenant, env.initial_timeline, new_sk_set1
)
env.storage_controller.migrate_safekeepers(
env.initial_tenant, env.initial_timeline, new_sk_set2
)
commit_lsn_after_migration = get_commit_lsn(new_sk_set2)
# We should not lose committed WAL.
# If we have choosen the lagging sk to pull the timeline from, this might fail.
assert commit_lsn_before_migration <= commit_lsn_after_migration
ep.start(safekeeper_generation=5, safekeepers=new_sk_set2)
assert ep.safe_psql("SELECT * FROM t") == [(0,), (1,)]

View File

@@ -410,6 +410,7 @@ def test_scrubber_physical_gc_timeline_deletion(neon_env_builder: NeonEnvBuilder
# Delete the timeline
env.storage_controller.pageserver_api().timeline_delete(tenant_id, timeline_id)
time.sleep(1) # give scrubber some time to wait for min_age_secs
# Subsequently doing physical GC should clean up the ancestor layers
gc_summary = env.storage_scrubber.pageserver_physical_gc(min_age_secs=0, mode="full")

View File

@@ -491,7 +491,7 @@ def test_timeline_archival_chaos(neon_env_builder: NeonEnvBuilder):
return (active_ids, offloaded_ids)
def timeline_objects(tenant_shard_id, timeline_id):
def timeline_objects_exclude_index_part(tenant_shard_id, timeline_id):
response = list_prefix(
env.pageserver_remote_storage, # type: ignore
prefix="/".join(
@@ -505,7 +505,11 @@ def test_timeline_archival_chaos(neon_env_builder: NeonEnvBuilder):
+ "/",
)
return [k["Key"] for k in response.get("Contents", [])]
return [
k["Key"]
for k in response.get("Contents", [])
if not k["Key"].startswith("index_part.json")
]
def worker():
"""
@@ -533,7 +537,7 @@ def test_timeline_archival_chaos(neon_env_builder: NeonEnvBuilder):
state.created = True
if (
timeline_objects(
timeline_objects_exclude_index_part(
tenant_shard_id=tenant_shard_id, timeline_id=state.timeline_id
)
== []
@@ -550,7 +554,9 @@ def test_timeline_archival_chaos(neon_env_builder: NeonEnvBuilder):
violations.append(msg)
raise RuntimeError(msg)
objects = timeline_objects(tenant_shard_id, state.timeline_id)
objects = timeline_objects_exclude_index_part(
tenant_shard_id, state.timeline_id
)
if len(objects) == 0:
log.info(f"Confirmed deletion of timeline {state.timeline_id}")
timelines_deleted.append(state.timeline_id)

View File

@@ -1,18 +1,18 @@
{
"v17": [
"17.5",
"1e01fcea2a6b38180021aa83e0051d95286d9096"
"fa1788475e3146cc9c7c6a1b74f48fd296898fcd"
],
"v16": [
"16.9",
"a42351fcd41ea01edede1daed65f651e838988fc"
"9b9cb4b3e33347aea8f61e606bb6569979516de5"
],
"v15": [
"15.13",
"2aaab3bb4a13557aae05bb2ae0ef0a132d0c4f85"
"aaaeff2550d5deba58847f112af9b98fa3a58b00"
],
"v14": [
"14.18",
"2155cb165d05f617eb2c8ad7e43367189b627703"
"c9f9fdd0113b52c0bd535afdb09d3a543aeee25f"
]
}