mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-23 08:00:37 +00:00
Compare commits
10 Commits
jcsp/secon
...
bench-writ
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b1de46c18d | ||
|
|
88064d8c1d | ||
|
|
4f0a8e92ad | ||
|
|
5952f350cb | ||
|
|
726c8e6730 | ||
|
|
f7067a38b7 | ||
|
|
896347f307 | ||
|
|
e5c81fef86 | ||
|
|
7ebe9ca1ac | ||
|
|
1588601503 |
2
.github/workflows/release.yml
vendored
2
.github/workflows/release.yml
vendored
@@ -2,7 +2,7 @@ name: Create Release Branch
|
||||
|
||||
on:
|
||||
schedule:
|
||||
- cron: '0 7 * * 2'
|
||||
- cron: '0 7 * * 5'
|
||||
workflow_dispatch:
|
||||
|
||||
jobs:
|
||||
|
||||
@@ -278,8 +278,9 @@ fn main() -> Result<()> {
|
||||
if #[cfg(target_os = "linux")] {
|
||||
use std::env;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::warn;
|
||||
let vm_monitor_addr = matches.get_one::<String>("vm-monitor-addr");
|
||||
let vm_monitor_addr = matches
|
||||
.get_one::<String>("vm-monitor-addr")
|
||||
.expect("--vm-monitor-addr should always be set because it has a default arg");
|
||||
let file_cache_connstr = matches.get_one::<String>("filecache-connstr");
|
||||
let cgroup = matches.get_one::<String>("cgroup");
|
||||
let file_cache_on_disk = matches.get_flag("file-cache-on-disk");
|
||||
@@ -288,22 +289,16 @@ fn main() -> Result<()> {
|
||||
// Note: it seems like you can make a runtime in an inner scope and
|
||||
// if you start a task in it it won't be dropped. However, make it
|
||||
// in the outermost scope just to be safe.
|
||||
let rt = match (env::var_os("AUTOSCALING"), vm_monitor_addr) {
|
||||
(None, None) => None,
|
||||
(None, Some(_)) => {
|
||||
warn!("--vm-monitor-addr option set but AUTOSCALING env var not present");
|
||||
None
|
||||
}
|
||||
(Some(_), None) => {
|
||||
panic!("AUTOSCALING env var present but --vm-monitor-addr option not set")
|
||||
}
|
||||
(Some(_), Some(_)) => Some(
|
||||
let rt = if env::var_os("AUTOSCALING").is_some() {
|
||||
Some(
|
||||
tokio::runtime::Builder::new_multi_thread()
|
||||
.worker_threads(4)
|
||||
.enable_all()
|
||||
.build()
|
||||
.expect("failed to create tokio runtime for monitor"),
|
||||
),
|
||||
.expect("failed to create tokio runtime for monitor")
|
||||
)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
// This token is used internally by the monitor to clean up all threads
|
||||
@@ -314,7 +309,7 @@ fn main() -> Result<()> {
|
||||
Box::leak(Box::new(vm_monitor::Args {
|
||||
cgroup: cgroup.cloned(),
|
||||
pgconnstr: file_cache_connstr.cloned(),
|
||||
addr: vm_monitor_addr.cloned().unwrap(),
|
||||
addr: vm_monitor_addr.clone(),
|
||||
file_cache_on_disk,
|
||||
})),
|
||||
token.clone(),
|
||||
|
||||
@@ -24,7 +24,7 @@ fn do_control_plane_request(
|
||||
) -> Result<ControlPlaneSpecResponse, (bool, String)> {
|
||||
let resp = reqwest::blocking::Client::new()
|
||||
.get(uri)
|
||||
.header("Authorization", jwt)
|
||||
.header("Authorization", format!("Bearer {}", jwt))
|
||||
.send()
|
||||
.map_err(|e| {
|
||||
(
|
||||
|
||||
@@ -12,6 +12,7 @@ use hyper::{Body, Request, Response};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
use utils::http::endpoint::request_span;
|
||||
use utils::logging::{self, LogFormat};
|
||||
use utils::signals::{ShutdownSignals, Signal};
|
||||
|
||||
@@ -221,8 +222,25 @@ async fn handle_attach_hook(mut req: Request<Body>) -> Result<Response<Body>, Ap
|
||||
generation: 0,
|
||||
});
|
||||
|
||||
if attach_req.node_id.is_some() {
|
||||
if let Some(attaching_pageserver) = attach_req.node_id.as_ref() {
|
||||
tenant_state.generation += 1;
|
||||
tracing::info!(
|
||||
tenant_id = %attach_req.tenant_id,
|
||||
ps_id = %attaching_pageserver,
|
||||
generation = %tenant_state.generation,
|
||||
"issuing",
|
||||
);
|
||||
} else if let Some(ps_id) = tenant_state.pageserver {
|
||||
tracing::info!(
|
||||
tenant_id = %attach_req.tenant_id,
|
||||
%ps_id,
|
||||
generation = %tenant_state.generation,
|
||||
"dropping",
|
||||
);
|
||||
} else {
|
||||
tracing::info!(
|
||||
tenant_id = %attach_req.tenant_id,
|
||||
"no-op: tenant already has no pageserver");
|
||||
}
|
||||
tenant_state.pageserver = attach_req.node_id;
|
||||
let generation = tenant_state.generation;
|
||||
@@ -240,9 +258,9 @@ async fn handle_attach_hook(mut req: Request<Body>) -> Result<Response<Body>, Ap
|
||||
fn make_router(persistent_state: PersistentState) -> RouterBuilder<hyper::Body, ApiError> {
|
||||
endpoint::make_router()
|
||||
.data(Arc::new(State::new(persistent_state)))
|
||||
.post("/re-attach", handle_re_attach)
|
||||
.post("/validate", handle_validate)
|
||||
.post("/attach-hook", handle_attach_hook)
|
||||
.post("/re-attach", |r| request_span(r, handle_re_attach))
|
||||
.post("/validate", |r| request_span(r, handle_validate))
|
||||
.post("/attach-hook", |r| request_span(r, handle_attach_hook))
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
|
||||
108
docs/updating-postgres.md
Normal file
108
docs/updating-postgres.md
Normal file
@@ -0,0 +1,108 @@
|
||||
# Updating Postgres
|
||||
|
||||
## Minor Versions
|
||||
|
||||
When upgrading to a new minor version of Postgres, please follow these steps:
|
||||
|
||||
_Example: 15.4 is the new minor version to upgrade to from 15.3._
|
||||
|
||||
1. Clone the Neon Postgres repository if you have not done so already.
|
||||
|
||||
```shell
|
||||
git clone git@github.com:neondatabase/postgres.git
|
||||
```
|
||||
|
||||
1. Add the Postgres upstream remote.
|
||||
|
||||
```shell
|
||||
git remote add upstream https://git.postgresql.org/git/postgresql.git
|
||||
```
|
||||
|
||||
1. Create a new branch based on the stable branch you are updating.
|
||||
|
||||
```shell
|
||||
git checkout -b my-branch REL_15_STABLE_neon
|
||||
```
|
||||
|
||||
1. Tag the last commit on the stable branch you are updating.
|
||||
|
||||
```shell
|
||||
git tag REL_15_3_neon
|
||||
```
|
||||
|
||||
1. Push the new tag to the Neon Postgres repository.
|
||||
|
||||
```shell
|
||||
git push origin REL_15_3_neon
|
||||
```
|
||||
|
||||
1. Find the upstream release tag you're looking for. Release tags are of the form `REL_X_Y` (for example, `REL_15_4` for version 15.4).
|
||||
|
||||
1. Rebase the branch you created on the tag and resolve any conflicts.
|
||||
|
||||
```shell
|
||||
git fetch upstream REL_15_4
|
||||
git rebase REL_15_4
|
||||
```
|
||||
|
||||
1. Run the Postgres test suite to make sure our commits have not affected
|
||||
Postgres in a negative way.
|
||||
|
||||
```shell
|
||||
make check
|
||||
# OR
|
||||
meson test -C builddir
|
||||
```
|
||||
|
||||
1. Push your branch to the Neon Postgres repository.
|
||||
|
||||
```shell
|
||||
git push origin my-branch
|
||||
```
|
||||
|
||||
1. Clone the Neon repository if you have not done so already.
|
||||
|
||||
```shell
|
||||
git clone git@github.com:neondatabase/neon.git
|
||||
```
|
||||
|
||||
1. Create a new branch.
|
||||
|
||||
1. Change the `revisions.json` file to point at the HEAD of your Postgres
|
||||
branch.
|
||||
|
||||
1. Update the Git submodule.
|
||||
|
||||
```shell
|
||||
git submodule set-branch --branch my-branch vendor/postgres-v15
|
||||
git submodule update --remote vendor/postgres-v15
|
||||
```
|
||||
|
||||
1. Run the Neon test suite to make sure that Neon is still good to go on this
|
||||
minor Postgres release.
|
||||
|
||||
```shell
|
||||
./scripts/poetry -k pg15
|
||||
```
|
||||
|
||||
1. Commit your changes.
|
||||
|
||||
1. Create a pull request, and wait for CI to go green.
|
||||
|
||||
1. Force push the rebased Postgres branches into the Neon Postgres repository.
|
||||
|
||||
```shell
|
||||
git push --force origin my-branch:REL_15_STABLE_neon
|
||||
```
|
||||
|
||||
Force pushing may require disabling various branch protections.
|
||||
|
||||
1. Update your Neon PR to point at the updated stable branches.
|
||||
|
||||
```shell
|
||||
git submodule set-branch --branch REL_15_STABLE_neon vendor/postgres-v15
|
||||
git commit --amend --no-edit
|
||||
git push --force origin
|
||||
```
|
||||
|
||||
1. Merge the pull request after getting approval(s) and CI completion.
|
||||
@@ -88,6 +88,10 @@ criterion.workspace = true
|
||||
hex-literal.workspace = true
|
||||
tokio = { workspace = true, features = ["process", "sync", "fs", "rt", "io-util", "time", "test-util"] }
|
||||
|
||||
[[bench]]
|
||||
name = "bench_writes"
|
||||
harness = false
|
||||
|
||||
[[bench]]
|
||||
name = "bench_layer_map"
|
||||
harness = false
|
||||
|
||||
@@ -10,3 +10,7 @@ To run a specific file:
|
||||
|
||||
To run a specific function:
|
||||
`cargo bench --bench bench_layer_map -- real_map_uniform_queries`
|
||||
|
||||
To add a new benchmark:
|
||||
1. Create a new file containing `criterion_main!`
|
||||
2. Add it to `Cargo.toml`
|
||||
|
||||
76
pageserver/benches/bench_writes.rs
Normal file
76
pageserver/benches/bench_writes.rs
Normal file
@@ -0,0 +1,76 @@
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use criterion::{black_box, criterion_group, criterion_main, Criterion};
|
||||
use pageserver::{tenant::storage_layer::InMemoryLayer, config::PageServerConf, context::{RequestContext, DownloadBehavior}, task_mgr::TaskKind, repository::Key, virtual_file};
|
||||
use pageserver::repository::Value;
|
||||
use utils::{id::{TimelineId, TenantId}, lsn::Lsn};
|
||||
|
||||
fn bench_writes(c: &mut Criterion) {
|
||||
// Boilerplate
|
||||
// TODO this setup can be avoided if I reuse TenantHarness but it's difficult
|
||||
// because it's only compiled for tests, and it's hacky because tbh we
|
||||
// shouldn't need this many inputs for a function that just writes bytes
|
||||
// from memory to disk. Performance-critical functions should be
|
||||
// self-contained (almost like they're separate libraries) and all the
|
||||
// monolithic pageserver machinery should live outside.
|
||||
virtual_file::init(10);
|
||||
let repo_dir = Utf8PathBuf::from(&"/home/bojan/tmp/repo_dir");
|
||||
let conf = PageServerConf::dummy_conf(repo_dir);
|
||||
let conf: &'static PageServerConf = Box::leak(Box::new(conf));
|
||||
let timeline_id = TimelineId::generate();
|
||||
let tenant_id = TenantId::generate();
|
||||
let start_lsn = Lsn(0);
|
||||
let ctx = RequestContext::new(TaskKind::LayerFlushTask, DownloadBehavior::Error);
|
||||
let rt = tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
fn test_img(s: &str) -> Bytes {
|
||||
let mut buf = BytesMut::new();
|
||||
buf.extend_from_slice(s.as_bytes());
|
||||
buf.resize(64, 0);
|
||||
|
||||
buf.freeze()
|
||||
}
|
||||
|
||||
// Make the InMemoryLayer that will be flushed
|
||||
let layer = rt.block_on(async {
|
||||
let l = InMemoryLayer::create(&conf, timeline_id, tenant_id, start_lsn).await.unwrap();
|
||||
|
||||
let mut lsn = Lsn(0x10);
|
||||
let mut key = Key::from_hex("012222222233333333444444445500000000").unwrap();
|
||||
let mut blknum = 0;
|
||||
for _ in 0..100 {
|
||||
key.field6 = blknum;
|
||||
let val = Value::Image(test_img(&format!("{} at {}", blknum, lsn)));
|
||||
l.put_value(key, lsn, &val, &ctx).await.unwrap();
|
||||
|
||||
lsn = Lsn(lsn.0 + 0x10);
|
||||
blknum += 1;
|
||||
}
|
||||
l
|
||||
});
|
||||
|
||||
rt.block_on(async {
|
||||
layer.write_to_disk_bench(&ctx).await.unwrap();
|
||||
});
|
||||
|
||||
|
||||
let mut group = c.benchmark_group("g1");
|
||||
group.bench_function("f1", |b| {
|
||||
b.iter(|| {
|
||||
// TODO
|
||||
});
|
||||
});
|
||||
group.bench_function("f2", |b| {
|
||||
b.iter(|| {
|
||||
// TODO
|
||||
});
|
||||
});
|
||||
group.finish();
|
||||
}
|
||||
|
||||
|
||||
criterion_group!(group_1, bench_writes);
|
||||
criterion_main!(group_1);
|
||||
@@ -57,7 +57,10 @@ impl ControlPlaneClient {
|
||||
|
||||
if let Some(jwt) = &conf.control_plane_api_token {
|
||||
let mut headers = hyper::HeaderMap::new();
|
||||
headers.insert("Authorization", jwt.get_contents().parse().unwrap());
|
||||
headers.insert(
|
||||
"Authorization",
|
||||
format!("Bearer {}", jwt.get_contents()).parse().unwrap(),
|
||||
);
|
||||
client = client.default_headers(headers);
|
||||
}
|
||||
|
||||
|
||||
@@ -569,7 +569,17 @@ paths:
|
||||
schema:
|
||||
$ref: "#/components/schemas/NotFoundError"
|
||||
"409":
|
||||
description: Tenant download is already in progress
|
||||
description: |
|
||||
The tenant is already known to Pageserver in some way,
|
||||
and hence this `/attach` call has been rejected.
|
||||
|
||||
Some examples of how this can happen:
|
||||
- tenant was created on this pageserver
|
||||
- tenant attachment was started by an earlier call to `/attach`.
|
||||
|
||||
Callers should poll the tenant status's `attachment_status` field,
|
||||
like for status 202. See the longer description for `POST /attach`
|
||||
for details.
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
|
||||
@@ -4,6 +4,7 @@ pub mod delta_layer;
|
||||
mod filename;
|
||||
mod image_layer;
|
||||
mod inmemory_layer;
|
||||
mod inmemory_layer_raw;
|
||||
mod layer;
|
||||
mod layer_desc;
|
||||
|
||||
|
||||
@@ -367,4 +367,61 @@ impl InMemoryLayer {
|
||||
let delta_layer = delta_layer_writer.finish(Key::MAX, timeline).await?;
|
||||
Ok(delta_layer)
|
||||
}
|
||||
|
||||
/// Write this frozen in-memory layer to disk.
|
||||
///
|
||||
/// Benchmark-only variant: feeds all the data of this in-memory layer to a delta layer writer, but does not finish or return the delta layer
|
||||
pub async fn write_to_disk_bench(
|
||||
&self,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<()> {
|
||||
// Grab the lock in read-mode. We hold it over the I/O, but because this
|
||||
// layer is not writeable anymore, no one should be trying to acquire the
|
||||
// write lock on it, so we shouldn't block anyone. There's one exception
|
||||
// though: another thread might have grabbed a reference to this layer
|
||||
// in `get_layer_for_write' just before the checkpointer called
|
||||
// `freeze`, and then `write_to_disk` on it. When the thread gets the
|
||||
// lock, it will see that it's not writeable anymore and retry, but it
|
||||
// would have to wait until we release it. That race condition is very
|
||||
// rare though, so we just accept the potential latency hit for now.
|
||||
let inner = self.inner.read().await;
|
||||
|
||||
let end_lsn = *self.end_lsn.get().unwrap();
|
||||
|
||||
let mut delta_layer_writer = DeltaLayerWriter::new(
|
||||
self.conf,
|
||||
self.timeline_id,
|
||||
self.tenant_id,
|
||||
Key::MIN,
|
||||
self.start_lsn..end_lsn,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let mut buf = Vec::new();
|
||||
|
||||
let cursor = inner.file.block_cursor();
|
||||
|
||||
let mut keys: Vec<(&Key, &VecMap<Lsn, u64>)> = inner.index.iter().collect();
|
||||
keys.sort_by_key(|k| k.0);
|
||||
|
||||
let ctx = RequestContextBuilder::extend(ctx)
|
||||
.page_content_kind(PageContentKind::InMemoryLayer)
|
||||
.build();
|
||||
for (key, vec_map) in keys.iter() {
|
||||
let key = **key;
|
||||
// Write all page versions
|
||||
for (lsn, pos) in vec_map.as_slice() {
|
||||
cursor.read_blob_into_buf(*pos, &mut buf, &ctx).await?;
|
||||
let will_init = Value::des(&buf)?.will_init();
|
||||
delta_layer_writer
|
||||
.put_value_bytes(key, *lsn, &buf, will_init)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
|
||||
// MAX is used here because we identify L0 layers by full key range
|
||||
// TODO XXX do this
|
||||
// let delta_layer = delta_layer_writer.finish(Key::MAX, timeline).await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
23
pageserver/src/tenant/storage_layer/inmemory_layer_raw.rs
Normal file
23
pageserver/src/tenant/storage_layer/inmemory_layer_raw.rs
Normal file
@@ -0,0 +1,23 @@
|
||||
|
||||
|
||||
pub struct InMemoryLayerRaw {
|
||||
}
|
||||
|
||||
impl InMemoryLayerRaw {
|
||||
pub async fn new() -> Self {
|
||||
Self {
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn put_value(
|
||||
&self,
|
||||
key: Key,
|
||||
lsn: Lsn,
|
||||
val: &Value,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<()> {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -125,6 +125,7 @@ impl Layer {
|
||||
let inner = Arc::new(DownloadedLayer {
|
||||
owner: owner.clone(),
|
||||
kind: tokio::sync::OnceCell::default(),
|
||||
version: 0,
|
||||
});
|
||||
resident = Some(inner.clone());
|
||||
|
||||
@@ -163,6 +164,7 @@ impl Layer {
|
||||
let inner = Arc::new(DownloadedLayer {
|
||||
owner: owner.clone(),
|
||||
kind: tokio::sync::OnceCell::default(),
|
||||
version: 0,
|
||||
});
|
||||
resident = Some(inner.clone());
|
||||
let access_stats = LayerAccessStats::empty_will_record_residence_event_later();
|
||||
@@ -328,16 +330,17 @@ impl Layer {
|
||||
/// read with [`Layer::get_value_reconstruct_data`].
|
||||
///
|
||||
/// [`LayerMap::search`]: crate::tenant::layer_map::LayerMap::search
|
||||
#[derive(Debug)]
|
||||
enum ResidentOrWantedEvicted {
|
||||
Resident(Arc<DownloadedLayer>),
|
||||
WantedEvicted(Weak<DownloadedLayer>),
|
||||
WantedEvicted(Weak<DownloadedLayer>, usize),
|
||||
}
|
||||
|
||||
impl ResidentOrWantedEvicted {
|
||||
fn get(&self) -> Option<Arc<DownloadedLayer>> {
|
||||
match self {
|
||||
ResidentOrWantedEvicted::Resident(strong) => Some(strong.clone()),
|
||||
ResidentOrWantedEvicted::WantedEvicted(weak) => match weak.upgrade() {
|
||||
ResidentOrWantedEvicted::WantedEvicted(weak, _) => match weak.upgrade() {
|
||||
Some(strong) => {
|
||||
LAYER_IMPL_METRICS.inc_raced_wanted_evicted_accesses();
|
||||
Some(strong)
|
||||
@@ -349,21 +352,16 @@ impl ResidentOrWantedEvicted {
|
||||
/// When eviction is first requested, drop down to holding a [`Weak`].
|
||||
///
|
||||
/// Returns `true` if this was the first time eviction was requested.
|
||||
fn downgrade(&mut self) -> &Weak<DownloadedLayer> {
|
||||
let _was_first = match self {
|
||||
fn downgrade(&mut self) -> bool {
|
||||
match self {
|
||||
ResidentOrWantedEvicted::Resident(strong) => {
|
||||
let weak = Arc::downgrade(strong);
|
||||
*self = ResidentOrWantedEvicted::WantedEvicted(weak);
|
||||
*self = ResidentOrWantedEvicted::WantedEvicted(weak, strong.version);
|
||||
// returning the weak is not useful, because the drop could have already run with
|
||||
// the replacement above, and that will take care of cleaning the Option we are in
|
||||
true
|
||||
}
|
||||
ResidentOrWantedEvicted::WantedEvicted(_) => false,
|
||||
};
|
||||
|
||||
match self {
|
||||
ResidentOrWantedEvicted::WantedEvicted(ref weak) => weak,
|
||||
_ => unreachable!("just wrote wanted evicted"),
|
||||
ResidentOrWantedEvicted::WantedEvicted(..) => false,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -398,8 +396,10 @@ struct LayerInner {
|
||||
/// [`LayerInner::on_downloaded_layer_drop`].
|
||||
wanted_evicted: AtomicBool,
|
||||
|
||||
/// Version is to make sure we will in fact only evict a file if no new download has been
|
||||
/// started.
|
||||
/// Version is to make sure we will only evict a specific download of a file.
|
||||
///
|
||||
/// Incremented for each download, stored in `DownloadedLayer::version` or
|
||||
/// `ResidentOrWantedEvicted::WantedEvicted`.
|
||||
version: AtomicUsize,
|
||||
|
||||
/// Allow subscribing to when the layer actually gets evicted.
|
||||
@@ -515,6 +515,14 @@ impl LayerInner {
|
||||
.timeline_path(&timeline.tenant_id, &timeline.timeline_id)
|
||||
.join(desc.filename().to_string());
|
||||
|
||||
let (inner, version) = if let Some(inner) = downloaded {
|
||||
let version = inner.version;
|
||||
let resident = ResidentOrWantedEvicted::Resident(inner);
|
||||
(heavier_once_cell::OnceCell::new(resident), version)
|
||||
} else {
|
||||
(heavier_once_cell::OnceCell::default(), 0)
|
||||
};
|
||||
|
||||
LayerInner {
|
||||
conf,
|
||||
path,
|
||||
@@ -524,12 +532,8 @@ impl LayerInner {
|
||||
access_stats,
|
||||
wanted_garbage_collected: AtomicBool::new(false),
|
||||
wanted_evicted: AtomicBool::new(false),
|
||||
inner: if let Some(inner) = downloaded {
|
||||
heavier_once_cell::OnceCell::new(ResidentOrWantedEvicted::Resident(inner))
|
||||
} else {
|
||||
heavier_once_cell::OnceCell::default()
|
||||
},
|
||||
version: AtomicUsize::new(0),
|
||||
inner,
|
||||
version: AtomicUsize::new(version),
|
||||
status: tokio::sync::broadcast::channel(1).0,
|
||||
consecutive_failures: AtomicUsize::new(0),
|
||||
generation,
|
||||
@@ -604,7 +608,7 @@ impl LayerInner {
|
||||
loop {
|
||||
let download = move || async move {
|
||||
// disable any scheduled but not yet running eviction deletions for this
|
||||
self.version.fetch_add(1, Ordering::Relaxed);
|
||||
let next_version = 1 + self.version.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
// no need to make the evict_and_wait wait for the actual download to complete
|
||||
drop(self.status.send(Status::Downloaded));
|
||||
@@ -655,6 +659,7 @@ impl LayerInner {
|
||||
let res = Arc::new(DownloadedLayer {
|
||||
owner: Arc::downgrade(self),
|
||||
kind: tokio::sync::OnceCell::default(),
|
||||
version: next_version,
|
||||
});
|
||||
|
||||
self.access_stats.record_residence_event(
|
||||
@@ -896,7 +901,7 @@ impl LayerInner {
|
||||
}
|
||||
|
||||
/// `DownloadedLayer` is being dropped, so it calls this method.
|
||||
fn on_downloaded_layer_drop(self: Arc<LayerInner>) {
|
||||
fn on_downloaded_layer_drop(self: Arc<LayerInner>, version: usize) {
|
||||
let gc = self.wanted_garbage_collected.load(Ordering::Acquire);
|
||||
let evict = self.wanted_evicted.load(Ordering::Acquire);
|
||||
let can_evict = self.have_remote_client;
|
||||
@@ -904,15 +909,16 @@ impl LayerInner {
|
||||
if gc {
|
||||
// do nothing now, only in LayerInner::drop
|
||||
} else if can_evict && evict {
|
||||
let version = self.version.load(Ordering::Relaxed);
|
||||
|
||||
let span = tracing::info_span!(parent: None, "layer_evict", tenant_id = %self.desc.tenant_id, timeline_id = %self.desc.timeline_id, layer=%self);
|
||||
let span = tracing::info_span!(parent: None, "layer_evict", tenant_id = %self.desc.tenant_id, timeline_id = %self.desc.timeline_id, layer=%self, %version);
|
||||
|
||||
// downgrade for queueing, in case there's a tear down already ongoing we should not
|
||||
// hold it alive.
|
||||
let this = Arc::downgrade(&self);
|
||||
drop(self);
|
||||
|
||||
// NOTE: this scope *must* never call `self.inner.get` because evict_and_wait might
|
||||
// drop while the `self.inner` is being locked, leading to a deadlock.
|
||||
|
||||
crate::task_mgr::BACKGROUND_RUNTIME.spawn_blocking(move || {
|
||||
let _g = span.entered();
|
||||
|
||||
@@ -922,19 +928,15 @@ impl LayerInner {
|
||||
LAYER_IMPL_METRICS.inc_eviction_cancelled(EvictionCancelled::LayerGone);
|
||||
return;
|
||||
};
|
||||
this.evict_blocking(version);
|
||||
match this.evict_blocking(version) {
|
||||
Ok(()) => LAYER_IMPL_METRICS.inc_completed_evictions(),
|
||||
Err(reason) => LAYER_IMPL_METRICS.inc_eviction_cancelled(reason),
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
fn evict_blocking(&self, version: usize) {
|
||||
match self.evict_blocking0(version) {
|
||||
Ok(()) => LAYER_IMPL_METRICS.inc_completed_evictions(),
|
||||
Err(reason) => LAYER_IMPL_METRICS.inc_eviction_cancelled(reason),
|
||||
}
|
||||
}
|
||||
|
||||
fn evict_blocking0(&self, version: usize) -> Result<(), EvictionCancelled> {
|
||||
fn evict_blocking(&self, only_version: usize) -> Result<(), EvictionCancelled> {
|
||||
// deleted or detached timeline, don't do anything.
|
||||
let Some(timeline) = self.timeline.upgrade() else {
|
||||
return Err(EvictionCancelled::TimelineGone);
|
||||
@@ -945,32 +947,34 @@ impl LayerInner {
|
||||
let _permit = {
|
||||
let maybe_downloaded = self.inner.get();
|
||||
|
||||
if version != self.version.load(Ordering::Relaxed) {
|
||||
// downloadness-state has advanced, we might no longer be the latest eviction
|
||||
// work; don't do anything.
|
||||
//
|
||||
// this is possible to get to by having:
|
||||
//
|
||||
// 1. wanted_evicted.store(true)
|
||||
// 2. ResidentOrWantedEvicted::downgrade
|
||||
// 3. DownloadedLayer::drop
|
||||
// 4. LayerInner::get_or_maybe_download
|
||||
// 5. LayerInner::evict_blocking
|
||||
return Err(EvictionCancelled::VersionCheckFailed);
|
||||
}
|
||||
|
||||
// free the DownloadedLayer allocation
|
||||
match maybe_downloaded.map(|mut g| g.take_and_deinit()) {
|
||||
Some((taken, permit)) => {
|
||||
assert!(matches!(taken, ResidentOrWantedEvicted::WantedEvicted(_)));
|
||||
permit
|
||||
let (_weak, permit) = match maybe_downloaded {
|
||||
Some(mut guard) => {
|
||||
if let ResidentOrWantedEvicted::WantedEvicted(_weak, version) = &*guard {
|
||||
if *version == only_version {
|
||||
guard.take_and_deinit()
|
||||
} else {
|
||||
// this was not for us; maybe there's another eviction job
|
||||
// TODO: does it make any sense to stall here? unique versions do not
|
||||
// matter, we only want to make sure not to evict a resident, which we
|
||||
// are not doing.
|
||||
return Err(EvictionCancelled::VersionCheckFailed);
|
||||
}
|
||||
} else {
|
||||
return Err(EvictionCancelled::AlreadyReinitialized);
|
||||
}
|
||||
}
|
||||
None => {
|
||||
unreachable!("we do the version checking for this exact reason")
|
||||
// already deinitialized, perhaps get_or_maybe_download did this and is
|
||||
// currently waiting to reinitialize it
|
||||
return Err(EvictionCancelled::LostToDownload);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
permit
|
||||
};
|
||||
|
||||
// now accesses to inner.get_or_init wait on the semaphore or the `_permit`
|
||||
|
||||
self.access_stats.record_residence_event(
|
||||
LayerResidenceStatus::Evicted,
|
||||
LayerResidenceEventReason::ResidenceChange,
|
||||
@@ -1086,6 +1090,7 @@ impl std::fmt::Display for NeedsDownload {
|
||||
pub(crate) struct DownloadedLayer {
|
||||
owner: Weak<LayerInner>,
|
||||
kind: tokio::sync::OnceCell<anyhow::Result<LayerKind>>,
|
||||
version: usize,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for DownloadedLayer {
|
||||
@@ -1093,6 +1098,7 @@ impl std::fmt::Debug for DownloadedLayer {
|
||||
f.debug_struct("DownloadedLayer")
|
||||
// owner omitted because it is always "Weak"
|
||||
.field("kind", &self.kind)
|
||||
.field("version", &self.version)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
@@ -1100,7 +1106,7 @@ impl std::fmt::Debug for DownloadedLayer {
|
||||
impl Drop for DownloadedLayer {
|
||||
fn drop(&mut self) {
|
||||
if let Some(owner) = self.owner.upgrade() {
|
||||
owner.on_downloaded_layer_drop();
|
||||
owner.on_downloaded_layer_drop(self.version);
|
||||
} else {
|
||||
// no need to do anything, we are shutting down
|
||||
}
|
||||
@@ -1458,6 +1464,9 @@ enum EvictionCancelled {
|
||||
VersionCheckFailed,
|
||||
FileNotFound,
|
||||
RemoveFailed,
|
||||
AlreadyReinitialized,
|
||||
/// Not evicted because of a pending reinitialization
|
||||
LostToDownload,
|
||||
}
|
||||
|
||||
impl EvictionCancelled {
|
||||
@@ -1468,6 +1477,8 @@ impl EvictionCancelled {
|
||||
EvictionCancelled::VersionCheckFailed => "version_check_fail",
|
||||
EvictionCancelled::FileNotFound => "file_not_found",
|
||||
EvictionCancelled::RemoveFailed => "remove_failed",
|
||||
EvictionCancelled::AlreadyReinitialized => "already_reinitialized",
|
||||
EvictionCancelled::LostToDownload => "lost_to_download",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -857,7 +857,8 @@ impl WalRedoProcess {
|
||||
let in_revents = stdin_pollfds[0].revents().unwrap();
|
||||
if in_revents & (PollFlags::POLLERR | PollFlags::POLLOUT) != PollFlags::empty() {
|
||||
nwrite += proc.stdin.write(&writebuf[nwrite..])?;
|
||||
} else if in_revents.contains(PollFlags::POLLHUP) {
|
||||
}
|
||||
if in_revents.contains(PollFlags::POLLHUP) {
|
||||
// We still have more data to write, but the process closed the pipe.
|
||||
anyhow::bail!("WAL redo process closed its stdin unexpectedly");
|
||||
}
|
||||
@@ -907,7 +908,8 @@ impl WalRedoProcess {
|
||||
let out_revents = stdout_pollfds[0].revents().unwrap();
|
||||
if out_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
|
||||
nresult += output.stdout.read(&mut resultbuf[nresult..])?;
|
||||
} else if out_revents.contains(PollFlags::POLLHUP) {
|
||||
}
|
||||
if out_revents.contains(PollFlags::POLLHUP) {
|
||||
anyhow::bail!("WAL redo process closed its stdout unexpectedly");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -49,7 +49,7 @@ impl Api {
|
||||
.endpoint
|
||||
.get("proxy_get_role_secret")
|
||||
.header("X-Request-ID", &request_id)
|
||||
.header("Authorization", &self.jwt)
|
||||
.header("Authorization", format!("Bearer {}", &self.jwt))
|
||||
.query(&[("session_id", extra.session_id)])
|
||||
.query(&[
|
||||
("application_name", extra.application_name),
|
||||
@@ -94,7 +94,7 @@ impl Api {
|
||||
.endpoint
|
||||
.get("proxy_wake_compute")
|
||||
.header("X-Request-ID", &request_id)
|
||||
.header("Authorization", &self.jwt)
|
||||
.header("Authorization", format!("Bearer {}", &self.jwt))
|
||||
.query(&[("session_id", extra.session_id)])
|
||||
.query(&[
|
||||
("application_name", extra.application_name),
|
||||
|
||||
@@ -249,7 +249,7 @@ def assert_prefix_empty(neon_env_builder: "NeonEnvBuilder", prefix: Optional[str
|
||||
# this has been seen in the wild by tests with the below contradicting logging
|
||||
# https://neon-github-public-dev.s3.amazonaws.com/reports/pr-5322/6207777020/index.html#suites/3556ed71f2d69272a7014df6dcb02317/53b5c368b5a68865
|
||||
# this seems like a mock_s3 issue
|
||||
log.warn(
|
||||
log.warning(
|
||||
f"contrading ListObjectsV2 response with KeyCount={keys} and Contents={objects}, CommonPrefixes={common_prefixes}, assuming this means KeyCount=0"
|
||||
)
|
||||
keys = 0
|
||||
@@ -257,7 +257,7 @@ def assert_prefix_empty(neon_env_builder: "NeonEnvBuilder", prefix: Optional[str
|
||||
# this has been seen in one case with mock_s3:
|
||||
# https://neon-github-public-dev.s3.amazonaws.com/reports/pr-4938/6000769714/index.html#suites/3556ed71f2d69272a7014df6dcb02317/ca01e4f4d8d9a11f
|
||||
# looking at moto impl, it might be there's a race with common prefix (sub directory) not going away with deletes
|
||||
log.warn(
|
||||
log.warning(
|
||||
f"contradicting ListObjectsV2 response with KeyCount={keys} and Contents={objects}, CommonPrefixes={common_prefixes}"
|
||||
)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user