当前位置:首页 > 资讯 > 正文

离线数据同步变迁

离线数据同步变迁

随着业务的发展,系统进行了微服务的差分,导致数据越来越分散,很难进行一个完整的生命周期的数据查询,对于某些业务的需求支持变得越来越难,越来越复杂,也越来越难以进行职责划分。对着业务的发展,数据量越来越大之后,为了良好的业务支持,进行了分库分表,分库分表规则五花八门,一旦脱离了业务逻辑,很难确定某一条数据在哪个库哪个表。

基于这样的问题和情况,为了满足业务需求,很自然的就想到了使用大数据服务,将业务数据归集到一起,建立完整的数据仓库,便于数据的查询。

通过Azkaban调用Spark应用,将数据从RDS同步到Hive,运营平台和报表系统采用Presto加速访问Hive的数据。

数据同步采用Spark任务来进行,将任务打包之后,上传到Azkaban调度平台,使用Azkaban进行定时调度,完成T+1级别的数据同步工作。

数据同步代码示例:


删除Partition的代码示例:


从RDS同步数据到HIVE的代码示例:


Spark Application代码示例



  1. 自定义UDF函数

在使用的过程中,需要将表中的IP地址,解析为所在地的名称,这需要调用第三方的一个服务接口来完成,为了完成这个任务,定义了一个自定义UDF函数,进行解析。

a. 自定义UDF函数


b. 使用自定义UDF函数


  1. 数据库的配置安全性问题

刚开始数据库配置同步配置文件直接写死,但是后续发现这样存在一些安全性的问题,后来采用将数据库相关的配置组合为一个JSON字符串,将其加密之后保存到MongoDB中,在使用时进行查询解密。


  1. Spark任务脚本示例

  1. Job任务脚本实例


  1. Davinci报表 一个开源的报表平台

自从上次开始使用基于Hadoop的大数据体现方案之后,业务平稳发展,但是随着时间的推移,新的问题开始出现,主要出现的问题为两个:

  1. 数据的变更越来越频繁,基于之前SparkSQL任务的方式,只要需要对表结构进行变更,就需要重新修改Scala代码,然后重新进行任务的打包,这对于一些不熟悉代码的人来说,不太友好,而且成本也很高。
  2. 虽然使用了Presto对HIVE的数据查询进行了加速,但是所在数据量越来越大,分析要求越来越复杂,即席查询越来越多,由于集群本身资源有限,查询能力出现了显著瓶颈。

数据同步过程中,遇到了另外一个问题,即业务存在大量的分库分表的,这些分库分表的逻辑五花八门,60张左右的逻辑板,经过分库分表之后达到了惊人的5000多张,为每张表配置任务很显然不太正常,这就需要能够在进行数据同步的时候动态生成需要的表列表,把表列表配置到DataX的配置文件中去。

经过技术的调用,Apache DolphinScheduler的Python任务类型很适合做这个事情,由于公司本身使用了Apache DolphinScheduler3.0的版本,其Python任务还不支持返回数据到下游节点,但是社区最新版本已经支持该能力,因为按照已实现版本对其进行改造。

改造之后,Python节点能够将数据传递给他的下游节点,因此使用Python脚本查询获取需要进行同步的表列表,将其传递给DataX节点,完成动态表的数据同步




  1. DATAX只支持python2.x

下载支持python3.x的相关文件,替换DataX中的相同文件,即可支持python3.x使用

  1. StarRocks 高性能的MPP数据库
  2. DataX 离线数据同步
  3. Apache DolphinScheduler 任务调度工具

自从采用Apache DolphinScheduler + StarRocks数据方案以来,一切都很平稳发展;但是随着时间的推移,总会出现新的问题。

随着数据量的增多,使用方需求的增长,已经一些其他因素的影响,对目前的数据同步架构带来了一些不小的挑战,这些问题导致任务的维护和更新越来越麻烦,需要耗费大量的时间来进行,急需一种新的方式来处理。

  1. 由于等保的要求,线上RDS数据库不再支持通过公网访问,又因为StarRocks也在内网,这就导致了之前的数据同步链路彻底断裂,需要新的方案。
  2. 由于数据结构的频繁变更、服务器资源导致的任务调度异常等等原因,需要重跑数据的需求越来越多,这就导致需要不断的修改任务的调度参数(如日期),目前已经上线了10个业务的调度任务,也就是重新同步一次,就需要依次修改调度这10个任务,这期间还需要专人进行状态的跟踪,即使修改调度,压力很大。

