mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-21 15:10:44 +00:00
Add a test for clickhouse as a logical replication consumer (#8408)
## Problem We need to test logical replication with 3rd-party tools regularly. ## Summary of changes Added a test using ClickHouse as a client Co-authored-by: Alexander Bayandin <alexander@neon.tech>
This commit is contained in:
88
test_runner/logical_repl/test_log_repl.py
Normal file
88
test_runner/logical_repl/test_log_repl.py
Normal file
@@ -0,0 +1,88 @@
|
||||
"""
|
||||
Test the logical replication in Neon with the different consumers
|
||||
"""
|
||||
|
||||
import hashlib
|
||||
import time
|
||||
|
||||
import clickhouse_connect
|
||||
import psycopg2
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import RemotePostgres
|
||||
from fixtures.utils import wait_until
|
||||
|
||||
|
||||
def query_clickhouse(
|
||||
client,
|
||||
query: str,
|
||||
digest: str,
|
||||
) -> None:
|
||||
"""
|
||||
Run the query on the client
|
||||
return answer if successful, raise an exception otherwise
|
||||
"""
|
||||
log.debug("Query: %s", query)
|
||||
res = client.query(query)
|
||||
log.debug(res.result_rows)
|
||||
m = hashlib.sha1()
|
||||
m.update(repr(tuple(res.result_rows)).encode())
|
||||
hash_res = m.hexdigest()
|
||||
log.debug("Hash: %s", hash_res)
|
||||
if hash_res == digest:
|
||||
return
|
||||
raise ValueError("Hash mismatch")
|
||||
|
||||
|
||||
@pytest.mark.remote_cluster
|
||||
def test_clickhouse(remote_pg: RemotePostgres):
|
||||
"""
|
||||
Test the logical replication having ClickHouse as a client
|
||||
"""
|
||||
conn_options = remote_pg.conn_options()
|
||||
for _ in range(5):
|
||||
try:
|
||||
conn = psycopg2.connect(remote_pg.connstr())
|
||||
except psycopg2.OperationalError as perr:
|
||||
log.debug(perr)
|
||||
time.sleep(1)
|
||||
else:
|
||||
break
|
||||
raise TimeoutError
|
||||
cur = conn.cursor()
|
||||
cur.execute("DROP TABLE IF EXISTS table1")
|
||||
cur.execute("CREATE TABLE table1 (id integer primary key, column1 varchar(10));")
|
||||
cur.execute("INSERT INTO table1 (id, column1) VALUES (1, 'abc'), (2, 'def');")
|
||||
conn.commit()
|
||||
client = clickhouse_connect.get_client(host="clickhouse")
|
||||
client.command("SET allow_experimental_database_materialized_postgresql=1")
|
||||
client.command(
|
||||
"CREATE DATABASE db1_postgres ENGINE = "
|
||||
f"MaterializedPostgreSQL('{conn_options['host']}', "
|
||||
f"'{conn_options['dbname']}', "
|
||||
f"'{conn_options['user']}', '{conn_options['password']}') "
|
||||
"SETTINGS materialized_postgresql_tables_list = 'table1';"
|
||||
)
|
||||
wait_until(
|
||||
120,
|
||||
0.5,
|
||||
lambda: query_clickhouse(
|
||||
client,
|
||||
"select * from db1_postgres.table1 order by 1",
|
||||
"ee600d8f7cd05bd0b169fa81f44300a9dd10085a",
|
||||
),
|
||||
)
|
||||
cur.execute("INSERT INTO table1 (id, column1) VALUES (3, 'ghi'), (4, 'jkl');")
|
||||
conn.commit()
|
||||
wait_until(
|
||||
120,
|
||||
0.5,
|
||||
lambda: query_clickhouse(
|
||||
client,
|
||||
"select * from db1_postgres.table1 order by 1",
|
||||
"9eba2daaf7e4d7d27ac849525f68b562ab53947d",
|
||||
),
|
||||
)
|
||||
log.debug("Sleeping before final checking if Neon is still alive")
|
||||
time.sleep(3)
|
||||
cur.execute("SELECT 1")
|
||||
Reference in New Issue
Block a user