Compare commits

...

22 Commits

Author SHA1 Message Date
Konstantin Knizhnik
66fdb54d8d Merge with #5837 2023-12-05 12:05:27 +00:00
Konstantin Knizhnik
21c3c55e6b Fix flushing prefetch requests in page_server_request 2023-12-05 12:05:27 +00:00
John Spray
2c21c74ff4 DNM: script for sharding demo 2023-12-05 12:05:27 +00:00
John Spray
2d0930f622 neon_local: basic sharding support 2023-12-05 12:05:27 +00:00
John Spray
1518675bac postgres: use modified hash 2023-12-05 12:05:27 +00:00
Konstantin Knizhnik
e3c6fc3e51 compute: Add support for PS shardoing in compute 2023-12-05 12:05:27 +00:00
Konstantin Knizhnik
039f96c8a6 Add simple test for sharding 2023-12-05 12:05:27 +00:00
John Spray
8d50593e17 tests: enable running test_pg_regress with sharding 2023-12-05 12:05:27 +00:00
John Spray
f7795ee2a5 pageserver: filter WAL by ShardIdentity 2023-12-05 12:03:33 +00:00
John Spray
61fe9d360d pageserver: add Key->Shard mapping logic & use it in page service (#5980)
## Problem

When a pageserver receives a page service request identified by
TenantId, it must decide which `Tenant` object to route it to.

As in earlier PRs, this stuff is all a no-op for tenants with a single
shard: calls to `is_key_local` always return true without doing any
hashing on a single-shard ShardIdentity.

Closes: https://github.com/neondatabase/neon/issues/6026

## Summary of changes

- Carry immutable `ShardIdentity` objects in Tenant and Timeline. These
provide the information that Tenants/Timelines need to figure out which
shard is responsible for which Key.
- Augment `get_active_tenant_with_timeout` to take a `ShardSelector`
specifying how the shard should be resolved for this tenant. This mode
depends on the kind of request (e.g. basebackups always go to shard
zero).
- In `handle_get_page_at_lsn_request`, handle the case where the
Timeline we looked up at connection time is not the correct shard for
the page being requested. This can happen whenever one node holds
multiple shards for the same tenant. This is currently written as a
"slow path" with the optimistic expectation that usually we'll run with
one shard per pageserver, and the Timeline resolved at connection time
will be the one serving page requests. There is scope for optimization
here later, to avoid doing the full shard lookup for each page.
- Omit consumption metrics from nonzero shards: only the 0th shard is
responsible for tracing accurate relation sizes.

Note to reviewers:
- Testing of these changes is happening separately on the
`jcsp/sharding-pt1` branch, where we have hacked neon_local etc needed
to run a test_pg_regress.
- The main caveat to this implementation is that page service
connections still look up one Timeline when the connection is opened,
before they know which pages are going to be read. If there is one shard
per pageserver then this will always also be the Timeline that serves
page requests. However, if multiple shards are on one pageserver then
get page requests will incur the cost of looking up the correct Timeline
on each getpage request. We may look to improve this in future with a
"sticky" timeline per connection handler so that subsequent requests for
the same Timeline don't have to look up again, and/or by having postgres
pass a shard hint when connecting. This is tracked in the "Loose ends"
section of https://github.com/neondatabase/neon/issues/5507
2023-12-05 12:01:55 +00:00
Conrad Ludgate
f60e49fe8e proxy: fix panic in startup packet (#6032)
## Problem

Panic when less than 8 bytes is presented in a startup packet.

## Summary of changes

We need there to be a 4 byte message code, so the expected min length is
8.
2023-12-05 11:24:16 +01:00
Anna Khanova
c48918d329 Rename metric (#6030)
## Problem

It looks like because of reallocation of the buckets in previous PR, the
metric is broken in graphana.

## Summary of changes

Renamed the metric.
2023-12-05 10:03:07 +00:00
Sasha Krassovsky
bad686bb71 Remove trusted from wal2json (#6035)
## Problem

## Summary of changes
2023-12-04 21:10:23 +00:00
Alexey Kondratov
85d08581ed [compute_ctl] Introduce feature flags in the compute spec (#6016)
## Problem

In the past we've rolled out all new `compute_ctl` functionality right
to all users, which could be risky. I want to have a more fine-grained
control over what we enable, in which env and to which users.

## Summary of changes

Add an option to pass a list of feature flags to `compute_ctl`. If not
passed, it defaults to an empty list. Any unknown flags are ignored.

This allows us to release new experimental features safer, as we can
then flip the flag for one specific user, only Neon employees, free /
pro / etc. users and so on. Or control it per environment.

In the current implementation feature flags are passed via compute spec,
so they do not allow controlling behavior of `empty` computes. For them,
we can either stick with the previous approach, i.e. add separate cli
args or introduce a more generic `--features` cli argument.
2023-12-04 19:54:18 +01:00
Christian Schwarz
c7f1143e57 concurrency-limit low-priority initial logical size calculation [v2] (#6000)
Problem
-------

Before this PR, there was no concurrency limit on initial logical size
computations.

While logical size computations are lazy in theory, in practice
(production), they happen in a short timeframe after restart.

This means that on a PS with 20k tenants, we'd have up to 20k concurrent
initial logical size calculation requests.

This is self-inflicted needless overload.

This hasn't been a problem so far because the `.await` points on the
logical size calculation path never return `Pending`, hence we have a
natural concurrency limit of the number of executor threads.
But, as soon as we return `Pending` somewhere in the logical size
calculation path, other concurrent tasks get scheduled by tokio.
If these other tasks are also logical size calculations, they eventually
pound on the same bottleneck.

For example, in #5479, we want to switch the VirtualFile descriptor
cache to a `tokio::sync::RwLock`, which makes us return `Pending`, and
without measures like this patch, after PS restart, VirtualFile
descriptor cache thrashes heavily for 2 hours until all the logical size
calculations have been computed and the degree of concurrency /
concurrent VirtualFile operations is down to regular levels.
See the *Experiment* section below for details.

<!-- Experiments (see below) show that plain #5479 causes heavy
thrashing of the VirtualFile descriptor cache.
The high degree of concurrency is too much for 
In the case of #5479 the VirtualFile descriptor cache size starts
thrashing heavily.


-->

Background
----------

Before this PR, initial logical size calculation was spawned lazily on
first call to `Timeline::get_current_logical_size()`.

In practice (prod), the lazy calculation is triggered by
`WalReceiverConnectionHandler` if the timeline is active according to
storage broker, or by the first iteration of consumption metrics worker
after restart (`MetricsCollection`).

The spawns by walreceiver are high-priority because logical size is
needed by Safekeepers (via walreceiver `PageserverFeedback`) to enforce
the project logical size limit.
The spawns by metrics collection are not on the user-critical path and
hence low-priority. [^consumption_metrics_slo]

[^consumption_metrics_slo]: We can't delay metrics collection
indefintely because there are TBD internal SLOs tied to metrics
collection happening in a timeline manner
(https://github.com/neondatabase/cloud/issues/7408). But let's ignore
that in this issue.

The ratio of walreceiver-initiated spawns vs
consumption-metrics-initiated spawns can be reconstructed from logs
(`spawning logical size computation from context of task kind {:?}"`).
PR #5995 and #6018 adds metrics for this.

First investigation of the ratio lead to the discovery that walreceiver
spawns 75% of init logical size computations.
That's because of two bugs:
- In Safekeepers: https://github.com/neondatabase/neon/issues/5993
- In interaction between Pageservers and Safekeepers:
https://github.com/neondatabase/neon/issues/5962

The safekeeper bug is likely primarily responsible but we don't have the
data yet. The metrics will hopefully provide some insights.

When assessing production-readiness of this PR, please assume that
neither of these bugs are fixed yet.


Changes In This PR
------------------

With this PR, initial logical size calculation is reworked as follows:

First, all initial logical size calculation task_mgr tasks are started
early, as part of timeline activation, and run a retry loop with long
back-off until success. This removes the lazy computation; it was
needless complexity because in practice, we compute all logical sizes
anyways, because consumption metrics collects it.

Second, within the initial logical size calculation task, each attempt
queues behind the background loop concurrency limiter semaphore. This
fixes the performance issue that we pointed out in the "Problem" section
earlier.

Third, there is a twist to queuing behind the background loop
concurrency limiter semaphore. Logical size is needed by Safekeepers
(via walreceiver `PageserverFeedback`) to enforce the project logical
size limit. However, we currently do open walreceiver connections even
before we have an exact logical size. That's bad, and I'll build on top
of this PR to fix that
(https://github.com/neondatabase/neon/issues/5963). But, for the
purposes of this PR, we don't want to introduce a regression, i.e., we
don't want to provide an exact value later than before this PR. The
solution is to introduce a priority-boosting mechanism
(`GetLogicalSizePriority`), allowing callers of
`Timeline::get_current_logical_size` to specify how urgently they need
an exact value. The effect of specifying high urgency is that the
initial logical size calculation task for the timeline will skip the
concurrency limiting semaphore. This should yield effectively the same
behavior as we had before this PR with lazy spawning.

Last, the priority-boosting mechanism obsoletes the `init_order`'s grace
period for initial logical size calculations. It's a separate commit to
reduce the churn during review. We can drop that commit if people think
it's too much churn, and commit it later once we know this PR here
worked as intended.

Experiment With #5479 
---------------------

I validated this PR combined with #5479 to assess whether we're making
forward progress towards asyncification.

The setup is an `i3en.3xlarge` instance with 20k tenants, each with one
timeline that has 9 layers.
All tenants are inactive, i.e., not known to SKs nor storage broker.
This means all initial logical size calculations are spawned by
consumption metrics `MetricsCollection` task kind.
The consumption metrics worker starts requesting logical sizes at low
priority immediately after restart. This is achieved by deleting the
consumption metrics cache file on disk before starting
PS.[^consumption_metrics_cache_file]

[^consumption_metrics_cache_file] Consumption metrics worker persists
its interval across restarts to achieve persistent reporting intervals
across PS restarts; delete the state file on disk to get predictable
(and I believe worst-case in terms of concurrency during PS restart)
behavior.

Before this patch, all of these timelines would all do their initial
logical size calculation in parallel, leading to extreme thrashing in
page cache and virtual file cache.

With this patch, the virtual file cache thrashing is reduced
significantly (from 80k `open`-system-calls/second to ~500
`open`-system-calls/second during loading).


### Critique

The obvious critique with above experiment is that there's no skipping
of the semaphore, i.e., the priority-boosting aspect of this PR is not
exercised.

If even just 1% of our 20k tenants in the setup were active in
SK/storage_broker, then 200 logical size calculations would skip the
limiting semaphore immediately after restart and run concurrently.

Further critique: given the two bugs wrt timeline inactive vs active
state that were mentioned in the Background section, we could have 75%
of our 20k tenants being (falsely) active on restart.

So... (next section)

This Doesn't Make Us Ready For Async VirtualFile
------------------------------------------------

This PR is a step towards asynchronous `VirtualFile`, aka, #5479 or even
#4744.

But it doesn't yet enable us to ship #5479.

The reason is that this PR doesn't limit the amount of high-priority
logical size computations.
If there are many high-priority logical size calculations requested,
we'll fall over like we did if #5479 is applied without this PR.
And currently, at very least due to the bugs mentioned in the Background
section, we run thousands of high-priority logical size calculations on
PS startup in prod.

So, at a minimum, we need to fix these bugs.

Then we can ship #5479 and #4744, and things will likely be fine under
normal operation.

But in high-traffic situations, overload problems will still be more
likely to happen, e.g., VirtualFile cache descriptor thrashing.
The solution candidates for that are orthogonal to this PR though:
* global concurrency limiting
* per-tenant rate limiting => #5899
* load shedding
* scaling bottleneck resources (fd cache size (neondatabase/cloud#8351),
page cache size(neondatabase/cloud#8351), spread load across more PSes,
etc)

Conclusion
----------

Even with the remarks from in the previous section, we should merge this
PR because:
1. it's an improvement over the status quo (esp. if the aforementioned
bugs wrt timeline active / inactive are fixed)
2. it prepares the way for
https://github.com/neondatabase/neon/pull/6010
3. it gets us close to shipping #5479 and #4744
2023-12-04 17:22:26 +00:00
Christian Schwarz
7403d55013 walredo: stderr cleanup & make explicitly cancel safe (#6031)
# Problem

I need walredo to be cancellation-safe for
https://github.com/neondatabase/neon/pull/6000#discussion_r1412049728

# Solution

We are only `async fn` because of
`wait_for(stderr_logger_task_done).await`, added in #5560 .

The `stderr_logger_cancel` and `stderr_logger_task_done` were there out
of precaution that the stderr logger task might for some reason not stop
when the walredo process terminates.
That hasn't been a problem in practice.
So, simplify things:
- remove `stderr_logger_cancel` and the
`wait_for(...stderr_logger_task_done...)`
- use `tokio::process::ChildStderr` in the stderr logger task
- add metrics to track number of running stderr logger tasks so in case
I'm wrong here, we can use these metrics to identify the issue (not
planning to put them into a dashboard or anything)
2023-12-04 16:06:41 +00:00
Anna Khanova
12f02523a4 Enable dynamic rate limiter (#6029)
## Problem

Limit the number of open connections between the control plane and
proxy.

## Summary of changes

Enable dynamic rate limiter in prod.

Unfortunately the latency metrics are a bit broken, but from logs I see
that on staging for the past 7 days only 2 times latency for acquiring
was greater than 1ms (for most of the cases it's insignificant).
2023-12-04 15:00:24 +00:00
Arseny Sher
207c527270 Safekeepers: persist state before timeline deactivation.
Without it, sometimes on restart we lose latest remote_consistent_lsn which
leads to excessive ps -> sk reconnections.

https://github.com/neondatabase/neon/issues/5993
2023-12-04 18:22:36 +04:00
John Khvatov
eae49ff598 Perform L0 compaction before creating new image layers (#5950)
If there are too many L0 layers before compaction, the compaction
process becomes slow because of slow `Timeline::get`. As a result of the
slowdown, the pageserver will generate even more L0 layers for the next
iteration, further exacerbating the slow performance.

Change to perform L0 -> L1 compaction before creating new images. The
simple change speeds up compaction time and `Timeline::get` to 5x.
`Timeline::get` is faster on top of L1 layers.

Co-authored-by: Joonas Koivunen <joonas@neon.tech>
2023-12-04 12:35:09 +00:00
Alexander Bayandin
e6b2f89fec test_pg_clients: fix test that reads from stdout (#6021)
## Problem

`test_pg_clients` reads the actual result from a *.stdout file,
https://github.com/neondatabase/neon/pull/5977 has added a header to
such files, so `test_pg_clients` started to fail.

## Summary of changes
- Use `capture_stdout` and compare the expected result with the output
instead of *.stdout file content
2023-12-04 11:18:41 +00:00
John Spray
1d81e70d60 pageserver: tweak logs for index_part loading (#6005)
## Problem

On pageservers upgraded to enable generations, these INFO level logs
were rather frequent. If a tenant timeline hasn't written new layers
since the upgrade, it will emit the "No index_part.json*" log every time
it starts.

## Summary of changes

- Downgrade two log lines from info to debug
- Add a tiny unit test that I wrote for sanity-checking that there
wasn't something wrong with our Generation-comparing logic when loading
index parts.
2023-12-04 09:57:47 +00:00
Anastasia Lubennikova
e3512340c1 Override neon.max_cluster_size for the time of compute_ctl (#5998)
Temporarily reset neon.max_cluster_size to avoid
the possibility of hitting the limit, while we are applying config:
creating new extensions, roles, etc...
2023-12-03 15:21:44 +00:00
51 changed files with 1654 additions and 770 deletions

View File

@@ -721,8 +721,7 @@ RUN wget https://github.com/eulerto/wal2json/archive/refs/tags/wal2json_2_5.tar.
echo "b516653575541cf221b99cf3f8be9b6821f6dbcfc125675c85f35090f824f00e wal2json_2_5.tar.gz" | sha256sum --check && \
mkdir wal2json-src && cd wal2json-src && tar xvzf ../wal2json_2_5.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) && \
make -j $(getconf _NPROCESSORS_ONLN) install && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/wal2json.control
make -j $(getconf _NPROCESSORS_ONLN) install
#########################################################################################
#

View File

@@ -22,7 +22,7 @@ use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
use compute_api::responses::{ComputeMetrics, ComputeStatus};
use compute_api::spec::{ComputeMode, ComputeSpec};
use compute_api::spec::{ComputeFeature, ComputeMode, ComputeSpec};
use utils::measured_stream::MeasuredReader;
use remote_storage::{DownloadError, RemotePath};
@@ -277,6 +277,17 @@ fn create_neon_superuser(spec: &ComputeSpec, client: &mut Client) -> Result<()>
}
impl ComputeNode {
/// Check that compute node has corresponding feature enabled.
pub fn has_feature(&self, feature: ComputeFeature) -> bool {
let state = self.state.lock().unwrap();
if let Some(s) = state.pspec.as_ref() {
s.spec.features.contains(&feature)
} else {
false
}
}
pub fn set_status(&self, status: ComputeStatus) {
let mut state = self.state.lock().unwrap();
state.status = status;
@@ -728,7 +739,12 @@ impl ComputeNode {
// Write new config
let pgdata_path = Path::new(&self.pgdata);
config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &spec, None)?;
let postgresql_conf_path = pgdata_path.join("postgresql.conf");
config::write_postgres_conf(&postgresql_conf_path, &spec, None)?;
// temporarily reset max_cluster_size in config
// to avoid the possibility of hitting the limit, while we are reconfiguring:
// creating new extensions, roles, etc...
config::compute_ctl_temp_override_create(pgdata_path, "neon.max_cluster_size=-1")?;
self.pg_reload_conf()?;
let mut client = Client::connect(self.connstr.as_str(), NoTls)?;
@@ -749,6 +765,10 @@ impl ComputeNode {
// 'Close' connection
drop(client);
// reset max_cluster_size in config back to original value and reload config
config::compute_ctl_temp_override_remove(pgdata_path)?;
self.pg_reload_conf()?;
let unknown_op = "unknown".to_string();
let op_id = spec.operation_uuid.as_ref().unwrap_or(&unknown_op);
info!(
@@ -809,7 +829,17 @@ impl ComputeNode {
let config_time = Utc::now();
if pspec.spec.mode == ComputeMode::Primary && !pspec.spec.skip_pg_catalog_updates {
let pgdata_path = Path::new(&self.pgdata);
// temporarily reset max_cluster_size in config
// to avoid the possibility of hitting the limit, while we are applying config:
// creating new extensions, roles, etc...
config::compute_ctl_temp_override_create(pgdata_path, "neon.max_cluster_size=-1")?;
self.pg_reload_conf()?;
self.apply_config(&compute_state)?;
config::compute_ctl_temp_override_remove(pgdata_path)?;
self.pg_reload_conf()?;
}
let startup_end_time = Utc::now();

View File

@@ -93,5 +93,25 @@ pub fn write_postgres_conf(
writeln!(file, "neon.extension_server_port={}", port)?;
}
// This is essential to keep this line at the end of the file,
// because it is intended to override any settings above.
writeln!(file, "include_if_exists = 'compute_ctl_temp_override.conf'")?;
Ok(())
}
/// create file compute_ctl_temp_override.conf in pgdata_dir
/// add provided options to this file
pub fn compute_ctl_temp_override_create(pgdata_path: &Path, options: &str) -> Result<()> {
let path = pgdata_path.join("compute_ctl_temp_override.conf");
let mut file = File::create(path)?;
write!(file, "{}", options)?;
Ok(())
}
/// remove file compute_ctl_temp_override.conf in pgdata_dir
pub fn compute_ctl_temp_override_remove(pgdata_path: &Path) -> Result<()> {
let path = pgdata_path.join("compute_ctl_temp_override.conf");
std::fs::remove_file(path)?;
Ok(())
}

View File

@@ -118,19 +118,6 @@ pub fn get_spec_from_control_plane(
spec
}
/// It takes cluster specification and does the following:
/// - Serialize cluster config and put it into `postgresql.conf` completely rewriting the file.
/// - Update `pg_hba.conf` to allow external connections.
pub fn handle_configuration(spec: &ComputeSpec, pgdata_path: &Path) -> Result<()> {
// File `postgresql.conf` is no longer included into `basebackup`, so just
// always write all config into it creating new file.
config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), spec, None)?;
update_pg_hba(pgdata_path)?;
Ok(())
}
/// Check `pg_hba.conf` and update if needed to allow external connections.
pub fn update_pg_hba(pgdata_path: &Path) -> Result<()> {
// XXX: consider making it a part of spec.json

View File

@@ -15,7 +15,8 @@ use control_plane::pageserver::{PageServerNode, PAGESERVER_REMOTE_STORAGE_DIR};
use control_plane::safekeeper::SafekeeperNode;
use control_plane::tenant_migration::migrate_tenant;
use control_plane::{broker, local_env};
use pageserver_api::models::TimelineInfo;
use pageserver_api::models::{LocationConfig, LocationConfigMode, TimelineInfo};
use pageserver_api::shard::{ShardCount, ShardNumber, TenantShardId};
use pageserver_api::{
DEFAULT_HTTP_LISTEN_PORT as DEFAULT_PAGESERVER_HTTP_PORT,
DEFAULT_PG_LISTEN_PORT as DEFAULT_PAGESERVER_PG_PORT,
@@ -26,6 +27,7 @@ use safekeeper_api::{
DEFAULT_PG_LISTEN_PORT as DEFAULT_SAFEKEEPER_PG_PORT,
};
use std::collections::{BTreeSet, HashMap};
use std::num::ParseIntError;
use std::path::PathBuf;
use std::process::exit;
use std::str::FromStr;
@@ -97,6 +99,22 @@ struct TimelineTreeEl {
pub children: BTreeSet<TimelineId>,
}
/// Helper for CLI args that contain a comma-separate list of NodeId
fn parse_ids_arg(
matches: &ArgMatches,
arg: &str,
) -> Result<Option<Vec<NodeId>>, std::num::ParseIntError> {
if let Some(id_str) = matches.get_one::<String>(arg) {
let r: Result<Vec<_>, ParseIntError> = id_str
.split(',')
.map(|ps_id| u64::from_str(str::trim(ps_id)).map(NodeId))
.collect();
r.map(Some)
} else {
Ok(Some(vec![DEFAULT_PAGESERVER_ID]))
}
}
// Main entry point for the 'neon_local' CLI utility
//
// This utility helps to manage neon installation. That includes following:
@@ -374,9 +392,10 @@ fn pageserver_config_overrides(init_match: &ArgMatches) -> Vec<&str> {
}
fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> anyhow::Result<()> {
let pageserver = get_default_pageserver(env);
match tenant_match.subcommand() {
Some(("list", _)) => {
// TODO: make command aware of multiple pageservers
let pageserver = get_default_pageserver(env);
for t in pageserver.tenant_list()? {
println!("{} {:?}", t.id, t.state);
}
@@ -387,38 +406,94 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> an
.map(|vals| vals.flat_map(|c| c.split_once(':')).collect())
.unwrap_or_default();
let shard_count: u8 = create_match
.get_one::<u8>("shard-count")
.cloned()
.unwrap_or(1);
// If tenant ID was not specified, generate one
let tenant_id = parse_tenant_id(create_match)?.unwrap_or_else(TenantId::generate);
let generation = if env.control_plane_api.is_some() {
// We must register the tenant with the attachment service, so
// that when the pageserver restarts, it will be re-attached.
let attachment_service = AttachmentService::from_env(env);
attachment_service.attach_hook(tenant_id, pageserver.conf.id)?
} else {
None
};
pageserver.tenant_create(tenant_id, generation, tenant_conf)?;
println!("tenant {tenant_id} successfully created on the pageserver");
// Create an initial timeline for the new tenant
let new_timeline_id = parse_timeline_id(create_match)?;
// We will create an initial timeline for the new tenant
let new_timeline_id =
parse_timeline_id(create_match)?.unwrap_or(TimelineId::generate());
let pg_version = create_match
.get_one::<u32>("pg-version")
.copied()
.context("Failed to parse postgres version from the argument string")?;
let timeline_info = pageserver.timeline_create(
tenant_id,
new_timeline_id,
None,
None,
Some(pg_version),
None,
)?;
let new_timeline_id = timeline_info.timeline_id;
let last_record_lsn = timeline_info.last_record_lsn;
// TODO: implement ability for one pageserver to hold multiple
// shards for the same tenant. Until then, we must place each
// shard on a different pageserver.
assert!(env.pageservers.len() >= shard_count as usize);
let cfg_shard_count = if shard_count > 1 {
shard_count
} else {
// For single-sharded mode, use the legacy unsharded configuration. This avoids
// breaking any existing tests that assume legacy unsharded storage paths
0
};
for shard_number in 0..shard_count {
let ps_conf = env.pageservers.get(shard_number as usize).unwrap();
let pageserver = PageServerNode::from_env(env, ps_conf);
// TODO: per-shard generations
let generation = if env.control_plane_api.is_some() {
// We must register the tenant with the attachment service, so
// that when the pageserver restarts, it will be re-attached.
let attachment_service = AttachmentService::from_env(env);
attachment_service.attach_hook(tenant_id, pageserver.conf.id)?
} else {
None
};
// TODO: shard-aware POST /v1/tenant. Currently tenant creation on the
// pageserver is a no-op, but we shouldn't skip the command entirely.
let tenant_conf = PageServerNode::build_config(tenant_conf.clone())?;
let tenant_shard_id = TenantShardId {
shard_number: ShardNumber(shard_number),
shard_count: ShardCount(cfg_shard_count),
tenant_id,
};
let location_conf = LocationConfig {
shard_count: cfg_shard_count,
shard_number,
shard_stripe_size: 32768,
mode: LocationConfigMode::AttachedSingle,
generation,
secondary_conf: None,
tenant_conf,
};
pageserver.location_config(tenant_shard_id, location_conf, None)?;
println!(
"tenant {tenant_id} successfully created on pageserver {}",
pageserver.conf.id
);
}
for shard_number in 0..shard_count {
let ps_conf = env.pageservers.get(shard_number as usize).unwrap();
let pageserver = PageServerNode::from_env(env, ps_conf);
let tenant_shard_id = TenantShardId {
shard_number: ShardNumber(shard_number),
shard_count: ShardCount(cfg_shard_count),
tenant_id,
};
pageserver.timeline_create(
tenant_shard_id,
Some(new_timeline_id),
None,
None,
Some(pg_version),
None,
)?;
}
env.register_branch_mapping(
DEFAULT_BRANCH_NAME.to_string(),
@@ -426,9 +501,7 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> an
new_timeline_id,
)?;
println!(
"Created an initial timeline '{new_timeline_id}' at Lsn {last_record_lsn} for tenant: {tenant_id}",
);
println!("Created an initial timeline '{new_timeline_id}' for tenant: {tenant_id}",);
if create_match.get_flag("set-default") {
println!("Setting tenant {tenant_id} as a default one");
@@ -448,6 +521,8 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> an
.map(|vals| vals.flat_map(|c| c.split_once(':')).collect())
.unwrap_or_default();
// TODO: make command aware of multiple pageservers
let pageserver = get_default_pageserver(env);
pageserver
.tenant_config(tenant_id, tenant_conf)
.with_context(|| format!("Tenant config failed for tenant with id {tenant_id}"))?;
@@ -491,7 +566,7 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
let new_timeline_id_opt = parse_timeline_id(create_match)?;
let timeline_info = pageserver.timeline_create(
tenant_id,
TenantShardId::unsharded(tenant_id),
new_timeline_id_opt,
None,
None,
@@ -554,7 +629,7 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
None,
pg_version,
ComputeMode::Primary,
DEFAULT_PAGESERVER_ID,
vec![DEFAULT_PAGESERVER_ID],
)?;
println!("Done");
}
@@ -579,7 +654,7 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
.transpose()
.context("Failed to parse ancestor start Lsn from the request")?;
let timeline_info = pageserver.timeline_create(
tenant_id,
TenantShardId::unsharded(tenant_id),
None,
start_lsn,
Some(ancestor_timeline_id),
@@ -704,13 +779,8 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
.copied()
.unwrap_or(false);
let pageserver_id =
if let Some(id_str) = sub_args.get_one::<String>("endpoint-pageserver-id") {
NodeId(id_str.parse().context("while parsing pageserver id")?)
} else {
DEFAULT_PAGESERVER_ID
};
let pageserver_ids = parse_ids_arg(sub_args, "endpoint-pageserver-id")?
.unwrap_or(vec![DEFAULT_PAGESERVER_ID]);
let mode = match (lsn, hot_standby) {
(Some(lsn), false) => ComputeMode::Static(lsn),
(None, true) => ComputeMode::Replica,
@@ -738,7 +808,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
http_port,
pg_version,
mode,
pageserver_id,
pageserver_ids,
)?;
}
"start" => {
@@ -746,29 +816,14 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
.get_one::<String>("endpoint_id")
.ok_or_else(|| anyhow!("No endpoint ID was provided to start"))?;
let pageserver_id =
if let Some(id_str) = sub_args.get_one::<String>("endpoint-pageserver-id") {
NodeId(id_str.parse().context("while parsing pageserver id")?)
} else {
DEFAULT_PAGESERVER_ID
};
let pageservers = parse_ids_arg(sub_args, "endpoint-pageserver-id")?
.unwrap_or(vec![DEFAULT_PAGESERVER_ID]);
let remote_ext_config = sub_args.get_one::<String>("remote-ext-config");
// If --safekeepers argument is given, use only the listed safekeeper nodes.
let safekeepers =
if let Some(safekeepers_str) = sub_args.get_one::<String>("safekeepers") {
let mut safekeepers: Vec<NodeId> = Vec::new();
for sk_id in safekeepers_str.split(',').map(str::trim) {
let sk_id = NodeId(u64::from_str(sk_id).map_err(|_| {
anyhow!("invalid node ID \"{sk_id}\" in --safekeepers list")
})?);
safekeepers.push(sk_id);
}
safekeepers
} else {
env.safekeepers.iter().map(|sk| sk.id).collect()
};
let safekeepers = parse_ids_arg(sub_args, "safekeepers")?
.unwrap_or_else(|| env.safekeepers.iter().map(|sk| sk.id).collect());
let endpoint = cplane
.endpoints
@@ -781,7 +836,8 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
endpoint.timeline_id,
)?;
let ps_conf = env.get_pageserver_conf(pageserver_id)?;
// We assume that all pageservers have the same auth conf
let ps_conf = env.get_pageserver_conf(pageservers[0])?;
let auth_token = if matches!(ps_conf.pg_auth_type, AuthType::NeonJWT) {
let claims = Claims::new(Some(endpoint.tenant_id), Scope::Tenant);
@@ -801,15 +857,21 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
.endpoints
.get(endpoint_id.as_str())
.with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?;
let pageserver_id =
if let Some(id_str) = sub_args.get_one::<String>("endpoint-pageserver-id") {
Some(NodeId(
id_str.parse().context("while parsing pageserver id")?,
))
} else {
None
};
endpoint.reconfigure(pageserver_id)?;
let pageserver_ids: Option<Result<Vec<NodeId>, _>> = sub_args
.get_many::<String>("endpoint-pageserver-id")
.map(|ids| {
ids.map(|id_str| id_str.parse().context("while parsing pageserver id"))
.map(|r| r.map(NodeId))
.collect()
});
let pageserver_ids = match pageserver_ids {
Some(Ok(v)) => Ok(Some(v)),
Some(Err(e)) => Err(e),
None => Ok(None),
}?;
endpoint.reconfigure(pageserver_ids)?;
}
"stop" => {
let endpoint_id = sub_args
@@ -1313,6 +1375,7 @@ fn cli() -> Command {
.arg(pg_version_arg.clone())
.arg(Arg::new("set-default").long("set-default").action(ArgAction::SetTrue).required(false)
.help("Use this tenant in future CLI commands where tenant_id is needed, but not specified"))
.arg(Arg::new("shard-count").value_parser(value_parser!(u8)).long("shard-count").action(ArgAction::Set).help("Number of shards in the new tenant (default 1)"))
)
.subcommand(Command::new("set-default").arg(tenant_id_arg.clone().required(true))
.about("Set a particular tenant as default in future CLI commands where tenant_id is needed, but not specified"))

View File

@@ -67,7 +67,7 @@ pub struct EndpointConf {
http_port: u16,
pg_version: u32,
skip_pg_catalog_updates: bool,
pageserver_id: NodeId,
pageservers: Vec<NodeId>,
}
//
@@ -82,6 +82,33 @@ pub struct ComputeControlPlane {
env: LocalEnv,
}
fn load_pageservers(
env: &LocalEnv,
pageserver_ids: &Vec<NodeId>,
) -> anyhow::Result<Vec<PageServerNode>> {
let mut pageservers = Vec::new();
for ps_id in pageserver_ids {
let pageserver = env
.get_pageserver_conf(*ps_id)
.map(|conf| PageServerNode::from_env(env, conf))?;
pageservers.push(pageserver);
}
Ok(pageservers)
}
fn build_pageserver_connstr(pageservers: &[PageServerNode]) -> String {
pageservers
.iter()
.map(|ps| {
let config = ps.pg_connection_config.clone();
let (host, port) = (config.host(), config.port());
// NOTE: avoid spaces in connection string, because it is less error prone if we forward it somewhere.
format!("postgresql://no_user@{host}:{port}")
})
.collect::<Vec<_>>()
.join(",")
}
impl ComputeControlPlane {
// Load current endpoints from the endpoints/ subdirectories
pub fn load(env: LocalEnv) -> Result<ComputeControlPlane> {
@@ -119,19 +146,16 @@ impl ComputeControlPlane {
http_port: Option<u16>,
pg_version: u32,
mode: ComputeMode,
pageserver_id: NodeId,
pageservers: Vec<NodeId>,
) -> Result<Arc<Endpoint>> {
let pg_port = pg_port.unwrap_or_else(|| self.get_port());
let http_port = http_port.unwrap_or_else(|| self.get_port() + 1);
let pageserver =
PageServerNode::from_env(&self.env, self.env.get_pageserver_conf(pageserver_id)?);
let ep = Arc::new(Endpoint {
endpoint_id: endpoint_id.to_owned(),
pg_address: SocketAddr::new("127.0.0.1".parse().unwrap(), pg_port),
http_address: SocketAddr::new("127.0.0.1".parse().unwrap(), http_port),
env: self.env.clone(),
pageserver,
pageservers: load_pageservers(&self.env, &pageservers)?,
timeline_id,
mode,
tenant_id,
@@ -157,7 +181,7 @@ impl ComputeControlPlane {
pg_port,
pg_version,
skip_pg_catalog_updates: true,
pageserver_id,
pageservers,
})?,
)?;
std::fs::write(
@@ -216,7 +240,7 @@ pub struct Endpoint {
// These are not part of the endpoint as such, but the environment
// the endpoint runs in.
pub env: LocalEnv,
pageserver: PageServerNode,
pageservers: Vec<PageServerNode>,
// Optimizations
skip_pg_catalog_updates: bool,
@@ -239,15 +263,14 @@ impl Endpoint {
let conf: EndpointConf =
serde_json::from_slice(&std::fs::read(entry.path().join("endpoint.json"))?)?;
let pageserver =
PageServerNode::from_env(env, env.get_pageserver_conf(conf.pageserver_id)?);
let pageservers: Vec<PageServerNode> = load_pageservers(env, &conf.pageservers)?;
Ok(Endpoint {
pg_address: SocketAddr::new("127.0.0.1".parse().unwrap(), conf.pg_port),
http_address: SocketAddr::new("127.0.0.1".parse().unwrap(), conf.http_port),
endpoint_id,
env: env.clone(),
pageserver,
pageservers,
timeline_id: conf.timeline_id,
mode: conf.mode,
tenant_id: conf.tenant_id,
@@ -482,13 +505,7 @@ impl Endpoint {
std::fs::remove_dir_all(self.pgdata())?;
}
let pageserver_connstring = {
let config = &self.pageserver.pg_connection_config;
let (host, port) = (config.host(), config.port());
// NOTE: avoid spaces in connection string, because it is less error prone if we forward it somewhere.
format!("postgresql://no_user@{host}:{port}")
};
let pageserver_connstring = build_pageserver_connstr(&self.pageservers);
let mut safekeeper_connstrings = Vec::new();
if self.mode == ComputeMode::Primary {
for sk_id in safekeepers {
@@ -519,6 +536,7 @@ impl Endpoint {
skip_pg_catalog_updates: self.skip_pg_catalog_updates,
format_version: 1.0,
operation_uuid: None,
features: vec![],
cluster: Cluster {
cluster_id: None, // project ID: not used
name: None, // project name: not used
@@ -657,7 +675,7 @@ impl Endpoint {
}
}
pub fn reconfigure(&self, pageserver_id: Option<NodeId>) -> Result<()> {
pub fn reconfigure(&self, pageservers: Option<Vec<NodeId>>) -> Result<()> {
let mut spec: ComputeSpec = {
let spec_path = self.endpoint_path().join("spec.json");
let file = std::fs::File::open(spec_path)?;
@@ -667,23 +685,20 @@ impl Endpoint {
let postgresql_conf = self.read_postgresql_conf()?;
spec.cluster.postgresql_conf = Some(postgresql_conf);
if let Some(pageserver_id) = pageserver_id {
if let Some(pageservers) = pageservers {
let endpoint_config_path = self.endpoint_path().join("endpoint.json");
let mut endpoint_conf: EndpointConf = {
let file = std::fs::File::open(&endpoint_config_path)?;
serde_json::from_reader(file)?
};
endpoint_conf.pageserver_id = pageserver_id;
endpoint_conf.pageservers = pageservers.clone();
std::fs::write(
endpoint_config_path,
serde_json::to_string_pretty(&endpoint_conf)?,
)?;
let pageserver =
PageServerNode::from_env(&self.env, self.env.get_pageserver_conf(pageserver_id)?);
let ps_http_conf = &pageserver.pg_connection_config;
let (host, port) = (ps_http_conf.host(), ps_http_conf.port());
spec.pageserver_connstring = Some(format!("postgresql://no_user@{host}:{port}"));
let pageservers = load_pageservers(&self.env, &pageservers)?;
spec.pageserver_connstring = Some(build_pageserver_connstr(&pageservers));
}
let client = reqwest::blocking::Client::new();

View File

@@ -340,15 +340,8 @@ impl PageServerNode {
.json()?)
}
pub fn tenant_create(
&self,
new_tenant_id: TenantId,
generation: Option<u32>,
settings: HashMap<&str, &str>,
) -> anyhow::Result<TenantId> {
let mut settings = settings.clone();
let config = models::TenantConfig {
pub fn build_config(mut settings: HashMap<&str, &str>) -> anyhow::Result<models::TenantConfig> {
Ok(models::TenantConfig {
checkpoint_distance: settings
.remove("checkpoint_distance")
.map(|x| x.parse::<u64>())
@@ -407,8 +400,16 @@ impl PageServerNode {
.map(|x| x.parse::<bool>())
.transpose()
.context("Failed to parse 'gc_feedback' as bool")?,
};
})
}
pub fn tenant_create(
&self,
new_tenant_id: TenantId,
generation: Option<u32>,
settings: HashMap<&str, &str>,
) -> anyhow::Result<TenantId> {
let config = Self::build_config(settings.clone())?;
let request = models::TenantCreateRequest {
new_tenant_id: TenantShardId::unsharded(new_tenant_id),
generation,
@@ -521,15 +522,18 @@ impl PageServerNode {
pub fn location_config(
&self,
tenant_id: TenantId,
tenant_shard_id: TenantShardId,
config: LocationConfig,
flush_ms: Option<Duration>,
) -> anyhow::Result<()> {
let req_body = TenantLocationConfigRequest { tenant_id, config };
let req_body = TenantLocationConfigRequest {
tenant_shard_id,
config,
};
let path = format!(
"{}/tenant/{}/location_config",
self.http_base_url, tenant_id
self.http_base_url, tenant_shard_id
);
let path = if let Some(flush_ms) = flush_ms {
format!("{}?flush_ms={}", path, flush_ms.as_millis())
@@ -560,7 +564,7 @@ impl PageServerNode {
pub fn timeline_create(
&self,
tenant_id: TenantId,
tenant_shard_id: TenantShardId,
new_timeline_id: Option<TimelineId>,
ancestor_start_lsn: Option<Lsn>,
ancestor_timeline_id: Option<TimelineId>,
@@ -572,7 +576,7 @@ impl PageServerNode {
self.http_request(
Method::POST,
format!("{}/tenant/{}/timeline", self.http_base_url, tenant_id),
format!("{}/tenant/{}/timeline", self.http_base_url, tenant_shard_id),
)?
.json(&models::TimelineCreateRequest {
new_timeline_id,
@@ -585,11 +589,11 @@ impl PageServerNode {
.error_from_body()?
.json::<Option<TimelineInfo>>()
.with_context(|| {
format!("Failed to parse timeline creation response for tenant id: {tenant_id}")
format!("Failed to parse timeline creation response for tenant id: {tenant_shard_id}")
})?
.with_context(|| {
format!(
"No timeline id was found in the timeline creation response for tenant {tenant_id}"
"No timeline id was found in the timeline creation response for tenant {tenant_shard_id}"
)
})
}

View File

@@ -11,6 +11,7 @@ use crate::{
use pageserver_api::models::{
LocationConfig, LocationConfigMode, LocationConfigSecondary, TenantConfig,
};
use pageserver_api::shard::TenantShardId;
use std::collections::HashMap;
use std::time::Duration;
use utils::{
@@ -108,6 +109,9 @@ pub fn migrate_tenant(
}
}
// No support for sharding in this function yet
let tenant_shard_id = TenantShardId::unsharded(tenant_id);
let previous = attachment_service.inspect(tenant_id)?;
let mut baseline_lsns = None;
if let Some((generation, origin_ps_id)) = &previous {
@@ -117,7 +121,7 @@ pub fn migrate_tenant(
println!("🔁 Already attached to {origin_ps_id}, freshening...");
let gen = attachment_service.attach_hook(tenant_id, dest_ps.conf.id)?;
let dest_conf = build_location_config(LocationConfigMode::AttachedSingle, gen, None);
dest_ps.location_config(tenant_id, dest_conf, None)?;
dest_ps.location_config(tenant_shard_id, dest_conf, None)?;
println!("✅ Migration complete");
return Ok(());
}
@@ -126,7 +130,7 @@ pub fn migrate_tenant(
let stale_conf =
build_location_config(LocationConfigMode::AttachedStale, Some(*generation), None);
origin_ps.location_config(tenant_id, stale_conf, Some(Duration::from_secs(10)))?;
origin_ps.location_config(tenant_shard_id, stale_conf, Some(Duration::from_secs(10)))?;
baseline_lsns = Some(get_lsns(tenant_id, &origin_ps)?);
}
@@ -135,7 +139,7 @@ pub fn migrate_tenant(
let dest_conf = build_location_config(LocationConfigMode::AttachedMulti, gen, None);
println!("🔁 Attaching to pageserver {}", dest_ps.conf.id);
dest_ps.location_config(tenant_id, dest_conf, None)?;
dest_ps.location_config(tenant_shard_id, dest_conf, None)?;
if let Some(baseline) = baseline_lsns {
println!("🕑 Waiting for LSN to catch up...");
@@ -149,7 +153,7 @@ pub fn migrate_tenant(
"🔁 Reconfiguring endpoint {} to use pageserver {}",
endpoint_name, dest_ps.conf.id
);
endpoint.reconfigure(Some(dest_ps.conf.id))?;
endpoint.reconfigure(Some(vec![dest_ps.conf.id]))?;
}
}
@@ -181,7 +185,7 @@ pub fn migrate_tenant(
"💤 Switching to secondary mode on pageserver {}",
other_ps.conf.id
);
other_ps.location_config(tenant_id, secondary_conf, None)?;
other_ps.location_config(tenant_shard_id, secondary_conf, None)?;
}
println!(
@@ -189,7 +193,7 @@ pub fn migrate_tenant(
dest_ps.conf.id
);
let dest_conf = build_location_config(LocationConfigMode::AttachedSingle, gen, None);
dest_ps.location_config(tenant_id, dest_conf, None)?;
dest_ps.location_config(tenant_shard_id, dest_conf, None)?;
println!("✅ Migration complete");

21
demo_sharding.sh Normal file
View File

@@ -0,0 +1,21 @@
export RUST_LOG=DEBUG
SHARDS=4
PAGESERVERS=`seq -s , 1 $SHARDS`
SCALE=10
ARGS=--features=testing
set -e
set +e
cargo neon $ARGS stop ; killall -9 storage_broker ; killall -9 safekeeper ; killall -9 pageserver ; killall -9 postgres ; killall -9 attachment_service ; rm -rf .neon
set -e
cargo build --package=pageserver && cargo neon $ARGS init --num-pageservers=$SHARDS && RUST_LOG=debug cargo neon $ARGS start && cargo neon $ARGS tenant create --shard-count=$SHARDS --tenant-id=1f359dd625e519a1a4e8d7509690f6fc --timeline-id=3d34095be52fec4c44a92e774c573b57 --set-default
cargo neon $ARGS endpoint create --pageserver-id=$PAGESERVERS && cargo neon endpoint start --pageserver-id=$PAGESERVERS ep-main
pgbench postgres -i -h 127.0.0.1 -p 55432 -U cloud_admin -s $SCALE
du -sh .neon/local_fs_remote_storage/pageserver/tenants/1f359dd625e519a1a4e8d7509690f6fc*

View File

@@ -26,6 +26,13 @@ pub struct ComputeSpec {
// but we don't use it for anything. Serde will ignore missing fields when
// deserializing it.
pub operation_uuid: Option<String>,
/// Compute features to enable. These feature flags are provided, when we
/// know all the details about client's compute, so they cannot be used
/// to change `Empty` compute behavior.
#[serde(default)]
pub features: Vec<ComputeFeature>,
/// Expected cluster state at the end of transition process.
pub cluster: Cluster,
pub delta_operations: Option<Vec<DeltaOp>>,
@@ -68,6 +75,19 @@ pub struct ComputeSpec {
pub remote_extensions: Option<RemoteExtSpec>,
}
/// Feature flag to signal `compute_ctl` to enable certain experimental functionality.
#[derive(Serialize, Clone, Copy, Debug, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ComputeFeature {
// XXX: Add more feature flags here.
// This is a special feature flag that is used to represent unknown feature flags.
// Basically all unknown to enum flags are represented as this one. See unit test
// `parse_unknown_features()` for more details.
#[serde(other)]
UnknownFeature,
}
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
pub struct RemoteExtSpec {
pub public_extensions: Option<Vec<String>>,
@@ -229,7 +249,10 @@ mod tests {
#[test]
fn parse_spec_file() {
let file = File::open("tests/cluster_spec.json").unwrap();
let _spec: ComputeSpec = serde_json::from_reader(file).unwrap();
let spec: ComputeSpec = serde_json::from_reader(file).unwrap();
// Features list defaults to empty vector.
assert!(spec.features.is_empty());
}
#[test]
@@ -241,4 +264,22 @@ mod tests {
ob.insert("unknown_field_123123123".into(), "hello".into());
let _spec: ComputeSpec = serde_json::from_value(json).unwrap();
}
#[test]
fn parse_unknown_features() {
// Test that unknown feature flags do not cause any errors.
let file = File::open("tests/cluster_spec.json").unwrap();
let mut json: serde_json::Value = serde_json::from_reader(file).unwrap();
let ob = json.as_object_mut().unwrap();
// Add unknown feature flags.
let features = vec!["foo_bar_feature", "baz_feature"];
ob.insert("features".into(), features.into());
let spec: ComputeSpec = serde_json::from_value(json).unwrap();
assert!(spec.features.len() == 2);
assert!(spec.features.contains(&ComputeFeature::UnknownFeature));
assert_eq!(spec.features, vec![ComputeFeature::UnknownFeature; 2]);
}
}

View File

@@ -140,3 +140,7 @@ impl Key {
})
}
}
pub fn is_rel_block_key(key: &Key) -> bool {
key.field1 == 0x00 && key.field4 != 0
}

View File

@@ -293,7 +293,7 @@ pub struct StatusResponse {
#[derive(Serialize, Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct TenantLocationConfigRequest {
pub tenant_id: TenantId,
pub tenant_shard_id: TenantShardId,
#[serde(flatten)]
pub config: LocationConfig, // as we have a flattened field, we should reject all unknown fields in it
}

View File

@@ -1,5 +1,6 @@
use std::{ops::RangeInclusive, str::FromStr};
use crate::key::{is_rel_block_key, Key};
use hex::FromHex;
use serde::{Deserialize, Serialize};
use thiserror;
@@ -302,6 +303,8 @@ pub struct ShardStripeSize(pub u32);
pub struct ShardLayout(u8);
const LAYOUT_V1: ShardLayout = ShardLayout(1);
/// ShardIdentity uses a magic layout value to indicate if it is unusable
const LAYOUT_BROKEN: ShardLayout = ShardLayout(255);
/// Default stripe size in pages: 256MiB divided by 8kiB page size.
const DEFAULT_STRIPE_SIZE: ShardStripeSize = ShardStripeSize(256 * 1024 / 8);
@@ -310,10 +313,10 @@ const DEFAULT_STRIPE_SIZE: ShardStripeSize = ShardStripeSize(256 * 1024 / 8);
/// to resolve a key to a shard, and then check whether that shard is ==self.
#[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
pub struct ShardIdentity {
pub layout: ShardLayout,
pub number: ShardNumber,
pub count: ShardCount,
pub stripe_size: ShardStripeSize,
stripe_size: ShardStripeSize,
layout: ShardLayout,
}
#[derive(thiserror::Error, Debug, PartialEq, Eq)]
@@ -339,6 +342,22 @@ impl ShardIdentity {
}
}
/// A broken instance of this type is only used for `TenantState::Broken` tenants,
/// which are constructed in code paths that don't have access to proper configuration.
///
/// A ShardIdentity in this state may not be used for anything, and should not be persisted.
/// Enforcement is via assertions, to avoid making our interface fallible for this
/// edge case: it is the Tenant's responsibility to avoid trying to do any I/O when in a broken
/// state, and by extension to avoid trying to do any page->shard resolution.
pub fn broken(number: ShardNumber, count: ShardCount) -> Self {
Self {
number,
count,
layout: LAYOUT_BROKEN,
stripe_size: DEFAULT_STRIPE_SIZE,
}
}
pub fn is_unsharded(&self) -> bool {
self.number == ShardNumber(0) && self.count == ShardCount(0)
}
@@ -365,6 +384,39 @@ impl ShardIdentity {
})
}
}
fn is_broken(&self) -> bool {
self.layout == LAYOUT_BROKEN
}
pub fn get_shard_number(&self, key: &Key) -> ShardNumber {
assert!(!self.is_broken());
key_to_shard_number(self.count, self.stripe_size, key)
}
/// Return true if the key should be ingested by this shard
pub fn is_key_local(&self, key: &Key) -> bool {
assert!(!self.is_broken());
if self.count < ShardCount(2) || (key_is_shard0(key) && self.number == ShardNumber(0)) {
true
} else {
key_to_shard_number(self.count, self.stripe_size, key) == self.number
}
}
pub fn shard_slug(&self) -> String {
if self.count > ShardCount(0) {
format!("-{:02x}{:02x}", self.number.0, self.count.0)
} else {
String::new()
}
}
/// Convenience for checking if this identity is the 0th shard in a tenant,
/// for special cases on shard 0 such as ingesting relation sizes.
pub fn is_zero(&self) -> bool {
self.number == ShardNumber(0)
}
}
impl Serialize for ShardIndex {
@@ -438,6 +490,65 @@ impl<'de> Deserialize<'de> for ShardIndex {
}
}
/// Whether this key is always held on shard 0 (e.g. shard 0 holds all SLRU keys
/// in order to be able to serve basebackup requests without peer communication).
fn key_is_shard0(key: &Key) -> bool {
// To decide what to shard out to shards >0, we apply a simple rule that only
// relation pages are distributed to shards other than shard zero. Everything else gets
// stored on shard 0. This guarantees that shard 0 can independently serve basebackup
// requests, and any request other than those for particular blocks in relations.
//
// In this condition:
// - is_rel_block_key includes only relations, i.e. excludes SLRU data and
// all metadata.
// - field6 is set to -1 for relation size pages.
!(is_rel_block_key(key) && key.field6 != 0xffffffff)
}
/// Provide the same result as the function in postgres `hashfn.h` with the same name
fn murmurhash32(mut h: u32) -> u32 {
h ^= h >> 16;
h = h.wrapping_mul(0x85ebca6b);
h ^= h >> 13;
h = h.wrapping_mul(0xc2b2ae35);
h ^= h >> 16;
h
}
/// Provide the same result as the function in postgres `hashfn.h` with the same name
fn hash_combine(mut a: u32, mut b: u32) -> u32 {
b = b.wrapping_add(0x9e3779b9);
b = b.wrapping_add(a << 6);
b = b.wrapping_add(a >> 2);
a ^= b;
a
}
/// Where a Key is to be distributed across shards, select the shard. This function
/// does not account for keys that should be broadcast across shards.
///
/// The hashing in this function must exactly match what we do in postgres smgr
/// code. The resulting distribution of pages is intended to preserve locality within
/// `stripe_size` ranges of contiguous block numbers in the same relation, while otherwise
/// distributing data pseudo-randomly.
///
/// The mapping of key to shard is not stable across changes to ShardCount: this is intentional
/// and will be handled at higher levels when shards are split.
fn key_to_shard_number(count: ShardCount, stripe_size: ShardStripeSize, key: &Key) -> ShardNumber {
// Fast path for un-sharded tenants or broadcast keys
if count < ShardCount(2) || key_is_shard0(key) {
return ShardNumber(0);
}
// relNode
let mut hash = murmurhash32(key.field4);
// blockNum/stripe size
hash = hash_combine(hash, murmurhash32(key.field6 / stripe_size.0));
ShardNumber((hash % count.0 as u32) as u8)
}
#[cfg(test)]
mod tests {
use std::str::FromStr;
@@ -609,4 +720,29 @@ mod tests {
Ok(())
}
// These are only smoke tests to spot check that our implementation doesn't
// deviate from a few examples values: not aiming to validate the overall
// hashing algorithm.
#[test]
fn murmur_hash() {
assert_eq!(murmurhash32(0), 0);
assert_eq!(hash_combine(0xb1ff3b40, 0), 0xfb7923c9);
}
#[test]
fn shard_mapping() {
let key = Key {
field1: 0x00,
field2: 0x67f,
field3: 0x5,
field4: 0x400c,
field5: 0x00,
field6: 0x7d06,
};
let shard = key_to_shard_number(ShardCount(10), DEFAULT_STRIPE_SIZE, &key);
assert_eq!(shard, ShardNumber(8));
}
}

View File

@@ -289,10 +289,10 @@ impl FeStartupPacket {
// We shouldn't advance `buf` as probably full message is not there yet,
// so can't directly use Bytes::get_u32 etc.
let len = (&buf[0..4]).read_u32::<BigEndian>().unwrap() as usize;
// The proposed replacement is `!(4..=MAX_STARTUP_PACKET_LENGTH).contains(&len)`
// The proposed replacement is `!(8..=MAX_STARTUP_PACKET_LENGTH).contains(&len)`
// which is less readable
#[allow(clippy::manual_range_contains)]
if len < 4 || len > MAX_STARTUP_PACKET_LENGTH {
if len < 8 || len > MAX_STARTUP_PACKET_LENGTH {
return Err(ProtocolError::Protocol(format!(
"invalid startup packet message length {}",
len
@@ -975,4 +975,10 @@ mod tests {
let params = make_params("foo\\ bar \\ \\\\ baz\\ lol");
assert_eq!(split_options(&params), ["foo bar", " \\", "baz ", "lol"]);
}
#[test]
fn parse_fe_startup_packet_regression() {
let data = [0, 0, 0, 7, 0, 0, 0, 0];
FeStartupPacket::parse(&mut BytesMut::from_iter(data)).unwrap_err();
}
}

View File

@@ -378,7 +378,7 @@ impl RemoteStorage for S3Bucket {
let empty = Vec::new();
let prefixes = response.common_prefixes.as_ref().unwrap_or(&empty);
tracing::info!("list: {} prefixes, {} keys", prefixes.len(), keys.len());
tracing::debug!("list: {} prefixes, {} keys", prefixes.len(), keys.len());
for object in keys {
let object_path = object.key().expect("response does not contain a key");

View File

@@ -152,3 +152,16 @@ impl Debug for Generation {
}
}
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn generation_gt() {
// Important that a None generation compares less than a valid one, during upgrades from
// pre-generation systems.
assert!(Generation::none() < Generation::new(0));
assert!(Generation::none() < Generation::new(1));
}
}

View File

@@ -402,15 +402,11 @@ fn start_pageserver(
let (init_remote_done_tx, init_remote_done_rx) = utils::completion::channel();
let (init_done_tx, init_done_rx) = utils::completion::channel();
let (init_logical_size_done_tx, init_logical_size_done_rx) = utils::completion::channel();
let (background_jobs_can_start, background_jobs_barrier) = utils::completion::channel();
let order = pageserver::InitializationOrder {
initial_tenant_load_remote: Some(init_done_tx),
initial_tenant_load: Some(init_remote_done_tx),
initial_logical_size_can_start: init_done_rx.clone(),
initial_logical_size_attempt: Some(init_logical_size_done_tx),
background_jobs_can_start: background_jobs_barrier.clone(),
};
@@ -464,7 +460,7 @@ fn start_pageserver(
});
let WaitForPhaseResult {
timeout_remaining: timeout,
timeout_remaining: _timeout,
skipped: init_load_skipped,
} = wait_for_phase("initial_tenant_load", init_load_done, timeout).await;
@@ -472,26 +468,6 @@ fn start_pageserver(
scopeguard::ScopeGuard::into_inner(guard);
let guard = scopeguard::guard_on_success((), |_| {
tracing::info!("Cancelled before initial logical sizes completed")
});
let logical_sizes_done = std::pin::pin!(async {
init_logical_size_done_rx.wait().await;
startup_checkpoint(
started_startup_at,
"initial_logical_sizes",
"Initial logical sizes completed",
);
});
let WaitForPhaseResult {
timeout_remaining: _,
skipped: logical_sizes_skipped,
} = wait_for_phase("initial_logical_sizes", logical_sizes_done, timeout).await;
scopeguard::ScopeGuard::into_inner(guard);
// allow background jobs to start: we either completed prior stages, or they reached timeout
// and were skipped. It is important that we do not let them block background jobs indefinitely,
// because things like consumption metrics for billing are blocked by this barrier.
@@ -514,9 +490,6 @@ fn start_pageserver(
if let Some(f) = init_load_skipped {
f.await;
}
if let Some(f) = logical_sizes_skipped {
f.await;
}
scopeguard::ScopeGuard::into_inner(guard);
startup_checkpoint(started_startup_at, "complete", "Startup complete");

View File

@@ -2,6 +2,7 @@ use crate::{context::RequestContext, tenant::timeline::logical_size::CurrentLogi
use chrono::{DateTime, Utc};
use consumption_metrics::EventType;
use futures::stream::StreamExt;
use pageserver_api::shard::ShardNumber;
use std::{sync::Arc, time::SystemTime};
use utils::{
id::{TenantId, TimelineId},
@@ -228,6 +229,11 @@ where
while let Some((tenant_id, tenant)) = tenants.next().await {
let mut tenant_resident_size = 0;
// Sharded tenants report all consumption metrics from shard zero
if tenant.tenant_shard_id().shard_number != ShardNumber(0) {
continue;
}
for timeline in tenant.list_timelines() {
let timeline_id = timeline.timeline_id;
@@ -351,7 +357,12 @@ impl TimelineSnapshot {
let current_exact_logical_size = {
let span = tracing::info_span!("collect_metrics_iteration", tenant_id = %t.tenant_shard_id.tenant_id, timeline_id = %t.timeline_id);
let size = span.in_scope(|| t.get_current_logical_size(ctx));
let size = span.in_scope(|| {
t.get_current_logical_size(
crate::tenant::timeline::GetLogicalSizePriority::Background,
ctx,
)
});
match size {
// Only send timeline logical size when it is fully calculated.
CurrentLogicalSize::Exact(ref size) => Some(size.into()),

View File

@@ -338,7 +338,8 @@ async fn build_timeline_info_common(
Lsn(0) => None,
lsn @ Lsn(_) => Some(lsn),
};
let current_logical_size = timeline.get_current_logical_size(ctx);
let current_logical_size =
timeline.get_current_logical_size(tenant::timeline::GetLogicalSizePriority::User, ctx);
let current_physical_size = Some(timeline.layer_size_sum().await);
let state = timeline.current_state();
let remote_consistent_lsn_projected = timeline

View File

@@ -186,13 +186,6 @@ pub struct InitializationOrder {
/// Each initial tenant load task carries this until completion.
pub initial_tenant_load: Option<utils::completion::Completion>,
/// Barrier for when we can start initial logical size calculations.
pub initial_logical_size_can_start: utils::completion::Barrier,
/// Each timeline owns a clone of this to be consumed on the initial logical size calculation
/// attempt. It is important to drop this once the attempt has completed.
pub initial_logical_size_attempt: Option<utils::completion::Completion>,
/// Barrier for when we can start any background jobs.
///
/// This can be broken up later on, but right now there is just one class of a background job.

View File

@@ -407,16 +407,14 @@ pub(crate) mod initial_logical_size {
use metrics::{register_int_counter, register_int_counter_vec, IntCounter, IntCounterVec};
use once_cell::sync::Lazy;
use crate::task_mgr::TaskKind;
pub(crate) struct StartCalculation(IntCounterVec);
pub(crate) static START_CALCULATION: Lazy<StartCalculation> = Lazy::new(|| {
StartCalculation(
register_int_counter_vec!(
"pageserver_initial_logical_size_start_calculation",
"Incremented each time we start an initial logical size calculation attempt. \
The `task_kind` label is for the task kind that caused this attempt.",
&["attempt", "task_kind"]
The `circumstances` label provides some additional details.",
&["attempt", "circumstances"]
)
.unwrap(),
)
@@ -464,19 +462,24 @@ pub(crate) mod initial_logical_size {
inc_drop_calculation: Option<IntCounter>,
}
#[derive(strum_macros::IntoStaticStr)]
pub(crate) enum StartCircumstances {
EmptyInitial,
SkippedConcurrencyLimiter,
AfterBackgroundTasksRateLimit,
}
impl StartCalculation {
pub(crate) fn first(&self, causing_task_kind: Option<TaskKind>) -> OngoingCalculationGuard {
let task_kind_label: &'static str =
causing_task_kind.map(|k| k.into()).unwrap_or_default();
self.0.with_label_values(&["first", task_kind_label]);
pub(crate) fn first(&self, circumstances: StartCircumstances) -> OngoingCalculationGuard {
let circumstances_label: &'static str = circumstances.into();
self.0.with_label_values(&["first", circumstances_label]);
OngoingCalculationGuard {
inc_drop_calculation: Some(DROP_CALCULATION.first.clone()),
}
}
pub(crate) fn retry(&self, causing_task_kind: Option<TaskKind>) -> OngoingCalculationGuard {
let task_kind_label: &'static str =
causing_task_kind.map(|k| k.into()).unwrap_or_default();
self.0.with_label_values(&["retry", task_kind_label]);
pub(crate) fn retry(&self, circumstances: StartCircumstances) -> OngoingCalculationGuard {
let circumstances_label: &'static str = circumstances.into();
self.0.with_label_values(&["retry", circumstances_label]);
OngoingCalculationGuard {
inc_drop_calculation: Some(DROP_CALCULATION.retry.clone()),
}
@@ -1164,6 +1167,30 @@ pub(crate) static DELETION_QUEUE: Lazy<DeletionQueueMetrics> = Lazy::new(|| {
}
});
pub(crate) struct WalIngestMetrics {
pub(crate) records_received: IntCounter,
pub(crate) records_committed: IntCounter,
pub(crate) records_filtered: IntCounter,
}
pub(crate) static WAL_INGEST: Lazy<WalIngestMetrics> = Lazy::new(|| WalIngestMetrics {
records_received: register_int_counter!(
"pageserver_wal_ingest_records_received",
"Number of WAL records received from safekeeper"
)
.expect("failed to define a metric"),
records_committed: register_int_counter!(
"pageserver_wal_ingest_records_committed",
"Number of WAL records which resulted in writes to pageserver storage"
)
.expect("failed to define a metric"),
records_filtered: register_int_counter!(
"pageserver_wal_ingest_records_filtered",
"Number of WAL records filtered out due to sharding"
)
.expect("failed to define a metric"),
});
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum RemoteOpKind {
Upload,
@@ -1385,6 +1412,8 @@ pub(crate) static WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM: Lazy<Histogram> =
pub(crate) struct WalRedoProcessCounters {
pub(crate) started: IntCounter,
pub(crate) killed_by_cause: enum_map::EnumMap<WalRedoKillCause, IntCounter>,
pub(crate) active_stderr_logger_tasks_started: IntCounter,
pub(crate) active_stderr_logger_tasks_finished: IntCounter,
}
#[derive(Debug, enum_map::Enum, strum_macros::IntoStaticStr)]
@@ -1408,6 +1437,19 @@ impl Default for WalRedoProcessCounters {
&["cause"],
)
.unwrap();
let active_stderr_logger_tasks_started = register_int_counter!(
"pageserver_walredo_stderr_logger_tasks_started_total",
"Number of active walredo stderr logger tasks that have started",
)
.unwrap();
let active_stderr_logger_tasks_finished = register_int_counter!(
"pageserver_walredo_stderr_logger_tasks_finished_total",
"Number of active walredo stderr logger tasks that have finished",
)
.unwrap();
Self {
started,
killed_by_cause: EnumMap::from_array(std::array::from_fn(|i| {
@@ -1415,6 +1457,8 @@ impl Default for WalRedoProcessCounters {
let cause_str: &'static str = cause.into();
killed.with_label_values(&[cause_str])
})),
active_stderr_logger_tasks_started,
active_stderr_logger_tasks_finished,
}
}
}

View File

@@ -53,12 +53,14 @@ use crate::context::{DownloadBehavior, RequestContext};
use crate::import_datadir::import_wal_from_tar;
use crate::metrics;
use crate::metrics::LIVE_CONNECTIONS_COUNT;
use crate::pgdatadir_mapping::rel_block_to_key;
use crate::task_mgr;
use crate::task_mgr::TaskKind;
use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id;
use crate::tenant::mgr;
use crate::tenant::mgr::get_active_tenant_with_timeout;
use crate::tenant::mgr::GetActiveTenantError;
use crate::tenant::mgr::ShardSelector;
use crate::tenant::Timeline;
use crate::trace::Tracer;
@@ -399,16 +401,19 @@ impl PageServerHandler {
{
debug_assert_current_span_has_tenant_and_timeline_id();
// TODO(sharding): enumerate local tenant shards for this tenant, and select the one
// that should serve this request.
// Make request tracer if needed
// Note that since one connection may contain getpage requests that target different
// shards (e.g. during splitting when the compute is not yet aware of the split), the tenant
// that we look up here may not be the one that serves all the actual requests: we will double
// check the mapping of key->shard later before calling into Timeline for getpage requests.
let tenant = mgr::get_active_tenant_with_timeout(
tenant_id,
ShardSelector::First,
ACTIVE_TENANT_TIMEOUT,
&task_mgr::shutdown_token(),
)
.await?;
// Make request tracer if needed
let mut tracer = if tenant.get_trace_read_requests() {
let connection_id = ConnectionId::generate();
let path =
@@ -566,6 +571,7 @@ impl PageServerHandler {
info!("creating new timeline");
let tenant = get_active_tenant_with_timeout(
tenant_id,
ShardSelector::Zero,
ACTIVE_TENANT_TIMEOUT,
&task_mgr::shutdown_token(),
)
@@ -628,7 +634,7 @@ impl PageServerHandler {
debug_assert_current_span_has_tenant_and_timeline_id();
let timeline = self
.get_active_tenant_timeline(tenant_id, timeline_id)
.get_active_tenant_timeline(tenant_id, timeline_id, ShardSelector::Zero)
.await?;
let last_record_lsn = timeline.get_last_record_lsn();
if last_record_lsn != start_lsn {
@@ -807,9 +813,49 @@ impl PageServerHandler {
}
*/
let page = timeline
.get_rel_page_at_lsn(req.rel, req.blkno, lsn, req.latest, ctx)
.await?;
let key = rel_block_to_key(req.rel, req.blkno);
let page = if timeline.get_shard_identity().is_key_local(&key) {
timeline
.get_rel_page_at_lsn(req.rel, req.blkno, lsn, req.latest, ctx)
.await?
} else {
// The Tenant shard we looked up at connection start does not hold this particular
// key: look for other shards in this tenant. This scenario occurs if a pageserver
// has multiple shards for the same tenant.
//
// TODO: optimize this (https://github.com/neondatabase/neon/pull/6037)
let timeline = match self
.get_active_tenant_timeline(
timeline.tenant_shard_id.tenant_id,
timeline.timeline_id,
ShardSelector::Page(key),
)
.await
{
Ok(t) => t,
Err(GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_))) => {
// We already know this tenant exists in general, because we resolved it at
// start of connection. Getting a NotFound here indicates that the shard containing
// the requested page is not present on this node.
// TODO: this should be some kind of structured error that the client will understand,
// so that it can block until its config is updated: this error is expected in the case
// that the Tenant's shards' placements are being updated and the client hasn't been
// informed yet.
//
// https://github.com/neondatabase/neon/issues/6038
return Err(anyhow::anyhow!("Request routed to wrong shard"));
}
Err(e) => return Err(e.into()),
};
// Take a GateGuard for the duration of this request. If we were using our main Timeline object,
// the GateGuard was already held over the whole connection.
let _timeline_guard = timeline.gate.enter().map_err(|_| QueryError::Shutdown)?;
timeline
.get_rel_page_at_lsn(req.rel, req.blkno, lsn, req.latest, ctx)
.await?
};
Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
page,
@@ -838,7 +884,7 @@ impl PageServerHandler {
// check that the timeline exists
let timeline = self
.get_active_tenant_timeline(tenant_id, timeline_id)
.get_active_tenant_timeline(tenant_id, timeline_id, ShardSelector::Zero)
.await?;
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
if let Some(lsn) = lsn {
@@ -944,9 +990,11 @@ impl PageServerHandler {
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
selector: ShardSelector,
) -> Result<Arc<Timeline>, GetActiveTimelineError> {
let tenant = get_active_tenant_with_timeout(
tenant_id,
selector,
ACTIVE_TENANT_TIMEOUT,
&task_mgr::shutdown_token(),
)
@@ -1120,7 +1168,7 @@ where
self.check_permission(Some(tenant_id))?;
let timeline = self
.get_active_tenant_timeline(tenant_id, timeline_id)
.get_active_tenant_timeline(tenant_id, timeline_id, ShardSelector::Zero)
.await?;
let end_of_timeline = timeline.get_last_record_rlsn();
@@ -1307,6 +1355,7 @@ where
let tenant = get_active_tenant_with_timeout(
tenant_id,
ShardSelector::Zero,
ACTIVE_TENANT_TIMEOUT,
&task_mgr::shutdown_token(),
)

View File

@@ -13,6 +13,7 @@ use crate::repository::*;
use crate::walrecord::NeonWalRecord;
use anyhow::Context;
use bytes::{Buf, Bytes};
use pageserver_api::key::is_rel_block_key;
use pageserver_api::reltag::{RelTag, SlruKind};
use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
use postgres_ffi::BLCKSZ;
@@ -282,6 +283,10 @@ impl Timeline {
}
/// Get a list of all existing relations in given tablespace and database.
///
/// # Cancel-Safety
///
/// This method is cancellation-safe.
pub async fn list_rels(
&self,
spcnode: Oid,
@@ -630,6 +635,10 @@ impl Timeline {
///
/// Only relation blocks are counted currently. That excludes metadata,
/// SLRUs, twophase files etc.
///
/// # Cancel-Safety
///
/// This method is cancellation-safe.
pub async fn get_current_logical_size_non_incremental(
&self,
lsn: Lsn,
@@ -1314,7 +1323,7 @@ impl<'a> DatadirModification<'a> {
// Flush relation and SLRU data blocks, keep metadata.
let mut retained_pending_updates = HashMap::new();
for (key, value) in self.pending_updates.drain() {
if is_rel_block_key(key) || is_slru_block_key(key) {
if is_rel_block_key(&key) || is_slru_block_key(key) {
// This bails out on first error without modifying pending_updates.
// That's Ok, cf this function's doc comment.
writer.put(key, self.lsn, &value, ctx).await?;
@@ -1359,6 +1368,10 @@ impl<'a> DatadirModification<'a> {
Ok(())
}
pub(crate) fn is_empty(&self) -> bool {
self.pending_updates.is_empty() && self.pending_deletions.is_empty()
}
// Internal helper functions to batch the modifications
async fn get(&self, key: Key, ctx: &RequestContext) -> Result<Bytes, PageReconstructError> {
@@ -1570,7 +1583,7 @@ fn rel_dir_to_key(spcnode: Oid, dbnode: Oid) -> Key {
}
}
fn rel_block_to_key(rel: RelTag, blknum: BlockNumber) -> Key {
pub(crate) fn rel_block_to_key(rel: RelTag, blknum: BlockNumber) -> Key {
Key {
field1: 0x00,
field2: rel.spcnode,
@@ -1769,10 +1782,6 @@ pub fn key_to_rel_block(key: Key) -> anyhow::Result<(RelTag, BlockNumber)> {
})
}
fn is_rel_block_key(key: Key) -> bool {
key.field1 == 0x00 && key.field4 != 0
}
pub fn is_rel_fsm_block_key(key: Key) -> bool {
key.field1 == 0x00 && key.field4 != 0 && key.field5 == FSM_FORKNUM && key.field6 != 0xffffffff
}

View File

@@ -19,6 +19,7 @@ use futures::stream::FuturesUnordered;
use futures::FutureExt;
use futures::StreamExt;
use pageserver_api::models::TimelineState;
use pageserver_api::shard::ShardIdentity;
use pageserver_api::shard::TenantShardId;
use remote_storage::DownloadError;
use remote_storage::GenericRemoteStorage;
@@ -236,6 +237,9 @@ pub struct Tenant {
tenant_shard_id: TenantShardId,
// The detailed sharding information, beyond the number/count in tenant_shard_id
shard_identity: ShardIdentity,
/// The remote storage generation, used to protect S3 objects from split-brain.
/// Does not change over the lifetime of the [`Tenant`] object.
///
@@ -312,6 +316,9 @@ impl WalRedoManager {
}
}
/// # Cancel-Safety
///
/// This method is cancellation-safe.
pub async fn request_redo(
&self,
key: crate::repository::Key,
@@ -469,7 +476,6 @@ impl Tenant {
index_part: Option<IndexPart>,
metadata: TimelineMetadata,
ancestor: Option<Arc<Timeline>>,
init_order: Option<&InitializationOrder>,
_ctx: &RequestContext,
) -> anyhow::Result<()> {
let tenant_id = self.tenant_shard_id;
@@ -479,7 +485,6 @@ impl Tenant {
&metadata,
ancestor.clone(),
resources,
init_order,
CreateTimelineCause::Load,
)?;
let disk_consistent_lsn = timeline.get_disk_consistent_lsn();
@@ -567,6 +572,7 @@ impl Tenant {
tenant_shard_id: TenantShardId,
resources: TenantSharedResources,
attached_conf: AttachedTenantConf,
shard_identity: ShardIdentity,
init_order: Option<InitializationOrder>,
tenants: &'static std::sync::RwLock<TenantsMap>,
mode: SpawnMode,
@@ -588,6 +594,7 @@ impl Tenant {
TenantState::Attaching,
conf,
attached_conf,
shard_identity,
wal_redo_manager,
tenant_shard_id,
remote_storage.clone(),
@@ -680,10 +687,6 @@ impl Tenant {
// as we are no longer loading, signal completion by dropping
// the completion while we resume deletion
drop(_completion);
// do not hold to initial_logical_size_attempt as it will prevent loading from proceeding without timeout
let _ = init_order
.as_mut()
.and_then(|x| x.initial_logical_size_attempt.take());
let background_jobs_can_start =
init_order.as_ref().map(|x| &x.background_jobs_can_start);
if let Some(background) = background_jobs_can_start {
@@ -697,7 +700,6 @@ impl Tenant {
&tenant_clone,
preload,
tenants,
init_order,
&ctx,
)
.await
@@ -710,7 +712,7 @@ impl Tenant {
}
}
match tenant_clone.attach(init_order, preload, &ctx).await {
match tenant_clone.attach(preload, &ctx).await {
Ok(()) => {
info!("attach finished, activating");
tenant_clone.activate(broker_client, None, &ctx);
@@ -773,7 +775,6 @@ impl Tenant {
///
async fn attach(
self: &Arc<Tenant>,
init_order: Option<InitializationOrder>,
preload: Option<TenantPreload>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
@@ -786,7 +787,7 @@ impl Tenant {
None => {
// Deprecated dev mode: load from local disk state instead of remote storage
// https://github.com/neondatabase/neon/issues/5624
return self.load_local(init_order, ctx).await;
return self.load_local(ctx).await;
}
};
@@ -881,7 +882,6 @@ impl Tenant {
&index_part.metadata,
Some(remote_timeline_client),
self.deletion_queue_client.clone(),
None,
)
.await
.context("resume_deletion")
@@ -1006,10 +1006,6 @@ impl Tenant {
None
};
// we can load remote timelines during init, but they are assumed to be so rare that
// initialization order is not passed to here.
let init_order = None;
// timeline loading after attach expects to find metadata file for each metadata
save_metadata(
self.conf,
@@ -1027,7 +1023,6 @@ impl Tenant {
Some(index_part),
remote_metadata,
ancestor,
init_order,
ctx,
)
.await
@@ -1051,6 +1046,9 @@ impl Tenant {
},
conf,
AttachedTenantConf::try_from(LocationConf::default()).unwrap(),
// Shard identity isn't meaningful for a broken tenant: it's just a placeholder
// to occupy the slot for this TenantShardId.
ShardIdentity::broken(tenant_shard_id.shard_number, tenant_shard_id.shard_count),
wal_redo_manager,
tenant_shard_id,
None,
@@ -1269,11 +1267,7 @@ impl Tenant {
/// files on disk. Used at pageserver startup.
///
/// No background tasks are started as part of this routine.
async fn load_local(
self: &Arc<Tenant>,
init_order: Option<InitializationOrder>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
async fn load_local(self: &Arc<Tenant>, ctx: &RequestContext) -> anyhow::Result<()> {
span::debug_assert_current_span_has_tenant_id();
debug!("loading tenant task");
@@ -1299,7 +1293,7 @@ impl Tenant {
// Process loadable timelines first
for (timeline_id, local_metadata) in scan.sorted_timelines_to_load {
if let Err(e) = self
.load_local_timeline(timeline_id, local_metadata, init_order.as_ref(), ctx, false)
.load_local_timeline(timeline_id, local_metadata, ctx, false)
.await
{
match e {
@@ -1333,13 +1327,7 @@ impl Tenant {
}
Some(local_metadata) => {
if let Err(e) = self
.load_local_timeline(
timeline_id,
local_metadata,
init_order.as_ref(),
ctx,
true,
)
.load_local_timeline(timeline_id, local_metadata, ctx, true)
.await
{
match e {
@@ -1367,12 +1355,11 @@ impl Tenant {
/// Subroutine of `load_tenant`, to load an individual timeline
///
/// NB: The parent is assumed to be already loaded!
#[instrument(skip(self, local_metadata, init_order, ctx))]
#[instrument(skip(self, local_metadata, ctx))]
async fn load_local_timeline(
self: &Arc<Self>,
timeline_id: TimelineId,
local_metadata: TimelineMetadata,
init_order: Option<&InitializationOrder>,
ctx: &RequestContext,
found_delete_mark: bool,
) -> Result<(), LoadLocalTimelineError> {
@@ -1389,7 +1376,6 @@ impl Tenant {
&local_metadata,
None,
self.deletion_queue_client.clone(),
init_order,
)
.await
.context("resume deletion")
@@ -1406,17 +1392,9 @@ impl Tenant {
None
};
self.timeline_init_and_sync(
timeline_id,
resources,
None,
local_metadata,
ancestor,
init_order,
ctx,
)
.await
.map_err(LoadLocalTimelineError::Load)
self.timeline_init_and_sync(timeline_id, resources, None, local_metadata, ancestor, ctx)
.await
.map_err(LoadLocalTimelineError::Load)
}
pub(crate) fn tenant_id(&self) -> TenantId {
@@ -2311,7 +2289,6 @@ impl Tenant {
new_metadata: &TimelineMetadata,
ancestor: Option<Arc<Timeline>>,
resources: TimelineResources,
init_order: Option<&InitializationOrder>,
cause: CreateTimelineCause,
) -> anyhow::Result<Arc<Timeline>> {
let state = match cause {
@@ -2326,9 +2303,6 @@ impl Tenant {
CreateTimelineCause::Delete => TimelineState::Stopping,
};
let initial_logical_size_can_start = init_order.map(|x| &x.initial_logical_size_can_start);
let initial_logical_size_attempt = init_order.map(|x| &x.initial_logical_size_attempt);
let pg_version = new_metadata.pg_version();
let timeline = Timeline::new(
@@ -2339,11 +2313,10 @@ impl Tenant {
new_timeline_id,
self.tenant_shard_id,
self.generation,
self.shard_identity,
Arc::clone(&self.walredo_mgr),
resources,
pg_version,
initial_logical_size_can_start.cloned(),
initial_logical_size_attempt.cloned().flatten(),
state,
self.cancel.child_token(),
);
@@ -2358,6 +2331,7 @@ impl Tenant {
state: TenantState,
conf: &'static PageServerConf,
attached_conf: AttachedTenantConf,
shard_identity: ShardIdentity,
walredo_mgr: Arc<WalRedoManager>,
tenant_shard_id: TenantShardId,
remote_storage: Option<GenericRemoteStorage>,
@@ -2419,6 +2393,7 @@ impl Tenant {
Tenant {
tenant_shard_id,
shard_identity,
generation: attached_conf.location.generation,
conf,
// using now here is good enough approximation to catch tenants with really long
@@ -3165,7 +3140,6 @@ impl Tenant {
new_metadata,
ancestor,
resources,
None,
CreateTimelineCause::Load,
)
.context("Failed to create timeline data structure")?;
@@ -3831,6 +3805,8 @@ pub(crate) mod harness {
self.generation,
))
.unwrap(),
// This is a legacy/test code path: sharding isn't supported here.
ShardIdentity::unsharded(),
walredo_mgr,
self.tenant_shard_id,
Some(self.remote_storage.clone()),
@@ -3840,7 +3816,7 @@ pub(crate) mod harness {
match mode {
LoadMode::Local => {
tenant
.load_local(None, ctx)
.load_local(ctx)
.instrument(info_span!("try_load", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))
.await?;
}
@@ -3850,7 +3826,7 @@ pub(crate) mod harness {
.instrument(info_span!("try_load_preload", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))
.await?;
tenant
.attach(None, Some(preload), ctx)
.attach(Some(preload), ctx)
.instrument(info_span!("try_load", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))
.await?;
}
@@ -3893,6 +3869,9 @@ pub(crate) mod harness {
pub(crate) struct TestRedoManager;
impl TestRedoManager {
/// # Cancel-Safety
///
/// This method is cancellation-safe.
pub async fn request_redo(
&self,
key: Key,

View File

@@ -15,7 +15,6 @@ use crate::{
context::RequestContext,
task_mgr::{self, TaskKind},
tenant::mgr::{TenantSlot, TenantsMapRemoveResult},
InitializationOrder,
};
use super::{
@@ -390,7 +389,6 @@ impl DeleteTenantFlow {
tenant: &Arc<Tenant>,
preload: Option<TenantPreload>,
tenants: &'static std::sync::RwLock<TenantsMap>,
init_order: Option<InitializationOrder>,
ctx: &RequestContext,
) -> Result<(), DeleteTenantError> {
let (_, progress) = completion::channel();
@@ -400,10 +398,7 @@ impl DeleteTenantFlow {
.await
.expect("cant be stopping or broken");
tenant
.attach(init_order, preload, ctx)
.await
.context("attach")?;
tenant.attach(preload, ctx).await.context("attach")?;
Self::background(
guard,

View File

@@ -2,7 +2,8 @@
//! page server.
use camino::{Utf8DirEntry, Utf8Path, Utf8PathBuf};
use pageserver_api::shard::TenantShardId;
use pageserver_api::key::Key;
use pageserver_api::shard::{ShardIdentity, ShardNumber, TenantShardId};
use rand::{distributions::Alphanumeric, Rng};
use std::borrow::Cow;
use std::collections::{BTreeMap, HashMap};
@@ -130,6 +131,18 @@ pub(crate) enum TenantsMapRemoveResult {
InProgress(utils::completion::Barrier),
}
/// When resolving a TenantId to a shard, we may be looking for the 0th
/// shard, or we might be looking for whichever shard holds a particular page.
pub(crate) enum ShardSelector {
/// Only return the 0th shard, if it is present. If a non-0th shard is present,
/// ignore it.
Zero,
/// Pick the first shard we find for the TenantId
First,
/// Pick the shard that holds this key
Page(Key),
}
impl TenantsMap {
/// Convenience function for typical usage, where we want to get a `Tenant` object, for
/// working with attached tenants. If the TenantId is in the map but in Secondary state,
@@ -144,6 +157,49 @@ impl TenantsMap {
}
}
/// A page service client sends a TenantId, and to look up the correct Tenant we must
/// resolve this to a fully qualified TenantShardId.
fn resolve_shard(
&self,
tenant_id: &TenantId,
selector: ShardSelector,
) -> Option<TenantShardId> {
let mut want_shard = None;
match self {
TenantsMap::Initializing => None,
TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => {
for slot in m.range(TenantShardId::tenant_range(*tenant_id)) {
match selector {
ShardSelector::First => return Some(*slot.0),
ShardSelector::Zero if slot.0.shard_number == ShardNumber(0) => {
return Some(*slot.0)
}
ShardSelector::Page(key) => {
if let Some(tenant) = slot.1.get_attached() {
// First slot we see for this tenant, calculate the expected shard number
// for the key: we will use this for checking if this and subsequent
// slots contain the key, rather than recalculating the hash each time.
if want_shard.is_none() {
want_shard = Some(tenant.shard_identity.get_shard_number(&key));
}
if Some(tenant.shard_identity.number) == want_shard {
return Some(*slot.0);
}
} else {
continue;
}
}
_ => continue,
}
}
// Fall through: we didn't find an acceptable shard
None
}
}
}
/// Only for use from DeleteTenantFlow. This method directly removes a TenantSlot from the map.
///
/// The normal way to remove a tenant is using a SlotGuard, which will gracefully remove the guarded
@@ -515,12 +571,14 @@ pub async fn init_tenant_mgr(
location_conf.attach_in_generation(generation);
Tenant::persist_tenant_config(conf, &tenant_shard_id, &location_conf).await?;
let shard_identity = location_conf.shard;
match tenant_spawn(
conf,
tenant_shard_id,
&tenant_dir_path,
resources.clone(),
AttachedTenantConf::try_from(location_conf)?,
shard_identity,
Some(init_order.clone()),
&TENANTS,
SpawnMode::Normal,
@@ -561,6 +619,7 @@ pub(crate) fn tenant_spawn(
tenant_path: &Utf8Path,
resources: TenantSharedResources,
location_conf: AttachedTenantConf,
shard_identity: ShardIdentity,
init_order: Option<InitializationOrder>,
tenants: &'static std::sync::RwLock<TenantsMap>,
mode: SpawnMode,
@@ -593,6 +652,7 @@ pub(crate) fn tenant_spawn(
tenant_shard_id,
resources,
location_conf,
shard_identity,
init_order,
tenants,
mode,
@@ -762,12 +822,14 @@ pub(crate) async fn create_tenant(
tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustNotExist)?;
let tenant_path = super::create_tenant_files(conf, &location_conf, &tenant_shard_id).await?;
let shard_identity = location_conf.shard;
let created_tenant = tenant_spawn(
conf,
tenant_shard_id,
&tenant_path,
resources,
AttachedTenantConf::try_from(location_conf)?,
shard_identity,
None,
&TENANTS,
SpawnMode::Create,
@@ -860,6 +922,7 @@ impl TenantManager {
Ok(())
}
#[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))]
pub(crate) async fn upsert_location(
&self,
tenant_shard_id: TenantShardId,
@@ -996,12 +1059,14 @@ impl TenantManager {
.await
.map_err(SetNewTenantConfigError::Persist)?;
let shard_identity = new_location_config.shard;
let tenant = tenant_spawn(
self.conf,
tenant_shard_id,
&tenant_path,
self.resources.clone(),
AttachedTenantConf::try_from(new_location_config)?,
shard_identity,
None,
self.tenants,
SpawnMode::Normal,
@@ -1100,6 +1165,7 @@ pub(crate) enum GetActiveTenantError {
/// then wait for up to `timeout` (minus however long we waited for the slot).
pub(crate) async fn get_active_tenant_with_timeout(
tenant_id: TenantId,
shard_selector: ShardSelector,
timeout: Duration,
cancel: &CancellationToken,
) -> Result<Arc<Tenant>, GetActiveTenantError> {
@@ -1108,15 +1174,17 @@ pub(crate) async fn get_active_tenant_with_timeout(
Tenant(Arc<Tenant>),
}
// TODO(sharding): make page service interface sharding-aware (page service should apply ShardIdentity to the key
// to decide which shard services the request)
let tenant_shard_id = TenantShardId::unsharded(tenant_id);
let wait_start = Instant::now();
let deadline = wait_start + timeout;
let wait_for = {
let (wait_for, tenant_shard_id) = {
let locked = TENANTS.read().unwrap();
// Resolve TenantId to TenantShardId
let tenant_shard_id = locked.resolve_shard(&tenant_id, shard_selector).ok_or(
GetActiveTenantError::NotFound(GetTenantError::NotFound(tenant_id)),
)?;
let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)
.map_err(GetTenantError::MapState)?;
match peek_slot {
@@ -1126,7 +1194,7 @@ pub(crate) async fn get_active_tenant_with_timeout(
// Fast path: we don't need to do any async waiting.
return Ok(tenant.clone());
}
_ => WaitFor::Tenant(tenant.clone()),
_ => (WaitFor::Tenant(tenant.clone()), tenant_shard_id),
}
}
Some(TenantSlot::Secondary) => {
@@ -1134,7 +1202,9 @@ pub(crate) async fn get_active_tenant_with_timeout(
tenant_id,
)))
}
Some(TenantSlot::InProgress(barrier)) => WaitFor::Barrier(barrier.clone()),
Some(TenantSlot::InProgress(barrier)) => {
(WaitFor::Barrier(barrier.clone()), tenant_shard_id)
}
None => {
return Err(GetActiveTenantError::NotFound(GetTenantError::NotFound(
tenant_id,
@@ -1377,12 +1447,14 @@ pub(crate) async fn load_tenant(
Tenant::persist_tenant_config(conf, &tenant_shard_id, &location_conf).await?;
let shard_identity = location_conf.shard;
let new_tenant = tenant_spawn(
conf,
tenant_shard_id,
&tenant_path,
resources,
AttachedTenantConf::try_from(location_conf)?,
shard_identity,
None,
&TENANTS,
SpawnMode::Normal,
@@ -1472,12 +1544,14 @@ pub(crate) async fn attach_tenant(
// TODO: tenant directory remains on disk if we bail out from here on.
// See https://github.com/neondatabase/neon/issues/4233
let shard_identity = location_conf.shard;
let attached_tenant = tenant_spawn(
conf,
tenant_shard_id,
&tenant_dir,
resources,
AttachedTenantConf::try_from(location_conf)?,
shard_identity,
None,
&TENANTS,
SpawnMode::Normal,

View File

@@ -363,7 +363,7 @@ pub(super) async fn download_index_part(
None => {
// Migration from legacy pre-generation state: we have a generation but no prior
// attached pageservers did. Try to load from a no-generation path.
tracing::info!("No index_part.json* found");
tracing::debug!("No index_part.json* found");
do_download_index_part(
storage,
tenant_shard_id,

View File

@@ -230,6 +230,10 @@ impl Layer {
///
/// It is up to the caller to collect more data from the previous layer and
/// perform WAL redo, if necessary.
///
/// # Cancellation-Safety
///
/// This method is cancellation-safe.
pub(crate) async fn get_value_reconstruct_data(
&self,
key: Key,

View File

@@ -44,6 +44,7 @@ pub(crate) enum BackgroundLoopKind {
Eviction,
ConsumptionMetricsCollectMetrics,
ConsumptionMetricsSyntheticSizeWorker,
InitialLogicalSizeCalculation,
}
impl BackgroundLoopKind {

View File

@@ -18,25 +18,29 @@ use pageserver_api::{
DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest, LayerMapInfo,
TimelineState,
},
shard::TenantShardId,
shard::{ShardIdentity, TenantShardId},
};
use rand::Rng;
use serde_with::serde_as;
use storage_broker::BrokerClientChannel;
use tokio::{
runtime::Handle,
sync::{oneshot, watch, TryAcquireError},
sync::{oneshot, watch},
};
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::{id::TenantTimelineId, sync::gate::Gate};
use std::cmp::{max, min, Ordering};
use std::collections::{BinaryHeap, HashMap, HashSet};
use std::ops::{Deref, Range};
use std::pin::pin;
use std::sync::atomic::Ordering as AtomicOrdering;
use std::sync::{Arc, Mutex, RwLock, Weak};
use std::time::{Duration, Instant, SystemTime};
use std::{
cmp::{max, min, Ordering},
ops::ControlFlow,
};
use crate::context::{
AccessStatsBehavior, DownloadBehavior, RequestContext, RequestContextBuilder,
@@ -163,6 +167,10 @@ pub struct Timeline {
/// this copy enforces the invariant that generatio doesn't change during a Tenant's lifetime.
pub(crate) generation: Generation,
/// The detailed sharding information from our parent Tenant. This enables us to map keys
/// to shards, and is constant through the lifetime of this Timeline.
shard_identity: ShardIdentity,
pub pg_version: u32,
/// The tuple has two elements.
@@ -298,13 +306,6 @@ pub struct Timeline {
eviction_task_timeline_state: tokio::sync::Mutex<EvictionTaskTimelineState>,
/// Barrier to wait before doing initial logical size calculation. Used only during startup.
initial_logical_size_can_start: Option<completion::Barrier>,
/// Completion shared between all timelines loaded during startup; used to delay heavier
/// background tasks until some logical sizes have been calculated.
initial_logical_size_attempt: Mutex<Option<completion::Completion>>,
/// Load or creation time information about the disk_consistent_lsn and when the loading
/// happened. Used for consumption metrics.
pub(crate) loaded_at: (Lsn, SystemTime),
@@ -453,6 +454,11 @@ pub enum LogicalSizeCalculationCause {
TenantSizeHandler,
}
pub enum GetLogicalSizePriority {
User,
Background,
}
#[derive(enumset::EnumSetType)]
pub(crate) enum CompactFlags {
ForceRepartition,
@@ -489,6 +495,9 @@ impl Timeline {
/// an ancestor branch, for example, or waste a lot of cycles chasing the
/// non-existing key.
///
/// # Cancel-Safety
///
/// This method is cancellation-safe.
pub async fn get(
&self,
key: Key,
@@ -801,7 +810,12 @@ impl Timeline {
.access_stats_behavior(AccessStatsBehavior::Skip)
.build();
// 2. Create new image layers for partitions that have been modified
// 2. Compact
let timer = self.metrics.compact_time_histo.start_timer();
self.compact_level0(target_file_size, ctx).await?;
timer.stop_and_record();
// 3. Create new image layers for partitions that have been modified
// "enough".
let layers = self
.create_image_layers(&partitioning, lsn, false, &image_ctx)
@@ -813,11 +827,6 @@ impl Timeline {
}
}
// 3. Compact
let timer = self.metrics.compact_time_histo.start_timer();
self.compact_level0(target_file_size, ctx).await?;
timer.stop_and_record();
if let Some(remote_client) = &self.remote_client {
// should any new image layer been created, not uploading index_part will
// result in a mismatch between remote_physical_size and layermap calculated
@@ -849,46 +858,6 @@ impl Timeline {
}
}
/// Retrieve current logical size of the timeline.
///
/// The size could be lagging behind the actual number, in case
/// the initial size calculation has not been run (gets triggered on the first size access).
///
/// return size and boolean flag that shows if the size is exact
pub(crate) fn get_current_logical_size(
self: &Arc<Self>,
ctx: &RequestContext,
) -> logical_size::CurrentLogicalSize {
let current_size = self.current_logical_size.current_size();
debug!("Current size: {current_size:?}");
if let (CurrentLogicalSize::Approximate(_), Some(initial_part_end)) =
(current_size, self.current_logical_size.initial_part_end)
{
self.try_spawn_size_init_task(initial_part_end, ctx);
}
if let CurrentLogicalSize::Approximate(_) = &current_size {
if ctx.task_kind() == TaskKind::WalReceiverConnectionHandler {
let first = self
.current_logical_size
.did_return_approximate_to_walreceiver
.compare_exchange(
false,
true,
AtomicOrdering::Relaxed,
AtomicOrdering::Relaxed,
)
.is_ok();
if first {
crate::metrics::initial_logical_size::TIMELINES_WHERE_WALRECEIVER_GOT_APPROXIMATE_SIZE.inc();
}
}
}
current_size
}
/// Check if more than 'checkpoint_distance' of WAL has been accumulated in
/// the in-memory layer, and initiate flushing it if so.
///
@@ -938,6 +907,7 @@ impl Timeline {
background_jobs_can_start: Option<&completion::Barrier>,
ctx: &RequestContext,
) {
self.spawn_initial_logical_size_computation_task(ctx);
self.launch_wal_receiver(ctx, broker_client);
self.set_state(TimelineState::Active);
self.launch_eviction_task(background_jobs_can_start);
@@ -1051,17 +1021,6 @@ impl Timeline {
error!("Not activating a Stopping timeline");
}
(_, new_state) => {
if matches!(
new_state,
TimelineState::Stopping | TimelineState::Broken { .. }
) {
// drop the completion guard, if any; it might be holding off the completion
// forever needlessly
self.initial_logical_size_attempt
.lock()
.unwrap_or_else(|e| e.into_inner())
.take();
}
self.state.send_replace(new_state);
}
}
@@ -1380,11 +1339,10 @@ impl Timeline {
timeline_id: TimelineId,
tenant_shard_id: TenantShardId,
generation: Generation,
shard_identity: ShardIdentity,
walredo_mgr: Arc<super::WalRedoManager>,
resources: TimelineResources,
pg_version: u32,
initial_logical_size_can_start: Option<completion::Barrier>,
initial_logical_size_attempt: Option<completion::Completion>,
state: TimelineState,
cancel: CancellationToken,
) -> Arc<Self> {
@@ -1411,6 +1369,7 @@ impl Timeline {
timeline_id,
tenant_shard_id,
generation,
shard_identity,
pg_version,
layers: Arc::new(tokio::sync::RwLock::new(LayerManager::create())),
wanted_image_layers: Mutex::new(None),
@@ -1484,8 +1443,6 @@ impl Timeline {
),
delete_progress: Arc::new(tokio::sync::Mutex::new(DeleteTimelineFlow::default())),
initial_logical_size_can_start,
initial_logical_size_attempt: Mutex::new(initial_logical_size_attempt),
cancel,
gate: Gate::new(format!("Timeline<{tenant_shard_id}/{timeline_id}>")),
@@ -1797,39 +1754,91 @@ impl Timeline {
Ok(())
}
fn try_spawn_size_init_task(self: &Arc<Self>, lsn: Lsn, ctx: &RequestContext) {
let state = self.current_state();
if matches!(
state,
TimelineState::Broken { .. } | TimelineState::Stopping
) {
// Can happen when timeline detail endpoint is used when deletion is ongoing (or its broken).
return;
/// Retrieve current logical size of the timeline.
///
/// The size could be lagging behind the actual number, in case
/// the initial size calculation has not been run (gets triggered on the first size access).
///
/// return size and boolean flag that shows if the size is exact
pub(crate) fn get_current_logical_size(
self: &Arc<Self>,
priority: GetLogicalSizePriority,
ctx: &RequestContext,
) -> logical_size::CurrentLogicalSize {
let current_size = self.current_logical_size.current_size();
debug!("Current size: {current_size:?}");
match (current_size.accuracy(), priority) {
(logical_size::Accuracy::Exact, _) => (), // nothing to do
(logical_size::Accuracy::Approximate, GetLogicalSizePriority::Background) => {
// background task will eventually deliver an exact value, we're in no rush
}
(logical_size::Accuracy::Approximate, GetLogicalSizePriority::User) => {
// background task is not ready, but user is asking for it now;
// => make the background task skip the line
// (The alternative would be to calculate the size here, but,
// it can actually take a long time if the user has a lot of rels.
// And we'll inevitable need it again; So, let the background task do the work.)
match self
.current_logical_size
.cancel_wait_for_background_loop_concurrency_limit_semaphore
.get()
{
Some(cancel) => cancel.cancel(),
None => {
let state = self.current_state();
if matches!(
state,
TimelineState::Broken { .. } | TimelineState::Stopping
) {
// Can happen when timeline detail endpoint is used when deletion is ongoing (or its broken).
// Don't make noise.
} else {
warn!("unexpected: cancel_wait_for_background_loop_concurrency_limit_semaphore not set, priority-boosting of logical size calculation will not work");
}
}
};
}
}
let permit = match Arc::clone(&self.current_logical_size.initial_size_computation)
.try_acquire_owned()
{
Ok(permit) => permit,
Err(TryAcquireError::NoPermits) => {
// computation already ongoing or finished with success
return;
if let CurrentLogicalSize::Approximate(_) = &current_size {
if ctx.task_kind() == TaskKind::WalReceiverConnectionHandler {
let first = self
.current_logical_size
.did_return_approximate_to_walreceiver
.compare_exchange(
false,
true,
AtomicOrdering::Relaxed,
AtomicOrdering::Relaxed,
)
.is_ok();
if first {
crate::metrics::initial_logical_size::TIMELINES_WHERE_WALRECEIVER_GOT_APPROXIMATE_SIZE.inc();
}
}
Err(TryAcquireError::Closed) => unreachable!("we never call close"),
};
debug_assert!(self
.current_logical_size
.initial_logical_size
.get()
.is_none());
}
current_size
}
fn spawn_initial_logical_size_computation_task(self: &Arc<Self>, ctx: &RequestContext) {
let Some(initial_part_end) = self.current_logical_size.initial_part_end else {
// nothing to do for freshly created timelines;
assert_eq!(
self.current_logical_size.current_size().accuracy(),
logical_size::Accuracy::Exact,
);
return;
};
let cancel_wait_for_background_loop_concurrency_limit_semaphore = CancellationToken::new();
let token = cancel_wait_for_background_loop_concurrency_limit_semaphore.clone();
self.current_logical_size
.cancel_wait_for_background_loop_concurrency_limit_semaphore.set(token)
.expect("initial logical size calculation task must be spawned exactly once per Timeline object");
info!(
"spawning logical size computation from context of task kind {:?}",
ctx.task_kind()
);
let causing_task_kind = ctx.task_kind();
// We need to start the computation task.
// It gets a separate context since it will outlive the request that called this function.
let self_clone = Arc::clone(self);
let background_ctx = ctx.detached_child(
TaskKind::InitialLogicalSizeCalculation,
@@ -1844,96 +1853,152 @@ impl Timeline {
false,
// NB: don't log errors here, task_mgr will do that.
async move {
let cancel = task_mgr::shutdown_token();
self_clone
.initial_logical_size_calculation_task(
initial_part_end,
cancel_wait_for_background_loop_concurrency_limit_semaphore,
cancel,
background_ctx,
)
.await;
Ok(())
}
.instrument(info_span!(parent: None, "initial_size_calculation", tenant_id=%self.tenant_shard_id.tenant_id, timeline_id=%self.timeline_id)),
);
}
// in case we were created during pageserver initialization, wait for
// initialization to complete before proceeding. startup time init runs on the same
// runtime.
tokio::select! {
_ = cancel.cancelled() => { return Ok(()); },
_ = completion::Barrier::maybe_wait(self_clone.initial_logical_size_can_start.clone()) => {}
async fn initial_logical_size_calculation_task(
self: Arc<Self>,
initial_part_end: Lsn,
skip_concurrency_limiter: CancellationToken,
cancel: CancellationToken,
background_ctx: RequestContext,
) {
enum BackgroundCalculationError {
Cancelled,
Other(anyhow::Error),
}
let try_once = |attempt: usize| {
let background_ctx = &background_ctx;
let self_ref = &self;
let skip_concurrency_limiter = &skip_concurrency_limiter;
async move {
let cancel = task_mgr::shutdown_token();
let wait_for_permit = super::tasks::concurrent_background_tasks_rate_limit(
BackgroundLoopKind::InitialLogicalSizeCalculation,
background_ctx,
&cancel,
);
use crate::metrics::initial_logical_size::StartCircumstances;
let (_maybe_permit, circumstances) = tokio::select! {
res = wait_for_permit => {
match res {
Ok(permit) => (Some(permit), StartCircumstances::AfterBackgroundTasksRateLimit),
Err(RateLimitError::Cancelled) => {
return Err(BackgroundCalculationError::Cancelled);
}
}
}
() = skip_concurrency_limiter.cancelled() => {
// Some action that is part of a end user interaction requested logical size
// => break out of the rate limit
// TODO: ideally we'd not run on BackgroundRuntime but the requester's runtime;
// but then again what happens if they cancel; also, we should just be using
// one runtime across the entire process, so, let's leave this for now.
(None, StartCircumstances::SkippedConcurrencyLimiter)
}
};
// hold off background tasks from starting until all timelines get to try at least
// once initial logical size calculation; though retry will rarely be useful.
// holding off is done because heavier tasks execute blockingly on the same
// runtime.
//
// dropping this at every outcome is probably better than trying to cling on to it,
// delay will be terminated by a timeout regardless.
let completion = { self_clone.initial_logical_size_attempt.lock().expect("unexpected initial_logical_size_attempt poisoned").take() };
let metrics_guard = match &completion {
Some(_) => crate::metrics::initial_logical_size::START_CALCULATION.first(Some(causing_task_kind)),
None => crate::metrics::initial_logical_size::START_CALCULATION.retry(Some(causing_task_kind)),
let metrics_guard = if attempt == 1 {
crate::metrics::initial_logical_size::START_CALCULATION.first(circumstances)
} else {
crate::metrics::initial_logical_size::START_CALCULATION.retry(circumstances)
};
let calculated_size = match self_clone
.logical_size_calculation_task(lsn, LogicalSizeCalculationCause::Initial, &background_ctx)
match self_ref
.logical_size_calculation_task(
initial_part_end,
LogicalSizeCalculationCause::Initial,
background_ctx,
)
.await
{
Ok(s) => s,
Ok(calculated_size) => Ok((calculated_size, metrics_guard)),
Err(CalculateLogicalSizeError::Cancelled) => {
// Don't make noise, this is a common task.
// In the unlikely case that there is another call to this function, we'll retry
// because initial_logical_size is still None.
info!("initial size calculation cancelled, likely timeline delete / tenant detach");
return Ok(());
Err(BackgroundCalculationError::Cancelled)
}
Err(CalculateLogicalSizeError::Other(err)) => {
if let Some(e @ PageReconstructError::AncestorStopping(_)) =
if let Some(PageReconstructError::AncestorStopping(_)) =
err.root_cause().downcast_ref()
{
// This can happen if the timeline parent timeline switches to
// Stopping state while we're still calculating the initial
// timeline size for the child, for example if the tenant is
// being detached or the pageserver is shut down. Like with
// CalculateLogicalSizeError::Cancelled, don't make noise.
info!("initial size calculation failed because the timeline or its ancestor is Stopping, likely because the tenant is being detached: {e:#}");
return Ok(());
Err(BackgroundCalculationError::Cancelled)
} else {
Err(BackgroundCalculationError::Other(err))
}
return Err(err.context("Failed to calculate logical size"));
}
};
// we cannot query current_logical_size.current_size() to know the current
// *negative* value, only truncated to u64.
let added = self_clone
.current_logical_size
.size_added_after_initial
.load(AtomicOrdering::Relaxed);
let sum = calculated_size.saturating_add_signed(added);
// set the gauge value before it can be set in `update_current_logical_size`.
self_clone.metrics.current_logical_size_gauge.set(sum);
match self_clone
.current_logical_size
.initial_logical_size
.set((calculated_size, metrics_guard.calculation_result_saved()))
{
Ok(()) => (),
Err(_what_we_just_attempted_to_set) => {
let (existing_size, _) = self_clone
.current_logical_size
.initial_logical_size
.get()
.expect("once_cell set was lost, then get failed, impossible.");
// This shouldn't happen because the semaphore is initialized with 1.
// But if it happens, just complain & report success so there are no further retries.
error!("Tried to update initial timeline size value to {calculated_size}, but the size was already set to {existing_size}, not changing")
}
}
// now that `initial_logical_size.is_some()`, reduce permit count to 0
// so that we prevent future callers from spawning this task
permit.forget();
Ok(())
}.in_current_span(),
);
}
};
let retrying = async {
let mut attempt = 0;
loop {
attempt += 1;
match try_once(attempt).await {
Ok(res) => return ControlFlow::Continue(res),
Err(BackgroundCalculationError::Cancelled) => return ControlFlow::Break(()),
Err(BackgroundCalculationError::Other(e)) => {
warn!(attempt, "initial size calculation failed: {e:?}");
// exponential back-off doesn't make sense at these long intervals;
// use fixed retry interval with generous jitter instead
let sleep_duration = Duration::from_secs(
u64::try_from(
// 1hour base
(60_i64 * 60_i64)
// 10min jitter
+ rand::thread_rng().gen_range(-10 * 60..10 * 60),
)
.expect("10min < 1hour"),
);
tokio::time::sleep(sleep_duration).await;
}
}
}
};
let (calculated_size, metrics_guard) = tokio::select! {
res = retrying => {
match res {
ControlFlow::Continue(calculated_size) => calculated_size,
ControlFlow::Break(()) => return,
}
}
_ = cancel.cancelled() => {
return;
}
};
// we cannot query current_logical_size.current_size() to know the current
// *negative* value, only truncated to u64.
let added = self
.current_logical_size
.size_added_after_initial
.load(AtomicOrdering::Relaxed);
let sum = calculated_size.saturating_add_signed(added);
// set the gauge value before it can be set in `update_current_logical_size`.
self.metrics.current_logical_size_gauge.set(sum);
self.current_logical_size
.initial_logical_size
.set((calculated_size, metrics_guard.calculation_result_saved()))
.ok()
.expect("only this task sets it");
}
pub fn spawn_ondemand_logical_size_calculation(
@@ -1971,6 +2036,9 @@ impl Timeline {
receiver
}
/// # Cancel-Safety
///
/// This method is cancellation-safe.
#[instrument(skip_all)]
async fn logical_size_calculation_task(
self: &Arc<Self>,
@@ -2008,6 +2076,10 @@ impl Timeline {
///
/// NOTE: counted incrementally, includes ancestors. This can be a slow operation,
/// especially if we need to download remote layers.
///
/// # Cancel-Safety
///
/// This method is cancellation-safe.
pub async fn calculate_logical_size(
&self,
up_to_lsn: Lsn,
@@ -2123,6 +2195,10 @@ impl Timeline {
///
/// This function takes the current timeline's locked LayerMap as an argument,
/// so callers can avoid potential race conditions.
///
/// # Cancel-Safety
///
/// This method is cancellation-safe.
async fn get_reconstruct_data(
&self,
key: Key,
@@ -2371,6 +2447,9 @@ impl Timeline {
}
}
/// # Cancel-safety
///
/// This method is cancellation-safe.
async fn lookup_cached_page(
&self,
key: &Key,
@@ -2405,6 +2484,10 @@ impl Timeline {
Ok(Arc::clone(ancestor))
}
pub(crate) fn get_shard_identity(&self) -> &ShardIdentity {
&self.shard_identity
}
///
/// Get a handle to the latest layer for appending.
///

View File

@@ -21,7 +21,6 @@ use crate::{
},
CreateTimelineCause, DeleteTimelineError, Tenant,
},
InitializationOrder,
};
use super::{Timeline, TimelineResources};
@@ -407,7 +406,6 @@ impl DeleteTimelineFlow {
local_metadata: &TimelineMetadata,
remote_client: Option<RemoteTimelineClient>,
deletion_queue_client: DeletionQueueClient,
init_order: Option<&InitializationOrder>,
) -> anyhow::Result<()> {
// Note: here we even skip populating layer map. Timeline is essentially uninitialized.
// RemoteTimelineClient is the only functioning part.
@@ -420,7 +418,6 @@ impl DeleteTimelineFlow {
remote_client,
deletion_queue_client,
},
init_order,
// Important. We dont pass ancestor above because it can be missing.
// Thus we need to skip the validation here.
CreateTimelineCause::Delete,

View File

@@ -1,11 +1,10 @@
use anyhow::Context;
use once_cell::sync::OnceCell;
use tokio::sync::Semaphore;
use once_cell::sync::OnceCell;
use tokio_util::sync::CancellationToken;
use utils::lsn::Lsn;
use std::sync::atomic::{AtomicBool, AtomicI64, Ordering as AtomicOrdering};
use std::sync::Arc;
/// Internal structure to hold all data needed for logical size calculation.
///
@@ -28,8 +27,12 @@ pub(super) struct LogicalSize {
crate::metrics::initial_logical_size::FinishedCalculationGuard,
)>,
/// Semaphore to track ongoing calculation of `initial_logical_size`.
pub initial_size_computation: Arc<tokio::sync::Semaphore>,
/// Cancellation for the best-effort logical size calculation.
///
/// The token is kept in a once-cell so that we can error out if a higher priority
/// request comes in *before* we have started the normal logical size calculation.
pub(crate) cancel_wait_for_background_loop_concurrency_limit_semaphore:
OnceCell<CancellationToken>,
/// Latest Lsn that has its size uncalculated, could be absent for freshly created timelines.
pub initial_part_end: Option<Lsn>,
@@ -72,7 +75,7 @@ pub(crate) enum CurrentLogicalSize {
Exact(Exact),
}
#[derive(Debug, Copy, Clone)]
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(crate) enum Accuracy {
Approximate,
Exact,
@@ -115,11 +118,10 @@ impl LogicalSize {
Self {
initial_logical_size: OnceCell::with_value((0, {
crate::metrics::initial_logical_size::START_CALCULATION
.first(None)
.first(crate::metrics::initial_logical_size::StartCircumstances::EmptyInitial)
.calculation_result_saved()
})),
// initial_logical_size already computed, so, don't admit any calculations
initial_size_computation: Arc::new(Semaphore::new(0)),
cancel_wait_for_background_loop_concurrency_limit_semaphore: OnceCell::new(),
initial_part_end: None,
size_added_after_initial: AtomicI64::new(0),
did_return_approximate_to_walreceiver: AtomicBool::new(false),
@@ -129,7 +131,7 @@ impl LogicalSize {
pub(super) fn deferred_initial(compute_to: Lsn) -> Self {
Self {
initial_logical_size: OnceCell::new(),
initial_size_computation: Arc::new(Semaphore::new(1)),
cancel_wait_for_background_loop_concurrency_limit_semaphore: OnceCell::new(),
initial_part_end: Some(compute_to),
size_added_after_initial: AtomicI64::new(0),
did_return_approximate_to_walreceiver: AtomicBool::new(false),

View File

@@ -397,7 +397,10 @@ pub(super) async fn handle_walreceiver_connection(
// Send the replication feedback message.
// Regular standby_status_update fields are put into this message.
let current_timeline_size = timeline
.get_current_logical_size(&ctx)
.get_current_logical_size(
crate::tenant::timeline::GetLogicalSizePriority::User,
&ctx,
)
// FIXME: https://github.com/neondatabase/neon/issues/5963
.size_dont_care_about_accuracy();
let status_update = PageserverFeedback {

View File

@@ -21,6 +21,7 @@
//! redo Postgres process, but some records it can handle directly with
//! bespoken Rust code.
use pageserver_api::shard::ShardIdentity;
use postgres_ffi::v14::nonrelfile_utils::clogpage_precedes;
use postgres_ffi::v14::nonrelfile_utils::slru_may_delete_clogsegment;
use postgres_ffi::{fsm_logical_to_physical, page_is_new, page_set_lsn};
@@ -30,6 +31,7 @@ use bytes::{Buf, Bytes, BytesMut};
use tracing::*;
use crate::context::RequestContext;
use crate::metrics::WAL_INGEST;
use crate::pgdatadir_mapping::*;
use crate::tenant::PageReconstructError;
use crate::tenant::Timeline;
@@ -46,6 +48,7 @@ use postgres_ffi::BLCKSZ;
use utils::lsn::Lsn;
pub struct WalIngest<'a> {
shard: ShardIdentity,
timeline: &'a Timeline,
checkpoint: CheckPoint,
@@ -65,6 +68,7 @@ impl<'a> WalIngest<'a> {
trace!("CheckPoint.nextXid = {}", checkpoint.nextXid.value);
Ok(WalIngest {
shard: *timeline.get_shard_identity(),
timeline,
checkpoint,
checkpoint_modified: false,
@@ -87,6 +91,8 @@ impl<'a> WalIngest<'a> {
decoded: &mut DecodedWALRecord,
ctx: &RequestContext,
) -> anyhow::Result<()> {
WAL_INGEST.records_received.inc();
modification.lsn = lsn;
decode_wal_record(recdata, decoded, self.timeline.pg_version)?;
@@ -355,6 +361,32 @@ impl<'a> WalIngest<'a> {
// Iterate through all the blocks that the record modifies, and
// "put" a separate copy of the record for each block.
for blk in decoded.blocks.iter() {
let rel = RelTag {
spcnode: blk.rnode_spcnode,
dbnode: blk.rnode_dbnode,
relnode: blk.rnode_relnode,
forknum: blk.forknum,
};
let key = rel_block_to_key(rel, blk.blkno);
let key_is_local = self.shard.is_key_local(&key);
tracing::debug!(
"ingest: shard decision {} (checkpoint={}) for key {}",
if !key_is_local { "drop" } else { "keep" },
self.checkpoint_modified,
key
);
if !key_is_local {
if self.shard.is_zero() {
// Shard 0 tracks relation sizes. Although we will not store this block, we will observe
// its blkno in case it implicitly extends a relation.
self.observe_decoded_block(modification, blk, ctx).await?;
}
continue;
}
self.ingest_decoded_block(modification, lsn, decoded, blk, ctx)
.await?;
}
@@ -367,13 +399,37 @@ impl<'a> WalIngest<'a> {
self.checkpoint_modified = false;
}
if modification.is_empty() {
tracing::debug!("ingest: filtered out record @ LSN {lsn}");
WAL_INGEST.records_filtered.inc();
return Ok(());
}
// Now that this record has been fully handled, including updating the
// checkpoint data, let the repository know that it is up-to-date to this LSN
WAL_INGEST.records_committed.inc();
modification.commit(ctx).await?;
Ok(())
}
/// Do not store this block, but observe it for the purposes of updating our relation size state.
async fn observe_decoded_block(
&mut self,
modification: &mut DatadirModification<'_>,
blk: &DecodedBkpBlock,
ctx: &RequestContext,
) -> Result<(), PageReconstructError> {
let rel = RelTag {
spcnode: blk.rnode_spcnode,
dbnode: blk.rnode_dbnode,
relnode: blk.rnode_relnode,
forknum: blk.forknum,
};
self.handle_rel_extend(modification, rel, blk.blkno, ctx)
.await
}
async fn ingest_decoded_block(
&mut self,
modification: &mut DatadirModification<'_>,
@@ -1465,8 +1521,15 @@ impl<'a> WalIngest<'a> {
//info!("extending {} {} to {}", rel, old_nblocks, new_nblocks);
modification.put_rel_extend(rel, new_nblocks, ctx).await?;
let mut key = rel_block_to_key(rel, blknum);
// fill the gap with zeros
for gap_blknum in old_nblocks..blknum {
key.field6 = gap_blknum;
if self.shard.get_shard_number(&key) != self.shard.number {
continue;
}
modification.put_rel_page_image(rel, gap_blknum, ZERO_PAGE.clone())?;
}
}

View File

@@ -34,7 +34,6 @@ use std::process::{Child, ChildStdin, ChildStdout, Command};
use std::sync::{Arc, Mutex, MutexGuard, RwLock};
use std::time::Duration;
use std::time::Instant;
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::{bin_ser::BeSer, id::TenantId, lsn::Lsn, nonblock::set_nonblock};
@@ -124,7 +123,9 @@ impl PostgresRedoManager {
/// The WAL redo is handled by a separate thread, so this just sends a request
/// to the thread and waits for response.
///
/// CANCEL SAFETY: NOT CANCEL SAFE.
/// # Cancel-Safety
///
/// This method is cancellation-safe.
pub async fn request_redo(
&self,
key: Key,
@@ -157,7 +158,6 @@ impl PostgresRedoManager {
self.conf.wal_redo_timeout,
pg_version,
)
.await
};
img = Some(result?);
@@ -178,7 +178,6 @@ impl PostgresRedoManager {
self.conf.wal_redo_timeout,
pg_version,
)
.await
}
}
}
@@ -216,7 +215,7 @@ impl PostgresRedoManager {
/// Process one request for WAL redo using wal-redo postgres
///
#[allow(clippy::too_many_arguments)]
async fn apply_batch_postgres(
fn apply_batch_postgres(
&self,
key: Key,
lsn: Lsn,
@@ -332,12 +331,7 @@ impl PostgresRedoManager {
// than we can SIGKILL & `wait` for them to exit. By doing it the way we do here,
// we limit this risk of run-away to at most $num_runtimes * $num_executor_threads.
// This probably needs revisiting at some later point.
let mut wait_done = proc.stderr_logger_task_done.clone();
drop(proc);
wait_done
.wait_for(|v| *v)
.await
.expect("we use scopeguard to ensure we always send `true` to the channel before dropping the sender");
} else if n_attempts != 0 {
info!(n_attempts, "retried walredo succeeded");
}
@@ -649,8 +643,6 @@ struct WalRedoProcess {
child: Option<NoLeakChild>,
stdout: Mutex<ProcessOutput>,
stdin: Mutex<ProcessInput>,
stderr_logger_cancel: CancellationToken,
stderr_logger_task_done: tokio::sync::watch::Receiver<bool>,
/// Counter to separate same sized walredo inputs failing at the same millisecond.
#[cfg(feature = "testing")]
dump_sequence: AtomicUsize,
@@ -699,6 +691,8 @@ impl WalRedoProcess {
let stdin = child.stdin.take().unwrap();
let stdout = child.stdout.take().unwrap();
let stderr = child.stderr.take().unwrap();
let stderr = tokio::process::ChildStderr::from_std(stderr)
.context("convert to tokio::ChildStderr")?;
macro_rules! set_nonblock_or_log_err {
($file:ident) => {{
let res = set_nonblock($file.as_raw_fd());
@@ -710,69 +704,45 @@ impl WalRedoProcess {
}
set_nonblock_or_log_err!(stdin)?;
set_nonblock_or_log_err!(stdout)?;
set_nonblock_or_log_err!(stderr)?;
let mut stderr = tokio::io::unix::AsyncFd::new(stderr).context("AsyncFd::with_interest")?;
// all fallible operations post-spawn are complete, so get rid of the guard
let child = scopeguard::ScopeGuard::into_inner(child);
let stderr_logger_cancel = CancellationToken::new();
let (stderr_logger_task_done_tx, stderr_logger_task_done_rx) =
tokio::sync::watch::channel(false);
tokio::spawn({
let stderr_logger_cancel = stderr_logger_cancel.clone();
tokio::spawn(
async move {
scopeguard::defer! {
debug!("wal-redo-postgres stderr_logger_task finished");
let _ = stderr_logger_task_done_tx.send(true);
crate::metrics::WAL_REDO_PROCESS_COUNTERS.active_stderr_logger_tasks_finished.inc();
}
debug!("wal-redo-postgres stderr_logger_task started");
loop {
// NB: we purposefully don't do a select! for the cancellation here.
// The cancellation would likely cause us to miss stderr messages.
// We can rely on this to return from .await because when we SIGKILL
// the child, the writing end of the stderr pipe gets closed.
match stderr.readable_mut().await {
Ok(mut guard) => {
let mut errbuf = [0; 16384];
let res = guard.try_io(|fd| {
use std::io::Read;
fd.get_mut().read(&mut errbuf)
});
match res {
Ok(Ok(0)) => {
// it closed the stderr pipe
break;
}
Ok(Ok(n)) => {
// The message might not be split correctly into lines here. But this is
// good enough, the important thing is to get the message to the log.
let output = String::from_utf8_lossy(&errbuf[0..n]).to_string();
error!(output, "received output");
},
Ok(Err(e)) => {
error!(error = ?e, "read() error, waiting for cancellation");
stderr_logger_cancel.cancelled().await;
error!(error = ?e, "read() error, cancellation complete");
break;
}
Err(e) => {
let _e: tokio::io::unix::TryIoError = e;
// the read() returned WouldBlock, that's expected
}
}
crate::metrics::WAL_REDO_PROCESS_COUNTERS.active_stderr_logger_tasks_started.inc();
use tokio::io::AsyncBufReadExt;
let mut stderr_lines = tokio::io::BufReader::new(stderr);
let mut buf = Vec::new();
let res = loop {
buf.clear();
// TODO we don't trust the process to cap its stderr length.
// Currently it can do unbounded Vec allocation.
match stderr_lines.read_until(b'\n', &mut buf).await {
Ok(0) => break Ok(()), // eof
Ok(num_bytes) => {
let output = String::from_utf8_lossy(&buf[..num_bytes]);
error!(%output, "received output");
}
Err(e) => {
error!(error = ?e, "read() error, waiting for cancellation");
stderr_logger_cancel.cancelled().await;
error!(error = ?e, "read() error, cancellation complete");
break;
break Err(e);
}
}
};
match res {
Ok(()) => (),
Err(e) => {
error!(error=?e, "failed to read from walredo stderr");
}
}
}.instrument(tracing::info_span!(parent: None, "wal-redo-postgres-stderr", pid = child.id(), tenant_id = %tenant_id, %pg_version))
});
);
Ok(Self {
conf,
@@ -787,8 +757,6 @@ impl WalRedoProcess {
pending_responses: VecDeque::new(),
n_processed_responses: 0,
}),
stderr_logger_cancel,
stderr_logger_task_done: stderr_logger_task_done_rx,
#[cfg(feature = "testing")]
dump_sequence: AtomicUsize::default(),
})
@@ -1029,7 +997,6 @@ impl Drop for WalRedoProcess {
.take()
.expect("we only do this once")
.kill_and_wait(WalRedoKillCause::WalRedoProcessDrop);
self.stderr_logger_cancel.cancel();
// no way to wait for stderr_logger_task from Drop because that is async only
}
}

View File

@@ -18,6 +18,7 @@
#include "fmgr.h"
#include "access/xlog.h"
#include "access/xlogutils.h"
#include "common/hashfn.h"
#include "storage/buf_internals.h"
#include "storage/lwlock.h"
#include "storage/ipc.h"
@@ -36,22 +37,12 @@
#include "neon.h"
#include "walproposer.h"
#include "neon_utils.h"
#include "control_plane_connector.h"
#define PageStoreTrace DEBUG5
#define RECONNECT_INTERVAL_USEC 1000000
bool connected = false;
PGconn *pageserver_conn = NULL;
/*
* WaitEventSet containing:
* - WL_SOCKET_READABLE on pageserver_conn,
* - WL_LATCH_SET on MyLatch, and
* - WL_EXIT_ON_PM_DEATH.
*/
WaitEventSet *pageserver_conn_wes = NULL;
/* GUCs */
char *neon_timeline;
char *neon_tenant;
@@ -64,87 +55,175 @@ int flush_every_n_requests = 8;
int n_reconnect_attempts = 0;
int max_reconnect_attempts = 60;
#define MAX_PAGESERVER_CONNSTRING_SIZE 256
typedef struct
{
LWLockId lock;
pg_atomic_uint64 update_counter;
char pageserver_connstring[MAX_PAGESERVER_CONNSTRING_SIZE];
} PagestoreShmemState;
#if PG_VERSION_NUM >= 150000
static shmem_request_hook_type prev_shmem_request_hook = NULL;
static void walproposer_shmem_request(void);
#endif
static shmem_startup_hook_type prev_shmem_startup_hook;
static PagestoreShmemState *pagestore_shared;
static uint64 pagestore_local_counter = 0;
static char local_pageserver_connstring[MAX_PAGESERVER_CONNSTRING_SIZE];
int stripe_size;
bool (*old_redo_read_buffer_filter) (XLogReaderState *record, uint8 block_id) = NULL;
static bool pageserver_flush(void);
static void pageserver_disconnect(void);
static bool pageserver_flush(shardno_t shard_no);
static void pageserver_disconnect(shardno_t shard_no);
static void AssignPageserverConnstring(const char *newval, void *extra);
static bool
PagestoreShmemIsValid()
{
return pagestore_shared && UsedShmemSegAddr;
}
static shmem_startup_hook_type prev_shmem_startup_hook;
#if PG_VERSION_NUM>=150000
static shmem_request_hook_type prev_shmem_request_hook;
#endif
static bool
CheckPageserverConnstring(char **newval, void **extra, GucSource source)
typedef struct
{
return strlen(*newval) < MAX_PAGESERVER_CONNSTRING_SIZE;
size_t n_shards;
pg_atomic_uint64 begin_update_counter;
pg_atomic_uint64 end_update_counter;
char shard_connstr[MAX_SHARDS][MAX_PS_CONNSTR_LEN];
} ShardMap;
static ShardMap* shard_map;
static uint64 shard_map_update_counter;
typedef struct
{
/*
* Connection for each shard
*/
PGconn *conn;
/*
* WaitEventSet containing:
* - WL_SOCKET_READABLE on pageserver_conn,
* - WL_LATCH_SET on MyLatch, and
* - WL_EXIT_ON_PM_DEATH.
*/
WaitEventSet *wes;
} PageServer;
static PageServer page_servers[MAX_SHARDS];
static shardno_t max_attached_shard_no;
static void
psm_shmem_startup(void)
{
bool found;
if (prev_shmem_startup_hook)
{
prev_shmem_startup_hook();
}
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
shard_map = (ShardMap*)ShmemInitStruct("shard_map", sizeof(ShardMap), &found);
if (!found)
{
shard_map->n_shards = 0;
pg_atomic_init_u64(&shard_map->begin_update_counter, 0);
pg_atomic_init_u64(&shard_map->end_update_counter, 0);
AssignPageserverConnstring(page_server_connstring, NULL);
}
LWLockRelease(AddinShmemInitLock);
}
static void
AssignPageserverConnstring(const char *newval, void *extra)
psm_shmem_request(void)
{
if(!PagestoreShmemIsValid())
return;
LWLockAcquire(pagestore_shared->lock, LW_EXCLUSIVE);
strlcpy(pagestore_shared->pageserver_connstring, newval, MAX_PAGESERVER_CONNSTRING_SIZE);
pg_atomic_fetch_add_u64(&pagestore_shared->update_counter, 1);
LWLockRelease(pagestore_shared->lock);
}
#if PG_VERSION_NUM>=150000
if (prev_shmem_request_hook)
prev_shmem_request_hook();
#endif
static bool
CheckConnstringUpdated()
{
if(!PagestoreShmemIsValid())
return false;
return pagestore_local_counter < pg_atomic_read_u64(&pagestore_shared->update_counter);
RequestAddinShmemSpace(sizeof(ShardMap));
}
static void
ReloadConnstring()
psm_init(void)
{
if(!PagestoreShmemIsValid())
return;
LWLockAcquire(pagestore_shared->lock, LW_SHARED);
strlcpy(local_pageserver_connstring, pagestore_shared->pageserver_connstring, sizeof(local_pageserver_connstring));
pagestore_local_counter = pg_atomic_read_u64(&pagestore_shared->update_counter);
LWLockRelease(pagestore_shared->lock);
prev_shmem_startup_hook = shmem_startup_hook;
shmem_startup_hook = psm_shmem_startup;
#if PG_VERSION_NUM>=150000
prev_shmem_request_hook = shmem_request_hook;
shmem_request_hook = psm_shmem_request;
#else
psm_shmem_request();
#endif
}
/*
* Reload page map if needed and return number of shards and connection string for the specified shard
*/
static shardno_t
load_shard_map(shardno_t shard_no, char* connstr)
{
shardno_t n_shards;
uint64 begin_update_counter;
uint64 end_update_counter;
/*
* There is race condition here between backendc and postmaster which can update shard map.
* We recheck update couner after copying connection string to check that configuration was not changed.
*/
do
{
begin_update_counter = pg_atomic_read_u64(&shard_map->begin_update_counter);
end_update_counter = pg_atomic_read_u64(&shard_map->end_update_counter);
n_shards = shard_map->n_shards;
if (shard_no >= n_shards)
elog(ERROR, "Shard %d is greater or equal than number of shards %d", shard_no, n_shards);
if (connstr)
strncpy(connstr, shard_map->shard_connstr[shard_no], MAX_PS_CONNSTR_LEN);
}
while (begin_update_counter != end_update_counter
|| begin_update_counter != pg_atomic_read_u64(&shard_map->begin_update_counter)
|| end_update_counter != pg_atomic_read_u64(&shard_map->end_update_counter));
if (shard_map_update_counter != end_update_counter)
{
/* Reset all connections if connection strings are changed */
for (shardno_t i = 0; i < max_attached_shard_no; i++)
{
if (page_servers[i].conn)
pageserver_disconnect(i);
}
max_attached_shard_no = 0;
shard_map_update_counter = end_update_counter;
}
return n_shards;
}
#define MB (1024*1024)
shardno_t
get_shard_number(BufferTag* tag)
{
shardno_t n_shards = load_shard_map(0, NULL);
uint32 hash;
#if PG_MAJORVERSION_NUM < 16
hash = murmurhash32(tag->rnode.relNode);
hash = hash_combine(hash, murmurhash32(tag->blockNum/(MB/BLCKSZ)/stripe_size));
#else
hash = murmurhash32(tag->relNumber);
hash = hash_combine(hash, murmurhash32(tag->blockNum/(MB/BLCKSZ)/stripe_size));
#endif
return hash % n_shards;
}
static bool
pageserver_connect(int elevel)
pageserver_connect(shardno_t shard_no, int elevel)
{
char *query;
int ret;
const char *keywords[3];
const char *values[3];
int n;
PGconn* conn;
WaitEventSet *wes;
char connstr[MAX_PS_CONNSTR_LEN];
Assert(!connected);
Assert(page_servers[shard_no].conn == NULL);
if(CheckConnstringUpdated())
{
ReloadConnstring();
}
(void)load_shard_map(shard_no, connstr); /* refresh page map if needed */
/*
* Connect using the connection string we got from the
@@ -165,19 +244,18 @@ pageserver_connect(int elevel)
n++;
}
keywords[n] = "dbname";
values[n] = local_pageserver_connstring;
values[n] = connstr;
n++;
keywords[n] = NULL;
values[n] = NULL;
n++;
pageserver_conn = PQconnectdbParams(keywords, values, 1);
conn = PQconnectdbParams(keywords, values, 1);
if (PQstatus(pageserver_conn) == CONNECTION_BAD)
if (PQstatus(conn) == CONNECTION_BAD)
{
char *msg = pchomp(PQerrorMessage(pageserver_conn));
char *msg = pchomp(PQerrorMessage(conn));
PQfinish(pageserver_conn);
pageserver_conn = NULL;
PQfinish(conn);
ereport(elevel,
(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
@@ -185,30 +263,28 @@ pageserver_connect(int elevel)
errdetail_internal("%s", msg)));
return false;
}
query = psprintf("pagestream %s %s", neon_tenant, neon_timeline);
ret = PQsendQuery(pageserver_conn, query);
ret = PQsendQuery(conn, query);
if (ret != 1)
{
PQfinish(pageserver_conn);
pageserver_conn = NULL;
PQfinish(conn);
neon_log(elevel, "could not send pagestream command to pageserver");
return false;
}
pageserver_conn_wes = CreateWaitEventSet(TopMemoryContext, 3);
AddWaitEventToSet(pageserver_conn_wes, WL_LATCH_SET, PGINVALID_SOCKET,
wes = CreateWaitEventSet(TopMemoryContext, 3);
AddWaitEventToSet(wes, WL_LATCH_SET, PGINVALID_SOCKET,
MyLatch, NULL);
AddWaitEventToSet(pageserver_conn_wes, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
AddWaitEventToSet(wes, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
NULL, NULL);
AddWaitEventToSet(pageserver_conn_wes, WL_SOCKET_READABLE, PQsocket(pageserver_conn), NULL, NULL);
AddWaitEventToSet(wes, WL_SOCKET_READABLE, PQsocket(conn), NULL, NULL);
while (PQisBusy(pageserver_conn))
while (PQisBusy(conn))
{
WaitEvent event;
/* Sleep until there's something to do */
(void) WaitEventSetWait(pageserver_conn_wes, -1L, &event, 1, PG_WAIT_EXTENSION);
(void) WaitEventSetWait(wes, -1L, &event, 1, PG_WAIT_EXTENSION);
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
@@ -216,14 +292,12 @@ pageserver_connect(int elevel)
/* Data available in socket? */
if (event.events & WL_SOCKET_READABLE)
{
if (!PQconsumeInput(pageserver_conn))
if (!PQconsumeInput(conn))
{
char *msg = pchomp(PQerrorMessage(pageserver_conn));
char *msg = pchomp(PQerrorMessage(conn));
PQfinish(pageserver_conn);
pageserver_conn = NULL;
FreeWaitEventSet(pageserver_conn_wes);
pageserver_conn_wes = NULL;
PQfinish(conn);
FreeWaitEventSet(wes);
neon_log(elevel, "could not complete handshake with pageserver: %s",
msg);
@@ -232,9 +306,11 @@ pageserver_connect(int elevel)
}
}
neon_log(LOG, "libpagestore: connected to '%s'", page_server_connstring);
neon_log(LOG, "libpagestore: connected to '%s'", connstr);
page_servers[shard_no].conn = conn;
page_servers[shard_no].wes = wes;
max_attached_shard_no = Max(shard_no+1, max_attached_shard_no);
connected = true;
return true;
}
@@ -242,10 +318,10 @@ pageserver_connect(int elevel)
* A wrapper around PQgetCopyData that checks for interrupts while sleeping.
*/
static int
call_PQgetCopyData(char **buffer)
call_PQgetCopyData(shardno_t shard_no, char **buffer)
{
int ret;
PGconn* pageserver_conn = page_servers[shard_no].conn;
retry:
ret = PQgetCopyData(pageserver_conn, buffer, 1 /* async */ );
@@ -254,7 +330,7 @@ retry:
WaitEvent event;
/* Sleep until there's something to do */
(void) WaitEventSetWait(pageserver_conn_wes, -1L, &event, 1, PG_WAIT_EXTENSION);
(void) WaitEventSetWait(page_servers[shard_no].wes, -1L, &event, 1, PG_WAIT_EXTENSION);
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
@@ -279,7 +355,7 @@ retry:
static void
pageserver_disconnect(void)
pageserver_disconnect(shardno_t shard_no)
{
/*
* If anything goes wrong while we were sending a request, it's not clear
@@ -288,38 +364,32 @@ pageserver_disconnect(void)
* time later after we have already sent a new unrelated request. Close
* the connection to avoid getting confused.
*/
if (connected)
if (page_servers[shard_no].conn)
{
neon_log(LOG, "dropping connection to page server due to error");
PQfinish(pageserver_conn);
pageserver_conn = NULL;
connected = false;
PQfinish(page_servers[shard_no].conn);
page_servers[shard_no].conn = NULL;
prefetch_on_ps_disconnect();
}
if (pageserver_conn_wes != NULL)
if (page_servers[shard_no].wes != NULL)
{
FreeWaitEventSet(pageserver_conn_wes);
pageserver_conn_wes = NULL;
FreeWaitEventSet(page_servers[shard_no].wes);
page_servers[shard_no].wes = NULL;
}
}
static bool
pageserver_send(NeonRequest * request)
pageserver_send(shardno_t shard_no, NeonRequest * request)
{
StringInfoData req_buff;
if(CheckConnstringUpdated())
{
pageserver_disconnect();
ReloadConnstring();
}
PGconn* pageserver_conn = page_servers[shard_no].conn;
/* If the connection was lost for some reason, reconnect */
if (connected && PQstatus(pageserver_conn) == CONNECTION_BAD)
if (pageserver_conn && PQstatus(pageserver_conn) == CONNECTION_BAD)
{
neon_log(LOG, "pageserver_send disconnect bad connection");
pageserver_disconnect();
pageserver_disconnect(shard_no);
}
req_buff = nm_pack_request(request);
@@ -331,9 +401,9 @@ pageserver_send(NeonRequest * request)
* See https://github.com/neondatabase/neon/issues/1138
* So try to reestablish connection in case of failure.
*/
if (!connected)
if (!page_servers[shard_no].conn)
{
while (!pageserver_connect(n_reconnect_attempts < max_reconnect_attempts ? LOG : ERROR))
while (!pageserver_connect(shard_no, n_reconnect_attempts < max_reconnect_attempts ? LOG : ERROR))
{
HandleMainLoopInterrupts();
n_reconnect_attempts += 1;
@@ -342,7 +412,9 @@ pageserver_send(NeonRequest * request)
n_reconnect_attempts = 0;
}
/*
pageserver_conn = page_servers[shard_no].conn;
/*
* Send request.
*
* In principle, this could block if the output buffer is full, and we
@@ -353,7 +425,7 @@ pageserver_send(NeonRequest * request)
if (PQputCopyData(pageserver_conn, req_buff.data, req_buff.len) <= 0)
{
char *msg = pchomp(PQerrorMessage(pageserver_conn));
pageserver_disconnect();
pageserver_disconnect(shard_no);
neon_log(LOG, "pageserver_send disconnect because failed to send page request (try to reconnect): %s", msg);
pfree(msg);
pfree(req_buff.data);
@@ -373,12 +445,12 @@ pageserver_send(NeonRequest * request)
}
static NeonResponse *
pageserver_receive(void)
pageserver_receive(shardno_t shard_no)
{
StringInfoData resp_buff;
NeonResponse *resp;
if (!connected)
PGconn* pageserver_conn = page_servers[shard_no].conn;
if (!pageserver_conn)
return NULL;
PG_TRY();
@@ -386,7 +458,7 @@ pageserver_receive(void)
/* read response */
int rc;
rc = call_PQgetCopyData(&resp_buff.data);
rc = call_PQgetCopyData(shard_no, &resp_buff.data);
if (rc >= 0)
{
resp_buff.len = rc;
@@ -405,25 +477,25 @@ pageserver_receive(void)
else if (rc == -1)
{
neon_log(LOG, "pageserver_receive disconnect because call_PQgetCopyData returns -1: %s", pchomp(PQerrorMessage(pageserver_conn)));
pageserver_disconnect();
pageserver_disconnect(shard_no);
resp = NULL;
}
else if (rc == -2)
{
char* msg = pchomp(PQerrorMessage(pageserver_conn));
pageserver_disconnect();
pageserver_disconnect(shard_no);
neon_log(ERROR, "pageserver_receive disconnect because could not read COPY data: %s", msg);
}
else
{
pageserver_disconnect();
pageserver_disconnect(shard_no);
neon_log(ERROR, "pageserver_receive disconnect because unexpected PQgetCopyData return value: %d", rc);
}
}
PG_CATCH();
{
neon_log(LOG, "pageserver_receive disconnect due to caught exception");
pageserver_disconnect();
pageserver_disconnect(shard_no);
PG_RE_THROW();
}
PG_END_TRY();
@@ -433,9 +505,10 @@ pageserver_receive(void)
static bool
pageserver_flush(void)
pageserver_flush(shardno_t shard_no)
{
if (!connected)
PGconn* pageserver_conn = page_servers[shard_no].conn;
if (!pageserver_conn)
{
neon_log(WARNING, "Tried to flush while disconnected");
}
@@ -444,7 +517,7 @@ pageserver_flush(void)
if (PQflush(pageserver_conn))
{
char *msg = pchomp(PQerrorMessage(pageserver_conn));
pageserver_disconnect();
pageserver_disconnect(shard_no);
neon_log(LOG, "pageserver_flush disconnect because failed to flush page requests: %s", msg);
pfree(msg);
return false;
@@ -468,62 +541,61 @@ check_neon_id(char **newval, void **extra, GucSource source)
return **newval == '\0' || HexDecodeString(id, *newval, 16);
}
static Size
PagestoreShmemSize(void)
{
return sizeof(PagestoreShmemState);
}
static bool
PagestoreShmemInit(void)
{
bool found;
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
pagestore_shared = ShmemInitStruct("libpagestore shared state",
PagestoreShmemSize(),
&found);
if(!found)
{
pagestore_shared->lock = &(GetNamedLWLockTranche("neon_libpagestore")->lock);
pg_atomic_init_u64(&pagestore_shared->update_counter, 0);
AssignPageserverConnstring(page_server_connstring, NULL);
}
LWLockRelease(AddinShmemInitLock);
return found;
}
static void
pagestore_shmem_startup_hook(void)
AssignPageserverConnstring(const char *newval, void *extra)
{
if(prev_shmem_startup_hook)
prev_shmem_startup_hook();
/*
* Load shard map only at Postmaster.
* If old page server is not available, then backends can be blocked in attempts to reconnect to it and do not reload config in this loop
*/
if (shard_map != NULL && (MyProcPid == PostmasterPid || shard_map->n_shards == 0))
{
char const* shard_connstr = newval;
char const* sep;
size_t connstr_len;
int i = 0;
bool shard_map_changed = false;
do
{
sep = strchr(shard_connstr, ',');
connstr_len = sep != NULL ? sep - shard_connstr : strlen(shard_connstr);
if (connstr_len == 0)
break; /* trailing comma */
if (i >= MAX_SHARDS)
{
elog(LOG, "Too many shards");
return;
}
if (connstr_len >= MAX_PS_CONNSTR_LEN)
{
elog(LOG, "Connection string too long");
return;
}
if (i >= shard_map->n_shards ||
strcmp(shard_map->shard_connstr[i], shard_connstr) != 0)
{
if (!shard_map_changed)
{
pg_atomic_add_fetch_u64(&shard_map->begin_update_counter, 1);
shard_map_changed = true;
}
memcpy(shard_map->shard_connstr[i], shard_connstr, connstr_len+1);
}
shard_connstr = sep + 1;
i += 1;
} while (sep != NULL);
PagestoreShmemInit();
}
static void
pagestore_shmem_request(void)
{
#if PG_VERSION_NUM >= 150000
if(prev_shmem_request_hook)
prev_shmem_request_hook();
#endif
RequestAddinShmemSpace(PagestoreShmemSize());
RequestNamedLWLockTranche("neon_libpagestore", 1);
}
static void
pagestore_prepare_shmem(void)
{
#if PG_VERSION_NUM >= 150000
prev_shmem_request_hook = shmem_request_hook;
shmem_request_hook = pagestore_shmem_request;
#else
pagestore_shmem_request();
#endif
prev_shmem_startup_hook = shmem_startup_hook;
shmem_startup_hook = pagestore_shmem_startup_hook;
if (i == 0)
{
elog(LOG, "No shards were specified");
return;
}
if (shard_map_changed)
{
shard_map->n_shards = i;
pg_atomic_add_fetch_u64(&shard_map->end_update_counter, 1);
}
}
}
/*
@@ -532,8 +604,6 @@ pagestore_prepare_shmem(void)
void
pg_init_libpagestore(void)
{
pagestore_prepare_shmem();
DefineCustomStringVariable("neon.pageserver_connstring",
"connection string to the page server",
NULL,
@@ -541,7 +611,7 @@ pg_init_libpagestore(void)
"",
PGC_SIGHUP,
0, /* no flags required */
CheckPageserverConnstring, AssignPageserverConnstring, NULL);
NULL, AssignPageserverConnstring, NULL);
DefineCustomStringVariable("neon.timeline_id",
"Neon timeline_id the server is running on",
@@ -561,6 +631,15 @@ pg_init_libpagestore(void)
0, /* no flags required */
check_neon_id, NULL, NULL);
DefineCustomIntVariable("neon.stripe_size",
"sharding sripe size",
NULL,
&stripe_size,
256, 1, INT_MAX,
PGC_SIGHUP,
GUC_UNIT_MB,
NULL, NULL, NULL);
DefineCustomIntVariable("neon.max_cluster_size",
"cluster size limit",
NULL,
@@ -623,4 +702,5 @@ pg_init_libpagestore(void)
}
lfc_init();
psm_init();
}

View File

@@ -20,12 +20,16 @@
#include RELFILEINFO_HDR
#include "storage/block.h"
#include "storage/smgr.h"
#include "storage/buf_internals.h"
#include "lib/stringinfo.h"
#include "libpq/pqformat.h"
#include "utils/memutils.h"
#include "pg_config.h"
#define MAX_SHARDS 128
#define MAX_PS_CONNSTR_LEN 128
typedef enum
{
/* pagestore_client -> pagestore */
@@ -144,11 +148,13 @@ extern char *nm_to_string(NeonMessage * msg);
* API
*/
typedef unsigned shardno_t;
typedef struct
{
bool (*send) (NeonRequest * request);
NeonResponse *(*receive) (void);
bool (*flush) (void);
bool (*send) (shardno_t shard_no, NeonRequest * request);
NeonResponse *(*receive) (shardno_t shard_no);
bool (*flush) (shardno_t shard_no);
} page_server_api;
extern void prefetch_on_ps_disconnect(void);
@@ -165,6 +171,8 @@ extern char *neon_tenant;
extern bool wal_redo;
extern int32 max_cluster_size;
extern shardno_t get_shard_number(BufferTag* tag);
extern const f_smgr *smgr_neon(BackendId backend, NRelFileInfo rinfo);
extern void smgr_init_neon(void);
extern void readahead_buffer_resize(int newsize, void *extra);

View File

@@ -164,6 +164,7 @@ typedef struct PrefetchRequest {
XLogRecPtr actual_request_lsn;
NeonResponse *response; /* may be null */
PrefetchStatus status;
shardno_t shard_no;
uint64 my_ring_index;
} PrefetchRequest;
@@ -225,6 +226,8 @@ typedef struct PrefetchState {
/* the buffers */
prfh_hash *prf_hash;
int max_shard_no;
uint8 shard_bitmap[(MAX_SHARDS + 7)/8];
PrefetchRequest prf_buffer[]; /* prefetch buffers */
} PrefetchState;
@@ -313,6 +316,7 @@ compact_prefetch_buffers(void)
Assert(target_slot->status == PRFS_UNUSED);
target_slot->buftag = source_slot->buftag;
target_slot->shard_no = source_slot->shard_no;
target_slot->status = source_slot->status;
target_slot->response = source_slot->response;
target_slot->effective_request_lsn = source_slot->effective_request_lsn;
@@ -477,6 +481,23 @@ prefetch_cleanup_trailing_unused(void)
}
}
static bool
prefetch_flush_requests(void)
{
for (shardno_t shard_no = 0; shard_no < MyPState->max_shard_no; shard_no++)
{
if (MyPState->shard_bitmap[shard_no >> 3] & (1 << (shard_no & 7)))
{
if (!page_server->flush(shard_no))
return false;
MyPState->shard_bitmap[shard_no >> 3] &= ~(1 << (shard_no & 7));
}
}
MyPState->max_shard_no = 0;
return true;
}
/*
* Wait for slot of ring_index to have received its response.
* The caller is responsible for making sure the request buffer is flushed.
@@ -492,7 +513,7 @@ prefetch_wait_for(uint64 ring_index)
if (MyPState->ring_flush <= ring_index &&
MyPState->ring_unused > MyPState->ring_flush)
{
if (!page_server->flush())
if (!prefetch_flush_requests())
return false;
MyPState->ring_flush = MyPState->ring_unused;
}
@@ -530,7 +551,7 @@ prefetch_read(PrefetchRequest *slot)
Assert(slot->my_ring_index == MyPState->ring_receive);
old = MemoryContextSwitchTo(MyPState->errctx);
response = (NeonResponse *) page_server->receive();
response = (NeonResponse *) page_server->receive(slot->shard_no);
MemoryContextSwitchTo(old);
if (response)
{
@@ -682,12 +703,14 @@ prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force
Assert(slot->response == NULL);
Assert(slot->my_ring_index == MyPState->ring_unused);
while (!page_server->send((NeonRequest *) &request));
while (!page_server->send(slot->shard_no, (NeonRequest *) &request));
/* update prefetch state */
MyPState->n_requests_inflight += 1;
MyPState->n_unused -= 1;
MyPState->ring_unused += 1;
MyPState->shard_bitmap[slot->shard_no >> 3] |= 1 << (slot->shard_no & 7);
MyPState->max_shard_no = Max(slot->shard_no+1, MyPState->max_shard_no);
/* update slot state */
slot->status = PRFS_REQUESTED;
@@ -847,6 +870,7 @@ prefetch_register_buffer(BufferTag tag, bool *force_latest, XLogRecPtr *force_ls
* function reads the buffer tag from the slot.
*/
slot->buftag = tag;
slot->shard_no = get_shard_number(&tag);
slot->my_ring_index = ring_index;
prefetch_do_request(slot, force_latest, force_lsn);
@@ -857,7 +881,7 @@ prefetch_register_buffer(BufferTag tag, bool *force_latest, XLogRecPtr *force_ls
if (flush_every_n_requests > 0 &&
MyPState->ring_unused - MyPState->ring_flush >= flush_every_n_requests)
{
if (!page_server->flush())
if (!prefetch_flush_requests())
{
/* Prefetch set is reset in case of error, so we should try to register our request once again */
goto Retry;
@@ -872,11 +896,42 @@ static NeonResponse *
page_server_request(void const *req)
{
NeonResponse* resp;
BufferTag tag = {0};
shardno_t shard_no;
switch (((NeonRequest *) req)->tag)
{
case T_NeonExistsRequest:
CopyNRelFileInfoToBufTag(tag, ((NeonExistsRequest *) req)->rinfo);
break;
case T_NeonNblocksRequest:
CopyNRelFileInfoToBufTag(tag, ((NeonNblocksRequest *) req)->rinfo);
break;
case T_NeonDbSizeRequest:
NInfoGetDbOid(BufTagGetNRelFileInfo(tag)) = ((NeonDbSizeRequest *) req)->dbNode;
break;
case T_NeonGetPageRequest:
CopyNRelFileInfoToBufTag(tag, ((NeonGetPageRequest *) req)->rinfo);
tag.blockNum = ((NeonGetPageRequest *) req)->blkno;
break;
default:
elog(ERROR, "Unexpected request tag: %d", ((NeonRequest *) req)->tag);
}
shard_no = get_shard_number(&tag);
/*
* TODO: temporary workarround - we stream all WAL only to shard 0, so metadata and forks other than main
* should be requested from shard 0. We still need to call get_shard_no() to check if shard map is up-to-date
*/
if (((NeonRequest *) req)->tag != T_NeonGetPageRequest || ((NeonGetPageRequest *) req)->forknum != MAIN_FORKNUM)
{
shard_no = 0;
}
do {
while (!page_server->send((NeonRequest *) req) || !page_server->flush());
MyPState->ring_flush = MyPState->ring_unused;
while (!page_server->send(shard_no, (NeonRequest *) req) || !page_server->flush(shard_no));
consume_prefetch_responses();
resp = page_server->receive();
resp = page_server->receive(shard_no);
} while (resp == NULL);
return resp;

View File

@@ -103,7 +103,7 @@ struct ProxyCliArgs {
#[clap(long, default_value_t = false, value_parser = clap::builder::BoolishValueParser::new(), action = clap::ArgAction::Set)]
require_client_ip: bool,
/// Disable dynamic rate limiter and store the metrics to ensure its production behaviour.
#[clap(long, default_value_t = true, value_parser = clap::builder::BoolishValueParser::new(), action = clap::ArgAction::Set)]
#[clap(long, default_value_t = false, value_parser = clap::builder::BoolishValueParser::new(), action = clap::ArgAction::Set)]
disable_dynamic_rate_limiter: bool,
/// Rate limit algorithm. Makes sense only if `disable_rate_limiter` is `false`.
#[clap(value_enum, long, default_value_t = proxy::rate_limiter::RateLimitAlgorithm::Aimd)]

View File

@@ -134,9 +134,9 @@ pub static ALLOWED_IPS_BY_CACHE_OUTCOME: Lazy<IntCounterVec> = Lazy::new(|| {
pub static RATE_LIMITER_ACQUIRE_LATENCY: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"semaphore_control_plane_token_acquire_seconds",
"proxy_control_plane_token_acquire_seconds",
"Time it took for proxy to establish a connection to the compute endpoint",
// largest bucket = 3^16 * 0.00005ms = 2.15s
// largest bucket = 3^16 * 0.05ms = 2.15s
exponential_buckets(0.00005, 3.0, 16).unwrap(),
)
.unwrap()

View File

@@ -914,9 +914,14 @@ where
Ok(())
}
/// Persist control file to disk, called only after timeline creation (bootstrap).
pub async fn persist(&mut self) -> Result<()> {
self.persist_control_file(self.state.clone()).await
/// Persist in-memory state of control file to disk.
//
// TODO: passing inmem_remote_consistent_lsn everywhere is ugly, better
// separate state completely and give Arc to all those who need it.
pub async fn persist_inmem(&mut self, inmem_remote_consistent_lsn: Lsn) -> Result<()> {
let mut state = self.state.clone();
state.remote_consistent_lsn = inmem_remote_consistent_lsn;
self.persist_control_file(state).await
}
/// Persist in-memory state to the disk, taking other data from state.
@@ -930,7 +935,7 @@ where
/// Persist control file if there is something to save and enough time
/// passed after the last save.
pub async fn maybe_persist_control_file(
pub async fn maybe_persist_inmem_control_file(
&mut self,
inmem_remote_consistent_lsn: Lsn,
) -> Result<()> {
@@ -943,9 +948,7 @@ where
|| self.inmem.peer_horizon_lsn > self.state.peer_horizon_lsn
|| inmem_remote_consistent_lsn > self.state.remote_consistent_lsn;
if need_persist {
let mut state = self.state.clone();
state.remote_consistent_lsn = inmem_remote_consistent_lsn;
self.persist_control_file(state).await?;
self.persist_inmem(inmem_remote_consistent_lsn).await?;
trace!("saved control file: {CF_SAVE_INTERVAL:?} passed");
}
Ok(())
@@ -1064,8 +1067,6 @@ where
if sync_control_file {
let mut state = self.state.clone();
// Note: we could make remote_consistent_lsn update in cf common by
// storing Arc to walsenders in Safekeeper.
state.remote_consistent_lsn = new_remote_consistent_lsn;
self.persist_control_file(state).await?;
}

View File

@@ -182,8 +182,9 @@ impl SharedState {
}
/// Mark timeline active/inactive and return whether s3 offloading requires
/// start/stop action.
fn update_status(
/// start/stop action. If timeline is deactivated, control file is persisted
/// as maintenance task does that only for active timelines.
async fn update_status(
&mut self,
num_computes: usize,
remote_consistent_lsn: Lsn,
@@ -191,7 +192,15 @@ impl SharedState {
) -> bool {
let is_active = self.is_active(num_computes, remote_consistent_lsn);
if self.active != is_active {
info!("timeline {} active={} now", ttid, is_active);
info!(
"timeline {} active={} now, remote_consistent_lsn={}, commit_lsn={}",
ttid, is_active, remote_consistent_lsn, self.sk.inmem.commit_lsn
);
if !is_active {
if let Err(e) = self.sk.persist_inmem(remote_consistent_lsn).await {
warn!("control file save in update_status failed: {:?}", e);
}
}
}
self.active = is_active;
self.is_wal_backup_action_pending(num_computes)
@@ -438,7 +447,7 @@ impl Timeline {
fs::create_dir_all(&self.timeline_dir).await?;
// Write timeline to disk and start background tasks.
if let Err(e) = shared_state.sk.persist().await {
if let Err(e) = shared_state.sk.persist_inmem(Lsn::INVALID).await {
// Bootstrap failed, cancel timeline and remove timeline directory.
self.cancel(shared_state);
@@ -511,12 +520,14 @@ impl Timeline {
self.mutex.lock().await
}
fn update_status(&self, shared_state: &mut SharedState) -> bool {
shared_state.update_status(
self.walreceivers.get_num(),
self.get_walsenders().get_remote_consistent_lsn(),
self.ttid,
)
async fn update_status(&self, shared_state: &mut SharedState) -> bool {
shared_state
.update_status(
self.walreceivers.get_num(),
self.get_walsenders().get_remote_consistent_lsn(),
self.ttid,
)
.await
}
/// Update timeline status and kick wal backup launcher to stop/start offloading if needed.
@@ -526,7 +537,7 @@ impl Timeline {
}
let is_wal_backup_action_pending: bool = {
let mut shared_state = self.write_shared_state().await;
self.update_status(&mut shared_state)
self.update_status(&mut shared_state).await
};
if is_wal_backup_action_pending {
// Can fail only if channel to a static thread got closed, which is not normal at all.
@@ -683,7 +694,7 @@ impl Timeline {
shared_state.sk.record_safekeeper_info(&sk_info).await?;
let peer_info = PeerInfo::from_sk_info(&sk_info, Instant::now());
shared_state.peers_info.upsert(&peer_info);
is_wal_backup_action_pending = self.update_status(&mut shared_state);
is_wal_backup_action_pending = self.update_status(&mut shared_state).await;
commit_lsn = shared_state.sk.inmem.commit_lsn;
}
self.commit_lsn_watch_tx.send(commit_lsn)?;
@@ -828,7 +839,7 @@ impl Timeline {
self.write_shared_state()
.await
.sk
.maybe_persist_control_file(remote_consistent_lsn)
.maybe_persist_inmem_control_file(remote_consistent_lsn)
.await
}

View File

@@ -456,6 +456,7 @@ class NeonEnvBuilder:
self.initial_tenant = initial_tenant or TenantId.generate()
self.initial_timeline = initial_timeline or TimelineId.generate()
self.enable_generations = False
self.initial_shard_count = 1
self.scrub_on_exit = False
self.test_output_dir = test_output_dir
@@ -497,7 +498,10 @@ class NeonEnvBuilder:
f"Services started, creating initial tenant {env.initial_tenant} and its initial timeline"
)
initial_tenant, initial_timeline = env.neon_cli.create_tenant(
tenant_id=env.initial_tenant, conf=initial_tenant_conf, timeline_id=env.initial_timeline
tenant_id=env.initial_tenant,
conf=initial_tenant_conf,
timeline_id=env.initial_timeline,
shard_count=self.initial_shard_count,
)
assert env.initial_tenant == initial_tenant
assert env.initial_timeline == initial_timeline
@@ -1144,6 +1148,7 @@ class NeonCli(AbstractNeonCli):
timeline_id: Optional[TimelineId] = None,
conf: Optional[Dict[str, str]] = None,
set_default: bool = False,
shard_count: int = 1,
) -> Tuple[TenantId, TimelineId]:
"""
Creates a new tenant, returns its id and its initial timeline's id.
@@ -1160,6 +1165,8 @@ class NeonCli(AbstractNeonCli):
str(timeline_id),
"--pg-version",
self.env.pg_version,
"--shard-count",
str(shard_count),
]
if conf is not None:
args.extend(
@@ -1382,7 +1389,7 @@ class NeonCli(AbstractNeonCli):
tenant_id: Optional[TenantId] = None,
hot_standby: bool = False,
lsn: Optional[Lsn] = None,
pageserver_id: Optional[int] = None,
pageserver_ids: Optional[list[int]] = None,
) -> "subprocess.CompletedProcess[str]":
args = [
"endpoint",
@@ -1404,8 +1411,10 @@ class NeonCli(AbstractNeonCli):
args.append(endpoint_id)
if hot_standby:
args.extend(["--hot-standby", "true"])
if pageserver_id is not None:
args.extend(["--pageserver-id", str(pageserver_id)])
if pageserver_ids is not None:
args.extend(["--pageserver-id", ",".join(str(i) for i in pageserver_ids)])
log.info(f"endpoint_create: {args}")
res = self.raw_cli(args)
res.check_returncode()
@@ -2451,7 +2460,7 @@ class Endpoint(PgProtocol):
hot_standby: bool = False,
lsn: Optional[Lsn] = None,
config_lines: Optional[List[str]] = None,
pageserver_id: Optional[int] = None,
pageserver_ids: Optional[list[int]] = None,
) -> "Endpoint":
"""
Create a new Postgres endpoint.
@@ -2473,7 +2482,7 @@ class Endpoint(PgProtocol):
hot_standby=hot_standby,
pg_port=self.pg_port,
http_port=self.http_port,
pageserver_id=pageserver_id,
pageserver_ids=pageserver_ids,
)
path = Path("endpoints") / self.endpoint_id / "pgdata"
self.pgdata_dir = os.path.join(self.env.repo_dir, path)
@@ -2609,7 +2618,7 @@ class Endpoint(PgProtocol):
lsn: Optional[Lsn] = None,
config_lines: Optional[List[str]] = None,
remote_ext_config: Optional[str] = None,
pageserver_id: Optional[int] = None,
pageserver_ids: Optional[list[int]] = None,
) -> "Endpoint":
"""
Create an endpoint, apply config, and start Postgres.
@@ -2624,7 +2633,7 @@ class Endpoint(PgProtocol):
config_lines=config_lines,
hot_standby=hot_standby,
lsn=lsn,
pageserver_id=pageserver_id,
pageserver_ids=pageserver_ids,
).start(remote_ext_config=remote_ext_config)
log.info(f"Postgres startup took {time.time() - started_at} seconds")
@@ -2660,7 +2669,7 @@ class EndpointFactory:
hot_standby: bool = False,
config_lines: Optional[List[str]] = None,
remote_ext_config: Optional[str] = None,
pageserver_id: Optional[int] = None,
pageserver_ids: Optional[list[int]] = None,
) -> Endpoint:
ep = Endpoint(
self.env,
@@ -2678,7 +2687,7 @@ class EndpointFactory:
config_lines=config_lines,
lsn=lsn,
remote_ext_config=remote_ext_config,
pageserver_id=pageserver_id,
pageserver_ids=pageserver_ids,
)
def create(
@@ -3162,7 +3171,15 @@ def check_restored_datadir_content(
cur.execute("CHECKPOINT")
# wait for pageserver to catch up
wait_for_last_flush_lsn(env, endpoint, endpoint.tenant_id, timeline_id)
if len(env.pageservers) > 1:
# FIXME: wait_for_last_flush_lsn needs teaching about sharding: it tries to query
# LSNs for shard-naive TenantId
return
for pageserver in env.pageservers:
wait_for_last_flush_lsn(
env, endpoint, endpoint.tenant_id, timeline_id, pageserver_id=pageserver.id
)
# stop postgres to ensure that files won't change
endpoint.stop()

View File

@@ -48,6 +48,6 @@ def test_pg_clients(test_output_dir: Path, remote_pg: RemotePostgres, client: st
subprocess_capture(test_output_dir, build_cmd, check=True)
run_cmd = [docker_bin, "run", "--rm", "--env-file", env_file, image_tag]
basepath, _, _ = subprocess_capture(test_output_dir, run_cmd, check=True)
_, output, _ = subprocess_capture(test_output_dir, run_cmd, check=True, capture_stdout=True)
assert Path(f"{basepath}.stdout").read_text().strip() == "1"
assert str(output).strip() == "1"

View File

@@ -49,7 +49,7 @@ def test_issue_5878(neon_env_builder: NeonEnvBuilder):
"compaction_period": "0s", # we want to control when compaction runs
"checkpoint_timeout": "24h", # something we won't reach
"checkpoint_distance": f"{50 * (1024**2)}", # something we won't reach, we checkpoint manually
"image_creation_threshold": f"{image_creation_threshold}",
"image_creation_threshold": "100", # we want to control when image is created
"compaction_threshold": f"{l0_l1_threshold}",
"compaction_target_size": f"{128 * (1024**3)}", # make it so that we only have 1 partition => image coverage for delta layers => enables gc of delta layers
}
@@ -124,6 +124,10 @@ def test_issue_5878(neon_env_builder: NeonEnvBuilder):
), "sanity check for what above loop is supposed to do"
# create the image layer from the future
ps_http.patch_tenant_config_client_side(
tenant_id, {"image_creation_threshold": image_creation_threshold}, None
)
assert ps_http.tenant_config(tenant_id).effective_config["image_creation_threshold"] == 1
ps_http.timeline_compact(tenant_id, timeline_id, force_repartition=True)
assert (
len(

View File

@@ -384,7 +384,7 @@ def test_download_remote_layers_api(
env.pageserver.allowed_errors.extend(
[
".*download failed: downloading evicted layer file failed.*",
f".*initial size calculation.*{tenant_id}.*{timeline_id}.*Failed to calculate logical size",
f".*initial_size_calculation.*{tenant_id}.*{timeline_id}.*initial size calculation failed: downloading evicted layer file failed",
]
)

View File

@@ -106,7 +106,6 @@ def test_pageserver_restart(neon_env_builder: NeonEnvBuilder, generations: bool)
# Initial tenant load should reflect the delay we injected
("initial_tenant_load", lambda t, p: t >= (tenant_load_delay_ms / 1000.0) and t >= p),
# Subsequent steps should occur in expected order
("initial_logical_sizes", lambda t, p: t > 0 and t >= p),
("background_jobs_can_start", lambda t, p: t > 0 and t >= p),
("complete", lambda t, p: t > 0 and t >= p),
]

View File

@@ -3,24 +3,38 @@
#
from pathlib import Path
from fixtures.neon_fixtures import NeonEnv, check_restored_datadir_content
import pytest
from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
check_restored_datadir_content,
)
# Run the main PostgreSQL regression tests, in src/test/regress.
#
@pytest.mark.parametrize("shard_count", [2])
def test_pg_regress(
neon_simple_env: NeonEnv,
neon_env_builder: NeonEnvBuilder,
test_output_dir: Path,
pg_bin,
capsys,
base_dir: Path,
pg_distrib_dir: Path,
shard_count: int,
):
env = neon_simple_env
neon_env_builder.enable_generations = True
neon_env_builder.initial_shard_count = shard_count
neon_env_builder.num_pageservers = shard_count
env = neon_env_builder.init_start()
for pageserver in env.pageservers:
# FIXME: attachment_service is not yet sharding aware, so generation validation is broken.
pageserver.allowed_errors.append(".*Dropped remote consistent LSN updates.*")
endpoint = env.endpoints.create_start("main", pageserver_ids=[p.id for p in env.pageservers])
env.neon_cli.create_branch("test_pg_regress", "empty")
# Connect to postgres and create a database called "regression".
endpoint = env.endpoints.create_start("test_pg_regress")
endpoint.safe_psql("CREATE DATABASE regression")
# Create some local directories for pg_regress to run in.

View File

@@ -0,0 +1,23 @@
from fixtures.neon_fixtures import NeonEnvBuilder
import pytest
@pytest.mark.parametrize("shard_count", [2])
@pytest.mark.timeout(1000)
def test_sharding(neon_env_builder: NeonEnvBuilder, shard_count: int):
neon_env_builder.enable_generations = True
neon_env_builder.initial_shard_count = shard_count
neon_env_builder.num_pageservers = shard_count
env = neon_env_builder.init_start()
for pageserver in env.pageservers:
# FIXME: attachment_service is not yet sharding aware, so generation validation is broken.
pageserver.allowed_errors.append(".*Dropped remote consistent LSN updates.*")
pageserver.allowed_errors.append(".*Dropping stale deletions for tenant.*")
endpoint = env.endpoints.create_start("main", pageserver_ids=[p.id for p in env.pageservers])
with endpoint.cursor() as cur:
cur.execute("SET statement_timeout=0") # disable statement timeout
cur.execute("create table t(t bigint, payload text default repeat('?',200))")
cur.execute("insert into t values(generate_series(1,10000000))")
cur.execute("select count(*) from t")
assert cur.fetchone()[0] == 10000000

View File

@@ -146,6 +146,72 @@ def wait_for_pageserver_catchup(endpoint_main: Endpoint, polling_interval=1, tim
time.sleep(polling_interval)
def test_timeline_size_quota_on_startup(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
client = env.pageserver.http_client()
new_timeline_id = env.neon_cli.create_branch("test_timeline_size_quota_on_startup")
wait_for_timeline_size_init(client, tenant=env.initial_tenant, timeline=new_timeline_id)
endpoint_main = env.endpoints.create(
"test_timeline_size_quota_on_startup",
# Set small limit for the test
config_lines=["neon.max_cluster_size=30MB"],
)
endpoint_main.start()
log.info("postgres is running on 'test_timeline_size_quota_on_startup' branch")
with closing(endpoint_main.connect()) as conn:
with conn.cursor() as cur:
cur.execute("CREATE TABLE foo (t text)")
# Insert many rows. This query must fail because of space limit
try:
for _i in range(5000):
cur.execute(
"""
INSERT INTO foo
SELECT 'long string to consume some space' || g
FROM generate_series(1, 100) g
"""
)
# If we get here, the timeline size limit failed
log.error("Query unexpectedly succeeded")
raise AssertionError()
except psycopg2.errors.DiskFull as err:
log.info(f"Query expectedly failed with: {err}")
# Restart endpoint that reached the limit to ensure that it doesn't fail on startup
# i.e. the size limit is not enforced during startup.
endpoint_main.stop()
# don't skip pg_catalog updates - it runs CREATE EXTENSION neon
# which is needed for neon.pg_cluster_size() to work
endpoint_main.respec(skip_pg_catalog_updates=False)
endpoint_main.start()
# ensure that the limit is enforced after startup
with closing(endpoint_main.connect()) as conn:
with conn.cursor() as cur:
# This query must fail because of space limit
try:
cur.execute(
"""
INSERT INTO foo
SELECT 'long string to consume some space' || g
FROM generate_series(1, 100000) g
"""
)
# If we get here, the timeline size limit failed
log.error("Query unexpectedly succeeded")
raise AssertionError()
except psycopg2.errors.DiskFull as err:
log.info(f"Query expectedly failed with: {err}")
def test_timeline_size_quota(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
client = env.pageserver.http_client()

View File

@@ -30,6 +30,7 @@ from fixtures.neon_fixtures import (
Safekeeper,
SafekeeperHttpClient,
SafekeeperPort,
last_flush_lsn_upload,
)
from fixtures.pageserver.utils import (
timeline_delete_wait_completed,
@@ -286,29 +287,43 @@ def test_broker(neon_env_builder: NeonEnvBuilder):
# wait until remote_consistent_lsn gets advanced on all safekeepers
clients = [sk.http_client() for sk in env.safekeepers]
stat_before = [cli.timeline_status(tenant_id, timeline_id) for cli in clients]
log.info(f"statuses is {stat_before}")
log.info(f"statuses before insert: {stat_before}")
endpoint.safe_psql("INSERT INTO t SELECT generate_series(1,100), 'payload'")
# force checkpoint in pageserver to advance remote_consistent_lsn
wait_lsn_force_checkpoint(tenant_id, timeline_id, endpoint, env.pageserver)
# wait for remote_consistent_lsn to reach flush_lsn, forcing it with checkpoint
new_rcl = last_flush_lsn_upload(env, endpoint, tenant_id, timeline_id)
log.info(f"new_rcl: {new_rcl}")
endpoint.stop()
# and wait till remote_consistent_lsn propagates to all safekeepers
#
# TODO: this executes long as timeline on safekeeper is immediately
# deactivated once rcl reaches pageserver one, and thus we generally wait
# till pageserver reconnects to all safekeepers one by one here. Timeline
# status on safekeeper should take into account peers state as well.
started_at = time.time()
while True:
stat_after = [cli.timeline_status(tenant_id, timeline_id) for cli in clients]
if all(
s_after.remote_consistent_lsn > s_before.remote_consistent_lsn
for s_after, s_before in zip(stat_after, stat_before)
):
if all([s_after.remote_consistent_lsn >= new_rcl for s_after in stat_after]):
break
elapsed = time.time() - started_at
if elapsed > 20:
if elapsed > 30:
raise RuntimeError(
f"timed out waiting {elapsed:.0f}s for remote_consistent_lsn propagation: status before {stat_before}, status current {stat_after}"
)
time.sleep(1)
# Ensure that safekeepers don't lose remote_consistent_lsn on restart.
# Control file is persisted each 5s. TODO: do that on shutdown and remove sleep.
time.sleep(6)
for sk in env.safekeepers:
sk.stop()
sk.start()
stat_after_restart = [cli.timeline_status(tenant_id, timeline_id) for cli in clients]
log.info(f"statuses after {stat_after_restart}")
assert all([s.remote_consistent_lsn >= new_rcl for s in stat_after_restart])
# Test that old WAL consumed by peers and pageserver is removed from safekeepers.
@pytest.mark.parametrize("auth_enabled", [False, True])