use pageserver for pg list command

This commit is contained in:
Patrick Insinger
2021-05-10 15:03:44 -04:00
committed by Stas Kelvich
parent 2f2dff4c8d
commit 99d80aba52
8 changed files with 122 additions and 19 deletions

3
Cargo.lock generated
View File

@@ -1182,6 +1182,7 @@ dependencies = [
"rocksdb",
"rust-s3",
"serde",
"serde_json",
"slog",
"slog-async",
"slog-scope",
@@ -2449,7 +2450,9 @@ dependencies = [
"clap",
"control_plane",
"pageserver",
"postgres",
"postgres_ffi",
"serde_json",
"workspace_hack",
]

View File

@@ -39,7 +39,7 @@ hex = "0.4.3"
tar = "0.4.33"
parse_duration = "2.1.1"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1"
postgres_ffi = { path = "../postgres_ffi" }
zenith_utils = { path = "../zenith_utils" }

View File

@@ -0,0 +1,36 @@
use anyhow::Result;
use serde::{Deserialize, Serialize};
use zenith_utils::lsn::Lsn;
use crate::{repository::Repository, ZTimelineId};
#[derive(Serialize, Deserialize)]
pub struct BranchInfo {
pub name: String,
pub timeline_id: String,
pub latest_valid_lsn: Option<Lsn>,
}
pub(crate) fn get_branches(repository: &dyn Repository) -> Result<Vec<BranchInfo>> {
// adapted from CLI code
let branches_dir = std::path::Path::new("refs").join("branches");
std::fs::read_dir(&branches_dir)?
.map(|dir_entry_res| {
let dir_entry = dir_entry_res?;
let name = dir_entry.file_name().to_str().unwrap().to_string();
let timeline_id = std::fs::read_to_string(dir_entry.path())?.parse::<ZTimelineId>()?;
let latest_valid_lsn = repository
.get_timeline(timeline_id)
.map(|timeline| timeline.get_last_valid_lsn())
.ok();
Ok(BranchInfo {
name,
timeline_id: timeline_id.to_string(),
latest_valid_lsn,
})
})
.collect()
}

View File

@@ -1,4 +1,5 @@
use serde::{Deserialize, Serialize};
use std::fmt;
use std::net::SocketAddr;
use std::path::PathBuf;
@@ -6,6 +7,7 @@ use std::str::FromStr;
use std::time::Duration;
pub mod basebackup;
pub mod branches;
pub mod page_cache;
pub mod page_service;
pub mod repository;

View File

@@ -62,7 +62,7 @@ enum BeMessage {
NoData,
BindComplete,
CloseComplete,
DataRow,
DataRow(Bytes),
CommandComplete,
ControlFile,
@@ -74,6 +74,8 @@ enum BeMessage {
ZenithReadResponse(ZenithReadResponse),
}
const HELLO_WORLD_ROW: BeMessage = BeMessage::DataRow(Bytes::from_static(b"hello world"));
#[derive(Debug)]
struct ZenithRequest {
spcnode: u32,
@@ -502,10 +504,7 @@ impl Connection {
}
// XXX: accept some text data
BeMessage::DataRow => {
// XXX
let b = Bytes::from("hello world");
BeMessage::DataRow(b) => {
self.stream.write_u8(b'D')?;
self.stream.write_i32::<BE>(4 + 2 + 4 + b.len() as i32)?;
@@ -683,11 +682,19 @@ impl Connection {
walreceiver::launch_wal_receiver(&self.conf, timelineid, &connstr);
self.write_message_noflush(&BeMessage::CommandComplete)?;
self.write_message(&BeMessage::ReadyForQuery)?;
} else if query_string.starts_with(b"pg_list") {
let branches = crate::branches::get_branches(&*page_cache::get_repository())?;
let branches_buf = serde_json::to_vec(&branches)?;
self.write_message_noflush(&BeMessage::RowDescription)?;
self.write_message_noflush(&BeMessage::DataRow(Bytes::from(branches_buf)))?;
self.write_message_noflush(&BeMessage::CommandComplete)?;
self.write_message(&BeMessage::ReadyForQuery)?;
} else if query_string.starts_with(b"status") {
self.write_message_noflush(&BeMessage::RowDescription)?;
self.write_message_noflush(&BeMessage::DataRow)?;
self.write_message_noflush(&HELLO_WORLD_ROW)?;
self.write_message_noflush(&BeMessage::CommandComplete)?;
self.write_message(&BeMessage::ReadyForQuery)?;
} else if query_string.to_ascii_lowercase().starts_with(b"set ") {
@@ -697,7 +704,7 @@ impl Connection {
self.write_message(&BeMessage::ReadyForQuery)?;
} else {
self.write_message_noflush(&BeMessage::RowDescription)?;
self.write_message_noflush(&BeMessage::DataRow)?;
self.write_message_noflush(&HELLO_WORLD_ROW)?;
self.write_message_noflush(&BeMessage::CommandComplete)?;
self.write_message(&BeMessage::ReadyForQuery)?;
}

View File

@@ -1,6 +1,7 @@
import pytest
import psycopg2
import getpass
import json
pytest_plugins = ("fixtures.zenith_fixtures")
@@ -17,3 +18,34 @@ def test_status(zen_simple):
cur.execute('status;')
assert cur.fetchone() == ('hello world',)
pg_conn.close()
def test_pg_list(zen_simple):
username = getpass.getuser()
page_server_conn_str = 'host={} port={} dbname=postgres user={}'.format(
HOST, PAGESERVER_PORT, username)
page_server_conn = psycopg2.connect(page_server_conn_str)
page_server_conn.autocommit = True
page_server_cur = page_server_conn.cursor()
page_server_cur.execute('pg_list;')
branches = json.loads(page_server_cur.fetchone()[0])
assert len(branches) == 1
assert branches[0]['name'] == 'main'
assert 'timeline_id' in branches[0]
assert 'latest_valid_lsn' in branches[0]
zen_simple.zenith_cli.run(['branch', 'experimental', 'main'])
zen_simple.zenith_cli.run(['pg', 'create', 'experimental'])
page_server_cur.execute('pg_list;')
new_branches = json.loads(page_server_cur.fetchone()[0])
assert len(new_branches) == 2
new_branches.sort(key=lambda k: k['name'])
assert new_branches[0]['name'] == 'experimental'
assert new_branches[0]['timeline_id'] != branches[0]['timeline_id']
# TODO: do the LSNs have to match here?
assert new_branches[1] == branches[0]
page_server_conn.close()

View File

@@ -9,6 +9,9 @@ edition = "2018"
[dependencies]
clap = "2.33.0"
anyhow = "1.0"
serde_json = "1"
postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858" }
# FIXME: 'pageserver' is needed for ZTimelineId. Refactor
pageserver = { path = "../pageserver" }

View File

@@ -11,7 +11,7 @@ use control_plane::local_env::LocalEnv;
use control_plane::storage::PageServerNode;
use control_plane::{compute::ComputeControlPlane, local_env, storage};
use pageserver::ZTimelineId;
use pageserver::{branches::BranchInfo, ZTimelineId};
fn zenith_repo_dir() -> PathBuf {
// Find repository path
@@ -175,23 +175,43 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
cplane.new_node(timeline)?;
}
("list", Some(_sub_m)) => {
let mut tl2branch = HashMap::<ZTimelineId, String>::new();
let branches_dir = zenith_repo_dir().join("refs").join("branches");
for path in fs::read_dir(branches_dir.clone())? {
let branch_name = path?.file_name().to_str().unwrap().to_string();
let branch_file = branches_dir.join(branch_name.clone());
let timelineid = fs::read_to_string(branch_file)?.parse::<ZTimelineId>()?;
tl2branch.insert(timelineid, branch_name);
}
let page_server = storage::PageServerNode::from_env(env);
let mut client = page_server.page_server_psql_client()?;
let branches_msgs = client.simple_query("pg_list")?;
println!("NODE\tADDRESS\t\tSTATUS\tBRANCH");
let branches_json = branches_msgs
.first()
.map(|msg| match msg {
postgres::SimpleQueryMessage::Row(row) => row.get(0),
_ => None,
})
.flatten()
.ok_or_else(|| anyhow!("missing branches"))?;
let branch_infos: Vec<BranchInfo> = serde_json::from_str(branches_json)?;
let branch_infos: Result<HashMap<ZTimelineId, String>> = branch_infos
.into_iter()
.map(|branch_info| {
let timeline_id = ZTimelineId::from_str(&branch_info.timeline_id)?;
let lsn_string_opt = branch_info.latest_valid_lsn.map(|lsn| lsn.to_string());
let lsn_str = lsn_string_opt.as_deref().unwrap_or("?");
let branch_lsn_string = format!("{}@{}", branch_info.name, lsn_str);
Ok((timeline_id, branch_lsn_string))
})
.collect();
let branch_infos = branch_infos?;
println!("NODE\tADDRESS\t\tSTATUS\tBRANCH@LSN");
for (node_name, node) in cplane.nodes.iter() {
println!(
"{}\t{}\t{}\t{}",
node_name,
node.address,
node.status(),
tl2branch[&node.timelineid]
branch_infos
.get(&node.timelineid)
.map(|s| s.as_str())
.unwrap_or("?")
);
}
}