mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-20 14:40:37 +00:00
Compare commits
8 Commits
rc/proxy/2
...
ps_protcol
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a612e9a356 | ||
|
|
5af77ab39b | ||
|
|
10bc58c510 | ||
|
|
da6c78dd5d | ||
|
|
52641eb853 | ||
|
|
d9a57aeed9 | ||
|
|
a9c28be7d0 | ||
|
|
fef77b0cc9 |
84
docs/rfcs/039-compute-ps-protcol-v3.md
Normal file
84
docs/rfcs/039-compute-ps-protcol-v3.md
Normal file
@@ -0,0 +1,84 @@
|
||||
# V3 version of compute-page server protocol
|
||||
|
||||
Created on: 2024-08-15
|
||||
|
||||
Author: Konstantin Knizhnik
|
||||
|
||||
## Summary
|
||||
|
||||
Current version of compute-PS protocol doesn't allow to verify that received response actually corresponds to the request.
|
||||
In most cases it should not cause any problems, because Neon SMGR follows classical server request-response pattern.
|
||||
If response is not received due to some reasons (network error, ...), then connection is dropped and error is reported.
|
||||
|
||||
But we also actively use prefetch, which allows to minimize network round-trip overhead.
|
||||
In case of prefetch compute sends multiple getpage requests and then gets responses to all of them.
|
||||
It is expected that responses are received in the same order as requests are sent, i.e. for each request we receive response.
|
||||
|
||||
Unfortunately it can be violated in case of errors. In this case it can happen that connection is reset but prefetch ring - not.
|
||||
As a result we treat response of new request as response to some older prefetch request and place wrong page image in shared buffer.
|
||||
If this page is modified, then it is saved in WAL and database file. So we are not able to recover original (correct) page image by applying WAL.
|
||||
So we can mix pages of one relation or even pages of different relations. Most frequently such corruption is detected for indexes,
|
||||
but just because there are more invariants which can be checked. And there is no good universal way to detect and recover such
|
||||
corruption.
|
||||
|
||||
## Motivation
|
||||
|
||||
This bug in prefetch was fixed, but we want to prevent similar problems in future.
|
||||
For example prewarm is also similar with prefetch and sens several requests and only
|
||||
after it wait for response.
|
||||
|
||||
## Previous work
|
||||
|
||||
We already changed protocol version from V1 to V2 when replaced single request LSN with pair
|
||||
(request LSN,not modified since LSN). It was done by introducing new command codes.
|
||||
So there was no explicit check for protocol version: if server receives new command,
|
||||
it assumes that it is new protocol version. After both clients and servers were upgraded to new version,
|
||||
new command codes were removed. Then version was added to "pagestream" command used to perform handshake.
|
||||
Client has `neon.protocol_version` GUC specifying which protocol version it should use.
|
||||
So client informs server about protocol version it is going to use, but server can not ask the client to use some other protocol version,
|
||||
it can only reject connection request if it is not supporting this protocol version.
|
||||
|
||||
|
||||
## Requirements
|
||||
|
||||
- Be able to verify that page server response corresponds to the requests
|
||||
- Provide backward compatibility: old clients should work with new server.
|
||||
|
||||
## Non Goals
|
||||
|
||||
- Detect page corruption (include CRC)
|
||||
- Support of vector operation (merge several requests into one)
|
||||
- Forward compatibility: support new clients with old page server
|
||||
|
||||
|
||||
## Solution
|
||||
|
||||
Include in response extra fields making it possible to verify that response corresponds to the particular request.
|
||||
Such extra fields include:
|
||||
|
||||
- tablespace OID
|
||||
- database OID
|
||||
- relation OID
|
||||
- fork number
|
||||
- block id (for getpage)
|
||||
- request LSN
|
||||
- last modified LSN
|
||||
|
||||
In addition to this fields, we also introduce unique auto-incremented `request_id`.
|
||||
It is combined from `backend_id` and local auto-incremented counter.
|
||||
There is some probability of collision if backend is restarted, but it is not critical as far as we have all other fields included in response.
|
||||
`request_id` can be used for better tracking and associating log messages produced by client and page server.
|
||||
|
||||
Although only mismatch of `getpage` request can cause data corruption, we want to extend responses for all other commands: get relation/db size, check presence of relation.
|
||||
|
||||
|
||||
## Compatibility
|
||||
|
||||
We will change handshake command freom "pagestream_v2" to "pagestream_v3". With V3 version of protocol server should
|
||||
reply with extended responses. Request/response tags will not be changed.
|
||||
|
||||
To prevent forward compatibility issues (when new client tries to access old server), deploy of this PR should be done in three steps:
|
||||
1. Deploy of new server recognizing V3 protocol version
|
||||
2. Deploy of new client which is able to send V3 commands, but by default still using V2.
|
||||
3. After one release cycle when no rollback to previous PS version is possible, we can switch default version of protocol to V3, by changing `neon.protocol_version` GUC in project settings.
|
||||
|
||||
@@ -313,20 +313,17 @@ pub struct MetadataHealthUpdateRequest {
|
||||
pub struct MetadataHealthUpdateResponse {}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
|
||||
pub struct MetadataHealthListUnhealthyResponse {
|
||||
pub unhealthy_tenant_shards: Vec<TenantShardId>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
|
||||
pub struct MetadataHealthListOutdatedRequest {
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub not_scrubbed_for: Duration,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
|
||||
pub struct MetadataHealthListOutdatedResponse {
|
||||
pub health_records: Vec<MetadataHealthRecord>,
|
||||
}
|
||||
|
||||
@@ -23,6 +23,7 @@ use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
|
||||
use crate::tenant::remote_timeline_client::{remote_layer_path, remote_timelines_path};
|
||||
use crate::tenant::storage_layer::LayerName;
|
||||
use crate::tenant::Generation;
|
||||
#[cfg_attr(target_os = "macos", allow(unused_imports))]
|
||||
use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
|
||||
use crate::virtual_file::{on_fatal_io_error, MaybeFatalIo, VirtualFile};
|
||||
use crate::TEMP_FILE_SUFFIX;
|
||||
|
||||
@@ -164,6 +164,30 @@ impl Deref for FileStorage {
|
||||
}
|
||||
}
|
||||
|
||||
impl TimelinePersistentState {
|
||||
pub(crate) fn write_to_buf(&self) -> Result<Vec<u8>> {
|
||||
let mut buf: Vec<u8> = Vec::new();
|
||||
WriteBytesExt::write_u32::<LittleEndian>(&mut buf, SK_MAGIC)?;
|
||||
|
||||
if self.eviction_state == EvictionState::Present {
|
||||
// temp hack for forward compatibility
|
||||
const PREV_FORMAT_VERSION: u32 = 8;
|
||||
let prev = downgrade_v9_to_v8(self);
|
||||
WriteBytesExt::write_u32::<LittleEndian>(&mut buf, PREV_FORMAT_VERSION)?;
|
||||
prev.ser_into(&mut buf)?;
|
||||
} else {
|
||||
// otherwise, we write the current format version
|
||||
WriteBytesExt::write_u32::<LittleEndian>(&mut buf, SK_FORMAT_VERSION)?;
|
||||
self.ser_into(&mut buf)?;
|
||||
}
|
||||
|
||||
// calculate checksum before resize
|
||||
let checksum = crc32c::crc32c(&buf);
|
||||
buf.extend_from_slice(&checksum.to_le_bytes());
|
||||
Ok(buf)
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Storage for FileStorage {
|
||||
/// Persists state durably to the underlying storage.
|
||||
@@ -180,24 +204,8 @@ impl Storage for FileStorage {
|
||||
&control_partial_path
|
||||
)
|
||||
})?;
|
||||
let mut buf: Vec<u8> = Vec::new();
|
||||
WriteBytesExt::write_u32::<LittleEndian>(&mut buf, SK_MAGIC)?;
|
||||
|
||||
if s.eviction_state == EvictionState::Present {
|
||||
// temp hack for forward compatibility
|
||||
const PREV_FORMAT_VERSION: u32 = 8;
|
||||
let prev = downgrade_v9_to_v8(s);
|
||||
WriteBytesExt::write_u32::<LittleEndian>(&mut buf, PREV_FORMAT_VERSION)?;
|
||||
prev.ser_into(&mut buf)?;
|
||||
} else {
|
||||
// otherwise, we write the current format version
|
||||
WriteBytesExt::write_u32::<LittleEndian>(&mut buf, SK_FORMAT_VERSION)?;
|
||||
s.ser_into(&mut buf)?;
|
||||
}
|
||||
|
||||
// calculate checksum before resize
|
||||
let checksum = crc32c::crc32c(&buf);
|
||||
buf.extend_from_slice(&checksum.to_le_bytes());
|
||||
let buf: Vec<u8> = s.write_to_buf()?;
|
||||
|
||||
control_partial.write_all(&buf).await.with_context(|| {
|
||||
format!(
|
||||
|
||||
@@ -10,7 +10,7 @@
|
||||
use reqwest::{IntoUrl, Method, StatusCode};
|
||||
use utils::{
|
||||
http::error::HttpErrorBody,
|
||||
id::{TenantId, TimelineId},
|
||||
id::{NodeId, TenantId, TimelineId},
|
||||
logging::SecretString,
|
||||
};
|
||||
|
||||
@@ -97,10 +97,11 @@ impl Client {
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
stream_to: NodeId,
|
||||
) -> Result<reqwest::Response> {
|
||||
let uri = format!(
|
||||
"{}/v1/tenant/{}/timeline/{}/snapshot",
|
||||
self.mgmt_api_endpoint, tenant_id, timeline_id
|
||||
"{}/v1/tenant/{}/timeline/{}/snapshot/{}",
|
||||
self.mgmt_api_endpoint, tenant_id, timeline_id, stream_to.0
|
||||
);
|
||||
self.get(&uri).await
|
||||
}
|
||||
|
||||
@@ -205,6 +205,7 @@ async fn timeline_pull_handler(mut request: Request<Body>) -> Result<Response<Bo
|
||||
|
||||
/// Stream tar archive with all timeline data.
|
||||
async fn timeline_snapshot_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
let destination = parse_request_param(&request, "destination_id")?;
|
||||
let ttid = TenantTimelineId::new(
|
||||
parse_request_param(&request, "tenant_id")?,
|
||||
parse_request_param(&request, "timeline_id")?,
|
||||
@@ -225,7 +226,13 @@ async fn timeline_snapshot_handler(request: Request<Body>) -> Result<Response<Bo
|
||||
// so create the chan and write to it in another task.
|
||||
let (tx, rx) = mpsc::channel(1);
|
||||
|
||||
task::spawn(pull_timeline::stream_snapshot(tli, tx));
|
||||
let conf = get_conf(&request);
|
||||
task::spawn(pull_timeline::stream_snapshot(
|
||||
tli,
|
||||
conf.my_id,
|
||||
destination,
|
||||
tx,
|
||||
));
|
||||
|
||||
let rx_stream = ReceiverStream::new(rx);
|
||||
let body = Body::wrap_stream(rx_stream);
|
||||
@@ -565,7 +572,7 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError>
|
||||
request_span(r, tenant_delete_handler)
|
||||
})
|
||||
.get(
|
||||
"/v1/tenant/:tenant_id/timeline/:timeline_id/snapshot",
|
||||
"/v1/tenant/:tenant_id/timeline/:timeline_id/snapshot/:destination_id",
|
||||
|r| request_span(r, timeline_snapshot_handler),
|
||||
)
|
||||
.post("/v1/pull_timeline", |r| {
|
||||
|
||||
@@ -11,13 +11,8 @@ use std::{
|
||||
io::{self, ErrorKind},
|
||||
sync::Arc,
|
||||
};
|
||||
use tokio::{
|
||||
fs::{File, OpenOptions},
|
||||
io::AsyncWrite,
|
||||
sync::mpsc,
|
||||
task,
|
||||
};
|
||||
use tokio_tar::{Archive, Builder};
|
||||
use tokio::{fs::OpenOptions, io::AsyncWrite, sync::mpsc, task};
|
||||
use tokio_tar::{Archive, Builder, Header};
|
||||
use tokio_util::{
|
||||
io::{CopyToBytes, SinkWriter},
|
||||
sync::PollSender,
|
||||
@@ -32,13 +27,15 @@ use crate::{
|
||||
routes::TimelineStatus,
|
||||
},
|
||||
safekeeper::Term,
|
||||
state::TimelinePersistentState,
|
||||
timeline::{get_tenant_dir, get_timeline_dir, Timeline, TimelineError, WalResidentTimeline},
|
||||
wal_backup,
|
||||
wal_storage::{self, open_wal_file, Storage},
|
||||
GlobalTimelines, SafeKeeperConf,
|
||||
};
|
||||
use utils::{
|
||||
crashsafe::{durable_rename, fsync_async_opt},
|
||||
id::{TenantId, TenantTimelineId, TimelineId},
|
||||
id::{NodeId, TenantId, TenantTimelineId, TimelineId},
|
||||
logging::SecretString,
|
||||
lsn::Lsn,
|
||||
pausable_failpoint,
|
||||
@@ -46,8 +43,13 @@ use utils::{
|
||||
|
||||
/// Stream tar archive of timeline to tx.
|
||||
#[instrument(name = "snapshot", skip_all, fields(ttid = %tli.ttid))]
|
||||
pub async fn stream_snapshot(tli: WalResidentTimeline, tx: mpsc::Sender<Result<Bytes>>) {
|
||||
if let Err(e) = stream_snapshot_guts(tli, tx.clone()).await {
|
||||
pub async fn stream_snapshot(
|
||||
tli: WalResidentTimeline,
|
||||
source: NodeId,
|
||||
destination: NodeId,
|
||||
tx: mpsc::Sender<Result<Bytes>>,
|
||||
) {
|
||||
if let Err(e) = stream_snapshot_guts(tli, source, destination, tx.clone()).await {
|
||||
// Error type/contents don't matter as they won't can't reach the client
|
||||
// (hyper likely doesn't do anything with it), but http stream will be
|
||||
// prematurely terminated. It would be nice to try to send the error in
|
||||
@@ -81,6 +83,8 @@ impl Drop for SnapshotContext {
|
||||
|
||||
pub async fn stream_snapshot_guts(
|
||||
tli: WalResidentTimeline,
|
||||
source: NodeId,
|
||||
destination: NodeId,
|
||||
tx: mpsc::Sender<Result<Bytes>>,
|
||||
) -> Result<()> {
|
||||
// tokio-tar wants Write implementor, but we have mpsc tx <Result<Bytes>>;
|
||||
@@ -104,7 +108,7 @@ pub async fn stream_snapshot_guts(
|
||||
// which is also likely suboptimal.
|
||||
let mut ar = Builder::new_non_terminated(pinned_writer);
|
||||
|
||||
let bctx = tli.start_snapshot(&mut ar).await?;
|
||||
let bctx = tli.start_snapshot(&mut ar, source, destination).await?;
|
||||
pausable_failpoint!("sk-snapshot-after-list-pausable");
|
||||
|
||||
let tli_dir = tli.get_timeline_dir();
|
||||
@@ -158,13 +162,43 @@ impl WalResidentTimeline {
|
||||
async fn start_snapshot<W: AsyncWrite + Unpin + Send>(
|
||||
&self,
|
||||
ar: &mut tokio_tar::Builder<W>,
|
||||
source: NodeId,
|
||||
destination: NodeId,
|
||||
) -> Result<SnapshotContext> {
|
||||
let mut shared_state = self.write_shared_state().await;
|
||||
let wal_seg_size = shared_state.get_wal_seg_size();
|
||||
|
||||
let cf_path = self.get_timeline_dir().join(CONTROL_FILE_NAME);
|
||||
let mut cf = File::open(cf_path).await?;
|
||||
ar.append_file(CONTROL_FILE_NAME, &mut cf).await?;
|
||||
let mut control_store = TimelinePersistentState::clone(shared_state.sk.state());
|
||||
// Modify the partial segment of the in-memory copy for the control file to
|
||||
// point to the destination safekeeper.
|
||||
let replace = control_store
|
||||
.partial_backup
|
||||
.replace_uploaded_segment(source, destination)?;
|
||||
|
||||
if let Some(replace) = replace {
|
||||
// The deserialized control file has an uploaded partial. We upload a copy
|
||||
// of it to object storage for the destination safekeeper and send an updated
|
||||
// control file in the snapshot.
|
||||
tracing::info!(
|
||||
"Replacing uploaded partial segment in in-mem control file: {replace:?}"
|
||||
);
|
||||
|
||||
let remote_timeline_path = wal_backup::remote_timeline_path(&self.tli.ttid)?;
|
||||
wal_backup::copy_partial_segment(
|
||||
&replace.previous.remote_path(&remote_timeline_path),
|
||||
&replace.current.remote_path(&remote_timeline_path),
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
let buf = control_store
|
||||
.write_to_buf()
|
||||
.with_context(|| "failed to serialize control store")?;
|
||||
let mut header = Header::new_gnu();
|
||||
header.set_size(buf.len().try_into().expect("never breaches u64"));
|
||||
ar.append_data(&mut header, CONTROL_FILE_NAME, buf.as_slice())
|
||||
.await
|
||||
.with_context(|| "failed to append to archive")?;
|
||||
|
||||
// We need to stream since the oldest segment someone (s3 or pageserver)
|
||||
// still needs. This duplicates calc_horizon_lsn logic.
|
||||
@@ -342,7 +376,7 @@ async fn pull_timeline(
|
||||
let client = Client::new(host.clone(), sk_auth_token.clone());
|
||||
// Request stream with basebackup archive.
|
||||
let bb_resp = client
|
||||
.snapshot(status.tenant_id, status.timeline_id)
|
||||
.snapshot(status.tenant_id, status.timeline_id, conf.my_id)
|
||||
.await?;
|
||||
|
||||
// Make Stream of Bytes from it...
|
||||
|
||||
@@ -483,6 +483,16 @@ pub(crate) async fn backup_partial_segment(
|
||||
.await
|
||||
}
|
||||
|
||||
pub(crate) async fn copy_partial_segment(
|
||||
source: &RemotePath,
|
||||
destination: &RemotePath,
|
||||
) -> Result<()> {
|
||||
let storage = get_configured_remote_storage();
|
||||
let cancel = CancellationToken::new();
|
||||
|
||||
storage.copy_object(source, destination, &cancel).await
|
||||
}
|
||||
|
||||
pub async fn read_object(
|
||||
file_path: &RemotePath,
|
||||
offset: u64,
|
||||
|
||||
@@ -17,14 +17,13 @@
|
||||
//! file. Code updates state in the control file before doing any S3 operations.
|
||||
//! This way control file stores information about all potentially existing
|
||||
//! remote partial segments and can clean them up after uploading a newer version.
|
||||
|
||||
use camino::Utf8PathBuf;
|
||||
use postgres_ffi::{XLogFileName, XLogSegNo, PG_TLI};
|
||||
use remote_storage::RemotePath;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use tracing::{debug, error, info, instrument, warn};
|
||||
use utils::lsn::Lsn;
|
||||
use utils::{id::NodeId, lsn::Lsn};
|
||||
|
||||
use crate::{
|
||||
metrics::{MISC_OPERATION_SECONDS, PARTIAL_BACKUP_UPLOADED_BYTES, PARTIAL_BACKUP_UPLOADS},
|
||||
@@ -82,6 +81,12 @@ pub struct State {
|
||||
pub segments: Vec<PartialRemoteSegment>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct ReplaceUploadedSegment {
|
||||
pub(crate) previous: PartialRemoteSegment,
|
||||
pub(crate) current: PartialRemoteSegment,
|
||||
}
|
||||
|
||||
impl State {
|
||||
/// Find an Uploaded segment. There should be only one Uploaded segment at a time.
|
||||
pub(crate) fn uploaded_segment(&self) -> Option<PartialRemoteSegment> {
|
||||
@@ -90,6 +95,54 @@ impl State {
|
||||
.find(|seg| seg.status == UploadStatus::Uploaded)
|
||||
.cloned()
|
||||
}
|
||||
|
||||
/// Replace the name of the Uploaded segment (if one exists) in order to match
|
||||
/// it with `destination` safekeeper. Returns a description of the change or None
|
||||
/// wrapped in anyhow::Result.
|
||||
pub(crate) fn replace_uploaded_segment(
|
||||
&mut self,
|
||||
source: NodeId,
|
||||
destination: NodeId,
|
||||
) -> anyhow::Result<Option<ReplaceUploadedSegment>> {
|
||||
let current = self
|
||||
.segments
|
||||
.iter_mut()
|
||||
.find(|seg| seg.status == UploadStatus::Uploaded);
|
||||
|
||||
let current = match current {
|
||||
Some(some) => some,
|
||||
None => {
|
||||
return anyhow::Ok(None);
|
||||
}
|
||||
};
|
||||
|
||||
// Sanity check that the partial segment we are replacing is belongs
|
||||
// to the `source` SK.
|
||||
if !current
|
||||
.name
|
||||
.ends_with(format!("sk{}.partial", source.0).as_str())
|
||||
{
|
||||
anyhow::bail!(
|
||||
"Partial segment name ({}) doesn't match self node id ({})",
|
||||
current.name,
|
||||
source
|
||||
);
|
||||
}
|
||||
|
||||
let previous = current.clone();
|
||||
|
||||
let new_name = current.name.replace(
|
||||
format!("_sk{}", source.0).as_str(),
|
||||
format!("_sk{}", destination.0).as_str(),
|
||||
);
|
||||
|
||||
current.name = new_name;
|
||||
|
||||
anyhow::Ok(Some(ReplaceUploadedSegment {
|
||||
previous,
|
||||
current: current.clone(),
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
struct PartialBackup {
|
||||
|
||||
@@ -500,7 +500,7 @@ async fn handle_node_configure(mut req: Request<Body>) -> Result<Response<Body>,
|
||||
StatusCode::OK,
|
||||
state
|
||||
.service
|
||||
.node_configure(
|
||||
.external_node_configure(
|
||||
config_req.node_id,
|
||||
config_req.availability.map(NodeAvailability::from),
|
||||
config_req.scheduling,
|
||||
|
||||
@@ -4912,6 +4912,26 @@ impl Service {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Wrapper around [`Self::node_configure`] which only allows changes while there is no ongoing
|
||||
/// operation for HTTP api.
|
||||
pub(crate) async fn external_node_configure(
|
||||
&self,
|
||||
node_id: NodeId,
|
||||
availability: Option<NodeAvailability>,
|
||||
scheduling: Option<NodeSchedulingPolicy>,
|
||||
) -> Result<(), ApiError> {
|
||||
{
|
||||
let locked = self.inner.read().unwrap();
|
||||
if let Some(op) = locked.ongoing_operation.as_ref().map(|op| op.operation) {
|
||||
return Err(ApiError::PreconditionFailed(
|
||||
format!("Ongoing background operation forbids configuring: {op}").into(),
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
self.node_configure(node_id, availability, scheduling).await
|
||||
}
|
||||
|
||||
pub(crate) async fn start_node_drain(
|
||||
self: &Arc<Self>,
|
||||
node_id: NodeId,
|
||||
@@ -4969,6 +4989,8 @@ impl Service {
|
||||
cancel: cancel.clone(),
|
||||
});
|
||||
|
||||
let span = tracing::info_span!(parent: None, "drain_node", %node_id);
|
||||
|
||||
tokio::task::spawn({
|
||||
let service = self.clone();
|
||||
let cancel = cancel.clone();
|
||||
@@ -4985,21 +5007,21 @@ impl Service {
|
||||
}
|
||||
}
|
||||
|
||||
tracing::info!(%node_id, "Drain background operation starting");
|
||||
tracing::info!("Drain background operation starting");
|
||||
let res = service.drain_node(node_id, cancel).await;
|
||||
match res {
|
||||
Ok(()) => {
|
||||
tracing::info!(%node_id, "Drain background operation completed successfully");
|
||||
tracing::info!("Drain background operation completed successfully");
|
||||
}
|
||||
Err(OperationError::Cancelled) => {
|
||||
tracing::info!(%node_id, "Drain background operation was cancelled");
|
||||
tracing::info!("Drain background operation was cancelled");
|
||||
}
|
||||
Err(err) => {
|
||||
tracing::error!(%node_id, "Drain background operation encountered: {err}")
|
||||
tracing::error!("Drain background operation encountered: {err}")
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}.instrument(span));
|
||||
}
|
||||
NodeSchedulingPolicy::Draining => {
|
||||
return Err(ApiError::Conflict(format!(
|
||||
@@ -5017,14 +5039,14 @@ impl Service {
|
||||
}
|
||||
|
||||
pub(crate) async fn cancel_node_drain(&self, node_id: NodeId) -> Result<(), ApiError> {
|
||||
let (node_available, node_policy) = {
|
||||
let node_available = {
|
||||
let locked = self.inner.read().unwrap();
|
||||
let nodes = &locked.nodes;
|
||||
let node = nodes.get(&node_id).ok_or(ApiError::NotFound(
|
||||
anyhow::anyhow!("Node {} not registered", node_id).into(),
|
||||
))?;
|
||||
|
||||
(node.is_available(), node.get_scheduling())
|
||||
node.is_available()
|
||||
};
|
||||
|
||||
if !node_available {
|
||||
@@ -5033,12 +5055,6 @@ impl Service {
|
||||
));
|
||||
}
|
||||
|
||||
if !matches!(node_policy, NodeSchedulingPolicy::Draining) {
|
||||
return Err(ApiError::PreconditionFailed(
|
||||
format!("Node {node_id} has no drain in progress").into(),
|
||||
));
|
||||
}
|
||||
|
||||
if let Some(op_handler) = self.inner.read().unwrap().ongoing_operation.as_ref() {
|
||||
if let Operation::Drain(drain) = op_handler.operation {
|
||||
if drain.node_id == node_id {
|
||||
@@ -5104,6 +5120,8 @@ impl Service {
|
||||
cancel: cancel.clone(),
|
||||
});
|
||||
|
||||
let span = tracing::info_span!(parent: None, "fill_node", %node_id);
|
||||
|
||||
tokio::task::spawn({
|
||||
let service = self.clone();
|
||||
let cancel = cancel.clone();
|
||||
@@ -5120,21 +5138,21 @@ impl Service {
|
||||
}
|
||||
}
|
||||
|
||||
tracing::info!(%node_id, "Fill background operation starting");
|
||||
tracing::info!("Fill background operation starting");
|
||||
let res = service.fill_node(node_id, cancel).await;
|
||||
match res {
|
||||
Ok(()) => {
|
||||
tracing::info!(%node_id, "Fill background operation completed successfully");
|
||||
tracing::info!("Fill background operation completed successfully");
|
||||
}
|
||||
Err(OperationError::Cancelled) => {
|
||||
tracing::info!(%node_id, "Fill background operation was cancelled");
|
||||
tracing::info!("Fill background operation was cancelled");
|
||||
}
|
||||
Err(err) => {
|
||||
tracing::error!(%node_id, "Fill background operation encountered: {err}")
|
||||
tracing::error!("Fill background operation encountered: {err}")
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}.instrument(span));
|
||||
}
|
||||
NodeSchedulingPolicy::Filling => {
|
||||
return Err(ApiError::Conflict(format!(
|
||||
@@ -5152,14 +5170,14 @@ impl Service {
|
||||
}
|
||||
|
||||
pub(crate) async fn cancel_node_fill(&self, node_id: NodeId) -> Result<(), ApiError> {
|
||||
let (node_available, node_policy) = {
|
||||
let node_available = {
|
||||
let locked = self.inner.read().unwrap();
|
||||
let nodes = &locked.nodes;
|
||||
let node = nodes.get(&node_id).ok_or(ApiError::NotFound(
|
||||
anyhow::anyhow!("Node {} not registered", node_id).into(),
|
||||
))?;
|
||||
|
||||
(node.is_available(), node.get_scheduling())
|
||||
node.is_available()
|
||||
};
|
||||
|
||||
if !node_available {
|
||||
@@ -5168,12 +5186,6 @@ impl Service {
|
||||
));
|
||||
}
|
||||
|
||||
if !matches!(node_policy, NodeSchedulingPolicy::Filling) {
|
||||
return Err(ApiError::PreconditionFailed(
|
||||
format!("Node {node_id} has no fill in progress").into(),
|
||||
));
|
||||
}
|
||||
|
||||
if let Some(op_handler) = self.inner.read().unwrap().ongoing_operation.as_ref() {
|
||||
if let Operation::Fill(fill) = op_handler.operation {
|
||||
if fill.node_id == node_id {
|
||||
@@ -5982,7 +5994,7 @@ impl Service {
|
||||
.await_waiters_remainder(waiters, SHORT_RECONCILE_TIMEOUT)
|
||||
.await;
|
||||
|
||||
failpoint_support::sleep_millis_async!("sleepy-drain-loop");
|
||||
failpoint_support::sleep_millis_async!("sleepy-drain-loop", &cancel);
|
||||
}
|
||||
|
||||
while !waiters.is_empty() {
|
||||
|
||||
@@ -67,6 +67,7 @@ from fixtures.pageserver.utils import (
|
||||
from fixtures.pg_version import PgVersion
|
||||
from fixtures.port_distributor import PortDistributor
|
||||
from fixtures.remote_storage import (
|
||||
LocalFsStorage,
|
||||
MockS3Server,
|
||||
RemoteStorage,
|
||||
RemoteStorageKind,
|
||||
@@ -4425,14 +4426,32 @@ class Safekeeper(LogUtils):
|
||||
def timeline_dir(self, tenant_id, timeline_id) -> Path:
|
||||
return self.data_dir / str(tenant_id) / str(timeline_id)
|
||||
|
||||
def list_uploaded_segments(self, tenant_id: TenantId, timeline_id: TimelineId):
|
||||
tline_path = (
|
||||
self.env.repo_dir
|
||||
/ "local_fs_remote_storage"
|
||||
/ "safekeeper"
|
||||
/ str(tenant_id)
|
||||
/ str(timeline_id)
|
||||
)
|
||||
assert isinstance(self.env.safekeepers_remote_storage, LocalFsStorage)
|
||||
return self._list_segments_in_dir(
|
||||
tline_path, lambda name: ".metadata" not in name and ".___temp" not in name
|
||||
)
|
||||
|
||||
def list_segments(self, tenant_id, timeline_id) -> List[str]:
|
||||
"""
|
||||
Get list of segment names of the given timeline.
|
||||
"""
|
||||
tli_dir = self.timeline_dir(tenant_id, timeline_id)
|
||||
return self._list_segments_in_dir(
|
||||
tli_dir, lambda name: not name.startswith("safekeeper.control")
|
||||
)
|
||||
|
||||
def _list_segments_in_dir(self, path: Path, keep_filter: Callable[[str], bool]) -> list[str]:
|
||||
segments = []
|
||||
for _, _, filenames in os.walk(tli_dir):
|
||||
segments.extend([f for f in filenames if not f.startswith("safekeeper.control")])
|
||||
for _, _, filenames in os.walk(path):
|
||||
segments.extend([f for f in filenames if keep_filter(f)])
|
||||
segments.sort()
|
||||
return segments
|
||||
|
||||
|
||||
@@ -2091,3 +2091,47 @@ def test_storage_controller_step_down(neon_env_builder: NeonEnvBuilder):
|
||||
)
|
||||
== 0
|
||||
)
|
||||
|
||||
|
||||
def test_storage_controller_ps_restarted_during_drain(neon_env_builder: NeonEnvBuilder):
|
||||
# single unsharded tenant, two locations
|
||||
neon_env_builder.num_pageservers = 2
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
env.storage_controller.tenant_policy_update(env.initial_tenant, {"placement": {"Attached": 1}})
|
||||
env.storage_controller.reconcile_until_idle()
|
||||
|
||||
attached_id = int(env.storage_controller.locate(env.initial_tenant)[0]["node_id"])
|
||||
attached = next((ps for ps in env.pageservers if ps.id == attached_id))
|
||||
|
||||
def attached_is_draining():
|
||||
details = env.storage_controller.node_status(attached.id)
|
||||
assert details["scheduling"] == "Draining"
|
||||
|
||||
env.storage_controller.configure_failpoints(("sleepy-drain-loop", "return(10000)"))
|
||||
env.storage_controller.node_drain(attached.id)
|
||||
|
||||
wait_until(10, 0.5, attached_is_draining)
|
||||
|
||||
attached.restart()
|
||||
|
||||
# we are unable to reconfigure node while the operation is still ongoing
|
||||
with pytest.raises(
|
||||
StorageControllerApiException,
|
||||
match="Precondition failed: Ongoing background operation forbids configuring: drain.*",
|
||||
):
|
||||
env.storage_controller.node_configure(attached.id, {"scheduling": "Pause"})
|
||||
with pytest.raises(
|
||||
StorageControllerApiException,
|
||||
match="Precondition failed: Ongoing background operation forbids configuring: drain.*",
|
||||
):
|
||||
env.storage_controller.node_configure(attached.id, {"availability": "Offline"})
|
||||
|
||||
env.storage_controller.cancel_node_drain(attached.id)
|
||||
|
||||
def reconfigure_node_again():
|
||||
env.storage_controller.node_configure(attached.id, {"scheduling": "Pause"})
|
||||
|
||||
# allow for small delay between actually having cancelled and being able reconfigure again
|
||||
wait_until(4, 0.5, reconfigure_node_again)
|
||||
|
||||
@@ -49,7 +49,13 @@ from fixtures.remote_storage import (
|
||||
)
|
||||
from fixtures.safekeeper.http import SafekeeperHttpClient
|
||||
from fixtures.safekeeper.utils import are_walreceivers_absent
|
||||
from fixtures.utils import PropagatingThread, get_dir_size, query_scalar, start_in_background
|
||||
from fixtures.utils import (
|
||||
PropagatingThread,
|
||||
get_dir_size,
|
||||
query_scalar,
|
||||
start_in_background,
|
||||
wait_until,
|
||||
)
|
||||
|
||||
|
||||
def wait_lsn_force_checkpoint(
|
||||
@@ -63,6 +69,18 @@ def wait_lsn_force_checkpoint(
|
||||
lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
|
||||
log.info(f"pg_current_wal_flush_lsn is {lsn}, waiting for it on pageserver")
|
||||
|
||||
wait_lsn_force_checkpoint_at(lsn, tenant_id, timeline_id, ps, pageserver_conn_options)
|
||||
|
||||
|
||||
def wait_lsn_force_checkpoint_at(
|
||||
lsn: Lsn,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
ps: NeonPageserver,
|
||||
pageserver_conn_options=None,
|
||||
):
|
||||
pageserver_conn_options = pageserver_conn_options or {}
|
||||
|
||||
auth_token = None
|
||||
if "password" in pageserver_conn_options:
|
||||
auth_token = pageserver_conn_options["password"]
|
||||
@@ -2304,3 +2322,138 @@ def test_s3_eviction(
|
||||
)
|
||||
|
||||
assert event_metrics_seen
|
||||
|
||||
|
||||
def test_pull_timeline_partial_segment_integrity(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
Verify that pulling timeline from a SK with an uploaded partial segment
|
||||
does not lead to consistency issues:
|
||||
1. Start 3 SKs - only use two
|
||||
2. Ingest a bit of WAL
|
||||
3. Wait for partial to be uploaded
|
||||
4. Pull timeline to the third SK
|
||||
6. Replace source with destination SK and start compute
|
||||
5. Wait for source SK to evict timeline
|
||||
6. Go back to initial compute SK config and validate that
|
||||
source SK can unevict the timeline (S3 state is consistent)
|
||||
"""
|
||||
neon_env_builder.auth_enabled = True
|
||||
neon_env_builder.num_safekeepers = 3
|
||||
neon_env_builder.enable_safekeeper_remote_storage(default_remote_storage())
|
||||
|
||||
neon_env_builder.safekeeper_extra_opts = [
|
||||
"--enable-offload",
|
||||
"--delete-offloaded-wal",
|
||||
"--partial-backup-timeout",
|
||||
"500ms",
|
||||
"--control-file-save-interval",
|
||||
"500ms",
|
||||
"--eviction-min-resident=500ms",
|
||||
]
|
||||
|
||||
env = neon_env_builder.init_start(initial_tenant_conf={"checkpoint_timeout": "100ms"})
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
|
||||
(src_sk, dst_sk) = (env.safekeepers[0], env.safekeepers[2])
|
||||
|
||||
log.info("use only first 2 safekeepers, 3rd will be seeded")
|
||||
endpoint = env.endpoints.create("main")
|
||||
endpoint.active_safekeepers = [1, 2]
|
||||
endpoint.start()
|
||||
endpoint.safe_psql("create table t(key int, value text)")
|
||||
endpoint.safe_psql("insert into t select generate_series(1, 180000), 'papaya'")
|
||||
|
||||
endpoint.stop()
|
||||
|
||||
def source_partial_segment_uploaded():
|
||||
first_segment_name = "000000010000000000000001"
|
||||
segs = src_sk.list_uploaded_segments(tenant_id, timeline_id)
|
||||
|
||||
candidate_seg = None
|
||||
for seg in segs:
|
||||
if "partial" in seg and "sk1" in seg and not seg.startswith(first_segment_name):
|
||||
candidate_seg = seg
|
||||
|
||||
if candidate_seg is not None:
|
||||
# The term might change, causing the segment to be gc-ed shortly after,
|
||||
# so give it a bit of time to make sure it's stable.
|
||||
time.sleep(2)
|
||||
|
||||
segs = src_sk.list_uploaded_segments(tenant_id, timeline_id)
|
||||
assert candidate_seg in segs
|
||||
return candidate_seg
|
||||
|
||||
raise Exception("Partial segment not uploaded yet")
|
||||
|
||||
source_partial_segment = wait_until(15, 1, source_partial_segment_uploaded)
|
||||
log.info(
|
||||
f"Uploaded segments before pull are {src_sk.list_uploaded_segments(tenant_id, timeline_id)}"
|
||||
)
|
||||
log.info(f"Tracking source partial segment: {source_partial_segment}")
|
||||
|
||||
src_flush_lsn = src_sk.get_flush_lsn(tenant_id, timeline_id)
|
||||
log.info(f"flush_lsn on src before pull_timeline: {src_flush_lsn}")
|
||||
|
||||
pageserver_conn_options = {"password": env.auth_keys.generate_tenant_token(tenant_id)}
|
||||
wait_lsn_force_checkpoint_at(
|
||||
src_flush_lsn, tenant_id, timeline_id, env.pageserver, pageserver_conn_options
|
||||
)
|
||||
|
||||
dst_sk.pull_timeline([src_sk], tenant_id, timeline_id)
|
||||
|
||||
def evicted():
|
||||
evictions = src_sk.http_client().get_metric_value(
|
||||
"safekeeper_eviction_events_completed_total", {"kind": "evict"}
|
||||
)
|
||||
|
||||
if evictions is None or evictions == 0:
|
||||
raise Exception("Eviction did not happen on source safekeeper yet")
|
||||
|
||||
wait_until(30, 1, evicted)
|
||||
|
||||
endpoint.start(safekeepers=[2, 3])
|
||||
|
||||
def new_partial_segment_uploaded():
|
||||
segs = src_sk.list_uploaded_segments(tenant_id, timeline_id)
|
||||
for seg in segs:
|
||||
if "partial" in seg and "sk3" in seg:
|
||||
return seg
|
||||
|
||||
raise Exception("Partial segment not uploaded yet")
|
||||
|
||||
log.info(
|
||||
f"Uploaded segments before post-pull ingest are {src_sk.list_uploaded_segments(tenant_id, timeline_id)}"
|
||||
)
|
||||
|
||||
endpoint.safe_psql("insert into t select generate_series(1, 1000), 'pear'")
|
||||
wait_until(15, 1, new_partial_segment_uploaded)
|
||||
|
||||
log.info(
|
||||
f"Uploaded segments after post-pull ingest are {src_sk.list_uploaded_segments(tenant_id, timeline_id)}"
|
||||
)
|
||||
|
||||
# Allow for some gc iterations to happen and assert that the original
|
||||
# uploaded partial segment remains in place.
|
||||
time.sleep(5)
|
||||
segs = src_sk.list_uploaded_segments(tenant_id, timeline_id)
|
||||
assert source_partial_segment in segs
|
||||
|
||||
log.info(
|
||||
f"Uploaded segments at the end are {src_sk.list_uploaded_segments(tenant_id, timeline_id)}"
|
||||
)
|
||||
|
||||
# Restart the endpoint in order to check that the source safekeeper
|
||||
# can unevict the timeline
|
||||
endpoint.stop()
|
||||
endpoint.start(safekeepers=[1, 2])
|
||||
|
||||
def unevicted():
|
||||
unevictions = src_sk.http_client().get_metric_value(
|
||||
"safekeeper_eviction_events_completed_total", {"kind": "restore"}
|
||||
)
|
||||
|
||||
if unevictions is None or unevictions == 0:
|
||||
raise Exception("Uneviction did not happen on source safekeeper yet")
|
||||
|
||||
wait_until(10, 1, unevicted)
|
||||
|
||||
Reference in New Issue
Block a user