Compare commits

...

9 Commits

Author SHA1 Message Date
Konstantin Knizhnik
66fdb54d8d Merge with #5837 2023-12-05 12:05:27 +00:00
Konstantin Knizhnik
21c3c55e6b Fix flushing prefetch requests in page_server_request 2023-12-05 12:05:27 +00:00
John Spray
2c21c74ff4 DNM: script for sharding demo 2023-12-05 12:05:27 +00:00
John Spray
2d0930f622 neon_local: basic sharding support 2023-12-05 12:05:27 +00:00
John Spray
1518675bac postgres: use modified hash 2023-12-05 12:05:27 +00:00
Konstantin Knizhnik
e3c6fc3e51 compute: Add support for PS shardoing in compute 2023-12-05 12:05:27 +00:00
Konstantin Knizhnik
039f96c8a6 Add simple test for sharding 2023-12-05 12:05:27 +00:00
John Spray
8d50593e17 tests: enable running test_pg_regress with sharding 2023-12-05 12:05:27 +00:00
John Spray
f7795ee2a5 pageserver: filter WAL by ShardIdentity 2023-12-05 12:03:33 +00:00
16 changed files with 726 additions and 326 deletions

View File

@@ -15,7 +15,8 @@ use control_plane::pageserver::{PageServerNode, PAGESERVER_REMOTE_STORAGE_DIR};
use control_plane::safekeeper::SafekeeperNode;
use control_plane::tenant_migration::migrate_tenant;
use control_plane::{broker, local_env};
use pageserver_api::models::TimelineInfo;
use pageserver_api::models::{LocationConfig, LocationConfigMode, TimelineInfo};
use pageserver_api::shard::{ShardCount, ShardNumber, TenantShardId};
use pageserver_api::{
DEFAULT_HTTP_LISTEN_PORT as DEFAULT_PAGESERVER_HTTP_PORT,
DEFAULT_PG_LISTEN_PORT as DEFAULT_PAGESERVER_PG_PORT,
@@ -26,6 +27,7 @@ use safekeeper_api::{
DEFAULT_PG_LISTEN_PORT as DEFAULT_SAFEKEEPER_PG_PORT,
};
use std::collections::{BTreeSet, HashMap};
use std::num::ParseIntError;
use std::path::PathBuf;
use std::process::exit;
use std::str::FromStr;
@@ -97,6 +99,22 @@ struct TimelineTreeEl {
pub children: BTreeSet<TimelineId>,
}
/// Helper for CLI args that contain a comma-separate list of NodeId
fn parse_ids_arg(
matches: &ArgMatches,
arg: &str,
) -> Result<Option<Vec<NodeId>>, std::num::ParseIntError> {
if let Some(id_str) = matches.get_one::<String>(arg) {
let r: Result<Vec<_>, ParseIntError> = id_str
.split(',')
.map(|ps_id| u64::from_str(str::trim(ps_id)).map(NodeId))
.collect();
r.map(Some)
} else {
Ok(Some(vec![DEFAULT_PAGESERVER_ID]))
}
}
// Main entry point for the 'neon_local' CLI utility
//
// This utility helps to manage neon installation. That includes following:
@@ -374,9 +392,10 @@ fn pageserver_config_overrides(init_match: &ArgMatches) -> Vec<&str> {
}
fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> anyhow::Result<()> {
let pageserver = get_default_pageserver(env);
match tenant_match.subcommand() {
Some(("list", _)) => {
// TODO: make command aware of multiple pageservers
let pageserver = get_default_pageserver(env);
for t in pageserver.tenant_list()? {
println!("{} {:?}", t.id, t.state);
}
@@ -387,38 +406,94 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> an
.map(|vals| vals.flat_map(|c| c.split_once(':')).collect())
.unwrap_or_default();
let shard_count: u8 = create_match
.get_one::<u8>("shard-count")
.cloned()
.unwrap_or(1);
// If tenant ID was not specified, generate one
let tenant_id = parse_tenant_id(create_match)?.unwrap_or_else(TenantId::generate);
let generation = if env.control_plane_api.is_some() {
// We must register the tenant with the attachment service, so
// that when the pageserver restarts, it will be re-attached.
let attachment_service = AttachmentService::from_env(env);
attachment_service.attach_hook(tenant_id, pageserver.conf.id)?
} else {
None
};
pageserver.tenant_create(tenant_id, generation, tenant_conf)?;
println!("tenant {tenant_id} successfully created on the pageserver");
// Create an initial timeline for the new tenant
let new_timeline_id = parse_timeline_id(create_match)?;
// We will create an initial timeline for the new tenant
let new_timeline_id =
parse_timeline_id(create_match)?.unwrap_or(TimelineId::generate());
let pg_version = create_match
.get_one::<u32>("pg-version")
.copied()
.context("Failed to parse postgres version from the argument string")?;
let timeline_info = pageserver.timeline_create(
tenant_id,
new_timeline_id,
None,
None,
Some(pg_version),
None,
)?;
let new_timeline_id = timeline_info.timeline_id;
let last_record_lsn = timeline_info.last_record_lsn;
// TODO: implement ability for one pageserver to hold multiple
// shards for the same tenant. Until then, we must place each
// shard on a different pageserver.
assert!(env.pageservers.len() >= shard_count as usize);
let cfg_shard_count = if shard_count > 1 {
shard_count
} else {
// For single-sharded mode, use the legacy unsharded configuration. This avoids
// breaking any existing tests that assume legacy unsharded storage paths
0
};
for shard_number in 0..shard_count {
let ps_conf = env.pageservers.get(shard_number as usize).unwrap();
let pageserver = PageServerNode::from_env(env, ps_conf);
// TODO: per-shard generations
let generation = if env.control_plane_api.is_some() {
// We must register the tenant with the attachment service, so
// that when the pageserver restarts, it will be re-attached.
let attachment_service = AttachmentService::from_env(env);
attachment_service.attach_hook(tenant_id, pageserver.conf.id)?
} else {
None
};
// TODO: shard-aware POST /v1/tenant. Currently tenant creation on the
// pageserver is a no-op, but we shouldn't skip the command entirely.
let tenant_conf = PageServerNode::build_config(tenant_conf.clone())?;
let tenant_shard_id = TenantShardId {
shard_number: ShardNumber(shard_number),
shard_count: ShardCount(cfg_shard_count),
tenant_id,
};
let location_conf = LocationConfig {
shard_count: cfg_shard_count,
shard_number,
shard_stripe_size: 32768,
mode: LocationConfigMode::AttachedSingle,
generation,
secondary_conf: None,
tenant_conf,
};
pageserver.location_config(tenant_shard_id, location_conf, None)?;
println!(
"tenant {tenant_id} successfully created on pageserver {}",
pageserver.conf.id
);
}
for shard_number in 0..shard_count {
let ps_conf = env.pageservers.get(shard_number as usize).unwrap();
let pageserver = PageServerNode::from_env(env, ps_conf);
let tenant_shard_id = TenantShardId {
shard_number: ShardNumber(shard_number),
shard_count: ShardCount(cfg_shard_count),
tenant_id,
};
pageserver.timeline_create(
tenant_shard_id,
Some(new_timeline_id),
None,
None,
Some(pg_version),
None,
)?;
}
env.register_branch_mapping(
DEFAULT_BRANCH_NAME.to_string(),
@@ -426,9 +501,7 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> an
new_timeline_id,
)?;
println!(
"Created an initial timeline '{new_timeline_id}' at Lsn {last_record_lsn} for tenant: {tenant_id}",
);
println!("Created an initial timeline '{new_timeline_id}' for tenant: {tenant_id}",);
if create_match.get_flag("set-default") {
println!("Setting tenant {tenant_id} as a default one");
@@ -448,6 +521,8 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> an
.map(|vals| vals.flat_map(|c| c.split_once(':')).collect())
.unwrap_or_default();
// TODO: make command aware of multiple pageservers
let pageserver = get_default_pageserver(env);
pageserver
.tenant_config(tenant_id, tenant_conf)
.with_context(|| format!("Tenant config failed for tenant with id {tenant_id}"))?;
@@ -491,7 +566,7 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
let new_timeline_id_opt = parse_timeline_id(create_match)?;
let timeline_info = pageserver.timeline_create(
tenant_id,
TenantShardId::unsharded(tenant_id),
new_timeline_id_opt,
None,
None,
@@ -554,7 +629,7 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
None,
pg_version,
ComputeMode::Primary,
DEFAULT_PAGESERVER_ID,
vec![DEFAULT_PAGESERVER_ID],
)?;
println!("Done");
}
@@ -579,7 +654,7 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
.transpose()
.context("Failed to parse ancestor start Lsn from the request")?;
let timeline_info = pageserver.timeline_create(
tenant_id,
TenantShardId::unsharded(tenant_id),
None,
start_lsn,
Some(ancestor_timeline_id),
@@ -704,13 +779,8 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
.copied()
.unwrap_or(false);
let pageserver_id =
if let Some(id_str) = sub_args.get_one::<String>("endpoint-pageserver-id") {
NodeId(id_str.parse().context("while parsing pageserver id")?)
} else {
DEFAULT_PAGESERVER_ID
};
let pageserver_ids = parse_ids_arg(sub_args, "endpoint-pageserver-id")?
.unwrap_or(vec![DEFAULT_PAGESERVER_ID]);
let mode = match (lsn, hot_standby) {
(Some(lsn), false) => ComputeMode::Static(lsn),
(None, true) => ComputeMode::Replica,
@@ -738,7 +808,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
http_port,
pg_version,
mode,
pageserver_id,
pageserver_ids,
)?;
}
"start" => {
@@ -746,29 +816,14 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
.get_one::<String>("endpoint_id")
.ok_or_else(|| anyhow!("No endpoint ID was provided to start"))?;
let pageserver_id =
if let Some(id_str) = sub_args.get_one::<String>("endpoint-pageserver-id") {
NodeId(id_str.parse().context("while parsing pageserver id")?)
} else {
DEFAULT_PAGESERVER_ID
};
let pageservers = parse_ids_arg(sub_args, "endpoint-pageserver-id")?
.unwrap_or(vec![DEFAULT_PAGESERVER_ID]);
let remote_ext_config = sub_args.get_one::<String>("remote-ext-config");
// If --safekeepers argument is given, use only the listed safekeeper nodes.
let safekeepers =
if let Some(safekeepers_str) = sub_args.get_one::<String>("safekeepers") {
let mut safekeepers: Vec<NodeId> = Vec::new();
for sk_id in safekeepers_str.split(',').map(str::trim) {
let sk_id = NodeId(u64::from_str(sk_id).map_err(|_| {
anyhow!("invalid node ID \"{sk_id}\" in --safekeepers list")
})?);
safekeepers.push(sk_id);
}
safekeepers
} else {
env.safekeepers.iter().map(|sk| sk.id).collect()
};
let safekeepers = parse_ids_arg(sub_args, "safekeepers")?
.unwrap_or_else(|| env.safekeepers.iter().map(|sk| sk.id).collect());
let endpoint = cplane
.endpoints
@@ -781,7 +836,8 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
endpoint.timeline_id,
)?;
let ps_conf = env.get_pageserver_conf(pageserver_id)?;
// We assume that all pageservers have the same auth conf
let ps_conf = env.get_pageserver_conf(pageservers[0])?;
let auth_token = if matches!(ps_conf.pg_auth_type, AuthType::NeonJWT) {
let claims = Claims::new(Some(endpoint.tenant_id), Scope::Tenant);
@@ -801,15 +857,21 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
.endpoints
.get(endpoint_id.as_str())
.with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?;
let pageserver_id =
if let Some(id_str) = sub_args.get_one::<String>("endpoint-pageserver-id") {
Some(NodeId(
id_str.parse().context("while parsing pageserver id")?,
))
} else {
None
};
endpoint.reconfigure(pageserver_id)?;
let pageserver_ids: Option<Result<Vec<NodeId>, _>> = sub_args
.get_many::<String>("endpoint-pageserver-id")
.map(|ids| {
ids.map(|id_str| id_str.parse().context("while parsing pageserver id"))
.map(|r| r.map(NodeId))
.collect()
});
let pageserver_ids = match pageserver_ids {
Some(Ok(v)) => Ok(Some(v)),
Some(Err(e)) => Err(e),
None => Ok(None),
}?;
endpoint.reconfigure(pageserver_ids)?;
}
"stop" => {
let endpoint_id = sub_args
@@ -1313,6 +1375,7 @@ fn cli() -> Command {
.arg(pg_version_arg.clone())
.arg(Arg::new("set-default").long("set-default").action(ArgAction::SetTrue).required(false)
.help("Use this tenant in future CLI commands where tenant_id is needed, but not specified"))
.arg(Arg::new("shard-count").value_parser(value_parser!(u8)).long("shard-count").action(ArgAction::Set).help("Number of shards in the new tenant (default 1)"))
)
.subcommand(Command::new("set-default").arg(tenant_id_arg.clone().required(true))
.about("Set a particular tenant as default in future CLI commands where tenant_id is needed, but not specified"))

View File

@@ -67,7 +67,7 @@ pub struct EndpointConf {
http_port: u16,
pg_version: u32,
skip_pg_catalog_updates: bool,
pageserver_id: NodeId,
pageservers: Vec<NodeId>,
}
//
@@ -82,6 +82,33 @@ pub struct ComputeControlPlane {
env: LocalEnv,
}
fn load_pageservers(
env: &LocalEnv,
pageserver_ids: &Vec<NodeId>,
) -> anyhow::Result<Vec<PageServerNode>> {
let mut pageservers = Vec::new();
for ps_id in pageserver_ids {
let pageserver = env
.get_pageserver_conf(*ps_id)
.map(|conf| PageServerNode::from_env(env, conf))?;
pageservers.push(pageserver);
}
Ok(pageservers)
}
fn build_pageserver_connstr(pageservers: &[PageServerNode]) -> String {
pageservers
.iter()
.map(|ps| {
let config = ps.pg_connection_config.clone();
let (host, port) = (config.host(), config.port());
// NOTE: avoid spaces in connection string, because it is less error prone if we forward it somewhere.
format!("postgresql://no_user@{host}:{port}")
})
.collect::<Vec<_>>()
.join(",")
}
impl ComputeControlPlane {
// Load current endpoints from the endpoints/ subdirectories
pub fn load(env: LocalEnv) -> Result<ComputeControlPlane> {
@@ -119,19 +146,16 @@ impl ComputeControlPlane {
http_port: Option<u16>,
pg_version: u32,
mode: ComputeMode,
pageserver_id: NodeId,
pageservers: Vec<NodeId>,
) -> Result<Arc<Endpoint>> {
let pg_port = pg_port.unwrap_or_else(|| self.get_port());
let http_port = http_port.unwrap_or_else(|| self.get_port() + 1);
let pageserver =
PageServerNode::from_env(&self.env, self.env.get_pageserver_conf(pageserver_id)?);
let ep = Arc::new(Endpoint {
endpoint_id: endpoint_id.to_owned(),
pg_address: SocketAddr::new("127.0.0.1".parse().unwrap(), pg_port),
http_address: SocketAddr::new("127.0.0.1".parse().unwrap(), http_port),
env: self.env.clone(),
pageserver,
pageservers: load_pageservers(&self.env, &pageservers)?,
timeline_id,
mode,
tenant_id,
@@ -157,7 +181,7 @@ impl ComputeControlPlane {
pg_port,
pg_version,
skip_pg_catalog_updates: true,
pageserver_id,
pageservers,
})?,
)?;
std::fs::write(
@@ -216,7 +240,7 @@ pub struct Endpoint {
// These are not part of the endpoint as such, but the environment
// the endpoint runs in.
pub env: LocalEnv,
pageserver: PageServerNode,
pageservers: Vec<PageServerNode>,
// Optimizations
skip_pg_catalog_updates: bool,
@@ -239,15 +263,14 @@ impl Endpoint {
let conf: EndpointConf =
serde_json::from_slice(&std::fs::read(entry.path().join("endpoint.json"))?)?;
let pageserver =
PageServerNode::from_env(env, env.get_pageserver_conf(conf.pageserver_id)?);
let pageservers: Vec<PageServerNode> = load_pageservers(env, &conf.pageservers)?;
Ok(Endpoint {
pg_address: SocketAddr::new("127.0.0.1".parse().unwrap(), conf.pg_port),
http_address: SocketAddr::new("127.0.0.1".parse().unwrap(), conf.http_port),
endpoint_id,
env: env.clone(),
pageserver,
pageservers,
timeline_id: conf.timeline_id,
mode: conf.mode,
tenant_id: conf.tenant_id,
@@ -482,13 +505,7 @@ impl Endpoint {
std::fs::remove_dir_all(self.pgdata())?;
}
let pageserver_connstring = {
let config = &self.pageserver.pg_connection_config;
let (host, port) = (config.host(), config.port());
// NOTE: avoid spaces in connection string, because it is less error prone if we forward it somewhere.
format!("postgresql://no_user@{host}:{port}")
};
let pageserver_connstring = build_pageserver_connstr(&self.pageservers);
let mut safekeeper_connstrings = Vec::new();
if self.mode == ComputeMode::Primary {
for sk_id in safekeepers {
@@ -658,7 +675,7 @@ impl Endpoint {
}
}
pub fn reconfigure(&self, pageserver_id: Option<NodeId>) -> Result<()> {
pub fn reconfigure(&self, pageservers: Option<Vec<NodeId>>) -> Result<()> {
let mut spec: ComputeSpec = {
let spec_path = self.endpoint_path().join("spec.json");
let file = std::fs::File::open(spec_path)?;
@@ -668,23 +685,20 @@ impl Endpoint {
let postgresql_conf = self.read_postgresql_conf()?;
spec.cluster.postgresql_conf = Some(postgresql_conf);
if let Some(pageserver_id) = pageserver_id {
if let Some(pageservers) = pageservers {
let endpoint_config_path = self.endpoint_path().join("endpoint.json");
let mut endpoint_conf: EndpointConf = {
let file = std::fs::File::open(&endpoint_config_path)?;
serde_json::from_reader(file)?
};
endpoint_conf.pageserver_id = pageserver_id;
endpoint_conf.pageservers = pageservers.clone();
std::fs::write(
endpoint_config_path,
serde_json::to_string_pretty(&endpoint_conf)?,
)?;
let pageserver =
PageServerNode::from_env(&self.env, self.env.get_pageserver_conf(pageserver_id)?);
let ps_http_conf = &pageserver.pg_connection_config;
let (host, port) = (ps_http_conf.host(), ps_http_conf.port());
spec.pageserver_connstring = Some(format!("postgresql://no_user@{host}:{port}"));
let pageservers = load_pageservers(&self.env, &pageservers)?;
spec.pageserver_connstring = Some(build_pageserver_connstr(&pageservers));
}
let client = reqwest::blocking::Client::new();

View File

@@ -340,15 +340,8 @@ impl PageServerNode {
.json()?)
}
pub fn tenant_create(
&self,
new_tenant_id: TenantId,
generation: Option<u32>,
settings: HashMap<&str, &str>,
) -> anyhow::Result<TenantId> {
let mut settings = settings.clone();
let config = models::TenantConfig {
pub fn build_config(mut settings: HashMap<&str, &str>) -> anyhow::Result<models::TenantConfig> {
Ok(models::TenantConfig {
checkpoint_distance: settings
.remove("checkpoint_distance")
.map(|x| x.parse::<u64>())
@@ -407,8 +400,16 @@ impl PageServerNode {
.map(|x| x.parse::<bool>())
.transpose()
.context("Failed to parse 'gc_feedback' as bool")?,
};
})
}
pub fn tenant_create(
&self,
new_tenant_id: TenantId,
generation: Option<u32>,
settings: HashMap<&str, &str>,
) -> anyhow::Result<TenantId> {
let config = Self::build_config(settings.clone())?;
let request = models::TenantCreateRequest {
new_tenant_id: TenantShardId::unsharded(new_tenant_id),
generation,
@@ -521,15 +522,18 @@ impl PageServerNode {
pub fn location_config(
&self,
tenant_id: TenantId,
tenant_shard_id: TenantShardId,
config: LocationConfig,
flush_ms: Option<Duration>,
) -> anyhow::Result<()> {
let req_body = TenantLocationConfigRequest { tenant_id, config };
let req_body = TenantLocationConfigRequest {
tenant_shard_id,
config,
};
let path = format!(
"{}/tenant/{}/location_config",
self.http_base_url, tenant_id
self.http_base_url, tenant_shard_id
);
let path = if let Some(flush_ms) = flush_ms {
format!("{}?flush_ms={}", path, flush_ms.as_millis())
@@ -560,7 +564,7 @@ impl PageServerNode {
pub fn timeline_create(
&self,
tenant_id: TenantId,
tenant_shard_id: TenantShardId,
new_timeline_id: Option<TimelineId>,
ancestor_start_lsn: Option<Lsn>,
ancestor_timeline_id: Option<TimelineId>,
@@ -572,7 +576,7 @@ impl PageServerNode {
self.http_request(
Method::POST,
format!("{}/tenant/{}/timeline", self.http_base_url, tenant_id),
format!("{}/tenant/{}/timeline", self.http_base_url, tenant_shard_id),
)?
.json(&models::TimelineCreateRequest {
new_timeline_id,
@@ -585,11 +589,11 @@ impl PageServerNode {
.error_from_body()?
.json::<Option<TimelineInfo>>()
.with_context(|| {
format!("Failed to parse timeline creation response for tenant id: {tenant_id}")
format!("Failed to parse timeline creation response for tenant id: {tenant_shard_id}")
})?
.with_context(|| {
format!(
"No timeline id was found in the timeline creation response for tenant {tenant_id}"
"No timeline id was found in the timeline creation response for tenant {tenant_shard_id}"
)
})
}

View File

@@ -11,6 +11,7 @@ use crate::{
use pageserver_api::models::{
LocationConfig, LocationConfigMode, LocationConfigSecondary, TenantConfig,
};
use pageserver_api::shard::TenantShardId;
use std::collections::HashMap;
use std::time::Duration;
use utils::{
@@ -108,6 +109,9 @@ pub fn migrate_tenant(
}
}
// No support for sharding in this function yet
let tenant_shard_id = TenantShardId::unsharded(tenant_id);
let previous = attachment_service.inspect(tenant_id)?;
let mut baseline_lsns = None;
if let Some((generation, origin_ps_id)) = &previous {
@@ -117,7 +121,7 @@ pub fn migrate_tenant(
println!("🔁 Already attached to {origin_ps_id}, freshening...");
let gen = attachment_service.attach_hook(tenant_id, dest_ps.conf.id)?;
let dest_conf = build_location_config(LocationConfigMode::AttachedSingle, gen, None);
dest_ps.location_config(tenant_id, dest_conf, None)?;
dest_ps.location_config(tenant_shard_id, dest_conf, None)?;
println!("✅ Migration complete");
return Ok(());
}
@@ -126,7 +130,7 @@ pub fn migrate_tenant(
let stale_conf =
build_location_config(LocationConfigMode::AttachedStale, Some(*generation), None);
origin_ps.location_config(tenant_id, stale_conf, Some(Duration::from_secs(10)))?;
origin_ps.location_config(tenant_shard_id, stale_conf, Some(Duration::from_secs(10)))?;
baseline_lsns = Some(get_lsns(tenant_id, &origin_ps)?);
}
@@ -135,7 +139,7 @@ pub fn migrate_tenant(
let dest_conf = build_location_config(LocationConfigMode::AttachedMulti, gen, None);
println!("🔁 Attaching to pageserver {}", dest_ps.conf.id);
dest_ps.location_config(tenant_id, dest_conf, None)?;
dest_ps.location_config(tenant_shard_id, dest_conf, None)?;
if let Some(baseline) = baseline_lsns {
println!("🕑 Waiting for LSN to catch up...");
@@ -149,7 +153,7 @@ pub fn migrate_tenant(
"🔁 Reconfiguring endpoint {} to use pageserver {}",
endpoint_name, dest_ps.conf.id
);
endpoint.reconfigure(Some(dest_ps.conf.id))?;
endpoint.reconfigure(Some(vec![dest_ps.conf.id]))?;
}
}
@@ -181,7 +185,7 @@ pub fn migrate_tenant(
"💤 Switching to secondary mode on pageserver {}",
other_ps.conf.id
);
other_ps.location_config(tenant_id, secondary_conf, None)?;
other_ps.location_config(tenant_shard_id, secondary_conf, None)?;
}
println!(
@@ -189,7 +193,7 @@ pub fn migrate_tenant(
dest_ps.conf.id
);
let dest_conf = build_location_config(LocationConfigMode::AttachedSingle, gen, None);
dest_ps.location_config(tenant_id, dest_conf, None)?;
dest_ps.location_config(tenant_shard_id, dest_conf, None)?;
println!("✅ Migration complete");

21
demo_sharding.sh Normal file
View File

@@ -0,0 +1,21 @@
export RUST_LOG=DEBUG
SHARDS=4
PAGESERVERS=`seq -s , 1 $SHARDS`
SCALE=10
ARGS=--features=testing
set -e
set +e
cargo neon $ARGS stop ; killall -9 storage_broker ; killall -9 safekeeper ; killall -9 pageserver ; killall -9 postgres ; killall -9 attachment_service ; rm -rf .neon
set -e
cargo build --package=pageserver && cargo neon $ARGS init --num-pageservers=$SHARDS && RUST_LOG=debug cargo neon $ARGS start && cargo neon $ARGS tenant create --shard-count=$SHARDS --tenant-id=1f359dd625e519a1a4e8d7509690f6fc --timeline-id=3d34095be52fec4c44a92e774c573b57 --set-default
cargo neon $ARGS endpoint create --pageserver-id=$PAGESERVERS && cargo neon endpoint start --pageserver-id=$PAGESERVERS ep-main
pgbench postgres -i -h 127.0.0.1 -p 55432 -U cloud_admin -s $SCALE
du -sh .neon/local_fs_remote_storage/pageserver/tenants/1f359dd625e519a1a4e8d7509690f6fc*

View File

@@ -293,7 +293,7 @@ pub struct StatusResponse {
#[derive(Serialize, Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct TenantLocationConfigRequest {
pub tenant_id: TenantId,
pub tenant_shard_id: TenantShardId,
#[serde(flatten)]
pub config: LocationConfig, // as we have a flattened field, we should reject all unknown fields in it
}

View File

@@ -411,6 +411,12 @@ impl ShardIdentity {
String::new()
}
}
/// Convenience for checking if this identity is the 0th shard in a tenant,
/// for special cases on shard 0 such as ingesting relation sizes.
pub fn is_zero(&self) -> bool {
self.number == ShardNumber(0)
}
}
impl Serialize for ShardIndex {

View File

@@ -1167,6 +1167,30 @@ pub(crate) static DELETION_QUEUE: Lazy<DeletionQueueMetrics> = Lazy::new(|| {
}
});
pub(crate) struct WalIngestMetrics {
pub(crate) records_received: IntCounter,
pub(crate) records_committed: IntCounter,
pub(crate) records_filtered: IntCounter,
}
pub(crate) static WAL_INGEST: Lazy<WalIngestMetrics> = Lazy::new(|| WalIngestMetrics {
records_received: register_int_counter!(
"pageserver_wal_ingest_records_received",
"Number of WAL records received from safekeeper"
)
.expect("failed to define a metric"),
records_committed: register_int_counter!(
"pageserver_wal_ingest_records_committed",
"Number of WAL records which resulted in writes to pageserver storage"
)
.expect("failed to define a metric"),
records_filtered: register_int_counter!(
"pageserver_wal_ingest_records_filtered",
"Number of WAL records filtered out due to sharding"
)
.expect("failed to define a metric"),
});
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum RemoteOpKind {
Upload,

View File

@@ -1368,6 +1368,10 @@ impl<'a> DatadirModification<'a> {
Ok(())
}
pub(crate) fn is_empty(&self) -> bool {
self.pending_updates.is_empty() && self.pending_deletions.is_empty()
}
// Internal helper functions to batch the modifications
async fn get(&self, key: Key, ctx: &RequestContext) -> Result<Bytes, PageReconstructError> {

View File

@@ -21,6 +21,7 @@
//! redo Postgres process, but some records it can handle directly with
//! bespoken Rust code.
use pageserver_api::shard::ShardIdentity;
use postgres_ffi::v14::nonrelfile_utils::clogpage_precedes;
use postgres_ffi::v14::nonrelfile_utils::slru_may_delete_clogsegment;
use postgres_ffi::{fsm_logical_to_physical, page_is_new, page_set_lsn};
@@ -30,6 +31,7 @@ use bytes::{Buf, Bytes, BytesMut};
use tracing::*;
use crate::context::RequestContext;
use crate::metrics::WAL_INGEST;
use crate::pgdatadir_mapping::*;
use crate::tenant::PageReconstructError;
use crate::tenant::Timeline;
@@ -46,6 +48,7 @@ use postgres_ffi::BLCKSZ;
use utils::lsn::Lsn;
pub struct WalIngest<'a> {
shard: ShardIdentity,
timeline: &'a Timeline,
checkpoint: CheckPoint,
@@ -65,6 +68,7 @@ impl<'a> WalIngest<'a> {
trace!("CheckPoint.nextXid = {}", checkpoint.nextXid.value);
Ok(WalIngest {
shard: *timeline.get_shard_identity(),
timeline,
checkpoint,
checkpoint_modified: false,
@@ -87,6 +91,8 @@ impl<'a> WalIngest<'a> {
decoded: &mut DecodedWALRecord,
ctx: &RequestContext,
) -> anyhow::Result<()> {
WAL_INGEST.records_received.inc();
modification.lsn = lsn;
decode_wal_record(recdata, decoded, self.timeline.pg_version)?;
@@ -355,6 +361,32 @@ impl<'a> WalIngest<'a> {
// Iterate through all the blocks that the record modifies, and
// "put" a separate copy of the record for each block.
for blk in decoded.blocks.iter() {
let rel = RelTag {
spcnode: blk.rnode_spcnode,
dbnode: blk.rnode_dbnode,
relnode: blk.rnode_relnode,
forknum: blk.forknum,
};
let key = rel_block_to_key(rel, blk.blkno);
let key_is_local = self.shard.is_key_local(&key);
tracing::debug!(
"ingest: shard decision {} (checkpoint={}) for key {}",
if !key_is_local { "drop" } else { "keep" },
self.checkpoint_modified,
key
);
if !key_is_local {
if self.shard.is_zero() {
// Shard 0 tracks relation sizes. Although we will not store this block, we will observe
// its blkno in case it implicitly extends a relation.
self.observe_decoded_block(modification, blk, ctx).await?;
}
continue;
}
self.ingest_decoded_block(modification, lsn, decoded, blk, ctx)
.await?;
}
@@ -367,13 +399,37 @@ impl<'a> WalIngest<'a> {
self.checkpoint_modified = false;
}
if modification.is_empty() {
tracing::debug!("ingest: filtered out record @ LSN {lsn}");
WAL_INGEST.records_filtered.inc();
return Ok(());
}
// Now that this record has been fully handled, including updating the
// checkpoint data, let the repository know that it is up-to-date to this LSN
WAL_INGEST.records_committed.inc();
modification.commit(ctx).await?;
Ok(())
}
/// Do not store this block, but observe it for the purposes of updating our relation size state.
async fn observe_decoded_block(
&mut self,
modification: &mut DatadirModification<'_>,
blk: &DecodedBkpBlock,
ctx: &RequestContext,
) -> Result<(), PageReconstructError> {
let rel = RelTag {
spcnode: blk.rnode_spcnode,
dbnode: blk.rnode_dbnode,
relnode: blk.rnode_relnode,
forknum: blk.forknum,
};
self.handle_rel_extend(modification, rel, blk.blkno, ctx)
.await
}
async fn ingest_decoded_block(
&mut self,
modification: &mut DatadirModification<'_>,
@@ -1465,8 +1521,15 @@ impl<'a> WalIngest<'a> {
//info!("extending {} {} to {}", rel, old_nblocks, new_nblocks);
modification.put_rel_extend(rel, new_nblocks, ctx).await?;
let mut key = rel_block_to_key(rel, blknum);
// fill the gap with zeros
for gap_blknum in old_nblocks..blknum {
key.field6 = gap_blknum;
if self.shard.get_shard_number(&key) != self.shard.number {
continue;
}
modification.put_rel_page_image(rel, gap_blknum, ZERO_PAGE.clone())?;
}
}

View File

@@ -18,6 +18,7 @@
#include "fmgr.h"
#include "access/xlog.h"
#include "access/xlogutils.h"
#include "common/hashfn.h"
#include "storage/buf_internals.h"
#include "storage/lwlock.h"
#include "storage/ipc.h"
@@ -36,22 +37,12 @@
#include "neon.h"
#include "walproposer.h"
#include "neon_utils.h"
#include "control_plane_connector.h"
#define PageStoreTrace DEBUG5
#define RECONNECT_INTERVAL_USEC 1000000
bool connected = false;
PGconn *pageserver_conn = NULL;
/*
* WaitEventSet containing:
* - WL_SOCKET_READABLE on pageserver_conn,
* - WL_LATCH_SET on MyLatch, and
* - WL_EXIT_ON_PM_DEATH.
*/
WaitEventSet *pageserver_conn_wes = NULL;
/* GUCs */
char *neon_timeline;
char *neon_tenant;
@@ -64,87 +55,175 @@ int flush_every_n_requests = 8;
int n_reconnect_attempts = 0;
int max_reconnect_attempts = 60;
#define MAX_PAGESERVER_CONNSTRING_SIZE 256
typedef struct
{
LWLockId lock;
pg_atomic_uint64 update_counter;
char pageserver_connstring[MAX_PAGESERVER_CONNSTRING_SIZE];
} PagestoreShmemState;
#if PG_VERSION_NUM >= 150000
static shmem_request_hook_type prev_shmem_request_hook = NULL;
static void walproposer_shmem_request(void);
#endif
static shmem_startup_hook_type prev_shmem_startup_hook;
static PagestoreShmemState *pagestore_shared;
static uint64 pagestore_local_counter = 0;
static char local_pageserver_connstring[MAX_PAGESERVER_CONNSTRING_SIZE];
int stripe_size;
bool (*old_redo_read_buffer_filter) (XLogReaderState *record, uint8 block_id) = NULL;
static bool pageserver_flush(void);
static void pageserver_disconnect(void);
static bool pageserver_flush(shardno_t shard_no);
static void pageserver_disconnect(shardno_t shard_no);
static void AssignPageserverConnstring(const char *newval, void *extra);
static bool
PagestoreShmemIsValid()
{
return pagestore_shared && UsedShmemSegAddr;
}
static shmem_startup_hook_type prev_shmem_startup_hook;
#if PG_VERSION_NUM>=150000
static shmem_request_hook_type prev_shmem_request_hook;
#endif
static bool
CheckPageserverConnstring(char **newval, void **extra, GucSource source)
typedef struct
{
return strlen(*newval) < MAX_PAGESERVER_CONNSTRING_SIZE;
size_t n_shards;
pg_atomic_uint64 begin_update_counter;
pg_atomic_uint64 end_update_counter;
char shard_connstr[MAX_SHARDS][MAX_PS_CONNSTR_LEN];
} ShardMap;
static ShardMap* shard_map;
static uint64 shard_map_update_counter;
typedef struct
{
/*
* Connection for each shard
*/
PGconn *conn;
/*
* WaitEventSet containing:
* - WL_SOCKET_READABLE on pageserver_conn,
* - WL_LATCH_SET on MyLatch, and
* - WL_EXIT_ON_PM_DEATH.
*/
WaitEventSet *wes;
} PageServer;
static PageServer page_servers[MAX_SHARDS];
static shardno_t max_attached_shard_no;
static void
psm_shmem_startup(void)
{
bool found;
if (prev_shmem_startup_hook)
{
prev_shmem_startup_hook();
}
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
shard_map = (ShardMap*)ShmemInitStruct("shard_map", sizeof(ShardMap), &found);
if (!found)
{
shard_map->n_shards = 0;
pg_atomic_init_u64(&shard_map->begin_update_counter, 0);
pg_atomic_init_u64(&shard_map->end_update_counter, 0);
AssignPageserverConnstring(page_server_connstring, NULL);
}
LWLockRelease(AddinShmemInitLock);
}
static void
AssignPageserverConnstring(const char *newval, void *extra)
psm_shmem_request(void)
{
if(!PagestoreShmemIsValid())
return;
LWLockAcquire(pagestore_shared->lock, LW_EXCLUSIVE);
strlcpy(pagestore_shared->pageserver_connstring, newval, MAX_PAGESERVER_CONNSTRING_SIZE);
pg_atomic_fetch_add_u64(&pagestore_shared->update_counter, 1);
LWLockRelease(pagestore_shared->lock);
}
#if PG_VERSION_NUM>=150000
if (prev_shmem_request_hook)
prev_shmem_request_hook();
#endif
static bool
CheckConnstringUpdated()
{
if(!PagestoreShmemIsValid())
return false;
return pagestore_local_counter < pg_atomic_read_u64(&pagestore_shared->update_counter);
RequestAddinShmemSpace(sizeof(ShardMap));
}
static void
ReloadConnstring()
psm_init(void)
{
if(!PagestoreShmemIsValid())
return;
LWLockAcquire(pagestore_shared->lock, LW_SHARED);
strlcpy(local_pageserver_connstring, pagestore_shared->pageserver_connstring, sizeof(local_pageserver_connstring));
pagestore_local_counter = pg_atomic_read_u64(&pagestore_shared->update_counter);
LWLockRelease(pagestore_shared->lock);
prev_shmem_startup_hook = shmem_startup_hook;
shmem_startup_hook = psm_shmem_startup;
#if PG_VERSION_NUM>=150000
prev_shmem_request_hook = shmem_request_hook;
shmem_request_hook = psm_shmem_request;
#else
psm_shmem_request();
#endif
}
/*
* Reload page map if needed and return number of shards and connection string for the specified shard
*/
static shardno_t
load_shard_map(shardno_t shard_no, char* connstr)
{
shardno_t n_shards;
uint64 begin_update_counter;
uint64 end_update_counter;
/*
* There is race condition here between backendc and postmaster which can update shard map.
* We recheck update couner after copying connection string to check that configuration was not changed.
*/
do
{
begin_update_counter = pg_atomic_read_u64(&shard_map->begin_update_counter);
end_update_counter = pg_atomic_read_u64(&shard_map->end_update_counter);
n_shards = shard_map->n_shards;
if (shard_no >= n_shards)
elog(ERROR, "Shard %d is greater or equal than number of shards %d", shard_no, n_shards);
if (connstr)
strncpy(connstr, shard_map->shard_connstr[shard_no], MAX_PS_CONNSTR_LEN);
}
while (begin_update_counter != end_update_counter
|| begin_update_counter != pg_atomic_read_u64(&shard_map->begin_update_counter)
|| end_update_counter != pg_atomic_read_u64(&shard_map->end_update_counter));
if (shard_map_update_counter != end_update_counter)
{
/* Reset all connections if connection strings are changed */
for (shardno_t i = 0; i < max_attached_shard_no; i++)
{
if (page_servers[i].conn)
pageserver_disconnect(i);
}
max_attached_shard_no = 0;
shard_map_update_counter = end_update_counter;
}
return n_shards;
}
#define MB (1024*1024)
shardno_t
get_shard_number(BufferTag* tag)
{
shardno_t n_shards = load_shard_map(0, NULL);
uint32 hash;
#if PG_MAJORVERSION_NUM < 16
hash = murmurhash32(tag->rnode.relNode);
hash = hash_combine(hash, murmurhash32(tag->blockNum/(MB/BLCKSZ)/stripe_size));
#else
hash = murmurhash32(tag->relNumber);
hash = hash_combine(hash, murmurhash32(tag->blockNum/(MB/BLCKSZ)/stripe_size));
#endif
return hash % n_shards;
}
static bool
pageserver_connect(int elevel)
pageserver_connect(shardno_t shard_no, int elevel)
{
char *query;
int ret;
const char *keywords[3];
const char *values[3];
int n;
PGconn* conn;
WaitEventSet *wes;
char connstr[MAX_PS_CONNSTR_LEN];
Assert(!connected);
Assert(page_servers[shard_no].conn == NULL);
if(CheckConnstringUpdated())
{
ReloadConnstring();
}
(void)load_shard_map(shard_no, connstr); /* refresh page map if needed */
/*
* Connect using the connection string we got from the
@@ -165,19 +244,18 @@ pageserver_connect(int elevel)
n++;
}
keywords[n] = "dbname";
values[n] = local_pageserver_connstring;
values[n] = connstr;
n++;
keywords[n] = NULL;
values[n] = NULL;
n++;
pageserver_conn = PQconnectdbParams(keywords, values, 1);
conn = PQconnectdbParams(keywords, values, 1);
if (PQstatus(pageserver_conn) == CONNECTION_BAD)
if (PQstatus(conn) == CONNECTION_BAD)
{
char *msg = pchomp(PQerrorMessage(pageserver_conn));
char *msg = pchomp(PQerrorMessage(conn));
PQfinish(pageserver_conn);
pageserver_conn = NULL;
PQfinish(conn);
ereport(elevel,
(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
@@ -185,30 +263,28 @@ pageserver_connect(int elevel)
errdetail_internal("%s", msg)));
return false;
}
query = psprintf("pagestream %s %s", neon_tenant, neon_timeline);
ret = PQsendQuery(pageserver_conn, query);
ret = PQsendQuery(conn, query);
if (ret != 1)
{
PQfinish(pageserver_conn);
pageserver_conn = NULL;
PQfinish(conn);
neon_log(elevel, "could not send pagestream command to pageserver");
return false;
}
pageserver_conn_wes = CreateWaitEventSet(TopMemoryContext, 3);
AddWaitEventToSet(pageserver_conn_wes, WL_LATCH_SET, PGINVALID_SOCKET,
wes = CreateWaitEventSet(TopMemoryContext, 3);
AddWaitEventToSet(wes, WL_LATCH_SET, PGINVALID_SOCKET,
MyLatch, NULL);
AddWaitEventToSet(pageserver_conn_wes, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
AddWaitEventToSet(wes, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
NULL, NULL);
AddWaitEventToSet(pageserver_conn_wes, WL_SOCKET_READABLE, PQsocket(pageserver_conn), NULL, NULL);
AddWaitEventToSet(wes, WL_SOCKET_READABLE, PQsocket(conn), NULL, NULL);
while (PQisBusy(pageserver_conn))
while (PQisBusy(conn))
{
WaitEvent event;
/* Sleep until there's something to do */
(void) WaitEventSetWait(pageserver_conn_wes, -1L, &event, 1, PG_WAIT_EXTENSION);
(void) WaitEventSetWait(wes, -1L, &event, 1, PG_WAIT_EXTENSION);
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
@@ -216,14 +292,12 @@ pageserver_connect(int elevel)
/* Data available in socket? */
if (event.events & WL_SOCKET_READABLE)
{
if (!PQconsumeInput(pageserver_conn))
if (!PQconsumeInput(conn))
{
char *msg = pchomp(PQerrorMessage(pageserver_conn));
char *msg = pchomp(PQerrorMessage(conn));
PQfinish(pageserver_conn);
pageserver_conn = NULL;
FreeWaitEventSet(pageserver_conn_wes);
pageserver_conn_wes = NULL;
PQfinish(conn);
FreeWaitEventSet(wes);
neon_log(elevel, "could not complete handshake with pageserver: %s",
msg);
@@ -232,9 +306,11 @@ pageserver_connect(int elevel)
}
}
neon_log(LOG, "libpagestore: connected to '%s'", page_server_connstring);
neon_log(LOG, "libpagestore: connected to '%s'", connstr);
page_servers[shard_no].conn = conn;
page_servers[shard_no].wes = wes;
max_attached_shard_no = Max(shard_no+1, max_attached_shard_no);
connected = true;
return true;
}
@@ -242,10 +318,10 @@ pageserver_connect(int elevel)
* A wrapper around PQgetCopyData that checks for interrupts while sleeping.
*/
static int
call_PQgetCopyData(char **buffer)
call_PQgetCopyData(shardno_t shard_no, char **buffer)
{
int ret;
PGconn* pageserver_conn = page_servers[shard_no].conn;
retry:
ret = PQgetCopyData(pageserver_conn, buffer, 1 /* async */ );
@@ -254,7 +330,7 @@ retry:
WaitEvent event;
/* Sleep until there's something to do */
(void) WaitEventSetWait(pageserver_conn_wes, -1L, &event, 1, PG_WAIT_EXTENSION);
(void) WaitEventSetWait(page_servers[shard_no].wes, -1L, &event, 1, PG_WAIT_EXTENSION);
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
@@ -279,7 +355,7 @@ retry:
static void
pageserver_disconnect(void)
pageserver_disconnect(shardno_t shard_no)
{
/*
* If anything goes wrong while we were sending a request, it's not clear
@@ -288,38 +364,32 @@ pageserver_disconnect(void)
* time later after we have already sent a new unrelated request. Close
* the connection to avoid getting confused.
*/
if (connected)
if (page_servers[shard_no].conn)
{
neon_log(LOG, "dropping connection to page server due to error");
PQfinish(pageserver_conn);
pageserver_conn = NULL;
connected = false;
PQfinish(page_servers[shard_no].conn);
page_servers[shard_no].conn = NULL;
prefetch_on_ps_disconnect();
}
if (pageserver_conn_wes != NULL)
if (page_servers[shard_no].wes != NULL)
{
FreeWaitEventSet(pageserver_conn_wes);
pageserver_conn_wes = NULL;
FreeWaitEventSet(page_servers[shard_no].wes);
page_servers[shard_no].wes = NULL;
}
}
static bool
pageserver_send(NeonRequest * request)
pageserver_send(shardno_t shard_no, NeonRequest * request)
{
StringInfoData req_buff;
if(CheckConnstringUpdated())
{
pageserver_disconnect();
ReloadConnstring();
}
PGconn* pageserver_conn = page_servers[shard_no].conn;
/* If the connection was lost for some reason, reconnect */
if (connected && PQstatus(pageserver_conn) == CONNECTION_BAD)
if (pageserver_conn && PQstatus(pageserver_conn) == CONNECTION_BAD)
{
neon_log(LOG, "pageserver_send disconnect bad connection");
pageserver_disconnect();
pageserver_disconnect(shard_no);
}
req_buff = nm_pack_request(request);
@@ -331,9 +401,9 @@ pageserver_send(NeonRequest * request)
* See https://github.com/neondatabase/neon/issues/1138
* So try to reestablish connection in case of failure.
*/
if (!connected)
if (!page_servers[shard_no].conn)
{
while (!pageserver_connect(n_reconnect_attempts < max_reconnect_attempts ? LOG : ERROR))
while (!pageserver_connect(shard_no, n_reconnect_attempts < max_reconnect_attempts ? LOG : ERROR))
{
HandleMainLoopInterrupts();
n_reconnect_attempts += 1;
@@ -342,7 +412,9 @@ pageserver_send(NeonRequest * request)
n_reconnect_attempts = 0;
}
/*
pageserver_conn = page_servers[shard_no].conn;
/*
* Send request.
*
* In principle, this could block if the output buffer is full, and we
@@ -353,7 +425,7 @@ pageserver_send(NeonRequest * request)
if (PQputCopyData(pageserver_conn, req_buff.data, req_buff.len) <= 0)
{
char *msg = pchomp(PQerrorMessage(pageserver_conn));
pageserver_disconnect();
pageserver_disconnect(shard_no);
neon_log(LOG, "pageserver_send disconnect because failed to send page request (try to reconnect): %s", msg);
pfree(msg);
pfree(req_buff.data);
@@ -373,12 +445,12 @@ pageserver_send(NeonRequest * request)
}
static NeonResponse *
pageserver_receive(void)
pageserver_receive(shardno_t shard_no)
{
StringInfoData resp_buff;
NeonResponse *resp;
if (!connected)
PGconn* pageserver_conn = page_servers[shard_no].conn;
if (!pageserver_conn)
return NULL;
PG_TRY();
@@ -386,7 +458,7 @@ pageserver_receive(void)
/* read response */
int rc;
rc = call_PQgetCopyData(&resp_buff.data);
rc = call_PQgetCopyData(shard_no, &resp_buff.data);
if (rc >= 0)
{
resp_buff.len = rc;
@@ -405,25 +477,25 @@ pageserver_receive(void)
else if (rc == -1)
{
neon_log(LOG, "pageserver_receive disconnect because call_PQgetCopyData returns -1: %s", pchomp(PQerrorMessage(pageserver_conn)));
pageserver_disconnect();
pageserver_disconnect(shard_no);
resp = NULL;
}
else if (rc == -2)
{
char* msg = pchomp(PQerrorMessage(pageserver_conn));
pageserver_disconnect();
pageserver_disconnect(shard_no);
neon_log(ERROR, "pageserver_receive disconnect because could not read COPY data: %s", msg);
}
else
{
pageserver_disconnect();
pageserver_disconnect(shard_no);
neon_log(ERROR, "pageserver_receive disconnect because unexpected PQgetCopyData return value: %d", rc);
}
}
PG_CATCH();
{
neon_log(LOG, "pageserver_receive disconnect due to caught exception");
pageserver_disconnect();
pageserver_disconnect(shard_no);
PG_RE_THROW();
}
PG_END_TRY();
@@ -433,9 +505,10 @@ pageserver_receive(void)
static bool
pageserver_flush(void)
pageserver_flush(shardno_t shard_no)
{
if (!connected)
PGconn* pageserver_conn = page_servers[shard_no].conn;
if (!pageserver_conn)
{
neon_log(WARNING, "Tried to flush while disconnected");
}
@@ -444,7 +517,7 @@ pageserver_flush(void)
if (PQflush(pageserver_conn))
{
char *msg = pchomp(PQerrorMessage(pageserver_conn));
pageserver_disconnect();
pageserver_disconnect(shard_no);
neon_log(LOG, "pageserver_flush disconnect because failed to flush page requests: %s", msg);
pfree(msg);
return false;
@@ -468,62 +541,61 @@ check_neon_id(char **newval, void **extra, GucSource source)
return **newval == '\0' || HexDecodeString(id, *newval, 16);
}
static Size
PagestoreShmemSize(void)
{
return sizeof(PagestoreShmemState);
}
static bool
PagestoreShmemInit(void)
{
bool found;
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
pagestore_shared = ShmemInitStruct("libpagestore shared state",
PagestoreShmemSize(),
&found);
if(!found)
{
pagestore_shared->lock = &(GetNamedLWLockTranche("neon_libpagestore")->lock);
pg_atomic_init_u64(&pagestore_shared->update_counter, 0);
AssignPageserverConnstring(page_server_connstring, NULL);
}
LWLockRelease(AddinShmemInitLock);
return found;
}
static void
pagestore_shmem_startup_hook(void)
AssignPageserverConnstring(const char *newval, void *extra)
{
if(prev_shmem_startup_hook)
prev_shmem_startup_hook();
/*
* Load shard map only at Postmaster.
* If old page server is not available, then backends can be blocked in attempts to reconnect to it and do not reload config in this loop
*/
if (shard_map != NULL && (MyProcPid == PostmasterPid || shard_map->n_shards == 0))
{
char const* shard_connstr = newval;
char const* sep;
size_t connstr_len;
int i = 0;
bool shard_map_changed = false;
do
{
sep = strchr(shard_connstr, ',');
connstr_len = sep != NULL ? sep - shard_connstr : strlen(shard_connstr);
if (connstr_len == 0)
break; /* trailing comma */
if (i >= MAX_SHARDS)
{
elog(LOG, "Too many shards");
return;
}
if (connstr_len >= MAX_PS_CONNSTR_LEN)
{
elog(LOG, "Connection string too long");
return;
}
if (i >= shard_map->n_shards ||
strcmp(shard_map->shard_connstr[i], shard_connstr) != 0)
{
if (!shard_map_changed)
{
pg_atomic_add_fetch_u64(&shard_map->begin_update_counter, 1);
shard_map_changed = true;
}
memcpy(shard_map->shard_connstr[i], shard_connstr, connstr_len+1);
}
shard_connstr = sep + 1;
i += 1;
} while (sep != NULL);
PagestoreShmemInit();
}
static void
pagestore_shmem_request(void)
{
#if PG_VERSION_NUM >= 150000
if(prev_shmem_request_hook)
prev_shmem_request_hook();
#endif
RequestAddinShmemSpace(PagestoreShmemSize());
RequestNamedLWLockTranche("neon_libpagestore", 1);
}
static void
pagestore_prepare_shmem(void)
{
#if PG_VERSION_NUM >= 150000
prev_shmem_request_hook = shmem_request_hook;
shmem_request_hook = pagestore_shmem_request;
#else
pagestore_shmem_request();
#endif
prev_shmem_startup_hook = shmem_startup_hook;
shmem_startup_hook = pagestore_shmem_startup_hook;
if (i == 0)
{
elog(LOG, "No shards were specified");
return;
}
if (shard_map_changed)
{
shard_map->n_shards = i;
pg_atomic_add_fetch_u64(&shard_map->end_update_counter, 1);
}
}
}
/*
@@ -532,8 +604,6 @@ pagestore_prepare_shmem(void)
void
pg_init_libpagestore(void)
{
pagestore_prepare_shmem();
DefineCustomStringVariable("neon.pageserver_connstring",
"connection string to the page server",
NULL,
@@ -541,7 +611,7 @@ pg_init_libpagestore(void)
"",
PGC_SIGHUP,
0, /* no flags required */
CheckPageserverConnstring, AssignPageserverConnstring, NULL);
NULL, AssignPageserverConnstring, NULL);
DefineCustomStringVariable("neon.timeline_id",
"Neon timeline_id the server is running on",
@@ -561,6 +631,15 @@ pg_init_libpagestore(void)
0, /* no flags required */
check_neon_id, NULL, NULL);
DefineCustomIntVariable("neon.stripe_size",
"sharding sripe size",
NULL,
&stripe_size,
256, 1, INT_MAX,
PGC_SIGHUP,
GUC_UNIT_MB,
NULL, NULL, NULL);
DefineCustomIntVariable("neon.max_cluster_size",
"cluster size limit",
NULL,
@@ -623,4 +702,5 @@ pg_init_libpagestore(void)
}
lfc_init();
psm_init();
}

View File

@@ -20,12 +20,16 @@
#include RELFILEINFO_HDR
#include "storage/block.h"
#include "storage/smgr.h"
#include "storage/buf_internals.h"
#include "lib/stringinfo.h"
#include "libpq/pqformat.h"
#include "utils/memutils.h"
#include "pg_config.h"
#define MAX_SHARDS 128
#define MAX_PS_CONNSTR_LEN 128
typedef enum
{
/* pagestore_client -> pagestore */
@@ -144,11 +148,13 @@ extern char *nm_to_string(NeonMessage * msg);
* API
*/
typedef unsigned shardno_t;
typedef struct
{
bool (*send) (NeonRequest * request);
NeonResponse *(*receive) (void);
bool (*flush) (void);
bool (*send) (shardno_t shard_no, NeonRequest * request);
NeonResponse *(*receive) (shardno_t shard_no);
bool (*flush) (shardno_t shard_no);
} page_server_api;
extern void prefetch_on_ps_disconnect(void);
@@ -165,6 +171,8 @@ extern char *neon_tenant;
extern bool wal_redo;
extern int32 max_cluster_size;
extern shardno_t get_shard_number(BufferTag* tag);
extern const f_smgr *smgr_neon(BackendId backend, NRelFileInfo rinfo);
extern void smgr_init_neon(void);
extern void readahead_buffer_resize(int newsize, void *extra);

View File

@@ -164,6 +164,7 @@ typedef struct PrefetchRequest {
XLogRecPtr actual_request_lsn;
NeonResponse *response; /* may be null */
PrefetchStatus status;
shardno_t shard_no;
uint64 my_ring_index;
} PrefetchRequest;
@@ -225,6 +226,8 @@ typedef struct PrefetchState {
/* the buffers */
prfh_hash *prf_hash;
int max_shard_no;
uint8 shard_bitmap[(MAX_SHARDS + 7)/8];
PrefetchRequest prf_buffer[]; /* prefetch buffers */
} PrefetchState;
@@ -313,6 +316,7 @@ compact_prefetch_buffers(void)
Assert(target_slot->status == PRFS_UNUSED);
target_slot->buftag = source_slot->buftag;
target_slot->shard_no = source_slot->shard_no;
target_slot->status = source_slot->status;
target_slot->response = source_slot->response;
target_slot->effective_request_lsn = source_slot->effective_request_lsn;
@@ -477,6 +481,23 @@ prefetch_cleanup_trailing_unused(void)
}
}
static bool
prefetch_flush_requests(void)
{
for (shardno_t shard_no = 0; shard_no < MyPState->max_shard_no; shard_no++)
{
if (MyPState->shard_bitmap[shard_no >> 3] & (1 << (shard_no & 7)))
{
if (!page_server->flush(shard_no))
return false;
MyPState->shard_bitmap[shard_no >> 3] &= ~(1 << (shard_no & 7));
}
}
MyPState->max_shard_no = 0;
return true;
}
/*
* Wait for slot of ring_index to have received its response.
* The caller is responsible for making sure the request buffer is flushed.
@@ -492,7 +513,7 @@ prefetch_wait_for(uint64 ring_index)
if (MyPState->ring_flush <= ring_index &&
MyPState->ring_unused > MyPState->ring_flush)
{
if (!page_server->flush())
if (!prefetch_flush_requests())
return false;
MyPState->ring_flush = MyPState->ring_unused;
}
@@ -530,7 +551,7 @@ prefetch_read(PrefetchRequest *slot)
Assert(slot->my_ring_index == MyPState->ring_receive);
old = MemoryContextSwitchTo(MyPState->errctx);
response = (NeonResponse *) page_server->receive();
response = (NeonResponse *) page_server->receive(slot->shard_no);
MemoryContextSwitchTo(old);
if (response)
{
@@ -682,12 +703,14 @@ prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force
Assert(slot->response == NULL);
Assert(slot->my_ring_index == MyPState->ring_unused);
while (!page_server->send((NeonRequest *) &request));
while (!page_server->send(slot->shard_no, (NeonRequest *) &request));
/* update prefetch state */
MyPState->n_requests_inflight += 1;
MyPState->n_unused -= 1;
MyPState->ring_unused += 1;
MyPState->shard_bitmap[slot->shard_no >> 3] |= 1 << (slot->shard_no & 7);
MyPState->max_shard_no = Max(slot->shard_no+1, MyPState->max_shard_no);
/* update slot state */
slot->status = PRFS_REQUESTED;
@@ -847,6 +870,7 @@ prefetch_register_buffer(BufferTag tag, bool *force_latest, XLogRecPtr *force_ls
* function reads the buffer tag from the slot.
*/
slot->buftag = tag;
slot->shard_no = get_shard_number(&tag);
slot->my_ring_index = ring_index;
prefetch_do_request(slot, force_latest, force_lsn);
@@ -857,7 +881,7 @@ prefetch_register_buffer(BufferTag tag, bool *force_latest, XLogRecPtr *force_ls
if (flush_every_n_requests > 0 &&
MyPState->ring_unused - MyPState->ring_flush >= flush_every_n_requests)
{
if (!page_server->flush())
if (!prefetch_flush_requests())
{
/* Prefetch set is reset in case of error, so we should try to register our request once again */
goto Retry;
@@ -872,11 +896,42 @@ static NeonResponse *
page_server_request(void const *req)
{
NeonResponse* resp;
BufferTag tag = {0};
shardno_t shard_no;
switch (((NeonRequest *) req)->tag)
{
case T_NeonExistsRequest:
CopyNRelFileInfoToBufTag(tag, ((NeonExistsRequest *) req)->rinfo);
break;
case T_NeonNblocksRequest:
CopyNRelFileInfoToBufTag(tag, ((NeonNblocksRequest *) req)->rinfo);
break;
case T_NeonDbSizeRequest:
NInfoGetDbOid(BufTagGetNRelFileInfo(tag)) = ((NeonDbSizeRequest *) req)->dbNode;
break;
case T_NeonGetPageRequest:
CopyNRelFileInfoToBufTag(tag, ((NeonGetPageRequest *) req)->rinfo);
tag.blockNum = ((NeonGetPageRequest *) req)->blkno;
break;
default:
elog(ERROR, "Unexpected request tag: %d", ((NeonRequest *) req)->tag);
}
shard_no = get_shard_number(&tag);
/*
* TODO: temporary workarround - we stream all WAL only to shard 0, so metadata and forks other than main
* should be requested from shard 0. We still need to call get_shard_no() to check if shard map is up-to-date
*/
if (((NeonRequest *) req)->tag != T_NeonGetPageRequest || ((NeonGetPageRequest *) req)->forknum != MAIN_FORKNUM)
{
shard_no = 0;
}
do {
while (!page_server->send((NeonRequest *) req) || !page_server->flush());
MyPState->ring_flush = MyPState->ring_unused;
while (!page_server->send(shard_no, (NeonRequest *) req) || !page_server->flush(shard_no));
consume_prefetch_responses();
resp = page_server->receive();
resp = page_server->receive(shard_no);
} while (resp == NULL);
return resp;

View File

@@ -456,6 +456,7 @@ class NeonEnvBuilder:
self.initial_tenant = initial_tenant or TenantId.generate()
self.initial_timeline = initial_timeline or TimelineId.generate()
self.enable_generations = False
self.initial_shard_count = 1
self.scrub_on_exit = False
self.test_output_dir = test_output_dir
@@ -497,7 +498,10 @@ class NeonEnvBuilder:
f"Services started, creating initial tenant {env.initial_tenant} and its initial timeline"
)
initial_tenant, initial_timeline = env.neon_cli.create_tenant(
tenant_id=env.initial_tenant, conf=initial_tenant_conf, timeline_id=env.initial_timeline
tenant_id=env.initial_tenant,
conf=initial_tenant_conf,
timeline_id=env.initial_timeline,
shard_count=self.initial_shard_count,
)
assert env.initial_tenant == initial_tenant
assert env.initial_timeline == initial_timeline
@@ -1144,6 +1148,7 @@ class NeonCli(AbstractNeonCli):
timeline_id: Optional[TimelineId] = None,
conf: Optional[Dict[str, str]] = None,
set_default: bool = False,
shard_count: int = 1,
) -> Tuple[TenantId, TimelineId]:
"""
Creates a new tenant, returns its id and its initial timeline's id.
@@ -1160,6 +1165,8 @@ class NeonCli(AbstractNeonCli):
str(timeline_id),
"--pg-version",
self.env.pg_version,
"--shard-count",
str(shard_count),
]
if conf is not None:
args.extend(
@@ -1382,7 +1389,7 @@ class NeonCli(AbstractNeonCli):
tenant_id: Optional[TenantId] = None,
hot_standby: bool = False,
lsn: Optional[Lsn] = None,
pageserver_id: Optional[int] = None,
pageserver_ids: Optional[list[int]] = None,
) -> "subprocess.CompletedProcess[str]":
args = [
"endpoint",
@@ -1404,8 +1411,10 @@ class NeonCli(AbstractNeonCli):
args.append(endpoint_id)
if hot_standby:
args.extend(["--hot-standby", "true"])
if pageserver_id is not None:
args.extend(["--pageserver-id", str(pageserver_id)])
if pageserver_ids is not None:
args.extend(["--pageserver-id", ",".join(str(i) for i in pageserver_ids)])
log.info(f"endpoint_create: {args}")
res = self.raw_cli(args)
res.check_returncode()
@@ -2451,7 +2460,7 @@ class Endpoint(PgProtocol):
hot_standby: bool = False,
lsn: Optional[Lsn] = None,
config_lines: Optional[List[str]] = None,
pageserver_id: Optional[int] = None,
pageserver_ids: Optional[list[int]] = None,
) -> "Endpoint":
"""
Create a new Postgres endpoint.
@@ -2473,7 +2482,7 @@ class Endpoint(PgProtocol):
hot_standby=hot_standby,
pg_port=self.pg_port,
http_port=self.http_port,
pageserver_id=pageserver_id,
pageserver_ids=pageserver_ids,
)
path = Path("endpoints") / self.endpoint_id / "pgdata"
self.pgdata_dir = os.path.join(self.env.repo_dir, path)
@@ -2609,7 +2618,7 @@ class Endpoint(PgProtocol):
lsn: Optional[Lsn] = None,
config_lines: Optional[List[str]] = None,
remote_ext_config: Optional[str] = None,
pageserver_id: Optional[int] = None,
pageserver_ids: Optional[list[int]] = None,
) -> "Endpoint":
"""
Create an endpoint, apply config, and start Postgres.
@@ -2624,7 +2633,7 @@ class Endpoint(PgProtocol):
config_lines=config_lines,
hot_standby=hot_standby,
lsn=lsn,
pageserver_id=pageserver_id,
pageserver_ids=pageserver_ids,
).start(remote_ext_config=remote_ext_config)
log.info(f"Postgres startup took {time.time() - started_at} seconds")
@@ -2660,7 +2669,7 @@ class EndpointFactory:
hot_standby: bool = False,
config_lines: Optional[List[str]] = None,
remote_ext_config: Optional[str] = None,
pageserver_id: Optional[int] = None,
pageserver_ids: Optional[list[int]] = None,
) -> Endpoint:
ep = Endpoint(
self.env,
@@ -2678,7 +2687,7 @@ class EndpointFactory:
config_lines=config_lines,
lsn=lsn,
remote_ext_config=remote_ext_config,
pageserver_id=pageserver_id,
pageserver_ids=pageserver_ids,
)
def create(
@@ -3162,7 +3171,15 @@ def check_restored_datadir_content(
cur.execute("CHECKPOINT")
# wait for pageserver to catch up
wait_for_last_flush_lsn(env, endpoint, endpoint.tenant_id, timeline_id)
if len(env.pageservers) > 1:
# FIXME: wait_for_last_flush_lsn needs teaching about sharding: it tries to query
# LSNs for shard-naive TenantId
return
for pageserver in env.pageservers:
wait_for_last_flush_lsn(
env, endpoint, endpoint.tenant_id, timeline_id, pageserver_id=pageserver.id
)
# stop postgres to ensure that files won't change
endpoint.stop()

View File

@@ -3,24 +3,38 @@
#
from pathlib import Path
from fixtures.neon_fixtures import NeonEnv, check_restored_datadir_content
import pytest
from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
check_restored_datadir_content,
)
# Run the main PostgreSQL regression tests, in src/test/regress.
#
@pytest.mark.parametrize("shard_count", [2])
def test_pg_regress(
neon_simple_env: NeonEnv,
neon_env_builder: NeonEnvBuilder,
test_output_dir: Path,
pg_bin,
capsys,
base_dir: Path,
pg_distrib_dir: Path,
shard_count: int,
):
env = neon_simple_env
neon_env_builder.enable_generations = True
neon_env_builder.initial_shard_count = shard_count
neon_env_builder.num_pageservers = shard_count
env = neon_env_builder.init_start()
for pageserver in env.pageservers:
# FIXME: attachment_service is not yet sharding aware, so generation validation is broken.
pageserver.allowed_errors.append(".*Dropped remote consistent LSN updates.*")
endpoint = env.endpoints.create_start("main", pageserver_ids=[p.id for p in env.pageservers])
env.neon_cli.create_branch("test_pg_regress", "empty")
# Connect to postgres and create a database called "regression".
endpoint = env.endpoints.create_start("test_pg_regress")
endpoint.safe_psql("CREATE DATABASE regression")
# Create some local directories for pg_regress to run in.

View File

@@ -0,0 +1,23 @@
from fixtures.neon_fixtures import NeonEnvBuilder
import pytest
@pytest.mark.parametrize("shard_count", [2])
@pytest.mark.timeout(1000)
def test_sharding(neon_env_builder: NeonEnvBuilder, shard_count: int):
neon_env_builder.enable_generations = True
neon_env_builder.initial_shard_count = shard_count
neon_env_builder.num_pageservers = shard_count
env = neon_env_builder.init_start()
for pageserver in env.pageservers:
# FIXME: attachment_service is not yet sharding aware, so generation validation is broken.
pageserver.allowed_errors.append(".*Dropped remote consistent LSN updates.*")
pageserver.allowed_errors.append(".*Dropping stale deletions for tenant.*")
endpoint = env.endpoints.create_start("main", pageserver_ids=[p.id for p in env.pageservers])
with endpoint.cursor() as cur:
cur.execute("SET statement_timeout=0") # disable statement timeout
cur.execute("create table t(t bigint, payload text default repeat('?',200))")
cur.execute("insert into t values(generate_series(1,10000000))")
cur.execute("select count(*) from t")
assert cur.fetchone()[0] == 10000000