Compare commits

8 Commits

Author SHA1 Message Date
Konstantin Knizhnik
a612e9a356 New RFC version 2024-12-16 09:16:39 +02:00
Konstantin Knizhnik
5af77ab39b Update V3 protol version RFC 2024-12-15 19:22:10 +02:00
Konstantin Knizhnik
10bc58c510 Update V3 protol version RFC 2024-12-15 19:21:54 +02:00
Konstantin Knizhnik
da6c78dd5d Add RFC for V3 protocol version 2024-08-15 16:23:12 +03:00
Joonas Koivunen
52641eb853 storcon: add spans to drain/fill ops (#8735)
this way we do not need to repeat the %node_id everywhere, and we get no
stray messages in logs from within the op.
2024-08-15 15:30:04 +03:00
Joonas Koivunen
d9a57aeed9 storcon: deny external node configuration if an operation is ongoing (#8727)
Per #8674, disallow node configuration while drain/fill are ongoing.
Implement it by adding a only-http wrapper
`Service::external_node_configure` which checks for operation existing
before configuring.

Additionally:
- allow cancelling drain/fill after a pageserver has restarted and
transitioned to WarmingUp

Fixes: #8674
2024-08-15 10:54:05 +01:00
Alexander Bayandin
a9c28be7d0 fix(pageserver): allow unused_imports in download.rs on macOS (#8733)
## Problem

On macOS, clippy fails with the following error:

```
error: unused import: `crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt`
  --> pageserver/src/tenant/remote_timeline_client/download.rs:26:5
   |
26 | use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
   |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   |
   = note: `-D unused-imports` implied by `-D warnings`
   = help: to override `-D warnings` add `#[allow(unused_imports)]`
```

Introduced in https://github.com/neondatabase/neon/pull/8717

## Summary of changes
- allow `unused_imports` for
`crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt` on macOS
in download.rs
2024-08-15 10:06:28 +01:00
Vlad Lazar
fef77b0cc9 safekeeper: consider partial uploads when pulling timeline (#8628)
## Problem
The control file contains the id of the safekeeper that uploaded it.
Previously, when sending a snapshot of the control file to another sk,
it would eventually be gc-ed by the receiving sk. This is incorrect
because the original sk might still need it later.

## Summary of Changes
When sending a snapshot and the control file contains an uploaded
segment:
* Create a copy of the segment in s3 with the destination sk in the
  object name
* Tweak the streamed control file to point to the object created in the
  previous step

Note that the snapshot endpoint now has to know the id of the requestor,
so the api has been extended to include the node id of the destination
sk.

Closes https://github.com/neondatabase/neon/issues/8542
2024-08-15 09:02:33 +01:00
14 changed files with 496 additions and 73 deletions

View File

@@ -0,0 +1,84 @@
# V3 version of compute-page server protocol
Created on: 2024-08-15
Author: Konstantin Knizhnik
## Summary
The current version of the compute-PS protocol does not allow the client to verify that a received response actually corresponds to the request it sent.
In most cases this causes no problems, because the Neon SMGR follows the classical request-response pattern.
If a response is not received for some reason (network error, ...), the connection is dropped and an error is reported.
But we also actively use prefetch, which minimizes network round-trip overhead.
With prefetch, compute sends multiple getpage requests and then receives the responses to all of them.
Responses are expected to arrive in the same order as the requests were sent, with exactly one response per request.
Unfortunately, this can be violated in case of errors: the connection may be reset while the prefetch ring is not.
As a result, we treat the response to a new request as the response to some older prefetch request and place the wrong page image in shared buffers.
If this page is then modified, it is saved in WAL and in the database file, so we are not able to recover the original (correct) page image by applying WAL.
This way we can mix up pages of one relation, or even pages of different relations. Such corruption is most frequently detected in indexes,
but only because indexes have more invariants that can be checked. There is no good universal way to detect and recover from such
corruption.
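The failure mode can be illustrated with a small simulation. This is only a sketch: `FakeConnection`, `PrefetchRing`, and the page naming are hypothetical and are not taken from the Neon code. It shows how pairing responses with requests purely by order returns the wrong page once the connection is replaced while the ring still holds a stale entry.

```python
from collections import deque


class FakeConnection:
    """Hypothetical stand-in for a page server connection that replies in FIFO order."""

    def __init__(self):
        self.pending = deque()

    def send(self, block_no):
        self.pending.append(block_no)

    def receive(self):
        # The response carries no information about which request it answers.
        return f"page-{self.pending.popleft()}"


class PrefetchRing:
    """Hypothetical prefetch ring that matches responses to requests by position only."""

    def __init__(self, conn):
        self.conn = conn
        self.slots = deque()

    def prefetch(self, block_no):
        self.slots.append(block_no)
        self.conn.send(block_no)

    def reconnect_without_reset(self, conn):
        # The bug: the connection is replaced, but old slots stay in the ring.
        self.conn = conn

    def consume(self):
        expected_block = self.slots.popleft()
        return expected_block, self.conn.receive()


ring = PrefetchRing(FakeConnection())
ring.prefetch(10)  # this request is lost together with the old connection
ring.reconnect_without_reset(FakeConnection())
ring.prefetch(20)  # only this request reaches the new connection

block, page = ring.consume()
print(block, page)  # 10 page-20
```

The ring believes it is reading block 10, yet it receives the image of block 20, and nothing in the response lets it notice.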
## Motivation
This bug in prefetch was fixed, but we want to prevent similar problems in the future.
For example, prewarm is similar to prefetch: it also sends several requests and only
afterwards waits for the responses.
## Previous work
We already changed the protocol version from V1 to V2 when we replaced the single request LSN with the pair
(request LSN, not modified since LSN). This was done by introducing new command codes,
so there was no explicit check of the protocol version: if the server received a new command,
it assumed the new protocol version. After both clients and servers were upgraded to the new version,
the new command codes were removed. Then the version was added to the "pagestream" command used to perform the handshake.
The client has a `neon.protocol_version` GUC specifying which protocol version it should use.
So the client informs the server about the protocol version it is going to use, but the server cannot ask the client to use a different version;
it can only reject the connection request if it does not support the requested version.
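A minimal sketch of this one-way negotiation, assuming the handshake command word has the form `pagestream_v<N>` (the V2 and V3 names appear in this RFC). The function name, the set of supported versions, and the exception type are hypothetical, and any arguments following the command word are ignored here.

```python
SUPPORTED_VERSIONS = {2, 3}  # assumption: a server that understands V2 and V3


class UnsupportedProtocol(Exception):
    """Raised when the server has to reject the handshake."""


def accept_handshake(command: str) -> int:
    """Return the protocol version requested by the client, or reject it.

    The server never proposes an alternative version: it either accepts
    what the client asked for or refuses the connection.
    """
    prefix = "pagestream_v"
    if not command.startswith(prefix):
        raise UnsupportedProtocol(f"not a pagestream handshake: {command!r}")
    suffix = command[len(prefix):]
    if not (suffix.isascii() and suffix.isdigit()):
        raise UnsupportedProtocol(f"malformed protocol version: {command!r}")
    version = int(suffix)
    if version not in SUPPORTED_VERSIONS:
        raise UnsupportedProtocol(f"protocol version {version} is not supported")
    return version


print(accept_handshake("pagestream_v2"))
try:
    accept_handshake("pagestream_v4")
except UnsupportedProtocol as err:
    print("rejected:", err)
```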
## Requirements
- Be able to verify that a page server response corresponds to the request
- Provide backward compatibility: old clients should work with the new server.
## Non Goals
- Detect page corruption (include CRC)
- Support vector operations (merging several requests into one)
- Forward compatibility: support new clients with old page server
## Solution
Include extra fields in the response that make it possible to verify that the response corresponds to a particular request.
Such extra fields include:
- tablespace OID
- database OID
- relation OID
- fork number
- block id (for getpage)
- request LSN
- last modified LSN
In addition to these fields, we also introduce a unique auto-incremented `request_id`.
It is composed of the `backend_id` and a local auto-incremented counter.
There is some probability of collision if a backend is restarted, but this is not critical as long as all the other fields are included in the response.
`request_id` can also be used for better tracking and for associating log messages produced by the client and the page server.
Although only a mismatched `getpage` response can cause data corruption, we want to extend the responses of all other commands as well: get relation/db size, check presence of relation.
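The check on the client side could look roughly like the sketch below. All names are hypothetical, and the bit layout of `request_id` (backend id in the high 32 bits, counter in the low 32 bits) is an assumption made for illustration; this RFC only says that the id is combined from `backend_id` and a local counter.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class RequestKey:
    """Fields the client sends and expects the server to echo back (hypothetical names)."""

    request_id: int
    spc_oid: int  # tablespace OID
    db_oid: int  # database OID
    rel_oid: int  # relation OID
    fork_num: int
    block_no: int  # only meaningful for getpage
    request_lsn: int
    last_modified_lsn: int


def make_request_id(backend_id: int, counter: int) -> int:
    # Assumed layout: backend id in the high 32 bits, local counter in the low 32 bits.
    return ((backend_id & 0xFFFFFFFF) << 32) | (counter & 0xFFFFFFFF)


def verify_response(sent: RequestKey, echoed: RequestKey) -> None:
    """Refuse a response whose echoed fields differ from the request."""
    if sent != echoed:
        raise ValueError(f"response mismatch: sent {sent}, got {echoed}")


sent = RequestKey(
    request_id=make_request_id(backend_id=7, counter=1),
    spc_oid=1663, db_oid=5, rel_oid=16384, fork_num=0, block_no=10,
    request_lsn=0x16B3748, last_modified_lsn=0x16B3700,
)
verify_response(sent, sent)  # a matching response passes silently

try:
    verify_response(sent, replace(sent, block_no=20))
except ValueError as err:
    print("rejected:", err)
```

Comparing every echoed field, not just `request_id`, is what keeps a `request_id` collision after a backend restart from going unnoticed.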
## Compatibility
We will change the handshake command from "pagestream_v2" to "pagestream_v3". With the V3 version of the protocol, the server should
reply with extended responses. Request/response tags will not be changed.
To avoid forward compatibility issues (a new client trying to access an old server), this change should be deployed in three steps:
1. Deploy the new server, which recognizes the V3 protocol version.
2. Deploy the new client, which is able to send V3 commands but still uses V2 by default.
3. After one release cycle, when rollback to the previous PS version is no longer possible, switch the default protocol version to V3 by changing the `neon.protocol_version` GUC in project settings.
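The ordering of these steps can be checked with a toy model. It is a sketch under one assumption that the RFC does not state explicitly: a server accepts every protocol version up to the newest one it knows. The function and stage names are hypothetical.

```python
def handshake_succeeds(client_version: int, server_max_version: int) -> bool:
    """The server cannot propose another version: it accepts or rejects."""
    # Assumption: a server supports every version up to its newest one.
    return client_version <= server_max_version


# (stage, version requested by the client, newest version the server knows)
rollout = [
    ("before rollout", 2, 2),
    ("step 1: new server deployed", 2, 3),
    ("step 2: new client, default still V2", 2, 3),
    ("step 3: default switched to V3", 3, 3),
]

for stage, client_version, server_max_version in rollout:
    ok = handshake_succeeds(client_version, server_max_version)
    print(f"{stage}: {'ok' if ok else 'rejected'}")

# Switching the default before every server is upgraded, or rolling the
# server back after step 3, leaves a V3 client facing a V2-only server.
print("V3 client, V2-only server:", handshake_succeeds(3, 2))
```

Every stage of the planned rollout keeps the handshake working, while the out-of-order combination is rejected, which is why the default is switched only after rollback of the page server is ruled out.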

View File

@@ -313,20 +313,17 @@ pub struct MetadataHealthUpdateRequest {
pub struct MetadataHealthUpdateResponse {}
#[derive(Serialize, Deserialize, Debug)]
pub struct MetadataHealthListUnhealthyResponse {
pub unhealthy_tenant_shards: Vec<TenantShardId>,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct MetadataHealthListOutdatedRequest {
#[serde(with = "humantime_serde")]
pub not_scrubbed_for: Duration,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct MetadataHealthListOutdatedResponse {
pub health_records: Vec<MetadataHealthRecord>,
}

View File

@@ -23,6 +23,7 @@ use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
use crate::tenant::remote_timeline_client::{remote_layer_path, remote_timelines_path};
use crate::tenant::storage_layer::LayerName;
use crate::tenant::Generation;
#[cfg_attr(target_os = "macos", allow(unused_imports))]
use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
use crate::virtual_file::{on_fatal_io_error, MaybeFatalIo, VirtualFile};
use crate::TEMP_FILE_SUFFIX;

View File

@@ -164,6 +164,30 @@ impl Deref for FileStorage {
}
}
impl TimelinePersistentState {
pub(crate) fn write_to_buf(&self) -> Result<Vec<u8>> {
let mut buf: Vec<u8> = Vec::new();
WriteBytesExt::write_u32::<LittleEndian>(&mut buf, SK_MAGIC)?;
if self.eviction_state == EvictionState::Present {
// temp hack for forward compatibility
const PREV_FORMAT_VERSION: u32 = 8;
let prev = downgrade_v9_to_v8(self);
WriteBytesExt::write_u32::<LittleEndian>(&mut buf, PREV_FORMAT_VERSION)?;
prev.ser_into(&mut buf)?;
} else {
// otherwise, we write the current format version
WriteBytesExt::write_u32::<LittleEndian>(&mut buf, SK_FORMAT_VERSION)?;
self.ser_into(&mut buf)?;
}
// calculate checksum before resize
let checksum = crc32c::crc32c(&buf);
buf.extend_from_slice(&checksum.to_le_bytes());
Ok(buf)
}
}
#[async_trait::async_trait]
impl Storage for FileStorage {
/// Persists state durably to the underlying storage.
@@ -180,24 +204,8 @@ impl Storage for FileStorage {
&control_partial_path
)
})?;
let mut buf: Vec<u8> = Vec::new();
WriteBytesExt::write_u32::<LittleEndian>(&mut buf, SK_MAGIC)?;
if s.eviction_state == EvictionState::Present {
// temp hack for forward compatibility
const PREV_FORMAT_VERSION: u32 = 8;
let prev = downgrade_v9_to_v8(s);
WriteBytesExt::write_u32::<LittleEndian>(&mut buf, PREV_FORMAT_VERSION)?;
prev.ser_into(&mut buf)?;
} else {
// otherwise, we write the current format version
WriteBytesExt::write_u32::<LittleEndian>(&mut buf, SK_FORMAT_VERSION)?;
s.ser_into(&mut buf)?;
}
// calculate checksum before resize
let checksum = crc32c::crc32c(&buf);
buf.extend_from_slice(&checksum.to_le_bytes());
let buf: Vec<u8> = s.write_to_buf()?;
control_partial.write_all(&buf).await.with_context(|| {
format!(

View File

@@ -10,7 +10,7 @@
use reqwest::{IntoUrl, Method, StatusCode};
use utils::{
http::error::HttpErrorBody,
id::{TenantId, TimelineId},
id::{NodeId, TenantId, TimelineId},
logging::SecretString,
};
@@ -97,10 +97,11 @@ impl Client {
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
stream_to: NodeId,
) -> Result<reqwest::Response> {
let uri = format!(
"{}/v1/tenant/{}/timeline/{}/snapshot",
self.mgmt_api_endpoint, tenant_id, timeline_id
"{}/v1/tenant/{}/timeline/{}/snapshot/{}",
self.mgmt_api_endpoint, tenant_id, timeline_id, stream_to.0
);
self.get(&uri).await
}

View File

@@ -205,6 +205,7 @@ async fn timeline_pull_handler(mut request: Request<Body>) -> Result<Response<Bo
/// Stream tar archive with all timeline data.
async fn timeline_snapshot_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
let destination = parse_request_param(&request, "destination_id")?;
let ttid = TenantTimelineId::new(
parse_request_param(&request, "tenant_id")?,
parse_request_param(&request, "timeline_id")?,
@@ -225,7 +226,13 @@ async fn timeline_snapshot_handler(request: Request<Body>) -> Result<Response<Bo
// so create the chan and write to it in another task.
let (tx, rx) = mpsc::channel(1);
task::spawn(pull_timeline::stream_snapshot(tli, tx));
let conf = get_conf(&request);
task::spawn(pull_timeline::stream_snapshot(
tli,
conf.my_id,
destination,
tx,
));
let rx_stream = ReceiverStream::new(rx);
let body = Body::wrap_stream(rx_stream);
@@ -565,7 +572,7 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError>
request_span(r, tenant_delete_handler)
})
.get(
"/v1/tenant/:tenant_id/timeline/:timeline_id/snapshot",
"/v1/tenant/:tenant_id/timeline/:timeline_id/snapshot/:destination_id",
|r| request_span(r, timeline_snapshot_handler),
)
.post("/v1/pull_timeline", |r| {

View File

@@ -11,13 +11,8 @@ use std::{
io::{self, ErrorKind},
sync::Arc,
};
use tokio::{
fs::{File, OpenOptions},
io::AsyncWrite,
sync::mpsc,
task,
};
use tokio_tar::{Archive, Builder};
use tokio::{fs::OpenOptions, io::AsyncWrite, sync::mpsc, task};
use tokio_tar::{Archive, Builder, Header};
use tokio_util::{
io::{CopyToBytes, SinkWriter},
sync::PollSender,
@@ -32,13 +27,15 @@ use crate::{
routes::TimelineStatus,
},
safekeeper::Term,
state::TimelinePersistentState,
timeline::{get_tenant_dir, get_timeline_dir, Timeline, TimelineError, WalResidentTimeline},
wal_backup,
wal_storage::{self, open_wal_file, Storage},
GlobalTimelines, SafeKeeperConf,
};
use utils::{
crashsafe::{durable_rename, fsync_async_opt},
id::{TenantId, TenantTimelineId, TimelineId},
id::{NodeId, TenantId, TenantTimelineId, TimelineId},
logging::SecretString,
lsn::Lsn,
pausable_failpoint,
@@ -46,8 +43,13 @@ use utils::{
/// Stream tar archive of timeline to tx.
#[instrument(name = "snapshot", skip_all, fields(ttid = %tli.ttid))]
pub async fn stream_snapshot(tli: WalResidentTimeline, tx: mpsc::Sender<Result<Bytes>>) {
if let Err(e) = stream_snapshot_guts(tli, tx.clone()).await {
pub async fn stream_snapshot(
tli: WalResidentTimeline,
source: NodeId,
destination: NodeId,
tx: mpsc::Sender<Result<Bytes>>,
) {
if let Err(e) = stream_snapshot_guts(tli, source, destination, tx.clone()).await {
// Error type/contents don't matter as they won't can't reach the client
// (hyper likely doesn't do anything with it), but http stream will be
// prematurely terminated. It would be nice to try to send the error in
@@ -81,6 +83,8 @@ impl Drop for SnapshotContext {
pub async fn stream_snapshot_guts(
tli: WalResidentTimeline,
source: NodeId,
destination: NodeId,
tx: mpsc::Sender<Result<Bytes>>,
) -> Result<()> {
// tokio-tar wants Write implementor, but we have mpsc tx <Result<Bytes>>;
@@ -104,7 +108,7 @@ pub async fn stream_snapshot_guts(
// which is also likely suboptimal.
let mut ar = Builder::new_non_terminated(pinned_writer);
let bctx = tli.start_snapshot(&mut ar).await?;
let bctx = tli.start_snapshot(&mut ar, source, destination).await?;
pausable_failpoint!("sk-snapshot-after-list-pausable");
let tli_dir = tli.get_timeline_dir();
@@ -158,13 +162,43 @@ impl WalResidentTimeline {
async fn start_snapshot<W: AsyncWrite + Unpin + Send>(
&self,
ar: &mut tokio_tar::Builder<W>,
source: NodeId,
destination: NodeId,
) -> Result<SnapshotContext> {
let mut shared_state = self.write_shared_state().await;
let wal_seg_size = shared_state.get_wal_seg_size();
let cf_path = self.get_timeline_dir().join(CONTROL_FILE_NAME);
let mut cf = File::open(cf_path).await?;
ar.append_file(CONTROL_FILE_NAME, &mut cf).await?;
let mut control_store = TimelinePersistentState::clone(shared_state.sk.state());
// Modify the partial segment of the in-memory copy for the control file to
// point to the destination safekeeper.
let replace = control_store
.partial_backup
.replace_uploaded_segment(source, destination)?;
if let Some(replace) = replace {
// The deserialized control file has an uploaded partial. We upload a copy
// of it to object storage for the destination safekeeper and send an updated
// control file in the snapshot.
tracing::info!(
"Replacing uploaded partial segment in in-mem control file: {replace:?}"
);
let remote_timeline_path = wal_backup::remote_timeline_path(&self.tli.ttid)?;
wal_backup::copy_partial_segment(
&replace.previous.remote_path(&remote_timeline_path),
&replace.current.remote_path(&remote_timeline_path),
)
.await?;
}
let buf = control_store
.write_to_buf()
.with_context(|| "failed to serialize control store")?;
let mut header = Header::new_gnu();
header.set_size(buf.len().try_into().expect("never breaches u64"));
ar.append_data(&mut header, CONTROL_FILE_NAME, buf.as_slice())
.await
.with_context(|| "failed to append to archive")?;
// We need to stream since the oldest segment someone (s3 or pageserver)
// still needs. This duplicates calc_horizon_lsn logic.
@@ -342,7 +376,7 @@ async fn pull_timeline(
let client = Client::new(host.clone(), sk_auth_token.clone());
// Request stream with basebackup archive.
let bb_resp = client
.snapshot(status.tenant_id, status.timeline_id)
.snapshot(status.tenant_id, status.timeline_id, conf.my_id)
.await?;
// Make Stream of Bytes from it...

View File

@@ -483,6 +483,16 @@ pub(crate) async fn backup_partial_segment(
.await
}
pub(crate) async fn copy_partial_segment(
source: &RemotePath,
destination: &RemotePath,
) -> Result<()> {
let storage = get_configured_remote_storage();
let cancel = CancellationToken::new();
storage.copy_object(source, destination, &cancel).await
}
pub async fn read_object(
file_path: &RemotePath,
offset: u64,

View File

@@ -17,14 +17,13 @@
//! file. Code updates state in the control file before doing any S3 operations.
//! This way control file stores information about all potentially existing
//! remote partial segments and can clean them up after uploading a newer version.
use camino::Utf8PathBuf;
use postgres_ffi::{XLogFileName, XLogSegNo, PG_TLI};
use remote_storage::RemotePath;
use serde::{Deserialize, Serialize};
use tracing::{debug, error, info, instrument, warn};
use utils::lsn::Lsn;
use utils::{id::NodeId, lsn::Lsn};
use crate::{
metrics::{MISC_OPERATION_SECONDS, PARTIAL_BACKUP_UPLOADED_BYTES, PARTIAL_BACKUP_UPLOADS},
@@ -82,6 +81,12 @@ pub struct State {
pub segments: Vec<PartialRemoteSegment>,
}
#[derive(Debug)]
pub(crate) struct ReplaceUploadedSegment {
pub(crate) previous: PartialRemoteSegment,
pub(crate) current: PartialRemoteSegment,
}
impl State {
/// Find an Uploaded segment. There should be only one Uploaded segment at a time.
pub(crate) fn uploaded_segment(&self) -> Option<PartialRemoteSegment> {
@@ -90,6 +95,54 @@ impl State {
.find(|seg| seg.status == UploadStatus::Uploaded)
.cloned()
}
/// Replace the name of the Uploaded segment (if one exists) in order to match
/// it with `destination` safekeeper. Returns a description of the change or None
/// wrapped in anyhow::Result.
pub(crate) fn replace_uploaded_segment(
&mut self,
source: NodeId,
destination: NodeId,
) -> anyhow::Result<Option<ReplaceUploadedSegment>> {
let current = self
.segments
.iter_mut()
.find(|seg| seg.status == UploadStatus::Uploaded);
let current = match current {
Some(some) => some,
None => {
return anyhow::Ok(None);
}
};
// Sanity check that the partial segment we are replacing is belongs
// to the `source` SK.
if !current
.name
.ends_with(format!("sk{}.partial", source.0).as_str())
{
anyhow::bail!(
"Partial segment name ({}) doesn't match self node id ({})",
current.name,
source
);
}
let previous = current.clone();
let new_name = current.name.replace(
format!("_sk{}", source.0).as_str(),
format!("_sk{}", destination.0).as_str(),
);
current.name = new_name;
anyhow::Ok(Some(ReplaceUploadedSegment {
previous,
current: current.clone(),
}))
}
}
struct PartialBackup {

View File

@@ -500,7 +500,7 @@ async fn handle_node_configure(mut req: Request<Body>) -> Result<Response<Body>,
StatusCode::OK,
state
.service
.node_configure(
.external_node_configure(
config_req.node_id,
config_req.availability.map(NodeAvailability::from),
config_req.scheduling,

View File

@@ -4912,6 +4912,26 @@ impl Service {
Ok(())
}
/// Wrapper around [`Self::node_configure`] which only allows changes while there is no ongoing
/// operation for HTTP api.
pub(crate) async fn external_node_configure(
&self,
node_id: NodeId,
availability: Option<NodeAvailability>,
scheduling: Option<NodeSchedulingPolicy>,
) -> Result<(), ApiError> {
{
let locked = self.inner.read().unwrap();
if let Some(op) = locked.ongoing_operation.as_ref().map(|op| op.operation) {
return Err(ApiError::PreconditionFailed(
format!("Ongoing background operation forbids configuring: {op}").into(),
));
}
}
self.node_configure(node_id, availability, scheduling).await
}
pub(crate) async fn start_node_drain(
self: &Arc<Self>,
node_id: NodeId,
@@ -4969,6 +4989,8 @@ impl Service {
cancel: cancel.clone(),
});
let span = tracing::info_span!(parent: None, "drain_node", %node_id);
tokio::task::spawn({
let service = self.clone();
let cancel = cancel.clone();
@@ -4985,21 +5007,21 @@ impl Service {
}
}
tracing::info!(%node_id, "Drain background operation starting");
tracing::info!("Drain background operation starting");
let res = service.drain_node(node_id, cancel).await;
match res {
Ok(()) => {
tracing::info!(%node_id, "Drain background operation completed successfully");
tracing::info!("Drain background operation completed successfully");
}
Err(OperationError::Cancelled) => {
tracing::info!(%node_id, "Drain background operation was cancelled");
tracing::info!("Drain background operation was cancelled");
}
Err(err) => {
tracing::error!(%node_id, "Drain background operation encountered: {err}")
tracing::error!("Drain background operation encountered: {err}")
}
}
}
});
}.instrument(span));
}
NodeSchedulingPolicy::Draining => {
return Err(ApiError::Conflict(format!(
@@ -5017,14 +5039,14 @@ impl Service {
}
pub(crate) async fn cancel_node_drain(&self, node_id: NodeId) -> Result<(), ApiError> {
let (node_available, node_policy) = {
let node_available = {
let locked = self.inner.read().unwrap();
let nodes = &locked.nodes;
let node = nodes.get(&node_id).ok_or(ApiError::NotFound(
anyhow::anyhow!("Node {} not registered", node_id).into(),
))?;
(node.is_available(), node.get_scheduling())
node.is_available()
};
if !node_available {
@@ -5033,12 +5055,6 @@ impl Service {
));
}
if !matches!(node_policy, NodeSchedulingPolicy::Draining) {
return Err(ApiError::PreconditionFailed(
format!("Node {node_id} has no drain in progress").into(),
));
}
if let Some(op_handler) = self.inner.read().unwrap().ongoing_operation.as_ref() {
if let Operation::Drain(drain) = op_handler.operation {
if drain.node_id == node_id {
@@ -5104,6 +5120,8 @@ impl Service {
cancel: cancel.clone(),
});
let span = tracing::info_span!(parent: None, "fill_node", %node_id);
tokio::task::spawn({
let service = self.clone();
let cancel = cancel.clone();
@@ -5120,21 +5138,21 @@ impl Service {
}
}
tracing::info!(%node_id, "Fill background operation starting");
tracing::info!("Fill background operation starting");
let res = service.fill_node(node_id, cancel).await;
match res {
Ok(()) => {
tracing::info!(%node_id, "Fill background operation completed successfully");
tracing::info!("Fill background operation completed successfully");
}
Err(OperationError::Cancelled) => {
tracing::info!(%node_id, "Fill background operation was cancelled");
tracing::info!("Fill background operation was cancelled");
}
Err(err) => {
tracing::error!(%node_id, "Fill background operation encountered: {err}")
tracing::error!("Fill background operation encountered: {err}")
}
}
}
});
}.instrument(span));
}
NodeSchedulingPolicy::Filling => {
return Err(ApiError::Conflict(format!(
@@ -5152,14 +5170,14 @@ impl Service {
}
pub(crate) async fn cancel_node_fill(&self, node_id: NodeId) -> Result<(), ApiError> {
let (node_available, node_policy) = {
let node_available = {
let locked = self.inner.read().unwrap();
let nodes = &locked.nodes;
let node = nodes.get(&node_id).ok_or(ApiError::NotFound(
anyhow::anyhow!("Node {} not registered", node_id).into(),
))?;
(node.is_available(), node.get_scheduling())
node.is_available()
};
if !node_available {
@@ -5168,12 +5186,6 @@ impl Service {
));
}
if !matches!(node_policy, NodeSchedulingPolicy::Filling) {
return Err(ApiError::PreconditionFailed(
format!("Node {node_id} has no fill in progress").into(),
));
}
if let Some(op_handler) = self.inner.read().unwrap().ongoing_operation.as_ref() {
if let Operation::Fill(fill) = op_handler.operation {
if fill.node_id == node_id {
@@ -5982,7 +5994,7 @@ impl Service {
.await_waiters_remainder(waiters, SHORT_RECONCILE_TIMEOUT)
.await;
failpoint_support::sleep_millis_async!("sleepy-drain-loop");
failpoint_support::sleep_millis_async!("sleepy-drain-loop", &cancel);
}
while !waiters.is_empty() {

View File

@@ -67,6 +67,7 @@ from fixtures.pageserver.utils import (
from fixtures.pg_version import PgVersion
from fixtures.port_distributor import PortDistributor
from fixtures.remote_storage import (
LocalFsStorage,
MockS3Server,
RemoteStorage,
RemoteStorageKind,
@@ -4425,14 +4426,32 @@ class Safekeeper(LogUtils):
def timeline_dir(self, tenant_id, timeline_id) -> Path:
return self.data_dir / str(tenant_id) / str(timeline_id)
def list_uploaded_segments(self, tenant_id: TenantId, timeline_id: TimelineId):
tline_path = (
self.env.repo_dir
/ "local_fs_remote_storage"
/ "safekeeper"
/ str(tenant_id)
/ str(timeline_id)
)
assert isinstance(self.env.safekeepers_remote_storage, LocalFsStorage)
return self._list_segments_in_dir(
tline_path, lambda name: ".metadata" not in name and ".___temp" not in name
)
def list_segments(self, tenant_id, timeline_id) -> List[str]:
"""
Get list of segment names of the given timeline.
"""
tli_dir = self.timeline_dir(tenant_id, timeline_id)
return self._list_segments_in_dir(
tli_dir, lambda name: not name.startswith("safekeeper.control")
)
def _list_segments_in_dir(self, path: Path, keep_filter: Callable[[str], bool]) -> list[str]:
segments = []
for _, _, filenames in os.walk(tli_dir):
segments.extend([f for f in filenames if not f.startswith("safekeeper.control")])
for _, _, filenames in os.walk(path):
segments.extend([f for f in filenames if keep_filter(f)])
segments.sort()
return segments

View File

@@ -2091,3 +2091,47 @@ def test_storage_controller_step_down(neon_env_builder: NeonEnvBuilder):
)
== 0
)
def test_storage_controller_ps_restarted_during_drain(neon_env_builder: NeonEnvBuilder):
# single unsharded tenant, two locations
neon_env_builder.num_pageservers = 2
env = neon_env_builder.init_start()
env.storage_controller.tenant_policy_update(env.initial_tenant, {"placement": {"Attached": 1}})
env.storage_controller.reconcile_until_idle()
attached_id = int(env.storage_controller.locate(env.initial_tenant)[0]["node_id"])
attached = next((ps for ps in env.pageservers if ps.id == attached_id))
def attached_is_draining():
details = env.storage_controller.node_status(attached.id)
assert details["scheduling"] == "Draining"
env.storage_controller.configure_failpoints(("sleepy-drain-loop", "return(10000)"))
env.storage_controller.node_drain(attached.id)
wait_until(10, 0.5, attached_is_draining)
attached.restart()
# we are unable to reconfigure node while the operation is still ongoing
with pytest.raises(
StorageControllerApiException,
match="Precondition failed: Ongoing background operation forbids configuring: drain.*",
):
env.storage_controller.node_configure(attached.id, {"scheduling": "Pause"})
with pytest.raises(
StorageControllerApiException,
match="Precondition failed: Ongoing background operation forbids configuring: drain.*",
):
env.storage_controller.node_configure(attached.id, {"availability": "Offline"})
env.storage_controller.cancel_node_drain(attached.id)
def reconfigure_node_again():
env.storage_controller.node_configure(attached.id, {"scheduling": "Pause"})
# allow for small delay between actually having cancelled and being able reconfigure again
wait_until(4, 0.5, reconfigure_node_again)

View File

@@ -49,7 +49,13 @@ from fixtures.remote_storage import (
)
from fixtures.safekeeper.http import SafekeeperHttpClient
from fixtures.safekeeper.utils import are_walreceivers_absent
from fixtures.utils import PropagatingThread, get_dir_size, query_scalar, start_in_background
from fixtures.utils import (
PropagatingThread,
get_dir_size,
query_scalar,
start_in_background,
wait_until,
)
def wait_lsn_force_checkpoint(
@@ -63,6 +69,18 @@ def wait_lsn_force_checkpoint(
lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
log.info(f"pg_current_wal_flush_lsn is {lsn}, waiting for it on pageserver")
wait_lsn_force_checkpoint_at(lsn, tenant_id, timeline_id, ps, pageserver_conn_options)
def wait_lsn_force_checkpoint_at(
lsn: Lsn,
tenant_id: TenantId,
timeline_id: TimelineId,
ps: NeonPageserver,
pageserver_conn_options=None,
):
pageserver_conn_options = pageserver_conn_options or {}
auth_token = None
if "password" in pageserver_conn_options:
auth_token = pageserver_conn_options["password"]
@@ -2304,3 +2322,138 @@ def test_s3_eviction(
)
assert event_metrics_seen
def test_pull_timeline_partial_segment_integrity(neon_env_builder: NeonEnvBuilder):
"""
Verify that pulling timeline from a SK with an uploaded partial segment
does not lead to consistency issues:
1. Start 3 SKs - only use two
2. Ingest a bit of WAL
3. Wait for partial to be uploaded
4. Pull timeline to the third SK
6. Replace source with destination SK and start compute
5. Wait for source SK to evict timeline
6. Go back to initial compute SK config and validate that
source SK can unevict the timeline (S3 state is consistent)
"""
neon_env_builder.auth_enabled = True
neon_env_builder.num_safekeepers = 3
neon_env_builder.enable_safekeeper_remote_storage(default_remote_storage())
neon_env_builder.safekeeper_extra_opts = [
"--enable-offload",
"--delete-offloaded-wal",
"--partial-backup-timeout",
"500ms",
"--control-file-save-interval",
"500ms",
"--eviction-min-resident=500ms",
]
env = neon_env_builder.init_start(initial_tenant_conf={"checkpoint_timeout": "100ms"})
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
(src_sk, dst_sk) = (env.safekeepers[0], env.safekeepers[2])
log.info("use only first 2 safekeepers, 3rd will be seeded")
endpoint = env.endpoints.create("main")
endpoint.active_safekeepers = [1, 2]
endpoint.start()
endpoint.safe_psql("create table t(key int, value text)")
endpoint.safe_psql("insert into t select generate_series(1, 180000), 'papaya'")
endpoint.stop()
def source_partial_segment_uploaded():
first_segment_name = "000000010000000000000001"
segs = src_sk.list_uploaded_segments(tenant_id, timeline_id)
candidate_seg = None
for seg in segs:
if "partial" in seg and "sk1" in seg and not seg.startswith(first_segment_name):
candidate_seg = seg
if candidate_seg is not None:
# The term might change, causing the segment to be gc-ed shortly after,
# so give it a bit of time to make sure it's stable.
time.sleep(2)
segs = src_sk.list_uploaded_segments(tenant_id, timeline_id)
assert candidate_seg in segs
return candidate_seg
raise Exception("Partial segment not uploaded yet")
source_partial_segment = wait_until(15, 1, source_partial_segment_uploaded)
log.info(
f"Uploaded segments before pull are {src_sk.list_uploaded_segments(tenant_id, timeline_id)}"
)
log.info(f"Tracking source partial segment: {source_partial_segment}")
src_flush_lsn = src_sk.get_flush_lsn(tenant_id, timeline_id)
log.info(f"flush_lsn on src before pull_timeline: {src_flush_lsn}")
pageserver_conn_options = {"password": env.auth_keys.generate_tenant_token(tenant_id)}
wait_lsn_force_checkpoint_at(
src_flush_lsn, tenant_id, timeline_id, env.pageserver, pageserver_conn_options
)
dst_sk.pull_timeline([src_sk], tenant_id, timeline_id)
def evicted():
evictions = src_sk.http_client().get_metric_value(
"safekeeper_eviction_events_completed_total", {"kind": "evict"}
)
if evictions is None or evictions == 0:
raise Exception("Eviction did not happen on source safekeeper yet")
wait_until(30, 1, evicted)
endpoint.start(safekeepers=[2, 3])
def new_partial_segment_uploaded():
segs = src_sk.list_uploaded_segments(tenant_id, timeline_id)
for seg in segs:
if "partial" in seg and "sk3" in seg:
return seg
raise Exception("Partial segment not uploaded yet")
log.info(
f"Uploaded segments before post-pull ingest are {src_sk.list_uploaded_segments(tenant_id, timeline_id)}"
)
endpoint.safe_psql("insert into t select generate_series(1, 1000), 'pear'")
wait_until(15, 1, new_partial_segment_uploaded)
log.info(
f"Uploaded segments after post-pull ingest are {src_sk.list_uploaded_segments(tenant_id, timeline_id)}"
)
# Allow for some gc iterations to happen and assert that the original
# uploaded partial segment remains in place.
time.sleep(5)
segs = src_sk.list_uploaded_segments(tenant_id, timeline_id)
assert source_partial_segment in segs
log.info(
f"Uploaded segments at the end are {src_sk.list_uploaded_segments(tenant_id, timeline_id)}"
)
# Restart the endpoint in order to check that the source safekeeper
# can unevict the timeline
endpoint.stop()
endpoint.start(safekeepers=[1, 2])
def unevicted():
unevictions = src_sk.http_client().get_metric_value(
"safekeeper_eviction_events_completed_total", {"kind": "restore"}
)
if unevictions is None or unevictions == 0:
raise Exception("Uneviction did not happen on source safekeeper yet")
wait_until(10, 1, unevicted)