随着业务的发展,系统进行了微服务的差分,导致数据越来越分散,很难进行一个完整的生命周期的数据查询,对于某些业务的需求支持变得越来越难,越来越复杂,也越来越难以进行职责划分。对着业务的发展,数据量越来越大之后,为了良好的业务支持,进行了分库分表,分库分表规则五花八门,一旦脱离了业务逻辑,很难确定某一条数据在哪个库哪个表。
基于这样的问题和情况,为了满足业务需求,很自然的就想到了使用大数据服务,将业务数据归集到一起,建立完整的数据仓库,便于数据的查询。
通过Azkaban调用Spark应用,将数据从RDS同步到Hive,运营平台和报表系统采用Presto加速访问Hive的数据。
数据同步采用Spark任务来进行,将任务打包之后,上传到Azkaban调度平台,使用Azkaban进行定时调度,完成T+1级别的数据同步工作。
数据同步代码示例:
删除Partition的代码示例:
从RDS同步数据到HIVE的代码示例:
Spark Application代码示例
在使用的过程中,需要将表中的IP地址,解析为所在地的名称,这需要调用第三方的一个服务接口来完成,为了完成这个任务,定义了一个自定义UDF函数,进行解析。
a. 自定义UDF函数
b. 使用自定义UDF函数
刚开始数据库配置同步配置文件直接写死,但是后续发现这样存在一些安全性的问题,后来采用将数据库相关的配置组合为一个JSON字符串,将其加密之后保存到MongoDB中,在使用时进行查询解密。
自从上次开始使用基于Hadoop的大数据体现方案之后,业务平稳发展,但是随着时间的推移,新的问题开始出现,主要出现的问题为两个:
数据同步过程中,遇到了另外一个问题,即业务存在大量的分库分表的,这些分库分表的逻辑五花八门,60张左右的逻辑板,经过分库分表之后达到了惊人的5000多张,为每张表配置任务很显然不太正常,这就需要能够在进行数据同步的时候动态生成需要的表列表,把表列表配置到DataX的配置文件中去。
经过技术的调用,Apache DolphinScheduler的Python任务类型很适合做这个事情,由于公司本身使用了Apache DolphinScheduler3.0的版本,其Python任务还不支持返回数据到下游节点,但是社区最新版本已经支持该能力,因为按照已实现版本对其进行改造。
改造之后,Python节点能够将数据传递给他的下游节点,因此使用Python脚本查询获取需要进行同步的表列表,将其传递给DataX节点,完成动态表的数据同步
下载支持python3.x的相关文件,替换DataX中的相同文件,即可支持python3.x使用
自从采用Apache DolphinScheduler + StarRocks数据方案以来,一切都很平稳发展;但是随着时间的推移,总会出现新的问题。
随着数据量的增多,使用方需求的增长,已经一些其他因素的影响,对目前的数据同步架构带来了一些不小的挑战,这些问题导致任务的维护和更新越来越麻烦,需要耗费大量的时间来进行,急需一种新的方式来处理。
鉴于数据链路变更,导致原本数据链路断裂的问题,通过调研之后,决定采用KAFKA进行数据的中转,在内网部署KAFKA集群,同时该集群提供公网访问地址;在RDS所在的内网机器上使用DataX将RDS数据通过公网地址写入KAFKA,在内网中通过KafkaConnector消费数据写入StarRocks。
DataX本身并没有kafkawriter实现,这就需要我们自己实现一个KafkaWriter来支持我们的需求,同时为了数据安全,希望能够对数据进行加密。
DataX的KafkaWriter实现
进行数据加密的实现:
Kafka的公网配置
Kafka的内外网配置,只需要修改kafka/config下面的server.properties文件中的如下配置即可。
Python脚本需要能够自动生成对应的DataX调度的配置文件和shell脚本,自动调度DataX进行任务的执行。因此经过调研,采用自定义配置文件,通过读取配置文件,动态生成对应的DataX任务脚本和调度脚本,调度任务执行。
自定义的配置文件示例1:
支持分库分表的配置文件示例2
如上的配置文件,解释如下:
我们在之前的任务同步中,遇到的问题便是日期的修改很麻烦,因此我们需要一个更加简单的方式来进行日期的批量更新。在我们上面的调度脚本中,包含了对日期表达式的解析,我们自定义了一种时间的表达式$[yyyyMMddHHmmss+/-N_Y] 通过解析该表达式,我们可以生成需要的任意时间,该时间表达式的含义为:
通过对该表达式的解析,我们可以生成相对于当前之前或之后的任何格式的时间字符串,将其用于同步的where条件中,既可以完成针对时间的解析。
日期目前可以计算,但是我们需要能够批量修改配置文件中的WHERE条件中的时间表达式,如我们想同步8天前的数据,我们就需要将脚本中的表达式修改为$[yyyyMMdd-8_d] ,即代表当前时间减去8天,这样我们就可以同步八天前那一天的数据,但是我们可能想同步从8天气到现在的所有数据,那么我们希望我们也能批量修改where表达式中的条件,如将=改为>=。
鉴于以上的需求,我们开发了一个新的Python脚本,通过简单的配置,即可一次修改所有脚本中的where条件中的表达式,这样,我们只需要执行两个脚本,就完成了一切,再也不需要依次修改执行10个工作流了。
StarRocks官方提供了starrocks-connector-for-kafka的实现,我们只需要在其中加入我们的数据解密逻辑即可直接使用。
解密的逻辑
b. 配置KafkaConnector任务
本网信息来自于互联网,目的在于传递更多信息,并不代表本网赞同其观点。其原创性以及文中陈述文字和内容未经本站证实,对本文以及其中全部或者部分内容、文字的真实性、完整性、及时性本站不作任何保证或承诺,并请自行核实相关内容。本站不承担此类作品侵权行为的直接责任及连带责任。如若本网有任何内容侵犯您的权益,请及时联系我们,本站将会在24小时内处理完毕,E-mail:xinmeigg88@163.com
本文链接:http://www.bhha.com.cn/news/3634.html