mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-15 01:12:56 +00:00
Actually connect
This commit is contained in:
@@ -5,10 +5,13 @@ use std::{
|
||||
path::PathBuf,
|
||||
str::FromStr,
|
||||
};
|
||||
use tokio::net::TcpStream;
|
||||
use tokio::{io::AsyncWriteExt, net::TcpStream};
|
||||
|
||||
use clap::{App, Arg};
|
||||
use utils::zid::{ZTenantId, ZTimelineId};
|
||||
use utils::{
|
||||
pq_proto::FeMessage,
|
||||
zid::{ZTenantId, ZTimelineId},
|
||||
};
|
||||
|
||||
// TODO put this in library, dedup with stuff in control_plane
|
||||
/// Client for the pageserver's pagestream API
|
||||
@@ -16,18 +19,18 @@ struct PagestreamApi {
|
||||
stream: TcpStream,
|
||||
}
|
||||
|
||||
/// Good enough implementation for these tests
|
||||
impl PagestreamApi {
|
||||
async fn connect(
|
||||
connstr: &str,
|
||||
tenant: &ZTenantId,
|
||||
timeline: &ZTimelineId,
|
||||
) -> anyhow::Result<PagestreamApi> {
|
||||
let mut stream = TcpStream::connect("localhost:15000").await?;
|
||||
|
||||
// Connect to pageserver
|
||||
println!("connecting to: {}", connstr);
|
||||
// Parse connstr
|
||||
let config = tokio_postgres::Config::from_str(connstr).expect("bad connstr");
|
||||
let tcp_addr = format!("localhost:{}", config.get_ports()[0]);
|
||||
|
||||
// Connect
|
||||
let mut stream = TcpStream::connect(tcp_addr).await?;
|
||||
let (client, conn) = config
|
||||
.connect_raw(&mut stream, tokio_postgres::NoTls)
|
||||
.await?;
|
||||
@@ -39,13 +42,15 @@ impl PagestreamApi {
|
||||
_ = client.query(init_query.as_str(), &[]) => (),
|
||||
};
|
||||
|
||||
println!("connected");
|
||||
|
||||
Ok(PagestreamApi { stream })
|
||||
}
|
||||
|
||||
async fn make_request(&mut self) -> anyhow::Result<()> {
|
||||
// TODO implement
|
||||
async fn make_request(&mut self, msg: PagestreamFeMessage) -> anyhow::Result<()> {
|
||||
let msg_bytes = msg.serialize();
|
||||
self.stream.write_all(&msg_bytes).await?;
|
||||
|
||||
let _response = FeMessage::read_fut(&mut self.stream).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -56,7 +61,7 @@ async fn replay_trace<R: std::io::Read>(
|
||||
) -> anyhow::Result<()> {
|
||||
while let Ok(msg) = PagestreamFeMessage::parse(reader) {
|
||||
println!("Parsed message {:?}", msg);
|
||||
pagestream.make_request().await?;
|
||||
// pagestream.make_request(msg).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -97,10 +102,14 @@ async fn main() -> anyhow::Result<()> {
|
||||
let path = entry.path();
|
||||
let _conn_id = ZTimelineId::from_str(path.file_name().unwrap().to_str().unwrap())?;
|
||||
|
||||
// TODO The pageserver deletes existing traces?
|
||||
// LOL yes because I use tenant ID as trace id
|
||||
let pagestream = PagestreamApi::connect(connstr, &tenant_id, &timeline_id).await?;
|
||||
|
||||
let file = File::open(path)?;
|
||||
let file = File::open(path.clone())?;
|
||||
let mut reader = BufReader::new(file);
|
||||
// let len = file.metadata().unwrap().len();
|
||||
// println!("replaying {:?} trace {} bytes", path, len);
|
||||
replay_trace(&mut reader, pagestream).await?;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -499,7 +499,9 @@ impl PageServerHandler {
|
||||
// Make request tracer if needed
|
||||
let repo = tenant_mgr::get_repository_for_tenant(tenant_id)?;
|
||||
let mut tracer = if repo.get_trace_read_requests() {
|
||||
let path = repo.conf.trace_path(&tenant_id, &timeline_id, &timeline_id);
|
||||
let path = repo
|
||||
.conf
|
||||
.trace_path(&tenant_id, &timeline_id, &ZTimelineId::generate());
|
||||
Some(Tracer::new(path))
|
||||
} else {
|
||||
None
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
from contextlib import closing
|
||||
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder, ReplayBin
|
||||
|
||||
|
||||
@@ -10,15 +12,30 @@ def test_trace_replay(neon_env_builder: NeonEnvBuilder, replay_bin: ReplayBin):
|
||||
"trace_read_requests": "true",
|
||||
}
|
||||
)
|
||||
timeline = env.neon_cli.create_timeline("test_trace_replay", tenant_id=tenant)
|
||||
env.neon_cli.create_timeline("test_trace_replay", tenant_id=tenant)
|
||||
pg = env.postgres.create_start("test_trace_replay", "main", tenant)
|
||||
|
||||
pg.safe_psql("select 1;")
|
||||
pg.safe_psql("select 1;")
|
||||
pg.safe_psql("select 1;")
|
||||
pg.safe_psql("select 1;")
|
||||
|
||||
trace_path = env.repo_dir / "traces" / str(tenant) / str(timeline) / str(timeline)
|
||||
assert trace_path.exists()
|
||||
with closing(pg.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute("create table t (i integer);")
|
||||
cur.execute(f"insert into t values (generate_series(1,{10000}));")
|
||||
cur.execute("select count(*) from t;")
|
||||
|
||||
# Stop pg so we drop the connection and flush the traces
|
||||
pg.stop()
|
||||
|
||||
# TODO turn off tracing now?
|
||||
|
||||
# trace_path = env.repo_dir / "traces" / str(tenant) / str(timeline) / str(timeline)
|
||||
# assert trace_path.exists()
|
||||
|
||||
print("replaying")
|
||||
ps_connstr = env.pageserver.connstr().replace("'", "\\'")
|
||||
ps_connstr = env.pageserver.connstr()
|
||||
# ps_connstr = "host=localhost port=15004 dbname=postgres user=neon_admin"
|
||||
output = replay_bin.replay_all(ps_connstr)
|
||||
print(output)
|
||||
|
||||
Reference in New Issue
Block a user