Compare commits


23 Commits

Author SHA1 Message Date
Arthur Petukhovsky
e70d486281 Add safekeeper protocol to send decoded records 2023-11-09 19:33:41 +00:00
John Spray
56630b0eda Merge remote-tracking branch 'upstream/compute_sharding_support' into jcsp/sharding-pt1 2023-11-09 16:43:19 +00:00
John Spray
27e9cb91ed Revert "extend test_change_pageserver for failure case, rework changing pageserver (#5693)" (reverts commit b989ad1922) 2023-11-09 16:43:10 +00:00
John Spray
733877a8ff shard.rs: update hashing 2023-11-09 16:39:45 +00:00
John Spray
93c52b5763 fixup inspect 2023-11-09 16:26:16 +00:00
John Spray
f54cb63eb4 pageserver: omit consumption metrics from shard != 0 2023-11-09 16:25:49 +00:00
John Spray
8e2fee7d06 DNM verbose WAL ingest logging 2023-11-09 16:24:29 +00:00
John Spray
fbbad434a3 pageserver: hacky version of putting shard into remote storage paths 2023-11-09 16:24:28 +00:00
John Spray
8f27c57748 neon_local: create tenants with multiple shards 2023-11-09 16:23:21 +00:00
John Spray
6662c8f1ed WIP neon_local 2023-11-09 16:23:21 +00:00
John Spray
61244afb59 pageserver: filter WAL by ShardIdentity 2023-11-09 16:23:21 +00:00
John Spray
e20732fdcb Thread ShardIdentity through to Timeline 2023-11-09 16:23:21 +00:00
John Spray
feae5f716f pageserver: carry a ShardIdentity in LocationConf 2023-11-09 16:23:21 +00:00
John Spray
ae19f28f59 Add sharding types 2023-11-09 16:23:21 +00:00
John Spray
22a848cf2b control_plane: use pageserver/ sub path for local_fs 2023-11-09 16:23:21 +00:00
John Spray
360ca01952 tests: explicitly disable remote storage if it is None 2023-11-09 16:23:21 +00:00
John Spray
bf059935a0 Use the same local_fs path as tests 2023-11-09 16:23:21 +00:00
John Spray
71bf90548d neon_local: don't use default remote storage if user specified it on CLI 2023-11-09 16:23:21 +00:00
John Spray
e368705692 control_plane: implement cargo neon migrate 2023-11-09 16:23:21 +00:00
John Spray
e94b9e9ce8 control_plane: add /inspect to attachment service 2023-11-09 16:23:21 +00:00
John Spray
57cbc20dce neon_local: remote storage by default & enable multiple pageservers in init 2023-11-09 16:23:21 +00:00
Konstantin Knizhnik
0aeba9fc4c Take in account stripe size when calculating shard hash number 2023-11-09 10:14:55 +02:00
Konstantin Knizhnik
6e0055b9f6 Add support for PS shardoing in compute 2023-11-09 08:07:35 +02:00
39 changed files with 1763 additions and 631 deletions

Cargo.lock generated

@@ -3303,6 +3303,7 @@ dependencies = [
"serde_with",
"strum",
"strum_macros",
"url",
"utils",
"workspace_hack",
]
@@ -4465,6 +4466,7 @@ dependencies = [
"hyper",
"metrics",
"once_cell",
"pageserver_api",
"parking_lot 0.12.1",
"postgres",
"postgres-protocol",


@@ -710,12 +710,8 @@ impl ComputeNode {
// `pg_ctl` for start / stop, so this just seems much easier to do as we already
// have opened connection to Postgres and superuser access.
#[instrument(skip_all)]
fn pg_reload_conf(&self) -> Result<()> {
let pgctl_bin = Path::new(&self.pgbin).parent().unwrap().join("pg_ctl");
Command::new(pgctl_bin)
.args(["reload", "-D", &self.pgdata])
.output()
.expect("cannot run pg_ctl process");
fn pg_reload_conf(&self, client: &mut Client) -> Result<()> {
client.simple_query("SELECT pg_reload_conf()")?;
Ok(())
}
@@ -728,9 +724,9 @@ impl ComputeNode {
// Write new config
let pgdata_path = Path::new(&self.pgdata);
config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &spec, None)?;
self.pg_reload_conf()?;
let mut client = Client::connect(self.connstr.as_str(), NoTls)?;
self.pg_reload_conf(&mut client)?;
// Proceed with post-startup configuration. Note, that order of operations is important.
// Disable DDL forwarding because control plane already knows about these roles/databases.


@@ -24,6 +24,16 @@ pub struct AttachHookResponse {
pub gen: Option<u32>,
}
#[derive(Serialize, Deserialize)]
pub struct InspectRequest {
pub tenant_id: TenantId,
}
#[derive(Serialize, Deserialize)]
pub struct InspectResponse {
pub attachment: Option<(u32, NodeId)>,
}
impl AttachmentService {
pub fn from_env(env: &LocalEnv) -> Self {
let path = env.base_data_dir.join("attachments.json");
@@ -101,4 +111,29 @@ impl AttachmentService {
let response = response.json::<AttachHookResponse>()?;
Ok(response.gen)
}
pub fn inspect(&self, tenant_id: TenantId) -> anyhow::Result<Option<(u32, NodeId)>> {
use hyper::StatusCode;
let url = self
.env
.control_plane_api
.clone()
.unwrap()
.join("inspect")
.unwrap();
let client = reqwest::blocking::ClientBuilder::new()
.build()
.expect("Failed to construct http client");
let request = InspectRequest { tenant_id };
let response = client.post(url).json(&request).send()?;
if response.status() != StatusCode::OK {
return Err(anyhow!("Unexpected status {}", response.status()));
}
let response = response.json::<InspectResponse>()?;
Ok(response.attachment)
}
}
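For orientation (not part of the diff): given the InspectRequest/InspectResponse types above, the new /inspect call exchanges JSON bodies roughly like the following. The values are placeholders, and the exact serialized forms of TenantId and NodeId depend on their Serialize impls.

POST /inspect    body: {"tenant_id": "<tenant id>"}
200 OK           body: {"attachment": [<generation>, <node id>]}  (or {"attachment": null} if the tenant is not attached)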


@@ -32,7 +32,9 @@ use pageserver_api::control_api::{
ValidateResponseTenant,
};
use control_plane::attachment_service::{AttachHookRequest, AttachHookResponse};
use control_plane::attachment_service::{
AttachHookRequest, AttachHookResponse, InspectRequest, InspectResponse,
};
#[derive(Parser)]
#[command(author, version, about, long_about = None)]
@@ -255,12 +257,28 @@ async fn handle_attach_hook(mut req: Request<Body>) -> Result<Response<Body>, Ap
)
}
async fn handle_inspect(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
let inspect_req = json_request::<InspectRequest>(&mut req).await?;
let state = get_state(&req).inner.clone();
let locked = state.write().await;
let tenant_state = locked.tenants.get(&inspect_req.tenant_id);
json_response(
StatusCode::OK,
InspectResponse {
attachment: tenant_state.and_then(|s| s.pageserver.map(|ps| (s.generation, ps))),
},
)
}
fn make_router(persistent_state: PersistentState) -> RouterBuilder<hyper::Body, ApiError> {
endpoint::make_router()
.data(Arc::new(State::new(persistent_state)))
.post("/re-attach", |r| request_span(r, handle_re_attach))
.post("/validate", |r| request_span(r, handle_validate))
.post("/attach-hook", |r| request_span(r, handle_attach_hook))
.post("/inspect", |r| request_span(r, handle_inspect))
}
#[tokio::main]


@@ -11,13 +11,14 @@ use compute_api::spec::ComputeMode;
use control_plane::attachment_service::AttachmentService;
use control_plane::endpoint::ComputeControlPlane;
use control_plane::local_env::LocalEnv;
use control_plane::pageserver::PageServerNode;
use control_plane::pageserver::{PageServerNode, PAGESERVER_REMOTE_STORAGE_DIR};
use control_plane::safekeeper::SafekeeperNode;
use control_plane::tenant_migration::migrate_tenant;
use control_plane::{broker, local_env};
use pageserver_api::models::TimelineInfo;
use pageserver_api::models::{LocationConfig, LocationConfigMode, TimelineInfo};
use pageserver_api::{
DEFAULT_HTTP_LISTEN_ADDR as DEFAULT_PAGESERVER_HTTP_ADDR,
DEFAULT_PG_LISTEN_ADDR as DEFAULT_PAGESERVER_PG_ADDR,
DEFAULT_HTTP_LISTEN_PORT as DEFAULT_PAGESERVER_HTTP_PORT,
DEFAULT_PG_LISTEN_PORT as DEFAULT_PAGESERVER_PG_PORT,
};
use postgres_backend::AuthType;
use safekeeper_api::{
@@ -29,6 +30,7 @@ use std::path::PathBuf;
use std::process::exit;
use std::str::FromStr;
use storage_broker::DEFAULT_LISTEN_ADDR as DEFAULT_BROKER_ADDR;
use utils::generation::Generation;
use utils::{
auth::{Claims, Scope},
id::{NodeId, TenantId, TenantTimelineId, TimelineId},
@@ -46,8 +48,8 @@ const DEFAULT_PG_VERSION: &str = "15";
const DEFAULT_PAGESERVER_CONTROL_PLANE_API: &str = "http://127.0.0.1:1234/";
fn default_conf() -> String {
format!(
fn default_conf(num_pageservers: u16) -> String {
let mut template = format!(
r#"
# Default built-in configuration, defined in main.rs
control_plane_api = '{DEFAULT_PAGESERVER_CONTROL_PLANE_API}'
@@ -55,21 +57,33 @@ control_plane_api = '{DEFAULT_PAGESERVER_CONTROL_PLANE_API}'
[broker]
listen_addr = '{DEFAULT_BROKER_ADDR}'
[[pageservers]]
id = {DEFAULT_PAGESERVER_ID}
listen_pg_addr = '{DEFAULT_PAGESERVER_PG_ADDR}'
listen_http_addr = '{DEFAULT_PAGESERVER_HTTP_ADDR}'
pg_auth_type = '{trust_auth}'
http_auth_type = '{trust_auth}'
[[safekeepers]]
id = {DEFAULT_SAFEKEEPER_ID}
pg_port = {DEFAULT_SAFEKEEPER_PG_PORT}
http_port = {DEFAULT_SAFEKEEPER_HTTP_PORT}
"#,
trust_auth = AuthType::Trust,
)
);
for i in 0..num_pageservers {
let pageserver_id = NodeId(DEFAULT_PAGESERVER_ID.0 + i as u64);
let pg_port = DEFAULT_PAGESERVER_PG_PORT + i;
let http_port = DEFAULT_PAGESERVER_HTTP_PORT + i;
template += &format!(
r#"
[[pageservers]]
id = {pageserver_id}
listen_pg_addr = '127.0.0.1:{pg_port}'
listen_http_addr = '127.0.0.1:{http_port}'
pg_auth_type = '{trust_auth}'
http_auth_type = '{trust_auth}'
"#,
trust_auth = AuthType::Trust,
)
}
template
}
///
@@ -295,6 +309,9 @@ fn parse_timeline_id(sub_match: &ArgMatches) -> anyhow::Result<Option<TimelineId
}
fn handle_init(init_match: &ArgMatches) -> anyhow::Result<LocalEnv> {
let num_pageservers = init_match
.get_one::<u16>("num-pageservers")
.expect("num-pageservers arg has a default");
// Create config file
let toml_file: String = if let Some(config_path) = init_match.get_one::<PathBuf>("config") {
// load and parse the file
@@ -306,7 +323,7 @@ fn handle_init(init_match: &ArgMatches) -> anyhow::Result<LocalEnv> {
})?
} else {
// Built-in default config
default_conf()
default_conf(*num_pageservers)
};
let pg_version = init_match
@@ -320,6 +337,9 @@ fn handle_init(init_match: &ArgMatches) -> anyhow::Result<LocalEnv> {
env.init(pg_version, force)
.context("Failed to initialize neon repository")?;
// Create remote storage location for default LocalFs remote storage
std::fs::create_dir_all(env.base_data_dir.join(PAGESERVER_REMOTE_STORAGE_DIR))?;
// Initialize pageserver, create initial tenant and timeline.
for ps_conf in &env.pageservers {
PageServerNode::from_env(&env, ps_conf)
@@ -355,9 +375,10 @@ fn pageserver_config_overrides(init_match: &ArgMatches) -> Vec<&str> {
}
fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> anyhow::Result<()> {
let pageserver = get_default_pageserver(env);
match tenant_match.subcommand() {
Some(("list", _)) => {
// TODO: make command aware of multiple pageservers
let pageserver = get_default_pageserver(env);
for t in pageserver.tenant_list()? {
println!("{} {:?}", t.id, t.state);
}
@@ -368,37 +389,73 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> an
.map(|vals| vals.flat_map(|c| c.split_once(':')).collect())
.unwrap_or_default();
let shard_count: u8 = create_match
.get_one::<u8>("shard-count")
.cloned()
.unwrap_or(1);
// If tenant ID was not specified, generate one
let tenant_id = parse_tenant_id(create_match)?.unwrap_or_else(TenantId::generate);
let generation = if env.control_plane_api.is_some() {
// We must register the tenant with the attachment service, so
// that when the pageserver restarts, it will be re-attached.
let attachment_service = AttachmentService::from_env(env);
attachment_service.attach_hook(tenant_id, pageserver.conf.id)?
} else {
None
};
pageserver.tenant_create(tenant_id, generation, tenant_conf)?;
println!("tenant {tenant_id} successfully created on the pageserver");
// Create an initial timeline for the new tenant
let new_timeline_id = parse_timeline_id(create_match)?;
// We will create an initial timeline for the new tenant
let new_timeline_id =
parse_timeline_id(create_match)?.unwrap_or(TimelineId::generate());
let pg_version = create_match
.get_one::<u32>("pg-version")
.copied()
.context("Failed to parse postgres version from the argument string")?;
let timeline_info = pageserver.timeline_create(
tenant_id,
new_timeline_id,
None,
None,
Some(pg_version),
)?;
let new_timeline_id = timeline_info.timeline_id;
let last_record_lsn = timeline_info.last_record_lsn;
// TODO: implement ability for one pageserver to hold multiple
// shards for the same tenant. Until then, we must place each
// shard on a different pageserver.
assert!(env.pageservers.len() >= shard_count as usize);
for shard_number in 0..shard_count {
let ps_conf = env.pageservers.get(shard_number as usize).unwrap();
let pageserver = PageServerNode::from_env(env, ps_conf);
// TODO: per-shard generations
let generation = if env.control_plane_api.is_some() {
// We must register the tenant with the attachment service, so
// that when the pageserver restarts, it will be re-attached.
let attachment_service = AttachmentService::from_env(env);
attachment_service.attach_hook(tenant_id, pageserver.conf.id)?
} else {
None
};
// TODO: shard-aware POST /v1/tenant. Currently tenant creation on the
// pageserver is a no-op, but we shouldn't skip the command entirely.
let tenant_conf = PageServerNode::build_config(tenant_conf.clone())?;
let location_conf = LocationConfig {
shard_count,
shard_number,
shard_stripe_size: 32000,
mode: LocationConfigMode::AttachedSingle,
generation: generation.map(Generation::new),
secondary_conf: None,
tenant_conf,
};
pageserver.location_config(tenant_id, location_conf)?;
println!(
"tenant {tenant_id} successfully created on pageserver {}",
pageserver.conf.id
);
}
for shard_number in 0..shard_count {
let ps_conf = env.pageservers.get(shard_number as usize).unwrap();
let pageserver = PageServerNode::from_env(env, ps_conf);
pageserver.timeline_create(
tenant_id,
Some(new_timeline_id),
None,
None,
Some(pg_version),
)?;
}
env.register_branch_mapping(
DEFAULT_BRANCH_NAME.to_string(),
@@ -406,9 +463,7 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> an
new_timeline_id,
)?;
println!(
"Created an initial timeline '{new_timeline_id}' at Lsn {last_record_lsn} for tenant: {tenant_id}",
);
println!("Created an initial timeline '{new_timeline_id}' for tenant: {tenant_id}",);
if create_match.get_flag("set-default") {
println!("Setting tenant {tenant_id} as a default one");
@@ -428,11 +483,22 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> an
.map(|vals| vals.flat_map(|c| c.split_once(':')).collect())
.unwrap_or_default();
// TODO: make command aware of multiple pageservers
let pageserver = get_default_pageserver(env);
pageserver
.tenant_config(tenant_id, tenant_conf)
.with_context(|| format!("Tenant config failed for tenant with id {tenant_id}"))?;
println!("tenant {tenant_id} successfully configured on the pageserver");
}
Some(("migrate", matches)) => {
let tenant_id = get_tenant_id(matches, env)?;
let new_pageserver = get_pageserver(env, matches)?;
let new_pageserver_id = new_pageserver.conf.id;
migrate_tenant(env, tenant_id, new_pageserver)?;
println!("tenant {tenant_id} migrated to {}", new_pageserver_id);
}
Some((sub_name, _)) => bail!("Unexpected tenant subcommand '{}'", sub_name),
None => bail!("no tenant subcommand provided"),
}
@@ -867,20 +933,20 @@ fn handle_mappings(sub_match: &ArgMatches, env: &mut local_env::LocalEnv) -> Res
}
}
fn get_pageserver(env: &local_env::LocalEnv, args: &ArgMatches) -> Result<PageServerNode> {
let node_id = if let Some(id_str) = args.get_one::<String>("pageserver-id") {
NodeId(id_str.parse().context("while parsing pageserver id")?)
} else {
DEFAULT_PAGESERVER_ID
};
Ok(PageServerNode::from_env(
env,
env.get_pageserver_conf(node_id)?,
))
}
fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
fn get_pageserver(env: &local_env::LocalEnv, args: &ArgMatches) -> Result<PageServerNode> {
let node_id = if let Some(id_str) = args.get_one::<String>("pageserver-id") {
NodeId(id_str.parse().context("while parsing pageserver id")?)
} else {
DEFAULT_PAGESERVER_ID
};
Ok(PageServerNode::from_env(
env,
env.get_pageserver_conf(node_id)?,
))
}
match sub_match.subcommand() {
Some(("start", subcommand_args)) => {
if let Err(e) = get_pageserver(env, subcommand_args)?
@@ -917,6 +983,20 @@ fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Resul
}
}
Some(("migrate", subcommand_args)) => {
let pageserver = get_pageserver(env, subcommand_args)?;
//TODO what shutdown strategy should we use here?
if let Err(e) = pageserver.stop(false) {
eprintln!("pageserver stop failed: {}", e);
exit(1);
}
if let Err(e) = pageserver.start(&pageserver_config_overrides(subcommand_args)) {
eprintln!("pageserver start failed: {e}");
exit(1);
}
}
Some(("status", subcommand_args)) => {
match get_pageserver(env, subcommand_args)?.check_status() {
Ok(_) => println!("Page server is up and running"),
@@ -1224,6 +1304,13 @@ fn cli() -> Command {
.help("Force initialization even if the repository is not empty")
.required(false);
let num_pageservers_arg = Arg::new("num-pageservers")
.value_parser(value_parser!(u16))
.long("num-pageservers")
.help("How many pageservers to create (default 1)")
.required(false)
.default_value("1");
Command::new("Neon CLI")
.arg_required_else_help(true)
.version(GIT_VERSION)
@@ -1231,6 +1318,7 @@ fn cli() -> Command {
Command::new("init")
.about("Initialize a new Neon repository, preparing configs for services to start with")
.arg(pageserver_config_args.clone())
.arg(num_pageservers_arg.clone())
.arg(
Arg::new("config")
.long("config")
@@ -1295,12 +1383,17 @@ fn cli() -> Command {
.arg(pg_version_arg.clone())
.arg(Arg::new("set-default").long("set-default").action(ArgAction::SetTrue).required(false)
.help("Use this tenant in future CLI commands where tenant_id is needed, but not specified"))
.arg(Arg::new("shard-count").value_parser(value_parser!(u8)).long("shard-count").action(ArgAction::Set).help("Number of shards in the new tenant (default 1)"))
)
.subcommand(Command::new("set-default").arg(tenant_id_arg.clone().required(true))
.about("Set a particular tenant as default in future CLI commands where tenant_id is needed, but not specified"))
.subcommand(Command::new("config")
.arg(tenant_id_arg.clone())
.arg(Arg::new("config").short('c').num_args(1).action(ArgAction::Append).required(false)))
.subcommand(Command::new("migrate")
.about("Migrate a tenant from one pageserver to another")
.arg(tenant_id_arg.clone())
.arg(pageserver_id_arg.clone()))
)
.subcommand(
Command::new("pageserver")

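Not part of the diff, but putting the new flags and subcommands together, a local multi-pageserver workflow would look roughly like this. It is invoked through the `cargo neon` alias mentioned in the commit messages; --num-pageservers and --shard-count come from the arguments added above, the exact spellings of the tenant and pageserver id flags are assumed here, and shard-count must not exceed the number of pageservers per the assert in handle_tenant.

cargo neon init --num-pageservers 4
cargo neon tenant create --shard-count 4
cargo neon tenant migrate --tenant-id <tenant_id> --pageserver-id <node_id>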

@@ -14,3 +14,4 @@ pub mod local_env;
pub mod pageserver;
pub mod postgresql_conf;
pub mod safekeeper;
pub mod tenant_migration;


@@ -15,7 +15,9 @@ use std::{io, result};
use anyhow::{bail, Context};
use camino::Utf8PathBuf;
use pageserver_api::models::{self, TenantInfo, TimelineInfo};
use pageserver_api::models::{
self, LocationConfig, TenantInfo, TenantLocationConfigRequest, TimelineInfo,
};
use postgres_backend::AuthType;
use postgres_connection::{parse_host_port, PgConnectionConfig};
use reqwest::blocking::{Client, RequestBuilder, Response};
@@ -31,6 +33,9 @@ use utils::{
use crate::local_env::PageServerConf;
use crate::{background_process, local_env::LocalEnv};
/// Directory within .neon which will be used by default for LocalFs remote storage.
pub const PAGESERVER_REMOTE_STORAGE_DIR: &str = "local_fs_remote_storage/pageserver";
#[derive(Error, Debug)]
pub enum PageserverHttpError {
#[error("Reqwest error: {0}")]
@@ -98,8 +103,10 @@ impl PageServerNode {
}
}
// pageserver conf overrides defined by neon_local configuration.
fn neon_local_overrides(&self) -> Vec<String> {
/// Merge overrides provided by the user on the command line with our default overides derived from neon_local configuration.
///
/// These all end up on the command line of the `pageserver` binary.
fn neon_local_overrides(&self, cli_overrides: &[&str]) -> Vec<String> {
let id = format!("id={}", self.conf.id);
// FIXME: the paths should be shell-escaped to handle paths with spaces, quotas etc.
let pg_distrib_dir_param = format!(
@@ -132,12 +139,25 @@ impl PageServerNode {
));
}
if !cli_overrides
.iter()
.any(|c| c.starts_with("remote_storage"))
{
overrides.push(format!(
"remote_storage={{local_path='../{PAGESERVER_REMOTE_STORAGE_DIR}'}}"
));
}
if self.conf.http_auth_type != AuthType::Trust || self.conf.pg_auth_type != AuthType::Trust
{
// Keys are generated in the toplevel repo dir, pageservers' workdirs
// are one level below that, so refer to keys with ../
overrides.push("auth_validation_public_key_path='../auth_public_key.pem'".to_owned());
}
// Apply the user-provided overrides
overrides.extend(cli_overrides.iter().map(|&c| c.to_owned()));
overrides
}
@@ -203,9 +223,6 @@ impl PageServerNode {
}
fn start_node(&self, config_overrides: &[&str], update_config: bool) -> anyhow::Result<Child> {
let mut overrides = self.neon_local_overrides();
overrides.extend(config_overrides.iter().map(|&c| c.to_owned()));
let datadir = self.repo_path();
print!(
"Starting pageserver node {} at '{}' in {:?}",
@@ -248,8 +265,7 @@ impl PageServerNode {
) -> Vec<Cow<'a, str>> {
let mut args = vec![Cow::Borrowed("-D"), Cow::Borrowed(datadir_path_str)];
let mut overrides = self.neon_local_overrides();
overrides.extend(config_overrides.iter().map(|&c| c.to_owned()));
let overrides = self.neon_local_overrides(config_overrides);
for config_override in overrides {
args.push(Cow::Borrowed("-c"));
args.push(Cow::Owned(config_override));
@@ -322,15 +338,8 @@ impl PageServerNode {
.json()?)
}
pub fn tenant_create(
&self,
new_tenant_id: TenantId,
generation: Option<u32>,
settings: HashMap<&str, &str>,
) -> anyhow::Result<TenantId> {
let mut settings = settings.clone();
let config = models::TenantConfig {
pub fn build_config(mut settings: HashMap<&str, &str>) -> anyhow::Result<models::TenantConfig> {
Ok(models::TenantConfig {
checkpoint_distance: settings
.remove("checkpoint_distance")
.map(|x| x.parse::<u64>())
@@ -389,8 +398,16 @@ impl PageServerNode {
.map(|x| x.parse::<bool>())
.transpose()
.context("Failed to parse 'gc_feedback' as bool")?,
};
})
}
pub fn tenant_create(
&self,
new_tenant_id: TenantId,
generation: Option<u32>,
settings: HashMap<&str, &str>,
) -> anyhow::Result<TenantId> {
let config = Self::build_config(settings.clone())?;
let request = models::TenantCreateRequest {
new_tenant_id,
generation,
@@ -501,6 +518,27 @@ impl PageServerNode {
Ok(())
}
pub fn location_config(
&self,
tenant_id: TenantId,
config: LocationConfig,
) -> anyhow::Result<()> {
let req_body = TenantLocationConfigRequest { tenant_id, config };
self.http_request(
Method::PUT,
format!(
"{}/tenant/{}/location_config",
self.http_base_url, tenant_id
),
)?
.json(&req_body)
.send()?
.error_from_body()?;
Ok(())
}
pub fn timeline_list(&self, tenant_id: &TenantId) -> anyhow::Result<Vec<TimelineInfo>> {
let timeline_infos: Vec<TimelineInfo> = self
.http_request(


@@ -0,0 +1,217 @@
//!
//! Functionality for migrating tenants across pageservers: unlike most of neon_local, this code
//! isn't scoped to a particular physical service, as it needs to update compute endpoints to
//! point to the new pageserver.
//!
use crate::local_env::LocalEnv;
use crate::{
attachment_service::AttachmentService, endpoint::ComputeControlPlane,
pageserver::PageServerNode,
};
use pageserver_api::models::{
LocationConfig, LocationConfigMode, LocationConfigSecondary, TenantConfig,
};
use std::collections::HashMap;
use std::time::Duration;
use utils::{
generation::Generation,
id::{TenantId, TimelineId},
lsn::Lsn,
};
/// Given an attached pageserver, retrieve the LSN for all timelines
fn get_lsns(
tenant_id: TenantId,
pageserver: &PageServerNode,
) -> anyhow::Result<HashMap<TimelineId, Lsn>> {
let timelines = pageserver.timeline_list(&tenant_id)?;
Ok(timelines
.into_iter()
.map(|t| (t.timeline_id, t.last_record_lsn))
.collect())
}
/// Wait for the timeline LSNs on `pageserver` to catch up with or overtake
/// `baseline`.
fn await_lsn(
tenant_id: TenantId,
pageserver: &PageServerNode,
baseline: HashMap<TimelineId, Lsn>,
) -> anyhow::Result<()> {
loop {
let latest = match get_lsns(tenant_id, pageserver) {
Ok(l) => l,
Err(e) => {
println!(
"🕑 Can't get LSNs on pageserver {} yet, waiting ({e})",
pageserver.conf.id
);
std::thread::sleep(Duration::from_millis(500));
continue;
}
};
let mut any_behind: bool = false;
for (timeline_id, baseline_lsn) in &baseline {
match latest.get(timeline_id) {
Some(latest_lsn) => {
println!("🕑 LSN origin {baseline_lsn} vs destination {latest_lsn}");
if latest_lsn < baseline_lsn {
any_behind = true;
}
}
None => {
// Expected timeline isn't yet visible on migration destination.
// (IRL we would have to account for timeline deletion, but this
// is just test helper)
any_behind = true;
}
}
}
if !any_behind {
println!("✅ LSN caught up. Proceeding...");
break;
} else {
std::thread::sleep(Duration::from_millis(500));
}
}
Ok(())
}
/// This function spans multiple services, to demonstrate live migration of a tenant
/// between pageservers:
/// - Coordinate attach/secondary/detach on pageservers
/// - call into attachment_service for generations
/// - reconfigure compute endpoints to point to new attached pageserver
pub fn migrate_tenant(
env: &LocalEnv,
tenant_id: TenantId,
dest_ps: PageServerNode,
) -> anyhow::Result<()> {
// Get a new generation
let attachment_service = AttachmentService::from_env(env);
let previous = attachment_service.inspect(tenant_id)?;
let mut baseline_lsns = None;
if let Some((generation, origin_ps_id)) = &previous {
let origin_ps = PageServerNode::from_env(env, env.get_pageserver_conf(*origin_ps_id)?);
if origin_ps_id == &dest_ps.conf.id {
println!("🔁 Already attached to {origin_ps_id}, freshening...");
let gen = attachment_service.attach_hook(tenant_id, dest_ps.conf.id)?;
let dest_conf = LocationConfig {
shard_count: 0,
shard_number: 0,
shard_stripe_size: 0,
mode: LocationConfigMode::AttachedSingle,
generation: gen.map(Generation::new),
secondary_conf: None,
tenant_conf: TenantConfig::default(),
};
dest_ps.location_config(tenant_id, dest_conf)?;
println!("✅ Migration complete");
return Ok(());
}
println!("🔁 Switching origin pageserver {origin_ps_id} to stale mode");
let stale_conf = LocationConfig {
shard_count: 0,
shard_number: 0,
shard_stripe_size: 0,
mode: LocationConfigMode::AttachedStale,
generation: Some(Generation::new(*generation)),
secondary_conf: None,
tenant_conf: TenantConfig::default(),
};
origin_ps.location_config(tenant_id, stale_conf)?;
baseline_lsns = Some(get_lsns(tenant_id, &origin_ps)?);
}
let gen = attachment_service.attach_hook(tenant_id, dest_ps.conf.id)?;
let dest_conf = LocationConfig {
shard_count: 0,
shard_number: 0,
shard_stripe_size: 0,
mode: LocationConfigMode::AttachedMulti,
generation: gen.map(Generation::new),
secondary_conf: None,
tenant_conf: TenantConfig::default(),
};
println!("🔁 Attaching to pageserver {}", dest_ps.conf.id);
dest_ps.location_config(tenant_id, dest_conf)?;
if let Some(baseline) = baseline_lsns {
println!("🕑 Waiting for LSN to catch up...");
await_lsn(tenant_id, &dest_ps, baseline)?;
}
let cplane = ComputeControlPlane::load(env.clone())?;
for (endpoint_name, endpoint) in &cplane.endpoints {
if endpoint.tenant_id == tenant_id {
println!(
"🔁 Reconfiguring endpoint {} to use pageserver {}",
endpoint_name, dest_ps.conf.id
);
endpoint.reconfigure(Some(dest_ps.conf.id))?;
}
}
for other_ps_conf in &env.pageservers {
if other_ps_conf.id == dest_ps.conf.id {
continue;
}
let other_ps = PageServerNode::from_env(env, other_ps_conf);
let other_ps_tenants = other_ps.tenant_list()?;
// Check if this tenant is attached
let found = other_ps_tenants
.into_iter()
.map(|t| t.id)
.any(|i| i == tenant_id);
if !found {
continue;
}
// Downgrade to a secondary location
let secondary_conf = LocationConfig {
shard_count: 0,
shard_number: 0,
shard_stripe_size: 0,
mode: LocationConfigMode::Secondary,
generation: None,
secondary_conf: Some(LocationConfigSecondary { warm: true }),
tenant_conf: TenantConfig::default(),
};
println!(
"💤 Switching to secondary mode on pageserver {}",
other_ps.conf.id
);
other_ps.location_config(tenant_id, secondary_conf)?;
}
println!(
"🔁 Switching to AttachedSingle mode on pageserver {}",
dest_ps.conf.id
);
let dest_conf = LocationConfig {
shard_count: 0,
shard_number: 0,
shard_stripe_size: 0,
mode: LocationConfigMode::AttachedSingle,
generation: gen.map(Generation::new),
secondary_conf: None,
tenant_conf: TenantConfig::default(),
};
dest_ps.location_config(tenant_id, dest_conf)?;
println!("✅ Migration complete");
Ok(())
}
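In summary, the migrate_tenant flow above proceeds roughly as: (1) ask the attachment service via inspect where the tenant is attached and with which generation; (2) if it is already on the destination, re-attach with a fresh generation and stop; (3) otherwise switch the origin to AttachedStale and record its per-timeline LSNs as a baseline; (4) take a new generation from attach_hook and attach the destination as AttachedMulti; (5) wait until the destination's last-record LSNs catch up with the baseline; (6) reconfigure the tenant's compute endpoints to point at the destination; (7) downgrade any other pageserver still holding the tenant to Secondary; (8) finally switch the destination to AttachedSingle.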


@@ -17,5 +17,6 @@ postgres_ffi.workspace = true
enum-map.workspace = true
strum.workspace = true
strum_macros.workspace = true
url.workspace = true
workspace_hack.workspace = true
workspace_hack.workspace = true


@@ -0,0 +1,142 @@
use anyhow::{bail, Result};
use byteorder::{ByteOrder, BE};
use serde::{Deserialize, Serialize};
use std::fmt;
/// Key used in the Repository kv-store.
///
/// The Repository treats this as an opaque struct, but see the code in pgdatadir_mapping.rs
/// for what we actually store in these fields.
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, Ord, PartialOrd, Serialize, Deserialize)]
pub struct Key {
pub field1: u8,
pub field2: u32,
pub field3: u32,
pub field4: u32,
pub field5: u8,
pub field6: u32,
}
pub const KEY_SIZE: usize = 18;
impl Key {
/// 'field2' is used to store tablespaceid for relations and small enum numbers for other relish.
/// As long as Neon does not support tablespace (because of lack of access to local file system),
/// we can assume that only some predefined namespace OIDs are used which can fit in u16
pub fn to_i128(&self) -> i128 {
assert!(self.field2 < 0xFFFF || self.field2 == 0xFFFFFFFF || self.field2 == 0x22222222);
(((self.field1 & 0xf) as i128) << 120)
| (((self.field2 & 0xFFFF) as i128) << 104)
| ((self.field3 as i128) << 72)
| ((self.field4 as i128) << 40)
| ((self.field5 as i128) << 32)
| self.field6 as i128
}
pub const fn from_i128(x: i128) -> Self {
Key {
field1: ((x >> 120) & 0xf) as u8,
field2: ((x >> 104) & 0xFFFF) as u32,
field3: (x >> 72) as u32,
field4: (x >> 40) as u32,
field5: (x >> 32) as u8,
field6: x as u32,
}
}
pub fn next(&self) -> Key {
self.add(1)
}
pub fn add(&self, x: u32) -> Key {
let mut key = *self;
let r = key.field6.overflowing_add(x);
key.field6 = r.0;
if r.1 {
let r = key.field5.overflowing_add(1);
key.field5 = r.0;
if r.1 {
let r = key.field4.overflowing_add(1);
key.field4 = r.0;
if r.1 {
let r = key.field3.overflowing_add(1);
key.field3 = r.0;
if r.1 {
let r = key.field2.overflowing_add(1);
key.field2 = r.0;
if r.1 {
let r = key.field1.overflowing_add(1);
key.field1 = r.0;
assert!(!r.1);
}
}
}
}
}
key
}
pub fn from_slice(b: &[u8]) -> Self {
Key {
field1: b[0],
field2: u32::from_be_bytes(b[1..5].try_into().unwrap()),
field3: u32::from_be_bytes(b[5..9].try_into().unwrap()),
field4: u32::from_be_bytes(b[9..13].try_into().unwrap()),
field5: b[13],
field6: u32::from_be_bytes(b[14..18].try_into().unwrap()),
}
}
pub fn write_to_byte_slice(&self, buf: &mut [u8]) {
buf[0] = self.field1;
BE::write_u32(&mut buf[1..5], self.field2);
BE::write_u32(&mut buf[5..9], self.field3);
BE::write_u32(&mut buf[9..13], self.field4);
buf[13] = self.field5;
BE::write_u32(&mut buf[14..18], self.field6);
}
}
impl fmt::Display for Key {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"{:02X}{:08X}{:08X}{:08X}{:02X}{:08X}",
self.field1, self.field2, self.field3, self.field4, self.field5, self.field6
)
}
}
impl Key {
pub const MIN: Key = Key {
field1: u8::MIN,
field2: u32::MIN,
field3: u32::MIN,
field4: u32::MIN,
field5: u8::MIN,
field6: u32::MIN,
};
pub const MAX: Key = Key {
field1: u8::MAX,
field2: u32::MAX,
field3: u32::MAX,
field4: u32::MAX,
field5: u8::MAX,
field6: u32::MAX,
};
pub fn from_hex(s: &str) -> Result<Self> {
if s.len() != 36 {
bail!("parse error");
}
Ok(Key {
field1: u8::from_str_radix(&s[0..2], 16)?,
field2: u32::from_str_radix(&s[2..10], 16)?,
field3: u32::from_str_radix(&s[10..18], 16)?,
field4: u32::from_str_radix(&s[18..26], 16)?,
field5: u8::from_str_radix(&s[26..28], 16)?,
field6: u32::from_str_radix(&s[28..36], 16)?,
})
}
}


@@ -4,8 +4,10 @@ use const_format::formatcp;
/// Public API types
pub mod control_api;
pub mod key;
pub mod models;
pub mod reltag;
pub mod shard;
pub const DEFAULT_PG_LISTEN_PORT: u16 = 64000;
pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}");


@@ -259,6 +259,9 @@ pub struct LocationConfigSecondary {
/// for use in external-facing APIs.
#[derive(Serialize, Deserialize, Debug)]
pub struct LocationConfig {
pub shard_number: u8,
pub shard_count: u8,
pub shard_stripe_size: u32,
pub mode: LocationConfigMode,
/// If attaching, in what generation?
#[serde(default)]


@@ -0,0 +1,210 @@
use crate::key::Key;
use serde::{Deserialize, Serialize};
use utils::id::NodeId;
#[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Debug)]
pub struct ShardNumber(pub u8);
#[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Debug)]
pub struct ShardCount(pub u8);
impl ShardNumber {
fn within_count(&self, rhs: ShardCount) -> bool {
self.0 < rhs.0
}
}
/// Stripe size in number of pages
#[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
pub struct ShardStripeSize(pub u32);
/// Layout version: for future upgrades where we might change how the key->shard mapping works
#[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
pub struct ShardLayout(u8);
const LAYOUT_V1: ShardLayout = ShardLayout(1);
/// Default stripe size in pages: 256MiB divided by 8kiB page size.
const DEFAULT_STRIPE_SIZE: ShardStripeSize = ShardStripeSize(256 * 1024 / 8);
/// The ShardIdentity contains the information needed for one member of map
/// to resolve a key to a shard, and then check whether that shard is ==self.
#[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
pub struct ShardIdentity {
pub layout: ShardLayout,
pub number: ShardNumber,
pub count: ShardCount,
pub stripe_size: ShardStripeSize,
}
/// The location of a shard contains both the logical identity of the pageserver
/// holding it (control plane's perspective), and the physical page service port
/// that postgres should use (endpoint's perspective).
#[derive(Clone)]
pub struct ShardLocation {
pub id: NodeId,
pub page_service: (url::Host, u16),
}
/// The ShardMap is sufficient information to map any Key to the page service
/// which should store it.
#[derive(Clone)]
struct ShardMap {
layout: ShardLayout,
count: ShardCount,
stripe_size: ShardStripeSize,
pageservers: Vec<Option<ShardLocation>>,
}
impl ShardMap {
pub fn get_location(&self, shard_number: ShardNumber) -> &Option<ShardLocation> {
assert!(shard_number.within_count(self.count));
self.pageservers.get(shard_number.0 as usize).unwrap()
}
pub fn get_identity(&self, shard_number: ShardNumber) -> ShardIdentity {
assert!(shard_number.within_count(self.count));
ShardIdentity {
layout: self.layout,
number: shard_number,
count: self.count,
stripe_size: self.stripe_size,
}
}
/// Return Some if the key is assigned to a particular shard. Else the key
/// should be ingested by all shards (e.g. dbdir metadata).
pub fn get_shard_number(&self, key: &Key) -> Option<ShardNumber> {
if self.count < ShardCount(2) || key_is_broadcast(key) {
None
} else {
Some(key_to_shard_number(self.count, self.stripe_size, key))
}
}
pub fn default_with_shards(shard_count: ShardCount) -> Self {
ShardMap {
layout: LAYOUT_V1,
count: shard_count,
stripe_size: DEFAULT_STRIPE_SIZE,
pageservers: (0..shard_count.0 as usize).map(|_| None).collect(),
}
}
}
impl ShardIdentity {
/// An identity with number=0 count=0 is a "none" identity, which represents legacy
/// tenants. Modern single-shard tenants should not use this: they should
/// have number=0 count=1.
pub fn none() -> Self {
Self {
number: ShardNumber(0),
count: ShardCount(0),
layout: LAYOUT_V1,
stripe_size: DEFAULT_STRIPE_SIZE,
}
}
pub fn new(number: ShardNumber, count: ShardCount, stripe_size: ShardStripeSize) -> Self {
Self {
number,
count,
layout: LAYOUT_V1,
stripe_size,
}
}
pub fn get_shard_number(&self, key: &Key) -> ShardNumber {
key_to_shard_number(self.count, self.stripe_size, key)
}
/// Return true if the key should be ingested by this shard
pub fn is_key_local(&self, key: &Key) -> bool {
if self.count < ShardCount(2) || key_is_broadcast(key) {
true
} else {
key_to_shard_number(self.count, self.stripe_size, key) == self.number
}
}
pub fn slug(&self) -> String {
if self.count > ShardCount(0) {
format!("-{:02x}{:02x}", self.number.0, self.count.0)
} else {
String::new()
}
}
}
impl Default for ShardIdentity {
/// The default identity is to be the only shard for a tenant, i.e. the legacy
/// pre-sharding case.
fn default() -> Self {
ShardIdentity {
layout: LAYOUT_V1,
number: ShardNumber(0),
count: ShardCount(1),
stripe_size: DEFAULT_STRIPE_SIZE,
}
}
}
/// Whether this key should be ingested by all shards
fn key_is_broadcast(key: &Key) -> bool {
// TODO: deduplicate wrt pgdatadir_mapping.rs
fn is_rel_block_key(key: &Key) -> bool {
key.field1 == 0x00 && key.field4 != 0
}
// TODO: can we be less conservative? Starting point is to broadcast everything
// except for rel block keys
!is_rel_block_key(key)
}
/// Provide the same result as the function in postgres `hashfn.h` with the same name
fn murmurhash32(data: u32) -> u32 {
let mut h = data;
h ^= h >> 16;
h *= 0x85ebca6b;
h ^= h >> 13;
h *= 0xc2b2ae35;
h ^= h >> 16;
h
}
/// Provide the same result as the function in postgres `hashfn.h` with the same name
fn hash_combine(mut a: u32, b: u32) -> u32 {
a ^= b + 0x9e3779b9 + (a << 6) + (a >> 2);
a
}
/// Where a Key is to be distributed across shards, select the shard. This function
/// does not account for keys that should be broadcast across shards.
///
/// The hashing in this function must exactly match what we do in postgres smgr
/// code. The resulting distribution of pages is intended to preserve locality within
/// `stripe_size` ranges of contiguous block numbers in the same relation, while otherwise
/// distributing data pseudo-randomly.
///
/// The mapping of key to shard is not stable across changes to ShardCount: this is intentional
/// and will be handled at higher levels when shards are split.
fn key_to_shard_number(count: ShardCount, stripe_size: ShardStripeSize, key: &Key) -> ShardNumber {
// Fast path for un-sharded tenants or broadcast keys
if count < ShardCount(2) || key_is_broadcast(key) {
return ShardNumber(0);
}
// spcNode
let mut hash = murmurhash32(key.field2);
// dbNode
hash = hash_combine(hash, murmurhash32(key.field3));
// relNode
hash = hash_combine(hash, murmurhash32(key.field4));
// blockNum/stripe size
hash = hash_combine(hash, murmurhash32(key.field6 / stripe_size.0));
let shard = (hash % count.0 as u32) as u8;
ShardNumber(shard)
}
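As a standalone sanity check (not part of the diff), the sketch below reproduces the hashing above for a hypothetical 4-shard tenant using the default 32768-page stripe size; the relation OIDs and block numbers are made up, and wrapping arithmetic is used so the sketch cannot overflow-panic in debug builds. Blocks 0 and 32767 share a stripe and therefore map to the same shard, while block 32768 starts the next stripe and may land elsewhere.

// Illustrative only: mirrors murmurhash32 / hash_combine / key_to_shard_number above.
fn murmur32(mut h: u32) -> u32 {
    h ^= h >> 16;
    h = h.wrapping_mul(0x85ebca6b);
    h ^= h >> 13;
    h = h.wrapping_mul(0xc2b2ae35);
    h ^= h >> 16;
    h
}

fn combine(mut a: u32, b: u32) -> u32 {
    a ^= b
        .wrapping_add(0x9e3779b9)
        .wrapping_add(a << 6)
        .wrapping_add(a >> 2);
    a
}

fn main() {
    let shard_count: u32 = 4; // hypothetical tenant with four shards
    let stripe_size: u32 = 256 * 1024 / 8; // default stripe size: 32768 pages
    let (spc_node, db_node, rel_node) = (1663u32, 16384u32, 24576u32); // made-up relation
    for blknum in [0u32, 32767, 32768, 65536] {
        let mut hash = murmur32(spc_node);
        hash = combine(hash, murmur32(db_node));
        hash = combine(hash, murmur32(rel_node));
        // Only the stripe number participates, so each contiguous stripe stays on one shard.
        hash = combine(hash, murmur32(blknum / stripe_size));
        println!("block {blknum} -> shard {}", hash % shard_count);
    }
}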

View File

@@ -3,6 +3,7 @@ use anyhow::Context;
use chrono::{DateTime, Utc};
use consumption_metrics::EventType;
use futures::stream::StreamExt;
use pageserver_api::shard::ShardNumber;
use std::{sync::Arc, time::SystemTime};
use utils::{
id::{TenantId, TimelineId},
@@ -229,6 +230,11 @@ where
while let Some((tenant_id, tenant)) = tenants.next().await {
let mut tenant_resident_size = 0;
// Sharded tenants report all consumption metrics from shard zero
if tenant.get_shard().number == ShardNumber(0) {
continue;
}
for timeline in tenant.list_timelines() {
let timeline_id = timeline.timeline_id;


@@ -15,6 +15,7 @@ use crate::virtual_file::VirtualFile;
use anyhow::Context;
use camino::Utf8PathBuf;
use hex::FromHex;
use pageserver_api::shard::ShardIdentity;
use remote_storage::{GenericRemoteStorage, RemotePath};
use serde::Deserialize;
use serde::Serialize;
@@ -300,6 +301,7 @@ impl DeletionList {
fn push(
&mut self,
tenant: &TenantId,
shard: &ShardIdentity,
timeline: &TimelineId,
generation: Generation,
objects: &mut Vec<RemotePath>,
@@ -326,7 +328,7 @@ impl DeletionList {
let timeline_entry = tenant_entry.timelines.entry(*timeline).or_default();
let timeline_remote_path = remote_timeline_path(tenant, timeline);
let timeline_remote_path = remote_timeline_path(tenant, shard, timeline);
self.size += objects.len();
timeline_entry.extend(objects.drain(..).map(|p| {
@@ -341,7 +343,9 @@ impl DeletionList {
let mut result = Vec::new();
for (tenant, tenant_deletions) in self.tenants.into_iter() {
for (timeline, timeline_layers) in tenant_deletions.timelines.into_iter() {
let timeline_remote_path = remote_timeline_path(&tenant, &timeline);
// FIXME: need to update DeletionList definition to store the ShardIdentity for each Tenant
let timeline_remote_path =
remote_timeline_path(&tenant, &ShardIdentity::none(), &timeline);
result.extend(
timeline_layers
.into_iter()
@@ -507,6 +511,7 @@ impl DeletionQueueClient {
pub(crate) async fn push_layers(
&self,
tenant_id: TenantId,
shard: &ShardIdentity,
timeline_id: TimelineId,
current_generation: Generation,
layers: Vec<(LayerFileName, Generation)>,
@@ -517,6 +522,7 @@ impl DeletionQueueClient {
for (layer, generation) in layers {
layer_paths.push(remote_layer_path(
&tenant_id,
shard,
&timeline_id,
&layer,
generation,
@@ -829,7 +835,8 @@ mod test {
gen: Generation,
) -> anyhow::Result<String> {
let tenant_id = self.harness.tenant_id;
let relative_remote_path = remote_timeline_path(&tenant_id, &TIMELINE_ID);
let relative_remote_path =
remote_timeline_path(&tenant_id, &ShardIdentity::none(), &TIMELINE_ID);
let remote_timeline_path = self.remote_fs_dir.join(relative_remote_path.get_path());
std::fs::create_dir_all(&remote_timeline_path)?;
let remote_layer_file_name = format!("{}{}", file_name, gen.get_suffix());
@@ -981,7 +988,8 @@ mod test {
let tenant_id = ctx.harness.tenant_id;
let content: Vec<u8> = "victim1 contents".into();
let relative_remote_path = remote_timeline_path(&tenant_id, &TIMELINE_ID);
let relative_remote_path =
remote_timeline_path(&tenant_id, &ShardIdentity::none(), &TIMELINE_ID);
let remote_timeline_path = ctx.remote_fs_dir.join(relative_remote_path.get_path());
let deletion_prefix = ctx.harness.conf.deletion_prefix();
@@ -1010,6 +1018,7 @@ mod test {
client
.push_layers(
tenant_id,
&ShardIdentity::none(),
TIMELINE_ID,
now_generation,
[(layer_file_name_1.clone(), layer_generation)].to_vec(),
@@ -1055,7 +1064,8 @@ mod test {
ctx.set_latest_generation(latest_generation);
let tenant_id = ctx.harness.tenant_id;
let relative_remote_path = remote_timeline_path(&tenant_id, &TIMELINE_ID);
let relative_remote_path =
remote_timeline_path(&tenant_id, &ShardIdentity::none(), &TIMELINE_ID);
let remote_timeline_path = ctx.remote_fs_dir.join(relative_remote_path.get_path());
// Initial state: a remote layer exists
@@ -1066,6 +1076,7 @@ mod test {
client
.push_layers(
tenant_id,
&ShardIdentity::none(),
TIMELINE_ID,
stale_generation,
[(EXAMPLE_LAYER_NAME.clone(), layer_generation)].to_vec(),
@@ -1081,6 +1092,7 @@ mod test {
client
.push_layers(
tenant_id,
&ShardIdentity::none(),
TIMELINE_ID,
latest_generation,
[(EXAMPLE_LAYER_NAME.clone(), layer_generation)].to_vec(),
@@ -1104,7 +1116,8 @@ mod test {
let tenant_id = ctx.harness.tenant_id;
let relative_remote_path = remote_timeline_path(&tenant_id, &TIMELINE_ID);
let relative_remote_path =
remote_timeline_path(&tenant_id, &ShardIdentity::none(), &TIMELINE_ID);
let remote_timeline_path = ctx.remote_fs_dir.join(relative_remote_path.get_path());
let deletion_prefix = ctx.harness.conf.deletion_prefix();
@@ -1119,6 +1132,7 @@ mod test {
client
.push_layers(
tenant_id,
&ShardIdentity::none(),
TIMELINE_ID,
now_generation.previous(),
[(EXAMPLE_LAYER_NAME.clone(), layer_generation)].to_vec(),
@@ -1133,6 +1147,7 @@ mod test {
client
.push_layers(
tenant_id,
&ShardIdentity::none(),
TIMELINE_ID,
now_generation,
[(EXAMPLE_LAYER_NAME_ALT.clone(), layer_generation)].to_vec(),
@@ -1228,6 +1243,7 @@ pub(crate) mod mock {
for (layer, generation) in op.layers {
objects.push(remote_layer_path(
&op.tenant_id,
&ShardIdentity::none(),
&op.timeline_id,
&layer,
generation,


@@ -19,6 +19,7 @@ use std::collections::HashMap;
use std::fs::create_dir_all;
use std::time::Duration;
use pageserver_api::shard::ShardIdentity;
use regex::Regex;
use remote_storage::RemotePath;
use tokio_util::sync::CancellationToken;
@@ -390,6 +391,8 @@ impl ListWriter {
for (layer, generation) in op.layers {
layer_paths.push(remote_layer_path(
&op.tenant_id,
// TODO: store shard in deletion list
&ShardIdentity::none(),
&op.timeline_id,
&layer,
generation,
@@ -399,6 +402,8 @@ impl ListWriter {
if !self.pending.push(
&op.tenant_id,
// TODO: store shard in deletion list
&ShardIdentity::none(),
&op.timeline_id,
op.generation,
&mut layer_paths,
@@ -406,6 +411,8 @@ impl ListWriter {
self.flush().await;
let retry_succeeded = self.pending.push(
&op.tenant_id,
// TODO: store shard in deletion list
&ShardIdentity::none(),
&op.timeline_id,
op.generation,
&mut layer_paths,


@@ -1288,9 +1288,11 @@ impl<'a> DatadirModification<'a> {
self.pending_nblocks = 0;
for (key, value) in self.pending_updates.drain() {
tracing::debug!("commit: put {} @ {}", key, lsn);
writer.put(key, lsn, &value, ctx).await?;
}
for key_range in self.pending_deletions.drain(..) {
tracing::debug!("commit: delete {:?} @ {}", key_range, lsn);
writer.delete(key_range, lsn).await?;
}
@@ -1303,6 +1305,10 @@ impl<'a> DatadirModification<'a> {
Ok(())
}
pub fn is_no_op(&self) -> bool {
self.pending_updates.is_empty() && self.pending_deletions.is_empty()
}
// Internal helper functions to batch the modifications
async fn get(&self, key: Key, ctx: &RequestContext) -> Result<Bytes, PageReconstructError> {
@@ -1514,7 +1520,7 @@ fn rel_dir_to_key(spcnode: Oid, dbnode: Oid) -> Key {
}
}
fn rel_block_to_key(rel: RelTag, blknum: BlockNumber) -> Key {
pub fn rel_block_to_key(rel: RelTag, blknum: BlockNumber) -> Key {
Key {
field1: 0x00,
field2: rel.spcnode,


@@ -1,106 +1,11 @@
use crate::walrecord::NeonWalRecord;
use anyhow::{bail, Result};
use byteorder::{ByteOrder, BE};
use anyhow::Result;
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::ops::{AddAssign, Range};
use std::time::Duration;
/// Key used in the Repository kv-store.
///
/// The Repository treats this as an opaque struct, but see the code in pgdatadir_mapping.rs
/// for what we actually store in these fields.
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, Ord, PartialOrd, Serialize, Deserialize)]
pub struct Key {
pub field1: u8,
pub field2: u32,
pub field3: u32,
pub field4: u32,
pub field5: u8,
pub field6: u32,
}
pub const KEY_SIZE: usize = 18;
impl Key {
/// 'field2' is used to store tablespaceid for relations and small enum numbers for other relish.
/// As long as Neon does not support tablespace (because of lack of access to local file system),
/// we can assume that only some predefined namespace OIDs are used which can fit in u16
pub fn to_i128(&self) -> i128 {
assert!(self.field2 < 0xFFFF || self.field2 == 0xFFFFFFFF || self.field2 == 0x22222222);
(((self.field1 & 0xf) as i128) << 120)
| (((self.field2 & 0xFFFF) as i128) << 104)
| ((self.field3 as i128) << 72)
| ((self.field4 as i128) << 40)
| ((self.field5 as i128) << 32)
| self.field6 as i128
}
pub const fn from_i128(x: i128) -> Self {
Key {
field1: ((x >> 120) & 0xf) as u8,
field2: ((x >> 104) & 0xFFFF) as u32,
field3: (x >> 72) as u32,
field4: (x >> 40) as u32,
field5: (x >> 32) as u8,
field6: x as u32,
}
}
pub fn next(&self) -> Key {
self.add(1)
}
pub fn add(&self, x: u32) -> Key {
let mut key = *self;
let r = key.field6.overflowing_add(x);
key.field6 = r.0;
if r.1 {
let r = key.field5.overflowing_add(1);
key.field5 = r.0;
if r.1 {
let r = key.field4.overflowing_add(1);
key.field4 = r.0;
if r.1 {
let r = key.field3.overflowing_add(1);
key.field3 = r.0;
if r.1 {
let r = key.field2.overflowing_add(1);
key.field2 = r.0;
if r.1 {
let r = key.field1.overflowing_add(1);
key.field1 = r.0;
assert!(!r.1);
}
}
}
}
}
key
}
pub fn from_slice(b: &[u8]) -> Self {
Key {
field1: b[0],
field2: u32::from_be_bytes(b[1..5].try_into().unwrap()),
field3: u32::from_be_bytes(b[5..9].try_into().unwrap()),
field4: u32::from_be_bytes(b[9..13].try_into().unwrap()),
field5: b[13],
field6: u32::from_be_bytes(b[14..18].try_into().unwrap()),
}
}
pub fn write_to_byte_slice(&self, buf: &mut [u8]) {
buf[0] = self.field1;
BE::write_u32(&mut buf[1..5], self.field2);
BE::write_u32(&mut buf[5..9], self.field3);
BE::write_u32(&mut buf[9..13], self.field4);
buf[13] = self.field5;
BE::write_u32(&mut buf[14..18], self.field6);
}
}
pub use pageserver_api::key::{Key, KEY_SIZE};
pub fn key_range_size(key_range: &Range<Key>) -> u32 {
let start = key_range.start;
@@ -129,49 +34,6 @@ pub fn singleton_range(key: Key) -> Range<Key> {
key..key.next()
}
impl fmt::Display for Key {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"{:02X}{:08X}{:08X}{:08X}{:02X}{:08X}",
self.field1, self.field2, self.field3, self.field4, self.field5, self.field6
)
}
}
impl Key {
pub const MIN: Key = Key {
field1: u8::MIN,
field2: u32::MIN,
field3: u32::MIN,
field4: u32::MIN,
field5: u8::MIN,
field6: u32::MIN,
};
pub const MAX: Key = Key {
field1: u8::MAX,
field2: u32::MAX,
field3: u32::MAX,
field4: u32::MAX,
field5: u8::MAX,
field6: u32::MAX,
};
pub fn from_hex(s: &str) -> Result<Self> {
if s.len() != 36 {
bail!("parse error");
}
Ok(Key {
field1: u8::from_str_radix(&s[0..2], 16)?,
field2: u32::from_str_radix(&s[2..10], 16)?,
field3: u32::from_str_radix(&s[10..18], 16)?,
field4: u32::from_str_radix(&s[18..26], 16)?,
field5: u8::from_str_radix(&s[26..28], 16)?,
field6: u32::from_str_radix(&s[28..36], 16)?,
})
}
}
/// A 'value' stored for a one Key.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Value {


@@ -15,6 +15,7 @@ use anyhow::{bail, Context};
use camino::{Utf8Path, Utf8PathBuf};
use futures::FutureExt;
use pageserver_api::models::TimelineState;
use pageserver_api::shard::ShardIdentity;
use remote_storage::DownloadError;
use remote_storage::GenericRemoteStorage;
use storage_broker::BrokerClientChannel;
@@ -169,6 +170,7 @@ pub struct TenantSharedResources {
/// for an attached tenant is a subset of the [`LocationConf`], represented
/// in this struct.
pub(super) struct AttachedTenantConf {
shard: ShardIdentity,
tenant_conf: TenantConfOpt,
location: AttachedLocationConfig,
}
@@ -177,6 +179,7 @@ impl AttachedTenantConf {
fn try_from(location_conf: LocationConf) -> anyhow::Result<Self> {
match &location_conf.mode {
LocationMode::Attached(attach_conf) => Ok(Self {
shard: location_conf.shard,
tenant_conf: location_conf.tenant_conf,
location: attach_conf.clone(),
}),
@@ -685,9 +688,11 @@ impl Tenant {
// Get list of remote timelines
// download index files for every tenant timeline
info!("listing remote timelines");
let shard = self.tenant_conf.read().unwrap().shard.clone();
let (remote_timeline_ids, other_keys) = remote_timeline_client::list_remote_timelines(
remote_storage,
self.tenant_id,
&shard,
cancel.clone(),
)
.await?;
@@ -1148,6 +1153,7 @@ impl Tenant {
self.deletion_queue_client.clone(),
self.conf,
self.tenant_id,
self.tenant_conf.read().unwrap().shard.clone(),
timeline_id,
self.generation,
);
@@ -2122,6 +2128,11 @@ impl Tenant {
pub fn get_tenant_id(&self) -> TenantId {
self.tenant_id
}
pub(crate) fn get_shard(&self) -> ShardIdentity {
self.tenant_conf.read().unwrap().shard.clone()
}
pub fn tenant_specific_overrides(&self) -> TenantConfOpt {
self.tenant_conf.read().unwrap().tenant_conf
}
@@ -2988,6 +2999,7 @@ impl Tenant {
self.deletion_queue_client.clone(),
self.conf,
self.tenant_id,
self.tenant_conf.read().unwrap().shard.clone(),
timeline_id,
self.generation,
);


@@ -10,6 +10,7 @@
//!
use anyhow::Context;
use pageserver_api::models;
use pageserver_api::shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize};
use serde::{Deserialize, Serialize};
use std::num::NonZeroU64;
use std::time::Duration;
@@ -85,6 +86,11 @@ pub(crate) enum LocationMode {
/// but have distinct LocationConf.
#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct LocationConf {
/// Detailed identity of this TenantShard. The shard number and count usually
/// appear in the keys of maps containing tenants, but it is convenient to also
/// store it here.
pub(crate) shard: ShardIdentity,
/// The location-specific part of the configuration, describes the operating
/// mode of this pageserver for this tenant.
pub(crate) mode: LocationMode,
@@ -156,6 +162,7 @@ impl LocationConf {
/// possible state. This function should eventually be removed.
pub(crate) fn attached_single(tenant_conf: TenantConfOpt, generation: Generation) -> Self {
Self {
shard: ShardIdentity::none(),
mode: LocationMode::Attached(AttachedLocationConfig {
generation,
attach_mode: AttachmentMode::Single,
@@ -226,7 +233,21 @@ impl LocationConf {
}
};
Ok(Self { mode, tenant_conf })
let shard = if conf.shard_count == 0 {
ShardIdentity::none()
} else {
ShardIdentity::new(
ShardNumber(conf.shard_number),
ShardCount(conf.shard_count),
ShardStripeSize(conf.shard_stripe_size),
)
};
Ok(Self {
shard,
mode,
tenant_conf,
})
}
}
@@ -236,6 +257,7 @@ impl Default for LocationConf {
// => tech debt since https://github.com/neondatabase/neon/issues/1555
fn default() -> Self {
Self {
shard: ShardIdentity::none(),
mode: LocationMode::Attached(AttachedLocationConfig {
generation: Generation::none(),
attach_mode: AttachmentMode::Single,


@@ -771,7 +771,10 @@ impl TenantManager {
new_location_config: LocationConf,
ctx: &RequestContext,
) -> Result<(), anyhow::Error> {
info!("configuring tenant location {tenant_id} to state {new_location_config:?}");
info!(
"configuring tenant location {tenant_id} {} to state {new_location_config:?}",
new_location_config.shard.slug()
);
// Special case fast-path for updates to Tenant: if our upsert is only updating configuration,
// then we do not need to set the slot to InProgress, we can just call into the


@@ -188,6 +188,7 @@ use anyhow::Context;
use camino::Utf8Path;
use chrono::{NaiveDateTime, Utc};
use pageserver_api::shard::ShardIdentity;
use scopeguard::ScopeGuard;
use tokio_util::sync::CancellationToken;
use utils::backoff::{
@@ -298,6 +299,7 @@ pub struct RemoteTimelineClient {
runtime: tokio::runtime::Handle,
tenant_id: TenantId,
shard: ShardIdentity,
timeline_id: TimelineId,
generation: Generation,
@@ -322,9 +324,12 @@ impl RemoteTimelineClient {
deletion_queue_client: DeletionQueueClient,
conf: &'static PageServerConf,
tenant_id: TenantId,
shard: ShardIdentity,
timeline_id: TimelineId,
generation: Generation,
) -> RemoteTimelineClient {
tracing::info!("RemoteTimelineClient::new shard={}", shard.slug());
RemoteTimelineClient {
conf,
runtime: if cfg!(test) {
@@ -334,6 +339,7 @@ impl RemoteTimelineClient {
BACKGROUND_RUNTIME.handle().clone()
},
tenant_id,
shard,
timeline_id,
generation,
storage_impl: remote_storage,
@@ -461,6 +467,7 @@ impl RemoteTimelineClient {
let index_part = download::download_index_part(
&self.storage_impl,
&self.tenant_id,
&self.shard,
&self.timeline_id,
self.generation,
cancel,
@@ -503,6 +510,7 @@ impl RemoteTimelineClient {
self.conf,
&self.storage_impl,
self.tenant_id,
&self.shard,
self.timeline_id,
layer_file_name,
layer_metadata,
@@ -893,6 +901,7 @@ impl RemoteTimelineClient {
upload::upload_index_part(
&self.storage_impl,
&self.tenant_id,
&self.shard,
&self.timeline_id,
self.generation,
&index_part_with_deleted_at,
@@ -951,6 +960,7 @@ impl RemoteTimelineClient {
.map(|(file_name, meta)| {
remote_layer_path(
&self.tenant_id,
&self.shard,
&self.timeline_id,
&file_name,
meta.generation,
@@ -964,7 +974,8 @@ impl RemoteTimelineClient {
// Do not delete index part yet, it is needed for possible retry. If we remove it first
// and retry will arrive to different pageserver there wont be any traces of it on remote storage
let timeline_storage_path = remote_timeline_path(&self.tenant_id, &self.timeline_id);
let timeline_storage_path =
remote_timeline_path(&self.tenant_id, &self.shard, &self.timeline_id);
// Execute all pending deletions, so that when we proceed to do a list_prefixes below, we aren't
// taking the burden of listing all the layers that we already know we should delete.
@@ -1000,7 +1011,12 @@ impl RemoteTimelineClient {
.unwrap_or(
// No generation-suffixed indices, assume we are dealing with
// a legacy index.
remote_index_path(&self.tenant_id, &self.timeline_id, Generation::none()),
remote_index_path(
&self.tenant_id,
&self.shard,
&self.timeline_id,
Generation::none(),
),
);
let remaining_layers: Vec<RemotePath> = remaining
@@ -1178,13 +1194,20 @@ impl RemoteTimelineClient {
let upload_result: anyhow::Result<()> = match &task.op {
UploadOp::UploadLayer(ref layer, ref layer_metadata) => {
let path = layer.local_path();
upload::upload_timeline_layer(
self.conf,
&self.storage_impl,
path,
layer_metadata,
let remote_path = remote_layer_path(
&self.tenant_id,
&self.shard,
&self.timeline_id,
&layer.layer_desc().filename(),
self.generation,
);
let local_path = layer.local_path();
upload::upload_timeline_layer(
&self.storage_impl,
local_path,
remote_path,
layer_metadata,
)
.measure_remote_op(
self.tenant_id,
@@ -1208,6 +1231,7 @@ impl RemoteTimelineClient {
let res = upload::upload_index_part(
&self.storage_impl,
&self.tenant_id,
&self.shard,
&self.timeline_id,
self.generation,
index_part,
@@ -1233,6 +1257,7 @@ impl RemoteTimelineClient {
.deletion_queue_client
.push_layers(
self.tenant_id,
&self.shard,
self.timeline_id,
self.generation,
delete.layers.clone(),
@@ -1503,24 +1528,33 @@ impl RemoteTimelineClient {
}
}
pub fn remote_timelines_path(tenant_id: &TenantId) -> RemotePath {
let path = format!("tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}");
pub fn remote_timelines_path(tenant_id: &TenantId, shard: &ShardIdentity) -> RemotePath {
let path = format!(
"tenants/{tenant_id}{}/{TIMELINES_SEGMENT_NAME}",
shard.slug()
);
RemotePath::from_string(&path).expect("Failed to construct path")
}
pub fn remote_timeline_path(tenant_id: &TenantId, timeline_id: &TimelineId) -> RemotePath {
remote_timelines_path(tenant_id).join(Utf8Path::new(&timeline_id.to_string()))
pub fn remote_timeline_path(
tenant_id: &TenantId,
shard: &ShardIdentity,
timeline_id: &TimelineId,
) -> RemotePath {
remote_timelines_path(tenant_id, shard).join(Utf8Path::new(&timeline_id.to_string()))
}
pub fn remote_layer_path(
tenant_id: &TenantId,
shard: &ShardIdentity,
timeline_id: &TimelineId,
layer_file_name: &LayerFileName,
generation: Generation,
) -> RemotePath {
// Generation-aware key format
let path = format!(
"tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{0}{1}",
"tenants/{tenant_id}{0}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{1}{2}",
shard.slug(),
layer_file_name.file_name(),
generation.get_suffix()
);
@@ -1530,11 +1564,13 @@ pub fn remote_layer_path(
pub fn remote_index_path(
tenant_id: &TenantId,
shard: &ShardIdentity,
timeline_id: &TimelineId,
generation: Generation,
) -> RemotePath {
RemotePath::from_string(&format!(
"tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{0}{1}",
"tenants/{tenant_id}{0}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{1}{2}",
shard.slug(),
IndexPart::FILE_NAME,
generation.get_suffix()
))
@@ -1558,29 +1594,6 @@ pub fn parse_remote_index_path(path: RemotePath) -> Option<Generation> {
}
}
/// Files on the remote storage are stored with paths, relative to the workdir.
/// That path includes in itself both tenant and timeline ids, allowing to have a unique remote storage path.
///
/// Errors if the path provided does not start from pageserver's workdir.
pub fn remote_path(
conf: &PageServerConf,
local_path: &Utf8Path,
generation: Generation,
) -> anyhow::Result<RemotePath> {
let stripped = local_path
.strip_prefix(&conf.workdir)
.context("Failed to strip workdir prefix")?;
let suffixed = format!("{0}{1}", stripped, generation.get_suffix());
RemotePath::new(Utf8Path::new(&suffixed)).with_context(|| {
format!(
"to resolve remote part of path {:?} for base {:?}",
local_path, conf.workdir
)
})
}
#[cfg(test)]
mod tests {
use super::*;
@@ -1677,6 +1690,7 @@ mod tests {
conf: self.harness.conf,
runtime: tokio::runtime::Handle::current(),
tenant_id: self.harness.tenant_id,
shard: ShardIdentity::none(),
timeline_id: TIMELINE_ID,
generation,
storage_impl: self.harness.remote_storage.clone(),
@@ -2010,7 +2024,13 @@ mod tests {
std::fs::create_dir_all(remote_timeline_dir).expect("creating test dir should work");
let index_path = test_state.harness.remote_fs_dir.join(
remote_index_path(&test_state.harness.tenant_id, &TIMELINE_ID, generation).get_path(),
remote_index_path(
&test_state.harness.tenant_id,
&ShardIdentity::none(),
&TIMELINE_ID,
generation,
)
.get_path(),
);
eprintln!("Writing {index_path}");
std::fs::write(&index_path, index_part_bytes).unwrap();
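The path helpers in this file now splice the shard identity into the tenant segment of every remote key. As a rough illustration only (not the crate's actual API), the sketch below shows the resulting layout; `timelines` stands in for TIMELINES_SEGMENT_NAME, and the exact slug format (e.g. "-0104" for shard 1 of 4) and the generation suffix format are assumptions:

```rust
// Minimal sketch of the shard-aware remote key layout. Assumes an unsharded
// tenant has an empty slug and a sharded one gets a suffix like "-0104".
fn layer_key(tenant: &str, shard_slug: &str, timeline: &str, layer: &str, gen_suffix: &str) -> String {
    format!("tenants/{tenant}{shard_slug}/timelines/{timeline}/{layer}{gen_suffix}")
}

fn main() {
    // Legacy / unsharded tenant: paths are unchanged.
    assert_eq!(
        layer_key("1f2d", "", "9a0b", "layerfile", ""),
        "tenants/1f2d/timelines/9a0b/layerfile"
    );
    // Shard 1 of 4 with a generation suffix (both formats are assumptions).
    assert_eq!(
        layer_key("1f2d", "-0104", "9a0b", "layerfile", "-00000007"),
        "tenants/1f2d-0104/timelines/9a0b/layerfile-00000007"
    );
}
```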

View File

@@ -9,6 +9,7 @@ use std::time::Duration;
use anyhow::{anyhow, Context};
use camino::Utf8Path;
use pageserver_api::shard::ShardIdentity;
use tokio::fs;
use tokio::io::AsyncWriteExt;
use tokio_util::sync::CancellationToken;
@@ -40,6 +41,7 @@ pub async fn download_layer_file<'a>(
conf: &'static PageServerConf,
storage: &'a GenericRemoteStorage,
tenant_id: TenantId,
shard: &ShardIdentity,
timeline_id: TimelineId,
layer_file_name: &'a LayerFileName,
layer_metadata: &'a LayerFileMetadata,
@@ -52,6 +54,7 @@ pub async fn download_layer_file<'a>(
let remote_path = remote_layer_path(
&tenant_id,
shard,
&timeline_id,
layer_file_name,
layer_metadata.generation,
@@ -170,9 +173,10 @@ pub fn is_temp_download_file(path: &Utf8Path) -> bool {
pub async fn list_remote_timelines(
storage: &GenericRemoteStorage,
tenant_id: TenantId,
shard: &ShardIdentity,
cancel: CancellationToken,
) -> anyhow::Result<(HashSet<TimelineId>, HashSet<String>)> {
let remote_path = remote_timelines_path(&tenant_id);
let remote_path = remote_timelines_path(&tenant_id, shard);
fail::fail_point!("storage-sync-list-remote-timelines", |_| {
anyhow::bail!("storage-sync-list-remote-timelines");
@@ -212,11 +216,12 @@ pub async fn list_remote_timelines(
async fn do_download_index_part(
storage: &GenericRemoteStorage,
tenant_id: &TenantId,
shard: &ShardIdentity,
timeline_id: &TimelineId,
index_generation: Generation,
cancel: CancellationToken,
) -> Result<IndexPart, DownloadError> {
let remote_path = remote_index_path(tenant_id, timeline_id, index_generation);
let remote_path = remote_index_path(tenant_id, shard, timeline_id, index_generation);
let index_part_bytes = download_retry_forever(
|| async {
@@ -253,6 +258,7 @@ async fn do_download_index_part(
pub(super) async fn download_index_part(
storage: &GenericRemoteStorage,
tenant_id: &TenantId,
shard: &ShardIdentity,
timeline_id: &TimelineId,
my_generation: Generation,
cancel: CancellationToken,
@@ -261,8 +267,15 @@ pub(super) async fn download_index_part(
if my_generation.is_none() {
// Operating without generations: just fetch the generation-less path
return do_download_index_part(storage, tenant_id, timeline_id, my_generation, cancel)
.await;
return do_download_index_part(
storage,
tenant_id,
shard,
timeline_id,
my_generation,
cancel,
)
.await;
}
// Stale case: If we were intentionally attached in a stale generation, there may already be a remote
@@ -272,6 +285,7 @@ pub(super) async fn download_index_part(
let res = do_download_index_part(
storage,
tenant_id,
shard,
timeline_id,
my_generation,
cancel.clone(),
@@ -299,6 +313,7 @@ pub(super) async fn download_index_part(
let res = do_download_index_part(
storage,
tenant_id,
shard,
timeline_id,
my_generation.previous(),
cancel.clone(),
@@ -321,7 +336,7 @@ pub(super) async fn download_index_part(
// General case/fallback: if there is no index at my_generation or prev_generation, then list all index_part.json
// objects, and select the highest one with a generation <= my_generation.
let index_prefix = remote_index_path(tenant_id, timeline_id, Generation::none());
let index_prefix = remote_index_path(tenant_id, shard, timeline_id, Generation::none());
let indices = backoff::retry(
|| async { storage.list_files(Some(&index_prefix)).await },
|_| false,
@@ -347,14 +362,21 @@ pub(super) async fn download_index_part(
match max_previous_generation {
Some(g) => {
tracing::debug!("Found index_part in generation {g:?}");
do_download_index_part(storage, tenant_id, timeline_id, g, cancel).await
do_download_index_part(storage, tenant_id, shard, timeline_id, g, cancel).await
}
None => {
// Migration from legacy pre-generation state: we have a generation but no prior
// attached pageservers did. Try to load from a no-generation path.
tracing::info!("No index_part.json* found");
do_download_index_part(storage, tenant_id, timeline_id, Generation::none(), cancel)
.await
do_download_index_part(
storage,
tenant_id,
shard,
timeline_id,
Generation::none(),
cancel,
)
.await
}
}
}
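For context, the lookup order in download_index_part amounts to: our own generation, then the previous one, then the highest listed generation not newer than ours, then the legacy generation-less object. A small, storage-free sketch of that ordering (the function and types here are illustrative, not the pageserver's API):

```rust
// Sketch of the index_part.json selection order, over an in-memory list of
// generations instead of remote storage listings.
fn pick_index_generation(available: &[u32], my_generation: u32) -> Option<u32> {
    // 1. Our own generation: present after a clean re-attach in the same generation.
    if available.contains(&my_generation) {
        return Some(my_generation);
    }
    // 2. The immediately preceding generation: the common case right after attach.
    if let Some(prev) = my_generation.checked_sub(1) {
        if available.contains(&prev) {
            return Some(prev);
        }
    }
    // 3. Fallback: highest generation not newer than ours, found by listing.
    //    If this also yields nothing, the caller retries the legacy
    //    generation-less path (not modelled here).
    available.iter().copied().filter(|g| *g <= my_generation).max()
}

fn main() {
    assert_eq!(pick_index_generation(&[3, 5], 7), Some(5));
    assert_eq!(pick_index_generation(&[6], 7), Some(6));
    assert_eq!(pick_index_generation(&[9], 7), None);
}
```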

View File

@@ -3,15 +3,13 @@
use anyhow::{bail, Context};
use camino::Utf8Path;
use fail::fail_point;
use pageserver_api::shard::ShardIdentity;
use std::io::ErrorKind;
use tokio::fs;
use super::Generation;
use crate::{
config::PageServerConf,
tenant::remote_timeline_client::{index::IndexPart, remote_index_path, remote_path},
};
use remote_storage::GenericRemoteStorage;
use crate::tenant::remote_timeline_client::{index::IndexPart, remote_index_path};
use remote_storage::{GenericRemoteStorage, RemotePath};
use utils::id::{TenantId, TimelineId};
use super::index::LayerFileMetadata;
@@ -22,6 +20,7 @@ use tracing::info;
pub(super) async fn upload_index_part<'a>(
storage: &'a GenericRemoteStorage,
tenant_id: &TenantId,
shard: &ShardIdentity,
timeline_id: &TimelineId,
generation: Generation,
index_part: &'a IndexPart,
@@ -38,7 +37,7 @@ pub(super) async fn upload_index_part<'a>(
let index_part_size = index_part_bytes.len();
let index_part_bytes = tokio::io::BufReader::new(std::io::Cursor::new(index_part_bytes));
let remote_path = remote_index_path(tenant_id, timeline_id, generation);
let remote_path = remote_index_path(tenant_id, shard, timeline_id, generation);
storage
.upload_storage_object(Box::new(index_part_bytes), index_part_size, &remote_path)
.await
@@ -50,11 +49,10 @@ pub(super) async fn upload_index_part<'a>(
///
/// On an error, bumps the retries count and reschedules the entire task.
pub(super) async fn upload_timeline_layer<'a>(
conf: &'static PageServerConf,
storage: &'a GenericRemoteStorage,
source_path: &'a Utf8Path,
source_path: &Utf8Path,
remote_path: RemotePath,
known_metadata: &'a LayerFileMetadata,
generation: Generation,
) -> anyhow::Result<()> {
fail_point!("before-upload-layer", |_| {
bail!("failpoint before-upload-layer")
@@ -62,7 +60,6 @@ pub(super) async fn upload_timeline_layer<'a>(
pausable_failpoint!("before-upload-layer-pausable");
let storage_path = remote_path(conf, source_path, generation)?;
let source_file_res = fs::File::open(&source_path).await;
let source_file = match source_file_res {
Ok(source_file) => source_file,
@@ -97,7 +94,7 @@ pub(super) async fn upload_timeline_layer<'a>(
.with_context(|| format!("convert {source_path:?} size {fs_size} usize"))?;
storage
.upload(source_file, fs_size, &storage_path, None)
.upload(source_file, fs_size, &remote_path, None)
.await
.with_context(|| format!("upload layer from local path '{source_path}'"))?;

View File

@@ -12,8 +12,12 @@ use bytes::Bytes;
use camino::{Utf8Path, Utf8PathBuf};
use fail::fail_point;
use itertools::Itertools;
use pageserver_api::models::{
DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest, LayerMapInfo, TimelineState,
use pageserver_api::{
models::{
DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest, LayerMapInfo,
TimelineState,
},
shard::ShardIdentity,
};
use serde_with::serde_as;
use storage_broker::BrokerClientChannel;
@@ -1310,6 +1314,11 @@ impl Timeline {
.unwrap_or(self.conf.default_tenant_conf.gc_feedback)
}
pub(crate) fn get_shard(&self) -> ShardIdentity {
let tenant_conf = &self.tenant_conf.read().unwrap();
tenant_conf.shard.clone()
}
pub(super) fn tenant_conf_updated(&self) {
// NB: Most tenant conf options are read by background loops, so,
// changes will automatically be picked up.

View File

@@ -36,7 +36,7 @@ use crate::{
};
use postgres_backend::is_expected_io_error;
use postgres_connection::PgConnectionConfig;
use postgres_ffi::waldecoder::WalStreamDecoder;
use utils::pageserver_feedback::PageserverFeedback;
use utils::{id::NodeId, lsn::Lsn};
@@ -244,13 +244,22 @@ pub(super) async fn handle_walreceiver_connection(
info!("last_record_lsn {last_rec_lsn} starting replication from {startpoint}, safekeeper is at {end_of_wal}...");
let query = format!("START_REPLICATION PHYSICAL {startpoint}");
let shard = timeline.get_shard();
let shard_str = serde_json::to_string(&shard).map_err(|e| {
WalReceiverError::Other(anyhow!(
"Failed to serialize shard info for walreceiver: {e}"
))
})?;
info!("starting replication for shard {shard_str}");
let query = format!(
"START_REPLICATION PHYSICAL {startpoint} (shard={})",
shard_str
);
let copy_stream = replication_client.copy_both_simple(&query).await?;
let mut physical_stream = pin!(ReplicationStream::new(copy_stream));
let mut waldecoder = WalStreamDecoder::new(startpoint, timeline.pg_version);
let mut walingest = WalIngest::new(timeline.as_ref(), startpoint, &ctx).await?;
while let Some(replication_message) = {
@@ -273,9 +282,7 @@ pub(super) async fn handle_walreceiver_connection(
ReplicationMessage::XLogData(xlog_data) => {
connection_status.latest_connection_update = now;
connection_status.commit_lsn = Some(Lsn::from(xlog_data.wal_end()));
connection_status.streaming_lsn = Some(Lsn::from(
xlog_data.wal_start() + xlog_data.data().len() as u64,
));
connection_status.streaming_lsn = Some(Lsn::from(xlog_data.wal_start()));
if !xlog_data.data().is_empty() {
connection_status.latest_wal_update = now;
}
@@ -293,44 +300,29 @@ pub(super) async fn handle_walreceiver_connection(
let status_update = match replication_message {
ReplicationMessage::XLogData(xlog_data) => {
// Pass the WAL data to the decoder, and see if we can decode
// more records as a result.
let data = xlog_data.data();
let startlsn = Lsn::from(xlog_data.wal_start());
let endlsn = startlsn + data.len() as u64;
// Process decoded WAL record.
let next_lsn = Lsn::from(xlog_data.wal_start());
let data = xlog_data.into_data();
trace!("received XLogData between {startlsn} and {endlsn}");
trace!("received XLogData up to {next_lsn}");
waldecoder.feed_bytes(data);
let mut decoded = DecodedWALRecord::default();
let mut modification = timeline.begin_modification(next_lsn);
walingest
.ingest_record(data, next_lsn, &mut modification, &mut decoded, &ctx)
.await
.with_context(|| format!("could not ingest record at {next_lsn}"))?;
{
let mut decoded = DecodedWALRecord::default();
let mut modification = timeline.begin_modification(endlsn);
while let Some((lsn, recdata)) = waldecoder.poll_decode()? {
// It is important to deal with the aligned records as lsn in getPage@LSN is
// aligned and can be several bytes bigger. Without this alignment we are
// at risk of hitting a deadlock.
if !lsn.is_aligned() {
return Err(WalReceiverError::Other(anyhow!("LSN not aligned")));
}
fail_point!("walreceiver-after-ingest");
walingest
.ingest_record(recdata, lsn, &mut modification, &mut decoded, &ctx)
.await
.with_context(|| format!("could not ingest record at {lsn}"))?;
last_rec_lsn = next_lsn;
fail_point!("walreceiver-after-ingest");
last_rec_lsn = lsn;
}
}
if !caught_up && endlsn >= end_of_wal {
info!("caught up at LSN {endlsn}");
if !caught_up && next_lsn >= end_of_wal {
info!("caught up at LSN {next_lsn}");
caught_up = true;
}
Some(endlsn)
Some(last_rec_lsn)
}
ReplicationMessage::PrimaryKeepAlive(keepalive) => {
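The protocol change rides on the existing START_REPLICATION command: the walreceiver serializes the timeline's ShardIdentity to JSON and appends it as an option, which the safekeeper's regex (further down) strips back out. A hedged sketch of the command construction; the JSON payload is whatever serde produces for ShardIdentity and is shown here only as a placeholder:

```rust
// Sketch: building the replication command with an optional shard clause, in
// the same shape the safekeeper-side regex expects.
fn start_replication_query(startpoint: &str, shard_json: Option<&str>) -> String {
    match shard_json {
        Some(shard) => format!("START_REPLICATION PHYSICAL {startpoint} (shard={shard})"),
        None => format!("START_REPLICATION PHYSICAL {startpoint}"),
    }
}

fn main() {
    assert_eq!(
        start_replication_query("0/16B3A80", None),
        "START_REPLICATION PHYSICAL 0/16B3A80"
    );
    // Placeholder JSON; the real payload is serde_json's serialization of ShardIdentity.
    let shard = r#"{"placeholder":true}"#;
    assert_eq!(
        start_replication_query("0/16B3A80", Some(shard)),
        r#"START_REPLICATION PHYSICAL 0/16B3A80 (shard={"placeholder":true})"#
    );
}
```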

View File

@@ -21,6 +21,7 @@
//! redo Postgres process, but some records it can handle directly with
//! bespoke Rust code.
use pageserver_api::shard::ShardIdentity;
use postgres_ffi::v14::nonrelfile_utils::clogpage_precedes;
use postgres_ffi::v14::nonrelfile_utils::slru_may_delete_clogsegment;
use postgres_ffi::{fsm_logical_to_physical, page_is_new, page_set_lsn};
@@ -46,6 +47,7 @@ use postgres_ffi::BLCKSZ;
use utils::lsn::Lsn;
pub struct WalIngest<'a> {
shard: ShardIdentity,
timeline: &'a Timeline,
checkpoint: CheckPoint,
@@ -65,6 +67,7 @@ impl<'a> WalIngest<'a> {
trace!("CheckPoint.nextXid = {}", checkpoint.nextXid.value);
Ok(WalIngest {
shard: timeline.get_shard(),
timeline,
checkpoint,
checkpoint_modified: false,
@@ -90,6 +93,36 @@ impl<'a> WalIngest<'a> {
modification.lsn = lsn;
decode_wal_record(recdata, decoded, self.timeline.pg_version)?;
tracing::trace!(
"decoded rmid={} xid={} xl_info={}",
decoded.xl_rmid,
decoded.xl_xid,
decoded.xl_info
);
// Fast path: we may skip the entire record if it only references blocks on another shard.
// Otherwise we proceed, and filter blocks later.
let any_local_blocks = decoded.blocks.iter().any(|blk| {
let rel = RelTag {
spcnode: blk.rnode_spcnode,
dbnode: blk.rnode_dbnode,
relnode: blk.rnode_relnode,
forknum: blk.forknum,
};
let key = rel_block_to_key(rel, blk.blkno);
self.shard.is_key_local(&key)
});
// - We need at least one block to skip: otherwise we assume the record's
// payload is all in its other fields, which are metadata-ish things that
// we broadcast to all shards
// - ...and obviously, we can only skip a WAL record if it doesn't need to
// write to any pages in this shard.
let skip_record = decoded.blocks.len() > 0 && !any_local_blocks;
// TODO: actually skip (and update LSN at the time). Currently we just
// check later in the function that if we set skip_record==true, then we
// really would not have done any local IO.
let mut buf = decoded.record.clone();
buf.advance(decoded.main_data_offset);
@@ -358,6 +391,26 @@ impl<'a> WalIngest<'a> {
// Iterate through all the blocks that the record modifies, and
// "put" a separate copy of the record for each block.
for blk in decoded.blocks.iter() {
let rel = RelTag {
spcnode: blk.rnode_spcnode,
dbnode: blk.rnode_dbnode,
relnode: blk.rnode_relnode,
forknum: blk.forknum,
};
let key = rel_block_to_key(rel, blk.blkno);
let key_is_local = self.shard.is_key_local(&key);
tracing::info!(
"ingest: shard decision {} (checkpoint={}) for key {}",
if !key_is_local { "drop" } else { "keep" },
self.checkpoint_modified,
key
);
if !key_is_local {
continue;
}
self.ingest_decoded_block(modification, lsn, decoded, blk, ctx)
.await?;
}
@@ -370,6 +423,12 @@ impl<'a> WalIngest<'a> {
self.checkpoint_modified = false;
}
if skip_record && !modification.is_no_op() {
tracing::error!(
"WAL record @ {lsn} would have been dropped, but we actually did modifications!"
);
}
// Now that this record has been fully handled, including updating the
// checkpoint data, let the repository know that it is up-to-date to this LSN
modification.commit(ctx).await?;
@@ -1459,8 +1518,15 @@ impl<'a> WalIngest<'a> {
//info!("extending {} {} to {}", rel, old_nblocks, new_nblocks);
modification.put_rel_extend(rel, new_nblocks, ctx).await?;
let mut key = rel_block_to_key(rel, blknum);
// fill the gap with zeros
for gap_blknum in old_nblocks..blknum {
key.field6 = gap_blknum;
if self.shard.get_shard_number(&key) != self.shard.number {
continue;
}
modification.put_rel_page_image(rel, gap_blknum, ZERO_PAGE.clone())?;
}
}
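The ingest changes boil down to one decision per decoded record: which of its block references land on this shard, and whether the whole record can be skipped. A self-contained sketch of that decision, with a hypothetical `is_local` closure standing in for ShardIdentity::is_key_local:

```rust
// Sketch of the per-record shard filtering decision.
struct BlockRef {
    blkno: u32,
}

fn shard_decision(blocks: &[BlockRef], is_local: impl Fn(&BlockRef) -> bool) -> (bool, Vec<usize>) {
    // Indices of the blocks this shard actually has to apply.
    let local: Vec<usize> = blocks
        .iter()
        .enumerate()
        .filter(|(_, b)| is_local(b))
        .map(|(i, _)| i)
        .collect();
    // Skip the record only if it references blocks and none of them are ours;
    // block-less records carry metadata that every shard must ingest.
    let skip_record = !blocks.is_empty() && local.is_empty();
    (skip_record, local)
}

fn main() {
    // Pretend even-numbered blocks live on this shard.
    let is_local = |b: &BlockRef| b.blkno % 2 == 0;
    let blocks = vec![BlockRef { blkno: 1 }, BlockRef { blkno: 2 }];
    assert_eq!(shard_decision(&blocks, is_local), (false, vec![1]));
    assert_eq!(shard_decision(&[], is_local), (false, vec![]));
}
```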

View File

@@ -27,12 +27,14 @@
#include "commands/defrem.h"
#include "miscadmin.h"
#include "utils/acl.h"
#include "utils/fmgrprotos.h"
#include "fmgr.h"
#include "utils/guc.h"
#include "port.h"
#include <curl/curl.h>
#include "utils/jsonb.h"
#include "libpq/crypt.h"
#include "pagestore_client.h"
static ProcessUtility_hook_type PreviousProcessUtilityHook = NULL;
@@ -222,6 +224,104 @@ ErrorWriteCallback(char *ptr, size_t size, size_t nmemb, void *userdata)
return nmemb;
}
static size_t
ResponseWriteCallback(char *ptr, size_t size, size_t nmemb, void *userdata)
{
appendBinaryStringInfo((StringInfo)userdata, ptr, size*nmemb);
return nmemb;
}
void
RequestShardMapFromControlPlane(ShardMap* shard_map)
{
shard_map->n_shards = 0;
if (!ConsoleURL)
{
elog(LOG, "ConsoleURL not set, skipping forwarding");
return;
}
StringInfoData resp;
initStringInfo(&resp);
curl_easy_setopt(CurlHandle, CURLOPT_CUSTOMREQUEST, "GET");
curl_easy_setopt(CurlHandle, CURLOPT_URL, ConsoleURL);
curl_easy_setopt(CurlHandle, CURLOPT_ERRORBUFFER, CurlErrorBuf);
curl_easy_setopt(CurlHandle, CURLOPT_TIMEOUT, 3L /* seconds */ );
curl_easy_setopt(CurlHandle, CURLOPT_WRITEDATA, &resp);
curl_easy_setopt(CurlHandle, CURLOPT_WRITEFUNCTION, ResponseWriteCallback);
const int num_retries = 5;
int curl_status;
for (int i = 0; i < num_retries; i++)
{
if ((curl_status = curl_easy_perform(CurlHandle)) == CURLE_OK)
break;
elog(LOG, "Curl request failed on attempt %d: %s", i, CurlErrorBuf);
pg_usleep(1000 * 1000);
}
if (curl_status != CURLE_OK)
{
curl_easy_cleanup(CurlHandle);
elog(ERROR, "Failed to perform curl request: %s", CurlErrorBuf);
}
else
{
long response_code;
if (curl_easy_getinfo(CurlHandle, CURLINFO_RESPONSE_CODE, &response_code) != CURLE_UNKNOWN_OPTION)
{
if (response_code != 200)
{
bool error_exists = resp.len != 0;
if(error_exists)
{
elog(ERROR,
"[PG_LLM] Received HTTP code %ld from OpenAI: %s",
response_code,
resp.data);
}
else
{
elog(ERROR,
"[PG_LLM] Received HTTP code %ld from OpenAI",
response_code);
}
}
}
curl_easy_cleanup(CurlHandle);
JsonbContainer *jsonb = (JsonbContainer *)DatumGetPointer(DirectFunctionCall1(jsonb_in, CStringGetDatum(resp.data)));
JsonbValue v;
JsonbIterator *it;
JsonbIteratorToken r;
it = JsonbIteratorInit(jsonb);
r = JsonbIteratorNext(&it, &v, true);
if (r != WJB_BEGIN_ARRAY)
elog(ERROR, "Array of connection strings expected");
while ((r = JsonbIteratorNext(&it, &v, true)) != WJB_DONE)
{
if (r != WJB_ELEM)
continue;
if (shard_map->n_shards >= MAX_SHARDS)
elog(ERROR, "Too many shards");
if (v.type != jbvString)
elog(ERROR, "Connection string expected");
strncpy(shard_map->shard_connstr[shard_map->n_shards++],
v.val.string.val,
MAX_PS_CONNSTR_LEN);
}
shard_map->update_counter += 1;
pfree(resp.data);
}
}
static void
SendDeltasToControlPlane()
{

View File

@@ -2,5 +2,6 @@
#define CONTROL_PLANE_CONNECTOR_H
void InitControlPlaneConnector();
void RequestShardMapFromControlPlane(ShardMap* shard_map);
#endif

View File

@@ -18,11 +18,10 @@
#include "fmgr.h"
#include "access/xlog.h"
#include "access/xlogutils.h"
#include "common/hashfn.h"
#include "storage/buf_internals.h"
#include "storage/lwlock.h"
#include "storage/ipc.h"
#include "c.h"
#include "postmaster/interrupt.h"
#include "libpq-fe.h"
#include "libpq/pqformat.h"
@@ -35,22 +34,12 @@
#include "neon.h"
#include "walproposer.h"
#include "neon_utils.h"
#include "control_plane_connector.h"
#define PageStoreTrace DEBUG5
#define RECONNECT_INTERVAL_USEC 1000000
bool connected = false;
PGconn *pageserver_conn = NULL;
/*
* WaitEventSet containing:
* - WL_SOCKET_READABLE on pageserver_conn,
* - WL_LATCH_SET on MyLatch, and
* - WL_EXIT_ON_PM_DEATH.
*/
WaitEventSet *pageserver_conn_wes = NULL;
/* GUCs */
char *neon_timeline;
char *neon_tenant;
@@ -64,80 +53,165 @@ int flush_every_n_requests = 8;
int n_reconnect_attempts = 0;
int max_reconnect_attempts = 60;
#define MAX_PAGESERVER_CONNSTRING_SIZE 256
bool (*old_redo_read_buffer_filter) (XLogReaderState *record, uint8 block_id) = NULL;
static bool pageserver_flush(shardno_t shard_no);
static void pageserver_disconnect(shardno_t shard_no);
static pqsigfunc prev_signal_handler;
static shmem_startup_hook_type prev_shmem_startup_hook;
#if PG_VERSION_NUM>=150000
static shmem_request_hook_type prev_shmem_request_hook;
#endif
static ShardMap* shard_map;
static LWLockId shard_map_lock;
static size_t shard_map_update_counter;
typedef struct
{
LWLockId lock;
pg_atomic_uint64 update_counter;
char pageserver_connstring[MAX_PAGESERVER_CONNSTRING_SIZE];
} PagestoreShmemState;
/*
* connection for each shard
*/
PGconn *conn;
/*
* WaitEventSet containing:
* - WL_SOCKET_READABLE on pageserver_conn,
* - WL_LATCH_SET on MyLatch, and
* - WL_EXIT_ON_PM_DEATH.
*/
WaitEventSet *wes;
} PageServer;
#if PG_VERSION_NUM >= 150000
static shmem_request_hook_type prev_shmem_request_hook = NULL;
static void walproposer_shmem_request(void);
static PageServer page_servers[MAX_SHARDS];
static void
psm_shmem_startup(void)
{
bool found;
if (prev_shmem_startup_hook)
{
prev_shmem_startup_hook();
}
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
shard_map = (ShardMap*)ShmemInitStruct("shard_map", sizeof(ShardMap), &found);
/* The lock pointer is backend-local, so set it in every backend, not only in
   the one that first initializes the shared struct. */
shard_map_lock = &(GetNamedLWLockTranche("shard_map_lock")->lock);
if (!found)
{
shard_map->n_shards = 0;
shard_map->update_counter = 0;
}
LWLockRelease(AddinShmemInitLock);
}
static void
psm_shmem_request(void)
{
#if PG_VERSION_NUM>=150000
if (prev_shmem_request_hook)
prev_shmem_request_hook();
#endif
static shmem_startup_hook_type prev_shmem_startup_hook;
static PagestoreShmemState *pagestore_shared;
static uint64 pagestore_local_counter = 0;
static char local_pageserver_connstring[MAX_PAGESERVER_CONNSTRING_SIZE];
bool (*old_redo_read_buffer_filter) (XLogReaderState *record, uint8 block_id) = NULL;
static bool pageserver_flush(void);
static void pageserver_disconnect(void);
static bool
CheckPageserverConnstring(char **newval, void **extra, GucSource source)
{
return strlen(*newval) < MAX_PAGESERVER_CONNSTRING_SIZE;
RequestAddinShmemSpace(sizeof(ShardMap));
RequestNamedLWLockTranche("shard_map_lock", 1);
}
static void
AssignPageserverConnstring(const char *newval, void *extra)
psm_init(void)
{
if(!pagestore_shared)
return;
LWLockAcquire(pagestore_shared->lock, LW_EXCLUSIVE);
strlcpy(pagestore_shared->pageserver_connstring, newval, MAX_PAGESERVER_CONNSTRING_SIZE);
pg_atomic_fetch_add_u64(&pagestore_shared->update_counter, 1);
LWLockRelease(pagestore_shared->lock);
prev_shmem_startup_hook = shmem_startup_hook;
shmem_startup_hook = psm_shmem_startup;
#if PG_VERSION_NUM>=150000
prev_shmem_request_hook = shmem_request_hook;
shmem_request_hook = psm_shmem_request;
#else
psm_shmem_request();
#endif
}
static bool
CheckConnstringUpdated()
shardno_t
get_shard_number(BufferTag* tag)
{
if(!pagestore_shared)
return false;
return pagestore_local_counter < pg_atomic_read_u64(&pagestore_shared->update_counter);
shardno_t shard_no;
uint32 hash;
#if PG_MAJORVERSION_NUM < 16
hash = murmurhash32(tag->rnode.spcNode);
hash = hash_combine(hash, murmurhash32(tag->rnode.dbNode));
hash = hash_combine(hash, murmurhash32(tag->rnode.relNode));
hash = hash_combine(hash, murmurhash32(tag->blockNum/STRIPE_SIZE));
#else
hash = murmurhash32(tag->spcOid);
hash = hash_combine(hash, murmurhash32(tag->dbOid));
hash = hash_combine(hash, murmurhash32(tag->relNumber));
hash = hash_combine(hash, murmurhash32(tag->blockNum/STRIPE_SIZE));
#endif
LWLockAcquire(shard_map_lock, LW_SHARED);
while (shard_map->n_shards == 0 || shard_map_update_counter != shard_map->update_counter)
{
/* Close all existed connections */
for (shard_no = 0; shard_no < shard_map->n_shards; shard_no++)
{
if (page_servers[shard_no].conn)
pageserver_disconnect(shard_no);
}
/* Request new shard map from control plane under exclusive lock */
LWLockRelease(shard_map_lock);
LWLockAcquire(shard_map_lock, LW_EXCLUSIVE);
if (shard_map->n_shards == 0)
{
if (*page_server_connstring)
{
shard_map->n_shards = 1;
strncpy(shard_map->shard_connstr[0], page_server_connstring, sizeof shard_map->shard_connstr[0]);
}
else
{
RequestShardMapFromControlPlane(shard_map);
}
}
/* Sync the local counter outside the n_shards check as well, otherwise this
   loop never terminates when another backend has already refreshed the map. */
shard_map_update_counter = shard_map->update_counter;
}
shard_no = hash % shard_map->n_shards;
LWLockRelease(shard_map_lock);
return shard_no;
}
static void
ReloadConnstring()
pageserver_sighup_handler(SIGNAL_ARGS)
{
if(!pagestore_shared)
return;
LWLockAcquire(pagestore_shared->lock, LW_SHARED);
strlcpy(local_pageserver_connstring, pagestore_shared->pageserver_connstring, sizeof(local_pageserver_connstring));
pagestore_local_counter = pg_atomic_read_u64(&pagestore_shared->update_counter);
LWLockRelease(pagestore_shared->lock);
if (prev_signal_handler)
{
prev_signal_handler(postgres_signal_arg);
}
neon_log(LOG, "Received SIGHUP, disconnecting pageserver. New pageserver connstring is %s", page_server_connstring);
/* force refetching shard map from control plane */
LWLockAcquire(shard_map_lock, LW_EXCLUSIVE);
shard_map->n_shards = 0;
LWLockRelease(shard_map_lock);
}
static bool
pageserver_connect(int elevel)
pageserver_connect(shardno_t shard_no, int elevel)
{
char *query;
int ret;
const char *keywords[3];
const char *values[3];
int n;
PGconn* conn;
WaitEventSet *wes;
Assert(!connected);
if(CheckConnstringUpdated())
{
ReloadConnstring();
}
Assert(page_servers[shard_no].conn == NULL);
/*
* Connect using the connection string we got from the
@@ -158,19 +232,18 @@ pageserver_connect(int elevel)
n++;
}
keywords[n] = "dbname";
values[n] = local_pageserver_connstring;
values[n] = shard_map->shard_connstr[shard_no];
n++;
keywords[n] = NULL;
values[n] = NULL;
n++;
pageserver_conn = PQconnectdbParams(keywords, values, 1);
conn = PQconnectdbParams(keywords, values, 1);
if (PQstatus(pageserver_conn) == CONNECTION_BAD)
if (PQstatus(conn) == CONNECTION_BAD)
{
char *msg = pchomp(PQerrorMessage(pageserver_conn));
char *msg = pchomp(PQerrorMessage(conn));
PQfinish(pageserver_conn);
pageserver_conn = NULL;
PQfinish(conn);
ereport(elevel,
(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
@@ -178,30 +251,28 @@ pageserver_connect(int elevel)
errdetail_internal("%s", msg)));
return false;
}
query = psprintf("pagestream %s %s", neon_tenant, neon_timeline);
ret = PQsendQuery(pageserver_conn, query);
ret = PQsendQuery(conn, query);
if (ret != 1)
{
PQfinish(pageserver_conn);
pageserver_conn = NULL;
PQfinish(conn);
neon_log(elevel, "could not send pagestream command to pageserver");
return false;
}
pageserver_conn_wes = CreateWaitEventSet(TopMemoryContext, 3);
AddWaitEventToSet(pageserver_conn_wes, WL_LATCH_SET, PGINVALID_SOCKET,
wes = CreateWaitEventSet(TopMemoryContext, 3);
AddWaitEventToSet(wes, WL_LATCH_SET, PGINVALID_SOCKET,
MyLatch, NULL);
AddWaitEventToSet(pageserver_conn_wes, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
AddWaitEventToSet(wes, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
NULL, NULL);
AddWaitEventToSet(pageserver_conn_wes, WL_SOCKET_READABLE, PQsocket(pageserver_conn), NULL, NULL);
AddWaitEventToSet(wes, WL_SOCKET_READABLE, PQsocket(conn), NULL, NULL);
while (PQisBusy(pageserver_conn))
while (PQisBusy(conn))
{
WaitEvent event;
/* Sleep until there's something to do */
(void) WaitEventSetWait(pageserver_conn_wes, -1L, &event, 1, PG_WAIT_EXTENSION);
(void) WaitEventSetWait(wes, -1L, &event, 1, PG_WAIT_EXTENSION);
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
@@ -209,14 +280,12 @@ pageserver_connect(int elevel)
/* Data available in socket? */
if (event.events & WL_SOCKET_READABLE)
{
if (!PQconsumeInput(pageserver_conn))
if (!PQconsumeInput(conn))
{
char *msg = pchomp(PQerrorMessage(pageserver_conn));
char *msg = pchomp(PQerrorMessage(conn));
PQfinish(pageserver_conn);
pageserver_conn = NULL;
FreeWaitEventSet(pageserver_conn_wes);
pageserver_conn_wes = NULL;
PQfinish(conn);
FreeWaitEventSet(wes);
neon_log(elevel, "could not complete handshake with pageserver: %s",
msg);
@@ -225,9 +294,10 @@ pageserver_connect(int elevel)
}
}
neon_log(LOG, "libpagestore: connected to '%s'", page_server_connstring);
neon_log(LOG, "libpagestore: connected to '%s'", shard_map->shard_connstr[shard_no]);
page_servers[shard_no].conn = conn;
page_servers[shard_no].wes = wes;
connected = true;
return true;
}
@@ -235,10 +305,10 @@ pageserver_connect(int elevel)
* A wrapper around PQgetCopyData that checks for interrupts while sleeping.
*/
static int
call_PQgetCopyData(char **buffer)
call_PQgetCopyData(shardno_t shard_no, char **buffer)
{
int ret;
PGconn* pageserver_conn = page_servers[shard_no].conn;
retry:
ret = PQgetCopyData(pageserver_conn, buffer, 1 /* async */ );
@@ -247,7 +317,7 @@ retry:
WaitEvent event;
/* Sleep until there's something to do */
(void) WaitEventSetWait(pageserver_conn_wes, -1L, &event, 1, PG_WAIT_EXTENSION);
(void) WaitEventSetWait(page_servers[shard_no].wes, -1L, &event, 1, PG_WAIT_EXTENSION);
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
@@ -272,7 +342,7 @@ retry:
static void
pageserver_disconnect(void)
pageserver_disconnect(shardno_t shard_no)
{
/*
* If anything goes wrong while we were sending a request, it's not clear
@@ -281,38 +351,32 @@ pageserver_disconnect(void)
* time later after we have already sent a new unrelated request. Close
* the connection to avoid getting confused.
*/
if (connected)
if (page_servers[shard_no].conn)
{
neon_log(LOG, "dropping connection to page server due to error");
PQfinish(pageserver_conn);
pageserver_conn = NULL;
connected = false;
PQfinish(page_servers[shard_no].conn);
page_servers[shard_no].conn = NULL;
prefetch_on_ps_disconnect();
}
if (pageserver_conn_wes != NULL)
if (page_servers[shard_no].wes != NULL)
{
FreeWaitEventSet(pageserver_conn_wes);
pageserver_conn_wes = NULL;
FreeWaitEventSet(page_servers[shard_no].wes);
page_servers[shard_no].wes = NULL;
}
}
static bool
pageserver_send(NeonRequest * request)
pageserver_send(shardno_t shard_no, NeonRequest * request)
{
StringInfoData req_buff;
if(CheckConnstringUpdated())
{
pageserver_disconnect();
ReloadConnstring();
}
PGconn* pageserver_conn = page_servers[shard_no].conn;
/* If the connection was lost for some reason, reconnect */
if (connected && PQstatus(pageserver_conn) == CONNECTION_BAD)
if (pageserver_conn && PQstatus(pageserver_conn) == CONNECTION_BAD)
{
neon_log(LOG, "pageserver_send disconnect bad connection");
pageserver_disconnect();
pageserver_disconnect(shard_no);
}
req_buff = nm_pack_request(request);
@@ -324,18 +388,19 @@ pageserver_send(NeonRequest * request)
* See https://github.com/neondatabase/neon/issues/1138
* So try to reestablish connection in case of failure.
*/
if (!connected)
if (!page_servers[shard_no].conn)
{
while (!pageserver_connect(n_reconnect_attempts < max_reconnect_attempts ? LOG : ERROR))
while (!pageserver_connect(shard_no, n_reconnect_attempts < max_reconnect_attempts ? LOG : ERROR))
{
HandleMainLoopInterrupts();
n_reconnect_attempts += 1;
pg_usleep(RECONNECT_INTERVAL_USEC);
}
n_reconnect_attempts = 0;
}
/*
pageserver_conn = page_servers[shard_no].conn;
/*
* Send request.
*
* In principle, this could block if the output buffer is full, and we
@@ -346,7 +411,7 @@ pageserver_send(NeonRequest * request)
if (PQputCopyData(pageserver_conn, req_buff.data, req_buff.len) <= 0)
{
char *msg = pchomp(PQerrorMessage(pageserver_conn));
pageserver_disconnect();
pageserver_disconnect(shard_no);
neon_log(LOG, "pageserver_send disconnect because failed to send page request (try to reconnect): %s", msg);
pfree(msg);
pfree(req_buff.data);
@@ -366,12 +431,12 @@ pageserver_send(NeonRequest * request)
}
static NeonResponse *
pageserver_receive(void)
pageserver_receive(shardno_t shard_no)
{
StringInfoData resp_buff;
NeonResponse *resp;
if (!connected)
PGconn* pageserver_conn = page_servers[shard_no].conn;
if (!pageserver_conn)
return NULL;
PG_TRY();
@@ -379,7 +444,7 @@ pageserver_receive(void)
/* read response */
int rc;
rc = call_PQgetCopyData(&resp_buff.data);
rc = call_PQgetCopyData(shard_no, &resp_buff.data);
if (rc >= 0)
{
resp_buff.len = rc;
@@ -398,25 +463,25 @@ pageserver_receive(void)
else if (rc == -1)
{
neon_log(LOG, "pageserver_receive disconnect because call_PQgetCopyData returns -1: %s", pchomp(PQerrorMessage(pageserver_conn)));
pageserver_disconnect();
pageserver_disconnect(shard_no);
resp = NULL;
}
else if (rc == -2)
{
char* msg = pchomp(PQerrorMessage(pageserver_conn));
pageserver_disconnect();
pageserver_disconnect(shard_no);
neon_log(ERROR, "pageserver_receive disconnect because could not read COPY data: %s", msg);
}
else
{
pageserver_disconnect();
pageserver_disconnect(shard_no);
neon_log(ERROR, "pageserver_receive disconnect because unexpected PQgetCopyData return value: %d", rc);
}
}
PG_CATCH();
{
neon_log(LOG, "pageserver_receive disconnect due to caught exception");
pageserver_disconnect();
pageserver_disconnect(shard_no);
PG_RE_THROW();
}
PG_END_TRY();
@@ -426,9 +491,10 @@ pageserver_receive(void)
static bool
pageserver_flush(void)
pageserver_flush(shardno_t shard_no)
{
if (!connected)
PGconn* pageserver_conn = page_servers[shard_no].conn;
if (!pageserver_conn)
{
neon_log(WARNING, "Tried to flush while disconnected");
}
@@ -437,7 +503,7 @@ pageserver_flush(void)
if (PQflush(pageserver_conn))
{
char *msg = pchomp(PQerrorMessage(pageserver_conn));
pageserver_disconnect();
pageserver_disconnect(shard_no);
neon_log(LOG, "pageserver_flush disconnect because failed to flush page requests: %s", msg);
pfree(msg);
return false;
@@ -446,8 +512,7 @@ pageserver_flush(void)
return true;
}
page_server_api api =
{
page_server_api api = {
.send = pageserver_send,
.flush = pageserver_flush,
.receive = pageserver_receive
@@ -461,72 +526,12 @@ check_neon_id(char **newval, void **extra, GucSource source)
return **newval == '\0' || HexDecodeString(id, *newval, 16);
}
static Size
PagestoreShmemSize(void)
{
return sizeof(PagestoreShmemState);
}
static bool
PagestoreShmemInit(void)
{
bool found;
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
pagestore_shared = ShmemInitStruct("libpagestore shared state",
PagestoreShmemSize(),
&found);
if(!found)
{
pagestore_shared->lock = &(GetNamedLWLockTranche("neon_libpagestore")->lock);
pg_atomic_init_u64(&pagestore_shared->update_counter, 0);
AssignPageserverConnstring(page_server_connstring, NULL);
}
LWLockRelease(AddinShmemInitLock);
return found;
}
static void
pagestore_shmem_startup_hook(void)
{
if(prev_shmem_startup_hook)
prev_shmem_startup_hook();
PagestoreShmemInit();
}
static void
pagestore_shmem_request(void)
{
#if PG_VERSION_NUM >= 150000
if(prev_shmem_request_hook)
prev_shmem_request_hook();
#endif
RequestAddinShmemSpace(PagestoreShmemSize());
RequestNamedLWLockTranche("neon_libpagestore", 1);
}
static void
pagestore_prepare_shmem(void)
{
#if PG_VERSION_NUM >= 150000
prev_shmem_request_hook = shmem_request_hook;
shmem_request_hook = pagestore_shmem_request;
#else
pagestore_shmem_request();
#endif
prev_shmem_startup_hook = shmem_startup_hook;
shmem_startup_hook = pagestore_shmem_startup_hook;
}
/*
* Module initialization function
*/
void
pg_init_libpagestore(void)
{
pagestore_prepare_shmem();
DefineCustomStringVariable("neon.pageserver_connstring",
"connection string to the page server",
NULL,
@@ -534,7 +539,7 @@ pg_init_libpagestore(void)
"",
PGC_SIGHUP,
0, /* no flags required */
CheckPageserverConnstring, AssignPageserverConnstring, NULL);
NULL, NULL, NULL);
DefineCustomStringVariable("neon.timeline_id",
"Neon timeline_id the server is running on",
@@ -615,5 +620,8 @@ pg_init_libpagestore(void)
redo_read_buffer_filter = neon_redo_read_buffer_filter;
}
prev_signal_handler = pqsignal(SIGHUP, pageserver_sighup_handler);
lfc_init();
psm_init();
}
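The compute side routes each buffer to a shard by hashing the relation identity plus the block's stripe. For illustration only, here is the same computation in Rust, with murmurhash32 and hash_combine written out the way the Postgres helpers define them; whether this matches the pageserver's ShardIdentity hashing bit-for-bit is an assumption:

```rust
// Sketch of the stripe-aware shard selection performed in get_shard_number().
const STRIPE_SIZE: u32 = 256 * 1024 / 8;

// Finalizer used by Postgres' murmurhash32().
fn murmurhash32(mut h: u32) -> u32 {
    h ^= h >> 16;
    h = h.wrapping_mul(0x85eb_ca6b);
    h ^= h >> 13;
    h = h.wrapping_mul(0xc2b2_ae35);
    h ^= h >> 16;
    h
}

// Mixing step matching Postgres' hash_combine().
fn hash_combine(a: u32, b: u32) -> u32 {
    a ^ b
        .wrapping_add(0x9e37_79b9)
        .wrapping_add(a << 6)
        .wrapping_add(a >> 2)
}

fn shard_number(spc: u32, db: u32, rel: u32, blkno: u32, n_shards: u32) -> u32 {
    let mut hash = murmurhash32(spc);
    hash = hash_combine(hash, murmurhash32(db));
    hash = hash_combine(hash, murmurhash32(rel));
    // Dividing by STRIPE_SIZE keeps runs of consecutive blocks on one shard.
    hash = hash_combine(hash, murmurhash32(blkno / STRIPE_SIZE));
    hash % n_shards
}

fn main() {
    // Blocks in the same stripe map to the same shard.
    assert_eq!(shard_number(1663, 5, 16384, 0, 4), shard_number(1663, 5, 16384, 1, 4));
}
```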

View File

@@ -20,12 +20,25 @@
#include RELFILEINFO_HDR
#include "storage/block.h"
#include "storage/smgr.h"
#include "storage/buf_internals.h"
#include "lib/stringinfo.h"
#include "libpq/pqformat.h"
#include "utils/memutils.h"
#include "pg_config.h"
#define MAX_SHARDS 128
#define STRIPE_SIZE (256 * 1024 / 8) /* TODO: should this be taken from the control plane? */
#define MAX_PS_CONNSTR_LEN 128
typedef struct
{
size_t n_shards;
size_t update_counter;
char shard_connstr[MAX_SHARDS][MAX_PS_CONNSTR_LEN];
} ShardMap;
typedef enum
{
/* pagestore_client -> pagestore */
@@ -144,11 +157,13 @@ extern char *nm_to_string(NeonMessage * msg);
* API
*/
typedef unsigned shardno_t;
typedef struct
{
bool (*send) (NeonRequest * request);
NeonResponse *(*receive) (void);
bool (*flush) (void);
bool (*send) (shardno_t shard_no, NeonRequest * request);
NeonResponse *(*receive) (shardno_t shard_no);
bool (*flush) (shardno_t shard_no);
} page_server_api;
extern void prefetch_on_ps_disconnect(void);
@@ -165,6 +180,8 @@ extern char *neon_tenant;
extern bool wal_redo;
extern int32 max_cluster_size;
extern shardno_t get_shard_number(BufferTag* tag);
extern const f_smgr *smgr_neon(BackendId backend, NRelFileInfo rinfo);
extern void smgr_init_neon(void);
extern void readahead_buffer_resize(int newsize, void *extra);

View File

@@ -164,6 +164,7 @@ typedef struct PrefetchRequest {
XLogRecPtr actual_request_lsn;
NeonResponse *response; /* may be null */
PrefetchStatus status;
shardno_t shard_no;
uint64 my_ring_index;
} PrefetchRequest;
@@ -225,6 +226,8 @@ typedef struct PrefetchState {
/* the buffers */
prfh_hash *prf_hash;
int max_shard_no;
uint8 shard_bitmap[(MAX_SHARDS + 7)/8];
PrefetchRequest prf_buffer[]; /* prefetch buffers */
} PrefetchState;
@@ -313,6 +316,7 @@ compact_prefetch_buffers(void)
Assert(target_slot->status == PRFS_UNUSED);
target_slot->buftag = source_slot->buftag;
target_slot->shard_no = source_slot->shard_no;
target_slot->status = source_slot->status;
target_slot->response = source_slot->response;
target_slot->effective_request_lsn = source_slot->effective_request_lsn;
@@ -477,6 +481,23 @@ prefetch_cleanup_trailing_unused(void)
}
}
static bool
prefetch_flush_requests(void)
{
for (shardno_t shard_no = 0; shard_no < MyPState->max_shard_no; shard_no++)
{
if (MyPState->shard_bitmap[shard_no >> 3] & (1 << (shard_no & 7)))
{
if (!page_server->flush(shard_no))
return false;
MyPState->shard_bitmap[shard_no >> 3] &= ~(1 << (shard_no & 7));
}
}
MyPState->max_shard_no = 0;
return true;
}
/*
* Wait for slot of ring_index to have received its response.
* The caller is responsible for making sure the request buffer is flushed.
@@ -492,7 +513,7 @@ prefetch_wait_for(uint64 ring_index)
if (MyPState->ring_flush <= ring_index &&
MyPState->ring_unused > MyPState->ring_flush)
{
if (!page_server->flush())
if (!prefetch_flush_requests())
return false;
MyPState->ring_flush = MyPState->ring_unused;
}
@@ -530,7 +551,7 @@ prefetch_read(PrefetchRequest *slot)
Assert(slot->my_ring_index == MyPState->ring_receive);
old = MemoryContextSwitchTo(MyPState->errctx);
response = (NeonResponse *) page_server->receive();
response = (NeonResponse *) page_server->receive(slot->shard_no);
MemoryContextSwitchTo(old);
if (response)
{
@@ -682,12 +703,14 @@ prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force
Assert(slot->response == NULL);
Assert(slot->my_ring_index == MyPState->ring_unused);
while (!page_server->send((NeonRequest *) &request));
while (!page_server->send(slot->shard_no, (NeonRequest *) &request));
/* update prefetch state */
MyPState->n_requests_inflight += 1;
MyPState->n_unused -= 1;
MyPState->ring_unused += 1;
MyPState->shard_bitmap[slot->shard_no >> 3] |= 1 << (slot->shard_no & 7);
MyPState->max_shard_no = Max(slot->shard_no+1, MyPState->max_shard_no);
/* update slot state */
slot->status = PRFS_REQUESTED;
@@ -847,6 +870,7 @@ prefetch_register_buffer(BufferTag tag, bool *force_latest, XLogRecPtr *force_ls
* function reads the buffer tag from the slot.
*/
slot->buftag = tag;
slot->shard_no = get_shard_number(&tag);
slot->my_ring_index = ring_index;
prefetch_do_request(slot, force_latest, force_lsn);
@@ -857,7 +881,7 @@ prefetch_register_buffer(BufferTag tag, bool *force_latest, XLogRecPtr *force_ls
if (flush_every_n_requests > 0 &&
MyPState->ring_unused - MyPState->ring_flush >= flush_every_n_requests)
{
if (!page_server->flush())
if (!prefetch_flush_requests())
{
/* Prefetch set is reset in case of error, so we should try to register our request once again */
goto Retry;
@@ -872,11 +896,34 @@ static NeonResponse *
page_server_request(void const *req)
{
NeonResponse* resp;
BufferTag tag = {0};
shardno_t shard_no;
switch (((NeonRequest *) req)->tag)
{
case T_NeonExistsRequest:
CopyNRelFileInfoToBufTag(tag, ((NeonExistsRequest *) req)->rinfo);
break;
case T_NeonNblocksRequest:
CopyNRelFileInfoToBufTag(tag, ((NeonNblocksRequest *) req)->rinfo);
break;
case T_NeonDbSizeRequest:
NInfoGetDbOid(BufTagGetNRelFileInfo(tag)) = ((NeonDbSizeRequest *) req)->dbNode;
break;
case T_NeonGetPageRequest:
CopyNRelFileInfoToBufTag(tag, ((NeonGetPageRequest *) req)->rinfo);
tag.blockNum = ((NeonGetPageRequest *) req)->blkno;
break;
default:
elog(ERROR, "Unexpected request tag: %d", ((NeonRequest *) req)->tag);
}
shard_no = get_shard_number(&tag);
do {
while (!page_server->send((NeonRequest *) req) || !page_server->flush());
while (!page_server->send(shard_no, (NeonRequest *) req) || !page_server->flush(shard_no));
MyPState->ring_flush = MyPState->ring_unused;
consume_prefetch_responses();
resp = page_server->receive();
resp = page_server->receive(shard_no);
} while (resp == NULL);
return resp;
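Prefetch requests now fan out to several pageservers, so the extension keeps a small bitmap of shards with unflushed requests and flushes only those. A Rust sketch of the same bookkeeping (the extension itself does this in C, as above):

```rust
// Sketch of the "flush only shards with pending requests" bookkeeping.
const MAX_SHARDS: usize = 128;

struct PendingShards {
    bitmap: [u8; (MAX_SHARDS + 7) / 8],
    max_shard_no: usize,
}

impl PendingShards {
    fn new() -> Self {
        PendingShards { bitmap: [0; (MAX_SHARDS + 7) / 8], max_shard_no: 0 }
    }

    // Called whenever a request is queued for shard_no.
    fn mark(&mut self, shard_no: usize) {
        self.bitmap[shard_no >> 3] |= 1 << (shard_no & 7);
        self.max_shard_no = self.max_shard_no.max(shard_no + 1);
    }

    // Flush every marked shard; on failure, leave the remaining bits set so
    // the caller can retry, mirroring prefetch_flush_requests() above.
    fn flush_all(&mut self, mut flush: impl FnMut(usize) -> bool) -> bool {
        for shard_no in 0..self.max_shard_no {
            if self.bitmap[shard_no >> 3] & (1 << (shard_no & 7)) != 0 {
                if !flush(shard_no) {
                    return false;
                }
                self.bitmap[shard_no >> 3] &= !(1 << (shard_no & 7));
            }
        }
        self.max_shard_no = 0;
        true
    }
}

fn main() {
    let mut pending = PendingShards::new();
    pending.mark(0);
    pending.mark(3);
    let mut flushed = Vec::new();
    assert!(pending.flush_all(|s| { flushed.push(s); true }));
    assert_eq!(flushed, vec![0, 3]);
}
```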

View File

@@ -41,6 +41,7 @@ toml_edit.workspace = true
tracing.workspace = true
url.workspace = true
metrics.workspace = true
pageserver_api.workspace = true
postgres_backend.workspace = true
postgres_ffi.workspace = true
pq_proto.workspace = true

View File

@@ -2,6 +2,7 @@
//! protocol commands.
use anyhow::Context;
use std::str::FromStr;
use std::str::{self};
use std::sync::Arc;
@@ -12,7 +13,7 @@ use crate::auth::check_permission;
use crate::json_ctrl::{handle_json_ctrl, AppendLogicalMessage};
use crate::metrics::{TrafficMetrics, PG_QUERIES_FINISHED, PG_QUERIES_RECEIVED};
use crate::safekeeper::Term;
use crate::send_wal::ReplicationOptions;
use crate::timeline::TimelineError;
use crate::wal_service::ConnectionId;
use crate::{GlobalTimelines, SafeKeeperConf};
@@ -46,7 +47,7 @@ pub struct SafekeeperPostgresHandler {
/// Parsed Postgres command.
enum SafekeeperPostgresCommand {
StartWalPush,
StartReplication { start_lsn: Lsn, term: Option<Term> },
StartReplication(ReplicationOptions),
IdentifySystem,
TimelineStatus,
JSONCtrl { cmd: AppendLogicalMessage },
@@ -58,7 +59,7 @@ fn parse_cmd(cmd: &str) -> anyhow::Result<SafekeeperPostgresCommand> {
} else if cmd.starts_with("START_REPLICATION") {
let re = Regex::new(
// We follow postgres START_REPLICATION LOGICAL options to pass term.
r"START_REPLICATION(?: SLOT [^ ]+)?(?: PHYSICAL)? ([[:xdigit:]]+/[[:xdigit:]]+)(?: \(term='(\d+)'\))?",
r"START_REPLICATION(?: SLOT [^ ]+)?(?: PHYSICAL)? ([[:xdigit:]]+/[[:xdigit:]]+)(?: \(term='(\d+)'\))?(?: \(shard=(.+)\))?",
)
.unwrap();
let caps = re
@@ -71,7 +72,18 @@ fn parse_cmd(cmd: &str) -> anyhow::Result<SafekeeperPostgresCommand> {
} else {
None
};
Ok(SafekeeperPostgresCommand::StartReplication { start_lsn, term })
let shard = if let Some(m) = caps.get(3) {
Some(serde_json::from_str(m.as_str())?)
} else {
None
};
Ok(SafekeeperPostgresCommand::StartReplication(
ReplicationOptions {
start_lsn,
term,
shard,
},
))
} else if cmd.starts_with("IDENTIFY_SYSTEM") {
Ok(SafekeeperPostgresCommand::IdentifySystem)
} else if cmd.starts_with("TIMELINE_STATUS") {
@@ -86,7 +98,7 @@ fn parse_cmd(cmd: &str) -> anyhow::Result<SafekeeperPostgresCommand> {
}
}
fn cmd_to_string(cmd: &SafekeeperPostgresCommand) -> &str {
fn cmd_to_string(cmd: &SafekeeperPostgresCommand) -> &'static str {
match cmd {
SafekeeperPostgresCommand::StartWalPush => "START_WAL_PUSH",
SafekeeperPostgresCommand::StartReplication { .. } => "START_REPLICATION",
@@ -228,8 +240,8 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
.instrument(info_span!("WAL receiver"))
.await
}
SafekeeperPostgresCommand::StartReplication { start_lsn, term } => {
self.handle_start_replication(pgb, start_lsn, term)
SafekeeperPostgresCommand::StartReplication(opts) => {
self.handle_start_replication(pgb, opts)
.instrument(info_span!("WAL sender"))
.await
}
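On the safekeeper side the shard clause is recovered with an extra capture group and then deserialized with serde_json into a ShardIdentity. A cut-down sketch of just the option extraction, using the regex crate; the surrounding command parsing stays as in parse_cmd above, and the JSON here is only a placeholder:

```rust
// Sketch: extracting the optional (shard=...) clause from a replication command.
use regex::Regex;

fn parse_shard_option(cmd: &str) -> Option<String> {
    // The real parser combines this with the start LSN and term captures.
    let re = Regex::new(r"\(shard=(.+)\)").unwrap();
    re.captures(cmd)
        .and_then(|caps| caps.get(1))
        .map(|m| m.as_str().to_string())
}

fn main() {
    let cmd = r#"START_REPLICATION PHYSICAL 0/16B3A80 (shard={"placeholder":true})"#;
    assert_eq!(
        parse_shard_option(cmd).as_deref(),
        Some(r#"{"placeholder":true}"#)
    );
    assert_eq!(parse_shard_option("START_REPLICATION PHYSICAL 0/16B3A80"), None);
}
```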

View File

@@ -27,6 +27,7 @@ pub mod recovery;
pub mod remove_wal;
pub mod safekeeper;
pub mod send_wal;
pub mod send_wal_sharded;
pub mod timeline;
pub mod wal_backup;
pub mod wal_service;

View File

@@ -6,13 +6,15 @@ use crate::safekeeper::{Term, TermLsn};
use crate::timeline::Timeline;
use crate::wal_service::ConnectionId;
use crate::wal_storage::WalReader;
use crate::GlobalTimelines;
use crate::{send_wal_sharded, GlobalTimelines};
use anyhow::{bail, Context as AnyhowContext};
use bytes::Bytes;
use pageserver_api::shard::ShardIdentity;
use parking_lot::Mutex;
use postgres_backend::PostgresBackend;
use postgres_backend::{CopyStreamHandlerEnd, PostgresBackendReader, QueryError};
use postgres_ffi::get_current_timestamp;
use postgres_ffi::waldecoder::WalStreamDecoder;
use postgres_ffi::{TimestampTz, MAX_SEND_SIZE};
use pq_proto::{BeMessage, WalSndKeepAlive, XLogDataBody};
use serde::{Deserialize, Serialize};
@@ -31,6 +33,12 @@ use tokio::time::timeout;
use tracing::*;
use utils::{bin_ser::BeSer, lsn::Lsn};
pub struct ReplicationOptions {
pub start_lsn: Lsn,
pub term: Option<Term>,
pub shard: Option<ShardIdentity>,
}
// See: https://www.postgresql.org/docs/13/protocol-replication.html
const HOT_STANDBY_FEEDBACK_TAG_BYTE: u8 = b'h';
const STANDBY_STATUS_UPDATE_TAG_BYTE: u8 = b'r';
@@ -349,6 +357,22 @@ impl Drop for WalSenderGuard {
}
}
impl WalSenderGuard {
pub async fn should_stop(&self, tli: &Arc<Timeline>) -> bool {
if let Some(remote_consistent_lsn) = self.walsenders.get_ws_remote_consistent_lsn(self.id) {
if tli.should_walsender_stop(remote_consistent_lsn).await {
// Terminate if there is nothing more to send.
// Note that "ending streaming" part of the string is used by
// pageserver to identify WalReceiverError::SuccessfulCompletion,
// do not change this string without updating pageserver.
return true;
}
}
false
}
}
impl SafekeeperPostgresHandler {
/// Wrapper around handle_start_replication_guts handling result. Error is
/// handled here while we're still in walsender ttid span; with API
@@ -356,13 +380,9 @@ impl SafekeeperPostgresHandler {
pub async fn handle_start_replication<IO: AsyncRead + AsyncWrite + Unpin>(
&mut self,
pgb: &mut PostgresBackend<IO>,
start_pos: Lsn,
term: Option<Term>,
opts: ReplicationOptions,
) -> Result<(), QueryError> {
if let Err(end) = self
.handle_start_replication_guts(pgb, start_pos, term)
.await
{
if let Err(end) = self.handle_start_replication_guts(pgb, opts).await {
// Log the result and probably send it to the client, closing the stream.
pgb.handle_copy_stream_end(end).await;
}
@@ -372,12 +392,12 @@ impl SafekeeperPostgresHandler {
pub async fn handle_start_replication_guts<IO: AsyncRead + AsyncWrite + Unpin>(
&mut self,
pgb: &mut PostgresBackend<IO>,
start_pos: Lsn,
term: Option<Term>,
opts: ReplicationOptions,
) -> Result<(), CopyStreamHandlerEnd> {
let appname = self.appname.clone();
let tli =
GlobalTimelines::get(self.ttid).map_err(|e| CopyStreamHandlerEnd::Other(e.into()))?;
let start_pos = opts.start_lsn;
// Use a guard object to remove our entry from the timeline when we are done.
let ws_guard = Arc::new(tli.get_walsenders().register(
@@ -415,11 +435,13 @@ impl SafekeeperPostgresHandler {
}
info!(
"starting streaming from {:?}, available WAL ends at {}, recovery={}, appname={:?}",
"starting streaming from {:?}, available WAL ends at {}, recovery={}, appname={:?}, addr={}, shard={:?}",
start_pos,
end_pos,
matches!(end_watch, EndWatch::Flush(_)),
appname
appname,
pgb.get_peer_addr(),
opts.shard,
);
// switch to copy
@@ -438,23 +460,49 @@ impl SafekeeperPostgresHandler {
// not synchronized with sends, so this avoids deadlocks.
let reader = pgb.split().context("START_REPLICATION split")?;
let mut sender = WalSender {
pgb,
tli: tli.clone(),
appname,
start_pos,
end_pos,
term,
end_watch,
ws_guard: ws_guard.clone(),
wal_reader,
send_buf: [0; MAX_SEND_SIZE],
let ws_guard_clone = ws_guard.clone();
let sender_future = async {
if let Some(_shard) = opts.shard {
send_wal_sharded::WalSender {
pgb,
tli: tli.clone(),
appname,
start_pos,
end_pos,
term: opts.term,
end_watch,
ws_guard: ws_guard_clone,
wal_reader,
send_buf: [0; MAX_SEND_SIZE],
waldecoder: WalStreamDecoder::new(
start_pos,
tli.get_state().await.1.server.pg_version / 10000,
),
}
.run()
.await
} else {
WalSender {
pgb,
tli: tli.clone(),
appname,
start_pos,
end_pos,
term: opts.term,
end_watch,
ws_guard: ws_guard_clone,
wal_reader,
send_buf: [0; MAX_SEND_SIZE],
}
.run()
.await
}
};
let mut reply_reader = ReplyReader { reader, ws_guard };
let res = tokio::select! {
// todo: add read|write .context to these errors
r = sender.run() => r,
r = sender_future => r,
r = reply_reader.run() => r,
};
// Join pg backend back.
@@ -466,14 +514,14 @@ impl SafekeeperPostgresHandler {
/// Walsender streams either up to commit_lsn (normally) or flush_lsn in the
/// given term (recovery by walproposer or peer safekeeper).
enum EndWatch {
pub enum EndWatch {
Commit(Receiver<Lsn>),
Flush(Receiver<TermLsn>),
}
impl EndWatch {
/// Get current end of WAL.
fn get(&self) -> Lsn {
pub fn get(&self) -> Lsn {
match self {
EndWatch::Commit(r) => *r.borrow(),
EndWatch::Flush(r) => r.borrow().lsn,
@@ -481,7 +529,7 @@ impl EndWatch {
}
/// Wait for the update.
async fn changed(&mut self) -> anyhow::Result<()> {
pub async fn changed(&mut self) -> anyhow::Result<()> {
match self {
EndWatch::Commit(r) => r.changed().await?,
EndWatch::Flush(r) => r.changed().await?,
@@ -598,21 +646,11 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
// Check for termination only if we are streaming up to commit_lsn
// (to pageserver).
if let EndWatch::Commit(_) = self.end_watch {
if let Some(remote_consistent_lsn) = self
.ws_guard
.walsenders
.get_ws_remote_consistent_lsn(self.ws_guard.id)
{
if self.tli.should_walsender_stop(remote_consistent_lsn).await {
// Terminate if there is nothing more to send.
// Note that "ending streaming" part of the string is used by
// pageserver to identify WalReceiverError::SuccessfulCompletion,
// do not change this string without updating pageserver.
return Err(CopyStreamHandlerEnd::ServerInitiated(format!(
if self.ws_guard.should_stop(&self.tli).await {
return Err(CopyStreamHandlerEnd::ServerInitiated(format!(
"ending streaming to {:?} at {}, receiver is caughtup and there is no computes",
self.appname, self.start_pos,
)));
}
}
}
@@ -685,7 +723,7 @@ const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(1);
/// - Ok(None) if timeout expired;
/// - Err in case of error -- only if 1) term changed while fetching in recovery
/// mode 2) watch channel closed, which must never happen.
async fn wait_for_lsn(
pub async fn wait_for_lsn(
rx: &mut EndWatch,
client_term: Option<Term>,
start_pos: Lsn,

View File

@@ -0,0 +1,160 @@
use std::{cmp::min, sync::Arc};
use anyhow::Context;
use postgres_backend::{CopyStreamHandlerEnd, PostgresBackend};
use postgres_ffi::{get_current_timestamp, waldecoder::WalStreamDecoder, MAX_SEND_SIZE};
use pq_proto::{BeMessage, WalSndKeepAlive, XLogDataBody};
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::trace;
use utils::lsn::Lsn;
use crate::{
safekeeper::Term,
send_wal::{wait_for_lsn, EndWatch, WalSenderGuard},
timeline::Timeline,
wal_storage::WalReader,
};
/// A half driving sending WAL.
pub struct WalSender<'a, IO> {
pub pgb: &'a mut PostgresBackend<IO>,
pub tli: Arc<Timeline>,
pub appname: Option<String>,
// Position from which we are sending the next chunk.
pub start_pos: Lsn,
// WAL up to this position is known to be locally available.
// Usually this is the same as the latest commit_lsn, but in case of
// walproposer recovery, this is flush_lsn.
//
// We send this LSN to the receiver as wal_end, so that it knows how much
// WAL this safekeeper has. This LSN should be as fresh as possible.
pub end_pos: Lsn,
/// When streaming uncommitted part, the term the client acts as the leader
/// in. Streaming is stopped if local term changes to a different (higher)
/// value.
pub term: Option<Term>,
/// Watch channel receiver to learn end of available WAL (and wait for its advancement).
pub end_watch: EndWatch,
pub ws_guard: Arc<WalSenderGuard>,
pub wal_reader: WalReader,
// buffer for reading WAL into before sending it
pub send_buf: [u8; MAX_SEND_SIZE],
pub waldecoder: WalStreamDecoder,
}
impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
/// Send WAL until
/// - an error occurs
/// - the receiver is caught up and there are no computes (if streaming up to commit_lsn)
///
/// Err(CopyStreamHandlerEnd) is always returned; Result is used only for ?
/// convenience.
pub async fn run(&mut self) -> Result<(), CopyStreamHandlerEnd> {
loop {
// Wait for the next portion if it is not there yet, or just
// update our end of WAL available for sending value, we
// communicate it to the receiver.
self.wait_wal().await?;
assert!(
self.end_pos > self.start_pos,
"nothing to send after waiting for WAL"
);
// try to send as much as available, capped by MAX_SEND_SIZE
let mut send_size = self
.end_pos
.checked_sub(self.start_pos)
.context("reading wal without waiting for it first")?
.0 as usize;
send_size = min(send_size, self.send_buf.len());
let send_buf = &mut self.send_buf[..send_size];
let send_size: usize;
{
// If uncommitted part is being pulled, check that the term is
// still the expected one.
let _term_guard = if let Some(t) = self.term {
Some(self.tli.acquire_term(t).await?)
} else {
None
};
// read wal into buffer
send_size = self.wal_reader.read(send_buf).await?
};
let send_buf = &send_buf[..send_size];
// feed waldecoder with the data
self.waldecoder.feed_bytes(send_buf);
self.start_pos += send_size as u64;
while let Some((lsn, recdata)) =
self.waldecoder.poll_decode().context("wal decoding")?
{
// It is important to deal with the aligned records as lsn in getPage@LSN is
// aligned and can be several bytes bigger. Without this alignment we are
// at risk of hitting a deadlock.
if !lsn.is_aligned() {
return Err(CopyStreamHandlerEnd::ServerInitiated(format!(
"unaligned record at {}",
lsn
)));
}
trace!(
"read record of {} bytes of WAL ending at {}",
recdata.len(),
lsn
);
// and send it
self.pgb
.write_message(&BeMessage::XLogData(XLogDataBody {
wal_start: lsn.0,
wal_end: self.end_pos.0,
timestamp: get_current_timestamp(),
data: &recdata,
}))
.await?;
}
}
}
/// wait until we have WAL to stream, sending keepalives and checking for
/// exit in the meanwhile
async fn wait_wal(&mut self) -> Result<(), CopyStreamHandlerEnd> {
loop {
self.end_pos = self.end_watch.get();
if self.end_pos > self.start_pos {
// We have something to send.
trace!("got end_pos {:?}, streaming", self.end_pos);
return Ok(());
}
// Wait for WAL to appear, now self.end_pos == self.start_pos.
if let Some(lsn) = wait_for_lsn(&mut self.end_watch, self.term, self.start_pos).await? {
self.end_pos = lsn;
trace!("got end_pos {:?}, streaming", self.end_pos);
return Ok(());
}
// Timed out waiting for WAL, check for termination and send KA.
// Check for termination only if we are streaming up to commit_lsn
// (to pageserver).
if let EndWatch::Commit(_) = self.end_watch {
if self.ws_guard.should_stop(&self.tli).await {
return Err(CopyStreamHandlerEnd::ServerInitiated(format!(
"ending streaming to {:?} at {}, receiver is caughtup and there is no computes",
self.appname, self.start_pos,
)));
}
}
self.pgb
.write_message(&BeMessage::KeepAlive(WalSndKeepAlive {
wal_end: self.end_pos.0,
timestamp: get_current_timestamp(),
request_reply: true,
}))
.await?;
}
}
}
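A minimal, self-contained Python sketch of the loop above: wait until end_pos is ahead of start_pos, read at most MAX_SEND_SIZE bytes, feed them to a decoder, and forward only complete, aligned records. ToyDecoder, read_wal and send_record are hypothetical stand-ins for illustration, not the safekeeper's WalStreamDecoder or WalReader.

MAX_SEND_SIZE = 128 * 1024


class ToyDecoder:
    """Accumulates bytes and yields complete fixed-size 'records' (stand-in for the real decoder)."""

    RECORD_LEN = 8

    def __init__(self, start_lsn: int):
        self.lsn = start_lsn
        self.buf = bytearray()

    def feed_bytes(self, data: bytes) -> None:
        self.buf.extend(data)

    def poll_decode(self):
        # Return (end_lsn, record) once a whole record is buffered, else None.
        if len(self.buf) < self.RECORD_LEN:
            return None
        record = bytes(self.buf[: self.RECORD_LEN])
        del self.buf[: self.RECORD_LEN]
        self.lsn += self.RECORD_LEN
        return self.lsn, record


def send_available_wal(read_wal, send_record, start_pos: int, end_pos: int) -> int:
    """One pass of the send loop: read capped chunks, decode, forward aligned records."""
    decoder = ToyDecoder(start_pos)
    while start_pos < end_pos:
        chunk = read_wal(start_pos, min(end_pos - start_pos, MAX_SEND_SIZE))
        decoder.feed_bytes(chunk)
        start_pos += len(chunk)
        while (decoded := decoder.poll_decode()) is not None:
            lsn, record = decoded
            if lsn % 8 != 0:
                raise RuntimeError(f"unaligned record at {lsn}")
            send_record(lsn, record)
    return start_pos


sent = []
final = send_available_wal(
    read_wal=lambda pos, n: bytes(n),                    # pretend WAL is n zero bytes
    send_record=lambda lsn, rec: sent.append((lsn, rec)),
    start_pos=0,
    end_pos=64,
)
assert final == 64 and len(sent) == 8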

View File

@@ -1871,6 +1871,8 @@ def append_pageserver_param_overrides(
params_to_update.append(
f"--pageserver-config-override=remote_storage={remote_storage_toml_table}"
)
else:
params_to_update.append('--pageserver-config-override=remote_storage=""')
env_overrides = os.getenv("NEON_PAGESERVER_OVERRIDES")
if env_overrides is not None:
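Condensed for illustration, the branch above reads as: pass the remote-storage TOML table through when it is configured, otherwise emit an explicit empty override so the pageserver is not left to pick a default. The helper name and example TOML value below are made up, not fixture code.

from typing import Optional


def remote_storage_override(remote_storage_toml_table: Optional[str]) -> str:
    # Configured: forward the TOML table verbatim.
    if remote_storage_toml_table is not None:
        return f"--pageserver-config-override=remote_storage={remote_storage_toml_table}"
    # Not configured: explicitly disable remote storage instead of leaving it unset.
    return '--pageserver-config-override=remote_storage=""'


print(remote_storage_override('{local_path="/tmp/pageserver_remote"}'))
print(remote_storage_override(None))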

View File

@@ -1,13 +1,9 @@
import asyncio
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.remote_storage import RemoteStorageKind
def test_change_pageserver(neon_env_builder: NeonEnvBuilder):
num_connections = 3
neon_env_builder.num_pageservers = 2
neon_env_builder.enable_pageserver_remote_storage(
remote_storage_kind=RemoteStorageKind.MOCK_S3,
@@ -20,24 +16,15 @@ def test_change_pageserver(neon_env_builder: NeonEnvBuilder):
alt_pageserver_id = env.pageservers[1].id
env.pageservers[1].tenant_attach(env.initial_tenant)
pg_conns = [endpoint.connect() for i in range(num_connections)]
curs = [pg_conn.cursor() for pg_conn in pg_conns]
def execute(statement: str):
for cur in curs:
cur.execute(statement)
def fetchone():
results = [cur.fetchone() for cur in curs]
assert all(result == results[0] for result in results)
return results[0]
pg_conn = endpoint.connect()
cur = pg_conn.cursor()
# Create table, and insert some rows. Make it big enough that it doesn't fit in
# shared_buffers, otherwise the SELECT after restart will just return answer
# from shared_buffers without hitting the page server, which defeats the point
# of this test.
curs[0].execute("CREATE TABLE foo (t text)")
curs[0].execute(
cur.execute("CREATE TABLE foo (t text)")
cur.execute(
"""
INSERT INTO foo
SELECT 'long string to consume some space' || g
@@ -46,25 +33,25 @@ def test_change_pageserver(neon_env_builder: NeonEnvBuilder):
)
# Verify that the table is larger than shared_buffers
curs[0].execute(
cur.execute(
"""
select setting::int * pg_size_bytes(unit) as shared_buffers, pg_relation_size('foo') as tbl_size
from pg_settings where name = 'shared_buffers'
"""
)
row = curs[0].fetchone()
row = cur.fetchone()
assert row is not None
log.info(f"shared_buffers is {row[0]}, table size {row[1]}")
assert int(row[0]) < int(row[1])
execute("SELECT count(*) FROM foo")
assert fetchone() == (100000,)
cur.execute("SELECT count(*) FROM foo")
assert cur.fetchone() == (100000,)
endpoint.reconfigure(pageserver_id=alt_pageserver_id)
# Verify that the neon.pageserver_connstring GUC is set to the correct thing
execute("SELECT setting FROM pg_settings WHERE name='neon.pageserver_connstring'")
connstring = fetchone()
cur.execute("SELECT setting FROM pg_settings WHERE name='neon.pageserver_connstring'")
connstring = cur.fetchone()
assert connstring is not None
expected_connstring = f"postgresql://no_user:@localhost:{env.pageservers[1].service_port.pg}"
assert expected_connstring == connstring[0]
@@ -73,45 +60,5 @@ def test_change_pageserver(neon_env_builder: NeonEnvBuilder):
0
].stop() # Stop the old pageserver just to make sure we're reading from the new one
execute("SELECT count(*) FROM foo")
assert fetchone() == (100000,)
# Try failing back, and this time we will stop the current pageserver before reconfiguring
# the endpoint. Whereas the previous reconfiguration was like a healthy migration, this
# is more like what happens in an unexpected pageserver failure.
env.pageservers[0].start()
env.pageservers[1].stop()
endpoint.reconfigure(pageserver_id=env.pageservers[0].id)
execute("SELECT count(*) FROM foo")
assert fetchone() == (100000,)
env.pageservers[0].stop()
env.pageservers[1].start()
# Test a (former) bug where a child process spins without updating its connection string
# by executing a query separately. This query will hang until we issue the reconfigure.
async def reconfigure_async():
await asyncio.sleep(
1
) # Sleep for 1 second just to make sure we actually started our count(*) query
endpoint.reconfigure(pageserver_id=env.pageservers[1].id)
def execute_count():
execute("SELECT count(*) FROM FOO")
async def execute_and_reconfigure():
task_exec = asyncio.to_thread(execute_count)
task_reconfig = asyncio.create_task(reconfigure_async())
await asyncio.gather(
task_exec,
task_reconfig,
)
asyncio.run(execute_and_reconfigure())
assert fetchone() == (100000,)
# One final check that nothing hangs
execute("SELECT count(*) FROM foo")
assert fetchone() == (100000,)
cur.execute("SELECT count(*) FROM foo")
assert cur.fetchone() == (100000,)
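The block removed above exercised a pageserver switch while a query was in flight by pairing asyncio.to_thread with asyncio.gather. Below is a minimal, runnable sketch of that concurrency pattern in isolation; the query and reconfigure bodies are placeholders, not the Neon fixtures.

import asyncio
import time


def blocking_query() -> int:
    # Stands in for a count(*) that hangs until the endpoint is reconfigured.
    time.sleep(2)
    return 100000


async def reconfigure_later() -> None:
    # Give the query a head start, then perform the "reconfigure".
    await asyncio.sleep(1)
    print("reconfigured")


async def query_and_reconfigure() -> None:
    result, _ = await asyncio.gather(
        asyncio.to_thread(blocking_query),   # run the blocking call in a worker thread
        reconfigure_later(),
    )
    assert result == 100000


asyncio.run(query_and_reconfigure())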