mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-20 14:40:37 +00:00
Compare commits
9 Commits
sasha_dont
...
jcsp/shard
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
66fdb54d8d | ||
|
|
21c3c55e6b | ||
|
|
2c21c74ff4 | ||
|
|
2d0930f622 | ||
|
|
1518675bac | ||
|
|
e3c6fc3e51 | ||
|
|
039f96c8a6 | ||
|
|
8d50593e17 | ||
|
|
f7795ee2a5 |
@@ -274,13 +274,7 @@ fn main() -> Result<()> {
|
||||
let mut state = compute.state.lock().unwrap();
|
||||
state.error = Some(format!("{:?}", err));
|
||||
state.status = ComputeStatus::Failed;
|
||||
// Notify others that Postgres failed to start. In case of configuring the
|
||||
// empty compute, it's likely that API handler is still waiting for compute
|
||||
// state change. With this we will notify it that compute is in Failed state,
|
||||
// so control plane will know about it earlier and record proper error instead
|
||||
// of timeout.
|
||||
compute.state_changed.notify_all();
|
||||
drop(state); // unlock
|
||||
drop(state);
|
||||
delay_exit = true;
|
||||
None
|
||||
}
|
||||
|
||||
@@ -15,7 +15,8 @@ use control_plane::pageserver::{PageServerNode, PAGESERVER_REMOTE_STORAGE_DIR};
|
||||
use control_plane::safekeeper::SafekeeperNode;
|
||||
use control_plane::tenant_migration::migrate_tenant;
|
||||
use control_plane::{broker, local_env};
|
||||
use pageserver_api::models::TimelineInfo;
|
||||
use pageserver_api::models::{LocationConfig, LocationConfigMode, TimelineInfo};
|
||||
use pageserver_api::shard::{ShardCount, ShardNumber, TenantShardId};
|
||||
use pageserver_api::{
|
||||
DEFAULT_HTTP_LISTEN_PORT as DEFAULT_PAGESERVER_HTTP_PORT,
|
||||
DEFAULT_PG_LISTEN_PORT as DEFAULT_PAGESERVER_PG_PORT,
|
||||
@@ -26,6 +27,7 @@ use safekeeper_api::{
|
||||
DEFAULT_PG_LISTEN_PORT as DEFAULT_SAFEKEEPER_PG_PORT,
|
||||
};
|
||||
use std::collections::{BTreeSet, HashMap};
|
||||
use std::num::ParseIntError;
|
||||
use std::path::PathBuf;
|
||||
use std::process::exit;
|
||||
use std::str::FromStr;
|
||||
@@ -97,6 +99,22 @@ struct TimelineTreeEl {
|
||||
pub children: BTreeSet<TimelineId>,
|
||||
}
|
||||
|
||||
/// Helper for CLI args that contain a comma-separate list of NodeId
|
||||
fn parse_ids_arg(
|
||||
matches: &ArgMatches,
|
||||
arg: &str,
|
||||
) -> Result<Option<Vec<NodeId>>, std::num::ParseIntError> {
|
||||
if let Some(id_str) = matches.get_one::<String>(arg) {
|
||||
let r: Result<Vec<_>, ParseIntError> = id_str
|
||||
.split(',')
|
||||
.map(|ps_id| u64::from_str(str::trim(ps_id)).map(NodeId))
|
||||
.collect();
|
||||
r.map(Some)
|
||||
} else {
|
||||
Ok(Some(vec![DEFAULT_PAGESERVER_ID]))
|
||||
}
|
||||
}
|
||||
|
||||
// Main entry point for the 'neon_local' CLI utility
|
||||
//
|
||||
// This utility helps to manage neon installation. That includes following:
|
||||
@@ -374,9 +392,10 @@ fn pageserver_config_overrides(init_match: &ArgMatches) -> Vec<&str> {
|
||||
}
|
||||
|
||||
fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> anyhow::Result<()> {
|
||||
let pageserver = get_default_pageserver(env);
|
||||
match tenant_match.subcommand() {
|
||||
Some(("list", _)) => {
|
||||
// TODO: make command aware of multiple pageservers
|
||||
let pageserver = get_default_pageserver(env);
|
||||
for t in pageserver.tenant_list()? {
|
||||
println!("{} {:?}", t.id, t.state);
|
||||
}
|
||||
@@ -387,38 +406,94 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> an
|
||||
.map(|vals| vals.flat_map(|c| c.split_once(':')).collect())
|
||||
.unwrap_or_default();
|
||||
|
||||
let shard_count: u8 = create_match
|
||||
.get_one::<u8>("shard-count")
|
||||
.cloned()
|
||||
.unwrap_or(1);
|
||||
|
||||
// If tenant ID was not specified, generate one
|
||||
let tenant_id = parse_tenant_id(create_match)?.unwrap_or_else(TenantId::generate);
|
||||
|
||||
let generation = if env.control_plane_api.is_some() {
|
||||
// We must register the tenant with the attachment service, so
|
||||
// that when the pageserver restarts, it will be re-attached.
|
||||
let attachment_service = AttachmentService::from_env(env);
|
||||
attachment_service.attach_hook(tenant_id, pageserver.conf.id)?
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
pageserver.tenant_create(tenant_id, generation, tenant_conf)?;
|
||||
println!("tenant {tenant_id} successfully created on the pageserver");
|
||||
|
||||
// Create an initial timeline for the new tenant
|
||||
let new_timeline_id = parse_timeline_id(create_match)?;
|
||||
// We will create an initial timeline for the new tenant
|
||||
let new_timeline_id =
|
||||
parse_timeline_id(create_match)?.unwrap_or(TimelineId::generate());
|
||||
let pg_version = create_match
|
||||
.get_one::<u32>("pg-version")
|
||||
.copied()
|
||||
.context("Failed to parse postgres version from the argument string")?;
|
||||
|
||||
let timeline_info = pageserver.timeline_create(
|
||||
tenant_id,
|
||||
new_timeline_id,
|
||||
None,
|
||||
None,
|
||||
Some(pg_version),
|
||||
None,
|
||||
)?;
|
||||
let new_timeline_id = timeline_info.timeline_id;
|
||||
let last_record_lsn = timeline_info.last_record_lsn;
|
||||
// TODO: implement ability for one pageserver to hold multiple
|
||||
// shards for the same tenant. Until then, we must place each
|
||||
// shard on a different pageserver.
|
||||
assert!(env.pageservers.len() >= shard_count as usize);
|
||||
|
||||
let cfg_shard_count = if shard_count > 1 {
|
||||
shard_count
|
||||
} else {
|
||||
// For single-sharded mode, use the legacy unsharded configuration. This avoids
|
||||
// breaking any existing tests that assume legacy unsharded storage paths
|
||||
0
|
||||
};
|
||||
|
||||
for shard_number in 0..shard_count {
|
||||
let ps_conf = env.pageservers.get(shard_number as usize).unwrap();
|
||||
let pageserver = PageServerNode::from_env(env, ps_conf);
|
||||
|
||||
// TODO: per-shard generations
|
||||
let generation = if env.control_plane_api.is_some() {
|
||||
// We must register the tenant with the attachment service, so
|
||||
// that when the pageserver restarts, it will be re-attached.
|
||||
let attachment_service = AttachmentService::from_env(env);
|
||||
attachment_service.attach_hook(tenant_id, pageserver.conf.id)?
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
// TODO: shard-aware POST /v1/tenant. Currently tenant creation on the
|
||||
// pageserver is a no-op, but we shouldn't skip the command entirely.
|
||||
|
||||
let tenant_conf = PageServerNode::build_config(tenant_conf.clone())?;
|
||||
|
||||
let tenant_shard_id = TenantShardId {
|
||||
shard_number: ShardNumber(shard_number),
|
||||
shard_count: ShardCount(cfg_shard_count),
|
||||
tenant_id,
|
||||
};
|
||||
|
||||
let location_conf = LocationConfig {
|
||||
shard_count: cfg_shard_count,
|
||||
shard_number,
|
||||
shard_stripe_size: 32768,
|
||||
mode: LocationConfigMode::AttachedSingle,
|
||||
generation,
|
||||
secondary_conf: None,
|
||||
tenant_conf,
|
||||
};
|
||||
pageserver.location_config(tenant_shard_id, location_conf, None)?;
|
||||
println!(
|
||||
"tenant {tenant_id} successfully created on pageserver {}",
|
||||
pageserver.conf.id
|
||||
);
|
||||
}
|
||||
|
||||
for shard_number in 0..shard_count {
|
||||
let ps_conf = env.pageservers.get(shard_number as usize).unwrap();
|
||||
let pageserver = PageServerNode::from_env(env, ps_conf);
|
||||
let tenant_shard_id = TenantShardId {
|
||||
shard_number: ShardNumber(shard_number),
|
||||
shard_count: ShardCount(cfg_shard_count),
|
||||
tenant_id,
|
||||
};
|
||||
|
||||
pageserver.timeline_create(
|
||||
tenant_shard_id,
|
||||
Some(new_timeline_id),
|
||||
None,
|
||||
None,
|
||||
Some(pg_version),
|
||||
None,
|
||||
)?;
|
||||
}
|
||||
|
||||
env.register_branch_mapping(
|
||||
DEFAULT_BRANCH_NAME.to_string(),
|
||||
@@ -426,9 +501,7 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> an
|
||||
new_timeline_id,
|
||||
)?;
|
||||
|
||||
println!(
|
||||
"Created an initial timeline '{new_timeline_id}' at Lsn {last_record_lsn} for tenant: {tenant_id}",
|
||||
);
|
||||
println!("Created an initial timeline '{new_timeline_id}' for tenant: {tenant_id}",);
|
||||
|
||||
if create_match.get_flag("set-default") {
|
||||
println!("Setting tenant {tenant_id} as a default one");
|
||||
@@ -448,6 +521,8 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> an
|
||||
.map(|vals| vals.flat_map(|c| c.split_once(':')).collect())
|
||||
.unwrap_or_default();
|
||||
|
||||
// TODO: make command aware of multiple pageservers
|
||||
let pageserver = get_default_pageserver(env);
|
||||
pageserver
|
||||
.tenant_config(tenant_id, tenant_conf)
|
||||
.with_context(|| format!("Tenant config failed for tenant with id {tenant_id}"))?;
|
||||
@@ -491,7 +566,7 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
|
||||
let new_timeline_id_opt = parse_timeline_id(create_match)?;
|
||||
|
||||
let timeline_info = pageserver.timeline_create(
|
||||
tenant_id,
|
||||
TenantShardId::unsharded(tenant_id),
|
||||
new_timeline_id_opt,
|
||||
None,
|
||||
None,
|
||||
@@ -554,7 +629,7 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
|
||||
None,
|
||||
pg_version,
|
||||
ComputeMode::Primary,
|
||||
DEFAULT_PAGESERVER_ID,
|
||||
vec![DEFAULT_PAGESERVER_ID],
|
||||
)?;
|
||||
println!("Done");
|
||||
}
|
||||
@@ -579,7 +654,7 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
|
||||
.transpose()
|
||||
.context("Failed to parse ancestor start Lsn from the request")?;
|
||||
let timeline_info = pageserver.timeline_create(
|
||||
tenant_id,
|
||||
TenantShardId::unsharded(tenant_id),
|
||||
None,
|
||||
start_lsn,
|
||||
Some(ancestor_timeline_id),
|
||||
@@ -704,13 +779,8 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
|
||||
.copied()
|
||||
.unwrap_or(false);
|
||||
|
||||
let pageserver_id =
|
||||
if let Some(id_str) = sub_args.get_one::<String>("endpoint-pageserver-id") {
|
||||
NodeId(id_str.parse().context("while parsing pageserver id")?)
|
||||
} else {
|
||||
DEFAULT_PAGESERVER_ID
|
||||
};
|
||||
|
||||
let pageserver_ids = parse_ids_arg(sub_args, "endpoint-pageserver-id")?
|
||||
.unwrap_or(vec![DEFAULT_PAGESERVER_ID]);
|
||||
let mode = match (lsn, hot_standby) {
|
||||
(Some(lsn), false) => ComputeMode::Static(lsn),
|
||||
(None, true) => ComputeMode::Replica,
|
||||
@@ -738,7 +808,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
|
||||
http_port,
|
||||
pg_version,
|
||||
mode,
|
||||
pageserver_id,
|
||||
pageserver_ids,
|
||||
)?;
|
||||
}
|
||||
"start" => {
|
||||
@@ -746,29 +816,14 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
|
||||
.get_one::<String>("endpoint_id")
|
||||
.ok_or_else(|| anyhow!("No endpoint ID was provided to start"))?;
|
||||
|
||||
let pageserver_id =
|
||||
if let Some(id_str) = sub_args.get_one::<String>("endpoint-pageserver-id") {
|
||||
NodeId(id_str.parse().context("while parsing pageserver id")?)
|
||||
} else {
|
||||
DEFAULT_PAGESERVER_ID
|
||||
};
|
||||
let pageservers = parse_ids_arg(sub_args, "endpoint-pageserver-id")?
|
||||
.unwrap_or(vec![DEFAULT_PAGESERVER_ID]);
|
||||
|
||||
let remote_ext_config = sub_args.get_one::<String>("remote-ext-config");
|
||||
|
||||
// If --safekeepers argument is given, use only the listed safekeeper nodes.
|
||||
let safekeepers =
|
||||
if let Some(safekeepers_str) = sub_args.get_one::<String>("safekeepers") {
|
||||
let mut safekeepers: Vec<NodeId> = Vec::new();
|
||||
for sk_id in safekeepers_str.split(',').map(str::trim) {
|
||||
let sk_id = NodeId(u64::from_str(sk_id).map_err(|_| {
|
||||
anyhow!("invalid node ID \"{sk_id}\" in --safekeepers list")
|
||||
})?);
|
||||
safekeepers.push(sk_id);
|
||||
}
|
||||
safekeepers
|
||||
} else {
|
||||
env.safekeepers.iter().map(|sk| sk.id).collect()
|
||||
};
|
||||
let safekeepers = parse_ids_arg(sub_args, "safekeepers")?
|
||||
.unwrap_or_else(|| env.safekeepers.iter().map(|sk| sk.id).collect());
|
||||
|
||||
let endpoint = cplane
|
||||
.endpoints
|
||||
@@ -781,7 +836,8 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
|
||||
endpoint.timeline_id,
|
||||
)?;
|
||||
|
||||
let ps_conf = env.get_pageserver_conf(pageserver_id)?;
|
||||
// We assume that all pageservers have the same auth conf
|
||||
let ps_conf = env.get_pageserver_conf(pageservers[0])?;
|
||||
let auth_token = if matches!(ps_conf.pg_auth_type, AuthType::NeonJWT) {
|
||||
let claims = Claims::new(Some(endpoint.tenant_id), Scope::Tenant);
|
||||
|
||||
@@ -801,15 +857,21 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
|
||||
.endpoints
|
||||
.get(endpoint_id.as_str())
|
||||
.with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?;
|
||||
let pageserver_id =
|
||||
if let Some(id_str) = sub_args.get_one::<String>("endpoint-pageserver-id") {
|
||||
Some(NodeId(
|
||||
id_str.parse().context("while parsing pageserver id")?,
|
||||
))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
endpoint.reconfigure(pageserver_id)?;
|
||||
let pageserver_ids: Option<Result<Vec<NodeId>, _>> = sub_args
|
||||
.get_many::<String>("endpoint-pageserver-id")
|
||||
.map(|ids| {
|
||||
ids.map(|id_str| id_str.parse().context("while parsing pageserver id"))
|
||||
.map(|r| r.map(NodeId))
|
||||
.collect()
|
||||
});
|
||||
|
||||
let pageserver_ids = match pageserver_ids {
|
||||
Some(Ok(v)) => Ok(Some(v)),
|
||||
Some(Err(e)) => Err(e),
|
||||
None => Ok(None),
|
||||
}?;
|
||||
|
||||
endpoint.reconfigure(pageserver_ids)?;
|
||||
}
|
||||
"stop" => {
|
||||
let endpoint_id = sub_args
|
||||
@@ -1313,6 +1375,7 @@ fn cli() -> Command {
|
||||
.arg(pg_version_arg.clone())
|
||||
.arg(Arg::new("set-default").long("set-default").action(ArgAction::SetTrue).required(false)
|
||||
.help("Use this tenant in future CLI commands where tenant_id is needed, but not specified"))
|
||||
.arg(Arg::new("shard-count").value_parser(value_parser!(u8)).long("shard-count").action(ArgAction::Set).help("Number of shards in the new tenant (default 1)"))
|
||||
)
|
||||
.subcommand(Command::new("set-default").arg(tenant_id_arg.clone().required(true))
|
||||
.about("Set a particular tenant as default in future CLI commands where tenant_id is needed, but not specified"))
|
||||
|
||||
@@ -67,7 +67,7 @@ pub struct EndpointConf {
|
||||
http_port: u16,
|
||||
pg_version: u32,
|
||||
skip_pg_catalog_updates: bool,
|
||||
pageserver_id: NodeId,
|
||||
pageservers: Vec<NodeId>,
|
||||
}
|
||||
|
||||
//
|
||||
@@ -82,6 +82,33 @@ pub struct ComputeControlPlane {
|
||||
env: LocalEnv,
|
||||
}
|
||||
|
||||
fn load_pageservers(
|
||||
env: &LocalEnv,
|
||||
pageserver_ids: &Vec<NodeId>,
|
||||
) -> anyhow::Result<Vec<PageServerNode>> {
|
||||
let mut pageservers = Vec::new();
|
||||
for ps_id in pageserver_ids {
|
||||
let pageserver = env
|
||||
.get_pageserver_conf(*ps_id)
|
||||
.map(|conf| PageServerNode::from_env(env, conf))?;
|
||||
pageservers.push(pageserver);
|
||||
}
|
||||
Ok(pageservers)
|
||||
}
|
||||
|
||||
fn build_pageserver_connstr(pageservers: &[PageServerNode]) -> String {
|
||||
pageservers
|
||||
.iter()
|
||||
.map(|ps| {
|
||||
let config = ps.pg_connection_config.clone();
|
||||
let (host, port) = (config.host(), config.port());
|
||||
// NOTE: avoid spaces in connection string, because it is less error prone if we forward it somewhere.
|
||||
format!("postgresql://no_user@{host}:{port}")
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
.join(",")
|
||||
}
|
||||
|
||||
impl ComputeControlPlane {
|
||||
// Load current endpoints from the endpoints/ subdirectories
|
||||
pub fn load(env: LocalEnv) -> Result<ComputeControlPlane> {
|
||||
@@ -119,19 +146,16 @@ impl ComputeControlPlane {
|
||||
http_port: Option<u16>,
|
||||
pg_version: u32,
|
||||
mode: ComputeMode,
|
||||
pageserver_id: NodeId,
|
||||
pageservers: Vec<NodeId>,
|
||||
) -> Result<Arc<Endpoint>> {
|
||||
let pg_port = pg_port.unwrap_or_else(|| self.get_port());
|
||||
let http_port = http_port.unwrap_or_else(|| self.get_port() + 1);
|
||||
let pageserver =
|
||||
PageServerNode::from_env(&self.env, self.env.get_pageserver_conf(pageserver_id)?);
|
||||
|
||||
let ep = Arc::new(Endpoint {
|
||||
endpoint_id: endpoint_id.to_owned(),
|
||||
pg_address: SocketAddr::new("127.0.0.1".parse().unwrap(), pg_port),
|
||||
http_address: SocketAddr::new("127.0.0.1".parse().unwrap(), http_port),
|
||||
env: self.env.clone(),
|
||||
pageserver,
|
||||
pageservers: load_pageservers(&self.env, &pageservers)?,
|
||||
timeline_id,
|
||||
mode,
|
||||
tenant_id,
|
||||
@@ -157,7 +181,7 @@ impl ComputeControlPlane {
|
||||
pg_port,
|
||||
pg_version,
|
||||
skip_pg_catalog_updates: true,
|
||||
pageserver_id,
|
||||
pageservers,
|
||||
})?,
|
||||
)?;
|
||||
std::fs::write(
|
||||
@@ -216,7 +240,7 @@ pub struct Endpoint {
|
||||
// These are not part of the endpoint as such, but the environment
|
||||
// the endpoint runs in.
|
||||
pub env: LocalEnv,
|
||||
pageserver: PageServerNode,
|
||||
pageservers: Vec<PageServerNode>,
|
||||
|
||||
// Optimizations
|
||||
skip_pg_catalog_updates: bool,
|
||||
@@ -239,15 +263,14 @@ impl Endpoint {
|
||||
let conf: EndpointConf =
|
||||
serde_json::from_slice(&std::fs::read(entry.path().join("endpoint.json"))?)?;
|
||||
|
||||
let pageserver =
|
||||
PageServerNode::from_env(env, env.get_pageserver_conf(conf.pageserver_id)?);
|
||||
let pageservers: Vec<PageServerNode> = load_pageservers(env, &conf.pageservers)?;
|
||||
|
||||
Ok(Endpoint {
|
||||
pg_address: SocketAddr::new("127.0.0.1".parse().unwrap(), conf.pg_port),
|
||||
http_address: SocketAddr::new("127.0.0.1".parse().unwrap(), conf.http_port),
|
||||
endpoint_id,
|
||||
env: env.clone(),
|
||||
pageserver,
|
||||
pageservers,
|
||||
timeline_id: conf.timeline_id,
|
||||
mode: conf.mode,
|
||||
tenant_id: conf.tenant_id,
|
||||
@@ -482,13 +505,7 @@ impl Endpoint {
|
||||
std::fs::remove_dir_all(self.pgdata())?;
|
||||
}
|
||||
|
||||
let pageserver_connstring = {
|
||||
let config = &self.pageserver.pg_connection_config;
|
||||
let (host, port) = (config.host(), config.port());
|
||||
|
||||
// NOTE: avoid spaces in connection string, because it is less error prone if we forward it somewhere.
|
||||
format!("postgresql://no_user@{host}:{port}")
|
||||
};
|
||||
let pageserver_connstring = build_pageserver_connstr(&self.pageservers);
|
||||
let mut safekeeper_connstrings = Vec::new();
|
||||
if self.mode == ComputeMode::Primary {
|
||||
for sk_id in safekeepers {
|
||||
@@ -658,7 +675,7 @@ impl Endpoint {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn reconfigure(&self, pageserver_id: Option<NodeId>) -> Result<()> {
|
||||
pub fn reconfigure(&self, pageservers: Option<Vec<NodeId>>) -> Result<()> {
|
||||
let mut spec: ComputeSpec = {
|
||||
let spec_path = self.endpoint_path().join("spec.json");
|
||||
let file = std::fs::File::open(spec_path)?;
|
||||
@@ -668,23 +685,20 @@ impl Endpoint {
|
||||
let postgresql_conf = self.read_postgresql_conf()?;
|
||||
spec.cluster.postgresql_conf = Some(postgresql_conf);
|
||||
|
||||
if let Some(pageserver_id) = pageserver_id {
|
||||
if let Some(pageservers) = pageservers {
|
||||
let endpoint_config_path = self.endpoint_path().join("endpoint.json");
|
||||
let mut endpoint_conf: EndpointConf = {
|
||||
let file = std::fs::File::open(&endpoint_config_path)?;
|
||||
serde_json::from_reader(file)?
|
||||
};
|
||||
endpoint_conf.pageserver_id = pageserver_id;
|
||||
endpoint_conf.pageservers = pageservers.clone();
|
||||
std::fs::write(
|
||||
endpoint_config_path,
|
||||
serde_json::to_string_pretty(&endpoint_conf)?,
|
||||
)?;
|
||||
|
||||
let pageserver =
|
||||
PageServerNode::from_env(&self.env, self.env.get_pageserver_conf(pageserver_id)?);
|
||||
let ps_http_conf = &pageserver.pg_connection_config;
|
||||
let (host, port) = (ps_http_conf.host(), ps_http_conf.port());
|
||||
spec.pageserver_connstring = Some(format!("postgresql://no_user@{host}:{port}"));
|
||||
let pageservers = load_pageservers(&self.env, &pageservers)?;
|
||||
spec.pageserver_connstring = Some(build_pageserver_connstr(&pageservers));
|
||||
}
|
||||
|
||||
let client = reqwest::blocking::Client::new();
|
||||
|
||||
@@ -340,15 +340,8 @@ impl PageServerNode {
|
||||
.json()?)
|
||||
}
|
||||
|
||||
pub fn tenant_create(
|
||||
&self,
|
||||
new_tenant_id: TenantId,
|
||||
generation: Option<u32>,
|
||||
settings: HashMap<&str, &str>,
|
||||
) -> anyhow::Result<TenantId> {
|
||||
let mut settings = settings.clone();
|
||||
|
||||
let config = models::TenantConfig {
|
||||
pub fn build_config(mut settings: HashMap<&str, &str>) -> anyhow::Result<models::TenantConfig> {
|
||||
Ok(models::TenantConfig {
|
||||
checkpoint_distance: settings
|
||||
.remove("checkpoint_distance")
|
||||
.map(|x| x.parse::<u64>())
|
||||
@@ -407,8 +400,16 @@ impl PageServerNode {
|
||||
.map(|x| x.parse::<bool>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'gc_feedback' as bool")?,
|
||||
};
|
||||
})
|
||||
}
|
||||
|
||||
pub fn tenant_create(
|
||||
&self,
|
||||
new_tenant_id: TenantId,
|
||||
generation: Option<u32>,
|
||||
settings: HashMap<&str, &str>,
|
||||
) -> anyhow::Result<TenantId> {
|
||||
let config = Self::build_config(settings.clone())?;
|
||||
let request = models::TenantCreateRequest {
|
||||
new_tenant_id: TenantShardId::unsharded(new_tenant_id),
|
||||
generation,
|
||||
@@ -521,15 +522,18 @@ impl PageServerNode {
|
||||
|
||||
pub fn location_config(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
tenant_shard_id: TenantShardId,
|
||||
config: LocationConfig,
|
||||
flush_ms: Option<Duration>,
|
||||
) -> anyhow::Result<()> {
|
||||
let req_body = TenantLocationConfigRequest { tenant_id, config };
|
||||
let req_body = TenantLocationConfigRequest {
|
||||
tenant_shard_id,
|
||||
config,
|
||||
};
|
||||
|
||||
let path = format!(
|
||||
"{}/tenant/{}/location_config",
|
||||
self.http_base_url, tenant_id
|
||||
self.http_base_url, tenant_shard_id
|
||||
);
|
||||
let path = if let Some(flush_ms) = flush_ms {
|
||||
format!("{}?flush_ms={}", path, flush_ms.as_millis())
|
||||
@@ -560,7 +564,7 @@ impl PageServerNode {
|
||||
|
||||
pub fn timeline_create(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
tenant_shard_id: TenantShardId,
|
||||
new_timeline_id: Option<TimelineId>,
|
||||
ancestor_start_lsn: Option<Lsn>,
|
||||
ancestor_timeline_id: Option<TimelineId>,
|
||||
@@ -572,7 +576,7 @@ impl PageServerNode {
|
||||
|
||||
self.http_request(
|
||||
Method::POST,
|
||||
format!("{}/tenant/{}/timeline", self.http_base_url, tenant_id),
|
||||
format!("{}/tenant/{}/timeline", self.http_base_url, tenant_shard_id),
|
||||
)?
|
||||
.json(&models::TimelineCreateRequest {
|
||||
new_timeline_id,
|
||||
@@ -585,11 +589,11 @@ impl PageServerNode {
|
||||
.error_from_body()?
|
||||
.json::<Option<TimelineInfo>>()
|
||||
.with_context(|| {
|
||||
format!("Failed to parse timeline creation response for tenant id: {tenant_id}")
|
||||
format!("Failed to parse timeline creation response for tenant id: {tenant_shard_id}")
|
||||
})?
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"No timeline id was found in the timeline creation response for tenant {tenant_id}"
|
||||
"No timeline id was found in the timeline creation response for tenant {tenant_shard_id}"
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
@@ -11,6 +11,7 @@ use crate::{
|
||||
use pageserver_api::models::{
|
||||
LocationConfig, LocationConfigMode, LocationConfigSecondary, TenantConfig,
|
||||
};
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use std::collections::HashMap;
|
||||
use std::time::Duration;
|
||||
use utils::{
|
||||
@@ -108,6 +109,9 @@ pub fn migrate_tenant(
|
||||
}
|
||||
}
|
||||
|
||||
// No support for sharding in this function yet
|
||||
let tenant_shard_id = TenantShardId::unsharded(tenant_id);
|
||||
|
||||
let previous = attachment_service.inspect(tenant_id)?;
|
||||
let mut baseline_lsns = None;
|
||||
if let Some((generation, origin_ps_id)) = &previous {
|
||||
@@ -117,7 +121,7 @@ pub fn migrate_tenant(
|
||||
println!("🔁 Already attached to {origin_ps_id}, freshening...");
|
||||
let gen = attachment_service.attach_hook(tenant_id, dest_ps.conf.id)?;
|
||||
let dest_conf = build_location_config(LocationConfigMode::AttachedSingle, gen, None);
|
||||
dest_ps.location_config(tenant_id, dest_conf, None)?;
|
||||
dest_ps.location_config(tenant_shard_id, dest_conf, None)?;
|
||||
println!("✅ Migration complete");
|
||||
return Ok(());
|
||||
}
|
||||
@@ -126,7 +130,7 @@ pub fn migrate_tenant(
|
||||
|
||||
let stale_conf =
|
||||
build_location_config(LocationConfigMode::AttachedStale, Some(*generation), None);
|
||||
origin_ps.location_config(tenant_id, stale_conf, Some(Duration::from_secs(10)))?;
|
||||
origin_ps.location_config(tenant_shard_id, stale_conf, Some(Duration::from_secs(10)))?;
|
||||
|
||||
baseline_lsns = Some(get_lsns(tenant_id, &origin_ps)?);
|
||||
}
|
||||
@@ -135,7 +139,7 @@ pub fn migrate_tenant(
|
||||
let dest_conf = build_location_config(LocationConfigMode::AttachedMulti, gen, None);
|
||||
|
||||
println!("🔁 Attaching to pageserver {}", dest_ps.conf.id);
|
||||
dest_ps.location_config(tenant_id, dest_conf, None)?;
|
||||
dest_ps.location_config(tenant_shard_id, dest_conf, None)?;
|
||||
|
||||
if let Some(baseline) = baseline_lsns {
|
||||
println!("🕑 Waiting for LSN to catch up...");
|
||||
@@ -149,7 +153,7 @@ pub fn migrate_tenant(
|
||||
"🔁 Reconfiguring endpoint {} to use pageserver {}",
|
||||
endpoint_name, dest_ps.conf.id
|
||||
);
|
||||
endpoint.reconfigure(Some(dest_ps.conf.id))?;
|
||||
endpoint.reconfigure(Some(vec![dest_ps.conf.id]))?;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -181,7 +185,7 @@ pub fn migrate_tenant(
|
||||
"💤 Switching to secondary mode on pageserver {}",
|
||||
other_ps.conf.id
|
||||
);
|
||||
other_ps.location_config(tenant_id, secondary_conf, None)?;
|
||||
other_ps.location_config(tenant_shard_id, secondary_conf, None)?;
|
||||
}
|
||||
|
||||
println!(
|
||||
@@ -189,7 +193,7 @@ pub fn migrate_tenant(
|
||||
dest_ps.conf.id
|
||||
);
|
||||
let dest_conf = build_location_config(LocationConfigMode::AttachedSingle, gen, None);
|
||||
dest_ps.location_config(tenant_id, dest_conf, None)?;
|
||||
dest_ps.location_config(tenant_shard_id, dest_conf, None)?;
|
||||
|
||||
println!("✅ Migration complete");
|
||||
|
||||
|
||||
21
demo_sharding.sh
Normal file
21
demo_sharding.sh
Normal file
@@ -0,0 +1,21 @@
|
||||
|
||||
|
||||
export RUST_LOG=DEBUG
|
||||
SHARDS=4
|
||||
PAGESERVERS=`seq -s , 1 $SHARDS`
|
||||
SCALE=10
|
||||
ARGS=--features=testing
|
||||
|
||||
set -e
|
||||
|
||||
set +e
|
||||
cargo neon $ARGS stop ; killall -9 storage_broker ; killall -9 safekeeper ; killall -9 pageserver ; killall -9 postgres ; killall -9 attachment_service ; rm -rf .neon
|
||||
set -e
|
||||
|
||||
cargo build --package=pageserver && cargo neon $ARGS init --num-pageservers=$SHARDS && RUST_LOG=debug cargo neon $ARGS start && cargo neon $ARGS tenant create --shard-count=$SHARDS --tenant-id=1f359dd625e519a1a4e8d7509690f6fc --timeline-id=3d34095be52fec4c44a92e774c573b57 --set-default
|
||||
|
||||
cargo neon $ARGS endpoint create --pageserver-id=$PAGESERVERS && cargo neon endpoint start --pageserver-id=$PAGESERVERS ep-main
|
||||
|
||||
pgbench postgres -i -h 127.0.0.1 -p 55432 -U cloud_admin -s $SCALE
|
||||
|
||||
du -sh .neon/local_fs_remote_storage/pageserver/tenants/1f359dd625e519a1a4e8d7509690f6fc*
|
||||
@@ -293,7 +293,7 @@ pub struct StatusResponse {
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
#[serde(deny_unknown_fields)]
|
||||
pub struct TenantLocationConfigRequest {
|
||||
pub tenant_id: TenantId,
|
||||
pub tenant_shard_id: TenantShardId,
|
||||
#[serde(flatten)]
|
||||
pub config: LocationConfig, // as we have a flattened field, we should reject all unknown fields in it
|
||||
}
|
||||
|
||||
@@ -411,6 +411,12 @@ impl ShardIdentity {
|
||||
String::new()
|
||||
}
|
||||
}
|
||||
|
||||
/// Convenience for checking if this identity is the 0th shard in a tenant,
|
||||
/// for special cases on shard 0 such as ingesting relation sizes.
|
||||
pub fn is_zero(&self) -> bool {
|
||||
self.number == ShardNumber(0)
|
||||
}
|
||||
}
|
||||
|
||||
impl Serialize for ShardIndex {
|
||||
|
||||
@@ -709,26 +709,6 @@ async fn tenant_detach_handler(
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
|
||||
async fn tenant_reset_handler(
|
||||
request: Request<Body>,
|
||||
_cancel: CancellationToken,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
|
||||
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
|
||||
|
||||
let drop_cache: Option<bool> = parse_query_param(&request, "drop_cache")?;
|
||||
|
||||
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
|
||||
let state = get_state(&request);
|
||||
state
|
||||
.tenant_manager
|
||||
.reset_tenant(tenant_shard_id, drop_cache.unwrap_or(false), ctx)
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
|
||||
async fn tenant_load_handler(
|
||||
mut request: Request<Body>,
|
||||
_cancel: CancellationToken,
|
||||
@@ -1848,9 +1828,6 @@ pub fn make_router(
|
||||
.post("/v1/tenant/:tenant_id/detach", |r| {
|
||||
api_handler(r, tenant_detach_handler)
|
||||
})
|
||||
.post("/v1/tenant/:tenant_shard_id/reset", |r| {
|
||||
api_handler(r, tenant_reset_handler)
|
||||
})
|
||||
.post("/v1/tenant/:tenant_id/load", |r| {
|
||||
api_handler(r, tenant_load_handler)
|
||||
})
|
||||
|
||||
@@ -205,7 +205,7 @@ async fn timed<Fut: std::future::Future>(
|
||||
match tokio::time::timeout(warn_at, &mut fut).await {
|
||||
Ok(ret) => {
|
||||
tracing::info!(
|
||||
stage = name,
|
||||
task = name,
|
||||
elapsed_ms = started.elapsed().as_millis(),
|
||||
"completed"
|
||||
);
|
||||
@@ -213,7 +213,7 @@ async fn timed<Fut: std::future::Future>(
|
||||
}
|
||||
Err(_) => {
|
||||
tracing::info!(
|
||||
stage = name,
|
||||
task = name,
|
||||
elapsed_ms = started.elapsed().as_millis(),
|
||||
"still waiting, taking longer than expected..."
|
||||
);
|
||||
@@ -222,7 +222,7 @@ async fn timed<Fut: std::future::Future>(
|
||||
|
||||
// this has a global allowed_errors
|
||||
tracing::warn!(
|
||||
stage = name,
|
||||
task = name,
|
||||
elapsed_ms = started.elapsed().as_millis(),
|
||||
"completed, took longer than expected"
|
||||
);
|
||||
|
||||
@@ -1167,6 +1167,30 @@ pub(crate) static DELETION_QUEUE: Lazy<DeletionQueueMetrics> = Lazy::new(|| {
|
||||
}
|
||||
});
|
||||
|
||||
pub(crate) struct WalIngestMetrics {
|
||||
pub(crate) records_received: IntCounter,
|
||||
pub(crate) records_committed: IntCounter,
|
||||
pub(crate) records_filtered: IntCounter,
|
||||
}
|
||||
|
||||
pub(crate) static WAL_INGEST: Lazy<WalIngestMetrics> = Lazy::new(|| WalIngestMetrics {
|
||||
records_received: register_int_counter!(
|
||||
"pageserver_wal_ingest_records_received",
|
||||
"Number of WAL records received from safekeeper"
|
||||
)
|
||||
.expect("failed to define a metric"),
|
||||
records_committed: register_int_counter!(
|
||||
"pageserver_wal_ingest_records_committed",
|
||||
"Number of WAL records which resulted in writes to pageserver storage"
|
||||
)
|
||||
.expect("failed to define a metric"),
|
||||
records_filtered: register_int_counter!(
|
||||
"pageserver_wal_ingest_records_filtered",
|
||||
"Number of WAL records filtered out due to sharding"
|
||||
)
|
||||
.expect("failed to define a metric"),
|
||||
});
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
|
||||
pub enum RemoteOpKind {
|
||||
Upload,
|
||||
|
||||
@@ -1368,6 +1368,10 @@ impl<'a> DatadirModification<'a> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) fn is_empty(&self) -> bool {
|
||||
self.pending_updates.is_empty() && self.pending_deletions.is_empty()
|
||||
}
|
||||
|
||||
// Internal helper functions to batch the modifications
|
||||
|
||||
async fn get(&self, key: Key, ctx: &RequestContext) -> Result<Bytes, PageReconstructError> {
|
||||
|
||||
@@ -2515,7 +2515,7 @@ impl Tenant {
|
||||
}
|
||||
}
|
||||
|
||||
debug!("persisting tenantconf to {config_path}");
|
||||
info!("persisting tenantconf to {config_path}");
|
||||
|
||||
let mut conf_content = r#"# This file contains a specific per-tenant's config.
|
||||
# It is read in case of pageserver restart.
|
||||
@@ -2550,7 +2550,7 @@ impl Tenant {
|
||||
target_config_path: &Utf8Path,
|
||||
tenant_conf: &TenantConfOpt,
|
||||
) -> anyhow::Result<()> {
|
||||
debug!("persisting tenantconf to {target_config_path}");
|
||||
info!("persisting tenantconf to {target_config_path}");
|
||||
|
||||
let mut conf_content = r#"# This file contains a specific per-tenant's config.
|
||||
# It is read in case of pageserver restart.
|
||||
|
||||
@@ -270,6 +270,49 @@ async fn safe_rename_tenant_dir(path: impl AsRef<Utf8Path>) -> std::io::Result<U
|
||||
static TENANTS: Lazy<std::sync::RwLock<TenantsMap>> =
|
||||
Lazy::new(|| std::sync::RwLock::new(TenantsMap::Initializing));
|
||||
|
||||
/// Create a directory, including parents. This does no fsyncs and makes
|
||||
/// no guarantees about the persistence of the resulting metadata: for
|
||||
/// use when creating dirs for use as cache.
|
||||
async fn unsafe_create_dir_all(path: &Utf8PathBuf) -> std::io::Result<()> {
|
||||
let mut dirs_to_create = Vec::new();
|
||||
let mut path: &Utf8Path = path.as_ref();
|
||||
|
||||
// Figure out which directories we need to create.
|
||||
loop {
|
||||
let meta = tokio::fs::metadata(path).await;
|
||||
match meta {
|
||||
Ok(metadata) if metadata.is_dir() => break,
|
||||
Ok(_) => {
|
||||
return Err(std::io::Error::new(
|
||||
std::io::ErrorKind::AlreadyExists,
|
||||
format!("non-directory found in path: {path}"),
|
||||
));
|
||||
}
|
||||
Err(ref e) if e.kind() == std::io::ErrorKind::NotFound => {}
|
||||
Err(e) => return Err(e),
|
||||
}
|
||||
|
||||
dirs_to_create.push(path);
|
||||
|
||||
match path.parent() {
|
||||
Some(parent) => path = parent,
|
||||
None => {
|
||||
return Err(std::io::Error::new(
|
||||
std::io::ErrorKind::InvalidInput,
|
||||
format!("can't find parent of path '{path}'"),
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Create directories from parent to child.
|
||||
for &path in dirs_to_create.iter().rev() {
|
||||
tokio::fs::create_dir(path).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// The TenantManager is responsible for storing and mutating the collection of all tenants
|
||||
/// that this pageserver process has state for. Every Tenant and SecondaryTenant instance
|
||||
/// lives inside the TenantManager.
|
||||
@@ -603,13 +646,7 @@ pub(crate) fn tenant_spawn(
|
||||
"Cannot load tenant, ignore mark found at {tenant_ignore_mark:?}"
|
||||
);
|
||||
|
||||
info!(
|
||||
tenant_id = %tenant_shard_id.tenant_id,
|
||||
shard_id = %tenant_shard_id.shard_slug(),
|
||||
generation = ?location_conf.location.generation,
|
||||
attach_mode = ?location_conf.location.attach_mode,
|
||||
"Attaching tenant"
|
||||
);
|
||||
info!("Attaching tenant {tenant_shard_id}");
|
||||
let tenant = match Tenant::spawn(
|
||||
conf,
|
||||
tenant_shard_id,
|
||||
@@ -998,7 +1035,7 @@ impl TenantManager {
|
||||
LocationMode::Secondary(_) => {
|
||||
// Directory doesn't need to be fsync'd because if we crash it can
|
||||
// safely be recreated next time this tenant location is configured.
|
||||
tokio::fs::create_dir_all(&tenant_path)
|
||||
unsafe_create_dir_all(&tenant_path)
|
||||
.await
|
||||
.with_context(|| format!("Creating {tenant_path}"))?;
|
||||
|
||||
@@ -1014,7 +1051,7 @@ impl TenantManager {
|
||||
// Directory doesn't need to be fsync'd because we do not depend on
|
||||
// it to exist after crashes: it may be recreated when tenant is
|
||||
// re-attached, see https://github.com/neondatabase/neon/issues/5550
|
||||
tokio::fs::create_dir_all(&tenant_path)
|
||||
unsafe_create_dir_all(&timelines_path)
|
||||
.await
|
||||
.with_context(|| format!("Creating {timelines_path}"))?;
|
||||
|
||||
@@ -1044,81 +1081,6 @@ impl TenantManager {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Resetting a tenant is equivalent to detaching it, then attaching it again with the same
|
||||
/// LocationConf that was last used to attach it. Optionally, the local file cache may be
|
||||
/// dropped before re-attaching.
|
||||
///
|
||||
/// This is not part of a tenant's normal lifecycle: it is used for debug/support, in situations
|
||||
/// where an issue is identified that would go away with a restart of the tenant.
|
||||
///
|
||||
/// This does not have any special "force" shutdown of a tenant: it relies on the tenant's tasks
|
||||
/// to respect the cancellation tokens used in normal shutdown().
|
||||
#[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %drop_cache))]
|
||||
pub(crate) async fn reset_tenant(
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
drop_cache: bool,
|
||||
ctx: RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
|
||||
let Some(old_slot) = slot_guard.get_old_value() else {
|
||||
anyhow::bail!("Tenant not found when trying to reset");
|
||||
};
|
||||
|
||||
let Some(tenant) = old_slot.get_attached() else {
|
||||
slot_guard.revert();
|
||||
anyhow::bail!("Tenant is not in attached state");
|
||||
};
|
||||
|
||||
let (_guard, progress) = utils::completion::channel();
|
||||
match tenant.shutdown(progress, false).await {
|
||||
Ok(()) => {
|
||||
slot_guard.drop_old_value()?;
|
||||
}
|
||||
Err(_barrier) => {
|
||||
slot_guard.revert();
|
||||
anyhow::bail!("Cannot reset Tenant, already shutting down");
|
||||
}
|
||||
}
|
||||
|
||||
let tenant_path = self.conf.tenant_path(&tenant_shard_id);
|
||||
let timelines_path = self.conf.timelines_path(&tenant_shard_id);
|
||||
let config = Tenant::load_tenant_config(self.conf, &tenant_shard_id)?;
|
||||
|
||||
if drop_cache {
|
||||
tracing::info!("Dropping local file cache");
|
||||
|
||||
match tokio::fs::read_dir(&timelines_path).await {
|
||||
Err(e) => {
|
||||
tracing::warn!("Failed to list timelines while dropping cache: {}", e);
|
||||
}
|
||||
Ok(mut entries) => {
|
||||
while let Some(entry) = entries.next_entry().await? {
|
||||
tokio::fs::remove_dir_all(entry.path()).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let shard_identity = config.shard;
|
||||
let tenant = tenant_spawn(
|
||||
self.conf,
|
||||
tenant_shard_id,
|
||||
&tenant_path,
|
||||
self.resources.clone(),
|
||||
AttachedTenantConf::try_from(config)?,
|
||||
shard_identity,
|
||||
None,
|
||||
self.tenants,
|
||||
SpawnMode::Normal,
|
||||
&ctx,
|
||||
)?;
|
||||
|
||||
slot_guard.upsert(TenantSlot::Attached(tenant))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
@@ -1327,7 +1289,8 @@ pub(crate) async fn delete_tenant(
|
||||
// See https://github.com/neondatabase/neon/issues/5080
|
||||
|
||||
// TODO(sharding): make delete API sharding-aware
|
||||
let slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustExist)?;
|
||||
let mut slot_guard =
|
||||
tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustExist)?;
|
||||
|
||||
// unwrap is safe because we used MustExist mode when acquiring
|
||||
let tenant = match slot_guard.get_old_value().as_ref().unwrap() {
|
||||
@@ -1654,10 +1617,9 @@ pub enum TenantSlotUpsertError {
|
||||
MapState(#[from] TenantMapError),
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
#[derive(Debug)]
|
||||
enum TenantSlotDropError {
|
||||
/// It is only legal to drop a TenantSlot if its contents are fully shut down
|
||||
#[error("Tenant was not shut down")]
|
||||
NotShutdown,
|
||||
}
|
||||
|
||||
@@ -1717,9 +1679,9 @@ impl SlotGuard {
|
||||
}
|
||||
}
|
||||
|
||||
/// Get any value that was present in the slot before we acquired ownership
|
||||
/// Take any value that was present in the slot before we acquired ownership
|
||||
/// of it: in state transitions, this will be the old state.
|
||||
fn get_old_value(&self) -> &Option<TenantSlot> {
|
||||
fn get_old_value(&mut self) -> &Option<TenantSlot> {
|
||||
&self.old_value
|
||||
}
|
||||
|
||||
|
||||
@@ -21,6 +21,7 @@
|
||||
//! redo Postgres process, but some records it can handle directly with
|
||||
//! bespoken Rust code.
|
||||
|
||||
use pageserver_api::shard::ShardIdentity;
|
||||
use postgres_ffi::v14::nonrelfile_utils::clogpage_precedes;
|
||||
use postgres_ffi::v14::nonrelfile_utils::slru_may_delete_clogsegment;
|
||||
use postgres_ffi::{fsm_logical_to_physical, page_is_new, page_set_lsn};
|
||||
@@ -30,6 +31,7 @@ use bytes::{Buf, Bytes, BytesMut};
|
||||
use tracing::*;
|
||||
|
||||
use crate::context::RequestContext;
|
||||
use crate::metrics::WAL_INGEST;
|
||||
use crate::pgdatadir_mapping::*;
|
||||
use crate::tenant::PageReconstructError;
|
||||
use crate::tenant::Timeline;
|
||||
@@ -46,6 +48,7 @@ use postgres_ffi::BLCKSZ;
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
pub struct WalIngest<'a> {
|
||||
shard: ShardIdentity,
|
||||
timeline: &'a Timeline,
|
||||
|
||||
checkpoint: CheckPoint,
|
||||
@@ -65,6 +68,7 @@ impl<'a> WalIngest<'a> {
|
||||
trace!("CheckPoint.nextXid = {}", checkpoint.nextXid.value);
|
||||
|
||||
Ok(WalIngest {
|
||||
shard: *timeline.get_shard_identity(),
|
||||
timeline,
|
||||
checkpoint,
|
||||
checkpoint_modified: false,
|
||||
@@ -87,6 +91,8 @@ impl<'a> WalIngest<'a> {
|
||||
decoded: &mut DecodedWALRecord,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
WAL_INGEST.records_received.inc();
|
||||
|
||||
modification.lsn = lsn;
|
||||
decode_wal_record(recdata, decoded, self.timeline.pg_version)?;
|
||||
|
||||
@@ -355,6 +361,32 @@ impl<'a> WalIngest<'a> {
|
||||
// Iterate through all the blocks that the record modifies, and
|
||||
// "put" a separate copy of the record for each block.
|
||||
for blk in decoded.blocks.iter() {
|
||||
let rel = RelTag {
|
||||
spcnode: blk.rnode_spcnode,
|
||||
dbnode: blk.rnode_dbnode,
|
||||
relnode: blk.rnode_relnode,
|
||||
forknum: blk.forknum,
|
||||
};
|
||||
|
||||
let key = rel_block_to_key(rel, blk.blkno);
|
||||
let key_is_local = self.shard.is_key_local(&key);
|
||||
|
||||
tracing::debug!(
|
||||
"ingest: shard decision {} (checkpoint={}) for key {}",
|
||||
if !key_is_local { "drop" } else { "keep" },
|
||||
self.checkpoint_modified,
|
||||
key
|
||||
);
|
||||
|
||||
if !key_is_local {
|
||||
if self.shard.is_zero() {
|
||||
// Shard 0 tracks relation sizes. Although we will not store this block, we will observe
|
||||
// its blkno in case it implicitly extends a relation.
|
||||
self.observe_decoded_block(modification, blk, ctx).await?;
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
self.ingest_decoded_block(modification, lsn, decoded, blk, ctx)
|
||||
.await?;
|
||||
}
|
||||
@@ -367,13 +399,37 @@ impl<'a> WalIngest<'a> {
|
||||
self.checkpoint_modified = false;
|
||||
}
|
||||
|
||||
if modification.is_empty() {
|
||||
tracing::debug!("ingest: filtered out record @ LSN {lsn}");
|
||||
WAL_INGEST.records_filtered.inc();
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Now that this record has been fully handled, including updating the
|
||||
// checkpoint data, let the repository know that it is up-to-date to this LSN
|
||||
WAL_INGEST.records_committed.inc();
|
||||
modification.commit(ctx).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Do not store this block, but observe it for the purposes of updating our relation size state.
|
||||
async fn observe_decoded_block(
|
||||
&mut self,
|
||||
modification: &mut DatadirModification<'_>,
|
||||
blk: &DecodedBkpBlock,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), PageReconstructError> {
|
||||
let rel = RelTag {
|
||||
spcnode: blk.rnode_spcnode,
|
||||
dbnode: blk.rnode_dbnode,
|
||||
relnode: blk.rnode_relnode,
|
||||
forknum: blk.forknum,
|
||||
};
|
||||
self.handle_rel_extend(modification, rel, blk.blkno, ctx)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn ingest_decoded_block(
|
||||
&mut self,
|
||||
modification: &mut DatadirModification<'_>,
|
||||
@@ -1465,8 +1521,15 @@ impl<'a> WalIngest<'a> {
|
||||
//info!("extending {} {} to {}", rel, old_nblocks, new_nblocks);
|
||||
modification.put_rel_extend(rel, new_nblocks, ctx).await?;
|
||||
|
||||
let mut key = rel_block_to_key(rel, blknum);
|
||||
// fill the gap with zeros
|
||||
for gap_blknum in old_nblocks..blknum {
|
||||
key.field6 = gap_blknum;
|
||||
|
||||
if self.shard.get_shard_number(&key) != self.shard.number {
|
||||
continue;
|
||||
}
|
||||
|
||||
modification.put_rel_page_image(rel, gap_blknum, ZERO_PAGE.clone())?;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,7 +18,9 @@
|
||||
#include "fmgr.h"
|
||||
#include "access/xlog.h"
|
||||
#include "access/xlogutils.h"
|
||||
#include "common/hashfn.h"
|
||||
#include "storage/buf_internals.h"
|
||||
#include "storage/lwlock.h"
|
||||
#include "storage/ipc.h"
|
||||
#include "storage/pg_shmem.h"
|
||||
#include "c.h"
|
||||
@@ -35,24 +37,12 @@
|
||||
#include "neon.h"
|
||||
#include "walproposer.h"
|
||||
#include "neon_utils.h"
|
||||
|
||||
#include <pthread.h>
|
||||
#include "control_plane_connector.h"
|
||||
|
||||
#define PageStoreTrace DEBUG5
|
||||
|
||||
#define RECONNECT_INTERVAL_USEC 1000000
|
||||
|
||||
bool connected = false;
|
||||
PGconn *pageserver_conn = NULL;
|
||||
|
||||
/*
|
||||
* WaitEventSet containing:
|
||||
* - WL_SOCKET_READABLE on pageserver_conn,
|
||||
* - WL_LATCH_SET on MyLatch, and
|
||||
* - WL_EXIT_ON_PM_DEATH.
|
||||
*/
|
||||
WaitEventSet *pageserver_conn_wes = NULL;
|
||||
|
||||
/* GUCs */
|
||||
char *neon_timeline;
|
||||
char *neon_tenant;
|
||||
@@ -65,96 +55,175 @@ int flush_every_n_requests = 8;
|
||||
|
||||
int n_reconnect_attempts = 0;
|
||||
int max_reconnect_attempts = 60;
|
||||
|
||||
#define MAX_PAGESERVER_CONNSTRING_SIZE 256
|
||||
|
||||
typedef struct
|
||||
{
|
||||
pthread_rwlock_t lock;
|
||||
pg_atomic_uint64 update_counter;
|
||||
char pageserver_connstring[MAX_PAGESERVER_CONNSTRING_SIZE];
|
||||
} PagestoreShmemState;
|
||||
|
||||
#if PG_VERSION_NUM >= 150000
|
||||
static shmem_request_hook_type prev_shmem_request_hook = NULL;
|
||||
static void walproposer_shmem_request(void);
|
||||
#endif
|
||||
static shmem_startup_hook_type prev_shmem_startup_hook;
|
||||
static PagestoreShmemState *pagestore_shared;
|
||||
static uint64 pagestore_local_counter = 0;
|
||||
static char local_pageserver_connstring[MAX_PAGESERVER_CONNSTRING_SIZE];
|
||||
int stripe_size;
|
||||
|
||||
bool (*old_redo_read_buffer_filter) (XLogReaderState *record, uint8 block_id) = NULL;
|
||||
|
||||
static bool pageserver_flush(void);
|
||||
static void pageserver_disconnect(void);
|
||||
static bool pageserver_flush(shardno_t shard_no);
|
||||
static void pageserver_disconnect(shardno_t shard_no);
|
||||
static void AssignPageserverConnstring(const char *newval, void *extra);
|
||||
|
||||
static bool
|
||||
PagestoreShmemIsValid()
|
||||
{
|
||||
return pagestore_shared && UsedShmemSegAddr;
|
||||
}
|
||||
static shmem_startup_hook_type prev_shmem_startup_hook;
|
||||
#if PG_VERSION_NUM>=150000
|
||||
static shmem_request_hook_type prev_shmem_request_hook;
|
||||
#endif
|
||||
|
||||
static bool
|
||||
CheckPageserverConnstring(char **newval, void **extra, GucSource source)
|
||||
typedef struct
|
||||
{
|
||||
return strlen(*newval) < MAX_PAGESERVER_CONNSTRING_SIZE;
|
||||
size_t n_shards;
|
||||
pg_atomic_uint64 begin_update_counter;
|
||||
pg_atomic_uint64 end_update_counter;
|
||||
char shard_connstr[MAX_SHARDS][MAX_PS_CONNSTR_LEN];
|
||||
} ShardMap;
|
||||
|
||||
|
||||
static ShardMap* shard_map;
|
||||
static uint64 shard_map_update_counter;
|
||||
|
||||
typedef struct
|
||||
{
|
||||
/*
|
||||
* Connection for each shard
|
||||
*/
|
||||
PGconn *conn;
|
||||
/*
|
||||
* WaitEventSet containing:
|
||||
* - WL_SOCKET_READABLE on pageserver_conn,
|
||||
* - WL_LATCH_SET on MyLatch, and
|
||||
* - WL_EXIT_ON_PM_DEATH.
|
||||
*/
|
||||
WaitEventSet *wes;
|
||||
} PageServer;
|
||||
|
||||
static PageServer page_servers[MAX_SHARDS];
|
||||
static shardno_t max_attached_shard_no;
|
||||
|
||||
static void
|
||||
psm_shmem_startup(void)
|
||||
{
|
||||
bool found;
|
||||
if (prev_shmem_startup_hook)
|
||||
{
|
||||
prev_shmem_startup_hook();
|
||||
}
|
||||
|
||||
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
|
||||
|
||||
shard_map = (ShardMap*)ShmemInitStruct("shard_map", sizeof(ShardMap), &found);
|
||||
if (!found)
|
||||
{
|
||||
shard_map->n_shards = 0;
|
||||
pg_atomic_init_u64(&shard_map->begin_update_counter, 0);
|
||||
pg_atomic_init_u64(&shard_map->end_update_counter, 0);
|
||||
AssignPageserverConnstring(page_server_connstring, NULL);
|
||||
}
|
||||
LWLockRelease(AddinShmemInitLock);
|
||||
}
|
||||
|
||||
static void
|
||||
AssignPageserverConnstring(const char *newval, void *extra)
|
||||
psm_shmem_request(void)
|
||||
{
|
||||
if(!PagestoreShmemIsValid())
|
||||
return;
|
||||
pthread_rwlock_wrlock(&pagestore_shared->lock);
|
||||
strlcpy(pagestore_shared->pageserver_connstring, newval, MAX_PAGESERVER_CONNSTRING_SIZE);
|
||||
pg_atomic_fetch_add_u64(&pagestore_shared->update_counter, 1);
|
||||
pthread_rwlock_unlock(&pagestore_shared->lock);
|
||||
#if PG_VERSION_NUM>=150000
|
||||
if (prev_shmem_request_hook)
|
||||
prev_shmem_request_hook();
|
||||
#endif
|
||||
|
||||
RequestAddinShmemSpace(sizeof(ShardMap));
|
||||
}
|
||||
|
||||
static bool
|
||||
CheckConnstringUpdated()
|
||||
static void
|
||||
psm_init(void)
|
||||
{
|
||||
if(!PagestoreShmemIsValid())
|
||||
return false;
|
||||
return pagestore_local_counter < pg_atomic_read_u64(&pagestore_shared->update_counter);
|
||||
prev_shmem_startup_hook = shmem_startup_hook;
|
||||
shmem_startup_hook = psm_shmem_startup;
|
||||
#if PG_VERSION_NUM>=150000
|
||||
prev_shmem_request_hook = shmem_request_hook;
|
||||
shmem_request_hook = psm_shmem_request;
|
||||
#else
|
||||
psm_shmem_request();
|
||||
#endif
|
||||
}
|
||||
|
||||
/* Returns true if the connstring has changed and false if not */
|
||||
static bool
|
||||
ReloadConnstring()
|
||||
/*
|
||||
* Reload page map if needed and return number of shards and connection string for the specified shard
|
||||
*/
|
||||
static shardno_t
|
||||
load_shard_map(shardno_t shard_no, char* connstr)
|
||||
{
|
||||
if(!PagestoreShmemIsValid())
|
||||
return false;
|
||||
pthread_rwlock_rdlock(&pagestore_shared->lock);
|
||||
shardno_t n_shards;
|
||||
uint64 begin_update_counter;
|
||||
uint64 end_update_counter;
|
||||
|
||||
if(strcmp(local_pageserver_connstring, pagestore_shared->pageserver_connstring) == 0)
|
||||
{
|
||||
pthread_rwlock_unlock(&pagestore_shared->lock);
|
||||
return false;
|
||||
/*
|
||||
* There is race condition here between backendc and postmaster which can update shard map.
|
||||
* We recheck update couner after copying connection string to check that configuration was not changed.
|
||||
*/
|
||||
do
|
||||
{
|
||||
begin_update_counter = pg_atomic_read_u64(&shard_map->begin_update_counter);
|
||||
end_update_counter = pg_atomic_read_u64(&shard_map->end_update_counter);
|
||||
|
||||
n_shards = shard_map->n_shards;
|
||||
if (shard_no >= n_shards)
|
||||
elog(ERROR, "Shard %d is greater or equal than number of shards %d", shard_no, n_shards);
|
||||
|
||||
if (connstr)
|
||||
strncpy(connstr, shard_map->shard_connstr[shard_no], MAX_PS_CONNSTR_LEN);
|
||||
|
||||
}
|
||||
while (begin_update_counter != end_update_counter
|
||||
|| begin_update_counter != pg_atomic_read_u64(&shard_map->begin_update_counter)
|
||||
|| end_update_counter != pg_atomic_read_u64(&shard_map->end_update_counter));
|
||||
|
||||
if (shard_map_update_counter != end_update_counter)
|
||||
{
|
||||
/* Reset all connections if connection strings are changed */
|
||||
for (shardno_t i = 0; i < max_attached_shard_no; i++)
|
||||
{
|
||||
if (page_servers[i].conn)
|
||||
pageserver_disconnect(i);
|
||||
}
|
||||
max_attached_shard_no = 0;
|
||||
shard_map_update_counter = end_update_counter;
|
||||
}
|
||||
|
||||
strlcpy(local_pageserver_connstring, pagestore_shared->pageserver_connstring, sizeof(local_pageserver_connstring));
|
||||
pagestore_local_counter = pg_atomic_read_u64(&pagestore_shared->update_counter);
|
||||
pthread_rwlock_unlock(&pagestore_shared->lock);
|
||||
return true;
|
||||
return n_shards;
|
||||
}
|
||||
|
||||
#define MB (1024*1024)
|
||||
|
||||
shardno_t
|
||||
get_shard_number(BufferTag* tag)
|
||||
{
|
||||
shardno_t n_shards = load_shard_map(0, NULL);
|
||||
uint32 hash;
|
||||
|
||||
#if PG_MAJORVERSION_NUM < 16
|
||||
hash = murmurhash32(tag->rnode.relNode);
|
||||
hash = hash_combine(hash, murmurhash32(tag->blockNum/(MB/BLCKSZ)/stripe_size));
|
||||
#else
|
||||
hash = murmurhash32(tag->relNumber);
|
||||
hash = hash_combine(hash, murmurhash32(tag->blockNum/(MB/BLCKSZ)/stripe_size));
|
||||
#endif
|
||||
|
||||
return hash % n_shards;
|
||||
}
|
||||
|
||||
static bool
|
||||
pageserver_connect(int elevel)
|
||||
pageserver_connect(shardno_t shard_no, int elevel)
|
||||
{
|
||||
char *query;
|
||||
int ret;
|
||||
const char *keywords[3];
|
||||
const char *values[3];
|
||||
int n;
|
||||
PGconn* conn;
|
||||
WaitEventSet *wes;
|
||||
char connstr[MAX_PS_CONNSTR_LEN];
|
||||
|
||||
Assert(!connected);
|
||||
Assert(page_servers[shard_no].conn == NULL);
|
||||
|
||||
if(CheckConnstringUpdated())
|
||||
{
|
||||
ReloadConnstring();
|
||||
}
|
||||
(void)load_shard_map(shard_no, connstr); /* refresh page map if needed */
|
||||
|
||||
/*
|
||||
* Connect using the connection string we got from the
|
||||
@@ -175,19 +244,18 @@ pageserver_connect(int elevel)
|
||||
n++;
|
||||
}
|
||||
keywords[n] = "dbname";
|
||||
values[n] = local_pageserver_connstring;
|
||||
values[n] = connstr;
|
||||
n++;
|
||||
keywords[n] = NULL;
|
||||
values[n] = NULL;
|
||||
n++;
|
||||
pageserver_conn = PQconnectdbParams(keywords, values, 1);
|
||||
conn = PQconnectdbParams(keywords, values, 1);
|
||||
|
||||
if (PQstatus(pageserver_conn) == CONNECTION_BAD)
|
||||
if (PQstatus(conn) == CONNECTION_BAD)
|
||||
{
|
||||
char *msg = pchomp(PQerrorMessage(pageserver_conn));
|
||||
char *msg = pchomp(PQerrorMessage(conn));
|
||||
|
||||
PQfinish(pageserver_conn);
|
||||
pageserver_conn = NULL;
|
||||
PQfinish(conn);
|
||||
|
||||
ereport(elevel,
|
||||
(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
|
||||
@@ -195,30 +263,28 @@ pageserver_connect(int elevel)
|
||||
errdetail_internal("%s", msg)));
|
||||
return false;
|
||||
}
|
||||
|
||||
query = psprintf("pagestream %s %s", neon_tenant, neon_timeline);
|
||||
ret = PQsendQuery(pageserver_conn, query);
|
||||
ret = PQsendQuery(conn, query);
|
||||
if (ret != 1)
|
||||
{
|
||||
PQfinish(pageserver_conn);
|
||||
pageserver_conn = NULL;
|
||||
PQfinish(conn);
|
||||
neon_log(elevel, "could not send pagestream command to pageserver");
|
||||
return false;
|
||||
}
|
||||
|
||||
pageserver_conn_wes = CreateWaitEventSet(TopMemoryContext, 3);
|
||||
AddWaitEventToSet(pageserver_conn_wes, WL_LATCH_SET, PGINVALID_SOCKET,
|
||||
wes = CreateWaitEventSet(TopMemoryContext, 3);
|
||||
AddWaitEventToSet(wes, WL_LATCH_SET, PGINVALID_SOCKET,
|
||||
MyLatch, NULL);
|
||||
AddWaitEventToSet(pageserver_conn_wes, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
|
||||
AddWaitEventToSet(wes, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
|
||||
NULL, NULL);
|
||||
AddWaitEventToSet(pageserver_conn_wes, WL_SOCKET_READABLE, PQsocket(pageserver_conn), NULL, NULL);
|
||||
AddWaitEventToSet(wes, WL_SOCKET_READABLE, PQsocket(conn), NULL, NULL);
|
||||
|
||||
while (PQisBusy(pageserver_conn))
|
||||
while (PQisBusy(conn))
|
||||
{
|
||||
WaitEvent event;
|
||||
|
||||
/* Sleep until there's something to do */
|
||||
(void) WaitEventSetWait(pageserver_conn_wes, -1L, &event, 1, PG_WAIT_EXTENSION);
|
||||
(void) WaitEventSetWait(wes, -1L, &event, 1, PG_WAIT_EXTENSION);
|
||||
ResetLatch(MyLatch);
|
||||
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
@@ -226,14 +292,12 @@ pageserver_connect(int elevel)
|
||||
/* Data available in socket? */
|
||||
if (event.events & WL_SOCKET_READABLE)
|
||||
{
|
||||
if (!PQconsumeInput(pageserver_conn))
|
||||
if (!PQconsumeInput(conn))
|
||||
{
|
||||
char *msg = pchomp(PQerrorMessage(pageserver_conn));
|
||||
char *msg = pchomp(PQerrorMessage(conn));
|
||||
|
||||
PQfinish(pageserver_conn);
|
||||
pageserver_conn = NULL;
|
||||
FreeWaitEventSet(pageserver_conn_wes);
|
||||
pageserver_conn_wes = NULL;
|
||||
PQfinish(conn);
|
||||
FreeWaitEventSet(wes);
|
||||
|
||||
neon_log(elevel, "could not complete handshake with pageserver: %s",
|
||||
msg);
|
||||
@@ -242,9 +306,11 @@ pageserver_connect(int elevel)
|
||||
}
|
||||
}
|
||||
|
||||
neon_log(LOG, "libpagestore: connected to '%s'", page_server_connstring);
|
||||
neon_log(LOG, "libpagestore: connected to '%s'", connstr);
|
||||
page_servers[shard_no].conn = conn;
|
||||
page_servers[shard_no].wes = wes;
|
||||
max_attached_shard_no = Max(shard_no+1, max_attached_shard_no);
|
||||
|
||||
connected = true;
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -252,10 +318,10 @@ pageserver_connect(int elevel)
|
||||
* A wrapper around PQgetCopyData that checks for interrupts while sleeping.
|
||||
*/
|
||||
static int
|
||||
call_PQgetCopyData(char **buffer)
|
||||
call_PQgetCopyData(shardno_t shard_no, char **buffer)
|
||||
{
|
||||
int ret;
|
||||
|
||||
PGconn* pageserver_conn = page_servers[shard_no].conn;
|
||||
retry:
|
||||
ret = PQgetCopyData(pageserver_conn, buffer, 1 /* async */ );
|
||||
|
||||
@@ -264,7 +330,7 @@ retry:
|
||||
WaitEvent event;
|
||||
|
||||
/* Sleep until there's something to do */
|
||||
(void) WaitEventSetWait(pageserver_conn_wes, -1L, &event, 1, PG_WAIT_EXTENSION);
|
||||
(void) WaitEventSetWait(page_servers[shard_no].wes, -1L, &event, 1, PG_WAIT_EXTENSION);
|
||||
ResetLatch(MyLatch);
|
||||
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
@@ -289,7 +355,7 @@ retry:
|
||||
|
||||
|
||||
static void
|
||||
pageserver_disconnect(void)
|
||||
pageserver_disconnect(shardno_t shard_no)
|
||||
{
|
||||
/*
|
||||
* If anything goes wrong while we were sending a request, it's not clear
|
||||
@@ -298,41 +364,32 @@ pageserver_disconnect(void)
|
||||
* time later after we have already sent a new unrelated request. Close
|
||||
* the connection to avoid getting confused.
|
||||
*/
|
||||
if (connected)
|
||||
if (page_servers[shard_no].conn)
|
||||
{
|
||||
PQfinish(pageserver_conn);
|
||||
pageserver_conn = NULL;
|
||||
connected = false;
|
||||
neon_log(LOG, "dropping connection to page server due to error");
|
||||
PQfinish(page_servers[shard_no].conn);
|
||||
page_servers[shard_no].conn = NULL;
|
||||
|
||||
prefetch_on_ps_disconnect();
|
||||
}
|
||||
if (pageserver_conn_wes != NULL)
|
||||
if (page_servers[shard_no].wes != NULL)
|
||||
{
|
||||
FreeWaitEventSet(pageserver_conn_wes);
|
||||
pageserver_conn_wes = NULL;
|
||||
FreeWaitEventSet(page_servers[shard_no].wes);
|
||||
page_servers[shard_no].wes = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
static bool
|
||||
pageserver_send(NeonRequest * request)
|
||||
pageserver_send(shardno_t shard_no, NeonRequest * request)
|
||||
{
|
||||
StringInfoData req_buff;
|
||||
|
||||
if(CheckConnstringUpdated())
|
||||
{
|
||||
bool should_disconnect = ReloadConnstring();
|
||||
if(should_disconnect)
|
||||
{
|
||||
neon_log(LOG, "pageserver_send disconnect because connstring changed");
|
||||
pageserver_disconnect();
|
||||
}
|
||||
}
|
||||
PGconn* pageserver_conn = page_servers[shard_no].conn;
|
||||
|
||||
/* If the connection was lost for some reason, reconnect */
|
||||
if (connected && PQstatus(pageserver_conn) == CONNECTION_BAD)
|
||||
if (pageserver_conn && PQstatus(pageserver_conn) == CONNECTION_BAD)
|
||||
{
|
||||
neon_log(LOG, "pageserver_send disconnect bad connection");
|
||||
pageserver_disconnect();
|
||||
pageserver_disconnect(shard_no);
|
||||
}
|
||||
|
||||
req_buff = nm_pack_request(request);
|
||||
@@ -344,9 +401,9 @@ pageserver_send(NeonRequest * request)
|
||||
* See https://github.com/neondatabase/neon/issues/1138
|
||||
* So try to reestablish connection in case of failure.
|
||||
*/
|
||||
if (!connected)
|
||||
if (!page_servers[shard_no].conn)
|
||||
{
|
||||
while (!pageserver_connect(n_reconnect_attempts < max_reconnect_attempts ? LOG : ERROR))
|
||||
while (!pageserver_connect(shard_no, n_reconnect_attempts < max_reconnect_attempts ? LOG : ERROR))
|
||||
{
|
||||
HandleMainLoopInterrupts();
|
||||
n_reconnect_attempts += 1;
|
||||
@@ -355,7 +412,9 @@ pageserver_send(NeonRequest * request)
|
||||
n_reconnect_attempts = 0;
|
||||
}
|
||||
|
||||
/*
|
||||
pageserver_conn = page_servers[shard_no].conn;
|
||||
|
||||
/*
|
||||
* Send request.
|
||||
*
|
||||
* In principle, this could block if the output buffer is full, and we
|
||||
@@ -366,7 +425,7 @@ pageserver_send(NeonRequest * request)
|
||||
if (PQputCopyData(pageserver_conn, req_buff.data, req_buff.len) <= 0)
|
||||
{
|
||||
char *msg = pchomp(PQerrorMessage(pageserver_conn));
|
||||
pageserver_disconnect();
|
||||
pageserver_disconnect(shard_no);
|
||||
neon_log(LOG, "pageserver_send disconnect because failed to send page request (try to reconnect): %s", msg);
|
||||
pfree(msg);
|
||||
pfree(req_buff.data);
|
||||
@@ -386,12 +445,12 @@ pageserver_send(NeonRequest * request)
|
||||
}
|
||||
|
||||
static NeonResponse *
|
||||
pageserver_receive(void)
|
||||
pageserver_receive(shardno_t shard_no)
|
||||
{
|
||||
StringInfoData resp_buff;
|
||||
NeonResponse *resp;
|
||||
|
||||
if (!connected)
|
||||
PGconn* pageserver_conn = page_servers[shard_no].conn;
|
||||
if (!pageserver_conn)
|
||||
return NULL;
|
||||
|
||||
PG_TRY();
|
||||
@@ -399,7 +458,7 @@ pageserver_receive(void)
|
||||
/* read response */
|
||||
int rc;
|
||||
|
||||
rc = call_PQgetCopyData(&resp_buff.data);
|
||||
rc = call_PQgetCopyData(shard_no, &resp_buff.data);
|
||||
if (rc >= 0)
|
||||
{
|
||||
resp_buff.len = rc;
|
||||
@@ -418,25 +477,25 @@ pageserver_receive(void)
|
||||
else if (rc == -1)
|
||||
{
|
||||
neon_log(LOG, "pageserver_receive disconnect because call_PQgetCopyData returns -1: %s", pchomp(PQerrorMessage(pageserver_conn)));
|
||||
pageserver_disconnect();
|
||||
pageserver_disconnect(shard_no);
|
||||
resp = NULL;
|
||||
}
|
||||
else if (rc == -2)
|
||||
{
|
||||
char* msg = pchomp(PQerrorMessage(pageserver_conn));
|
||||
pageserver_disconnect();
|
||||
pageserver_disconnect(shard_no);
|
||||
neon_log(ERROR, "pageserver_receive disconnect because could not read COPY data: %s", msg);
|
||||
}
|
||||
else
|
||||
{
|
||||
pageserver_disconnect();
|
||||
pageserver_disconnect(shard_no);
|
||||
neon_log(ERROR, "pageserver_receive disconnect because unexpected PQgetCopyData return value: %d", rc);
|
||||
}
|
||||
}
|
||||
PG_CATCH();
|
||||
{
|
||||
neon_log(LOG, "pageserver_receive disconnect due to caught exception");
|
||||
pageserver_disconnect();
|
||||
pageserver_disconnect(shard_no);
|
||||
PG_RE_THROW();
|
||||
}
|
||||
PG_END_TRY();
|
||||
@@ -446,9 +505,10 @@ pageserver_receive(void)
|
||||
|
||||
|
||||
static bool
|
||||
pageserver_flush(void)
|
||||
pageserver_flush(shardno_t shard_no)
|
||||
{
|
||||
if (!connected)
|
||||
PGconn* pageserver_conn = page_servers[shard_no].conn;
|
||||
if (!pageserver_conn)
|
||||
{
|
||||
neon_log(WARNING, "Tried to flush while disconnected");
|
||||
}
|
||||
@@ -457,7 +517,7 @@ pageserver_flush(void)
|
||||
if (PQflush(pageserver_conn))
|
||||
{
|
||||
char *msg = pchomp(PQerrorMessage(pageserver_conn));
|
||||
pageserver_disconnect();
|
||||
pageserver_disconnect(shard_no);
|
||||
neon_log(LOG, "pageserver_flush disconnect because failed to flush page requests: %s", msg);
|
||||
pfree(msg);
|
||||
return false;
|
||||
@@ -481,67 +541,61 @@ check_neon_id(char **newval, void **extra, GucSource source)
|
||||
return **newval == '\0' || HexDecodeString(id, *newval, 16);
|
||||
}
|
||||
|
||||
static Size
|
||||
PagestoreShmemSize(void)
|
||||
{
|
||||
return sizeof(PagestoreShmemState);
|
||||
}
|
||||
|
||||
static bool
|
||||
PagestoreShmemInit(void)
|
||||
{
|
||||
bool found;
|
||||
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
|
||||
pagestore_shared = ShmemInitStruct("libpagestore shared state",
|
||||
PagestoreShmemSize(),
|
||||
&found);
|
||||
if(!found)
|
||||
{
|
||||
pthread_rwlockattr_t attr;
|
||||
pthread_rwlockattr_init(&attr);
|
||||
pthread_rwlockattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);
|
||||
pthread_rwlock_init(&pagestore_shared->lock, &attr);
|
||||
pthread_rwlockattr_destroy(&attr);
|
||||
|
||||
pg_atomic_init_u64(&pagestore_shared->update_counter, 0);
|
||||
AssignPageserverConnstring(page_server_connstring, NULL);
|
||||
}
|
||||
LWLockRelease(AddinShmemInitLock);
|
||||
return found;
|
||||
}
|
||||
|
||||
static void
|
||||
pagestore_shmem_startup_hook(void)
|
||||
AssignPageserverConnstring(const char *newval, void *extra)
|
||||
{
|
||||
if(prev_shmem_startup_hook)
|
||||
prev_shmem_startup_hook();
|
||||
/*
|
||||
* Load shard map only at Postmaster.
|
||||
* If old page server is not available, then backends can be blocked in attempts to reconnect to it and do not reload config in this loop
|
||||
*/
|
||||
if (shard_map != NULL && (MyProcPid == PostmasterPid || shard_map->n_shards == 0))
|
||||
{
|
||||
char const* shard_connstr = newval;
|
||||
char const* sep;
|
||||
size_t connstr_len;
|
||||
int i = 0;
|
||||
bool shard_map_changed = false;
|
||||
do
|
||||
{
|
||||
sep = strchr(shard_connstr, ',');
|
||||
connstr_len = sep != NULL ? sep - shard_connstr : strlen(shard_connstr);
|
||||
if (connstr_len == 0)
|
||||
break; /* trailing comma */
|
||||
if (i >= MAX_SHARDS)
|
||||
{
|
||||
elog(LOG, "Too many shards");
|
||||
return;
|
||||
}
|
||||
if (connstr_len >= MAX_PS_CONNSTR_LEN)
|
||||
{
|
||||
elog(LOG, "Connection string too long");
|
||||
return;
|
||||
}
|
||||
if (i >= shard_map->n_shards ||
|
||||
strcmp(shard_map->shard_connstr[i], shard_connstr) != 0)
|
||||
{
|
||||
if (!shard_map_changed)
|
||||
{
|
||||
pg_atomic_add_fetch_u64(&shard_map->begin_update_counter, 1);
|
||||
shard_map_changed = true;
|
||||
}
|
||||
memcpy(shard_map->shard_connstr[i], shard_connstr, connstr_len+1);
|
||||
}
|
||||
shard_connstr = sep + 1;
|
||||
i += 1;
|
||||
} while (sep != NULL);
|
||||
|
||||
PagestoreShmemInit();
|
||||
}
|
||||
|
||||
static void
|
||||
pagestore_shmem_request(void)
|
||||
{
|
||||
#if PG_VERSION_NUM >= 150000
|
||||
if(prev_shmem_request_hook)
|
||||
prev_shmem_request_hook();
|
||||
#endif
|
||||
|
||||
RequestAddinShmemSpace(PagestoreShmemSize());
|
||||
RequestNamedLWLockTranche("neon_libpagestore", 1);
|
||||
}
|
||||
|
||||
static void
|
||||
pagestore_prepare_shmem(void)
|
||||
{
|
||||
#if PG_VERSION_NUM >= 150000
|
||||
prev_shmem_request_hook = shmem_request_hook;
|
||||
shmem_request_hook = pagestore_shmem_request;
|
||||
#else
|
||||
pagestore_shmem_request();
|
||||
#endif
|
||||
prev_shmem_startup_hook = shmem_startup_hook;
|
||||
shmem_startup_hook = pagestore_shmem_startup_hook;
|
||||
if (i == 0)
|
||||
{
|
||||
elog(LOG, "No shards were specified");
|
||||
return;
|
||||
}
|
||||
if (shard_map_changed)
|
||||
{
|
||||
shard_map->n_shards = i;
|
||||
pg_atomic_add_fetch_u64(&shard_map->end_update_counter, 1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -550,8 +604,6 @@ pagestore_prepare_shmem(void)
|
||||
void
|
||||
pg_init_libpagestore(void)
|
||||
{
|
||||
pagestore_prepare_shmem();
|
||||
|
||||
DefineCustomStringVariable("neon.pageserver_connstring",
|
||||
"connection string to the page server",
|
||||
NULL,
|
||||
@@ -559,7 +611,7 @@ pg_init_libpagestore(void)
|
||||
"",
|
||||
PGC_SIGHUP,
|
||||
0, /* no flags required */
|
||||
CheckPageserverConnstring, AssignPageserverConnstring, NULL);
|
||||
NULL, AssignPageserverConnstring, NULL);
|
||||
|
||||
DefineCustomStringVariable("neon.timeline_id",
|
||||
"Neon timeline_id the server is running on",
|
||||
@@ -579,6 +631,15 @@ pg_init_libpagestore(void)
|
||||
0, /* no flags required */
|
||||
check_neon_id, NULL, NULL);
|
||||
|
||||
DefineCustomIntVariable("neon.stripe_size",
|
||||
"sharding sripe size",
|
||||
NULL,
|
||||
&stripe_size,
|
||||
256, 1, INT_MAX,
|
||||
PGC_SIGHUP,
|
||||
GUC_UNIT_MB,
|
||||
NULL, NULL, NULL);
|
||||
|
||||
DefineCustomIntVariable("neon.max_cluster_size",
|
||||
"cluster size limit",
|
||||
NULL,
|
||||
@@ -641,4 +702,5 @@ pg_init_libpagestore(void)
|
||||
}
|
||||
|
||||
lfc_init();
|
||||
psm_init();
|
||||
}
|
||||
|
||||
@@ -20,12 +20,16 @@
|
||||
#include RELFILEINFO_HDR
|
||||
#include "storage/block.h"
|
||||
#include "storage/smgr.h"
|
||||
#include "storage/buf_internals.h"
|
||||
#include "lib/stringinfo.h"
|
||||
#include "libpq/pqformat.h"
|
||||
#include "utils/memutils.h"
|
||||
|
||||
#include "pg_config.h"
|
||||
|
||||
#define MAX_SHARDS 128
|
||||
#define MAX_PS_CONNSTR_LEN 128
|
||||
|
||||
typedef enum
|
||||
{
|
||||
/* pagestore_client -> pagestore */
|
||||
@@ -144,11 +148,13 @@ extern char *nm_to_string(NeonMessage * msg);
|
||||
* API
|
||||
*/
|
||||
|
||||
typedef unsigned shardno_t;
|
||||
|
||||
typedef struct
|
||||
{
|
||||
bool (*send) (NeonRequest * request);
|
||||
NeonResponse *(*receive) (void);
|
||||
bool (*flush) (void);
|
||||
bool (*send) (shardno_t shard_no, NeonRequest * request);
|
||||
NeonResponse *(*receive) (shardno_t shard_no);
|
||||
bool (*flush) (shardno_t shard_no);
|
||||
} page_server_api;
|
||||
|
||||
extern void prefetch_on_ps_disconnect(void);
|
||||
@@ -165,6 +171,8 @@ extern char *neon_tenant;
|
||||
extern bool wal_redo;
|
||||
extern int32 max_cluster_size;
|
||||
|
||||
extern shardno_t get_shard_number(BufferTag* tag);
|
||||
|
||||
extern const f_smgr *smgr_neon(BackendId backend, NRelFileInfo rinfo);
|
||||
extern void smgr_init_neon(void);
|
||||
extern void readahead_buffer_resize(int newsize, void *extra);
|
||||
|
||||
@@ -59,7 +59,6 @@
|
||||
#include "replication/walsender.h"
|
||||
#include "storage/bufmgr.h"
|
||||
#include "storage/buf_internals.h"
|
||||
#include "storage/fsm_internals.h"
|
||||
#include "storage/smgr.h"
|
||||
#include "storage/md.h"
|
||||
#include "pgstat.h"
|
||||
@@ -165,6 +164,7 @@ typedef struct PrefetchRequest {
|
||||
XLogRecPtr actual_request_lsn;
|
||||
NeonResponse *response; /* may be null */
|
||||
PrefetchStatus status;
|
||||
shardno_t shard_no;
|
||||
uint64 my_ring_index;
|
||||
} PrefetchRequest;
|
||||
|
||||
@@ -226,6 +226,8 @@ typedef struct PrefetchState {
|
||||
|
||||
/* the buffers */
|
||||
prfh_hash *prf_hash;
|
||||
int max_shard_no;
|
||||
uint8 shard_bitmap[(MAX_SHARDS + 7)/8];
|
||||
PrefetchRequest prf_buffer[]; /* prefetch buffers */
|
||||
} PrefetchState;
|
||||
|
||||
@@ -314,6 +316,7 @@ compact_prefetch_buffers(void)
|
||||
Assert(target_slot->status == PRFS_UNUSED);
|
||||
|
||||
target_slot->buftag = source_slot->buftag;
|
||||
target_slot->shard_no = source_slot->shard_no;
|
||||
target_slot->status = source_slot->status;
|
||||
target_slot->response = source_slot->response;
|
||||
target_slot->effective_request_lsn = source_slot->effective_request_lsn;
|
||||
@@ -478,6 +481,23 @@ prefetch_cleanup_trailing_unused(void)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static bool
|
||||
prefetch_flush_requests(void)
|
||||
{
|
||||
for (shardno_t shard_no = 0; shard_no < MyPState->max_shard_no; shard_no++)
|
||||
{
|
||||
if (MyPState->shard_bitmap[shard_no >> 3] & (1 << (shard_no & 7)))
|
||||
{
|
||||
if (!page_server->flush(shard_no))
|
||||
return false;
|
||||
MyPState->shard_bitmap[shard_no >> 3] &= ~(1 << (shard_no & 7));
|
||||
}
|
||||
}
|
||||
MyPState->max_shard_no = 0;
|
||||
return true;
|
||||
}
|
||||
|
||||
/*
|
||||
* Wait for slot of ring_index to have received its response.
|
||||
* The caller is responsible for making sure the request buffer is flushed.
|
||||
@@ -493,7 +513,7 @@ prefetch_wait_for(uint64 ring_index)
|
||||
if (MyPState->ring_flush <= ring_index &&
|
||||
MyPState->ring_unused > MyPState->ring_flush)
|
||||
{
|
||||
if (!page_server->flush())
|
||||
if (!prefetch_flush_requests())
|
||||
return false;
|
||||
MyPState->ring_flush = MyPState->ring_unused;
|
||||
}
|
||||
@@ -531,7 +551,7 @@ prefetch_read(PrefetchRequest *slot)
|
||||
Assert(slot->my_ring_index == MyPState->ring_receive);
|
||||
|
||||
old = MemoryContextSwitchTo(MyPState->errctx);
|
||||
response = (NeonResponse *) page_server->receive();
|
||||
response = (NeonResponse *) page_server->receive(slot->shard_no);
|
||||
MemoryContextSwitchTo(old);
|
||||
if (response)
|
||||
{
|
||||
@@ -683,12 +703,14 @@ prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force
|
||||
Assert(slot->response == NULL);
|
||||
Assert(slot->my_ring_index == MyPState->ring_unused);
|
||||
|
||||
while (!page_server->send((NeonRequest *) &request));
|
||||
while (!page_server->send(slot->shard_no, (NeonRequest *) &request));
|
||||
|
||||
/* update prefetch state */
|
||||
MyPState->n_requests_inflight += 1;
|
||||
MyPState->n_unused -= 1;
|
||||
MyPState->ring_unused += 1;
|
||||
MyPState->shard_bitmap[slot->shard_no >> 3] |= 1 << (slot->shard_no & 7);
|
||||
MyPState->max_shard_no = Max(slot->shard_no+1, MyPState->max_shard_no);
|
||||
|
||||
/* update slot state */
|
||||
slot->status = PRFS_REQUESTED;
|
||||
@@ -848,6 +870,7 @@ prefetch_register_buffer(BufferTag tag, bool *force_latest, XLogRecPtr *force_ls
|
||||
* function reads the buffer tag from the slot.
|
||||
*/
|
||||
slot->buftag = tag;
|
||||
slot->shard_no = get_shard_number(&tag);
|
||||
slot->my_ring_index = ring_index;
|
||||
|
||||
prefetch_do_request(slot, force_latest, force_lsn);
|
||||
@@ -858,7 +881,7 @@ prefetch_register_buffer(BufferTag tag, bool *force_latest, XLogRecPtr *force_ls
|
||||
if (flush_every_n_requests > 0 &&
|
||||
MyPState->ring_unused - MyPState->ring_flush >= flush_every_n_requests)
|
||||
{
|
||||
if (!page_server->flush())
|
||||
if (!prefetch_flush_requests())
|
||||
{
|
||||
/* Prefetch set is reset in case of error, so we should try to register our request once again */
|
||||
goto Retry;
|
||||
@@ -873,11 +896,42 @@ static NeonResponse *
|
||||
page_server_request(void const *req)
|
||||
{
|
||||
NeonResponse* resp;
|
||||
BufferTag tag = {0};
|
||||
shardno_t shard_no;
|
||||
|
||||
switch (((NeonRequest *) req)->tag)
|
||||
{
|
||||
case T_NeonExistsRequest:
|
||||
CopyNRelFileInfoToBufTag(tag, ((NeonExistsRequest *) req)->rinfo);
|
||||
break;
|
||||
case T_NeonNblocksRequest:
|
||||
CopyNRelFileInfoToBufTag(tag, ((NeonNblocksRequest *) req)->rinfo);
|
||||
break;
|
||||
case T_NeonDbSizeRequest:
|
||||
NInfoGetDbOid(BufTagGetNRelFileInfo(tag)) = ((NeonDbSizeRequest *) req)->dbNode;
|
||||
break;
|
||||
case T_NeonGetPageRequest:
|
||||
CopyNRelFileInfoToBufTag(tag, ((NeonGetPageRequest *) req)->rinfo);
|
||||
tag.blockNum = ((NeonGetPageRequest *) req)->blkno;
|
||||
break;
|
||||
default:
|
||||
elog(ERROR, "Unexpected request tag: %d", ((NeonRequest *) req)->tag);
|
||||
}
|
||||
shard_no = get_shard_number(&tag);
|
||||
|
||||
/*
|
||||
* TODO: temporary workarround - we stream all WAL only to shard 0, so metadata and forks other than main
|
||||
* should be requested from shard 0. We still need to call get_shard_no() to check if shard map is up-to-date
|
||||
*/
|
||||
if (((NeonRequest *) req)->tag != T_NeonGetPageRequest || ((NeonGetPageRequest *) req)->forknum != MAIN_FORKNUM)
|
||||
{
|
||||
shard_no = 0;
|
||||
}
|
||||
|
||||
do {
|
||||
while (!page_server->send((NeonRequest *) req) || !page_server->flush());
|
||||
MyPState->ring_flush = MyPState->ring_unused;
|
||||
while (!page_server->send(shard_no, (NeonRequest *) req) || !page_server->flush(shard_no));
|
||||
consume_prefetch_responses();
|
||||
resp = page_server->receive();
|
||||
resp = page_server->receive(shard_no);
|
||||
} while (resp == NULL);
|
||||
return resp;
|
||||
|
||||
@@ -2723,86 +2777,6 @@ smgr_init_neon(void)
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
neon_extend_rel_size(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno, XLogRecPtr end_recptr)
|
||||
{
|
||||
BlockNumber relsize;
|
||||
/* Extend the relation if we know its size */
|
||||
if (get_cached_relsize(rinfo, forknum, &relsize))
|
||||
{
|
||||
if (relsize < blkno + 1)
|
||||
{
|
||||
update_cached_relsize(rinfo, forknum, blkno + 1);
|
||||
SetLastWrittenLSNForRelation(end_recptr, rinfo, forknum);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
/*
|
||||
* Size was not cached. We populate the cache now, with the size of the
|
||||
* relation measured after this WAL record is applied.
|
||||
*
|
||||
* This length is later reused when we open the smgr to read the block,
|
||||
* which is fine and expected.
|
||||
*/
|
||||
|
||||
NeonResponse *response;
|
||||
NeonNblocksResponse *nbresponse;
|
||||
NeonNblocksRequest request = {
|
||||
.req = (NeonRequest) {
|
||||
.lsn = end_recptr,
|
||||
.latest = false,
|
||||
.tag = T_NeonNblocksRequest,
|
||||
},
|
||||
.rinfo = rinfo,
|
||||
.forknum = forknum,
|
||||
};
|
||||
|
||||
response = page_server_request(&request);
|
||||
|
||||
Assert(response->tag == T_NeonNblocksResponse);
|
||||
nbresponse = (NeonNblocksResponse *) response;
|
||||
|
||||
relsize = Max(nbresponse->n_blocks, blkno+1);
|
||||
|
||||
set_cached_relsize(rinfo, forknum, relsize);
|
||||
SetLastWrittenLSNForRelation(end_recptr, rinfo, forknum);
|
||||
|
||||
elog(SmgrTrace, "Set length to %d", relsize);
|
||||
}
|
||||
}
|
||||
|
||||
#define FSM_TREE_DEPTH ((SlotsPerFSMPage >= 1626) ? 3 : 4)
|
||||
|
||||
/*
|
||||
* TODO: May be it is better to make correspondent fgunctio from freespace.c public?
|
||||
*/
|
||||
static BlockNumber
|
||||
get_fsm_physical_block(BlockNumber heapblk)
|
||||
{
|
||||
BlockNumber pages;
|
||||
int leafno;
|
||||
int l;
|
||||
|
||||
/*
|
||||
* Calculate the logical page number of the first leaf page below the
|
||||
* given page.
|
||||
*/
|
||||
leafno = heapblk / SlotsPerFSMPage;
|
||||
|
||||
/* Count upper level nodes required to address the leaf page */
|
||||
pages = 0;
|
||||
for (l = 0; l < FSM_TREE_DEPTH; l++)
|
||||
{
|
||||
pages += leafno + 1;
|
||||
leafno /= SlotsPerFSMPage;
|
||||
}
|
||||
|
||||
/* Turn the page count into 0-based block number */
|
||||
return pages - 1;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Return whether we can skip the redo for this block.
|
||||
*
|
||||
@@ -2850,6 +2824,7 @@ neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id)
|
||||
LWLock *partitionLock;
|
||||
Buffer buffer;
|
||||
bool no_redo_needed;
|
||||
BlockNumber relsize;
|
||||
|
||||
if (old_redo_read_buffer_filter && old_redo_read_buffer_filter(record, block_id))
|
||||
return true;
|
||||
@@ -2899,10 +2874,49 @@ neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id)
|
||||
|
||||
LWLockRelease(partitionLock);
|
||||
|
||||
neon_extend_rel_size(rinfo, forknum, blkno, end_recptr);
|
||||
if (forknum == MAIN_FORKNUM)
|
||||
/* Extend the relation if we know its size */
|
||||
if (get_cached_relsize(rinfo, forknum, &relsize))
|
||||
{
|
||||
neon_extend_rel_size(rinfo, FSM_FORKNUM, get_fsm_physical_block(blkno), end_recptr);
|
||||
if (relsize < blkno + 1)
|
||||
{
|
||||
update_cached_relsize(rinfo, forknum, blkno + 1);
|
||||
SetLastWrittenLSNForRelation(end_recptr, rinfo, forknum);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
/*
|
||||
* Size was not cached. We populate the cache now, with the size of the
|
||||
* relation measured after this WAL record is applied.
|
||||
*
|
||||
* This length is later reused when we open the smgr to read the block,
|
||||
* which is fine and expected.
|
||||
*/
|
||||
|
||||
NeonResponse *response;
|
||||
NeonNblocksResponse *nbresponse;
|
||||
NeonNblocksRequest request = {
|
||||
.req = (NeonRequest) {
|
||||
.lsn = end_recptr,
|
||||
.latest = false,
|
||||
.tag = T_NeonNblocksRequest,
|
||||
},
|
||||
.rinfo = rinfo,
|
||||
.forknum = forknum,
|
||||
};
|
||||
|
||||
response = page_server_request(&request);
|
||||
|
||||
Assert(response->tag == T_NeonNblocksResponse);
|
||||
nbresponse = (NeonNblocksResponse *) response;
|
||||
|
||||
Assert(nbresponse->n_blocks > blkno);
|
||||
|
||||
set_cached_relsize(rinfo, forknum, nbresponse->n_blocks);
|
||||
SetLastWrittenLSNForRelation(end_recptr, rinfo, forknum);
|
||||
|
||||
elog(SmgrTrace, "Set length to %d", nbresponse->n_blocks);
|
||||
}
|
||||
|
||||
return no_redo_needed;
|
||||
}
|
||||
|
||||
@@ -456,6 +456,7 @@ class NeonEnvBuilder:
|
||||
self.initial_tenant = initial_tenant or TenantId.generate()
|
||||
self.initial_timeline = initial_timeline or TimelineId.generate()
|
||||
self.enable_generations = False
|
||||
self.initial_shard_count = 1
|
||||
self.scrub_on_exit = False
|
||||
self.test_output_dir = test_output_dir
|
||||
|
||||
@@ -497,7 +498,10 @@ class NeonEnvBuilder:
|
||||
f"Services started, creating initial tenant {env.initial_tenant} and its initial timeline"
|
||||
)
|
||||
initial_tenant, initial_timeline = env.neon_cli.create_tenant(
|
||||
tenant_id=env.initial_tenant, conf=initial_tenant_conf, timeline_id=env.initial_timeline
|
||||
tenant_id=env.initial_tenant,
|
||||
conf=initial_tenant_conf,
|
||||
timeline_id=env.initial_timeline,
|
||||
shard_count=self.initial_shard_count,
|
||||
)
|
||||
assert env.initial_tenant == initial_tenant
|
||||
assert env.initial_timeline == initial_timeline
|
||||
@@ -1144,6 +1148,7 @@ class NeonCli(AbstractNeonCli):
|
||||
timeline_id: Optional[TimelineId] = None,
|
||||
conf: Optional[Dict[str, str]] = None,
|
||||
set_default: bool = False,
|
||||
shard_count: int = 1,
|
||||
) -> Tuple[TenantId, TimelineId]:
|
||||
"""
|
||||
Creates a new tenant, returns its id and its initial timeline's id.
|
||||
@@ -1160,6 +1165,8 @@ class NeonCli(AbstractNeonCli):
|
||||
str(timeline_id),
|
||||
"--pg-version",
|
||||
self.env.pg_version,
|
||||
"--shard-count",
|
||||
str(shard_count),
|
||||
]
|
||||
if conf is not None:
|
||||
args.extend(
|
||||
@@ -1382,7 +1389,7 @@ class NeonCli(AbstractNeonCli):
|
||||
tenant_id: Optional[TenantId] = None,
|
||||
hot_standby: bool = False,
|
||||
lsn: Optional[Lsn] = None,
|
||||
pageserver_id: Optional[int] = None,
|
||||
pageserver_ids: Optional[list[int]] = None,
|
||||
) -> "subprocess.CompletedProcess[str]":
|
||||
args = [
|
||||
"endpoint",
|
||||
@@ -1404,8 +1411,10 @@ class NeonCli(AbstractNeonCli):
|
||||
args.append(endpoint_id)
|
||||
if hot_standby:
|
||||
args.extend(["--hot-standby", "true"])
|
||||
if pageserver_id is not None:
|
||||
args.extend(["--pageserver-id", str(pageserver_id)])
|
||||
if pageserver_ids is not None:
|
||||
args.extend(["--pageserver-id", ",".join(str(i) for i in pageserver_ids)])
|
||||
|
||||
log.info(f"endpoint_create: {args}")
|
||||
|
||||
res = self.raw_cli(args)
|
||||
res.check_returncode()
|
||||
@@ -2451,7 +2460,7 @@ class Endpoint(PgProtocol):
|
||||
hot_standby: bool = False,
|
||||
lsn: Optional[Lsn] = None,
|
||||
config_lines: Optional[List[str]] = None,
|
||||
pageserver_id: Optional[int] = None,
|
||||
pageserver_ids: Optional[list[int]] = None,
|
||||
) -> "Endpoint":
|
||||
"""
|
||||
Create a new Postgres endpoint.
|
||||
@@ -2473,7 +2482,7 @@ class Endpoint(PgProtocol):
|
||||
hot_standby=hot_standby,
|
||||
pg_port=self.pg_port,
|
||||
http_port=self.http_port,
|
||||
pageserver_id=pageserver_id,
|
||||
pageserver_ids=pageserver_ids,
|
||||
)
|
||||
path = Path("endpoints") / self.endpoint_id / "pgdata"
|
||||
self.pgdata_dir = os.path.join(self.env.repo_dir, path)
|
||||
@@ -2609,7 +2618,7 @@ class Endpoint(PgProtocol):
|
||||
lsn: Optional[Lsn] = None,
|
||||
config_lines: Optional[List[str]] = None,
|
||||
remote_ext_config: Optional[str] = None,
|
||||
pageserver_id: Optional[int] = None,
|
||||
pageserver_ids: Optional[list[int]] = None,
|
||||
) -> "Endpoint":
|
||||
"""
|
||||
Create an endpoint, apply config, and start Postgres.
|
||||
@@ -2624,7 +2633,7 @@ class Endpoint(PgProtocol):
|
||||
config_lines=config_lines,
|
||||
hot_standby=hot_standby,
|
||||
lsn=lsn,
|
||||
pageserver_id=pageserver_id,
|
||||
pageserver_ids=pageserver_ids,
|
||||
).start(remote_ext_config=remote_ext_config)
|
||||
|
||||
log.info(f"Postgres startup took {time.time() - started_at} seconds")
|
||||
@@ -2660,7 +2669,7 @@ class EndpointFactory:
|
||||
hot_standby: bool = False,
|
||||
config_lines: Optional[List[str]] = None,
|
||||
remote_ext_config: Optional[str] = None,
|
||||
pageserver_id: Optional[int] = None,
|
||||
pageserver_ids: Optional[list[int]] = None,
|
||||
) -> Endpoint:
|
||||
ep = Endpoint(
|
||||
self.env,
|
||||
@@ -2678,7 +2687,7 @@ class EndpointFactory:
|
||||
config_lines=config_lines,
|
||||
lsn=lsn,
|
||||
remote_ext_config=remote_ext_config,
|
||||
pageserver_id=pageserver_id,
|
||||
pageserver_ids=pageserver_ids,
|
||||
)
|
||||
|
||||
def create(
|
||||
@@ -3162,7 +3171,15 @@ def check_restored_datadir_content(
|
||||
cur.execute("CHECKPOINT")
|
||||
|
||||
# wait for pageserver to catch up
|
||||
wait_for_last_flush_lsn(env, endpoint, endpoint.tenant_id, timeline_id)
|
||||
if len(env.pageservers) > 1:
|
||||
# FIXME: wait_for_last_flush_lsn needs teaching about sharding: it tries to query
|
||||
# LSNs for shard-naive TenantId
|
||||
return
|
||||
for pageserver in env.pageservers:
|
||||
wait_for_last_flush_lsn(
|
||||
env, endpoint, endpoint.tenant_id, timeline_id, pageserver_id=pageserver.id
|
||||
)
|
||||
|
||||
# stop postgres to ensure that files won't change
|
||||
endpoint.stop()
|
||||
|
||||
|
||||
@@ -260,14 +260,6 @@ class PageserverHttpClient(requests.Session):
|
||||
res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/detach", params=params)
|
||||
self.verbose_error(res)
|
||||
|
||||
def tenant_reset(self, tenant_id: TenantId, drop_cache: bool):
|
||||
params = {}
|
||||
if drop_cache:
|
||||
params["drop_cache"] = "true"
|
||||
|
||||
res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/reset", params=params)
|
||||
self.verbose_error(res)
|
||||
|
||||
def tenant_delete(self, tenant_id: TenantId):
|
||||
res = self.delete(f"http://localhost:{self.port}/v1/tenant/{tenant_id}")
|
||||
self.verbose_error(res)
|
||||
|
||||
@@ -60,10 +60,7 @@ def test_change_pageserver(neon_env_builder: NeonEnvBuilder):
|
||||
execute("SELECT count(*) FROM foo")
|
||||
assert fetchone() == (100000,)
|
||||
|
||||
# Reconfigure it using the same connstring just to make sure nothing breaks
|
||||
# as we have special handling for if the connstring doesn't change
|
||||
for _ in range(5):
|
||||
endpoint.reconfigure(pageserver_id=alt_pageserver_id)
|
||||
endpoint.reconfigure(pageserver_id=alt_pageserver_id)
|
||||
|
||||
# Verify that the neon.pageserver_connstring GUC is set to the correct thing
|
||||
execute("SELECT setting FROM pg_settings WHERE name='neon.pageserver_connstring'")
|
||||
|
||||
@@ -3,24 +3,38 @@
|
||||
#
|
||||
from pathlib import Path
|
||||
|
||||
from fixtures.neon_fixtures import NeonEnv, check_restored_datadir_content
|
||||
import pytest
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnv,
|
||||
NeonEnvBuilder,
|
||||
check_restored_datadir_content,
|
||||
)
|
||||
|
||||
|
||||
# Run the main PostgreSQL regression tests, in src/test/regress.
|
||||
#
|
||||
@pytest.mark.parametrize("shard_count", [2])
|
||||
def test_pg_regress(
|
||||
neon_simple_env: NeonEnv,
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
test_output_dir: Path,
|
||||
pg_bin,
|
||||
capsys,
|
||||
base_dir: Path,
|
||||
pg_distrib_dir: Path,
|
||||
shard_count: int,
|
||||
):
|
||||
env = neon_simple_env
|
||||
neon_env_builder.enable_generations = True
|
||||
neon_env_builder.initial_shard_count = shard_count
|
||||
neon_env_builder.num_pageservers = shard_count
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
for pageserver in env.pageservers:
|
||||
# FIXME: attachment_service is not yet sharding aware, so generation validation is broken.
|
||||
pageserver.allowed_errors.append(".*Dropped remote consistent LSN updates.*")
|
||||
|
||||
endpoint = env.endpoints.create_start("main", pageserver_ids=[p.id for p in env.pageservers])
|
||||
|
||||
env.neon_cli.create_branch("test_pg_regress", "empty")
|
||||
# Connect to postgres and create a database called "regression".
|
||||
endpoint = env.endpoints.create_start("test_pg_regress")
|
||||
endpoint.safe_psql("CREATE DATABASE regression")
|
||||
|
||||
# Create some local directories for pg_regress to run in.
|
||||
|
||||
@@ -1,29 +0,0 @@
|
||||
import random
|
||||
import time
|
||||
|
||||
from fixtures.neon_fixtures import NeonEnv
|
||||
|
||||
|
||||
def test_physical_replication(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
n_records = 100000
|
||||
with env.endpoints.create_start(
|
||||
branch_name="main",
|
||||
endpoint_id="primary",
|
||||
) as primary:
|
||||
with primary.connect() as p_con:
|
||||
with p_con.cursor() as p_cur:
|
||||
p_cur.execute(
|
||||
"CREATE TABLE t(pk bigint primary key, payload text default repeat('?',200))"
|
||||
)
|
||||
time.sleep(1)
|
||||
with env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary") as secondary:
|
||||
with primary.connect() as p_con:
|
||||
with p_con.cursor() as p_cur:
|
||||
with secondary.connect() as s_con:
|
||||
with s_con.cursor() as s_cur:
|
||||
for pk in range(n_records):
|
||||
p_cur.execute("insert into t (pk) values (%s)", (pk,))
|
||||
s_cur.execute(
|
||||
"select * from t where pk=%s", (random.randrange(1, n_records),)
|
||||
)
|
||||
23
test_runner/regress/test_sharding.py
Normal file
23
test_runner/regress/test_sharding.py
Normal file
@@ -0,0 +1,23 @@
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder
|
||||
import pytest
|
||||
|
||||
@pytest.mark.parametrize("shard_count", [2])
|
||||
@pytest.mark.timeout(1000)
|
||||
def test_sharding(neon_env_builder: NeonEnvBuilder, shard_count: int):
|
||||
neon_env_builder.enable_generations = True
|
||||
neon_env_builder.initial_shard_count = shard_count
|
||||
neon_env_builder.num_pageservers = shard_count
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
for pageserver in env.pageservers:
|
||||
# FIXME: attachment_service is not yet sharding aware, so generation validation is broken.
|
||||
pageserver.allowed_errors.append(".*Dropped remote consistent LSN updates.*")
|
||||
pageserver.allowed_errors.append(".*Dropping stale deletions for tenant.*")
|
||||
|
||||
endpoint = env.endpoints.create_start("main", pageserver_ids=[p.id for p in env.pageservers])
|
||||
with endpoint.cursor() as cur:
|
||||
cur.execute("SET statement_timeout=0") # disable statement timeout
|
||||
cur.execute("create table t(t bigint, payload text default repeat('?',200))")
|
||||
cur.execute("insert into t values(generate_series(1,10000000))")
|
||||
cur.execute("select count(*) from t")
|
||||
assert cur.fetchone()[0] == 10000000
|
||||
@@ -1,5 +1,4 @@
|
||||
import asyncio
|
||||
import enum
|
||||
import random
|
||||
import time
|
||||
from threading import Thread
|
||||
@@ -52,20 +51,11 @@ def do_gc_target(
|
||||
log.info("gc http thread returning")
|
||||
|
||||
|
||||
class ReattachMode(str, enum.Enum):
|
||||
REATTACH_EXPLICIT = "explicit"
|
||||
REATTACH_RESET = "reset"
|
||||
REATTACH_RESET_DROP = "reset"
|
||||
|
||||
|
||||
# Basic detach and re-attach test
|
||||
@pytest.mark.parametrize("remote_storage_kind", available_remote_storages())
|
||||
@pytest.mark.parametrize(
|
||||
"mode",
|
||||
[ReattachMode.REATTACH_EXPLICIT, ReattachMode.REATTACH_RESET, ReattachMode.REATTACH_RESET_DROP],
|
||||
)
|
||||
def test_tenant_reattach(
|
||||
neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind, mode: str
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
remote_storage_kind: RemoteStorageKind,
|
||||
):
|
||||
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
|
||||
|
||||
@@ -110,15 +100,8 @@ def test_tenant_reattach(
|
||||
ps_metrics.query_one("pageserver_last_record_lsn", filter=tenant_metric_filter).value
|
||||
)
|
||||
|
||||
if mode == ReattachMode.REATTACH_EXPLICIT:
|
||||
# Explicitly detach then attach the tenant as two separate API calls
|
||||
pageserver_http.tenant_detach(tenant_id)
|
||||
pageserver_http.tenant_attach(tenant_id)
|
||||
elif mode in (ReattachMode.REATTACH_RESET, ReattachMode.REATTACH_RESET_DROP):
|
||||
# Use the reset API to detach/attach in one shot
|
||||
pageserver_http.tenant_reset(tenant_id, mode == ReattachMode.REATTACH_RESET_DROP)
|
||||
else:
|
||||
raise NotImplementedError(mode)
|
||||
pageserver_http.tenant_detach(tenant_id)
|
||||
pageserver_http.tenant_attach(tenant_id)
|
||||
|
||||
time.sleep(1) # for metrics propagation
|
||||
|
||||
|
||||
Reference in New Issue
Block a user