mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-17 21:20:37 +00:00
Compare commits
6 Commits
problame/i
...
ro_replica
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
afaaeb94a2 | ||
|
|
2c683b38de | ||
|
|
cb9a558eb5 | ||
|
|
a60f687ce2 | ||
|
|
8dae879994 | ||
|
|
d19c5248c9 |
3
Cargo.lock
generated
3
Cargo.lock
generated
@@ -4505,6 +4505,7 @@ dependencies = [
|
||||
"byteorder",
|
||||
"bytes",
|
||||
"criterion",
|
||||
"futures",
|
||||
"git-version",
|
||||
"heapless",
|
||||
"hex",
|
||||
@@ -4534,6 +4535,7 @@ dependencies = [
|
||||
"tracing",
|
||||
"tracing-subscriber",
|
||||
"url",
|
||||
"uuid",
|
||||
"workspace_hack",
|
||||
]
|
||||
|
||||
@@ -4840,7 +4842,6 @@ dependencies = [
|
||||
"either",
|
||||
"fail",
|
||||
"futures",
|
||||
"futures-channel",
|
||||
"futures-executor",
|
||||
"futures-util",
|
||||
"hashbrown 0.12.3",
|
||||
|
||||
@@ -225,6 +225,21 @@ RUN wget https://github.com/iCyberon/pg_hashids/archive/refs/tags/v1.2.1.tar.gz
|
||||
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config USE_PGXS=1 && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pg_hashids.control
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
# Layer "rum-pg-build"
|
||||
# compile rum extension
|
||||
#
|
||||
#########################################################################################
|
||||
FROM build-deps AS rum-pg-build
|
||||
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
|
||||
RUN wget https://github.com/postgrespro/rum/archive/refs/tags/1.3.13.tar.gz -O rum.tar.gz && \
|
||||
mkdir rum-src && cd rum-src && tar xvzf ../rum.tar.gz --strip-components=1 -C . && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config USE_PGXS=1 && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config USE_PGXS=1 && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/rum.control
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
# Layer "rust extensions"
|
||||
@@ -304,6 +319,7 @@ COPY --from=pg-jsonschema-pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg-graphql-pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=hypopg-pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg-hashids-pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=rum-pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY pgxn/ pgxn/
|
||||
|
||||
RUN make -j $(getconf _NPROCESSORS_ONLN) \
|
||||
|
||||
@@ -11,22 +11,15 @@ RUN set -e \
|
||||
&& touch /etc/inittab
|
||||
|
||||
RUN set -e \
|
||||
&& echo "::sysinit:cgconfigparser -l /etc/cgconfig.conf -s 1664" >> /etc/inittab \
|
||||
&& echo "::respawn:su vm-informant -c '/usr/local/bin/vm-informant --auto-restart --cgroup=neon-postgres'" >> /etc/inittab
|
||||
&& echo "::respawn:su vm-informant -c '/usr/local/bin/vm-informant --auto-restart'" >> /etc/inittab
|
||||
|
||||
# Combine, starting from non-VM compute node image.
|
||||
FROM $SRC_IMAGE as base
|
||||
|
||||
# Temporarily set user back to root so we can run apt update and adduser
|
||||
# Temporarily set user back to root so we can run adduser
|
||||
USER root
|
||||
RUN apt update && \
|
||||
apt install --no-install-recommends -y \
|
||||
cgroup-tools
|
||||
RUN adduser vm-informant --disabled-password --no-create-home
|
||||
USER postgres
|
||||
|
||||
ADD vm-cgconfig.conf /etc/cgconfig.conf
|
||||
COPY --from=informant /etc/inittab /etc/inittab
|
||||
COPY --from=informant /usr/bin/vm-informant /usr/local/bin/vm-informant
|
||||
|
||||
ENTRYPOINT ["/usr/sbin/cgexec", "-g", "*:neon-postgres", "/usr/local/bin/compute_ctl"]
|
||||
|
||||
@@ -8,6 +8,7 @@
|
||||
use anyhow::{anyhow, bail, Context, Result};
|
||||
use clap::{value_parser, Arg, ArgAction, ArgMatches, Command};
|
||||
use control_plane::compute::ComputeControlPlane;
|
||||
use control_plane::compute::Replication;
|
||||
use control_plane::local_env::LocalEnv;
|
||||
use control_plane::pageserver::PageServerNode;
|
||||
use control_plane::safekeeper::SafekeeperNode;
|
||||
@@ -472,7 +473,14 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
|
||||
println!("Creating node for imported timeline ...");
|
||||
env.register_branch_mapping(name.to_string(), tenant_id, timeline_id)?;
|
||||
|
||||
cplane.new_node(tenant_id, name, timeline_id, None, None, pg_version)?;
|
||||
cplane.new_node(
|
||||
tenant_id,
|
||||
name,
|
||||
timeline_id,
|
||||
None,
|
||||
pg_version,
|
||||
Replication::Primary,
|
||||
)?;
|
||||
println!("Done");
|
||||
}
|
||||
Some(("branch", branch_match)) => {
|
||||
@@ -558,20 +566,19 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
|
||||
.iter()
|
||||
.filter(|((node_tenant_id, _), _)| node_tenant_id == &tenant_id)
|
||||
{
|
||||
let lsn_str = match node.lsn {
|
||||
None => {
|
||||
// -> primary node
|
||||
let lsn_str = match node.replication {
|
||||
Replication::Static(lsn) => {
|
||||
// -> read-only node
|
||||
// Use the node's LSN.
|
||||
lsn.to_string()
|
||||
}
|
||||
_ => {
|
||||
// Use the LSN at the end of the timeline.
|
||||
timeline_infos
|
||||
.get(&node.timeline_id)
|
||||
.map(|bi| bi.last_record_lsn.to_string())
|
||||
.unwrap_or_else(|| "?".to_string())
|
||||
}
|
||||
Some(lsn) => {
|
||||
// -> read-only node
|
||||
// Use the node's LSN.
|
||||
lsn.to_string()
|
||||
}
|
||||
};
|
||||
|
||||
let branch_name = timeline_name_mappings
|
||||
@@ -617,7 +624,26 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
|
||||
.copied()
|
||||
.context("Failed to parse postgres version from the argument string")?;
|
||||
|
||||
cplane.new_node(tenant_id, &node_name, timeline_id, lsn, port, pg_version)?;
|
||||
let hot_standby = sub_args
|
||||
.get_one::<bool>("hot-standby")
|
||||
.copied()
|
||||
.unwrap_or(false);
|
||||
|
||||
let replication = match (lsn, hot_standby) {
|
||||
(Some(lsn), false) => Replication::Static(lsn),
|
||||
(None, true) => Replication::Replica,
|
||||
(None, false) => Replication::Primary,
|
||||
(Some(_), true) => anyhow::bail!("cannot specify both lsn and hot-standby"),
|
||||
};
|
||||
|
||||
cplane.new_node(
|
||||
tenant_id,
|
||||
&node_name,
|
||||
timeline_id,
|
||||
port,
|
||||
pg_version,
|
||||
replication,
|
||||
)?;
|
||||
}
|
||||
"start" => {
|
||||
let port: Option<u16> = sub_args.get_one::<u16>("port").copied();
|
||||
@@ -635,7 +661,21 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
|
||||
None
|
||||
};
|
||||
|
||||
let hot_standby = sub_args
|
||||
.get_one::<bool>("hot-standby")
|
||||
.copied()
|
||||
.unwrap_or(false);
|
||||
|
||||
if let Some(node) = node {
|
||||
match (&node.replication, hot_standby) {
|
||||
(Replication::Static(_), true) => {
|
||||
bail!("Cannot start a node in hot standby mode when it is already configured as a static replica")
|
||||
}
|
||||
(Replication::Primary, true) => {
|
||||
bail!("Cannot start a node as a hot standby replica, it is already configured as primary node")
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
println!("Starting existing postgres {node_name}...");
|
||||
node.start(&auth_token)?;
|
||||
} else {
|
||||
@@ -657,6 +697,14 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
|
||||
.get_one::<u32>("pg-version")
|
||||
.copied()
|
||||
.context("Failed to `pg-version` from the argument string")?;
|
||||
|
||||
let replication = match (lsn, hot_standby) {
|
||||
(Some(lsn), false) => Replication::Static(lsn),
|
||||
(None, true) => Replication::Replica,
|
||||
(None, false) => Replication::Primary,
|
||||
(Some(_), true) => anyhow::bail!("cannot specify both lsn and hot-standby"),
|
||||
};
|
||||
|
||||
// when used with custom port this results in non obvious behaviour
|
||||
// port is remembered from first start command, i e
|
||||
// start --port X
|
||||
@@ -664,8 +712,14 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
|
||||
// start <-- will also use port X even without explicit port argument
|
||||
println!("Starting new postgres (v{pg_version}) {node_name} on timeline {timeline_id} ...");
|
||||
|
||||
let node =
|
||||
cplane.new_node(tenant_id, node_name, timeline_id, lsn, port, pg_version)?;
|
||||
let node = cplane.new_node(
|
||||
tenant_id,
|
||||
node_name,
|
||||
timeline_id,
|
||||
port,
|
||||
pg_version,
|
||||
replication,
|
||||
)?;
|
||||
node.start(&auth_token)?;
|
||||
}
|
||||
}
|
||||
@@ -918,6 +972,12 @@ fn cli() -> Command {
|
||||
.help("Specify Lsn on the timeline to start from. By default, end of the timeline would be used.")
|
||||
.required(false);
|
||||
|
||||
let hot_standby_arg = Arg::new("hot-standby")
|
||||
.value_parser(value_parser!(bool))
|
||||
.long("hot-standby")
|
||||
.help("If set, the node will be a hot replica on the specified timeline")
|
||||
.required(false);
|
||||
|
||||
Command::new("Neon CLI")
|
||||
.arg_required_else_help(true)
|
||||
.version(GIT_VERSION)
|
||||
@@ -1042,6 +1102,7 @@ fn cli() -> Command {
|
||||
.long("config-only")
|
||||
.required(false))
|
||||
.arg(pg_version_arg.clone())
|
||||
.arg(hot_standby_arg.clone())
|
||||
)
|
||||
.subcommand(Command::new("start")
|
||||
.about("Start a postgres compute node.\n This command actually creates new node from scratch, but preserves existing config files")
|
||||
@@ -1052,6 +1113,7 @@ fn cli() -> Command {
|
||||
.arg(lsn_arg)
|
||||
.arg(port_arg)
|
||||
.arg(pg_version_arg)
|
||||
.arg(hot_standby_arg)
|
||||
)
|
||||
.subcommand(
|
||||
Command::new("stop")
|
||||
|
||||
@@ -78,11 +78,12 @@ impl ComputeControlPlane {
|
||||
tenant_id: TenantId,
|
||||
name: &str,
|
||||
timeline_id: TimelineId,
|
||||
lsn: Option<Lsn>,
|
||||
port: Option<u16>,
|
||||
pg_version: u32,
|
||||
replication: Replication,
|
||||
) -> Result<Arc<PostgresNode>> {
|
||||
let port = port.unwrap_or_else(|| self.get_port());
|
||||
|
||||
let node = Arc::new(PostgresNode {
|
||||
name: name.to_owned(),
|
||||
address: SocketAddr::new("127.0.0.1".parse().unwrap(), port),
|
||||
@@ -90,7 +91,7 @@ impl ComputeControlPlane {
|
||||
pageserver: Arc::clone(&self.pageserver),
|
||||
is_test: false,
|
||||
timeline_id,
|
||||
lsn,
|
||||
replication,
|
||||
tenant_id,
|
||||
uses_wal_proposer: false,
|
||||
pg_version,
|
||||
@@ -108,6 +109,16 @@ impl ComputeControlPlane {
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#[derive(Debug, Clone, Eq, PartialEq)]
|
||||
pub enum Replication {
|
||||
// Regular read-write node
|
||||
Primary,
|
||||
// if recovery_target_lsn is provided
|
||||
Static(Lsn),
|
||||
// Hot standby running on a timleine
|
||||
Replica,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct PostgresNode {
|
||||
pub address: SocketAddr,
|
||||
@@ -116,7 +127,7 @@ pub struct PostgresNode {
|
||||
pageserver: Arc<PageServerNode>,
|
||||
is_test: bool,
|
||||
pub timeline_id: TimelineId,
|
||||
pub lsn: Option<Lsn>, // if it's a read-only node. None for primary
|
||||
pub replication: Replication,
|
||||
pub tenant_id: TenantId,
|
||||
uses_wal_proposer: bool,
|
||||
pg_version: u32,
|
||||
@@ -162,9 +173,17 @@ impl PostgresNode {
|
||||
fs::read_to_string(pg_version_path).unwrap_or_else(|_| DEFAULT_PG_VERSION.to_string());
|
||||
let pg_version = u32::from_str(&pg_version_str)?;
|
||||
|
||||
// parse recovery_target_lsn, if any
|
||||
let recovery_target_lsn: Option<Lsn> =
|
||||
conf.parse_field_optional("recovery_target_lsn", &context)?;
|
||||
// parse recovery_target_lsn and primary_conninfo into Recovery Target, if any
|
||||
let replication = if let Some(lsn_str) = conf.get("recovery_target_lsn") {
|
||||
Replication::Static(Lsn::from_str(lsn_str)?)
|
||||
} else if let Some(slot_name) = conf.get("primary_slot_name") {
|
||||
let slot_name = slot_name.to_string();
|
||||
let prefix = format!("repl_{}_{}_", timeline_id, name);
|
||||
assert!(slot_name.starts_with(&prefix));
|
||||
Replication::Replica
|
||||
} else {
|
||||
Replication::Primary
|
||||
};
|
||||
|
||||
// ok now
|
||||
Ok(PostgresNode {
|
||||
@@ -174,7 +193,7 @@ impl PostgresNode {
|
||||
pageserver: Arc::clone(pageserver),
|
||||
is_test: false,
|
||||
timeline_id,
|
||||
lsn: recovery_target_lsn,
|
||||
replication,
|
||||
tenant_id,
|
||||
uses_wal_proposer,
|
||||
pg_version,
|
||||
@@ -327,50 +346,83 @@ impl PostgresNode {
|
||||
}
|
||||
conf.append("neon.tenant_id", &self.tenant_id.to_string());
|
||||
conf.append("neon.timeline_id", &self.timeline_id.to_string());
|
||||
if let Some(lsn) = self.lsn {
|
||||
conf.append("recovery_target_lsn", &lsn.to_string());
|
||||
}
|
||||
|
||||
conf.append_line("");
|
||||
// Configure backpressure
|
||||
// - Replication write lag depends on how fast the walreceiver can process incoming WAL.
|
||||
// This lag determines latency of get_page_at_lsn. Speed of applying WAL is about 10MB/sec,
|
||||
// so to avoid expiration of 1 minute timeout, this lag should not be larger than 600MB.
|
||||
// Actually latency should be much smaller (better if < 1sec). But we assume that recently
|
||||
// updates pages are not requested from pageserver.
|
||||
// - Replication flush lag depends on speed of persisting data by checkpointer (creation of
|
||||
// delta/image layers) and advancing disk_consistent_lsn. Safekeepers are able to
|
||||
// remove/archive WAL only beyond disk_consistent_lsn. Too large a lag can cause long
|
||||
// recovery time (in case of pageserver crash) and disk space overflow at safekeepers.
|
||||
// - Replication apply lag depends on speed of uploading changes to S3 by uploader thread.
|
||||
// To be able to restore database in case of pageserver node crash, safekeeper should not
|
||||
// remove WAL beyond this point. Too large lag can cause space exhaustion in safekeepers
|
||||
// (if they are not able to upload WAL to S3).
|
||||
conf.append("max_replication_write_lag", "15MB");
|
||||
conf.append("max_replication_flush_lag", "10GB");
|
||||
// Replication-related configurations, such as WAL sending
|
||||
match &self.replication {
|
||||
Replication::Primary => {
|
||||
// Configure backpressure
|
||||
// - Replication write lag depends on how fast the walreceiver can process incoming WAL.
|
||||
// This lag determines latency of get_page_at_lsn. Speed of applying WAL is about 10MB/sec,
|
||||
// so to avoid expiration of 1 minute timeout, this lag should not be larger than 600MB.
|
||||
// Actually latency should be much smaller (better if < 1sec). But we assume that recently
|
||||
// updates pages are not requested from pageserver.
|
||||
// - Replication flush lag depends on speed of persisting data by checkpointer (creation of
|
||||
// delta/image layers) and advancing disk_consistent_lsn. Safekeepers are able to
|
||||
// remove/archive WAL only beyond disk_consistent_lsn. Too large a lag can cause long
|
||||
// recovery time (in case of pageserver crash) and disk space overflow at safekeepers.
|
||||
// - Replication apply lag depends on speed of uploading changes to S3 by uploader thread.
|
||||
// To be able to restore database in case of pageserver node crash, safekeeper should not
|
||||
// remove WAL beyond this point. Too large lag can cause space exhaustion in safekeepers
|
||||
// (if they are not able to upload WAL to S3).
|
||||
conf.append("max_replication_write_lag", "15MB");
|
||||
conf.append("max_replication_flush_lag", "10GB");
|
||||
|
||||
if !self.env.safekeepers.is_empty() {
|
||||
// Configure the node to connect to the safekeepers
|
||||
conf.append("synchronous_standby_names", "walproposer");
|
||||
if !self.env.safekeepers.is_empty() {
|
||||
// Configure the node to connect to the safekeepers
|
||||
conf.append("synchronous_standby_names", "walproposer");
|
||||
|
||||
let safekeepers = self
|
||||
.env
|
||||
.safekeepers
|
||||
.iter()
|
||||
.map(|sk| format!("localhost:{}", sk.pg_port))
|
||||
.collect::<Vec<String>>()
|
||||
.join(",");
|
||||
conf.append("neon.safekeepers", &safekeepers);
|
||||
} else {
|
||||
// We only use setup without safekeepers for tests,
|
||||
// and don't care about data durability on pageserver,
|
||||
// so set more relaxed synchronous_commit.
|
||||
conf.append("synchronous_commit", "remote_write");
|
||||
let safekeepers = self
|
||||
.env
|
||||
.safekeepers
|
||||
.iter()
|
||||
.map(|sk| format!("localhost:{}", sk.pg_port))
|
||||
.collect::<Vec<String>>()
|
||||
.join(",");
|
||||
conf.append("neon.safekeepers", &safekeepers);
|
||||
} else {
|
||||
// We only use setup without safekeepers for tests,
|
||||
// and don't care about data durability on pageserver,
|
||||
// so set more relaxed synchronous_commit.
|
||||
conf.append("synchronous_commit", "remote_write");
|
||||
|
||||
// Configure the node to stream WAL directly to the pageserver
|
||||
// This isn't really a supported configuration, but can be useful for
|
||||
// testing.
|
||||
conf.append("synchronous_standby_names", "pageserver");
|
||||
// Configure the node to stream WAL directly to the pageserver
|
||||
// This isn't really a supported configuration, but can be useful for
|
||||
// testing.
|
||||
conf.append("synchronous_standby_names", "pageserver");
|
||||
}
|
||||
}
|
||||
Replication::Static(lsn) => {
|
||||
conf.append("recovery_target_lsn", &lsn.to_string());
|
||||
}
|
||||
Replication::Replica => {
|
||||
assert!(!self.env.safekeepers.is_empty());
|
||||
|
||||
// TODO: use future host field from safekeeper spec
|
||||
// Pass the list of safekeepers to the replica so that it can connect to any of them,
|
||||
// whichever is alailiable.
|
||||
let sk_ports = self
|
||||
.env
|
||||
.safekeepers
|
||||
.iter()
|
||||
.map(|x| x.pg_port.to_string())
|
||||
.collect::<Vec<_>>()
|
||||
.join(",");
|
||||
let sk_hosts = vec!["localhost"; self.env.safekeepers.len()].join(",");
|
||||
|
||||
let connstr = format!(
|
||||
"host={} port={} options='-c timeline_id={} tenant_id={}' application_name=replica replication=true",
|
||||
sk_hosts,
|
||||
sk_ports,
|
||||
&self.timeline_id.to_string(),
|
||||
&self.tenant_id.to_string(),
|
||||
);
|
||||
|
||||
let slot_name = format!("repl_{}_", self.timeline_id);
|
||||
conf.append("primary_conninfo", connstr.as_str());
|
||||
conf.append("primary_slot_name", slot_name.as_str());
|
||||
conf.append("hot_standby", "on");
|
||||
}
|
||||
}
|
||||
|
||||
let mut file = File::create(self.pgdata().join("postgresql.conf"))?;
|
||||
@@ -383,21 +435,27 @@ impl PostgresNode {
|
||||
}
|
||||
|
||||
fn load_basebackup(&self, auth_token: &Option<String>) -> Result<()> {
|
||||
let backup_lsn = if let Some(lsn) = self.lsn {
|
||||
Some(lsn)
|
||||
} else if self.uses_wal_proposer {
|
||||
// LSN 0 means that it is bootstrap and we need to download just
|
||||
// latest data from the pageserver. That is a bit clumsy but whole bootstrap
|
||||
// procedure evolves quite actively right now, so let's think about it again
|
||||
// when things would be more stable (TODO).
|
||||
let lsn = self.sync_safekeepers(auth_token, self.pg_version)?;
|
||||
if lsn == Lsn(0) {
|
||||
None
|
||||
} else {
|
||||
Some(lsn)
|
||||
let backup_lsn = match &self.replication {
|
||||
Replication::Primary => {
|
||||
if self.uses_wal_proposer {
|
||||
// LSN 0 means that it is bootstrap and we need to download just
|
||||
// latest data from the pageserver. That is a bit clumsy but whole bootstrap
|
||||
// procedure evolves quite actively right now, so let's think about it again
|
||||
// when things would be more stable (TODO).
|
||||
let lsn = self.sync_safekeepers(auth_token, self.pg_version)?;
|
||||
if lsn == Lsn(0) {
|
||||
None
|
||||
} else {
|
||||
Some(lsn)
|
||||
}
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
Replication::Static(lsn) => Some(*lsn),
|
||||
Replication::Replica => {
|
||||
None // Take the latest snapshot available to start with
|
||||
}
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
self.do_basebackup(backup_lsn)?;
|
||||
@@ -487,7 +545,7 @@ impl PostgresNode {
|
||||
// 3. Load basebackup
|
||||
self.load_basebackup(auth_token)?;
|
||||
|
||||
if self.lsn.is_some() {
|
||||
if self.replication != Replication::Primary {
|
||||
File::create(self.pgdata().join("standby.signal"))?;
|
||||
}
|
||||
|
||||
|
||||
@@ -13,7 +13,7 @@ use std::io::BufRead;
|
||||
use std::str::FromStr;
|
||||
|
||||
/// In-memory representation of a postgresql.conf file
|
||||
#[derive(Default)]
|
||||
#[derive(Default, Debug)]
|
||||
pub struct PostgresConf {
|
||||
lines: Vec<String>,
|
||||
hash: HashMap<String, String>,
|
||||
|
||||
@@ -13,6 +13,7 @@ bincode.workspace = true
|
||||
bytes.workspace = true
|
||||
heapless.workspace = true
|
||||
hyper = { workspace = true, features = ["full"] }
|
||||
futures = { workspace = true}
|
||||
routerify.workspace = true
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
@@ -39,7 +40,7 @@ pq_proto.workspace = true
|
||||
|
||||
workspace_hack.workspace = true
|
||||
url.workspace = true
|
||||
|
||||
uuid = { version = "1.2", features = ["v4", "serde"] }
|
||||
[dev-dependencies]
|
||||
byteorder.workspace = true
|
||||
bytes.workspace = true
|
||||
|
||||
@@ -8,8 +8,7 @@ use hyper::{Method, StatusCode};
|
||||
use metrics::{register_int_counter, Encoder, IntCounter, TextEncoder};
|
||||
use once_cell::sync::Lazy;
|
||||
use routerify::ext::RequestExt;
|
||||
use routerify::RequestInfo;
|
||||
use routerify::{Middleware, Router, RouterBuilder, RouterService};
|
||||
use routerify::{Middleware, RequestInfo, Router, RouterBuilder, RouterService};
|
||||
use tokio::task::JoinError;
|
||||
use tracing;
|
||||
|
||||
@@ -27,14 +26,35 @@ static SERVE_METRICS_COUNT: Lazy<IntCounter> = Lazy::new(|| {
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
static X_REQUEST_ID_HEADER_STR: &str = "x-request-id";
|
||||
|
||||
static X_REQUEST_ID_HEADER: HeaderName = HeaderName::from_static(X_REQUEST_ID_HEADER_STR);
|
||||
#[derive(Debug, Default, Clone)]
|
||||
struct RequestId(String);
|
||||
|
||||
async fn logger(res: Response<Body>, info: RequestInfo) -> Result<Response<Body>, ApiError> {
|
||||
let request_id = info.context::<RequestId>().unwrap_or_default().0;
|
||||
|
||||
// cannot factor out the Level to avoid the repetition
|
||||
// because tracing can only work with const Level
|
||||
// which is not the case here
|
||||
|
||||
if info.method() == Method::GET && res.status() == StatusCode::OK {
|
||||
tracing::debug!("{} {} {}", info.method(), info.uri().path(), res.status());
|
||||
tracing::debug!(
|
||||
"{} {} {} {}",
|
||||
info.method(),
|
||||
info.uri().path(),
|
||||
request_id,
|
||||
res.status()
|
||||
);
|
||||
} else {
|
||||
tracing::info!("{} {} {}", info.method(), info.uri().path(), res.status());
|
||||
tracing::info!(
|
||||
"{} {} {} {}",
|
||||
info.method(),
|
||||
info.uri().path(),
|
||||
request_id,
|
||||
res.status()
|
||||
);
|
||||
}
|
||||
Ok(res)
|
||||
}
|
||||
@@ -63,9 +83,52 @@ async fn prometheus_metrics_handler(_req: Request<Body>) -> Result<Response<Body
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
pub fn add_request_id_middleware<B: hyper::body::HttpBody + Send + Sync + 'static>(
|
||||
) -> Middleware<B, ApiError> {
|
||||
Middleware::pre(move |req| async move {
|
||||
let request_id = match req.headers().get(&X_REQUEST_ID_HEADER) {
|
||||
Some(request_id) => request_id
|
||||
.to_str()
|
||||
.expect("extract request id value")
|
||||
.to_owned(),
|
||||
None => {
|
||||
let request_id = uuid::Uuid::new_v4();
|
||||
request_id.to_string()
|
||||
}
|
||||
};
|
||||
|
||||
if req.method() == Method::GET {
|
||||
tracing::debug!("{} {} {}", req.method(), req.uri().path(), request_id);
|
||||
} else {
|
||||
tracing::info!("{} {} {}", req.method(), req.uri().path(), request_id);
|
||||
}
|
||||
req.set_context(RequestId(request_id));
|
||||
|
||||
Ok(req)
|
||||
})
|
||||
}
|
||||
|
||||
async fn add_request_id_header_to_response(
|
||||
mut res: Response<Body>,
|
||||
req_info: RequestInfo,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
if let Some(request_id) = req_info.context::<RequestId>() {
|
||||
if let Ok(request_header_value) = HeaderValue::from_str(&request_id.0) {
|
||||
res.headers_mut()
|
||||
.insert(&X_REQUEST_ID_HEADER, request_header_value);
|
||||
};
|
||||
};
|
||||
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
pub fn make_router() -> RouterBuilder<hyper::Body, ApiError> {
|
||||
Router::builder()
|
||||
.middleware(add_request_id_middleware())
|
||||
.middleware(Middleware::post_with_info(logger))
|
||||
.middleware(Middleware::post_with_info(
|
||||
add_request_id_header_to_response,
|
||||
))
|
||||
.get("/metrics", prometheus_metrics_handler)
|
||||
.err_handler(error::handler)
|
||||
}
|
||||
@@ -231,3 +294,48 @@ where
|
||||
|
||||
Ok(())
|
||||
}
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use futures::future::poll_fn;
|
||||
use hyper::service::Service;
|
||||
use routerify::RequestServiceBuilder;
|
||||
use std::net::{IpAddr, SocketAddr};
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_request_id_returned() {
|
||||
let builder = RequestServiceBuilder::new(make_router().build().unwrap()).unwrap();
|
||||
let remote_addr = SocketAddr::new(IpAddr::from_str("127.0.0.1").unwrap(), 80);
|
||||
let mut service = builder.build(remote_addr);
|
||||
if let Err(e) = poll_fn(|ctx| service.poll_ready(ctx)).await {
|
||||
panic!("request service is not ready: {:?}", e);
|
||||
}
|
||||
|
||||
let mut req: Request<Body> = Request::default();
|
||||
req.headers_mut()
|
||||
.append(&X_REQUEST_ID_HEADER, HeaderValue::from_str("42").unwrap());
|
||||
|
||||
let resp: Response<hyper::body::Body> = service.call(req).await.unwrap();
|
||||
|
||||
let header_val = resp.headers().get(&X_REQUEST_ID_HEADER).unwrap();
|
||||
|
||||
assert!(header_val == "42", "response header mismatch");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_request_id_empty() {
|
||||
let builder = RequestServiceBuilder::new(make_router().build().unwrap()).unwrap();
|
||||
let remote_addr = SocketAddr::new(IpAddr::from_str("127.0.0.1").unwrap(), 80);
|
||||
let mut service = builder.build(remote_addr);
|
||||
if let Err(e) = poll_fn(|ctx| service.poll_ready(ctx)).await {
|
||||
panic!("request service is not ready: {:?}", e);
|
||||
}
|
||||
|
||||
let req: Request<Body> = Request::default();
|
||||
let resp: Response<hyper::body::Body> = service.call(req).await.unwrap();
|
||||
|
||||
let header_val = resp.headers().get(&X_REQUEST_ID_HEADER);
|
||||
|
||||
assert_ne!(header_val, None, "response header should NOT be empty");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2479,7 +2479,7 @@ impl Timeline {
|
||||
|
||||
// The new on-disk layers are now in the layer map. We can remove the
|
||||
// in-memory layer from the map now.
|
||||
{
|
||||
{
|
||||
let mut layers = self.layers.write().unwrap();
|
||||
let l = layers.frozen_layers.pop_front();
|
||||
|
||||
@@ -2489,43 +2489,6 @@ impl Timeline {
|
||||
assert!(LayerMap::compare_arced_layers(&l.unwrap(), &frozen_layer));
|
||||
|
||||
// release lock on 'layers'
|
||||
drop(layers);
|
||||
|
||||
// Only drop the ephemeral file after releasing the layer map lock.
|
||||
// The reason is that the drop function needs to lock page cache slots.
|
||||
// If there is another thread that is already holding a slot which we
|
||||
// need to lock, and that thread is waiting for the layer map lock, we
|
||||
// would have a deadlock:
|
||||
//
|
||||
// wants ┌─────────┐
|
||||
// us ────────────────►│ cache │
|
||||
// ▲ │ slot │
|
||||
// │ │ │ │
|
||||
// assigned │ └────┼────┘
|
||||
// │ │
|
||||
// │ │assigned
|
||||
// ┌────┼───┐ │
|
||||
// │ │ │ ▼
|
||||
// │ layers │◄─────────────── them
|
||||
// │ │ wants
|
||||
// └────────┘
|
||||
//
|
||||
// How can this happen? I don't know. Basically we need to walk up
|
||||
// the call graph for all PageCache::try_lock_for_read and PageCache::try_lock_for_write
|
||||
// and check whether any of them holds onto the guard object that these methods return.
|
||||
// The block_io::BlockReader trait implementations look like good candidates.
|
||||
// For example, the BlockReader::read_blk impl for EphemeralFile returns the guard object
|
||||
// for the cache slot straight to the caller.
|
||||
// But, there's are many callers of BlockReader::read_blk, and rust-analyzer has no way
|
||||
// to find just the <EphemeralFile as BlockReader>::read_blk callers.
|
||||
//
|
||||
// One obvious place is InMemoryLayer::get_value_reconstruct_data , which uses a BlockReader::block_cursor.
|
||||
// That cursor object holds onto the most recently read page cache slot until cursor object is dropped.
|
||||
// But, all relevant uses of InMemoryLayer::get_value_reconstruct_data are in Timeline::get_reconstruct_data,
|
||||
// which already holds the layer map lock in shared mode. So, it can't cause this deadlock.
|
||||
//
|
||||
// Maybe this is a red herring?
|
||||
drop(l);
|
||||
}
|
||||
|
||||
fail_point!("checkpoint-after-sync");
|
||||
|
||||
@@ -1,12 +0,0 @@
|
||||
# Configuration for cgroups in VM compute nodes
|
||||
group neon-postgres {
|
||||
perm {
|
||||
admin {
|
||||
uid = vm-informant;
|
||||
}
|
||||
task {
|
||||
gid = users;
|
||||
}
|
||||
}
|
||||
memory {}
|
||||
}
|
||||
@@ -21,7 +21,6 @@ crossbeam-utils = { version = "0.8" }
|
||||
either = { version = "1" }
|
||||
fail = { version = "0.5", default-features = false, features = ["failpoints"] }
|
||||
futures = { version = "0.3" }
|
||||
futures-channel = { version = "0.3", features = ["sink"] }
|
||||
futures-executor = { version = "0.3" }
|
||||
futures-util = { version = "0.3", features = ["channel", "io", "sink"] }
|
||||
hashbrown = { version = "0.12", features = ["raw"] }
|
||||
|
||||
Reference in New Issue
Block a user