mirror of
https://github.com/neondatabase/neon.git
synced 2025-12-23 06:09:59 +00:00
## Problem The test for logical replication used the year-old versions of ClickHouse and Debezium so that we may miss problems related to up-to-date versions. ## Summary of changes The ClickHouse version has been updated to 24.8. The Debezium version has been updated to the latest stable one, 3.1.3Final. Some problems with locally running the Debezium test have been fixed. --------- Co-authored-by: Alexey Masterov <alexey.masterov@databricks.com> Co-authored-by: Alexander Bayandin <alexander@neon.tech>
91 lines
2.8 KiB
Python
91 lines
2.8 KiB
Python
"""
|
|
Test the logical replication in Neon with ClickHouse as a consumer
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import hashlib
|
|
import os
|
|
import time
|
|
from typing import TYPE_CHECKING
|
|
|
|
import clickhouse_connect
|
|
import psycopg2
|
|
import pytest
|
|
from fixtures.log_helper import log
|
|
from fixtures.utils import wait_until
|
|
|
|
if TYPE_CHECKING:
|
|
from fixtures.neon_fixtures import RemotePostgres
|
|
|
|
|
|
def query_clickhouse(
|
|
client,
|
|
query: str,
|
|
digest: str,
|
|
) -> None:
|
|
"""
|
|
Run the query on the client
|
|
return answer if successful, raise an exception otherwise
|
|
"""
|
|
log.debug("Query: %s", query)
|
|
res = client.query(query)
|
|
log.debug(res.result_rows)
|
|
m = hashlib.sha1()
|
|
m.update(repr(tuple(res.result_rows)).encode())
|
|
hash_res = m.hexdigest()
|
|
log.debug("Hash: %s", hash_res)
|
|
if hash_res == digest:
|
|
return
|
|
raise ValueError("Hash mismatch")
|
|
|
|
|
|
@pytest.mark.remote_cluster
|
|
def test_clickhouse(remote_pg: RemotePostgres):
|
|
"""
|
|
Test the logical replication having ClickHouse as a client
|
|
"""
|
|
clickhouse_host = "clickhouse" if ("CI" in os.environ) else "127.0.0.1"
|
|
conn_options = remote_pg.conn_options()
|
|
conn = psycopg2.connect(remote_pg.connstr())
|
|
cur = conn.cursor()
|
|
cur.execute("DROP TABLE IF EXISTS table1")
|
|
cur.execute("CREATE TABLE table1 (id integer primary key, column1 varchar(10));")
|
|
cur.execute("INSERT INTO table1 (id, column1) VALUES (1, 'abc'), (2, 'def');")
|
|
conn.commit()
|
|
if "CLICKHOUSE_PASSWORD" not in os.environ:
|
|
raise RuntimeError("CLICKHOUSE_PASSWORD is not set")
|
|
client = clickhouse_connect.get_client(
|
|
host=clickhouse_host, password=os.environ["CLICKHOUSE_PASSWORD"]
|
|
)
|
|
client.command("SET allow_experimental_database_materialized_postgresql=1")
|
|
client.command("DROP DATABASE IF EXISTS db1_postgres")
|
|
client.command(
|
|
"CREATE DATABASE db1_postgres ENGINE = "
|
|
f"MaterializedPostgreSQL('{conn_options['host']}', "
|
|
f"'{conn_options['dbname']}', "
|
|
f"'{conn_options['user']}', '{conn_options['password']}') "
|
|
"SETTINGS materialized_postgresql_tables_list = 'table1';"
|
|
)
|
|
wait_until(
|
|
lambda: query_clickhouse(
|
|
client,
|
|
"select * from db1_postgres.table1 order by 1",
|
|
"ee600d8f7cd05bd0b169fa81f44300a9dd10085a",
|
|
),
|
|
timeout=60,
|
|
)
|
|
cur.execute("INSERT INTO table1 (id, column1) VALUES (3, 'ghi'), (4, 'jkl');")
|
|
conn.commit()
|
|
wait_until(
|
|
lambda: query_clickhouse(
|
|
client,
|
|
"select * from db1_postgres.table1 order by 1",
|
|
"9eba2daaf7e4d7d27ac849525f68b562ab53947d",
|
|
),
|
|
timeout=60,
|
|
)
|
|
log.debug("Sleeping before final checking if Neon is still alive")
|
|
time.sleep(3)
|
|
cur.execute("SELECT 1")
|