鉴于数据链路变更,导致原本数据链路断裂的问题,通过调研之后,决定采用KAFKA进行数据的中转,在内网部署KAFKA集群,同时该集群提供公网访问地址;在RDS所在的内网机器上使用DataX将RDS数据通过公网地址写入KAFKA,在内网中通过KafkaConnector消费数据写入StarRocks。

  1. DataX写KAFKA

DataX本身并没有kafkawriter实现,这就需要我们自己实现一个KafkaWriter来支持我们的需求,同时为了数据安全,希望能够对数据进行加密。

DataX的KafkaWriter实现


进行数据加密的实现:


Kafka的公网配置

Kafka的内外网配置,只需要修改kafka/config下面的server.properties文件中的如下配置即可。


  1. 自定义的配置文件

Python脚本需要能够自动生成对应的DataX调度的配置文件和shell脚本,自动调度DataX进行任务的执行。因此经过调研,采用自定义配置文件,通过读取配置文件,动态生成对应的DataX任务脚本和调度脚本,调度任务执行。

自定义的配置文件示例1:


支持分库分表的配置文件示例2


如上的配置文件,解释如下:

KEY 说明 datasource RDS数据源 datasource.host RDS数据库的host datasource.port> RDS数据库的端口 datasource.username RDS数据库的用户名 datasource.password RDS数据库的密码 datasource.properties jdbc连接的参数,连接时拼接为?key=value&key=value table 要同步的表信息 table.database RDS数据库名称 table.table RDS中表的名称,分库分表的可以为空 table.column RDS表中要同步的字段列表,支持取别名和使用函数 table.where 同步数据的过滤条件 table.searchTableSql 查询表名称的SQL语句,用于动态分库分表 kafka kafka相关的配置 kafka.topic 数据要写入的kafka topic的名称
  1. Python调度脚本

  1. 同步日期的控制

我们在之前的任务同步中,遇到的问题便是日期的修改很麻烦,因此我们需要一个更加简单的方式来进行日期的批量更新。在我们上面的调度脚本中,包含了对日期表达式的解析,我们自定义了一种时间的表达式$[yyyyMMddHHmmss+/-N_Y] 通过解析该表达式,我们可以生成需要的任意时间,该时间表达式的含义为:

  • yyyy 表示年份
  • MM 表示月份
  • dd 表示日期
  • HH 表示24进制小时
  • mm 表示分钟
  • ss 表示秒
    • 表示当前时间加上N
    • 表示当前时间减去N
  • _Y 表示加减的单位,可以是YMdHms(年、月、日、时、分、秒)

通过对该表达式的解析,我们可以生成相对于当前之前或之后的任何格式的时间字符串,将其用于同步的where条件中,既可以完成针对时间的解析。

  1. 如何更新日期

日期目前可以计算,但是我们需要能够批量修改配置文件中的WHERE条件中的时间表达式,如我们想同步8天前的数据,我们就需要将脚本中的表达式修改为$[yyyyMMdd-8_d] ,即代表当前时间减去8天,这样我们就可以同步八天前那一天的数据,但是我们可能想同步从8天气到现在的所有数据,那么我们希望我们也能批量修改where表达式中的条件,如将=改为>=。

鉴于以上的需求,我们开发了一个新的Python脚本,通过简单的配置,即可一次修改所有脚本中的where条件中的表达式,这样,我们只需要执行两个脚本,就完成了一切,再也不需要依次修改执行10个工作流了。


  1. 通过KafkaConnector同步数据到StarRocks
    1. starrocks-connector-for-kafka的实现

StarRocks官方提供了starrocks-connector-for-kafka的实现,我们只需要在其中加入我们的数据解密逻辑即可直接使用。


解密的逻辑


b. 配置KafkaConnector任务

最新文章