mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-21 07:00:38 +00:00
Compare commits
1 Commits
partitioni
...
fix-makefi
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
294fab103f |
1
.gitignore
vendored
1
.gitignore
vendored
@@ -5,7 +5,6 @@
|
||||
__pycache__/
|
||||
test_output/
|
||||
.vscode
|
||||
.idea
|
||||
/.zenith
|
||||
/integration_tests/.zenith
|
||||
|
||||
|
||||
4
Makefile
4
Makefile
@@ -113,3 +113,7 @@ fmt:
|
||||
.PHONY: setup-pre-commit-hook
|
||||
setup-pre-commit-hook:
|
||||
ln -s -f ../../pre-commit.py .git/hooks/pre-commit
|
||||
|
||||
# Rebuild when any makefile changes
|
||||
# https://stackoverflow.com/questions/3871444/making-all-rules-depend-on-the-makefile-itself
|
||||
.EXTRA_PREREQS+=$(foreach mk, ${MAKEFILE_LIST},$(abspath ${mk}))
|
||||
|
||||
@@ -57,15 +57,15 @@ pub struct SkTimelineInfo {
|
||||
pub peer_horizon_lsn: Option<Lsn>,
|
||||
#[serde(default)]
|
||||
pub safekeeper_connstr: Option<String>,
|
||||
#[serde(default)]
|
||||
pub pageserver_connstr: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum BrokerError {
|
||||
#[error("Etcd client error: {0}. Context: {1}")]
|
||||
EtcdClient(etcd_client::Error, String),
|
||||
#[error("Error during parsing etcd key: {0}")]
|
||||
InvalidKey(String),
|
||||
#[error("Error during parsing etcd value: {0}")]
|
||||
#[error("Error during parsing etcd data: {0}")]
|
||||
ParsingError(String),
|
||||
#[error("Internal error: {0}")]
|
||||
InternalError(String),
|
||||
@@ -221,7 +221,7 @@ pub async fn subscribe_to_safekeeper_timeline_updates(
|
||||
},
|
||||
};
|
||||
|
||||
match parse_safekeeper_timeline(&subscription, key_str, value_str) {
|
||||
match parse_etcd_key_value(&subscription, key_str, value_str) {
|
||||
Ok((zttid, timeline)) => {
|
||||
match timeline_updates
|
||||
.entry(zttid)
|
||||
@@ -243,8 +243,6 @@ pub async fn subscribe_to_safekeeper_timeline_updates(
|
||||
}
|
||||
}
|
||||
}
|
||||
// it is normal to get other keys when we subscribe to everything
|
||||
Err(BrokerError::InvalidKey(e)) => debug!("Unexpected key for timeline update: {e}"),
|
||||
Err(e) => error!("Failed to parse timeline update: {e}"),
|
||||
};
|
||||
}
|
||||
@@ -283,14 +281,14 @@ static SK_TIMELINE_KEY_REGEX: Lazy<Regex> = Lazy::new(|| {
|
||||
.expect("wrong regex for safekeeper timeline etcd key")
|
||||
});
|
||||
|
||||
fn parse_safekeeper_timeline(
|
||||
fn parse_etcd_key_value(
|
||||
subscription: &SkTimelineSubscriptionKind,
|
||||
key_str: &str,
|
||||
value_str: &str,
|
||||
) -> Result<(ZTenantTimelineId, SafekeeperTimeline), BrokerError> {
|
||||
let broker_prefix = subscription.broker_etcd_prefix.as_str();
|
||||
if !key_str.starts_with(broker_prefix) {
|
||||
return Err(BrokerError::InvalidKey(format!(
|
||||
return Err(BrokerError::ParsingError(format!(
|
||||
"KV has unexpected key '{key_str}' that does not start with broker prefix {broker_prefix}"
|
||||
)));
|
||||
}
|
||||
@@ -299,7 +297,7 @@ fn parse_safekeeper_timeline(
|
||||
let key_captures = match SK_TIMELINE_KEY_REGEX.captures(key_part) {
|
||||
Some(captures) => captures,
|
||||
None => {
|
||||
return Err(BrokerError::InvalidKey(format!(
|
||||
return Err(BrokerError::ParsingError(format!(
|
||||
"KV has unexpected key part '{key_part}' that does not match required regex {}",
|
||||
SK_TIMELINE_KEY_REGEX.as_str()
|
||||
)));
|
||||
@@ -385,7 +383,7 @@ mod tests {
|
||||
&timeline_subscription,
|
||||
] {
|
||||
let (id, _timeline) =
|
||||
parse_safekeeper_timeline(subscription, &key_string, value_str)
|
||||
parse_etcd_key_value(subscription, &key_string, value_str)
|
||||
.unwrap_or_else(|e| panic!("Should be able to parse etcd key string '{key_string}' and etcd value string '{value_str}' for subscription {subscription:?}, but got: {e}"));
|
||||
assert_eq!(id, ZTenantTimelineId::new(tenant_id, timeline_id));
|
||||
}
|
||||
|
||||
@@ -269,14 +269,7 @@ impl FeStartupPacket {
|
||||
.next()
|
||||
.context("expected even number of params in StartupMessage")?;
|
||||
if name == "options" {
|
||||
// parsing options arguments "...&options=<var0>%3D<val0>+<var1>=<var1>..."
|
||||
// '%3D' is '=' and '+' is ' '
|
||||
|
||||
// Note: we allow users that don't have SNI capabilities,
|
||||
// to pass a special keyword argument 'project'
|
||||
// to be used to determine the cluster name by the proxy.
|
||||
|
||||
//TODO: write unit test for this and refactor in its own function.
|
||||
// deprecated way of passing params as cmd line args
|
||||
for cmdopt in value.split(' ') {
|
||||
let nameval: Vec<&str> = cmdopt.split('=').collect();
|
||||
if nameval.len() == 2 {
|
||||
|
||||
@@ -1727,7 +1727,9 @@ impl LayeredTimeline {
|
||||
new_delta_path.clone(),
|
||||
self.conf.timeline_path(&self.timeline_id, &self.tenant_id),
|
||||
])?;
|
||||
fail_point!("flush-frozen-before-sync");
|
||||
fail_point!("checkpoint-before-sync");
|
||||
|
||||
fail_point!("flush-frozen");
|
||||
|
||||
// Finally, replace the frozen in-memory layer with the new on-disk layer
|
||||
{
|
||||
@@ -1859,37 +1861,41 @@ impl LayeredTimeline {
|
||||
|
||||
let target_file_size = self.get_checkpoint_distance();
|
||||
|
||||
// 1. Partition the key space
|
||||
let pgdir = tenant_mgr::get_local_timeline_with_load(self.tenant_id, self.timeline_id)?;
|
||||
let (partitioning, lsn) = pgdir.repartition(
|
||||
self.get_last_record_lsn(),
|
||||
self.get_compaction_target_size(),
|
||||
)?;
|
||||
let timer = self.create_images_time_histo.start_timer();
|
||||
|
||||
// 2. Create new image layers for partitions that have been modified
|
||||
// "enough".
|
||||
let mut layer_paths_to_upload = HashSet::with_capacity(partitioning.parts.len());
|
||||
for part in partitioning.parts.iter() {
|
||||
if self.time_for_new_image_layer(part, lsn)? {
|
||||
let new_path = self.create_image_layer(part, lsn)?;
|
||||
layer_paths_to_upload.insert(new_path);
|
||||
// Define partitioning schema if needed
|
||||
if let Ok(pgdir) =
|
||||
tenant_mgr::get_local_timeline_with_load(self.tenant_id, self.timeline_id)
|
||||
{
|
||||
let (partitioning, lsn) = pgdir.repartition(
|
||||
self.get_last_record_lsn(),
|
||||
self.get_compaction_target_size(),
|
||||
)?;
|
||||
let timer = self.create_images_time_histo.start_timer();
|
||||
// 2. Create new image layers for partitions that have been modified
|
||||
// "enough".
|
||||
let mut layer_paths_to_upload = HashSet::with_capacity(partitioning.parts.len());
|
||||
for part in partitioning.parts.iter() {
|
||||
if self.time_for_new_image_layer(part, lsn)? {
|
||||
let new_path = self.create_image_layer(part, lsn)?;
|
||||
layer_paths_to_upload.insert(new_path);
|
||||
}
|
||||
}
|
||||
}
|
||||
if self.upload_layers.load(atomic::Ordering::Relaxed) {
|
||||
storage_sync::schedule_layer_upload(
|
||||
self.tenant_id,
|
||||
self.timeline_id,
|
||||
layer_paths_to_upload,
|
||||
None,
|
||||
);
|
||||
}
|
||||
timer.stop_and_record();
|
||||
if self.upload_layers.load(atomic::Ordering::Relaxed) {
|
||||
storage_sync::schedule_layer_upload(
|
||||
self.tenant_id,
|
||||
self.timeline_id,
|
||||
layer_paths_to_upload,
|
||||
None,
|
||||
);
|
||||
}
|
||||
timer.stop_and_record();
|
||||
|
||||
// 3. Compact
|
||||
let timer = self.compact_time_histo.start_timer();
|
||||
self.compact_level0(target_file_size)?;
|
||||
timer.stop_and_record();
|
||||
// 3. Compact
|
||||
let timer = self.compact_time_histo.start_timer();
|
||||
self.compact_level0(target_file_size)?;
|
||||
timer.stop_and_record();
|
||||
} else {
|
||||
debug!("Could not compact because no partitioning specified yet");
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -20,7 +20,7 @@
|
||||
//! assign a buffer for a page, you must hold the mapping lock and the lock on
|
||||
//! the slot at the same time.
|
||||
//!
|
||||
//! Whenever you need to hold both locks simultaneously, the slot lock must be
|
||||
//! Whenever you need to hold both locks simultenously, the slot lock must be
|
||||
//! acquired first. This consistent ordering avoids deadlocks. To look up a page
|
||||
//! in the cache, you would first look up the mapping, while holding the mapping
|
||||
//! lock, and then lock the slot. You must release the mapping lock in between,
|
||||
|
||||
223
pageserver/src/remote_storage/storage_sync/delete.rs
Normal file
223
pageserver/src/remote_storage/storage_sync/delete.rs
Normal file
@@ -0,0 +1,223 @@
|
||||
//! Timeline synchronization logic to delete a bulk of timeline's remote files from the remote storage.
|
||||
|
||||
use anyhow::Context;
|
||||
use futures::stream::{FuturesUnordered, StreamExt};
|
||||
use tracing::{debug, error, info};
|
||||
use utils::zid::ZTenantTimelineId;
|
||||
|
||||
use crate::remote_storage::{
|
||||
storage_sync::{SyncQueue, SyncTask},
|
||||
RemoteStorage,
|
||||
};
|
||||
|
||||
use super::{LayersDeletion, SyncData};
|
||||
|
||||
/// Attempts to remove the timleline layers from the remote storage.
|
||||
/// If the task had not adjusted the metadata before, the deletion will fail.
|
||||
pub(super) async fn delete_timeline_layers<'a, P, S>(
|
||||
storage: &'a S,
|
||||
sync_queue: &SyncQueue,
|
||||
sync_id: ZTenantTimelineId,
|
||||
mut delete_data: SyncData<LayersDeletion>,
|
||||
) -> bool
|
||||
where
|
||||
P: std::fmt::Debug + Send + Sync + 'static,
|
||||
S: RemoteStorage<RemoteObjectId = P> + Send + Sync + 'static,
|
||||
{
|
||||
if !delete_data.data.deletion_registered {
|
||||
error!("Cannot delete timeline layers before the deletion metadata is not registered, reenqueueing");
|
||||
delete_data.retries += 1;
|
||||
sync_queue.push(sync_id, SyncTask::Delete(delete_data));
|
||||
return false;
|
||||
}
|
||||
|
||||
if delete_data.data.layers_to_delete.is_empty() {
|
||||
info!("No layers to delete, skipping");
|
||||
return true;
|
||||
}
|
||||
|
||||
let layers_to_delete = delete_data
|
||||
.data
|
||||
.layers_to_delete
|
||||
.drain()
|
||||
.collect::<Vec<_>>();
|
||||
debug!("Layers to delete: {layers_to_delete:?}");
|
||||
info!("Deleting {} timeline layers", layers_to_delete.len());
|
||||
|
||||
let mut delete_tasks = layers_to_delete
|
||||
.into_iter()
|
||||
.map(|local_layer_path| async {
|
||||
let storage_path = match storage.storage_path(&local_layer_path).with_context(|| {
|
||||
format!(
|
||||
"Failed to get the layer storage path for local path '{}'",
|
||||
local_layer_path.display()
|
||||
)
|
||||
}) {
|
||||
Ok(path) => path,
|
||||
Err(e) => return Err((e, local_layer_path)),
|
||||
};
|
||||
|
||||
match storage.delete(&storage_path).await.with_context(|| {
|
||||
format!(
|
||||
"Failed to delete remote layer from storage at '{:?}'",
|
||||
storage_path
|
||||
)
|
||||
}) {
|
||||
Ok(()) => Ok(local_layer_path),
|
||||
Err(e) => Err((e, local_layer_path)),
|
||||
}
|
||||
})
|
||||
.collect::<FuturesUnordered<_>>();
|
||||
|
||||
let mut errored = false;
|
||||
while let Some(deletion_result) = delete_tasks.next().await {
|
||||
match deletion_result {
|
||||
Ok(local_layer_path) => {
|
||||
debug!(
|
||||
"Successfully deleted layer {} for timeline {sync_id}",
|
||||
local_layer_path.display()
|
||||
);
|
||||
delete_data.data.deleted_layers.insert(local_layer_path);
|
||||
}
|
||||
Err((e, local_layer_path)) => {
|
||||
errored = true;
|
||||
error!(
|
||||
"Failed to delete layer {} for timeline {sync_id}: {e:?}",
|
||||
local_layer_path.display()
|
||||
);
|
||||
delete_data.data.layers_to_delete.insert(local_layer_path);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if errored {
|
||||
debug!("Reenqueuing failed delete task for timeline {sync_id}");
|
||||
delete_data.retries += 1;
|
||||
sync_queue.push(sync_id, SyncTask::Delete(delete_data));
|
||||
}
|
||||
errored
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::{collections::HashSet, num::NonZeroUsize};
|
||||
|
||||
use itertools::Itertools;
|
||||
use tempfile::tempdir;
|
||||
use tokio::fs;
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
use crate::{
|
||||
remote_storage::{
|
||||
storage_sync::test_utils::{create_local_timeline, dummy_metadata},
|
||||
LocalFs,
|
||||
},
|
||||
repository::repo_harness::{RepoHarness, TIMELINE_ID},
|
||||
};
|
||||
|
||||
use super::*;
|
||||
|
||||
#[tokio::test]
|
||||
async fn delete_timeline_negative() -> anyhow::Result<()> {
|
||||
let harness = RepoHarness::create("delete_timeline_negative")?;
|
||||
let (sync_queue, _) = SyncQueue::new(NonZeroUsize::new(100).unwrap());
|
||||
let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID);
|
||||
let storage = LocalFs::new(tempdir()?.path().to_path_buf(), &harness.conf.workdir)?;
|
||||
|
||||
let deleted = delete_timeline_layers(
|
||||
&storage,
|
||||
&sync_queue,
|
||||
sync_id,
|
||||
SyncData {
|
||||
retries: 1,
|
||||
data: LayersDeletion {
|
||||
deleted_layers: HashSet::new(),
|
||||
layers_to_delete: HashSet::new(),
|
||||
deletion_registered: false,
|
||||
},
|
||||
},
|
||||
)
|
||||
.await;
|
||||
|
||||
assert!(
|
||||
!deleted,
|
||||
"Should not start the deletion for task with delete metadata unregistered"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn delete_timeline() -> anyhow::Result<()> {
|
||||
let harness = RepoHarness::create("delete_timeline")?;
|
||||
let (sync_queue, _) = SyncQueue::new(NonZeroUsize::new(100).unwrap());
|
||||
|
||||
let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID);
|
||||
let layer_files = ["a", "b", "c", "d"];
|
||||
let storage = LocalFs::new(tempdir()?.path().to_path_buf(), &harness.conf.workdir)?;
|
||||
let current_retries = 3;
|
||||
let metadata = dummy_metadata(Lsn(0x30));
|
||||
let local_timeline_path = harness.timeline_path(&TIMELINE_ID);
|
||||
let timeline_upload =
|
||||
create_local_timeline(&harness, TIMELINE_ID, &layer_files, metadata.clone()).await?;
|
||||
for local_path in timeline_upload.layers_to_upload {
|
||||
let remote_path = storage.storage_path(&local_path)?;
|
||||
let remote_parent_dir = remote_path.parent().unwrap();
|
||||
if !remote_parent_dir.exists() {
|
||||
fs::create_dir_all(&remote_parent_dir).await?;
|
||||
}
|
||||
fs::copy(&local_path, &remote_path).await?;
|
||||
}
|
||||
assert_eq!(
|
||||
storage
|
||||
.list()
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|remote_path| storage.local_path(&remote_path).unwrap())
|
||||
.filter_map(|local_path| { Some(local_path.file_name()?.to_str()?.to_owned()) })
|
||||
.sorted()
|
||||
.collect::<Vec<_>>(),
|
||||
layer_files
|
||||
.iter()
|
||||
.map(|layer_str| layer_str.to_string())
|
||||
.sorted()
|
||||
.collect::<Vec<_>>(),
|
||||
"Expect to have all layer files remotely before deletion"
|
||||
);
|
||||
|
||||
let deleted = delete_timeline_layers(
|
||||
&storage,
|
||||
&sync_queue,
|
||||
sync_id,
|
||||
SyncData {
|
||||
retries: current_retries,
|
||||
data: LayersDeletion {
|
||||
deleted_layers: HashSet::new(),
|
||||
layers_to_delete: HashSet::from([
|
||||
local_timeline_path.join("a"),
|
||||
local_timeline_path.join("c"),
|
||||
local_timeline_path.join("something_different"),
|
||||
]),
|
||||
deletion_registered: true,
|
||||
},
|
||||
},
|
||||
)
|
||||
.await;
|
||||
assert!(deleted, "Should be able to delete timeline files");
|
||||
|
||||
assert_eq!(
|
||||
storage
|
||||
.list()
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|remote_path| storage.local_path(&remote_path).unwrap())
|
||||
.filter_map(|local_path| { Some(local_path.file_name()?.to_str()?.to_owned()) })
|
||||
.sorted()
|
||||
.collect::<Vec<_>>(),
|
||||
vec!["b".to_string(), "d".to_string()],
|
||||
"Expect to have only non-deleted files remotely"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -195,7 +195,6 @@ impl Display for TimelineSyncStatusUpdate {
|
||||
f.write_str(s)
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
/// A repository corresponds to one .zenith directory. One repository holds multiple
|
||||
/// timelines, forked off from the same initial call to 'initdb'.
|
||||
@@ -243,7 +242,7 @@ pub trait Repository: Send + Sync {
|
||||
///
|
||||
/// 'timelineid' specifies the timeline to GC, or None for all.
|
||||
/// `horizon` specifies delta from last lsn to preserve all object versions (pitr interval).
|
||||
/// `checkpoint_before_gc` parameter is used to force compaction of storage before GC
|
||||
/// `checkpoint_before_gc` parameter is used to force compaction of storage before CG
|
||||
/// to make tests more deterministic.
|
||||
/// TODO Do we still need it or we can call checkpoint explicitly in tests where needed?
|
||||
fn gc_iteration(
|
||||
|
||||
@@ -659,6 +659,7 @@ impl WalConnectionManager {
|
||||
match wal_stream_connection_string(
|
||||
self.id,
|
||||
info.safekeeper_connstr.as_deref()?,
|
||||
info.pageserver_connstr.as_deref()?,
|
||||
) {
|
||||
Ok(connstr) => Some((sk_id, info, connstr)),
|
||||
Err(e) => {
|
||||
@@ -748,6 +749,7 @@ fn wal_stream_connection_string(
|
||||
timeline_id,
|
||||
}: ZTenantTimelineId,
|
||||
listen_pg_addr_str: &str,
|
||||
pageserver_connstr: &str,
|
||||
) -> anyhow::Result<String> {
|
||||
let sk_connstr = format!("postgresql://no_user@{listen_pg_addr_str}/no_db");
|
||||
let me_conf = sk_connstr
|
||||
@@ -757,7 +759,7 @@ fn wal_stream_connection_string(
|
||||
})?;
|
||||
let (host, port) = utils::connstring::connection_host_port(&me_conf);
|
||||
Ok(format!(
|
||||
"host={host} port={port} options='-c ztimelineid={timeline_id} ztenantid={tenant_id}'"
|
||||
"host={host} port={port} options='-c ztimelineid={timeline_id} ztenantid={tenant_id} pageserver_connstr={pageserver_connstr}'",
|
||||
))
|
||||
}
|
||||
|
||||
@@ -790,6 +792,20 @@ mod tests {
|
||||
remote_consistent_lsn: None,
|
||||
peer_horizon_lsn: None,
|
||||
safekeeper_connstr: None,
|
||||
pageserver_connstr: Some("no safekeeper_connstr".to_string()),
|
||||
},
|
||||
),
|
||||
(
|
||||
NodeId(1),
|
||||
SkTimelineInfo {
|
||||
last_log_term: None,
|
||||
flush_lsn: None,
|
||||
commit_lsn: Some(Lsn(1)),
|
||||
backup_lsn: None,
|
||||
remote_consistent_lsn: None,
|
||||
peer_horizon_lsn: None,
|
||||
safekeeper_connstr: Some("no pageserver_connstr".to_string()),
|
||||
pageserver_connstr: None,
|
||||
},
|
||||
),
|
||||
(
|
||||
@@ -802,6 +818,7 @@ mod tests {
|
||||
remote_consistent_lsn: None,
|
||||
peer_horizon_lsn: None,
|
||||
safekeeper_connstr: Some("no commit_lsn".to_string()),
|
||||
pageserver_connstr: Some("no commit_lsn (p)".to_string()),
|
||||
},
|
||||
),
|
||||
(
|
||||
@@ -814,6 +831,7 @@ mod tests {
|
||||
remote_consistent_lsn: None,
|
||||
peer_horizon_lsn: None,
|
||||
safekeeper_connstr: Some("no commit_lsn".to_string()),
|
||||
pageserver_connstr: Some("no commit_lsn (p)".to_string()),
|
||||
},
|
||||
),
|
||||
]));
|
||||
@@ -869,6 +887,7 @@ mod tests {
|
||||
remote_consistent_lsn: None,
|
||||
peer_horizon_lsn: None,
|
||||
safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()),
|
||||
pageserver_connstr: Some(DUMMY_PAGESERVER_CONNSTR.to_string()),
|
||||
},
|
||||
),
|
||||
(
|
||||
@@ -881,6 +900,7 @@ mod tests {
|
||||
remote_consistent_lsn: None,
|
||||
peer_horizon_lsn: None,
|
||||
safekeeper_connstr: Some("not advanced Lsn".to_string()),
|
||||
pageserver_connstr: Some("not advanced Lsn (p)".to_string()),
|
||||
},
|
||||
),
|
||||
(
|
||||
@@ -895,6 +915,7 @@ mod tests {
|
||||
remote_consistent_lsn: None,
|
||||
peer_horizon_lsn: None,
|
||||
safekeeper_connstr: Some("not enough advanced Lsn".to_string()),
|
||||
pageserver_connstr: Some("not enough advanced Lsn (p)".to_string()),
|
||||
},
|
||||
),
|
||||
]));
|
||||
@@ -926,6 +947,7 @@ mod tests {
|
||||
remote_consistent_lsn: None,
|
||||
peer_horizon_lsn: None,
|
||||
safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()),
|
||||
pageserver_connstr: Some(DUMMY_PAGESERVER_CONNSTR.to_string()),
|
||||
},
|
||||
)]))
|
||||
.expect("Expected one candidate selected out of the only data option, but got none");
|
||||
@@ -938,6 +960,9 @@ mod tests {
|
||||
assert!(only_candidate
|
||||
.wal_producer_connstr
|
||||
.contains(DUMMY_SAFEKEEPER_CONNSTR));
|
||||
assert!(only_candidate
|
||||
.wal_producer_connstr
|
||||
.contains(DUMMY_PAGESERVER_CONNSTR));
|
||||
|
||||
let selected_lsn = 100_000;
|
||||
let biggest_wal_candidate = data_manager_with_no_connection
|
||||
@@ -952,6 +977,7 @@ mod tests {
|
||||
remote_consistent_lsn: None,
|
||||
peer_horizon_lsn: None,
|
||||
safekeeper_connstr: Some("smaller commit_lsn".to_string()),
|
||||
pageserver_connstr: Some("smaller commit_lsn (p)".to_string()),
|
||||
},
|
||||
),
|
||||
(
|
||||
@@ -964,6 +990,7 @@ mod tests {
|
||||
remote_consistent_lsn: None,
|
||||
peer_horizon_lsn: None,
|
||||
safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()),
|
||||
pageserver_connstr: Some(DUMMY_PAGESERVER_CONNSTR.to_string()),
|
||||
},
|
||||
),
|
||||
(
|
||||
@@ -976,6 +1003,9 @@ mod tests {
|
||||
remote_consistent_lsn: None,
|
||||
peer_horizon_lsn: None,
|
||||
safekeeper_connstr: None,
|
||||
pageserver_connstr: Some(
|
||||
"no safekeeper_connstr despite bigger commit_lsn".to_string(),
|
||||
),
|
||||
},
|
||||
),
|
||||
]))
|
||||
@@ -992,6 +1022,9 @@ mod tests {
|
||||
assert!(biggest_wal_candidate
|
||||
.wal_producer_connstr
|
||||
.contains(DUMMY_SAFEKEEPER_CONNSTR));
|
||||
assert!(biggest_wal_candidate
|
||||
.wal_producer_connstr
|
||||
.contains(DUMMY_PAGESERVER_CONNSTR));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -1038,6 +1071,7 @@ mod tests {
|
||||
remote_consistent_lsn: None,
|
||||
peer_horizon_lsn: None,
|
||||
safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()),
|
||||
pageserver_connstr: Some(DUMMY_PAGESERVER_CONNSTR.to_string()),
|
||||
},
|
||||
),
|
||||
(
|
||||
@@ -1050,6 +1084,7 @@ mod tests {
|
||||
remote_consistent_lsn: None,
|
||||
peer_horizon_lsn: None,
|
||||
safekeeper_connstr: Some("advanced by Lsn safekeeper".to_string()),
|
||||
pageserver_connstr: Some("advanced by Lsn safekeeper (p)".to_string()),
|
||||
},
|
||||
),
|
||||
]);
|
||||
@@ -1073,6 +1108,9 @@ mod tests {
|
||||
assert!(over_threshcurrent_candidate
|
||||
.wal_producer_connstr
|
||||
.contains("advanced by Lsn safekeeper"));
|
||||
assert!(over_threshcurrent_candidate
|
||||
.wal_producer_connstr
|
||||
.contains("advanced by Lsn safekeeper (p)"));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -1108,6 +1146,7 @@ mod tests {
|
||||
remote_consistent_lsn: None,
|
||||
peer_horizon_lsn: None,
|
||||
safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()),
|
||||
pageserver_connstr: Some(DUMMY_PAGESERVER_CONNSTR.to_string()),
|
||||
},
|
||||
)]))
|
||||
.expect(
|
||||
@@ -1129,6 +1168,9 @@ mod tests {
|
||||
assert!(over_threshcurrent_candidate
|
||||
.wal_producer_connstr
|
||||
.contains(DUMMY_SAFEKEEPER_CONNSTR));
|
||||
assert!(over_threshcurrent_candidate
|
||||
.wal_producer_connstr
|
||||
.contains(DUMMY_PAGESERVER_CONNSTR));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -1155,6 +1197,7 @@ mod tests {
|
||||
}
|
||||
|
||||
const DUMMY_SAFEKEEPER_CONNSTR: &str = "safekeeper_connstr";
|
||||
const DUMMY_PAGESERVER_CONNSTR: &str = "pageserver_connstr";
|
||||
|
||||
// the function itself does not need async, but it spawns a tokio::task underneath hence neeed
|
||||
// a runtime to not to panic
|
||||
@@ -1162,8 +1205,9 @@ mod tests {
|
||||
id: ZTenantTimelineId,
|
||||
safekeeper_id: NodeId,
|
||||
) -> WalConnectionData {
|
||||
let dummy_connstr = wal_stream_connection_string(id, DUMMY_SAFEKEEPER_CONNSTR)
|
||||
.expect("Failed to construct dummy wal producer connstr");
|
||||
let dummy_connstr =
|
||||
wal_stream_connection_string(id, DUMMY_SAFEKEEPER_CONNSTR, DUMMY_PAGESERVER_CONNSTR)
|
||||
.expect("Failed to construct dummy wal producer connstr");
|
||||
WalConnectionData {
|
||||
safekeeper_id,
|
||||
connection: WalReceiverConnection::open(
|
||||
|
||||
@@ -26,11 +26,6 @@ pub struct ClientCredentials {
|
||||
// New console API requires SNI info to determine the cluster name.
|
||||
// Other Auth backends don't need it.
|
||||
pub sni_data: Option<String>,
|
||||
|
||||
// project_name is passed as argument from options from url.
|
||||
// In case sni_data is missing: project_name is used to determine cluster name.
|
||||
// In case sni_data is available: project_name and sni_data should match (otherwise throws an error).
|
||||
pub project_name: Option<String>,
|
||||
}
|
||||
|
||||
impl ClientCredentials {
|
||||
@@ -42,47 +37,22 @@ impl ClientCredentials {
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum ProjectNameError {
|
||||
#[error("SNI is missing. EITHER please upgrade the postgres client library OR pass the project name as a parameter: '...&options=project%3D<project-name>...'.")]
|
||||
#[error("SNI is missing, please upgrade the postgres client library")]
|
||||
Missing,
|
||||
|
||||
#[error("SNI is malformed.")]
|
||||
#[error("SNI is malformed")]
|
||||
Bad,
|
||||
|
||||
#[error("Inconsistent project name inferred from SNI and project option. String from SNI: '{0}', String from project option: '{1}'")]
|
||||
Inconsistent(String, String),
|
||||
}
|
||||
|
||||
impl UserFacingError for ProjectNameError {}
|
||||
|
||||
impl ClientCredentials {
|
||||
/// Determine project name from SNI or from project_name parameter from options argument.
|
||||
/// Determine project name from SNI.
|
||||
pub fn project_name(&self) -> Result<&str, ProjectNameError> {
|
||||
// Checking that if both sni_data and project_name are set, then they should match
|
||||
// otherwise, throws a ProjectNameError::Inconsistent error.
|
||||
if let Some(sni_data) = &self.sni_data {
|
||||
let project_name_from_sni_data =
|
||||
sni_data.split_once('.').ok_or(ProjectNameError::Bad)?.0;
|
||||
if let Some(project_name_from_options) = &self.project_name {
|
||||
if !project_name_from_options.eq(project_name_from_sni_data) {
|
||||
return Err(ProjectNameError::Inconsistent(
|
||||
project_name_from_sni_data.to_string(),
|
||||
project_name_from_options.to_string(),
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
// determine the project name from self.sni_data if it exists, otherwise from self.project_name.
|
||||
let ret = match &self.sni_data {
|
||||
// if sni_data exists, use it to determine project name
|
||||
Some(sni_data) => sni_data.split_once('.').ok_or(ProjectNameError::Bad)?.0,
|
||||
// otherwise use project_option if it was manually set thought options parameter.
|
||||
None => self
|
||||
.project_name
|
||||
.as_ref()
|
||||
.ok_or(ProjectNameError::Missing)?
|
||||
.as_str(),
|
||||
};
|
||||
Ok(ret)
|
||||
// Currently project name is passed as a top level domain
|
||||
let sni = self.sni_data.as_ref().ok_or(ProjectNameError::Missing)?;
|
||||
let (first, _) = sni.split_once('.').ok_or(ProjectNameError::Bad)?;
|
||||
Ok(first)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -98,13 +68,11 @@ impl TryFrom<HashMap<String, String>> for ClientCredentials {
|
||||
|
||||
let user = get_param("user")?;
|
||||
let dbname = get_param("database")?;
|
||||
let project_name = get_param("project").ok();
|
||||
|
||||
Ok(Self {
|
||||
user,
|
||||
dbname,
|
||||
sni_data: None,
|
||||
project_name,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -29,11 +29,12 @@ pub struct SafekeeperPostgresHandler {
|
||||
pub ztenantid: Option<ZTenantId>,
|
||||
pub ztimelineid: Option<ZTimelineId>,
|
||||
pub timeline: Option<Arc<Timeline>>,
|
||||
pageserver_connstr: Option<String>,
|
||||
}
|
||||
|
||||
/// Parsed Postgres command.
|
||||
enum SafekeeperPostgresCommand {
|
||||
StartWalPush,
|
||||
StartWalPush { pageserver_connstr: Option<String> },
|
||||
StartReplication { start_lsn: Lsn },
|
||||
IdentifySystem,
|
||||
JSONCtrl { cmd: AppendLogicalMessage },
|
||||
@@ -41,7 +42,11 @@ enum SafekeeperPostgresCommand {
|
||||
|
||||
fn parse_cmd(cmd: &str) -> Result<SafekeeperPostgresCommand> {
|
||||
if cmd.starts_with("START_WAL_PUSH") {
|
||||
Ok(SafekeeperPostgresCommand::StartWalPush)
|
||||
let re = Regex::new(r"START_WAL_PUSH(?: (.+))?").unwrap();
|
||||
|
||||
let caps = re.captures(cmd).unwrap();
|
||||
let pageserver_connstr = caps.get(1).map(|m| m.as_str().to_owned());
|
||||
Ok(SafekeeperPostgresCommand::StartWalPush { pageserver_connstr })
|
||||
} else if cmd.starts_with("START_REPLICATION") {
|
||||
let re =
|
||||
Regex::new(r"START_REPLICATION(?: PHYSICAL)? ([[:xdigit:]]+/[[:xdigit:]]+)").unwrap();
|
||||
@@ -81,6 +86,8 @@ impl postgres_backend::Handler for SafekeeperPostgresHandler {
|
||||
self.appname = Some(app_name.clone());
|
||||
}
|
||||
|
||||
self.pageserver_connstr = params.get("pageserver_connstr").cloned();
|
||||
|
||||
Ok(())
|
||||
} else {
|
||||
bail!("Safekeeper received unexpected initial message: {:?}", sm);
|
||||
@@ -106,14 +113,14 @@ impl postgres_backend::Handler for SafekeeperPostgresHandler {
|
||||
}
|
||||
|
||||
match cmd {
|
||||
SafekeeperPostgresCommand::StartWalPush => {
|
||||
ReceiveWalConn::new(pgb)
|
||||
SafekeeperPostgresCommand::StartWalPush { pageserver_connstr } => {
|
||||
ReceiveWalConn::new(pgb, pageserver_connstr)
|
||||
.run(self)
|
||||
.context("failed to run ReceiveWalConn")?;
|
||||
}
|
||||
SafekeeperPostgresCommand::StartReplication { start_lsn } => {
|
||||
ReplicationConn::new(pgb)
|
||||
.run(self, pgb, start_lsn)
|
||||
.run(self, pgb, start_lsn, self.pageserver_connstr.clone())
|
||||
.context("failed to run ReplicationConn")?;
|
||||
}
|
||||
SafekeeperPostgresCommand::IdentifySystem => {
|
||||
@@ -135,6 +142,7 @@ impl SafekeeperPostgresHandler {
|
||||
ztenantid: None,
|
||||
ztimelineid: None,
|
||||
timeline: None,
|
||||
pageserver_connstr: None,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -32,14 +32,22 @@ pub struct ReceiveWalConn<'pg> {
|
||||
pg_backend: &'pg mut PostgresBackend,
|
||||
/// The cached result of `pg_backend.socket().peer_addr()` (roughly)
|
||||
peer_addr: SocketAddr,
|
||||
/// Pageserver connection string forwarded from compute
|
||||
/// NOTE that it is allowed to operate without a pageserver.
|
||||
/// So if compute has no pageserver configured do not use it.
|
||||
pageserver_connstr: Option<String>,
|
||||
}
|
||||
|
||||
impl<'pg> ReceiveWalConn<'pg> {
|
||||
pub fn new(pg: &'pg mut PostgresBackend) -> ReceiveWalConn<'pg> {
|
||||
pub fn new(
|
||||
pg: &'pg mut PostgresBackend,
|
||||
pageserver_connstr: Option<String>,
|
||||
) -> ReceiveWalConn<'pg> {
|
||||
let peer_addr = *pg.get_peer_addr();
|
||||
ReceiveWalConn {
|
||||
pg_backend: pg,
|
||||
peer_addr,
|
||||
pageserver_connstr,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -112,7 +120,9 @@ impl<'pg> ReceiveWalConn<'pg> {
|
||||
// Register the connection and defer unregister. Do that only
|
||||
// after processing first message, as it sets wal_seg_size,
|
||||
// wanted by many.
|
||||
spg.timeline.get().on_compute_connect()?;
|
||||
spg.timeline
|
||||
.get()
|
||||
.on_compute_connect(self.pageserver_connstr.as_ref())?;
|
||||
_guard = Some(ComputeConnectionGuard {
|
||||
timeline: Arc::clone(spg.timeline.get()),
|
||||
});
|
||||
|
||||
@@ -162,8 +162,9 @@ impl ReplicationConn {
|
||||
spg: &mut SafekeeperPostgresHandler,
|
||||
pgb: &mut PostgresBackend,
|
||||
mut start_pos: Lsn,
|
||||
pageserver_connstr: Option<String>,
|
||||
) -> Result<()> {
|
||||
let _enter = info_span!("WAL sender", timeline = %spg.ztimelineid.unwrap()).entered();
|
||||
let _enter = info_span!("WAL sender", timeline = %spg.ztimelineid.unwrap(), pageserver_connstr = %pageserver_connstr.as_deref().unwrap_or_default()).entered();
|
||||
|
||||
// spawn the background thread which receives HotStandbyFeedback messages.
|
||||
let bg_timeline = Arc::clone(spg.timeline.get());
|
||||
|
||||
@@ -95,6 +95,7 @@ struct SharedState {
|
||||
/// when tli is inactive instead of having this flag.
|
||||
active: bool,
|
||||
num_computes: u32,
|
||||
pageserver_connstr: Option<String>,
|
||||
last_removed_segno: XLogSegNo,
|
||||
}
|
||||
|
||||
@@ -118,6 +119,7 @@ impl SharedState {
|
||||
wal_backup_active: false,
|
||||
active: false,
|
||||
num_computes: 0,
|
||||
pageserver_connstr: None,
|
||||
last_removed_segno: 0,
|
||||
})
|
||||
}
|
||||
@@ -137,6 +139,7 @@ impl SharedState {
|
||||
wal_backup_active: false,
|
||||
active: false,
|
||||
num_computes: 0,
|
||||
pageserver_connstr: None,
|
||||
last_removed_segno: 0,
|
||||
})
|
||||
}
|
||||
@@ -187,6 +190,35 @@ impl SharedState {
|
||||
self.wal_backup_active
|
||||
}
|
||||
|
||||
/// Activate timeline's walsender: start/change timeline information propagated into etcd for further pageserver connections.
|
||||
fn activate_walsender(
|
||||
&mut self,
|
||||
zttid: &ZTenantTimelineId,
|
||||
new_pageserver_connstr: Option<String>,
|
||||
) {
|
||||
if self.pageserver_connstr != new_pageserver_connstr {
|
||||
self.deactivate_walsender(zttid);
|
||||
|
||||
if new_pageserver_connstr.is_some() {
|
||||
info!(
|
||||
"timeline {} has activated its walsender with connstr {new_pageserver_connstr:?}",
|
||||
zttid.timeline_id,
|
||||
);
|
||||
}
|
||||
self.pageserver_connstr = new_pageserver_connstr;
|
||||
}
|
||||
}
|
||||
|
||||
/// Deactivate the timeline: stop sending the timeline data into etcd, so no pageserver can connect for WAL streaming.
|
||||
fn deactivate_walsender(&mut self, zttid: &ZTenantTimelineId) {
|
||||
if let Some(pageserver_connstr) = self.pageserver_connstr.take() {
|
||||
info!(
|
||||
"timeline {} had deactivated its wallsender with connstr {pageserver_connstr:?}",
|
||||
zttid.timeline_id,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
fn get_wal_seg_size(&self) -> usize {
|
||||
self.sk.state.server.wal_seg_size as usize
|
||||
}
|
||||
@@ -286,12 +318,17 @@ impl Timeline {
|
||||
/// Register compute connection, starting timeline-related activity if it is
|
||||
/// not running yet.
|
||||
/// Can fail only if channel to a static thread got closed, which is not normal at all.
|
||||
pub fn on_compute_connect(&self) -> Result<()> {
|
||||
pub fn on_compute_connect(&self, pageserver_connstr: Option<&String>) -> Result<()> {
|
||||
let is_wal_backup_action_pending: bool;
|
||||
{
|
||||
let mut shared_state = self.mutex.lock().unwrap();
|
||||
shared_state.num_computes += 1;
|
||||
is_wal_backup_action_pending = shared_state.update_status();
|
||||
// FIXME: currently we always adopt latest pageserver connstr, but we
|
||||
// should have kind of generations assigned by compute to distinguish
|
||||
// the latest one or even pass it through consensus to reliably deliver
|
||||
// to all safekeepers.
|
||||
shared_state.activate_walsender(&self.zttid, pageserver_connstr.cloned());
|
||||
}
|
||||
// Wake up wal backup launcher, if offloading not started yet.
|
||||
if is_wal_backup_action_pending {
|
||||
@@ -327,7 +364,7 @@ impl Timeline {
|
||||
(replica_state.remote_consistent_lsn != Lsn::MAX && // Lsn::MAX means that we don't know the latest LSN yet.
|
||||
replica_state.remote_consistent_lsn >= shared_state.sk.inmem.commit_lsn);
|
||||
if stop {
|
||||
shared_state.update_status();
|
||||
shared_state.deactivate_walsender(&self.zttid);
|
||||
return Ok(true);
|
||||
}
|
||||
}
|
||||
@@ -488,6 +525,7 @@ impl Timeline {
|
||||
)),
|
||||
peer_horizon_lsn: Some(shared_state.sk.inmem.peer_horizon_lsn),
|
||||
safekeeper_connstr: Some(conf.listen_pg_addr.clone()),
|
||||
pageserver_connstr: shared_state.pageserver_connstr.clone(),
|
||||
backup_lsn: Some(shared_state.sk.inmem.backup_lsn),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -24,7 +24,7 @@ def test_ancestor_branch(neon_env_builder: NeonEnvBuilder):
|
||||
'compaction_target_size': '4194304',
|
||||
})
|
||||
|
||||
env.pageserver.safe_psql("failpoints flush-frozen-before-sync=sleep(10000)")
|
||||
env.pageserver.safe_psql("failpoints flush-frozen=sleep(10000)")
|
||||
|
||||
pg_branch0 = env.postgres.create_start('main', tenant_id=tenant)
|
||||
branch0_cur = pg_branch0.connect().cursor()
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, NeonPageserverHttpClient
|
||||
import pytest
|
||||
|
||||
|
||||
def check_tenant(env: NeonEnv, pageserver_http: NeonPageserverHttpClient):
|
||||
@@ -27,8 +26,7 @@ def check_tenant(env: NeonEnv, pageserver_http: NeonPageserverHttpClient):
|
||||
pageserver_http.timeline_detach(tenant_id, timeline_id)
|
||||
|
||||
|
||||
@pytest.mark.parametrize('num_timelines,num_safekeepers', [(3, 1)])
|
||||
def test_normal_work(neon_env_builder: NeonEnvBuilder, num_timelines: int, num_safekeepers: int):
|
||||
def test_normal_work(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
Basic test:
|
||||
* create new tenant with a timeline
|
||||
@@ -43,8 +41,7 @@ def test_normal_work(neon_env_builder: NeonEnvBuilder, num_timelines: int, num_s
|
||||
"""
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
neon_env_builder.num_safekeepers = num_safekeepers
|
||||
pageserver_http = env.pageserver.http_client()
|
||||
|
||||
for _ in range(num_timelines):
|
||||
for _ in range(3):
|
||||
check_tenant(env, pageserver_http)
|
||||
|
||||
@@ -45,8 +45,7 @@ def test_pageserver_recovery(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
# Configure failpoints
|
||||
pscur.execute(
|
||||
"failpoints flush-frozen-before-sync=sleep(2000);checkpoint-after-sync=exit"
|
||||
)
|
||||
"failpoints checkpoint-before-sync=sleep(2000);checkpoint-after-sync=exit")
|
||||
|
||||
# Do some updates until pageserver is crashed
|
||||
try:
|
||||
|
||||
Reference in New Issue
Block a user