Files
neon/test_runner/logical_repl/test_clickhouse.py
a-masterov da596a5162 Update the versions for ClickHouse and Debezium (#12741)
## Problem
The test for logical replication used the year-old versions of
ClickHouse and Debezium so that we may miss problems related to
up-to-date versions.
## Summary of changes
The ClickHouse version has been updated to 24.8.
The Debezium version has been updated to the latest stable one,
3.1.3Final.
Some problems with locally running the Debezium test have been fixed.

---------

Co-authored-by: Alexey Masterov <alexey.masterov@databricks.com>
Co-authored-by: Alexander Bayandin <alexander@neon.tech>
2025-07-28 13:26:33 +00:00

91 lines
2.8 KiB
Python

"""
Test the logical replication in Neon with ClickHouse as a consumer
"""
from __future__ import annotations
import hashlib
import os
import time
from typing import TYPE_CHECKING
import clickhouse_connect
import psycopg2
import pytest
from fixtures.log_helper import log
from fixtures.utils import wait_until
if TYPE_CHECKING:
from fixtures.neon_fixtures import RemotePostgres
def query_clickhouse(
client,
query: str,
digest: str,
) -> None:
"""
Run the query on the client
return answer if successful, raise an exception otherwise
"""
log.debug("Query: %s", query)
res = client.query(query)
log.debug(res.result_rows)
m = hashlib.sha1()
m.update(repr(tuple(res.result_rows)).encode())
hash_res = m.hexdigest()
log.debug("Hash: %s", hash_res)
if hash_res == digest:
return
raise ValueError("Hash mismatch")
@pytest.mark.remote_cluster
def test_clickhouse(remote_pg: RemotePostgres):
"""
Test the logical replication having ClickHouse as a client
"""
clickhouse_host = "clickhouse" if ("CI" in os.environ) else "127.0.0.1"
conn_options = remote_pg.conn_options()
conn = psycopg2.connect(remote_pg.connstr())
cur = conn.cursor()
cur.execute("DROP TABLE IF EXISTS table1")
cur.execute("CREATE TABLE table1 (id integer primary key, column1 varchar(10));")
cur.execute("INSERT INTO table1 (id, column1) VALUES (1, 'abc'), (2, 'def');")
conn.commit()
if "CLICKHOUSE_PASSWORD" not in os.environ:
raise RuntimeError("CLICKHOUSE_PASSWORD is not set")
client = clickhouse_connect.get_client(
host=clickhouse_host, password=os.environ["CLICKHOUSE_PASSWORD"]
)
client.command("SET allow_experimental_database_materialized_postgresql=1")
client.command("DROP DATABASE IF EXISTS db1_postgres")
client.command(
"CREATE DATABASE db1_postgres ENGINE = "
f"MaterializedPostgreSQL('{conn_options['host']}', "
f"'{conn_options['dbname']}', "
f"'{conn_options['user']}', '{conn_options['password']}') "
"SETTINGS materialized_postgresql_tables_list = 'table1';"
)
wait_until(
lambda: query_clickhouse(
client,
"select * from db1_postgres.table1 order by 1",
"ee600d8f7cd05bd0b169fa81f44300a9dd10085a",
),
timeout=60,
)
cur.execute("INSERT INTO table1 (id, column1) VALUES (3, 'ghi'), (4, 'jkl');")
conn.commit()
wait_until(
lambda: query_clickhouse(
client,
"select * from db1_postgres.table1 order by 1",
"9eba2daaf7e4d7d27ac849525f68b562ab53947d",
),
timeout=60,
)
log.debug("Sleeping before final checking if Neon is still alive")
time.sleep(3)
cur.execute("SELECT 1")