mirror of
https://github.com/neondatabase/neon.git
synced 2026-04-23 17:30:37 +00:00
Compare commits
10 Commits
kelvich/st
...
WIP_import
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
21ad98ae4e | ||
|
|
6cfebc096f | ||
|
|
fecad1ca34 | ||
|
|
92de8423af | ||
|
|
e442f5357b | ||
|
|
5a723d44cd | ||
|
|
2623193876 | ||
|
|
70a53c4b03 | ||
|
|
9e108102b3 | ||
|
|
9c846a93e8 |
1
.gitignore
vendored
1
.gitignore
vendored
@@ -5,6 +5,7 @@
|
||||
__pycache__/
|
||||
test_output/
|
||||
.vscode
|
||||
.idea
|
||||
/.zenith
|
||||
/integration_tests/.zenith
|
||||
|
||||
|
||||
@@ -57,15 +57,15 @@ pub struct SkTimelineInfo {
|
||||
pub peer_horizon_lsn: Option<Lsn>,
|
||||
#[serde(default)]
|
||||
pub safekeeper_connstr: Option<String>,
|
||||
#[serde(default)]
|
||||
pub pageserver_connstr: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum BrokerError {
|
||||
#[error("Etcd client error: {0}. Context: {1}")]
|
||||
EtcdClient(etcd_client::Error, String),
|
||||
#[error("Error during parsing etcd data: {0}")]
|
||||
#[error("Error during parsing etcd key: {0}")]
|
||||
InvalidKey(String),
|
||||
#[error("Error during parsing etcd value: {0}")]
|
||||
ParsingError(String),
|
||||
#[error("Internal error: {0}")]
|
||||
InternalError(String),
|
||||
@@ -221,7 +221,7 @@ pub async fn subscribe_to_safekeeper_timeline_updates(
|
||||
},
|
||||
};
|
||||
|
||||
match parse_etcd_key_value(&subscription, key_str, value_str) {
|
||||
match parse_safekeeper_timeline(&subscription, key_str, value_str) {
|
||||
Ok((zttid, timeline)) => {
|
||||
match timeline_updates
|
||||
.entry(zttid)
|
||||
@@ -243,6 +243,8 @@ pub async fn subscribe_to_safekeeper_timeline_updates(
|
||||
}
|
||||
}
|
||||
}
|
||||
// it is normal to get other keys when we subscribe to everything
|
||||
Err(BrokerError::InvalidKey(e)) => debug!("Unexpected key for timeline update: {e}"),
|
||||
Err(e) => error!("Failed to parse timeline update: {e}"),
|
||||
};
|
||||
}
|
||||
@@ -281,14 +283,14 @@ static SK_TIMELINE_KEY_REGEX: Lazy<Regex> = Lazy::new(|| {
|
||||
.expect("wrong regex for safekeeper timeline etcd key")
|
||||
});
|
||||
|
||||
fn parse_etcd_key_value(
|
||||
fn parse_safekeeper_timeline(
|
||||
subscription: &SkTimelineSubscriptionKind,
|
||||
key_str: &str,
|
||||
value_str: &str,
|
||||
) -> Result<(ZTenantTimelineId, SafekeeperTimeline), BrokerError> {
|
||||
let broker_prefix = subscription.broker_etcd_prefix.as_str();
|
||||
if !key_str.starts_with(broker_prefix) {
|
||||
return Err(BrokerError::ParsingError(format!(
|
||||
return Err(BrokerError::InvalidKey(format!(
|
||||
"KV has unexpected key '{key_str}' that does not start with broker prefix {broker_prefix}"
|
||||
)));
|
||||
}
|
||||
@@ -297,7 +299,7 @@ fn parse_etcd_key_value(
|
||||
let key_captures = match SK_TIMELINE_KEY_REGEX.captures(key_part) {
|
||||
Some(captures) => captures,
|
||||
None => {
|
||||
return Err(BrokerError::ParsingError(format!(
|
||||
return Err(BrokerError::InvalidKey(format!(
|
||||
"KV has unexpected key part '{key_part}' that does not match required regex {}",
|
||||
SK_TIMELINE_KEY_REGEX.as_str()
|
||||
)));
|
||||
@@ -383,7 +385,7 @@ mod tests {
|
||||
&timeline_subscription,
|
||||
] {
|
||||
let (id, _timeline) =
|
||||
parse_etcd_key_value(subscription, &key_string, value_str)
|
||||
parse_safekeeper_timeline(subscription, &key_string, value_str)
|
||||
.unwrap_or_else(|e| panic!("Should be able to parse etcd key string '{key_string}' and etcd value string '{value_str}' for subscription {subscription:?}, but got: {e}"));
|
||||
assert_eq!(id, ZTenantTimelineId::new(tenant_id, timeline_id));
|
||||
}
|
||||
|
||||
@@ -269,7 +269,14 @@ impl FeStartupPacket {
|
||||
.next()
|
||||
.context("expected even number of params in StartupMessage")?;
|
||||
if name == "options" {
|
||||
// deprecated way of passing params as cmd line args
|
||||
// parsing options arguments "...&options=<var0>%3D<val0>+<var1>=<var1>..."
|
||||
// '%3D' is '=' and '+' is ' '
|
||||
|
||||
// Note: we allow users that don't have SNI capabilities,
|
||||
// to pass a special keyword argument 'project'
|
||||
// to be used to determine the cluster name by the proxy.
|
||||
|
||||
//TODO: write unit test for this and refactor in its own function.
|
||||
for cmdopt in value.split(' ') {
|
||||
let nameval: Vec<&str> = cmdopt.split('=').collect();
|
||||
if nameval.len() == 2 {
|
||||
|
||||
@@ -22,6 +22,8 @@ use postgres_ffi::{pg_constants, ControlFileData, DBState_DB_SHUTDOWNED};
|
||||
use postgres_ffi::{Oid, TransactionId};
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
use postgres::CopyOutReader;
|
||||
|
||||
///
|
||||
/// Import all relation data pages from local disk into the repository.
|
||||
///
|
||||
@@ -403,3 +405,126 @@ fn import_wal<R: Repository>(
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn import_timeline_from_tar<R: Repository>(
|
||||
tline: &mut DatadirTimeline<R>,
|
||||
copyreader: CopyOutReader,
|
||||
lsn: Lsn,
|
||||
) -> Result<()> {
|
||||
let mut ar = tar::Archive::new(copyreader);
|
||||
|
||||
let mut modification = tline.begin_modification(lsn);
|
||||
modification.init_empty()?;
|
||||
|
||||
for e in ar.entries().unwrap() {
|
||||
let mut entry = e.unwrap();
|
||||
let header = entry.header();
|
||||
let file_path = header.path().unwrap().into_owned();
|
||||
|
||||
match header.entry_type() {
|
||||
tar::EntryType::Regular => {
|
||||
let mut buffer = Vec::new();
|
||||
// read the whole entry
|
||||
entry.read_to_end(&mut buffer).unwrap();
|
||||
|
||||
import_file(&mut modification, &file_path.as_ref(), &buffer)?;
|
||||
}
|
||||
tar::EntryType::Directory => {
|
||||
println!("tar::EntryType::Directory {}", file_path.display());
|
||||
}
|
||||
_ => {
|
||||
panic!("tar::EntryType::?? {}", file_path.display());
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn import_file<R: Repository>(
|
||||
modification: &mut DatadirModification<R>,
|
||||
file_path: &Path,
|
||||
buffer: &[u8],
|
||||
) -> Result<()> {
|
||||
if file_path.starts_with("global") {
|
||||
let spcnode = pg_constants::GLOBALTABLESPACE_OID;
|
||||
let dbnode = 0;
|
||||
|
||||
match file_path.file_name().unwrap().to_string_lossy().as_ref() {
|
||||
"pg_control" => {
|
||||
println!("pg_control file {}", file_path.display());
|
||||
|
||||
// Import it as ControlFile
|
||||
modification.put_control_file(Bytes::copy_from_slice(&buffer[..]))?;
|
||||
|
||||
// Extract the checkpoint record and import it separately.
|
||||
let pg_control = ControlFileData::decode(&buffer)?;
|
||||
let checkpoint_bytes = pg_control.checkPointCopy.encode()?;
|
||||
modification.put_checkpoint(checkpoint_bytes)?;
|
||||
}
|
||||
"pg_filenode.map" => {
|
||||
("pg_filenode.map file {}", file_path.display());
|
||||
}
|
||||
_ => {
|
||||
println!("global relfile {}", file_path.display());
|
||||
//TODO
|
||||
}
|
||||
}
|
||||
} else if file_path.starts_with("base") {
|
||||
let spcnode = pg_constants::DEFAULTTABLESPACE_OID;
|
||||
let dbnode: u32 = file_path
|
||||
.iter()
|
||||
.skip(1)
|
||||
.next()
|
||||
.unwrap()
|
||||
.to_string_lossy()
|
||||
.parse()
|
||||
.unwrap();
|
||||
|
||||
match file_path.file_name().unwrap().to_string_lossy().as_ref() {
|
||||
"pg_filenode.map" => {
|
||||
println!(
|
||||
"dbnode {} pg_filenode.map file {}",
|
||||
dbnode,
|
||||
file_path.display()
|
||||
);
|
||||
modification.put_relmap_file(
|
||||
spcnode,
|
||||
dbnode,
|
||||
Bytes::copy_from_slice(&buffer[..]),
|
||||
)?;
|
||||
}
|
||||
_ => {
|
||||
println!("dbnode {} relfile {}", dbnode, file_path.display());
|
||||
//TODO
|
||||
}
|
||||
}
|
||||
} else if file_path.starts_with("pg_xact") {
|
||||
println!(
|
||||
"pg_xact {} ",
|
||||
file_path.file_name().unwrap().to_string_lossy().as_ref()
|
||||
);
|
||||
// TODO
|
||||
} else if file_path.starts_with("pg_multixact/offset") {
|
||||
println!(
|
||||
"pg_multixact/offset {}",
|
||||
file_path.file_name().unwrap().to_string_lossy().as_ref()
|
||||
);
|
||||
// TODO
|
||||
} else if file_path.starts_with("pg_multixact/members") {
|
||||
println!(
|
||||
"pg_multixact/members {}",
|
||||
file_path.file_name().unwrap().to_string_lossy().as_ref()
|
||||
);
|
||||
// TODO
|
||||
} else if file_path.starts_with("pg_twophase") {
|
||||
let xid = u32::from_str_radix(&file_path.file_name().unwrap().to_string_lossy(), 16)?;
|
||||
|
||||
println!(
|
||||
"xid {} pg_twophase {}",
|
||||
xid,
|
||||
file_path.file_name().unwrap().to_string_lossy().as_ref()
|
||||
);
|
||||
modification.put_twophase_file(xid, Bytes::copy_from_slice(&buffer[..]))?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1727,9 +1727,7 @@ impl LayeredTimeline {
|
||||
new_delta_path.clone(),
|
||||
self.conf.timeline_path(&self.timeline_id, &self.tenant_id),
|
||||
])?;
|
||||
fail_point!("checkpoint-before-sync");
|
||||
|
||||
fail_point!("flush-frozen");
|
||||
fail_point!("flush-frozen-before-sync");
|
||||
|
||||
// Finally, replace the frozen in-memory layer with the new on-disk layer
|
||||
{
|
||||
|
||||
@@ -20,7 +20,7 @@
|
||||
//! assign a buffer for a page, you must hold the mapping lock and the lock on
|
||||
//! the slot at the same time.
|
||||
//!
|
||||
//! Whenever you need to hold both locks simultenously, the slot lock must be
|
||||
//! Whenever you need to hold both locks simultaneously, the slot lock must be
|
||||
//! acquired first. This consistent ordering avoids deadlocks. To look up a page
|
||||
//! in the cache, you would first look up the mapping, while holding the mapping
|
||||
//! lock, and then lock the slot. You must release the mapping lock in between,
|
||||
|
||||
@@ -1,223 +0,0 @@
|
||||
//! Timeline synchronization logic to delete a bulk of timeline's remote files from the remote storage.
|
||||
|
||||
use anyhow::Context;
|
||||
use futures::stream::{FuturesUnordered, StreamExt};
|
||||
use tracing::{debug, error, info};
|
||||
use utils::zid::ZTenantTimelineId;
|
||||
|
||||
use crate::remote_storage::{
|
||||
storage_sync::{SyncQueue, SyncTask},
|
||||
RemoteStorage,
|
||||
};
|
||||
|
||||
use super::{LayersDeletion, SyncData};
|
||||
|
||||
/// Attempts to remove the timleline layers from the remote storage.
|
||||
/// If the task had not adjusted the metadata before, the deletion will fail.
|
||||
pub(super) async fn delete_timeline_layers<'a, P, S>(
|
||||
storage: &'a S,
|
||||
sync_queue: &SyncQueue,
|
||||
sync_id: ZTenantTimelineId,
|
||||
mut delete_data: SyncData<LayersDeletion>,
|
||||
) -> bool
|
||||
where
|
||||
P: std::fmt::Debug + Send + Sync + 'static,
|
||||
S: RemoteStorage<RemoteObjectId = P> + Send + Sync + 'static,
|
||||
{
|
||||
if !delete_data.data.deletion_registered {
|
||||
error!("Cannot delete timeline layers before the deletion metadata is not registered, reenqueueing");
|
||||
delete_data.retries += 1;
|
||||
sync_queue.push(sync_id, SyncTask::Delete(delete_data));
|
||||
return false;
|
||||
}
|
||||
|
||||
if delete_data.data.layers_to_delete.is_empty() {
|
||||
info!("No layers to delete, skipping");
|
||||
return true;
|
||||
}
|
||||
|
||||
let layers_to_delete = delete_data
|
||||
.data
|
||||
.layers_to_delete
|
||||
.drain()
|
||||
.collect::<Vec<_>>();
|
||||
debug!("Layers to delete: {layers_to_delete:?}");
|
||||
info!("Deleting {} timeline layers", layers_to_delete.len());
|
||||
|
||||
let mut delete_tasks = layers_to_delete
|
||||
.into_iter()
|
||||
.map(|local_layer_path| async {
|
||||
let storage_path = match storage.storage_path(&local_layer_path).with_context(|| {
|
||||
format!(
|
||||
"Failed to get the layer storage path for local path '{}'",
|
||||
local_layer_path.display()
|
||||
)
|
||||
}) {
|
||||
Ok(path) => path,
|
||||
Err(e) => return Err((e, local_layer_path)),
|
||||
};
|
||||
|
||||
match storage.delete(&storage_path).await.with_context(|| {
|
||||
format!(
|
||||
"Failed to delete remote layer from storage at '{:?}'",
|
||||
storage_path
|
||||
)
|
||||
}) {
|
||||
Ok(()) => Ok(local_layer_path),
|
||||
Err(e) => Err((e, local_layer_path)),
|
||||
}
|
||||
})
|
||||
.collect::<FuturesUnordered<_>>();
|
||||
|
||||
let mut errored = false;
|
||||
while let Some(deletion_result) = delete_tasks.next().await {
|
||||
match deletion_result {
|
||||
Ok(local_layer_path) => {
|
||||
debug!(
|
||||
"Successfully deleted layer {} for timeline {sync_id}",
|
||||
local_layer_path.display()
|
||||
);
|
||||
delete_data.data.deleted_layers.insert(local_layer_path);
|
||||
}
|
||||
Err((e, local_layer_path)) => {
|
||||
errored = true;
|
||||
error!(
|
||||
"Failed to delete layer {} for timeline {sync_id}: {e:?}",
|
||||
local_layer_path.display()
|
||||
);
|
||||
delete_data.data.layers_to_delete.insert(local_layer_path);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if errored {
|
||||
debug!("Reenqueuing failed delete task for timeline {sync_id}");
|
||||
delete_data.retries += 1;
|
||||
sync_queue.push(sync_id, SyncTask::Delete(delete_data));
|
||||
}
|
||||
errored
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::{collections::HashSet, num::NonZeroUsize};
|
||||
|
||||
use itertools::Itertools;
|
||||
use tempfile::tempdir;
|
||||
use tokio::fs;
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
use crate::{
|
||||
remote_storage::{
|
||||
storage_sync::test_utils::{create_local_timeline, dummy_metadata},
|
||||
LocalFs,
|
||||
},
|
||||
repository::repo_harness::{RepoHarness, TIMELINE_ID},
|
||||
};
|
||||
|
||||
use super::*;
|
||||
|
||||
#[tokio::test]
|
||||
async fn delete_timeline_negative() -> anyhow::Result<()> {
|
||||
let harness = RepoHarness::create("delete_timeline_negative")?;
|
||||
let (sync_queue, _) = SyncQueue::new(NonZeroUsize::new(100).unwrap());
|
||||
let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID);
|
||||
let storage = LocalFs::new(tempdir()?.path().to_path_buf(), &harness.conf.workdir)?;
|
||||
|
||||
let deleted = delete_timeline_layers(
|
||||
&storage,
|
||||
&sync_queue,
|
||||
sync_id,
|
||||
SyncData {
|
||||
retries: 1,
|
||||
data: LayersDeletion {
|
||||
deleted_layers: HashSet::new(),
|
||||
layers_to_delete: HashSet::new(),
|
||||
deletion_registered: false,
|
||||
},
|
||||
},
|
||||
)
|
||||
.await;
|
||||
|
||||
assert!(
|
||||
!deleted,
|
||||
"Should not start the deletion for task with delete metadata unregistered"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn delete_timeline() -> anyhow::Result<()> {
|
||||
let harness = RepoHarness::create("delete_timeline")?;
|
||||
let (sync_queue, _) = SyncQueue::new(NonZeroUsize::new(100).unwrap());
|
||||
|
||||
let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID);
|
||||
let layer_files = ["a", "b", "c", "d"];
|
||||
let storage = LocalFs::new(tempdir()?.path().to_path_buf(), &harness.conf.workdir)?;
|
||||
let current_retries = 3;
|
||||
let metadata = dummy_metadata(Lsn(0x30));
|
||||
let local_timeline_path = harness.timeline_path(&TIMELINE_ID);
|
||||
let timeline_upload =
|
||||
create_local_timeline(&harness, TIMELINE_ID, &layer_files, metadata.clone()).await?;
|
||||
for local_path in timeline_upload.layers_to_upload {
|
||||
let remote_path = storage.storage_path(&local_path)?;
|
||||
let remote_parent_dir = remote_path.parent().unwrap();
|
||||
if !remote_parent_dir.exists() {
|
||||
fs::create_dir_all(&remote_parent_dir).await?;
|
||||
}
|
||||
fs::copy(&local_path, &remote_path).await?;
|
||||
}
|
||||
assert_eq!(
|
||||
storage
|
||||
.list()
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|remote_path| storage.local_path(&remote_path).unwrap())
|
||||
.filter_map(|local_path| { Some(local_path.file_name()?.to_str()?.to_owned()) })
|
||||
.sorted()
|
||||
.collect::<Vec<_>>(),
|
||||
layer_files
|
||||
.iter()
|
||||
.map(|layer_str| layer_str.to_string())
|
||||
.sorted()
|
||||
.collect::<Vec<_>>(),
|
||||
"Expect to have all layer files remotely before deletion"
|
||||
);
|
||||
|
||||
let deleted = delete_timeline_layers(
|
||||
&storage,
|
||||
&sync_queue,
|
||||
sync_id,
|
||||
SyncData {
|
||||
retries: current_retries,
|
||||
data: LayersDeletion {
|
||||
deleted_layers: HashSet::new(),
|
||||
layers_to_delete: HashSet::from([
|
||||
local_timeline_path.join("a"),
|
||||
local_timeline_path.join("c"),
|
||||
local_timeline_path.join("something_different"),
|
||||
]),
|
||||
deletion_registered: true,
|
||||
},
|
||||
},
|
||||
)
|
||||
.await;
|
||||
assert!(deleted, "Should be able to delete timeline files");
|
||||
|
||||
assert_eq!(
|
||||
storage
|
||||
.list()
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|remote_path| storage.local_path(&remote_path).unwrap())
|
||||
.filter_map(|local_path| { Some(local_path.file_name()?.to_str()?.to_owned()) })
|
||||
.sorted()
|
||||
.collect::<Vec<_>>(),
|
||||
vec!["b".to_string(), "d".to_string()],
|
||||
"Expect to have only non-deleted files remotely"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -195,6 +195,7 @@ impl Display for TimelineSyncStatusUpdate {
|
||||
f.write_str(s)
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
/// A repository corresponds to one .zenith directory. One repository holds multiple
|
||||
/// timelines, forked off from the same initial call to 'initdb'.
|
||||
@@ -242,7 +243,7 @@ pub trait Repository: Send + Sync {
|
||||
///
|
||||
/// 'timelineid' specifies the timeline to GC, or None for all.
|
||||
/// `horizon` specifies delta from last lsn to preserve all object versions (pitr interval).
|
||||
/// `checkpoint_before_gc` parameter is used to force compaction of storage before CG
|
||||
/// `checkpoint_before_gc` parameter is used to force compaction of storage before GC
|
||||
/// to make tests more deterministic.
|
||||
/// TODO Do we still need it or we can call checkpoint explicitly in tests where needed?
|
||||
fn gc_iteration(
|
||||
|
||||
@@ -659,7 +659,6 @@ impl WalConnectionManager {
|
||||
match wal_stream_connection_string(
|
||||
self.id,
|
||||
info.safekeeper_connstr.as_deref()?,
|
||||
info.pageserver_connstr.as_deref()?,
|
||||
) {
|
||||
Ok(connstr) => Some((sk_id, info, connstr)),
|
||||
Err(e) => {
|
||||
@@ -749,7 +748,6 @@ fn wal_stream_connection_string(
|
||||
timeline_id,
|
||||
}: ZTenantTimelineId,
|
||||
listen_pg_addr_str: &str,
|
||||
pageserver_connstr: &str,
|
||||
) -> anyhow::Result<String> {
|
||||
let sk_connstr = format!("postgresql://no_user@{listen_pg_addr_str}/no_db");
|
||||
let me_conf = sk_connstr
|
||||
@@ -759,7 +757,7 @@ fn wal_stream_connection_string(
|
||||
})?;
|
||||
let (host, port) = utils::connstring::connection_host_port(&me_conf);
|
||||
Ok(format!(
|
||||
"host={host} port={port} options='-c ztimelineid={timeline_id} ztenantid={tenant_id} pageserver_connstr={pageserver_connstr}'",
|
||||
"host={host} port={port} options='-c ztimelineid={timeline_id} ztenantid={tenant_id}'"
|
||||
))
|
||||
}
|
||||
|
||||
@@ -792,20 +790,6 @@ mod tests {
|
||||
remote_consistent_lsn: None,
|
||||
peer_horizon_lsn: None,
|
||||
safekeeper_connstr: None,
|
||||
pageserver_connstr: Some("no safekeeper_connstr".to_string()),
|
||||
},
|
||||
),
|
||||
(
|
||||
NodeId(1),
|
||||
SkTimelineInfo {
|
||||
last_log_term: None,
|
||||
flush_lsn: None,
|
||||
commit_lsn: Some(Lsn(1)),
|
||||
backup_lsn: None,
|
||||
remote_consistent_lsn: None,
|
||||
peer_horizon_lsn: None,
|
||||
safekeeper_connstr: Some("no pageserver_connstr".to_string()),
|
||||
pageserver_connstr: None,
|
||||
},
|
||||
),
|
||||
(
|
||||
@@ -818,7 +802,6 @@ mod tests {
|
||||
remote_consistent_lsn: None,
|
||||
peer_horizon_lsn: None,
|
||||
safekeeper_connstr: Some("no commit_lsn".to_string()),
|
||||
pageserver_connstr: Some("no commit_lsn (p)".to_string()),
|
||||
},
|
||||
),
|
||||
(
|
||||
@@ -831,7 +814,6 @@ mod tests {
|
||||
remote_consistent_lsn: None,
|
||||
peer_horizon_lsn: None,
|
||||
safekeeper_connstr: Some("no commit_lsn".to_string()),
|
||||
pageserver_connstr: Some("no commit_lsn (p)".to_string()),
|
||||
},
|
||||
),
|
||||
]));
|
||||
@@ -887,7 +869,6 @@ mod tests {
|
||||
remote_consistent_lsn: None,
|
||||
peer_horizon_lsn: None,
|
||||
safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()),
|
||||
pageserver_connstr: Some(DUMMY_PAGESERVER_CONNSTR.to_string()),
|
||||
},
|
||||
),
|
||||
(
|
||||
@@ -900,7 +881,6 @@ mod tests {
|
||||
remote_consistent_lsn: None,
|
||||
peer_horizon_lsn: None,
|
||||
safekeeper_connstr: Some("not advanced Lsn".to_string()),
|
||||
pageserver_connstr: Some("not advanced Lsn (p)".to_string()),
|
||||
},
|
||||
),
|
||||
(
|
||||
@@ -915,7 +895,6 @@ mod tests {
|
||||
remote_consistent_lsn: None,
|
||||
peer_horizon_lsn: None,
|
||||
safekeeper_connstr: Some("not enough advanced Lsn".to_string()),
|
||||
pageserver_connstr: Some("not enough advanced Lsn (p)".to_string()),
|
||||
},
|
||||
),
|
||||
]));
|
||||
@@ -947,7 +926,6 @@ mod tests {
|
||||
remote_consistent_lsn: None,
|
||||
peer_horizon_lsn: None,
|
||||
safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()),
|
||||
pageserver_connstr: Some(DUMMY_PAGESERVER_CONNSTR.to_string()),
|
||||
},
|
||||
)]))
|
||||
.expect("Expected one candidate selected out of the only data option, but got none");
|
||||
@@ -960,9 +938,6 @@ mod tests {
|
||||
assert!(only_candidate
|
||||
.wal_producer_connstr
|
||||
.contains(DUMMY_SAFEKEEPER_CONNSTR));
|
||||
assert!(only_candidate
|
||||
.wal_producer_connstr
|
||||
.contains(DUMMY_PAGESERVER_CONNSTR));
|
||||
|
||||
let selected_lsn = 100_000;
|
||||
let biggest_wal_candidate = data_manager_with_no_connection
|
||||
@@ -977,7 +952,6 @@ mod tests {
|
||||
remote_consistent_lsn: None,
|
||||
peer_horizon_lsn: None,
|
||||
safekeeper_connstr: Some("smaller commit_lsn".to_string()),
|
||||
pageserver_connstr: Some("smaller commit_lsn (p)".to_string()),
|
||||
},
|
||||
),
|
||||
(
|
||||
@@ -990,7 +964,6 @@ mod tests {
|
||||
remote_consistent_lsn: None,
|
||||
peer_horizon_lsn: None,
|
||||
safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()),
|
||||
pageserver_connstr: Some(DUMMY_PAGESERVER_CONNSTR.to_string()),
|
||||
},
|
||||
),
|
||||
(
|
||||
@@ -1003,9 +976,6 @@ mod tests {
|
||||
remote_consistent_lsn: None,
|
||||
peer_horizon_lsn: None,
|
||||
safekeeper_connstr: None,
|
||||
pageserver_connstr: Some(
|
||||
"no safekeeper_connstr despite bigger commit_lsn".to_string(),
|
||||
),
|
||||
},
|
||||
),
|
||||
]))
|
||||
@@ -1022,9 +992,6 @@ mod tests {
|
||||
assert!(biggest_wal_candidate
|
||||
.wal_producer_connstr
|
||||
.contains(DUMMY_SAFEKEEPER_CONNSTR));
|
||||
assert!(biggest_wal_candidate
|
||||
.wal_producer_connstr
|
||||
.contains(DUMMY_PAGESERVER_CONNSTR));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -1071,7 +1038,6 @@ mod tests {
|
||||
remote_consistent_lsn: None,
|
||||
peer_horizon_lsn: None,
|
||||
safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()),
|
||||
pageserver_connstr: Some(DUMMY_PAGESERVER_CONNSTR.to_string()),
|
||||
},
|
||||
),
|
||||
(
|
||||
@@ -1084,7 +1050,6 @@ mod tests {
|
||||
remote_consistent_lsn: None,
|
||||
peer_horizon_lsn: None,
|
||||
safekeeper_connstr: Some("advanced by Lsn safekeeper".to_string()),
|
||||
pageserver_connstr: Some("advanced by Lsn safekeeper (p)".to_string()),
|
||||
},
|
||||
),
|
||||
]);
|
||||
@@ -1108,9 +1073,6 @@ mod tests {
|
||||
assert!(over_threshcurrent_candidate
|
||||
.wal_producer_connstr
|
||||
.contains("advanced by Lsn safekeeper"));
|
||||
assert!(over_threshcurrent_candidate
|
||||
.wal_producer_connstr
|
||||
.contains("advanced by Lsn safekeeper (p)"));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -1146,7 +1108,6 @@ mod tests {
|
||||
remote_consistent_lsn: None,
|
||||
peer_horizon_lsn: None,
|
||||
safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()),
|
||||
pageserver_connstr: Some(DUMMY_PAGESERVER_CONNSTR.to_string()),
|
||||
},
|
||||
)]))
|
||||
.expect(
|
||||
@@ -1168,9 +1129,6 @@ mod tests {
|
||||
assert!(over_threshcurrent_candidate
|
||||
.wal_producer_connstr
|
||||
.contains(DUMMY_SAFEKEEPER_CONNSTR));
|
||||
assert!(over_threshcurrent_candidate
|
||||
.wal_producer_connstr
|
||||
.contains(DUMMY_PAGESERVER_CONNSTR));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -1197,7 +1155,6 @@ mod tests {
|
||||
}
|
||||
|
||||
const DUMMY_SAFEKEEPER_CONNSTR: &str = "safekeeper_connstr";
|
||||
const DUMMY_PAGESERVER_CONNSTR: &str = "pageserver_connstr";
|
||||
|
||||
// the function itself does not need async, but it spawns a tokio::task underneath hence neeed
|
||||
// a runtime to not to panic
|
||||
@@ -1205,9 +1162,8 @@ mod tests {
|
||||
id: ZTenantTimelineId,
|
||||
safekeeper_id: NodeId,
|
||||
) -> WalConnectionData {
|
||||
let dummy_connstr =
|
||||
wal_stream_connection_string(id, DUMMY_SAFEKEEPER_CONNSTR, DUMMY_PAGESERVER_CONNSTR)
|
||||
.expect("Failed to construct dummy wal producer connstr");
|
||||
let dummy_connstr = wal_stream_connection_string(id, DUMMY_SAFEKEEPER_CONNSTR)
|
||||
.expect("Failed to construct dummy wal producer connstr");
|
||||
WalConnectionData {
|
||||
safekeeper_id,
|
||||
connection: WalReceiverConnection::open(
|
||||
|
||||
@@ -26,6 +26,11 @@ pub struct ClientCredentials {
|
||||
// New console API requires SNI info to determine the cluster name.
|
||||
// Other Auth backends don't need it.
|
||||
pub sni_data: Option<String>,
|
||||
|
||||
// project_name is passed as argument from options from url.
|
||||
// In case sni_data is missing: project_name is used to determine cluster name.
|
||||
// In case sni_data is available: project_name and sni_data should match (otherwise throws an error).
|
||||
pub project_name: Option<String>,
|
||||
}
|
||||
|
||||
impl ClientCredentials {
|
||||
@@ -37,22 +42,47 @@ impl ClientCredentials {
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum ProjectNameError {
|
||||
#[error("SNI is missing, please upgrade the postgres client library")]
|
||||
#[error("SNI is missing. EITHER please upgrade the postgres client library OR pass the project name as a parameter: '...&options=project%3D<project-name>...'.")]
|
||||
Missing,
|
||||
|
||||
#[error("SNI is malformed")]
|
||||
#[error("SNI is malformed.")]
|
||||
Bad,
|
||||
|
||||
#[error("Inconsistent project name inferred from SNI and project option. String from SNI: '{0}', String from project option: '{1}'")]
|
||||
Inconsistent(String, String),
|
||||
}
|
||||
|
||||
impl UserFacingError for ProjectNameError {}
|
||||
|
||||
impl ClientCredentials {
|
||||
/// Determine project name from SNI.
|
||||
/// Determine project name from SNI or from project_name parameter from options argument.
|
||||
pub fn project_name(&self) -> Result<&str, ProjectNameError> {
|
||||
// Currently project name is passed as a top level domain
|
||||
let sni = self.sni_data.as_ref().ok_or(ProjectNameError::Missing)?;
|
||||
let (first, _) = sni.split_once('.').ok_or(ProjectNameError::Bad)?;
|
||||
Ok(first)
|
||||
// Checking that if both sni_data and project_name are set, then they should match
|
||||
// otherwise, throws a ProjectNameError::Inconsistent error.
|
||||
if let Some(sni_data) = &self.sni_data {
|
||||
let project_name_from_sni_data =
|
||||
sni_data.split_once('.').ok_or(ProjectNameError::Bad)?.0;
|
||||
if let Some(project_name_from_options) = &self.project_name {
|
||||
if !project_name_from_options.eq(project_name_from_sni_data) {
|
||||
return Err(ProjectNameError::Inconsistent(
|
||||
project_name_from_sni_data.to_string(),
|
||||
project_name_from_options.to_string(),
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
// determine the project name from self.sni_data if it exists, otherwise from self.project_name.
|
||||
let ret = match &self.sni_data {
|
||||
// if sni_data exists, use it to determine project name
|
||||
Some(sni_data) => sni_data.split_once('.').ok_or(ProjectNameError::Bad)?.0,
|
||||
// otherwise use project_option if it was manually set thought options parameter.
|
||||
None => self
|
||||
.project_name
|
||||
.as_ref()
|
||||
.ok_or(ProjectNameError::Missing)?
|
||||
.as_str(),
|
||||
};
|
||||
Ok(ret)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -68,11 +98,13 @@ impl TryFrom<HashMap<String, String>> for ClientCredentials {
|
||||
|
||||
let user = get_param("user")?;
|
||||
let dbname = get_param("database")?;
|
||||
let project_name = get_param("project").ok();
|
||||
|
||||
Ok(Self {
|
||||
user,
|
||||
dbname,
|
||||
sni_data: None,
|
||||
project_name,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -29,12 +29,11 @@ pub struct SafekeeperPostgresHandler {
|
||||
pub ztenantid: Option<ZTenantId>,
|
||||
pub ztimelineid: Option<ZTimelineId>,
|
||||
pub timeline: Option<Arc<Timeline>>,
|
||||
pageserver_connstr: Option<String>,
|
||||
}
|
||||
|
||||
/// Parsed Postgres command.
|
||||
enum SafekeeperPostgresCommand {
|
||||
StartWalPush { pageserver_connstr: Option<String> },
|
||||
StartWalPush,
|
||||
StartReplication { start_lsn: Lsn },
|
||||
IdentifySystem,
|
||||
JSONCtrl { cmd: AppendLogicalMessage },
|
||||
@@ -42,11 +41,7 @@ enum SafekeeperPostgresCommand {
|
||||
|
||||
fn parse_cmd(cmd: &str) -> Result<SafekeeperPostgresCommand> {
|
||||
if cmd.starts_with("START_WAL_PUSH") {
|
||||
let re = Regex::new(r"START_WAL_PUSH(?: (.+))?").unwrap();
|
||||
|
||||
let caps = re.captures(cmd).unwrap();
|
||||
let pageserver_connstr = caps.get(1).map(|m| m.as_str().to_owned());
|
||||
Ok(SafekeeperPostgresCommand::StartWalPush { pageserver_connstr })
|
||||
Ok(SafekeeperPostgresCommand::StartWalPush)
|
||||
} else if cmd.starts_with("START_REPLICATION") {
|
||||
let re =
|
||||
Regex::new(r"START_REPLICATION(?: PHYSICAL)? ([[:xdigit:]]+/[[:xdigit:]]+)").unwrap();
|
||||
@@ -86,8 +81,6 @@ impl postgres_backend::Handler for SafekeeperPostgresHandler {
|
||||
self.appname = Some(app_name.clone());
|
||||
}
|
||||
|
||||
self.pageserver_connstr = params.get("pageserver_connstr").cloned();
|
||||
|
||||
Ok(())
|
||||
} else {
|
||||
bail!("Safekeeper received unexpected initial message: {:?}", sm);
|
||||
@@ -113,14 +106,14 @@ impl postgres_backend::Handler for SafekeeperPostgresHandler {
|
||||
}
|
||||
|
||||
match cmd {
|
||||
SafekeeperPostgresCommand::StartWalPush { pageserver_connstr } => {
|
||||
ReceiveWalConn::new(pgb, pageserver_connstr)
|
||||
SafekeeperPostgresCommand::StartWalPush => {
|
||||
ReceiveWalConn::new(pgb)
|
||||
.run(self)
|
||||
.context("failed to run ReceiveWalConn")?;
|
||||
}
|
||||
SafekeeperPostgresCommand::StartReplication { start_lsn } => {
|
||||
ReplicationConn::new(pgb)
|
||||
.run(self, pgb, start_lsn, self.pageserver_connstr.clone())
|
||||
.run(self, pgb, start_lsn)
|
||||
.context("failed to run ReplicationConn")?;
|
||||
}
|
||||
SafekeeperPostgresCommand::IdentifySystem => {
|
||||
@@ -142,7 +135,6 @@ impl SafekeeperPostgresHandler {
|
||||
ztenantid: None,
|
||||
ztimelineid: None,
|
||||
timeline: None,
|
||||
pageserver_connstr: None,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -32,22 +32,14 @@ pub struct ReceiveWalConn<'pg> {
|
||||
pg_backend: &'pg mut PostgresBackend,
|
||||
/// The cached result of `pg_backend.socket().peer_addr()` (roughly)
|
||||
peer_addr: SocketAddr,
|
||||
/// Pageserver connection string forwarded from compute
|
||||
/// NOTE that it is allowed to operate without a pageserver.
|
||||
/// So if compute has no pageserver configured do not use it.
|
||||
pageserver_connstr: Option<String>,
|
||||
}
|
||||
|
||||
impl<'pg> ReceiveWalConn<'pg> {
|
||||
pub fn new(
|
||||
pg: &'pg mut PostgresBackend,
|
||||
pageserver_connstr: Option<String>,
|
||||
) -> ReceiveWalConn<'pg> {
|
||||
pub fn new(pg: &'pg mut PostgresBackend) -> ReceiveWalConn<'pg> {
|
||||
let peer_addr = *pg.get_peer_addr();
|
||||
ReceiveWalConn {
|
||||
pg_backend: pg,
|
||||
peer_addr,
|
||||
pageserver_connstr,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -120,9 +112,7 @@ impl<'pg> ReceiveWalConn<'pg> {
|
||||
// Register the connection and defer unregister. Do that only
|
||||
// after processing first message, as it sets wal_seg_size,
|
||||
// wanted by many.
|
||||
spg.timeline
|
||||
.get()
|
||||
.on_compute_connect(self.pageserver_connstr.as_ref())?;
|
||||
spg.timeline.get().on_compute_connect()?;
|
||||
_guard = Some(ComputeConnectionGuard {
|
||||
timeline: Arc::clone(spg.timeline.get()),
|
||||
});
|
||||
|
||||
@@ -162,9 +162,8 @@ impl ReplicationConn {
|
||||
spg: &mut SafekeeperPostgresHandler,
|
||||
pgb: &mut PostgresBackend,
|
||||
mut start_pos: Lsn,
|
||||
pageserver_connstr: Option<String>,
|
||||
) -> Result<()> {
|
||||
let _enter = info_span!("WAL sender", timeline = %spg.ztimelineid.unwrap(), pageserver_connstr = %pageserver_connstr.as_deref().unwrap_or_default()).entered();
|
||||
let _enter = info_span!("WAL sender", timeline = %spg.ztimelineid.unwrap()).entered();
|
||||
|
||||
// spawn the background thread which receives HotStandbyFeedback messages.
|
||||
let bg_timeline = Arc::clone(spg.timeline.get());
|
||||
|
||||
@@ -95,7 +95,6 @@ struct SharedState {
|
||||
/// when tli is inactive instead of having this flag.
|
||||
active: bool,
|
||||
num_computes: u32,
|
||||
pageserver_connstr: Option<String>,
|
||||
last_removed_segno: XLogSegNo,
|
||||
}
|
||||
|
||||
@@ -119,7 +118,6 @@ impl SharedState {
|
||||
wal_backup_active: false,
|
||||
active: false,
|
||||
num_computes: 0,
|
||||
pageserver_connstr: None,
|
||||
last_removed_segno: 0,
|
||||
})
|
||||
}
|
||||
@@ -139,7 +137,6 @@ impl SharedState {
|
||||
wal_backup_active: false,
|
||||
active: false,
|
||||
num_computes: 0,
|
||||
pageserver_connstr: None,
|
||||
last_removed_segno: 0,
|
||||
})
|
||||
}
|
||||
@@ -190,35 +187,6 @@ impl SharedState {
|
||||
self.wal_backup_active
|
||||
}
|
||||
|
||||
/// Activate timeline's walsender: start/change timeline information propagated into etcd for further pageserver connections.
|
||||
fn activate_walsender(
|
||||
&mut self,
|
||||
zttid: &ZTenantTimelineId,
|
||||
new_pageserver_connstr: Option<String>,
|
||||
) {
|
||||
if self.pageserver_connstr != new_pageserver_connstr {
|
||||
self.deactivate_walsender(zttid);
|
||||
|
||||
if new_pageserver_connstr.is_some() {
|
||||
info!(
|
||||
"timeline {} has activated its walsender with connstr {new_pageserver_connstr:?}",
|
||||
zttid.timeline_id,
|
||||
);
|
||||
}
|
||||
self.pageserver_connstr = new_pageserver_connstr;
|
||||
}
|
||||
}
|
||||
|
||||
/// Deactivate the timeline: stop sending the timeline data into etcd, so no pageserver can connect for WAL streaming.
|
||||
fn deactivate_walsender(&mut self, zttid: &ZTenantTimelineId) {
|
||||
if let Some(pageserver_connstr) = self.pageserver_connstr.take() {
|
||||
info!(
|
||||
"timeline {} had deactivated its wallsender with connstr {pageserver_connstr:?}",
|
||||
zttid.timeline_id,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
fn get_wal_seg_size(&self) -> usize {
|
||||
self.sk.state.server.wal_seg_size as usize
|
||||
}
|
||||
@@ -318,17 +286,12 @@ impl Timeline {
|
||||
/// Register compute connection, starting timeline-related activity if it is
|
||||
/// not running yet.
|
||||
/// Can fail only if channel to a static thread got closed, which is not normal at all.
|
||||
pub fn on_compute_connect(&self, pageserver_connstr: Option<&String>) -> Result<()> {
|
||||
pub fn on_compute_connect(&self) -> Result<()> {
|
||||
let is_wal_backup_action_pending: bool;
|
||||
{
|
||||
let mut shared_state = self.mutex.lock().unwrap();
|
||||
shared_state.num_computes += 1;
|
||||
is_wal_backup_action_pending = shared_state.update_status();
|
||||
// FIXME: currently we always adopt latest pageserver connstr, but we
|
||||
// should have kind of generations assigned by compute to distinguish
|
||||
// the latest one or even pass it through consensus to reliably deliver
|
||||
// to all safekeepers.
|
||||
shared_state.activate_walsender(&self.zttid, pageserver_connstr.cloned());
|
||||
}
|
||||
// Wake up wal backup launcher, if offloading not started yet.
|
||||
if is_wal_backup_action_pending {
|
||||
@@ -364,7 +327,7 @@ impl Timeline {
|
||||
(replica_state.remote_consistent_lsn != Lsn::MAX && // Lsn::MAX means that we don't know the latest LSN yet.
|
||||
replica_state.remote_consistent_lsn >= shared_state.sk.inmem.commit_lsn);
|
||||
if stop {
|
||||
shared_state.deactivate_walsender(&self.zttid);
|
||||
shared_state.update_status();
|
||||
return Ok(true);
|
||||
}
|
||||
}
|
||||
@@ -525,7 +488,6 @@ impl Timeline {
|
||||
)),
|
||||
peer_horizon_lsn: Some(shared_state.sk.inmem.peer_horizon_lsn),
|
||||
safekeeper_connstr: Some(conf.listen_pg_addr.clone()),
|
||||
pageserver_connstr: shared_state.pageserver_connstr.clone(),
|
||||
backup_lsn: Some(shared_state.sk.inmem.backup_lsn),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -24,7 +24,7 @@ def test_ancestor_branch(neon_env_builder: NeonEnvBuilder):
|
||||
'compaction_target_size': '4194304',
|
||||
})
|
||||
|
||||
env.pageserver.safe_psql("failpoints flush-frozen=sleep(10000)")
|
||||
env.pageserver.safe_psql("failpoints flush-frozen-before-sync=sleep(10000)")
|
||||
|
||||
pg_branch0 = env.postgres.create_start('main', tenant_id=tenant)
|
||||
branch0_cur = pg_branch0.connect().cursor()
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, NeonPageserverHttpClient
|
||||
import pytest
|
||||
|
||||
|
||||
def check_tenant(env: NeonEnv, pageserver_http: NeonPageserverHttpClient):
|
||||
@@ -26,7 +27,8 @@ def check_tenant(env: NeonEnv, pageserver_http: NeonPageserverHttpClient):
|
||||
pageserver_http.timeline_detach(tenant_id, timeline_id)
|
||||
|
||||
|
||||
def test_normal_work(neon_env_builder: NeonEnvBuilder):
|
||||
@pytest.mark.parametrize('num_timelines,num_safekeepers', [(3, 1)])
|
||||
def test_normal_work(neon_env_builder: NeonEnvBuilder, num_timelines: int, num_safekeepers: int):
|
||||
"""
|
||||
Basic test:
|
||||
* create new tenant with a timeline
|
||||
@@ -41,7 +43,8 @@ def test_normal_work(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
neon_env_builder.num_safekeepers = num_safekeepers
|
||||
pageserver_http = env.pageserver.http_client()
|
||||
|
||||
for _ in range(3):
|
||||
for _ in range(num_timelines):
|
||||
check_tenant(env, pageserver_http)
|
||||
|
||||
@@ -45,7 +45,8 @@ def test_pageserver_recovery(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
# Configure failpoints
|
||||
pscur.execute(
|
||||
"failpoints checkpoint-before-sync=sleep(2000);checkpoint-after-sync=exit")
|
||||
"failpoints flush-frozen-before-sync=sleep(2000);checkpoint-after-sync=exit"
|
||||
)
|
||||
|
||||
# Do some updates until pageserver is crashed
|
||||
try:
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
pytest_plugins = (
|
||||
"fixtures.neon_fixtures",
|
||||
"fixtures.benchmark_fixture",
|
||||
"fixtures.compare_fixtures",
|
||||
"fixtures.slow",
|
||||
)
|
||||
pytest_plugins = ("fixtures.neon_fixtures",
|
||||
"fixtures.benchmark_fixture",
|
||||
"fixtures.compare_fixtures",
|
||||
"fixtures.slow",
|
||||
"fixtures.pg_stats")
|
||||
|
||||
@@ -1,12 +1,13 @@
|
||||
import pytest
|
||||
from contextlib import contextmanager
|
||||
from abc import ABC, abstractmethod
|
||||
from fixtures.pg_stats import PgStatTable
|
||||
|
||||
from fixtures.neon_fixtures import PgBin, PgProtocol, VanillaPostgres, RemotePostgres, NeonEnv
|
||||
from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
|
||||
|
||||
# Type-related stuff
|
||||
from typing import Iterator
|
||||
from typing import Dict, List
|
||||
|
||||
|
||||
class PgCompare(ABC):
|
||||
@@ -51,6 +52,31 @@ class PgCompare(ABC):
|
||||
def record_duration(self, out_name):
|
||||
pass
|
||||
|
||||
@contextmanager
|
||||
def record_pg_stats(self, pg_stats: List[PgStatTable]):
|
||||
init_data = self._retrieve_pg_stats(pg_stats)
|
||||
|
||||
yield
|
||||
|
||||
data = self._retrieve_pg_stats(pg_stats)
|
||||
|
||||
for k in set(init_data) & set(data):
|
||||
self.zenbenchmark.record(k, data[k] - init_data[k], '', MetricReport.HIGHER_IS_BETTER)
|
||||
|
||||
def _retrieve_pg_stats(self, pg_stats: List[PgStatTable]) -> Dict[str, int]:
|
||||
results: Dict[str, int] = {}
|
||||
|
||||
with self.pg.connect().cursor() as cur:
|
||||
for pg_stat in pg_stats:
|
||||
cur.execute(pg_stat.query)
|
||||
row = cur.fetchone()
|
||||
assert len(row) == len(pg_stat.columns)
|
||||
|
||||
for col, val in zip(pg_stat.columns, row):
|
||||
results[f"{pg_stat.table}.{col}"] = int(val)
|
||||
|
||||
return results
|
||||
|
||||
|
||||
class NeonCompare(PgCompare):
|
||||
"""PgCompare interface for the neon stack."""
|
||||
|
||||
52
test_runner/fixtures/pg_stats.py
Normal file
52
test_runner/fixtures/pg_stats.py
Normal file
@@ -0,0 +1,52 @@
|
||||
from typing import List
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
class PgStatTable:
|
||||
table: str
|
||||
columns: List[str]
|
||||
additional_query: str
|
||||
|
||||
def __init__(self, table: str, columns: List[str], filter_query: str = ""):
|
||||
self.table = table
|
||||
self.columns = columns
|
||||
self.additional_query = filter_query
|
||||
|
||||
@property
|
||||
def query(self) -> str:
|
||||
return f"SELECT {','.join(self.columns)} FROM {self.table} {self.additional_query}"
|
||||
|
||||
|
||||
@pytest.fixture(scope='function')
|
||||
def pg_stats_rw() -> List[PgStatTable]:
|
||||
return [
|
||||
PgStatTable("pg_stat_database",
|
||||
["tup_returned", "tup_fetched", "tup_inserted", "tup_updated", "tup_deleted"],
|
||||
"WHERE datname='postgres'"),
|
||||
]
|
||||
|
||||
|
||||
@pytest.fixture(scope='function')
|
||||
def pg_stats_ro() -> List[PgStatTable]:
|
||||
return [
|
||||
PgStatTable("pg_stat_database", ["tup_returned", "tup_fetched"],
|
||||
"WHERE datname='postgres'"),
|
||||
]
|
||||
|
||||
|
||||
@pytest.fixture(scope='function')
|
||||
def pg_stats_wo() -> List[PgStatTable]:
|
||||
return [
|
||||
PgStatTable("pg_stat_database", ["tup_inserted", "tup_updated", "tup_deleted"],
|
||||
"WHERE datname='postgres'"),
|
||||
]
|
||||
|
||||
|
||||
@pytest.fixture(scope='function')
|
||||
def pg_stats_wal() -> List[PgStatTable]:
|
||||
return [
|
||||
PgStatTable("pg_stat_wal",
|
||||
["wal_records", "wal_fpi", "wal_bytes", "wal_buffers_full", "wal_write"],
|
||||
"")
|
||||
]
|
||||
101
test_runner/performance/test_compare_pg_stats.py
Normal file
101
test_runner/performance/test_compare_pg_stats.py
Normal file
@@ -0,0 +1,101 @@
|
||||
import os
|
||||
from typing import List
|
||||
|
||||
import pytest
|
||||
from fixtures.compare_fixtures import PgCompare
|
||||
from fixtures.pg_stats import PgStatTable
|
||||
|
||||
from performance.test_perf_pgbench import get_durations_matrix, get_scales_matrix
|
||||
|
||||
|
||||
def get_seeds_matrix(default: int = 100):
|
||||
seeds = os.getenv("TEST_PG_BENCH_SEEDS_MATRIX", default=str(default))
|
||||
return list(map(int, seeds.split(",")))
|
||||
|
||||
|
||||
@pytest.mark.parametrize("seed", get_seeds_matrix())
|
||||
@pytest.mark.parametrize("scale", get_scales_matrix())
|
||||
@pytest.mark.parametrize("duration", get_durations_matrix(5))
|
||||
def test_compare_pg_stats_rw_with_pgbench_default(neon_with_baseline: PgCompare,
|
||||
seed: int,
|
||||
scale: int,
|
||||
duration: int,
|
||||
pg_stats_rw: List[PgStatTable]):
|
||||
env = neon_with_baseline
|
||||
# initialize pgbench
|
||||
env.pg_bin.run_capture(['pgbench', f'-s{scale}', '-i', env.pg.connstr()])
|
||||
env.flush()
|
||||
|
||||
with env.record_pg_stats(pg_stats_rw):
|
||||
env.pg_bin.run_capture(
|
||||
['pgbench', f'-T{duration}', f'--random-seed={seed}', '-Mprepared', env.pg.connstr()])
|
||||
env.flush()
|
||||
|
||||
|
||||
@pytest.mark.parametrize("seed", get_seeds_matrix())
|
||||
@pytest.mark.parametrize("scale", get_scales_matrix())
|
||||
@pytest.mark.parametrize("duration", get_durations_matrix(5))
|
||||
def test_compare_pg_stats_wo_with_pgbench_simple_update(neon_with_baseline: PgCompare,
|
||||
seed: int,
|
||||
scale: int,
|
||||
duration: int,
|
||||
pg_stats_wo: List[PgStatTable]):
|
||||
env = neon_with_baseline
|
||||
# initialize pgbench
|
||||
env.pg_bin.run_capture(['pgbench', f'-s{scale}', '-i', env.pg.connstr()])
|
||||
env.flush()
|
||||
|
||||
with env.record_pg_stats(pg_stats_wo):
|
||||
env.pg_bin.run_capture([
|
||||
'pgbench',
|
||||
'-N',
|
||||
f'-T{duration}',
|
||||
f'--random-seed={seed}',
|
||||
'-Mprepared',
|
||||
env.pg.connstr()
|
||||
])
|
||||
env.flush()
|
||||
|
||||
|
||||
@pytest.mark.parametrize("seed", get_seeds_matrix())
|
||||
@pytest.mark.parametrize("scale", get_scales_matrix())
|
||||
@pytest.mark.parametrize("duration", get_durations_matrix(5))
|
||||
def test_compare_pg_stats_ro_with_pgbench_select_only(neon_with_baseline: PgCompare,
|
||||
seed: int,
|
||||
scale: int,
|
||||
duration: int,
|
||||
pg_stats_ro: List[PgStatTable]):
|
||||
env = neon_with_baseline
|
||||
# initialize pgbench
|
||||
env.pg_bin.run_capture(['pgbench', f'-s{scale}', '-i', env.pg.connstr()])
|
||||
env.flush()
|
||||
|
||||
with env.record_pg_stats(pg_stats_ro):
|
||||
env.pg_bin.run_capture([
|
||||
'pgbench',
|
||||
'-S',
|
||||
f'-T{duration}',
|
||||
f'--random-seed={seed}',
|
||||
'-Mprepared',
|
||||
env.pg.connstr()
|
||||
])
|
||||
env.flush()
|
||||
|
||||
|
||||
@pytest.mark.parametrize("seed", get_seeds_matrix())
|
||||
@pytest.mark.parametrize("scale", get_scales_matrix())
|
||||
@pytest.mark.parametrize("duration", get_durations_matrix(5))
|
||||
def test_compare_pg_stats_wal_with_pgbench_default(neon_with_baseline: PgCompare,
|
||||
seed: int,
|
||||
scale: int,
|
||||
duration: int,
|
||||
pg_stats_wal: List[PgStatTable]):
|
||||
env = neon_with_baseline
|
||||
# initialize pgbench
|
||||
env.pg_bin.run_capture(['pgbench', f'-s{scale}', '-i', env.pg.connstr()])
|
||||
env.flush()
|
||||
|
||||
with env.record_pg_stats(pg_stats_wal):
|
||||
env.pg_bin.run_capture(
|
||||
['pgbench', f'-T{duration}', f'--random-seed={seed}', '-Mprepared', env.pg.connstr()])
|
||||
env.flush()
|
||||
@@ -79,7 +79,7 @@ def run_test_pgbench(env: PgCompare, scale: int, duration: int):
|
||||
# Run simple-update workload
|
||||
run_pgbench(env,
|
||||
"simple-update",
|
||||
['pgbench', '-n', '-c4', f'-T{duration}', '-P2', '-Mprepared', env.pg.connstr()])
|
||||
['pgbench', '-N', '-c4', f'-T{duration}', '-P2', '-Mprepared', env.pg.connstr()])
|
||||
|
||||
# Run SELECT workload
|
||||
run_pgbench(env,
|
||||
@@ -89,13 +89,13 @@ def run_test_pgbench(env: PgCompare, scale: int, duration: int):
|
||||
env.report_size()
|
||||
|
||||
|
||||
def get_durations_matrix():
|
||||
durations = os.getenv("TEST_PG_BENCH_DURATIONS_MATRIX", default="45")
|
||||
def get_durations_matrix(default: int = 45):
|
||||
durations = os.getenv("TEST_PG_BENCH_DURATIONS_MATRIX", default=str(default))
|
||||
return list(map(int, durations.split(",")))
|
||||
|
||||
|
||||
def get_scales_matrix():
|
||||
scales = os.getenv("TEST_PG_BENCH_SCALES_MATRIX", default="10")
|
||||
def get_scales_matrix(default: int = 10):
|
||||
scales = os.getenv("TEST_PG_BENCH_SCALES_MATRIX", default=str(default))
|
||||
return list(map(int, scales.split(",")))
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user