tests: remove tests that depend on libpq

This commit is contained in:
Vlad Lazar
2025-07-29 17:35:47 +01:00
parent e92ed85e9a
commit 2f9cc9a11e
4 changed files with 10 additions and 904 deletions

View File

@@ -431,419 +431,3 @@ async fn get_pageserver_connection_info(
}
Ok(shards)
}
#[cfg(test)]
mod test {
use std::collections::BTreeMap;
use super::*;
use crate::schema::hadron_safekeepers;
use diesel::PgConnection;
use diesel_migrations::{EmbeddedMigrations, MigrationHarness, embed_migrations};
use pageserver_api::controller_api::{SCSafekeeperTimeline, TimelineSafekeeperPeer};
use postgresql_archive::VersionReq;
use postgresql_embedded::Settings;
use postgresql_embedded::blocking::PostgreSQL;
async fn get_embedded_pg() -> postgresql_embedded::Result<PostgreSQL> {
tokio::task::spawn_blocking(|| {
let pg_install_dir = "../pg_install/16.0.0";
// Link "pg_install/v16" -> "pg_install/16.0.0" so that it can be picked up by the postgres_embedded
// crate without needing to download anything. The postgres_embedded crate expects a specific format
// for the directory name.
let _ = std::os::unix::fs::symlink("./v16", pg_install_dir);
let settings = Settings {
installation_dir: std::path::PathBuf::from(pg_install_dir),
username: "postgres".to_string(),
password: "password".to_string(),
// Use a 30-second timeout for database initialization to avoid flakiness in the CI environment.
timeout: Some(std::time::Duration::from_secs(30)),
version: VersionReq::parse("=16.0.0").unwrap(),
..Default::default()
};
let mut pg = PostgreSQL::new(settings);
pg.setup()?;
pg.start()?;
pg.create_database("test")?;
Ok(pg)
})
.await
.unwrap()
}
pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!("migrations");
fn run_migrations(connection: &mut PgConnection) -> Result<(), String> {
connection.run_pending_migrations(MIGRATIONS).unwrap();
Ok(())
}
fn get_test_sk_node(id: u64) -> SafeKeeperNode {
SafeKeeperNode::new(
NodeId(id),
format!("safekeeper-{id}"),
123,
format!("safekeeper-{id}"),
456,
)
}
#[tokio::test]
async fn test_safekeeper_upserts_and_list() {
let pg = get_embedded_pg().await.unwrap();
let connection_string = pg.settings().url("test");
{
let mut conn = PgConnection::establish(&connection_string)
.unwrap_or_else(|_| panic!("Error connecting to {connection_string}"));
run_migrations(&mut conn).unwrap();
}
let mut connection = AsyncPgConnection::establish(&connection_string)
.await
.unwrap_or_else(|_| panic!("Error connecting to {connection_string}"));
execute_sk_upsert(&mut connection, get_test_sk_node(0).to_database_row())
.await
.unwrap();
execute_sk_upsert(&mut connection, get_test_sk_node(1).to_database_row())
.await
.unwrap();
execute_sk_upsert(&mut connection, get_test_sk_node(2).to_database_row())
.await
.unwrap();
// Insert an entry into the hadron_timeline_safekeepers table.
use crate::schema::hadron_timeline_safekeepers;
let timeline1_id = TimelineId::generate();
diesel::insert_into(hadron_timeline_safekeepers::table)
.values(&HadronTimelineSafekeeper {
timeline_id: timeline1_id.to_string(),
sk_node_id: 0,
legacy_endpoint_id: None,
})
.execute(&mut connection)
.await
.expect("Failed to insert timeline1");
// Test that the nodes have indeed been inserted
let sk_nodes = hadron_safekeepers::table
.load::<HadronSafekeeperRow>(&mut connection)
.await
.unwrap();
assert_eq!(sk_nodes.len(), 3);
assert_eq!(sk_nodes[0].sk_node_id, 0);
assert_eq!(sk_nodes[1].sk_node_id, 1);
assert_eq!(sk_nodes[2].sk_node_id, 2);
// Test that we can read the nodes back out in the join query, where we pull all the Safekeepers along with their endpoints scheduled.
// There should be no endpoints in this test, verify that nothing breaks.
let sk_nodes = scan_safekeepers_and_scheduled_timelines(&mut connection)
.await
.unwrap();
assert_eq!(sk_nodes.len(), 3);
assert_eq!(sk_nodes[&NodeId(0)].legacy_endpoints.len(), 0);
assert_eq!(sk_nodes[&NodeId(1)].legacy_endpoints.len(), 0);
assert_eq!(sk_nodes[&NodeId(2)].legacy_endpoints.len(), 0);
// Test that only the 0th safekeeper is assigned to the timeline.
assert_eq!(sk_nodes[&NodeId(0)].timelines.len(), 1);
assert_eq!(sk_nodes[&NodeId(1)].timelines.len(), 0);
assert_eq!(sk_nodes[&NodeId(2)].timelines.len(), 0);
}
#[tokio::test]
async fn test_idempotently_persist_or_get_existing_timeline_safekeepers() {
let pg = get_embedded_pg().await.unwrap();
let connection_string = pg.settings().url("test");
{
let mut conn = PgConnection::establish(&connection_string)
.unwrap_or_else(|_| panic!("Error connecting to {connection_string}"));
run_migrations(&mut conn).unwrap();
}
let mut connection = AsyncPgConnection::establish(&connection_string)
.await
.unwrap_or_else(|_| panic!("Error connecting to {connection_string}"));
// An initial call should insert the timeline safekeepers and return the inserted values.
let timeline1_id = TimelineId::generate();
let safekeeper_ids = vec![NodeId(1), NodeId(2), NodeId(3)];
let inserted = idempotently_persist_or_get_existing_timeline_safekeepers(
&mut connection,
timeline1_id,
&safekeeper_ids,
)
.await
.expect("Failed to insert timeline safekeepers");
assert_eq!(inserted, safekeeper_ids);
// A second call with the same timeline should return the same safekeeper IDs.
let retrieved = idempotently_persist_or_get_existing_timeline_safekeepers(
&mut connection,
timeline1_id,
&[NodeId(4), NodeId(5), NodeId(6)],
)
.await
.expect("Failed to retrieve timeline safekeepers");
assert_eq!(retrieved, safekeeper_ids);
}
async fn load_timelines_by_sk_node(
conn: &mut AsyncPgConnection,
) -> DatabaseResult<BTreeMap<i64, Vec<String>>> {
use crate::schema::hadron_timeline_safekeepers;
let rows = hadron_timeline_safekeepers::table
.select((
hadron_timeline_safekeepers::sk_node_id,
hadron_timeline_safekeepers::timeline_id,
))
.load::<(i64, String)>(conn)
.await?;
let mut timelines_by_sk_node = BTreeMap::new();
for (sk_node_id, timeline_id) in rows {
timelines_by_sk_node
.entry(sk_node_id)
.or_insert_with(Vec::new)
.push(timeline_id);
}
Ok(timelines_by_sk_node)
}
#[tokio::test]
async fn test_delete_timeline_safekeepers() {
let pg = get_embedded_pg().await.unwrap();
let connection_string = pg.settings().url("test");
{
let mut conn = PgConnection::establish(&connection_string)
.unwrap_or_else(|_| panic!("Error connecting to {connection_string}"));
run_migrations(&mut conn).unwrap();
}
let mut connection = AsyncPgConnection::establish(&connection_string)
.await
.unwrap_or_else(|_| panic!("Error connecting to {connection_string}"));
// Insert some values
let timeline1_id = TimelineId::generate();
let safekeeper_ids = vec![NodeId(1), NodeId(2), NodeId(3)];
idempotently_persist_or_get_existing_timeline_safekeepers(
&mut connection,
timeline1_id,
&safekeeper_ids,
)
.await
.expect("Failed to insert timeline safekeepers");
// Validate that the values were inserted
let inserted = load_timelines_by_sk_node(&mut connection)
.await
.expect("Failed to load timelines by sk node");
assert_eq!(inserted.get(&1).unwrap().len(), 1);
assert_eq!(inserted.get(&2).unwrap().len(), 1);
assert_eq!(inserted.get(&3).unwrap().len(), 1);
// Delete the values
delete_timeline_safekeepers(&mut connection, timeline1_id)
.await
.expect("Failed to delete timeline safekeepers");
// Validate that the values were deleted
let deleted = load_timelines_by_sk_node(&mut connection)
.await
.expect("Failed to load timelines by sk node");
assert!(deleted.is_empty());
}
fn assert_list_safekeeper_timelines(
actual: &mut SCSafekeeperTimelinesResponse,
expected: &mut SCSafekeeperTimelinesResponse,
) {
assert_eq!(actual.timelines.len(), expected.timelines.len());
assert_eq!(
actual.safekeeper_peers.len(),
expected.safekeeper_peers.len()
);
actual.timelines.sort_by_key(|item| item.timeline_id);
expected.timelines.sort_by_key(|item| item.timeline_id);
actual.safekeeper_peers.sort_by_key(|item| item.node_id);
expected.safekeeper_peers.sort_by_key(|item| item.node_id);
for i in 0..actual.timelines.len() {
let mut at = actual.timelines[i].clone();
let mut et = expected.timelines[i].clone();
at.peers.sort_by_key(|item| item.0);
et.peers.sort_by_key(|item| item.0);
assert_eq!(at.timeline_id, et.timeline_id);
assert!(
at.peers.iter().eq(et.peers.iter()),
"at peers: {:#?}, et peers: {:#?}",
at.peers,
et.peers
);
}
for i in 0..actual.safekeeper_peers.len() {
let at = actual.safekeeper_peers[i].clone();
let et = expected.safekeeper_peers[i].clone();
assert_eq!(at.node_id, et.node_id);
assert_eq!(at.listen_http_addr, et.listen_http_addr);
assert_eq!(at.http_port, et.http_port);
}
}
#[tokio::test]
async fn test_list_safekeeper_timelines() {
let pg = get_embedded_pg().await.unwrap();
let connection_string = pg.settings().url("test");
{
let mut conn = PgConnection::establish(&connection_string)
.unwrap_or_else(|_| panic!("Error connecting to {connection_string}"));
run_migrations(&mut conn).unwrap();
}
let mut connection = AsyncPgConnection::establish(&connection_string)
.await
.unwrap_or_else(|_| panic!("Error connecting to {connection_string}"));
// Insert some values
let safekeeper_ids = vec![
NodeId(0),
NodeId(1),
NodeId(2),
NodeId(3),
NodeId(4),
NodeId(5),
];
for safekeeper_id in &safekeeper_ids {
execute_sk_upsert(
&mut connection,
get_test_sk_node(safekeeper_id.0).to_database_row(),
)
.await
.unwrap();
}
// Create some endpoints.
// 5 use SK-0/1/2
// 5 use SK-2/3/4
let mut timeline_ids = Vec::new();
for i in 0..10 {
let timeline_id = TimelineId::generate();
timeline_ids.push(timeline_id);
let safekeepers = if i < 5 {
vec![NodeId(0), NodeId(1), NodeId(2)]
} else {
vec![NodeId(2), NodeId(3), NodeId(4)]
};
idempotently_persist_or_get_existing_timeline_safekeepers(
&mut connection,
timeline_id,
&safekeepers,
)
.await
.unwrap();
}
// SK-0/1 owns the first 5 timelines.
// SK-2 owns all 10 timelines.
// SK-3/4 owns the last 5 timelines.
// SK-5 owns no timelines.
// SK-6 does not exist.
let mut expected_responses = vec![
SCSafekeeperTimelinesResponse {
timelines: Vec::new(),
safekeeper_peers: Vec::new(),
};
7
];
// SC does not know the tenant ids.
for (i, timeline_id) in timeline_ids.iter().enumerate().take(10) {
if i < 5 {
expected_responses[0].timelines.push(SCSafekeeperTimeline {
timeline_id: *timeline_id,
peers: vec![NodeId(0), NodeId(1), NodeId(2)],
});
expected_responses[2].timelines.push(SCSafekeeperTimeline {
timeline_id: *timeline_id,
peers: vec![NodeId(0), NodeId(1), NodeId(2)],
});
continue;
}
expected_responses[2].timelines.push(SCSafekeeperTimeline {
timeline_id: *timeline_id,
peers: vec![NodeId(2), NodeId(3), NodeId(4)],
});
expected_responses[3].timelines.push(SCSafekeeperTimeline {
timeline_id: *timeline_id,
peers: vec![NodeId(2), NodeId(3), NodeId(4)],
});
}
for i in 0..5 {
expected_responses[2]
.safekeeper_peers
.push(TimelineSafekeeperPeer {
node_id: NodeId(i),
listen_http_addr: format!("safekeeper-{i}"),
http_port: 123,
});
if i < 3 {
expected_responses[0]
.safekeeper_peers
.push(TimelineSafekeeperPeer {
node_id: NodeId(i),
listen_http_addr: format!("safekeeper-{i}"),
http_port: 123,
});
expected_responses[3]
.safekeeper_peers
.push(TimelineSafekeeperPeer {
node_id: NodeId(i + 2),
listen_http_addr: format!("safekeeper-{}", i + 2),
http_port: 123,
});
}
}
expected_responses[1] = expected_responses[0].clone();
expected_responses[4] = expected_responses[3].clone();
for safekeeper_id in &safekeeper_ids {
let sk_timelines: Result<SCSafekeeperTimelinesResponse, DatabaseError> =
execute_safekeeper_list_timelines(
&mut connection,
safekeeper_id.0.try_into().unwrap(),
)
.await;
assert!(sk_timelines.is_ok());
let mut sk_timelines: SCSafekeeperTimelinesResponse = sk_timelines.unwrap();
assert_list_safekeeper_timelines(
&mut sk_timelines,
&mut expected_responses[safekeeper_id.0 as usize],
);
}
}
}