diff --git a/control_plane/src/compute.rs b/control_plane/src/compute.rs index 200dcd02e6..0b13889c9f 100644 --- a/control_plane/src/compute.rs +++ b/control_plane/src/compute.rs @@ -1,17 +1,16 @@ -use std::fs::{self, File, OpenOptions}; +use std::collections::BTreeMap; +use std::fs::{self, File}; use std::io::Write; use std::net::SocketAddr; use std::net::TcpStream; use std::os::unix::fs::PermissionsExt; +use std::path::PathBuf; use std::process::{Command, Stdio}; use std::str::FromStr; use std::sync::Arc; use std::time::Duration; -use std::{collections::BTreeMap, path::PathBuf}; use anyhow::{Context, Result}; -use lazy_static::lazy_static; -use regex::Regex; use zenith_utils::connstring::connection_host_port; use zenith_utils::lsn::Lsn; use zenith_utils::postgres_backend::AuthType; @@ -19,6 +18,7 @@ use zenith_utils::zid::ZTenantId; use zenith_utils::zid::ZTimelineId; use crate::local_env::LocalEnv; +use crate::postgresql_conf::PostgresConf; use crate::storage::PageServerNode; // @@ -144,76 +144,25 @@ impl PostgresNode { ); } - lazy_static! 
{ - static ref CONF_PORT_RE: Regex = Regex::new(r"(?m)^\s*port\s*=\s*(\d+)\s*$").unwrap(); - static ref CONF_TIMELINE_RE: Regex = - Regex::new(r"(?m)^\s*zenith.zenith_timeline\s*=\s*'(\w+)'\s*$").unwrap(); - static ref CONF_TENANT_RE: Regex = - Regex::new(r"(?m)^\s*zenith.zenith_tenant\s*=\s*'(\w+)'\s*$").unwrap(); - } - // parse data directory name let fname = entry.file_name(); let name = fname.to_str().unwrap().to_string(); - // find out tcp port in config file + // Read config file into memory let cfg_path = entry.path().join("postgresql.conf"); - let config = fs::read_to_string(cfg_path.clone()).with_context(|| { - format!( - "failed to read config file in {}", - cfg_path.to_str().unwrap() - ) - })?; + let cfg_path_str = cfg_path.to_string_lossy(); + let mut conf_file = File::open(&cfg_path) + .with_context(|| format!("failed to open config file in {}", cfg_path_str))?; + let conf = PostgresConf::read(&mut conf_file) + .with_context(|| format!("failed to read config file in {}", cfg_path_str))?; - // parse port - let err_msg = format!( - "failed to find port definition in config file {}", - cfg_path.to_str().unwrap() - ); - let port: u16 = CONF_PORT_RE - .captures(config.as_str()) - .ok_or_else(|| anyhow::Error::msg(err_msg.clone() + " 1"))? - .iter() - .last() - .ok_or_else(|| anyhow::Error::msg(err_msg.clone() + " 2"))? - .ok_or_else(|| anyhow::Error::msg(err_msg.clone() + " 3"))? 
- .as_str() - .parse() - .with_context(|| err_msg)?; + // Read a few options from the config file + let context = format!("in config file {}", cfg_path_str); + let port: u16 = conf.parse_field("port", &context)?; + let timelineid: ZTimelineId = conf.parse_field("zenith.zenith_timeline", &context)?; + let tenantid: ZTenantId = conf.parse_field("zenith.zenith_tenant", &context)?; - // parse timeline - let err_msg = format!( - "failed to find timeline definition in config file {}", - cfg_path.to_str().unwrap() - ); - let timelineid: ZTimelineId = CONF_TIMELINE_RE - .captures(config.as_str()) - .ok_or_else(|| anyhow::Error::msg(err_msg.clone() + " 1"))? - .iter() - .last() - .ok_or_else(|| anyhow::Error::msg(err_msg.clone() + " 2"))? - .ok_or_else(|| anyhow::Error::msg(err_msg.clone() + " 3"))? - .as_str() - .parse() - .with_context(|| err_msg)?; - - // parse tenant - let err_msg = format!( - "failed to find tenant definition in config file {}", - cfg_path.to_str().unwrap() - ); - let tenantid = CONF_TENANT_RE - .captures(config.as_str()) - .ok_or_else(|| anyhow::Error::msg(err_msg.clone() + " 1"))? - .iter() - .last() - .ok_or_else(|| anyhow::Error::msg(err_msg.clone() + " 2"))? - .ok_or_else(|| anyhow::Error::msg(err_msg.clone() + " 3"))? - .as_str() - .parse() - .with_context(|| err_msg)?; - - let uses_wal_proposer = config.contains("wal_acceptors"); + let uses_wal_proposer = conf.get("wal_acceptors").is_some(); // ok now Ok(PostgresNode { @@ -308,72 +257,58 @@ impl PostgresNode { // Connect to a page server, get base backup, and untar it to initialize a // new data directory fn setup_pg_conf(&self, auth_type: AuthType) -> Result<()> { - File::create(self.pgdata().join("postgresql.conf").to_str().unwrap())?; - + let mut conf = PostgresConf::new(); + conf.append("max_wal_senders", "10"); // wal_log_hints is mandatory when running against pageserver (see gh issue#192) // TODO: is it possible to check wal_log_hints at pageserver side via XLOG_PARAMETER_CHANGE? 
- self.append_conf( - "postgresql.conf", - &format!( - "max_wal_senders = 10\n\ - wal_log_hints = on\n\ - max_replication_slots = 10\n\ - hot_standby = on\n\ - shared_buffers = 1MB\n\ - fsync = off\n\ - max_connections = 100\n\ - wal_sender_timeout = 0\n\ - wal_level = replica\n\ - listen_addresses = '{address}'\n\ - port = {port}\n", - address = self.address.ip(), - port = self.address.port() - ), - )?; + conf.append("wal_log_hints", "on"); + conf.append("max_replication_slots", "10"); + conf.append("hot_standby", "on"); + conf.append("shared_buffers", "1MB"); + conf.append("fsync", "off"); + conf.append("max_connections", "100"); + conf.append("wal_sender_timeout", "0"); + conf.append("wal_level", "replica"); + conf.append("listen_addresses", &self.address.ip().to_string()); + conf.append("port", &self.address.port().to_string()); // Never clean up old WAL. TODO: We should use a replication // slot or something proper, to prevent the compute node // from removing WAL that hasn't been streamed to the safekeeper or // page server yet. (gh issue #349) - self.append_conf("postgresql.conf", "wal_keep_size='10TB'\n")?; + conf.append("wal_keep_size", "10TB"); - // set up authentication - let password = if let AuthType::ZenithJWT = auth_type { - "$ZENITH_AUTH_TOKEN" - } else { - "" + // Configure the node to fetch pages from pageserver + let pageserver_connstr = { + let (host, port) = connection_host_port(&self.pageserver.pg_connection_config); + + // Set up authentication + // + // $ZENITH_AUTH_TOKEN will be replaced with value from environment + // variable during compute pg startup. 
It is done this way because + // otherwise the user would be able to retrieve the value using SHOW + // command or pg_settings + let password = if let AuthType::ZenithJWT = auth_type { + "$ZENITH_AUTH_TOKEN" + } else { + "" + }; + + format!("host={} port={} password={}", host, port, password) }; - - // Configure that node to take pages from pageserver - let (host, port) = connection_host_port(&self.pageserver.pg_connection_config); - self.append_conf( - "postgresql.conf", - format!( - concat!( - "shared_preload_libraries = zenith\n", - // $ZENITH_AUTH_TOKEN will be replaced with value from environment variable during compute pg startup - // it is done this way because otherwise user will be able to retrieve the value using SHOW command or pg_settings - "zenith.page_server_connstring = 'host={} port={} password={}'\n", - "zenith.zenith_timeline='{}'\n", - "zenith.zenith_tenant='{}'\n", - ), - host, port, password, self.timelineid, self.tenantid, - ) - .as_str(), - )?; + conf.append("shared_preload_libraries", "zenith"); + conf.append_line(""); + conf.append("zenith.page_server_connstring", &pageserver_connstr); + conf.append("zenith.zenith_tenant", &self.tenantid.to_string()); + conf.append("zenith.zenith_timeline", &self.timelineid.to_string()); + conf.append_line(""); // Configure the node to stream WAL directly to the pageserver - self.append_conf( - "postgresql.conf", - format!( - concat!( - "synchronous_standby_names = 'pageserver'\n", // TODO: add a new function arg? - "zenith.callmemaybe_connstring = '{}'\n", // FIXME escaping - ), - self.connstr(), - ) - .as_str(), - )?; + conf.append("synchronous_standby_names", "pageserver"); // TODO: add a new function arg? 
+ conf.append("zenith.callmemaybe_connstring", &self.connstr()); + + let mut file = File::create(self.pgdata().join("postgresql.conf"))?; + file.write_all(conf.to_string().as_bytes())?; Ok(()) } @@ -416,14 +351,6 @@ impl PostgresNode { } } - pub fn append_conf(&self, config: &str, opts: &str) -> Result<()> { - OpenOptions::new() - .append(true) - .open(self.pgdata().join(config).to_str().unwrap())? - .write_all(opts.as_bytes())?; - Ok(()) - } - fn pg_ctl(&self, args: &[&str], auth_token: &Option) -> Result<()> { let pg_ctl_path = self.env.pg_bin_dir().join("pg_ctl"); let mut cmd = Command::new(pg_ctl_path); diff --git a/control_plane/src/lib.rs b/control_plane/src/lib.rs index 6a14c0e77c..287f30be63 100644 --- a/control_plane/src/lib.rs +++ b/control_plane/src/lib.rs @@ -12,6 +12,7 @@ use std::path::Path; pub mod compute; pub mod local_env; +pub mod postgresql_conf; pub mod storage; /// Read a PID file diff --git a/control_plane/src/postgresql_conf.rs b/control_plane/src/postgresql_conf.rs new file mode 100644 index 0000000000..bcd463999b --- /dev/null +++ b/control_plane/src/postgresql_conf.rs @@ -0,0 +1,212 @@ +/// +/// Module for parsing postgresql.conf file. +/// +/// NOTE: This doesn't implement the full, correct postgresql.conf syntax. Just +/// enough to extract a few settings we need in Zenith, assuming you don't do +/// funny stuff like include-directives or funny escaping. +use anyhow::{anyhow, bail, Context, Result}; +use lazy_static::lazy_static; +use regex::Regex; +use std::collections::HashMap; +use std::fmt; +use std::io::BufRead; +use std::str::FromStr; + +/// In-memory representation of a postgresql.conf file +#[derive(Default)] +pub struct PostgresConf { + lines: Vec, + hash: HashMap, +} + +lazy_static! 
{ + static ref CONF_LINE_RE: Regex = Regex::new(r"^((?:\w|\.)+)\s*=\s*(\S+)$").unwrap(); +} + +impl PostgresConf { + pub fn new() -> PostgresConf { + PostgresConf::default() + } + + /// Read a config file into memory, storing each line (without its line terminator) and indexing parsed `name = value` settings in a hash map + pub fn read(read: impl std::io::Read) -> Result { + let mut result = Self::new(); + + for line in std::io::BufReader::new(read).lines() { + let line = line?; + + // Store each line in a vector, in original format + result.lines.push(line.clone()); + + // Also parse each line and insert key=value lines into a hash map. + // + // FIXME: This doesn't match exactly the flex/bison grammar in PostgreSQL. + // But it's close enough for our usage. + let line = line.trim(); + if line.starts_with('#') { + // comment, ignore + continue; + } else if let Some(caps) = CONF_LINE_RE.captures(line) { + let name = caps.get(1).unwrap().as_str(); + let raw_val = caps.get(2).unwrap().as_str(); + + if let Ok(val) = deescape_str(raw_val) { + // Note: if there's already an entry in the hash map for + // this key, this will replace it. That's the behavior + // we want; when PostgreSQL reads the file, each line + // overrides any previous value for the same setting. + result.hash.insert(name.to_string(), val.to_string()); + } + } + } + Ok(result) + } + + /// Return the current value of 'option' + pub fn get(&self, option: &str) -> Option<&str> { + self.hash.get(option).map(|x| x.as_ref()) + } + + /// Return the current value of a field, parsed to the right datatype. + /// + /// This calls the FromStr::parse() function on the value of the field. If + /// the field does not exist, or parsing fails, returns an error. + /// + pub fn parse_field(&self, field_name: &str, context: &str) -> Result + where + T: FromStr, + ::Err: std::error::Error + Send + Sync + 'static, + { + self.get(field_name) + .ok_or_else(|| anyhow!("could not find '{}' option {}", field_name, context))? 
+ .parse::() + .with_context(|| format!("could not parse '{}' option {}", field_name, context)) + } + + /// Append an `option=value` line (newline-terminated) to the config, quoting the value if needed. + /// Note: if you call this multiple times for the same option, the config + /// file will have a line for each call. It would be nice to have a function + /// to change an existing line, but that's a TODO. + /// + pub fn append(&mut self, option: &str, value: &str) { + self.lines + .push(format!("{}={}\n", option, escape_str(value))); + self.hash.insert(option.to_string(), value.to_string()); + } + + /// Append an arbitrary non-setting line to the config file. The text is stored exactly as given; no trailing newline is added. + pub fn append_line(&mut self, line: &str) { + self.lines.push(line.to_string()); + } +} + +impl fmt::Display for PostgresConf { + /// Return the whole configuration file as a string, concatenating the stored lines with no separator added between them + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + for line in self.lines.iter() { + f.write_str(line)?; + } + Ok(()) + } +} + +/// Escape a value for putting in postgresql.conf. +fn escape_str(s: &str) -> String { + // If the string doesn't contain anything that needs quoting or escaping, return it + // as it is. + // + // The first part of the regex, before the '|', matches the INTEGER rule in the + // PostgreSQL flex grammar (guc-file.l). It matches plain integers like "123" and + // "-123", and also accepts units like "10MB". The second part of the regex matches + // the UNQUOTED_STRING rule, and accepts strings that contain a single word, beginning + // with a letter. That covers words like "off" or "posix". Everything else is quoted. + // + // This regex is a bit more conservative than the rules in guc-file.l, so we quote some + // strings that PostgreSQL would accept without quoting, but that's OK. + lazy_static! 
{ + static ref UNQUOTED_RE: Regex = + Regex::new(r"(^[-+]?[0-9]+[a-zA-Z]*$)|(^[a-zA-Z][a-zA-Z0-9]*$)").unwrap(); + } + if UNQUOTED_RE.is_match(s) { + s.to_string() + } else { + // Otherwise escape and quote it + let s = s + .replace('\\', "\\\\") + .replace('\n', "\\n") + .replace('\'', "''"); + + "\'".to_owned() + &s + "\'" + } +} + +/// De-escape a possibly-quoted value. +/// +/// See `DeescapeQuotedString` function in PostgreSQL sources for how PostgreSQL +/// does this. Input that is not wrapped in single quotes is returned unchanged. +fn deescape_str(s: &str) -> Result { + // If the string has a quote at the beginning and end, strip them out. + if s.len() >= 2 && s.starts_with('\'') && s.ends_with('\'') { + let mut result = String::new(); + + let mut iter = s[1..(s.len() - 1)].chars().peekable(); + while let Some(c) = iter.next() { + let newc = if c == '\\' { + match iter.next() { + Some('b') => '\x08', + Some('f') => '\x0c', + Some('n') => '\n', + Some('r') => '\r', + Some('t') => '\t', + Some('0'..='7') => { + // TODO + bail!("octal escapes not supported"); + } + Some(n) => n, + None => break, + } + } else if c == '\'' && iter.peek() == Some(&'\'') { + // doubled quote becomes just one quote + iter.next().unwrap() + } else { + c + }; + + result.push(newc); + } + Ok(result) + } else { + Ok(s.to_string()) + } +} + +#[test] +fn test_postgresql_conf_escapes() -> Result<()> { + assert_eq!(escape_str("foo bar"), "'foo bar'"); + // these don't need to be quoted + assert_eq!(escape_str("foo"), "foo"); + assert_eq!(escape_str("123"), "123"); + assert_eq!(escape_str("+123"), "+123"); + assert_eq!(escape_str("-10"), "-10"); + assert_eq!(escape_str("1foo"), "1foo"); + assert_eq!(escape_str("foo1"), "foo1"); + assert_eq!(escape_str("10MB"), "10MB"); + assert_eq!(escape_str("-10kB"), "-10kB"); + + // these need quoting and/or escaping + assert_eq!(escape_str("foo bar"), "'foo bar'"); + assert_eq!(escape_str("fo'o"), "'fo''o'"); + assert_eq!(escape_str("fo\no"), 
"'fo\\no'"); + assert_eq!(escape_str("fo\\o"), "'fo\\\\o'"); + 
assert_eq!(escape_str("10 cats"), "'10 cats'"); + + // Test de-escaping + assert_eq!(deescape_str(&escape_str("foo"))?, "foo"); + assert_eq!(deescape_str(&escape_str("fo'o\nba\\r"))?, "fo'o\nba\\r"); + assert_eq!(deescape_str("'\\b\\f\\n\\r\\t'")?, "\x08\x0c\n\r\t"); + + // octal-escapes are currently not supported + assert!(deescape_str("'foo\\7\\07\\007'").is_err()); + + Ok(()) +}