# Flink in Practice: Project 1
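# Project Overview
This project uses Flink to synchronize data in real time from another database into StarRocks, so that data can be updated and analyzed in real time and report queries run faster.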
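# Project Architecture
Data source: SQL Server

Synchronization tool: Apache Flink (1.14.0)

Visual front end for the synchronization tool: Dinky (0.7.2)

Data target: StarRocks (3.1.2)

Single-machine deployment steps: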
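- Confirm that the server supports AVX2 by running `cat /proc/cpuinfo | grep avx2`. Any output means it is supported.
- Upload the StarRocks package and install it. See the chapter on using the StarRocks database for details.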
- Download Flink from the official site (version 1.14.0 here), upload it to the server, and unpack it. The unpacked layout:

      [root flink-1.14.0]# ll
      total 508
      drwxrwxrwx.  2 1000 1000   4096 Nov  7  2023 bin
      drwxrwxrwx.  2 1000 1000   4096 Sep 22  2021 conf
      drwxrwxrwx.  7 1000 1000   4096 Sep 22  2021 examples
      drwxrwxrwx.  2 1000 1000   4096 Nov 21 10:45 lib
      -rwxrwxrwx.  1 1000 1000  11357 Oct 29  2019 LICENSE
      drwxrwxrwx.  2 1000 1000   4096 Sep 22  2021 licenses
      drwxrwxrwx.  2 1000 1000  16384 May  8 16:06 log
      -rwxrwxrwx.  1 1000 1000 458497 Sep 22  2021 NOTICE
      drwxrwxrwx.  3 1000 1000   4096 Sep 22  2021 opt
      drwxrwxrwx. 10 1000 1000   4096 Sep 22  2021 plugins
      -rwxrwxrwx.  1 1000 1000   1309 Jan 30  2021 README.txt
      [root flink-1.14.0]# pwd
      /home/flink-1.14.0
- Download Dinky, upload it to the server, and unpack it. The unpacked layout:

      [root@sanzi-bigdata-server dlink-release-0.7.2]# ll
      total 52
      -rwxrwxrwx. 1 root root  2973 Nov 24 12:04 auto.sh
      drwxrwxrwx. 3 root root  4096 Apr  1 17:55 config
      drwxrwxrwx. 2 root root  4096 Nov 24 12:04 jar
      drwxrwxrwx. 2 root root 20480 Nov 24 12:07 lib
      drwxrwxrwx. 7 root root  4096 Nov 24 12:05 plugins
      drwxrwxrwx. 3 root root  4096 Nov 24 12:05 sql
      drwxrwxrwx. 3 root root  4096 Nov 24 12:05 tmp
      [root@sanzi-bigdata-server dlink-release-0.7.2]# pwd
      /home/dlink-release-0.7.2
- Install MySQL, which Dinky depends on. It can run on either Linux or Windows; here it was installed from the zip archive on a Windows server.
- Set up the Flink libraries. The dlink dependencies must be added to Flink's `lib` directory, along with the connector jars provided by StarRocks and the SQL Server CDC project: `flink-connector-starrocks-1.2.6_flink-1.15.jar` and `flink-sql-connector-sqlserver-cdc-2.3.0.jar` (both downloadable from their official sites).

      cp -r flink-connector-starrocks-1.2.6_flink-1.15.jar /home/flink-1.14.0/lib
      cp -r flink-sql-connector-sqlserver-cdc-2.3.0.jar /home/flink-1.14.0/lib
      cp -r /home/dlink-release-0.7.2/plugins/flink1.14 /home/flink-1.14.0/lib

  Resulting layout. Note that this listing shows the StarRocks connector build for Flink 1.14 (`_flink-1.14_2.12`), while the copy command and the Dinky plugin directory use the `_flink-1.15` file name; use the build that matches your Flink version.

      [root@sanzi-bigdata-server lib]# pwd
      /home/flink-1.14.0/lib
      [root@sanzi-bigdata-server lib]# ll
      total 237364
      -rwxrwxrwx. 1 root root     21672 Nov  2  2023 dlink-catalog-mysql-1.14-0.7.2.jar
      -rwxrwxrwx. 1 root root    165868 Nov  2  2023 dlink-client-1.14-0.7.2.jar
      -rwxrwxrwx. 1 root root     16245 Nov  2  2023 dlink-client-base-0.7.2.jar
      -rwxrwxrwx. 1 root root  14853272 Apr 28  2023 flink-connector-starrocks-1.2.6_flink-1.14_2.12.jar
      -rwxrwxrwx. 1 1000 1000     85588 Sep 22  2021 flink-csv-1.14.0.jar
      -rwxrwxrwx. 1 1000 1000 136045730 Sep 22  2021 flink-dist_2.12-1.14.0.jar
      -rwxrwxrwx. 1 1000 1000    153148 Sep 22  2021 flink-json-1.14.0.jar
      -rwxrwxrwx. 1 root root   2424144 Apr 28  2023 flink-shaded-guava-18.0-13.0.jar
      -rwxrwxrwx. 1 1000 1000   7709731 Sep  1  2021 flink-shaded-zookeeper-3.4.14.jar
      -rwxrwxrwx. 1 root root  16556476 May  4  2023 flink-sql-connector-sqlserver-cdc-2.3.0.jar
      -rwxrwxrwx. 1 1000 1000  39620756 Sep 22  2021 flink-table_2.12-1.14.0.jar
      -rwxrwxrwx. 1 1000 1000    206756 Sep  1  2021 log4j-1.2-api-2.14.1.jar
      -rwxrwxrwx. 1 1000 1000    300365 Sep  1  2021 log4j-api-2.14.1.jar
      -rwxrwxrwx. 1 1000 1000   1745700 Sep  1  2021 log4j-core-2.14.1.jar
      -rwxrwxrwx. 1 1000 1000     23625 Sep  1  2021 log4j-slf4j-impl-2.14.1.jar

  Then copy everything in Flink's `lib` directory except the `log*.jar` files back into the Dinky plugin directory:

      cp -r /home/flink-1.14.0/lib/flink*.jar /home/dlink-release-0.7.2/plugins/flink1.14/

  Resulting layout:

      [root@sanzi-bigdata-server flink1.14]# pwd
      /home/dlink-release-0.7.2/plugins/flink1.14
      [root@sanzi-bigdata-server flink1.14]# ll
      total 235052
      -rwxrwxrwx. 1 root root     21673 Nov 24 12:04 dlink-catalog-mysql-1.14-0.7.2.jar
      -rwxrwxrwx. 1 root root    165868 Nov 24 12:04 dlink-client-1.14-0.7.2.jar
      -rwxrwxrwx. 1 root root     51828 Nov 24 12:04 dlink-connector-jdbc-1.14-0.7.2.jar
      -rwxrwxrwx. 1 root root  14853115 Nov 24 12:04 flink-connector-starrocks-1.2.6_flink-1.15.jar
      -rwxrwxrwx. 1 root root     85588 Nov 24 12:04 flink-csv-1.14.0.jar
      -rwxrwxrwx. 1 root root 136045730 Nov 24 12:05 flink-dist_2.12-1.14.0.jar
      -rwxrwxrwx. 1 root root    153148 Nov 24 12:04 flink-json-1.14.0.jar
      -rwxrwxrwx. 1 root root   2424144 Nov 24 12:04 flink-shaded-guava-18.0-13.0.jar
      -rwxrwxrwx. 1 root root   7709731 Nov 24 12:04 flink-shaded-zookeeper-3.4.14.jar
      -rwxrwxrwx. 1 root root  16556476 Nov 24 12:05 flink-sql-connector-sqlserver-cdc-2.3.0.jar
      -rwxrwxrwx. 1 root root  39620756 Nov 24 12:05 flink-table_2.12-1.14.0.jar
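
Before starting the cluster it can help to confirm that the connector jars actually landed in Flink's `lib` directory. The following is only a sketch: `missing_jars` and `has_match` are hypothetical helper names, not part of Flink or Dinky, and the glob patterns are assumptions based on the file names used in this step.

```shell
#!/bin/sh
# has_match succeeds if its first argument names an existing file, which is
# the case only when the glob passed by the caller matched something.
has_match() { [ -e "$1" ]; }

# Print every jar-name pattern that has no match in the given directory.
# Usage: missing_jars <lib_dir> <pattern>...
missing_jars() {
  mj_lib_dir="$1"; shift
  for mj_pattern in "$@"; do
    # $mj_pattern is left unquoted on purpose so the shell expands the glob.
    if ! has_match "$mj_lib_dir"/$mj_pattern; then
      printf '%s\n' "$mj_pattern"
    fi
  done
}
```

For example, `missing_jars /home/flink-1.14.0/lib 'flink-connector-starrocks-*.jar' 'flink-sql-connector-sqlserver-cdc-*.jar' 'dlink-client-*.jar'` prints nothing when all three kinds of jar are present.
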
- Edit the `flink-conf.yaml` configuration file (in `conf`). Note that `state.backend` is set to `filesystem` here, so `state.backend.incremental` and the `state.backend.rocksdb.*` options only take effect if the backend is switched to `rocksdb`.

      ################################################################################
      #  Licensed to the Apache Software Foundation (ASF) under one
      #  or more contributor license agreements.  See the NOTICE file
      #  distributed with this work for additional information
      #  regarding copyright ownership.  The ASF licenses this file
      #  to you under the Apache License, Version 2.0 (the
      #  "License"); you may not use this file except in compliance
      #  with the License.  You may obtain a copy of the License at
      #
      #      http://www.apache.org/licenses/LICENSE-2.0
      #
      #  Unless required by applicable law or agreed to in writing, software
      #  distributed under the License is distributed on an "AS IS" BASIS,
      #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      #  See the License for the specific language governing permissions and
      # limitations under the License.
      ################################################################################

      #==============================================================================
      # Common
      #==============================================================================

      # The external address of the host on which the JobManager runs and can be
      # reached by the TaskManagers and any clients which want to connect. This setting
      # is only used in Standalone mode and may be overwritten on the JobManager side
      # by specifying the --host <hostname> parameter of the bin/jobmanager.sh executable.
      # In high availability mode, if you use the bin/start-cluster.sh script and setup
      # the conf/masters file, this will be taken care of automatically. Yarn
      # automatically configure the host name based on the hostname of the node where the
      # JobManager runs.
      jobmanager.rpc.address: localhost

      # The RPC port where the JobManager is reachable.
      jobmanager.rpc.port: 6123

      # The total process memory size for the JobManager.
      #
      # Note this accounts for all memory usage within the JobManager process, including JVM metaspace and other overhead.
      jobmanager.memory.process.size: 14g

      # The total process memory size for the TaskManager.
      #
      # Note this accounts for all memory usage within the TaskManager process, including JVM metaspace and other overhead.
      taskmanager.memory.process.size: 13g

      # To exclude JVM metaspace and overhead, please, use total Flink memory size instead of 'taskmanager.memory.process.size'.
      # It is not recommended to set both 'taskmanager.memory.process.size' and Flink memory.
      #
      # taskmanager.memory.flink.size: 1280m

      # JVM metaspace
      taskmanager.memory.jvm-metaspace.size: 512m

      # The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
      taskmanager.numberOfTaskSlots: 150

      # The parallelism used for programs that did not specify and other parallelism.
      parallelism.default: 1

      taskmanager.memory.managed.fraction: 0.05
      taskmanager.memory.network.fraction: 0.05
      task.cancellation.timeout: 0

      # The default file system scheme and authority.
      #
      # By default file paths without scheme are interpreted relative to the local
      # root file system 'file:///'. Use this to override the default and interpret
      # relative paths relative to a different file system,
      # for example 'hdfs://mynamenode:12345'
      #
      # fs.default-scheme

      #==============================================================================
      # High Availability
      #==============================================================================

      # The high-availability mode. Possible options are 'NONE' or 'zookeeper'.
      #
      # high-availability: zookeeper

      # The path where metadata for master recovery is persisted. While ZooKeeper stores
      # the small ground truth for checkpoint and leader election, this location stores
      # the larger objects, like persisted dataflow graphs.
      #
      # Must be a durable file system that is accessible from all nodes
      # (like HDFS, S3, Ceph, nfs, ...)
      #
      # high-availability.storageDir: hdfs:///flink/ha/

      # The list of ZooKeeper quorum peers that coordinate the high-availability
      # setup. This must be a list of the form:
      # "host1:clientPort,host2:clientPort,..." (default clientPort: 2181)
      #
      # high-availability.zookeeper.quorum: localhost:2181

      # ACL options are based on https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html#sc_BuiltinACLSchemes
      # It can be either "creator" (ZOO_CREATE_ALL_ACL) or "open" (ZOO_OPEN_ACL_UNSAFE)
      # The default value is "open" and it can be changed to "creator" if ZK security is enabled
      #
      # high-availability.zookeeper.client.acl: open

      #==============================================================================
      # Fault tolerance and checkpointing
      #==============================================================================

      # The backend that will be used to store operator state checkpoints if
      # checkpointing is enabled. Checkpointing is enabled when execution.checkpointing.interval > 0.
      #
      # Execution checkpointing related parameters. Please refer to CheckpointConfig and ExecutionCheckpointingOptions for more details.
      #
      # execution.checkpointing.interval: 3min
      # execution.checkpointing.externalized-checkpoint-retention: [DELETE_ON_CANCELLATION, RETAIN_ON_CANCELLATION]
      # execution.checkpointing.max-concurrent-checkpoints: 1
      # execution.checkpointing.min-pause: 0
      # execution.checkpointing.mode: [EXACTLY_ONCE, AT_LEAST_ONCE]
      # execution.checkpointing.timeout: 10min
      # execution.checkpointing.tolerable-failed-checkpoints: 0
      # execution.checkpointing.unaligned: false
      #
      # Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the
      # <class-name-of-factory>.
      #
      state.backend: filesystem

      # Checkpointing mode (exactly-once vs. at-least-once)
      execution.checkpointing.mode: EXACTLY_ONCE

      # DELETE_ON_CANCELLATION: checkpoint state is kept only when the owning job fails; it is deleted if the job is cancelled.
      # RETAIN_ON_CANCELLATION: checkpoint state is kept when the owning job is cancelled or fails.
      # NO_EXTERNALIZED_CHECKPOINTS: externalized checkpoints are disabled.
      execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION

      # Maximum time a checkpoint may take before it is discarded.
      # A checkpoint that has not completed within this time is aborted so it does not tie up resources.
      execution.checkpointing.timeout: 2min

      # Number of consecutive checkpoint failures tolerated. 0 means no failure is tolerated: a failed checkpoint fails the job.
      execution.checkpointing.tolerable-failed-checkpoints: 100

      # Interval at which periodic checkpoints are scheduled.
      # This is the base interval; execution.checkpointing.max-concurrent-checkpoints and execution.checkpointing.min-pause can delay triggering.
      execution.checkpointing.interval: 120min

      # Maximum number of checkpoint attempts that may be in progress at once. With a value of n, no new checkpoint
      # is triggered while n attempts are in flight; one of them must complete or expire first.
      execution.checkpointing.max-concurrent-checkpoints: 1

      # Minimum pause between checkpoint attempts
      execution.checkpointing.min-pause: 3min

      # Whether to compress state snapshot data
      #execution.checkpointing.snapshot-compression: true

      # Maximum number of completed checkpoints to retain.
      state.checkpoints.num-retained: 20

      #state.backend: filesystem

      # Directory for checkpoints filesystem, when using any of the default bundled
      # state backends.
      #
      # state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints
      state.checkpoints.dir: file:///home/flink_points/flink-checkpoints

      # Default target directory for savepoints, optional.
      #
      # state.savepoints.dir: hdfs://namenode-host:port/flink-savepoints
      state.savepoints.dir: file:///home/flink_points/flink-savepoints

      # Flag to enable/disable incremental checkpoints for backends that
      # support incremental checkpoints (like the RocksDB state backend).
      #
      # state.backend.incremental: false

      # Enable incremental checkpoints
      state.backend.incremental: true

      # Enlarge the block cache
      # All of RocksDB shares one block cache, the in-memory cache used for reads. A larger value raises the
      # read cache hit rate; the default is 8 MB and 64-256 MB is suggested.
      state.backend.rocksdb.block.cache-size: 64mb

      state.backend.latency-track.keyed-state-enabled: true

      # Task-local state recovery
      state.backend.local-recovery: true

      # Write buffer size and level-base size threshold
      state.backend.rocksdb.writebuffer.size: 128m
      state.backend.rocksdb.compaction.level.max-size-level-base: 320m

      # Number of write buffers
      state.backend.rocksdb.writebuffer.count: 5

      #
      state.backend.rocksdb.thread.num: 4

      # The failover strategy, i.e., how the job computation recovers from task failures.
      # Only restart tasks that may have been affected by the task failure, which typically includes
      # downstream tasks and potentially upstream tasks if their produced data is no longer available for consumption.
      jobmanager.execution.failover-strategy: region

      # Timeout for requesting and receiving heartbeats, for both sender and receiver
      heartbeat.timeout: 500000

      # Enable flame graphs
      rest.flamegraph.enabled: true

      # Spread tasks evenly across all TaskManagers
      #cluster.evenly-spread-out-slots: true

      #==============================================================================
      # Rest & web frontend
      #==============================================================================

      # The port to which the REST client connects to. If rest.bind-port has
      # not been specified, then the server will bind to this port as well.
      #
      #rest.port: 8081

      # The address to which the REST client will connect to
      #
      #rest.address: 0.0.0.0

      # Port range for the REST and web server to bind to.
      #
      #rest.bind-port: 8080-8090

      # The address that the REST & web server binds to
      #
      #rest.bind-address: 0.0.0.0

      # Flag to specify whether job submission is enabled from the web-based
      # runtime monitor. Uncomment to disable.
      web.submit.enable: false

      # Flag to specify whether job cancellation is enabled from the web-based
      # runtime monitor. Uncomment to disable.
      web.cancel.enable: false

      #==============================================================================
      # Advanced
      #==============================================================================

      # Override the directories for temporary files. If not specified, the
      # system-specific Java temporary directory (java.io.tmpdir property) is taken.
      #
      # For framework setups on Yarn, Flink will automatically pick up the
      # containers' temp directories without any need for configuration.
      #
      # Add a delimited list for multiple directories, using the system directory
      # delimiter (colon ':' on unix) or a comma, e.g.:
      #     /data1/tmp:/data2/tmp:/data3/tmp
      #
      # Note: Each directory entry is read from and written to by a different I/O
      # thread. You can include the same directory multiple times in order to create
      # multiple I/O threads against that directory. This is for example relevant for
      # high-throughput RAIDs.
      #
      # io.tmp.dirs: /tmp

      # The classloading resolve order. Possible values are 'child-first' (Flink's default)
      # and 'parent-first' (Java's default).
      #
      # Child first classloading allows users to use different dependency/library
      # versions in their application than those in the classpath. Switching back
      # to 'parent-first' may help with debugging dependency issues.
      #
      # classloader.resolve-order: child-first

      # The amount of memory going to the network stack. These numbers usually need
      # no tuning. Adjusting them may be necessary in case of an "Insufficient number
      # of network buffers" error. The default min is 64MB, the default max is 1GB.
      #
      # taskmanager.memory.network.fraction: 0.1
      # taskmanager.memory.network.min: 64mb
      # taskmanager.memory.network.max: 1gb

      #==============================================================================
      # Flink Cluster Security Configuration
      #==============================================================================

      # Kerberos authentication for various components - Hadoop, ZooKeeper, and connectors -
      # may be enabled in four steps:
      # 1. configure the local krb5.conf file
      # 2. provide Kerberos credentials (either a keytab or a ticket cache w/ kinit)
      # 3. make the credentials available to various JAAS login contexts
      # 4. configure the connector to use JAAS/SASL

      # The below configure how Kerberos credentials are provided. A keytab will be used instead of
      # a ticket cache if the keytab path and principal are set.

      # security.kerberos.login.use-ticket-cache: true
      # security.kerberos.login.keytab: /path/to/kerberos/keytab
      # security.kerberos.login.principal: flink-user

      # The configuration below defines which JAAS login contexts

      # security.kerberos.login.contexts: Client,KafkaClient

      #==============================================================================
      # ZK Security Configuration
      #==============================================================================

      # Below configurations are applicable if ZK ensemble is configured for security

      # Override below configuration to provide custom ZK service name if configured
      # zookeeper.sasl.service-name: zookeeper

      # The configuration below must match one of the values set in "security.kerberos.login.contexts"
      # zookeeper.sasl.login-context-name: Client

      #==============================================================================
      # HistoryServer
      #==============================================================================

      # The HistoryServer is started and stopped via bin/historyserver.sh (start|stop)

      # Directory to upload completed jobs to. Add this directory to the list of
      # monitored directories of the HistoryServer as well (see below).
      #jobmanager.archive.fs.dir: hdfs:///completed-jobs/

      # The address under which the web-based HistoryServer listens.
      #historyserver.web.address: 0.0.0.0

      # The port under which the web-based HistoryServer listens.
      #historyserver.web.port: 8082

      # Comma separated list of directories to monitor for completed jobs.
      #historyserver.archive.fs.dir: hdfs:///completed-jobs/

      # Interval in milliseconds for refreshing the monitored directories.
      #historyserver.archive.fs.refresh-interval: 10000
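
A hand-edited `flink-conf.yaml` is easy to get subtly wrong, so a quick check before starting the cluster can save a restart. The sketch below assumes the line-based `key: value` format that Flink 1.14 expects, where the key and value are separated by a colon followed by a space. As far as I know, Flink skips lines it cannot split this way and only logs a warning. `lint_flink_conf` is a hypothetical helper name.

```shell
#!/bin/sh
# Report settings in a flink-conf.yaml style file that are not of the form
# "key: value" (colon followed by a space). Output format: "<line number>: <text>".
# Usage: lint_flink_conf <path-to-flink-conf.yaml>
lint_flink_conf() {
  awk '
    {
      line = $0
      sub(/#.*/, "", line)                 # drop everything after the first "#"
      gsub(/^[ \t]+|[ \t]+$/, "", line)    # trim leading and trailing whitespace
      if (line == "") next                 # skip blank and comment-only lines
      if (index(line, ": ") == 0)          # no "colon + space" separator found
        printf "%d: %s\n", NR, line
    }
  ' "$1"
}
```

Running `lint_flink_conf /home/flink-1.14.0/conf/flink-conf.yaml` prints nothing when every setting is well formed. It only checks the separator; misspelled option names still have to be compared against the Flink configuration reference by hand.
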
- Start Flink by running `./start-cluster.sh` from the `bin` directory.
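
Once the cluster is up, the JobManager's REST API can confirm that the TaskManager registered with the expected number of slots. This is a minimal sketch, assuming the default REST port 8081 on localhost (`rest.port` is left commented out in the configuration) and that `curl` is installed. `overview_field` is a hypothetical helper that reads the JSON returned by Flink's `/overview` endpoint from standard input and extracts one numeric field.

```shell
#!/bin/sh
# Extract a numeric field (for example slots-total) from the JSON document
# read on standard input. Field names are expected to contain only letters,
# digits and hyphens, as in Flink's /overview response.
# Usage: curl -s http://localhost:8081/overview | overview_field slots-total
overview_field() {
  sed -n 's/.*"'"$1"'"[ ]*:[ ]*\([0-9][0-9]*\).*/\1/p'
}
```

With the configuration above, `curl -s http://localhost:8081/overview | overview_field slots-total` should report the value set in `taskmanager.numberOfTaskSlots`, provided a single TaskManager is running.
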
- Edit the dlink (Dinky) configuration:

      spring:
        datasource:
          url: jdbc:mysql://${MYSQL_ADDR:127.0.0.1:3306}/${MYSQL_DATABASE:dinky}?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true
          username: ${MYSQL_USERNAME:dinky}
          password: ${MYSQL_PASSWORD:123456}
          driver-class-name: com.mysql.cj.jdbc.Driver
        application:
          name: dlink
        mvc:
          pathmatch:
            matching-strategy: ant_path_matcher
          format:
            date: yyyy-MM-dd HH:mm:ss
        # Global JSON formatting settings
        jackson:
          time-zone: GMT+8
          date-format: yyyy-MM-dd HH:mm:ss
        main:
          allow-circular-references: true
        # Metadata is cached in memory by default.
        # dlink also supports a Redis cache; to use it, change simple to redis and enable the Redis connection settings below.
        # Sub-options can be enabled or customized as needed.
        cache:
          type: simple
        ##    If type is redis, configure this as needed
        #    redis:
        ##      Whether to cache null values; the default is fine
        #      cache-null-values: false
        ##      Cache expiry, 24 hours
        #      time-to-live: 86400
        #  flyway:
        #    enabled: false
        #    clean-disabled: true
        ##    baseline-on-migrate: true
        #    table: dlink_schema_history
        # Redis settings
        # If sa-token should use Redis, enable the Redis settings and the dependencies in pom.xml and dlink-admin/pom.xml
        #  redis:
        #    host: localhost
        #    port: 6379
        #    password:
        #    database: 10
        #    jedis:
        #      pool:
        #        # Maximum connections in the pool (a negative value means no limit)
        #        max-active: 50
        #        # Maximum blocking wait time of the pool (a negative value means no limit)
        #        max-wait: 3000
        #        # Maximum idle connections in the pool
        #        max-idle: 20
        #        # Minimum idle connections in the pool
        #        min-idle: 5
        #    # Connection timeout (milliseconds)
        #    timeout: 5000
        servlet:
          multipart:
            max-file-size: 524288000
            max-request-size: 524288000
            enabled: true
      server:
        port: 8888
      mybatis-plus:
        mapper-locations: classpath*:/mapper/*Mapper.xml
        # Entity scanning; separate multiple packages with commas or semicolons
        typeAliasesPackage: com.dlink.model,com.dlink.detection.model,com.dlink.detection.vo
        global-config:
          db-config:
            id-type: auto
        configuration:
          ##### mybatis-plus prints full SQL (development environments only)
          log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
          #log-impl: org.apache.ibatis.logging.nologging.NoLoggingImpl
      # Sa-Token settings
      sa-token:
        # Token name (also the cookie name)
        token-name: satoken
        # Token lifetime in seconds, 10 hours by default; -1 means it never expires
        timeout: 36000
        # Token idle timeout in seconds (the token expires after this long without activity)
        activity-timeout: -1
        # Whether one account may be logged in concurrently (true allows it; false makes a new login evict the old one)
        is-concurrent: false
        # Whether multiple logins to one account share a token (true shares one token; false creates one per login)
        is-share: true
        # Token style
        token-style: uuid
        # Whether to write an operation log
        is-log: false
      knife4j:
        enable: true
      dinky:
        dolphinscheduler:
          enabled: false
          # DolphinScheduler address
          url: http://127.0.0.1:5173/dolphinscheduler
          # Token generated by DolphinScheduler
          token: ad54eb8f57fadea95f52763517978b26
          # Project name as specified in DolphinScheduler, case-insensitive
          project-name: Dinky
          # Dolphinscheduler DinkyTask Address
          address: http://127.0.0.1:8888
        # Python runtime used by Python UDFs
        python:
          path: python
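- Start dlink, specifying the Flink version: `sh auto.sh start 1.14`
- Open the web page to confirm that it started. Because many tasks connect to the same databases, shared database connections can be configured in the registration center, with global variable substitution enabled when a task is started.
- A Flink instance must also be registered in the registration center.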
- To feed SQL Server data into StarRocks in real time, SQL Server has to record changes, which means enabling CDC on the relevant database and tables. Mind the CDC log retention period, which defaults to 3 days; adjust it to match how long Flink savepoints and checkpoints are retained. To enable CDC:

      -- Check whether CDC is enabled for databases and tables
      -- Databases with CDC enabled
      SELECT name, is_cdc_enabled FROM sys.databases WHERE is_cdc_enabled = 1;
      -- Tables in the current database with CDC enabled
      SELECT name, is_tracked_by_cdc FROM sys.tables WHERE is_tracked_by_cdc = 1;

      -- Enable and disable CDC for a database
      -- Enable CDC for the current database
      USE MyDB
      GO
      EXECUTE sys.sp_cdc_enable_db;
      GO
      -- Disable CDC for the current database
      USE MyDB
      GO
      EXEC sys.sp_cdc_disable_db
      GO

      -- Enable and disable CDC for a table
      -- Enable
      USE MyDB
      GO
      EXEC sys.sp_cdc_enable_table
          @source_schema = N'dbo',
          @source_name = N'MyTable',
          @role_name = NULL
      GO
      -- Disable
      USE MyDB
      GO
      EXEC sys.sp_cdc_disable_table
          @source_schema = N'dbo',
          @source_name = N'MyTable',
          @capture_instance = N'dbo_MyTable'
      GO

      -- Enable CDC for several tables at once
      begin
          declare @temp varchar(100)
          -- Declare a cursor over the table names
          declare tb_cursor cursor
              for (select name from sys.tables WHERE is_tracked_by_cdc=0 and schema_id=1 and name in('bankManage','dist'))
          -- Open the cursor
          open tb_cursor
          -- Fetch the first row into the variable
          fetch next from tb_cursor into @temp
          -- @@fetch_status reports the status of the last fetch
          while @@fetch_status=0
          begin
              EXEC sys.sp_cdc_enable_table
                  @source_schema = 'dbo',
                  @source_name = @temp,
                  @role_name = NULL
              -- Move to the next row
              fetch next from tb_cursor into @temp
          end
          -- Close the cursor
          close tb_cursor
          -- Release the cursor
          deallocate tb_cursor
      end ;

      -- Check whether CDC is enabled for each table
      SELECT name ,
             is_tracked_by_cdc ,
             CASE WHEN is_tracked_by_cdc = 0 THEN 'CDC disabled'
                  ELSE 'CDC enabled'
             END description
      FROM sys.tables;

      -- A permission error may occur; if so, change the database owner
      ALTER AUTHORIZATION ON DATABASE::[MyDB] TO [sa]
      -- The SQL Server Agent service must be running

      -- If saving fails on a CDC-enabled table that has text columns, the content may be too large.
      -- Raise the maximum number of bytes CDC will process with:
      exec sp_configure 'show advanced options', 1 ;
      reconfigure;
      -- -1 means no limit
      exec sp_configure 'max text repl size', -1;
      reconfigure;
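
The retention period mentioned above is stored in minutes on the CDC cleanup job and can be changed with the `sys.sp_cdc_change_job` procedure. The sketch below only prints the T-SQL to run; `cdc_retention_sql` is a hypothetical helper, and the database name and the 7-day value in the example are assumptions to adapt.

```shell
#!/bin/sh
# Print T-SQL that sets the CDC cleanup retention of a database.
# The retention is given in days and converted to minutes, the unit that
# sys.sp_cdc_change_job expects for @retention.
# Usage: cdc_retention_sql <database> <days>
cdc_retention_sql() {
  cr_db="$1"
  cr_minutes=$(( $2 * 24 * 60 ))
  printf 'USE %s\nGO\n' "$cr_db"
  printf "EXEC sys.sp_cdc_change_job @job_type = N'cleanup', @retention = %s;\nGO\n" "$cr_minutes"
}
```

For example, `cdc_retention_sql MyDB 7` prints a statement with `@retention = 10080`. To my knowledge the new value only takes effect after the cleanup job is restarted with `sys.sp_cdc_stop_job` and `sys.sp_cdc_start_job`, so check the SQL Server documentation for your version before relying on it.
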
- The smt tool provided by StarRocks makes it easy to generate the matching Flink SQL and the StarRocks `CREATE TABLE` statements. Once the StarRocks tables exist, create a task for each table to be synchronized:

      CREATE TABLE `table1_src` (
        `id` INT NOT NULL,
        `sno` INT NULL,
        `title` STRING NULL,
        `distNo` STRING NULL,
        `distName` STRING NULL,
        `landNo` STRING NULL,
        `landName` STRING NULL,
        `description` STRING NULL,
        `address` STRING NULL,
        `area` DECIMAL(19, 4) NULL,
        PRIMARY KEY(`id`)
        NOT ENFORCED
      ) with (
        'connector' = 'sqlserver-cdc',
        'username' = '${ss_username}',
        'password' = '${ss_password}',
        -- These two settings keep the snapshot read from locking SQL Server tables; see the Debezium documentation for other options
        'debezium.snapshot.lock.timeout.ms' = '-1',
        'debezium.snapshot.isolation.mode' = 'read_committed',
        'database-name' = 'database1',
        'table-name' = 'table1',
        'schema-name' = 'dbo',
        'hostname' = '${ss_hostname}',
        'port' = '${ss_port}'
      );

      CREATE TABLE `table1_sink` (
        `id` INT NOT NULL,
        `sno` INT NULL,
        `title` STRING NULL,
        `distNo` STRING NULL,
        `distName` STRING NULL,
        `landNo` STRING NULL,
        `landName` STRING NULL,
        `description` STRING NULL,
        `address` STRING NULL,
        `area` DECIMAL(19, 4) NULL,
        PRIMARY KEY(`id`)
        NOT ENFORCED
      ) with (
        'username' = '${ss_sr_username}',
        'password' = '${ss_sr_password}',
        'sink.properties.strip_outer_array' = 'true',
        'load-url' = '${ss_sr_load-url}',
        'sink.properties.format' = 'json',
        'connector' = 'starrocks',
        'database-name' = 'database1',
        'sink.properties.ignore_json_size' = 'true',
        'sink.max-retries' = '10',
        'jdbc-url' = '${ss_sr_jdbc-url}',
        'sink.buffer-flush.interval-ms' = '1000',
        'table-name' = 'table1'
      );

      INSERT INTO `table1_sink` SELECT * FROM `table1_src`;
- The data in the corresponding StarRocks table now follows the source: inserting, updating, or deleting rows in SQL Server changes the StarRocks table accordingly.
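- Create a Spring Boot project, map-data (details can be found on the blog), that provides only a generic query feature. Because StarRocks is compatible with the MySQL protocol, it uses the MySQL driver.

  Table structure (`select * from select_config`):

  | select_name | select_sql |
  | --- | --- |
  | assets_zy | select * from database1.table1; |

  Interface: `getData (String selectName, String queryMap)`. Passing the query name and the query parameters is enough to run the corresponding query against StarRocks.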
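
The lookup-then-execute flow of that service can be tried from the command line before writing any Java. This is a rough sketch rather than the map-data implementation: it assumes the `mysql` client is installed, that `select_config` lives in a database reachable over the MySQL protocol, and that StarRocks listens on its default query port 9030. `lookup_sql`, `run_named_query`, and the `SR_HOST`, `SR_PORT`, `SR_USER`, and `SR_CONFIG_DB` variables are hypothetical names, and query parameters (`queryMap`) are left out.

```shell
#!/bin/sh
# Build the statement that fetches the stored SQL for a query name.
# Only letters, digits and underscores are accepted, so the name can be
# placed inside the string literal without any escaping.
lookup_sql() {
  case "$1" in
    ''|*[!A-Za-z0-9_]*) echo "invalid query name: $1" >&2; return 1 ;;
  esac
  printf "SELECT select_sql FROM select_config WHERE select_name = '%s';\n" "$1"
}

# Look up the stored SQL by name, then execute it against StarRocks.
# The password is expected to come from the client's own configuration
# (for example an option file), not from this script.
run_named_query() {
  rq_lookup=$(lookup_sql "$1") || return 1
  rq_sql=$(mysql -N -B -h "$SR_HOST" -P "${SR_PORT:-9030}" -u "$SR_USER" \
             -e "$rq_lookup" "$SR_CONFIG_DB") || return 1
  [ -n "$rq_sql" ] || { echo "no query named $1" >&2; return 1; }
  mysql -h "$SR_HOST" -P "${SR_PORT:-9030}" -u "$SR_USER" -e "$rq_sql"
}
```

With the variables exported, `run_named_query assets_zy` would run the SQL stored under that name. Restricting names to a fixed character set is a deliberate simplification; a real service would use parameterized queries instead.
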
# References
flink-cdc



