Compare commits

...

10 Commits

Author SHA1 Message Date
Anastasia Lubennikova
21ad98ae4e WIP import_timeline_from_tar 2022-06-08 22:23:33 +03:00
Thang Pham
6cfebc096f Add read/write throughput performance tests (#1883)
Part of #1467 

This PR adds several performance tests that compare the [PG statistics](https://www.postgresql.org/docs/current/monitoring-stats.html) obtained when running PG benchmarks against Neon and vanilla PG to measure the read/write throughput of the DB.
2022-06-06 12:32:10 -04:00
KlimentSerafimov
fecad1ca34 Resolving issue #1745. Added cluster option for SNI data (#1813)
* Added project option in case SNI data is missing. Resolving issue #1745.

* Added invariant checking for project name: if both sni_data and project_name are available then they should match.
2022-06-06 08:14:41 -04:00
bojanserafimov
92de8423af Remove dead code (#1886) 2022-06-05 09:18:11 -04:00
Dmitry Rodionov
e442f5357b unify two identical failpoints in flush_frozen_layer
probably is a merge artifact
2022-06-03 19:36:09 +03:00
Arseny Sher
5a723d44cd Parametrize test_normal_work.
I like to run small test locally, but let's avoid duplication.
2022-06-03 20:32:53 +04:00
Kirill Bulatov
2623193876 Remove pageserver_connstr from WAL stream logic 2022-06-03 17:30:36 +03:00
Arseny Sher
70a53c4b03 Get back test_safekeeper_normal_work, but skip by default.
It is handy for development.
2022-06-03 16:12:14 +04:00
Arseny Sher
9e108102b3 Silence etcd safekeeper info key parse errors.
When we subscribe to everything, it is ok to receive not only safekeeper
timeline updates.
2022-06-03 16:12:14 +04:00
huming
9c846a93e8 chore(doc) 2022-06-03 14:24:27 +03:00
22 changed files with 398 additions and 374 deletions

1
.gitignore vendored
View File

@@ -5,6 +5,7 @@
__pycache__/
test_output/
.vscode
.idea
/.zenith
/integration_tests/.zenith

View File

@@ -57,15 +57,15 @@ pub struct SkTimelineInfo {
pub peer_horizon_lsn: Option<Lsn>,
#[serde(default)]
pub safekeeper_connstr: Option<String>,
#[serde(default)]
pub pageserver_connstr: Option<String>,
}
#[derive(Debug, thiserror::Error)]
pub enum BrokerError {
#[error("Etcd client error: {0}. Context: {1}")]
EtcdClient(etcd_client::Error, String),
#[error("Error during parsing etcd data: {0}")]
#[error("Error during parsing etcd key: {0}")]
InvalidKey(String),
#[error("Error during parsing etcd value: {0}")]
ParsingError(String),
#[error("Internal error: {0}")]
InternalError(String),
@@ -221,7 +221,7 @@ pub async fn subscribe_to_safekeeper_timeline_updates(
},
};
match parse_etcd_key_value(&subscription, key_str, value_str) {
match parse_safekeeper_timeline(&subscription, key_str, value_str) {
Ok((zttid, timeline)) => {
match timeline_updates
.entry(zttid)
@@ -243,6 +243,8 @@ pub async fn subscribe_to_safekeeper_timeline_updates(
}
}
}
// it is normal to get other keys when we subscribe to everything
Err(BrokerError::InvalidKey(e)) => debug!("Unexpected key for timeline update: {e}"),
Err(e) => error!("Failed to parse timeline update: {e}"),
};
}
@@ -281,14 +283,14 @@ static SK_TIMELINE_KEY_REGEX: Lazy<Regex> = Lazy::new(|| {
.expect("wrong regex for safekeeper timeline etcd key")
});
fn parse_etcd_key_value(
fn parse_safekeeper_timeline(
subscription: &SkTimelineSubscriptionKind,
key_str: &str,
value_str: &str,
) -> Result<(ZTenantTimelineId, SafekeeperTimeline), BrokerError> {
let broker_prefix = subscription.broker_etcd_prefix.as_str();
if !key_str.starts_with(broker_prefix) {
return Err(BrokerError::ParsingError(format!(
return Err(BrokerError::InvalidKey(format!(
"KV has unexpected key '{key_str}' that does not start with broker prefix {broker_prefix}"
)));
}
@@ -297,7 +299,7 @@ fn parse_etcd_key_value(
let key_captures = match SK_TIMELINE_KEY_REGEX.captures(key_part) {
Some(captures) => captures,
None => {
return Err(BrokerError::ParsingError(format!(
return Err(BrokerError::InvalidKey(format!(
"KV has unexpected key part '{key_part}' that does not match required regex {}",
SK_TIMELINE_KEY_REGEX.as_str()
)));
@@ -383,7 +385,7 @@ mod tests {
&timeline_subscription,
] {
let (id, _timeline) =
parse_etcd_key_value(subscription, &key_string, value_str)
parse_safekeeper_timeline(subscription, &key_string, value_str)
.unwrap_or_else(|e| panic!("Should be able to parse etcd key string '{key_string}' and etcd value string '{value_str}' for subscription {subscription:?}, but got: {e}"));
assert_eq!(id, ZTenantTimelineId::new(tenant_id, timeline_id));
}

View File

@@ -269,7 +269,14 @@ impl FeStartupPacket {
.next()
.context("expected even number of params in StartupMessage")?;
if name == "options" {
// deprecated way of passing params as cmd line args
// parsing options arguments "...&options=<var0>%3D<val0>+<var1>=<var1>..."
// '%3D' is '=' and '+' is ' '
// Note: we allow users that don't have SNI capabilities,
// to pass a special keyword argument 'project'
// to be used to determine the cluster name by the proxy.
//TODO: write unit test for this and refactor in its own function.
for cmdopt in value.split(' ') {
let nameval: Vec<&str> = cmdopt.split('=').collect();
if nameval.len() == 2 {

View File

@@ -22,6 +22,8 @@ use postgres_ffi::{pg_constants, ControlFileData, DBState_DB_SHUTDOWNED};
use postgres_ffi::{Oid, TransactionId};
use utils::lsn::Lsn;
use postgres::CopyOutReader;
///
/// Import all relation data pages from local disk into the repository.
///
@@ -403,3 +405,126 @@ fn import_wal<R: Repository>(
Ok(())
}
pub fn import_timeline_from_tar<R: Repository>(
tline: &mut DatadirTimeline<R>,
copyreader: CopyOutReader,
lsn: Lsn,
) -> Result<()> {
let mut ar = tar::Archive::new(copyreader);
let mut modification = tline.begin_modification(lsn);
modification.init_empty()?;
for e in ar.entries().unwrap() {
let mut entry = e.unwrap();
let header = entry.header();
let file_path = header.path().unwrap().into_owned();
match header.entry_type() {
tar::EntryType::Regular => {
let mut buffer = Vec::new();
// read the whole entry
entry.read_to_end(&mut buffer).unwrap();
import_file(&mut modification, &file_path.as_ref(), &buffer)?;
}
tar::EntryType::Directory => {
println!("tar::EntryType::Directory {}", file_path.display());
}
_ => {
panic!("tar::EntryType::?? {}", file_path.display());
}
}
}
Ok(())
}
/// Import a single file from a base backup tar archive into the repository,
/// dispatching on its location inside the PostgreSQL data directory.
///
/// Handled locations:
/// - `global/pg_control`: stored as the control file; its embedded checkpoint
///   record is additionally extracted and stored separately.
/// - `base/<dbnode>/pg_filenode.map`: stored as the relmapper file for that
///   database.
/// - `pg_twophase/<xid>`: stored as a two-phase state file (the file name is
///   the transaction id in hex).
/// - `pg_xact`, `pg_multixact/*` and ordinary relation files are currently
///   only logged (TODO in this WIP importer).
///
/// Paths outside all recognized prefixes are silently ignored.
pub fn import_file<R: Repository>(
    modification: &mut DatadirModification<R>,
    file_path: &Path,
    buffer: &[u8],
) -> Result<()> {
    if file_path.starts_with("global") {
        // Cluster-wide (shared) catalogs live in the global tablespace.
        // NOTE(review): not used yet in this branch; kept underscore-prefixed
        // for the pending "global relfile" import TODO below.
        let _spcnode = pg_constants::GLOBALTABLESPACE_OID;
        let _dbnode = 0;
        match file_path.file_name().unwrap().to_string_lossy().as_ref() {
            "pg_control" => {
                println!("pg_control file {}", file_path.display());
                // Import it as ControlFile.
                modification.put_control_file(Bytes::copy_from_slice(&buffer[..]))?;
                // Extract the checkpoint record and import it separately.
                let pg_control = ControlFileData::decode(&buffer)?;
                let checkpoint_bytes = pg_control.checkPointCopy.encode()?;
                modification.put_checkpoint(checkpoint_bytes)?;
            }
            "pg_filenode.map" => {
                // Bug fix: this arm was a bare tuple expression `(...)` with
                // no effect — the `println!` macro call was missing.
                println!("pg_filenode.map file {}", file_path.display());
            }
            _ => {
                println!("global relfile {}", file_path.display());
                //TODO
            }
        }
    } else if file_path.starts_with("base") {
        let spcnode = pg_constants::DEFAULTTABLESPACE_OID;
        // Path layout is `base/<dbnode>/<file>`: the second path component is
        // the database node (OID).
        let dbnode: u32 = file_path
            .iter()
            .nth(1)
            .unwrap()
            .to_string_lossy()
            .parse()
            .unwrap();
        match file_path.file_name().unwrap().to_string_lossy().as_ref() {
            "pg_filenode.map" => {
                println!(
                    "dbnode {} pg_filenode.map file {}",
                    dbnode,
                    file_path.display()
                );
                modification.put_relmap_file(
                    spcnode,
                    dbnode,
                    Bytes::copy_from_slice(&buffer[..]),
                )?;
            }
            _ => {
                println!("dbnode {} relfile {}", dbnode, file_path.display());
                //TODO
            }
        }
    } else if file_path.starts_with("pg_xact") {
        println!(
            "pg_xact {} ",
            file_path.file_name().unwrap().to_string_lossy().as_ref()
        );
        // TODO
    } else if file_path.starts_with("pg_multixact/offset") {
        println!(
            "pg_multixact/offset {}",
            file_path.file_name().unwrap().to_string_lossy().as_ref()
        );
        // TODO
    } else if file_path.starts_with("pg_multixact/members") {
        println!(
            "pg_multixact/members {}",
            file_path.file_name().unwrap().to_string_lossy().as_ref()
        );
        // TODO
    } else if file_path.starts_with("pg_twophase") {
        // The file name is the transaction id in hexadecimal.
        let xid = u32::from_str_radix(&file_path.file_name().unwrap().to_string_lossy(), 16)?;
        println!(
            "xid {} pg_twophase {}",
            xid,
            file_path.file_name().unwrap().to_string_lossy().as_ref()
        );
        modification.put_twophase_file(xid, Bytes::copy_from_slice(&buffer[..]))?;
    }
    Ok(())
}

View File

@@ -1727,9 +1727,7 @@ impl LayeredTimeline {
new_delta_path.clone(),
self.conf.timeline_path(&self.timeline_id, &self.tenant_id),
])?;
fail_point!("checkpoint-before-sync");
fail_point!("flush-frozen");
fail_point!("flush-frozen-before-sync");
// Finally, replace the frozen in-memory layer with the new on-disk layer
{

View File

@@ -20,7 +20,7 @@
//! assign a buffer for a page, you must hold the mapping lock and the lock on
//! the slot at the same time.
//!
//! Whenever you need to hold both locks simultenously, the slot lock must be
//! Whenever you need to hold both locks simultaneously, the slot lock must be
//! acquired first. This consistent ordering avoids deadlocks. To look up a page
//! in the cache, you would first look up the mapping, while holding the mapping
//! lock, and then lock the slot. You must release the mapping lock in between,

View File

@@ -1,223 +0,0 @@
//! Timeline synchronization logic to delete a bulk of timeline's remote files from the remote storage.
use anyhow::Context;
use futures::stream::{FuturesUnordered, StreamExt};
use tracing::{debug, error, info};
use utils::zid::ZTenantTimelineId;
use crate::remote_storage::{
storage_sync::{SyncQueue, SyncTask},
RemoteStorage,
};
use super::{LayersDeletion, SyncData};
/// Attempts to remove the timleline layers from the remote storage.
/// If the task had not adjusted the metadata before, the deletion will fail.
pub(super) async fn delete_timeline_layers<'a, P, S>(
storage: &'a S,
sync_queue: &SyncQueue,
sync_id: ZTenantTimelineId,
mut delete_data: SyncData<LayersDeletion>,
) -> bool
where
P: std::fmt::Debug + Send + Sync + 'static,
S: RemoteStorage<RemoteObjectId = P> + Send + Sync + 'static,
{
if !delete_data.data.deletion_registered {
error!("Cannot delete timeline layers before the deletion metadata is not registered, reenqueueing");
delete_data.retries += 1;
sync_queue.push(sync_id, SyncTask::Delete(delete_data));
return false;
}
if delete_data.data.layers_to_delete.is_empty() {
info!("No layers to delete, skipping");
return true;
}
let layers_to_delete = delete_data
.data
.layers_to_delete
.drain()
.collect::<Vec<_>>();
debug!("Layers to delete: {layers_to_delete:?}");
info!("Deleting {} timeline layers", layers_to_delete.len());
let mut delete_tasks = layers_to_delete
.into_iter()
.map(|local_layer_path| async {
let storage_path = match storage.storage_path(&local_layer_path).with_context(|| {
format!(
"Failed to get the layer storage path for local path '{}'",
local_layer_path.display()
)
}) {
Ok(path) => path,
Err(e) => return Err((e, local_layer_path)),
};
match storage.delete(&storage_path).await.with_context(|| {
format!(
"Failed to delete remote layer from storage at '{:?}'",
storage_path
)
}) {
Ok(()) => Ok(local_layer_path),
Err(e) => Err((e, local_layer_path)),
}
})
.collect::<FuturesUnordered<_>>();
let mut errored = false;
while let Some(deletion_result) = delete_tasks.next().await {
match deletion_result {
Ok(local_layer_path) => {
debug!(
"Successfully deleted layer {} for timeline {sync_id}",
local_layer_path.display()
);
delete_data.data.deleted_layers.insert(local_layer_path);
}
Err((e, local_layer_path)) => {
errored = true;
error!(
"Failed to delete layer {} for timeline {sync_id}: {e:?}",
local_layer_path.display()
);
delete_data.data.layers_to_delete.insert(local_layer_path);
}
}
}
if errored {
debug!("Reenqueuing failed delete task for timeline {sync_id}");
delete_data.retries += 1;
sync_queue.push(sync_id, SyncTask::Delete(delete_data));
}
errored
}
#[cfg(test)]
mod tests {
use std::{collections::HashSet, num::NonZeroUsize};
use itertools::Itertools;
use tempfile::tempdir;
use tokio::fs;
use utils::lsn::Lsn;
use crate::{
remote_storage::{
storage_sync::test_utils::{create_local_timeline, dummy_metadata},
LocalFs,
},
repository::repo_harness::{RepoHarness, TIMELINE_ID},
};
use super::*;
#[tokio::test]
async fn delete_timeline_negative() -> anyhow::Result<()> {
let harness = RepoHarness::create("delete_timeline_negative")?;
let (sync_queue, _) = SyncQueue::new(NonZeroUsize::new(100).unwrap());
let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID);
let storage = LocalFs::new(tempdir()?.path().to_path_buf(), &harness.conf.workdir)?;
let deleted = delete_timeline_layers(
&storage,
&sync_queue,
sync_id,
SyncData {
retries: 1,
data: LayersDeletion {
deleted_layers: HashSet::new(),
layers_to_delete: HashSet::new(),
deletion_registered: false,
},
},
)
.await;
assert!(
!deleted,
"Should not start the deletion for task with delete metadata unregistered"
);
Ok(())
}
#[tokio::test]
async fn delete_timeline() -> anyhow::Result<()> {
let harness = RepoHarness::create("delete_timeline")?;
let (sync_queue, _) = SyncQueue::new(NonZeroUsize::new(100).unwrap());
let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID);
let layer_files = ["a", "b", "c", "d"];
let storage = LocalFs::new(tempdir()?.path().to_path_buf(), &harness.conf.workdir)?;
let current_retries = 3;
let metadata = dummy_metadata(Lsn(0x30));
let local_timeline_path = harness.timeline_path(&TIMELINE_ID);
let timeline_upload =
create_local_timeline(&harness, TIMELINE_ID, &layer_files, metadata.clone()).await?;
for local_path in timeline_upload.layers_to_upload {
let remote_path = storage.storage_path(&local_path)?;
let remote_parent_dir = remote_path.parent().unwrap();
if !remote_parent_dir.exists() {
fs::create_dir_all(&remote_parent_dir).await?;
}
fs::copy(&local_path, &remote_path).await?;
}
assert_eq!(
storage
.list()
.await?
.into_iter()
.map(|remote_path| storage.local_path(&remote_path).unwrap())
.filter_map(|local_path| { Some(local_path.file_name()?.to_str()?.to_owned()) })
.sorted()
.collect::<Vec<_>>(),
layer_files
.iter()
.map(|layer_str| layer_str.to_string())
.sorted()
.collect::<Vec<_>>(),
"Expect to have all layer files remotely before deletion"
);
let deleted = delete_timeline_layers(
&storage,
&sync_queue,
sync_id,
SyncData {
retries: current_retries,
data: LayersDeletion {
deleted_layers: HashSet::new(),
layers_to_delete: HashSet::from([
local_timeline_path.join("a"),
local_timeline_path.join("c"),
local_timeline_path.join("something_different"),
]),
deletion_registered: true,
},
},
)
.await;
assert!(deleted, "Should be able to delete timeline files");
assert_eq!(
storage
.list()
.await?
.into_iter()
.map(|remote_path| storage.local_path(&remote_path).unwrap())
.filter_map(|local_path| { Some(local_path.file_name()?.to_str()?.to_owned()) })
.sorted()
.collect::<Vec<_>>(),
vec!["b".to_string(), "d".to_string()],
"Expect to have only non-deleted files remotely"
);
Ok(())
}
}

View File

@@ -195,6 +195,7 @@ impl Display for TimelineSyncStatusUpdate {
f.write_str(s)
}
}
///
/// A repository corresponds to one .zenith directory. One repository holds multiple
/// timelines, forked off from the same initial call to 'initdb'.
@@ -242,7 +243,7 @@ pub trait Repository: Send + Sync {
///
/// 'timelineid' specifies the timeline to GC, or None for all.
/// `horizon` specifies delta from last lsn to preserve all object versions (pitr interval).
/// `checkpoint_before_gc` parameter is used to force compaction of storage before CG
/// `checkpoint_before_gc` parameter is used to force compaction of storage before GC
/// to make tests more deterministic.
/// TODO Do we still need it or we can call checkpoint explicitly in tests where needed?
fn gc_iteration(

View File

@@ -659,7 +659,6 @@ impl WalConnectionManager {
match wal_stream_connection_string(
self.id,
info.safekeeper_connstr.as_deref()?,
info.pageserver_connstr.as_deref()?,
) {
Ok(connstr) => Some((sk_id, info, connstr)),
Err(e) => {
@@ -749,7 +748,6 @@ fn wal_stream_connection_string(
timeline_id,
}: ZTenantTimelineId,
listen_pg_addr_str: &str,
pageserver_connstr: &str,
) -> anyhow::Result<String> {
let sk_connstr = format!("postgresql://no_user@{listen_pg_addr_str}/no_db");
let me_conf = sk_connstr
@@ -759,7 +757,7 @@ fn wal_stream_connection_string(
})?;
let (host, port) = utils::connstring::connection_host_port(&me_conf);
Ok(format!(
"host={host} port={port} options='-c ztimelineid={timeline_id} ztenantid={tenant_id} pageserver_connstr={pageserver_connstr}'",
"host={host} port={port} options='-c ztimelineid={timeline_id} ztenantid={tenant_id}'"
))
}
@@ -792,20 +790,6 @@ mod tests {
remote_consistent_lsn: None,
peer_horizon_lsn: None,
safekeeper_connstr: None,
pageserver_connstr: Some("no safekeeper_connstr".to_string()),
},
),
(
NodeId(1),
SkTimelineInfo {
last_log_term: None,
flush_lsn: None,
commit_lsn: Some(Lsn(1)),
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
safekeeper_connstr: Some("no pageserver_connstr".to_string()),
pageserver_connstr: None,
},
),
(
@@ -818,7 +802,6 @@ mod tests {
remote_consistent_lsn: None,
peer_horizon_lsn: None,
safekeeper_connstr: Some("no commit_lsn".to_string()),
pageserver_connstr: Some("no commit_lsn (p)".to_string()),
},
),
(
@@ -831,7 +814,6 @@ mod tests {
remote_consistent_lsn: None,
peer_horizon_lsn: None,
safekeeper_connstr: Some("no commit_lsn".to_string()),
pageserver_connstr: Some("no commit_lsn (p)".to_string()),
},
),
]));
@@ -887,7 +869,6 @@ mod tests {
remote_consistent_lsn: None,
peer_horizon_lsn: None,
safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()),
pageserver_connstr: Some(DUMMY_PAGESERVER_CONNSTR.to_string()),
},
),
(
@@ -900,7 +881,6 @@ mod tests {
remote_consistent_lsn: None,
peer_horizon_lsn: None,
safekeeper_connstr: Some("not advanced Lsn".to_string()),
pageserver_connstr: Some("not advanced Lsn (p)".to_string()),
},
),
(
@@ -915,7 +895,6 @@ mod tests {
remote_consistent_lsn: None,
peer_horizon_lsn: None,
safekeeper_connstr: Some("not enough advanced Lsn".to_string()),
pageserver_connstr: Some("not enough advanced Lsn (p)".to_string()),
},
),
]));
@@ -947,7 +926,6 @@ mod tests {
remote_consistent_lsn: None,
peer_horizon_lsn: None,
safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()),
pageserver_connstr: Some(DUMMY_PAGESERVER_CONNSTR.to_string()),
},
)]))
.expect("Expected one candidate selected out of the only data option, but got none");
@@ -960,9 +938,6 @@ mod tests {
assert!(only_candidate
.wal_producer_connstr
.contains(DUMMY_SAFEKEEPER_CONNSTR));
assert!(only_candidate
.wal_producer_connstr
.contains(DUMMY_PAGESERVER_CONNSTR));
let selected_lsn = 100_000;
let biggest_wal_candidate = data_manager_with_no_connection
@@ -977,7 +952,6 @@ mod tests {
remote_consistent_lsn: None,
peer_horizon_lsn: None,
safekeeper_connstr: Some("smaller commit_lsn".to_string()),
pageserver_connstr: Some("smaller commit_lsn (p)".to_string()),
},
),
(
@@ -990,7 +964,6 @@ mod tests {
remote_consistent_lsn: None,
peer_horizon_lsn: None,
safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()),
pageserver_connstr: Some(DUMMY_PAGESERVER_CONNSTR.to_string()),
},
),
(
@@ -1003,9 +976,6 @@ mod tests {
remote_consistent_lsn: None,
peer_horizon_lsn: None,
safekeeper_connstr: None,
pageserver_connstr: Some(
"no safekeeper_connstr despite bigger commit_lsn".to_string(),
),
},
),
]))
@@ -1022,9 +992,6 @@ mod tests {
assert!(biggest_wal_candidate
.wal_producer_connstr
.contains(DUMMY_SAFEKEEPER_CONNSTR));
assert!(biggest_wal_candidate
.wal_producer_connstr
.contains(DUMMY_PAGESERVER_CONNSTR));
Ok(())
}
@@ -1071,7 +1038,6 @@ mod tests {
remote_consistent_lsn: None,
peer_horizon_lsn: None,
safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()),
pageserver_connstr: Some(DUMMY_PAGESERVER_CONNSTR.to_string()),
},
),
(
@@ -1084,7 +1050,6 @@ mod tests {
remote_consistent_lsn: None,
peer_horizon_lsn: None,
safekeeper_connstr: Some("advanced by Lsn safekeeper".to_string()),
pageserver_connstr: Some("advanced by Lsn safekeeper (p)".to_string()),
},
),
]);
@@ -1108,9 +1073,6 @@ mod tests {
assert!(over_threshcurrent_candidate
.wal_producer_connstr
.contains("advanced by Lsn safekeeper"));
assert!(over_threshcurrent_candidate
.wal_producer_connstr
.contains("advanced by Lsn safekeeper (p)"));
Ok(())
}
@@ -1146,7 +1108,6 @@ mod tests {
remote_consistent_lsn: None,
peer_horizon_lsn: None,
safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()),
pageserver_connstr: Some(DUMMY_PAGESERVER_CONNSTR.to_string()),
},
)]))
.expect(
@@ -1168,9 +1129,6 @@ mod tests {
assert!(over_threshcurrent_candidate
.wal_producer_connstr
.contains(DUMMY_SAFEKEEPER_CONNSTR));
assert!(over_threshcurrent_candidate
.wal_producer_connstr
.contains(DUMMY_PAGESERVER_CONNSTR));
Ok(())
}
@@ -1197,7 +1155,6 @@ mod tests {
}
const DUMMY_SAFEKEEPER_CONNSTR: &str = "safekeeper_connstr";
const DUMMY_PAGESERVER_CONNSTR: &str = "pageserver_connstr";
// the function itself does not need async, but it spawns a tokio::task underneath, hence it needs
// a runtime to avoid panicking
@@ -1205,9 +1162,8 @@ mod tests {
id: ZTenantTimelineId,
safekeeper_id: NodeId,
) -> WalConnectionData {
let dummy_connstr =
wal_stream_connection_string(id, DUMMY_SAFEKEEPER_CONNSTR, DUMMY_PAGESERVER_CONNSTR)
.expect("Failed to construct dummy wal producer connstr");
let dummy_connstr = wal_stream_connection_string(id, DUMMY_SAFEKEEPER_CONNSTR)
.expect("Failed to construct dummy wal producer connstr");
WalConnectionData {
safekeeper_id,
connection: WalReceiverConnection::open(

View File

@@ -26,6 +26,11 @@ pub struct ClientCredentials {
// New console API requires SNI info to determine the cluster name.
// Other Auth backends don't need it.
pub sni_data: Option<String>,
// project_name is passed as argument from options from url.
// In case sni_data is missing: project_name is used to determine cluster name.
// In case sni_data is available: project_name and sni_data should match (otherwise throws an error).
pub project_name: Option<String>,
}
impl ClientCredentials {
@@ -37,22 +42,47 @@ impl ClientCredentials {
#[derive(Debug, Error)]
pub enum ProjectNameError {
#[error("SNI is missing, please upgrade the postgres client library")]
#[error("SNI is missing. EITHER please upgrade the postgres client library OR pass the project name as a parameter: '...&options=project%3D<project-name>...'.")]
Missing,
#[error("SNI is malformed")]
#[error("SNI is malformed.")]
Bad,
#[error("Inconsistent project name inferred from SNI and project option. String from SNI: '{0}', String from project option: '{1}'")]
Inconsistent(String, String),
}
impl UserFacingError for ProjectNameError {}
impl ClientCredentials {
/// Determine project name from SNI.
/// Determine project name from SNI or from project_name parameter from options argument.
pub fn project_name(&self) -> Result<&str, ProjectNameError> {
// Currently project name is passed as a top level domain
let sni = self.sni_data.as_ref().ok_or(ProjectNameError::Missing)?;
let (first, _) = sni.split_once('.').ok_or(ProjectNameError::Bad)?;
Ok(first)
// Checking that if both sni_data and project_name are set, then they should match
// otherwise, throws a ProjectNameError::Inconsistent error.
if let Some(sni_data) = &self.sni_data {
let project_name_from_sni_data =
sni_data.split_once('.').ok_or(ProjectNameError::Bad)?.0;
if let Some(project_name_from_options) = &self.project_name {
if !project_name_from_options.eq(project_name_from_sni_data) {
return Err(ProjectNameError::Inconsistent(
project_name_from_sni_data.to_string(),
project_name_from_options.to_string(),
));
}
}
}
// determine the project name from self.sni_data if it exists, otherwise from self.project_name.
let ret = match &self.sni_data {
// if sni_data exists, use it to determine project name
Some(sni_data) => sni_data.split_once('.').ok_or(ProjectNameError::Bad)?.0,
// otherwise use project_option if it was manually set thought options parameter.
None => self
.project_name
.as_ref()
.ok_or(ProjectNameError::Missing)?
.as_str(),
};
Ok(ret)
}
}
@@ -68,11 +98,13 @@ impl TryFrom<HashMap<String, String>> for ClientCredentials {
let user = get_param("user")?;
let dbname = get_param("database")?;
let project_name = get_param("project").ok();
Ok(Self {
user,
dbname,
sni_data: None,
project_name,
})
}
}

View File

@@ -29,12 +29,11 @@ pub struct SafekeeperPostgresHandler {
pub ztenantid: Option<ZTenantId>,
pub ztimelineid: Option<ZTimelineId>,
pub timeline: Option<Arc<Timeline>>,
pageserver_connstr: Option<String>,
}
/// Parsed Postgres command.
enum SafekeeperPostgresCommand {
StartWalPush { pageserver_connstr: Option<String> },
StartWalPush,
StartReplication { start_lsn: Lsn },
IdentifySystem,
JSONCtrl { cmd: AppendLogicalMessage },
@@ -42,11 +41,7 @@ enum SafekeeperPostgresCommand {
fn parse_cmd(cmd: &str) -> Result<SafekeeperPostgresCommand> {
if cmd.starts_with("START_WAL_PUSH") {
let re = Regex::new(r"START_WAL_PUSH(?: (.+))?").unwrap();
let caps = re.captures(cmd).unwrap();
let pageserver_connstr = caps.get(1).map(|m| m.as_str().to_owned());
Ok(SafekeeperPostgresCommand::StartWalPush { pageserver_connstr })
Ok(SafekeeperPostgresCommand::StartWalPush)
} else if cmd.starts_with("START_REPLICATION") {
let re =
Regex::new(r"START_REPLICATION(?: PHYSICAL)? ([[:xdigit:]]+/[[:xdigit:]]+)").unwrap();
@@ -86,8 +81,6 @@ impl postgres_backend::Handler for SafekeeperPostgresHandler {
self.appname = Some(app_name.clone());
}
self.pageserver_connstr = params.get("pageserver_connstr").cloned();
Ok(())
} else {
bail!("Safekeeper received unexpected initial message: {:?}", sm);
@@ -113,14 +106,14 @@ impl postgres_backend::Handler for SafekeeperPostgresHandler {
}
match cmd {
SafekeeperPostgresCommand::StartWalPush { pageserver_connstr } => {
ReceiveWalConn::new(pgb, pageserver_connstr)
SafekeeperPostgresCommand::StartWalPush => {
ReceiveWalConn::new(pgb)
.run(self)
.context("failed to run ReceiveWalConn")?;
}
SafekeeperPostgresCommand::StartReplication { start_lsn } => {
ReplicationConn::new(pgb)
.run(self, pgb, start_lsn, self.pageserver_connstr.clone())
.run(self, pgb, start_lsn)
.context("failed to run ReplicationConn")?;
}
SafekeeperPostgresCommand::IdentifySystem => {
@@ -142,7 +135,6 @@ impl SafekeeperPostgresHandler {
ztenantid: None,
ztimelineid: None,
timeline: None,
pageserver_connstr: None,
}
}

View File

@@ -32,22 +32,14 @@ pub struct ReceiveWalConn<'pg> {
pg_backend: &'pg mut PostgresBackend,
/// The cached result of `pg_backend.socket().peer_addr()` (roughly)
peer_addr: SocketAddr,
/// Pageserver connection string forwarded from compute
/// NOTE that it is allowed to operate without a pageserver.
/// So if compute has no pageserver configured do not use it.
pageserver_connstr: Option<String>,
}
impl<'pg> ReceiveWalConn<'pg> {
pub fn new(
pg: &'pg mut PostgresBackend,
pageserver_connstr: Option<String>,
) -> ReceiveWalConn<'pg> {
pub fn new(pg: &'pg mut PostgresBackend) -> ReceiveWalConn<'pg> {
let peer_addr = *pg.get_peer_addr();
ReceiveWalConn {
pg_backend: pg,
peer_addr,
pageserver_connstr,
}
}
@@ -120,9 +112,7 @@ impl<'pg> ReceiveWalConn<'pg> {
// Register the connection and defer unregister. Do that only
// after processing first message, as it sets wal_seg_size,
// wanted by many.
spg.timeline
.get()
.on_compute_connect(self.pageserver_connstr.as_ref())?;
spg.timeline.get().on_compute_connect()?;
_guard = Some(ComputeConnectionGuard {
timeline: Arc::clone(spg.timeline.get()),
});

View File

@@ -162,9 +162,8 @@ impl ReplicationConn {
spg: &mut SafekeeperPostgresHandler,
pgb: &mut PostgresBackend,
mut start_pos: Lsn,
pageserver_connstr: Option<String>,
) -> Result<()> {
let _enter = info_span!("WAL sender", timeline = %spg.ztimelineid.unwrap(), pageserver_connstr = %pageserver_connstr.as_deref().unwrap_or_default()).entered();
let _enter = info_span!("WAL sender", timeline = %spg.ztimelineid.unwrap()).entered();
// spawn the background thread which receives HotStandbyFeedback messages.
let bg_timeline = Arc::clone(spg.timeline.get());

View File

@@ -95,7 +95,6 @@ struct SharedState {
/// when tli is inactive instead of having this flag.
active: bool,
num_computes: u32,
pageserver_connstr: Option<String>,
last_removed_segno: XLogSegNo,
}
@@ -119,7 +118,6 @@ impl SharedState {
wal_backup_active: false,
active: false,
num_computes: 0,
pageserver_connstr: None,
last_removed_segno: 0,
})
}
@@ -139,7 +137,6 @@ impl SharedState {
wal_backup_active: false,
active: false,
num_computes: 0,
pageserver_connstr: None,
last_removed_segno: 0,
})
}
@@ -190,35 +187,6 @@ impl SharedState {
self.wal_backup_active
}
/// Activate timeline's walsender: start/change timeline information propagated into etcd for further pageserver connections.
fn activate_walsender(
&mut self,
zttid: &ZTenantTimelineId,
new_pageserver_connstr: Option<String>,
) {
if self.pageserver_connstr != new_pageserver_connstr {
self.deactivate_walsender(zttid);
if new_pageserver_connstr.is_some() {
info!(
"timeline {} has activated its walsender with connstr {new_pageserver_connstr:?}",
zttid.timeline_id,
);
}
self.pageserver_connstr = new_pageserver_connstr;
}
}
/// Deactivate the timeline: stop sending the timeline data into etcd, so no pageserver can connect for WAL streaming.
fn deactivate_walsender(&mut self, zttid: &ZTenantTimelineId) {
if let Some(pageserver_connstr) = self.pageserver_connstr.take() {
info!(
"timeline {} had deactivated its wallsender with connstr {pageserver_connstr:?}",
zttid.timeline_id,
)
}
}
fn get_wal_seg_size(&self) -> usize {
self.sk.state.server.wal_seg_size as usize
}
@@ -318,17 +286,12 @@ impl Timeline {
/// Register compute connection, starting timeline-related activity if it is
/// not running yet.
/// Can fail only if channel to a static thread got closed, which is not normal at all.
pub fn on_compute_connect(&self, pageserver_connstr: Option<&String>) -> Result<()> {
pub fn on_compute_connect(&self) -> Result<()> {
let is_wal_backup_action_pending: bool;
{
let mut shared_state = self.mutex.lock().unwrap();
shared_state.num_computes += 1;
is_wal_backup_action_pending = shared_state.update_status();
// FIXME: currently we always adopt latest pageserver connstr, but we
// should have kind of generations assigned by compute to distinguish
// the latest one or even pass it through consensus to reliably deliver
// to all safekeepers.
shared_state.activate_walsender(&self.zttid, pageserver_connstr.cloned());
}
// Wake up wal backup launcher, if offloading not started yet.
if is_wal_backup_action_pending {
@@ -364,7 +327,7 @@ impl Timeline {
(replica_state.remote_consistent_lsn != Lsn::MAX && // Lsn::MAX means that we don't know the latest LSN yet.
replica_state.remote_consistent_lsn >= shared_state.sk.inmem.commit_lsn);
if stop {
shared_state.deactivate_walsender(&self.zttid);
shared_state.update_status();
return Ok(true);
}
}
@@ -525,7 +488,6 @@ impl Timeline {
)),
peer_horizon_lsn: Some(shared_state.sk.inmem.peer_horizon_lsn),
safekeeper_connstr: Some(conf.listen_pg_addr.clone()),
pageserver_connstr: shared_state.pageserver_connstr.clone(),
backup_lsn: Some(shared_state.sk.inmem.backup_lsn),
})
}

View File

@@ -24,7 +24,7 @@ def test_ancestor_branch(neon_env_builder: NeonEnvBuilder):
'compaction_target_size': '4194304',
})
env.pageserver.safe_psql("failpoints flush-frozen=sleep(10000)")
env.pageserver.safe_psql("failpoints flush-frozen-before-sync=sleep(10000)")
pg_branch0 = env.postgres.create_start('main', tenant_id=tenant)
branch0_cur = pg_branch0.connect().cursor()

View File

@@ -1,5 +1,6 @@
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, NeonPageserverHttpClient
import pytest
def check_tenant(env: NeonEnv, pageserver_http: NeonPageserverHttpClient):
@@ -26,7 +27,8 @@ def check_tenant(env: NeonEnv, pageserver_http: NeonPageserverHttpClient):
pageserver_http.timeline_detach(tenant_id, timeline_id)
def test_normal_work(neon_env_builder: NeonEnvBuilder):
@pytest.mark.parametrize('num_timelines,num_safekeepers', [(3, 1)])
def test_normal_work(neon_env_builder: NeonEnvBuilder, num_timelines: int, num_safekeepers: int):
"""
Basic test:
* create new tenant with a timeline
@@ -41,7 +43,8 @@ def test_normal_work(neon_env_builder: NeonEnvBuilder):
"""
env = neon_env_builder.init_start()
neon_env_builder.num_safekeepers = num_safekeepers
pageserver_http = env.pageserver.http_client()
for _ in range(3):
for _ in range(num_timelines):
check_tenant(env, pageserver_http)

View File

@@ -45,7 +45,8 @@ def test_pageserver_recovery(neon_env_builder: NeonEnvBuilder):
# Configure failpoints
pscur.execute(
"failpoints checkpoint-before-sync=sleep(2000);checkpoint-after-sync=exit")
"failpoints flush-frozen-before-sync=sleep(2000);checkpoint-after-sync=exit"
)
# Do some updates until pageserver is crashed
try:

View File

@@ -1,6 +1,5 @@
pytest_plugins = (
"fixtures.neon_fixtures",
"fixtures.benchmark_fixture",
"fixtures.compare_fixtures",
"fixtures.slow",
)
pytest_plugins = ("fixtures.neon_fixtures",
"fixtures.benchmark_fixture",
"fixtures.compare_fixtures",
"fixtures.slow",
"fixtures.pg_stats")

View File

@@ -1,12 +1,13 @@
import pytest
from contextlib import contextmanager
from abc import ABC, abstractmethod
from fixtures.pg_stats import PgStatTable
from fixtures.neon_fixtures import PgBin, PgProtocol, VanillaPostgres, RemotePostgres, NeonEnv
from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
# Type-related stuff
from typing import Iterator
from typing import Dict, List
class PgCompare(ABC):
@@ -51,6 +52,31 @@ class PgCompare(ABC):
def record_duration(self, out_name):
pass
@contextmanager
def record_pg_stats(self, pg_stats: List[PgStatTable]):
    """Context manager that snapshots the given pg_stat tables before and
    after its body runs, then reports each counter's delta as a benchmark
    metric (higher is better).

    NOTE(review): if the body raises, no deltas are recorded — presumably
    intentional for aborted benchmark runs.
    """
    before = self._retrieve_pg_stats(pg_stats)
    yield
    after = self._retrieve_pg_stats(pg_stats)
    # Only counters present in both snapshots are reported.
    for name in set(before) & set(after):
        self.zenbenchmark.record(name, after[name] - before[name], '', MetricReport.HIGHER_IS_BETTER)
def _retrieve_pg_stats(self, pg_stats: List[PgStatTable]) -> Dict[str, int]:
    """Execute each stat-table query against this instance's Postgres and
    return the sampled counters keyed as '<table>.<column>'."""
    results: Dict[str, int] = {}
    with self.pg.connect().cursor() as cur:
        for stat in pg_stats:
            cur.execute(stat.query)
            row = cur.fetchone()
            # Each query is expected to yield exactly one row with one value
            # per requested column.
            assert len(row) == len(stat.columns)
            results.update(
                (f"{stat.table}.{col}", int(val)) for col, val in zip(stat.columns, row))
    return results
class NeonCompare(PgCompare):
"""PgCompare interface for the neon stack."""

View File

@@ -0,0 +1,52 @@
from typing import List
import pytest
class PgStatTable:
    """Describes a sample of a PostgreSQL statistics view: which view to
    query, which columns to read, and an optional trailing clause (typically
    a WHERE filter) appended to the generated SELECT.
    """
    table: str
    columns: List[str]
    additional_query: str

    def __init__(self, table: str, columns: List[str], filter_query: str = ""):
        self.table = table
        self.columns = columns
        self.additional_query = filter_query

    @property
    def query(self) -> str:
        """The SELECT statement sampling these columns.

        Note: a single space always precedes additional_query, even when it
        is empty.
        """
        cols = ','.join(self.columns)
        return f"SELECT {cols} FROM {self.table} {self.additional_query}"
@pytest.fixture(scope='function')
def pg_stats_rw() -> List[PgStatTable]:
    """Tuple-throughput counters for read/write workloads, limited to the
    'postgres' database."""
    columns = ["tup_returned", "tup_fetched", "tup_inserted", "tup_updated", "tup_deleted"]
    return [PgStatTable("pg_stat_database", columns, "WHERE datname='postgres'")]
@pytest.fixture(scope='function')
def pg_stats_ro() -> List[PgStatTable]:
    """Read-only tuple counters, limited to the 'postgres' database."""
    columns = ["tup_returned", "tup_fetched"]
    return [PgStatTable("pg_stat_database", columns, "WHERE datname='postgres'")]
@pytest.fixture(scope='function')
def pg_stats_wo() -> List[PgStatTable]:
    """Write-only tuple counters, limited to the 'postgres' database."""
    columns = ["tup_inserted", "tup_updated", "tup_deleted"]
    return [PgStatTable("pg_stat_database", columns, "WHERE datname='postgres'")]
@pytest.fixture(scope='function')
def pg_stats_wal() -> List[PgStatTable]:
    """WAL generation counters from pg_stat_wal (cluster-wide, no filter)."""
    columns = ["wal_records", "wal_fpi", "wal_bytes", "wal_buffers_full", "wal_write"]
    return [PgStatTable("pg_stat_wal", columns, "")]

View File

@@ -0,0 +1,101 @@
import os
from typing import List
import pytest
from fixtures.compare_fixtures import PgCompare
from fixtures.pg_stats import PgStatTable
from performance.test_perf_pgbench import get_durations_matrix, get_scales_matrix
def get_seeds_matrix(default: int = 100):
    """Seeds for pgbench's --random-seed, read from the comma-separated
    TEST_PG_BENCH_SEEDS_MATRIX env var, or [default] when unset."""
    raw = os.getenv("TEST_PG_BENCH_SEEDS_MATRIX", default=str(default))
    return [int(seed) for seed in raw.split(",")]
@pytest.mark.parametrize("seed", get_seeds_matrix())
@pytest.mark.parametrize("scale", get_scales_matrix())
@pytest.mark.parametrize("duration", get_durations_matrix(5))
def test_compare_pg_stats_rw_with_pgbench_default(neon_with_baseline: PgCompare,
                                                  seed: int,
                                                  scale: int,
                                                  duration: int,
                                                  pg_stats_rw: List[PgStatTable]):
    """Record read/write pg_stat deltas around a default (TPC-B-like)
    pgbench run, for Neon vs the baseline."""
    env = neon_with_baseline

    # Load the pgbench tables at the requested scale before measuring.
    env.pg_bin.run_capture(['pgbench', f'-s{scale}', '-i', env.pg.connstr()])
    env.flush()

    run_cmd = ['pgbench', f'-T{duration}', f'--random-seed={seed}', '-Mprepared', env.pg.connstr()]
    with env.record_pg_stats(pg_stats_rw):
        env.pg_bin.run_capture(run_cmd)
        env.flush()
@pytest.mark.parametrize("seed", get_seeds_matrix())
@pytest.mark.parametrize("scale", get_scales_matrix())
@pytest.mark.parametrize("duration", get_durations_matrix(5))
def test_compare_pg_stats_wo_with_pgbench_simple_update(neon_with_baseline: PgCompare,
                                                        seed: int,
                                                        scale: int,
                                                        duration: int,
                                                        pg_stats_wo: List[PgStatTable]):
    """Record write-only pg_stat deltas around a pgbench simple-update (-N)
    run, for Neon vs the baseline."""
    env = neon_with_baseline

    # Load the pgbench tables at the requested scale before measuring.
    env.pg_bin.run_capture(['pgbench', f'-s{scale}', '-i', env.pg.connstr()])
    env.flush()

    run_cmd = [
        'pgbench',
        '-N',
        f'-T{duration}',
        f'--random-seed={seed}',
        '-Mprepared',
        env.pg.connstr()
    ]
    with env.record_pg_stats(pg_stats_wo):
        env.pg_bin.run_capture(run_cmd)
        env.flush()
@pytest.mark.parametrize("seed", get_seeds_matrix())
@pytest.mark.parametrize("scale", get_scales_matrix())
@pytest.mark.parametrize("duration", get_durations_matrix(5))
def test_compare_pg_stats_ro_with_pgbench_select_only(neon_with_baseline: PgCompare,
                                                      seed: int,
                                                      scale: int,
                                                      duration: int,
                                                      pg_stats_ro: List[PgStatTable]):
    """Record read-only pg_stat deltas around a pgbench select-only (-S)
    run, for Neon vs the baseline."""
    env = neon_with_baseline

    # Load the pgbench tables at the requested scale before measuring.
    env.pg_bin.run_capture(['pgbench', f'-s{scale}', '-i', env.pg.connstr()])
    env.flush()

    run_cmd = [
        'pgbench',
        '-S',
        f'-T{duration}',
        f'--random-seed={seed}',
        '-Mprepared',
        env.pg.connstr()
    ]
    with env.record_pg_stats(pg_stats_ro):
        env.pg_bin.run_capture(run_cmd)
        env.flush()
@pytest.mark.parametrize("seed", get_seeds_matrix())
@pytest.mark.parametrize("scale", get_scales_matrix())
@pytest.mark.parametrize("duration", get_durations_matrix(5))
def test_compare_pg_stats_wal_with_pgbench_default(neon_with_baseline: PgCompare,
                                                   seed: int,
                                                   scale: int,
                                                   duration: int,
                                                   pg_stats_wal: List[PgStatTable]):
    """Record pg_stat_wal deltas around a default (TPC-B-like) pgbench run,
    for Neon vs the baseline."""
    env = neon_with_baseline

    # Load the pgbench tables at the requested scale before measuring.
    env.pg_bin.run_capture(['pgbench', f'-s{scale}', '-i', env.pg.connstr()])
    env.flush()

    run_cmd = ['pgbench', f'-T{duration}', f'--random-seed={seed}', '-Mprepared', env.pg.connstr()]
    with env.record_pg_stats(pg_stats_wal):
        env.pg_bin.run_capture(run_cmd)
        env.flush()

View File

@@ -79,7 +79,7 @@ def run_test_pgbench(env: PgCompare, scale: int, duration: int):
# Run simple-update workload
run_pgbench(env,
"simple-update",
['pgbench', '-n', '-c4', f'-T{duration}', '-P2', '-Mprepared', env.pg.connstr()])
['pgbench', '-N', '-c4', f'-T{duration}', '-P2', '-Mprepared', env.pg.connstr()])
# Run SELECT workload
run_pgbench(env,
@@ -89,13 +89,13 @@ def run_test_pgbench(env: PgCompare, scale: int, duration: int):
env.report_size()
def get_durations_matrix():
durations = os.getenv("TEST_PG_BENCH_DURATIONS_MATRIX", default="45")
def get_durations_matrix(default: int = 45):
durations = os.getenv("TEST_PG_BENCH_DURATIONS_MATRIX", default=str(default))
return list(map(int, durations.split(",")))
def get_scales_matrix():
scales = os.getenv("TEST_PG_BENCH_SCALES_MATRIX", default="10")
def get_scales_matrix(default: int = 10):
scales = os.getenv("TEST_PG_BENCH_SCALES_MATRIX", default=str(default))
return list(map(int, scales.split(",")))