Hugh's Blog

clickhouse-copier 使用

 

clickhouse-copier 是官方的数据迁移工具,主要用在多个集群之间的数据迁移,当然单个集群也适用。

本地使用 Docker 搭建测试集群:clickhouse-cluster-example

集群信息

┌─cluster────┬─shard_num─┬─replica_num─┬─host_name───┐
│ ck_cluster │         1 │           1 │ clickhouse1 │
│ ck_cluster │         1 │           2 │ clickhouse2 │
│ ck_cluster │         2 │           1 │ clickhouse3 │
│ ck_cluster │         2 │           2 │ clickhouse4 │
└────────────┴───────────┴─────────────┴─────────────┘

测试数据

在其中一个节点上创建表并插入数据:

CREATE TABLE test ON cluster ck_cluster
(
    `EventDate` DateTime, 
    `CounterId` UInt32
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/test', '{replica}')
PARTITION BY toYYYYMM(EventDate)
ORDER BY (CounterId, EventDate);

CREATE TABLE test_all ON cluster ck_cluster
(
    `EventDate` DateTime, 
    `CounterId` UInt32
)
ENGINE = Distributed('ck_cluster', 'default', 'test', CounterId);

INSERT INTO test VALUES ('2020-01-01 01:01:01', 1); ...

当前数据:

# select hostName() as hostname, * from test_all order by hostname, EventDate

┌─hostname────┬───────────EventDate─┬─CounterId─┐
│ clickhouse1 │ 2020-01-01 01:01:01 │         1 │
│ clickhouse1 │ 2020-01-01 11:11:11 │         1 │
│ clickhouse1 │ 2020-01-01 12:12:12 │         1 │
│ clickhouse1 │ 2020-02-02 02:02:02 │         2 │
└─────────────┴─────────────────────┴───────────┘
┌─hostname────┬───────────EventDate─┬─CounterId─┐
│ clickhouse3 │ 2020-03-03 03:03:03 │         3 │
│ clickhouse3 │ 2020-04-04 04:04:04 │         4 │
└─────────────┴─────────────────────┴───────────┘

zookeeper 配置文件

<yandex>
    <logger>
        <level>trace</level>
        <size>100M</size>
        <count>3</count>
    </logger>

    <zookeeper>
        <node index="1">
            <host>zoo1</host>
            <port>2181</port>
        </node>
        <node index="2">
            <host>zoo2</host>
            <port>2181</port>
        </node>
        <node index="3">
            <host>zoo3</host>
            <port>2181</port>
        </node>
    </zookeeper>
</yandex>

任务描述文件

<yandex>
    <remote_servers>
        <src_cluster>
            <shard>
                <internal_replication>true</internal_replication>
                <replica>
                    <host>clickhouse1</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>clickhouse2</host>
                    <port>9000</port>
                </replica>
            </shard>
            <shard>
                <internal_replication>true</internal_replication>
                <replica>
                    <host>clickhouse3</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>clickhouse4</host>
                    <port>9000</port>
                </replica>
            </shard>
        </src_cluster>
        <src_shard>
            <shard>
                <internal_replication>true</internal_replication>
                <replica>
                    <host>clickhouse1</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>clickhouse2</host>
                    <port>9000</port>
                </replica>
            </shard>
        </src_shard>
        <dst_shard>
            <shard>
                <internal_replication>true</internal_replication>
                <replica>
                    <host>clickhouse3</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>clickhouse4</host>
                    <port>9000</port>
                </replica>
            </shard>
        </dst_shard>
    </remote_servers>
    <!-- How many simultaneously active workers are possible. If you run more workers superfluous workers will sleep. -->
    <max_workers>4</max_workers>
    <!-- Setting used to fetch (pull) data from source cluster tables -->
    <settings_pull>
        <readonly>1</readonly>
    </settings_pull>
    <!-- Setting used to insert (push) data to destination cluster tables -->
    <settings_push>
        <readonly>0</readonly>
    </settings_push>
    <!-- Common setting for fetch (pull) and insert (push) operations. Also, copier process context uses it.
         They are overlaid by <settings_pull/> and <settings_push/> respectively. -->
    <settings>
        <connect_timeout>3</connect_timeout>
        <!-- Sync insert is set forcibly, leave it here just in case. -->
        <insert_distributed_sync>1</insert_distributed_sync>
    </settings>
    <!-- Copying tasks description.
         You could specify several table task in the same task description (in the same ZooKeeper node), they will be performed
         sequentially.
    -->
    <tables>
        <!-- A table task, copies one table. -->
        <!-- 集群间复制 -->
        <table_test_copy>
            <!-- Source cluster name (from <remote_servers/> section) and tables in it that should be copied -->
            <cluster_pull>src_cluster</cluster_pull>
            <database_pull>default</database_pull>
            <table_pull>test</table_pull>

            <!-- Destination cluster name and tables in which the data should be inserted -->
            <cluster_push>src_cluster</cluster_push>
            <database_push>default</database_push>
            <table_push>test_copy</table_push>
            <!-- Engine of destination tables.
                 If destination tables have not be created, workers create them using columns definition from source tables and engine
                 definition from here.
                 NOTE: If the first worker starts insert data and detects that destination partition is not empty then the partition will
                 be dropped and refilled, take it into account if you already have some data in destination tables. You could directly
                 specify partitions that should be copied in <enabled_partitions/>, they should be in quoted format like partition column of
                 system.parts table.
            -->
            <!-- 目标集群没有表的情况下,会根据下面的配置来创建表 `create table if not exists` -->
            <engine>
            ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/test_copy', '{replica}')
            PARTITION BY toYYYYMM(EventDate)
            ORDER BY (CounterId, EventDate)
            </engine>
            <!-- Sharding key used to insert data to destination cluster -->
            <sharding_key>rand()</sharding_key>
            <!-- Optional expression that filter data while pull them from source servers -->
            <!-- This section specifies partitions that should be copied, other partition will be ignored.
                 Partition names should have the same format as
                 partition column of system.parts table (i.e. a quoted text).
                 Since partition key of source and destination cluster could be different,
                 these partition names specify destination partitions.
                 NOTE: In spite of this section is optional (if it is not specified, all partitions will be copied),
                 it is strictly recommended to specify them explicitly.
                 If you already have some ready paritions on destination cluster they
                 will be removed at the start of the copying since they will be interpeted
                 as unfinished data from the previous copying!!!
            -->
            <!-- <enabled_partitions>
                <partition>'2019-10-13'</partition>
            </enabled_partitions> -->
        </table_test_copy>

        <!-- 分片间复制 -->
        <table_test_shard>
            <cluster_pull>src_shard</cluster_pull>
            <database_pull>default</database_pull>
            <table_pull>test</table_pull>

            <cluster_push>dst_shard</cluster_push>
            <database_push>default</database_push>
            <table_push>test_shard</table_push>

            <engine>
            ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/test_shard', '{replica}')
            PARTITION BY toYYYYMM(EventDate)
            ORDER BY (CounterId, EventDate)
            </engine>

            <sharding_key>rand()</sharding_key>
        </table_test_shard>
    </tables>
</yandex>

创建 zookeeper 任务

# docker exec -it clickhouse-cluster-example_zoo1_1 bash

./bin/zkCli.sh create /clickhouse/copytasks ""
./bin/zkCli.sh create /clickhouse/copytasks/test ""
./bin/zkCli.sh create /clickhouse/copytasks/test/description "`cat /task.xml`"

# 查看任务信息
./bin/zkCli.sh get /clickhouse/copytasks/test/description

# 更新任务信息
./bin/zkCli.sh set /clickhouse/copytasks/test/description "`cat /task.xml`"

执行任务

# docker exec -it clickhouse-cluster-example_clickhouse1_1 bash

clickhouse-copier --daemon --config /zookeeper.xml --task-path /clickhouse/copytasks/test --base-dir /tmp/copylogs

任务结果

# select hostName() as hostname, * from test_copy_all order by hostname, EventDate

┌─hostname────┬───────────EventDate─┬─CounterId─┐
│ clickhouse1 │ 2020-01-01 12:12:12 │         1 │
│ clickhouse1 │ 2020-02-02 02:02:02 │         2 │
└─────────────┴─────────────────────┴───────────┘
┌─hostname────┬───────────EventDate─┬─CounterId─┐
│ clickhouse4 │ 2020-01-01 01:01:01 │         1 │
│ clickhouse4 │ 2020-01-01 11:11:11 │         1 │
│ clickhouse4 │ 2020-03-03 03:03:03 │         3 │
│ clickhouse4 │ 2020-04-04 04:04:04 │         4 │
└─────────────┴─────────────────────┴───────────┘

# select hostName() as hostname, * from test_shard order by hostname, EventDate

┌─hostname────┬───────────EventDate─┬─CounterId─┐
│ clickhouse3 │ 2020-01-01 01:01:01 │         1 │
│ clickhouse3 │ 2020-01-01 11:11:11 │         1 │
│ clickhouse3 │ 2020-01-01 12:12:12 │         1 │
│ clickhouse3 │ 2020-02-02 02:02:02 │         2 │
└─────────────┴─────────────────────┴───────────┘

注意事项

官方的文档并没有详细的说明,下面是在使用过程中碰到的一些问题:

  • 原始表建表语句需要指定 partition by,在测试的时候由于没有指定,导致任务失败;

  • 对于物化视图复制,table_pull 表名需要添加 .inner. 前缀,不然任务会因找不到分区信息而失败,另外目标集群的物化视图需要提前创建好,不然由任务生成的表为普通表;

  • 复制过程使用的是 insert into 方式来批量写入数据,所以会触发物化视图的数据更新(如果数据量大的话,会很慢);

  • 在集群间复制(多分片),数据会打乱;

  • 复制任务会在 zookeeper 中记录状态,但是表中已经处理过的 partition 下次任务不会再处理(即使有新数据)。


参考

clickhouse-copier

使用clickhouse-copier在ClickHouse表间快速同步数据