mirror of
https://github.com/neondatabase/neon.git
synced 2026-07-05 21:20:37 +00:00
more WIP to try to get parallel mode
This commit is contained in:
@@ -5,6 +5,7 @@
|
||||
//! The old legacy algorithm is implemented directly in `timeline.rs`.
|
||||
|
||||
use std::collections::{BinaryHeap, HashSet};
|
||||
use std::future::Future;
|
||||
use std::ops::{Deref, Range};
|
||||
use std::sync::Arc;
|
||||
|
||||
@@ -26,6 +27,7 @@ use serde::Serialize;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, info, info_span, trace, warn, Instrument};
|
||||
use utils::id::TimelineId;
|
||||
use utils::sync::gate::GateGuard;
|
||||
|
||||
use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
|
||||
use crate::page_cache;
|
||||
@@ -870,6 +872,49 @@ impl Timeline {
|
||||
KeyLsn,
|
||||
KeyLsnValue,
|
||||
}
|
||||
type IterItem = anyhow::Result<Option<(Key, Lsn, Value)>>;
|
||||
enum MaybeSpawnedMergeIterator<'a> {
|
||||
NotSpawned(MergeIterator<'a>),
|
||||
Spawned {
|
||||
rx: tokio::sync::mpsc::Receiver<IterItem>,
|
||||
},
|
||||
}
|
||||
impl MaybeSpawnedMergeIterator<'a> {
|
||||
pub fn not_spawned(deltas: &[ResidentDeltaLayer], ctx: &RequestContext) -> Self {
|
||||
Self::NotSpawned(MergeIterator::create(&deltas, &[], ctx))
|
||||
}
|
||||
pub fn spawned(
|
||||
timeline_gate_guard: GateGuard,
|
||||
timeline_cancel: &CancellationToken,
|
||||
deltas: Vec<ResidentDeltaLayer>,
|
||||
ctx: &RequestContext,
|
||||
) -> Self {
|
||||
let ctx = ctx.attached_child();
|
||||
let (tx, rx) = tokio::sync::mpsc::channel(1);
|
||||
// this task gets cancelled when the MaybeSpawnedMergeIterator is dropped, courtesy of the `tx.closed()` check
|
||||
tokio::spawn(async move {
|
||||
let iter = MergeIterator::create(&deltas, &[], &ctx);
|
||||
loop {
|
||||
let item = tokio::select! {
|
||||
res = iter.next() => res,
|
||||
_ = tx.closed() => break,
|
||||
};
|
||||
let res = tx.send(item).await;
|
||||
if res.is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
drop(timeline_gate_guard);
|
||||
});
|
||||
Self::Spawned { rx }
|
||||
}
|
||||
pub async fn next(&mut self) -> IterItem {
|
||||
match self {
|
||||
Self::NotSpawned(iter) => iter.next().await,
|
||||
Self::Spawned { rx, .. } => rx.recv().await.expect("the sender doesn't drop before the receiver and calling next() after receiving Ok(None) is an error"),
|
||||
}
|
||||
}
|
||||
}
|
||||
type VecIter<'a> = std::slice::Iter<'a, DeltaEntry<'a>>; // TODO: distinguished lifetimes
|
||||
impl AllValuesIter<'_> {
|
||||
async fn next_all_keys_iter(
|
||||
@@ -1007,7 +1052,7 @@ impl Timeline {
|
||||
let l = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?;
|
||||
deltas.push(l);
|
||||
}
|
||||
MergeIterator::create(&deltas, &[], ctx)
|
||||
MergeIterator::create(deltas, &[], ctx)
|
||||
};
|
||||
match validate {
|
||||
None => AllValuesIter::StreamingKmergeBypassingPageCache { merge_iter },
|
||||
|
||||
Reference in New Issue
Block a user