AlgoX开发指南

1 AlgoX介绍
AlgoX是金蝶云.苍穹的分布式计算框架,用于解决海量数据计算场景。基于MapReduce原理,分布式部署,通过多台计算节点并行计算,实现弹性计算能力,使得性能可以伸缩扩展。
互联网行业流行的分布式计算框架有4种:
1) Apache Hadoop MapReduce框架,这是Hadoop生态原生的MapReduce框架,在Hadoop技术刚刚开始流行时,曾经被大量使用,由于编写困难和性能不佳,逐渐产生了更优秀的框架, 网站:https://hadoop.apache.org
2) Apache Storm, Apache Storm是一个分布式实时大数据处理系统。简化了流数据的可靠处理,像 Hadoop 一样实现实时批处理。Storm 很简单,可用于任意编程语言。Apache Storm 采用 Clojure 开发。网站:https://storm.apache.org
3) Apache Spark ,是专为大规模数据处理而设计的快速通用的计算引擎。Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架,Spark,拥有Hadoop MapReduce所具有的优点。网站:https://spark.apache.org。
4) Apache Flink,是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。Flink以数据并行和流水线方式执行任意流数据程序,Flink的流水线运行时系统可以执行批处理和流处理程序。此外,Flink的运行时本身也支持迭代算法的执行。网站:https://flink.apache.org。
以上描述来自网上或者百度百科,从应用上说,都可以提供MaReduce分布式计算能力,它们之间的区别本文不做详述,有兴趣的读者可以自行学习。
金蝶云苍穹AlgoX研发的目的就是采用MapReduce思想和技术,利用分布式的能力,使得性能可扩展,提升大数据查询计算和分析的能力,用以解决ERP中复杂计算、报表分析的场景。
传统的ERP产品,计算分析能力一般都依赖关系数据库,一方面数据库容易形成瓶颈,性能无法扩展,其次大量使用数据库做计算分析,会影响其他功能的日常操作。
2 AlgoX架构

AlgoX是apache flink的基础上,进行封装和优化,提供了:
1) AlgoX API,开发者基于AlgoX API就能开发Flink程序,无需使用Flink原生的API进行开发,可以说,AlgoX API是Flink在苍穹上的应用API。如提供了DB、ORM、Redis等与苍穹交互的API。
2) 云原生容器部署能力,与苍穹微服务集群部署进行融合,Flink Master(JobManager)和Worker(TaskManager)作为苍穹微服务的应用,方便AlgoX集群的部署,而不需要学习Flink原生的部署方法。
3) 监控的集成,Flink的web monitor集成到苍穹monitor和产品界面中。
3 AlgoX集群部署
AlgoX计算集群,有两个角色,Master和Worker,客户端程序通过AlgoX JobSession将任务提交给AlgoX Master,AlgoX Master将任务进行分解并分发到Worker节点中去运行。所以:
1) Job Client(JobSession),代码和运行是在调用方
2) Master,集群协调者,负责资源管理、任务分发管理和监控
3) Worker,集群真正的任务运行者,MapReduce算子都是在worker节点中跑的。

AlgoX支持两种集群部署模式,本地(伪集群)模式和集群模式。

本地部署模式,即伪集群,JobClient、Master、Worker三种角色都在同一个应用节点容器中运行,即在调用方应用节点运行,这种模式下,AlgoX集群对开发者或用户是透明隐藏的。如财务总账节点如果运行algox任务,那么全部代码都运行在总账节点下。
苍穹默认情况下,AlgoX就是本地部署模式运行的。

集群部署模式,即真集群,JobClient、Master、Worker三种角色运行在不同的容器应用,JobClient运行在调用方应用节点上,这个无需部署。Master和Worker分别独立部署,构成了一个计算集群,如AlgoX Master建议的appName为algox-master,AlgoX Worker建议的appName为algox-worker。master节点启用1个或2个容器即可,worker节点根据情况部署。
3.1 线程槽与并行度
一个AlgoX集群能运行多少个AlgoX任务,是无限的还是受限的,这要从Flink的资源分配机制说起。AlgoX(Flink)把一个Worker节点定义成一个资源池,称为槽池(SlotPool),其中一个Slot就是一个可运行的线程,对应Java中的线程。每个Worker节点有多少个槽是固定的,通过配置,启动时就确定了,运行期不可调整。
假设一个Worker节点有N个线程槽,集群总共有M个Worker节点,那么整个集群就有M*N个线程槽。
每个Job任务运行时如何分配资源的,需要多少个槽?这个可以简单地看成任务并行度(Parallelism),一个任务的并行度是全局设置的。
假设系统设置任务并行度为4,那么可以认为整个集群同时能够跑M*N/4个任务。当然,这并不是精确的,内部更复杂,有一些例外情况,但在资源评估是可以简单这样计算。
3.2 集群配置参数
客户端配置(本地和集群模式通用) | ||||||||||||
配置项 | 作用 | 本地模式 | 集群模式 | |||||||||
algox.client.invokemode | 客户端调用模式 配置在应用端,支持不同应用可以用不同模式,如fi用本地模式,cal用集群模式。一般设为全局开关。 未来algox将会支持多集群部署,不同场景配置到不同algox集群 | local (默认) | remote | |||||||||
algox.client.job.parallelism | 任务并行度 | 默认值4 有些版本是8 | 默认值4 有些版本是8 | |||||||||
本地模式配置 应用端配置: algox.client.invokemode=local 本地模式可以不用配,默认就是 以下配置项也是在应用端配置: | ||||||||||||
配置项 | 作用 | 单位 | 默认值 | |||||||||
algox.master.memory | 分配个AlgoX Master使用的最大堆内存 一般不需要修改 | M | 100 | |||||||||
algox.worker.memory | 分配个AlgoX Worker使用的最大堆内存 job任务真正运行在worker中,这个内存越大,发生磁盘IO的机率就越小,提升性能 | M | 2048 | |||||||||
algox.worker.threads | worker的线程槽数量 | 64 | ||||||||||
algox.io.tmp.dirs | 临时文件目录,worker计算时用该临时文件做交换或者外排序 可以设置多个:/a,/b, 设置多个可以提高IO并行能力,提升性能 | /tmp | ||||||||||
algox.master.jobstore.cacheCount | master缓存的job最大个数,超过就删除旧文件,监控使用 | 2000 | ||||||||||
algox.master.jobstore.cacheSize | master缓存的job最大磁盘大小,超过就删除旧文件,监控使用 | M | 200 | |||||||||
algox.master.jobstore.expireTime | master缓存的job文件过期时间,超过就删除,监控使用 | 小时 | 24 | |||||||||
algox.web.refresh.interval | web监控自动刷新频率 | 毫秒 | 10000 | |||||||||
集群模式配置 应用端配置: algox.client.invokemode=remote 公共是指Master和Worker节点都要配置 | ||||||||||||
配置项 | 作用 | 单位 | 默认值 | |||||||||
公共 | algox.cluster.name | algox集群名,可选,默认为苍穹集群名 | ||||||||||
algox.cluster.zookeeper | 协调AlgoX集群,可选,默认为configUrl | |||||||||||
algox.cluster.zkRootPath | zookeeper根路径,可选 | / | ||||||||||
algox.network.heartbeat.interval | master和worker之间心跳间隔 | 毫秒 | 10000 | |||||||||
algox.network.heartbeat.timeout | master和worker之间心跳超时,超过超时时间后,master认为worker已死,可能导致任务失败 | 毫秒 | 300000 | |||||||||
algox.network.rpc.connection.timeout | worker节点之间rpc通讯socket连接超时 | 秒 | 60 | |||||||||
algox.network.rpc.read.timeout | worker节点之间rpc通讯socket读取超时 | 秒 | 180 | |||||||||
algox.network.request.timeout | worer下游节点等待上游节点传输数据超时 | 毫秒 | 180000 | |||||||||
Master | algox.master.enable | 必须,值为true | ||||||||||
algox.master.memory | 分配个AlgoX Master使用的最大堆内存 一般不需要修改 | M | 100 | |||||||||
algox.master.jobstore.cacheCount | master缓存的job最大个数,超过就删除旧文件,监控使用 | 2000 | ||||||||||
algox.master.jobstore.cacheSize | master缓存的job最大磁盘大小,超过就删除旧文件,监控使用 | M | 200 | |||||||||
algox.master.jobstore.expireTime | master缓存的job文件过期时间,超过就删除,监控使用 | 小时 | 24 | |||||||||
algox.web.refresh.interval | web监控自动刷新频率 | 毫秒 | 10000 | |||||||||
Worker | algox.worker.enable | 必须,值为true | ||||||||||
algox.worker.memory | 分配个AlgoX Worker使用的最大堆内存 job任务真正运行在worker中,这个内存越大,发生磁盘IO的机率就越小,提升性能 | M | 4096 | |||||||||
algox.worker.threads | worker的线程槽数量 | 32 | ||||||||||
algox.io.tmp.dirs | 临时文件目录,worker计算时用该临时文件做交换或者外排序 可以设置多个:/a,/b, 设置多个可以提高IO并行能力,提升性能 | /tmp | ||||||||||
3.3 内存和CPU资源
无论是用本地模式还是集群模式,都是需要消耗内存和CPU的。如果是本地模式,跟业务应用部署在一起,内存和CPU是和应用代码共用的,需要考虑互相影响。
本地模式,默认AlgoX内存最大使用algox.worker.memory(默认2G),应用节点必须预留出2G堆内存给AlgoX。
集群部署,Master节点内存的使用不多,依赖于Job对象的大小,以及并发的个数,一般Master节点设为6G堆内存即可。Worker节点是真正运行Job的节点,Java堆内存建议10G及以上,AlgoX内存(algox.worker.memory)配为6G。
AlgoX内存(algox.worker.memory)只是给AlgoX内部使用,包括内部排序,Map,Join,Shuffle等算法。但是开发人员自己写的Function(如ReduceFuction,MapFunction等),内部创建的java对象,并不受algox管控,由jvm分配。从这个角度看,开发人员要保证自己的Function不要OOM,AlgoX框架不能保证业务代码本身的可靠性,AlgoX框架内存管理机制可以保证algox本身的算法不会OOM。所以业务代码尽量使用algox原生算法。
AlgoX内部算法对CPU要求较高,因此CPU建议2个以上,最好4个。
本地模式适合于小数据量低并发的客户,但数据规模和并发变大后,本地模式可能撑不住,导致性能变慢,槽资源不足报错,或者干脆cpu100%卡死。本地模式下,可以通过增加相应的应用节点,来应对性能压力。
但应用规模、数据规模和并发变大后,最好是部署单独的AlgoX集群,达到资源专用,并通过扩容Worker节点应对性能压力。
3.4 IO存储资源
对于大数据量计算来说,再大的内存都可能不够用,这时候就需要采用磁盘作为交换,如操作系统的swap。AlgoX也是如此。
AlgoX对磁盘的使用主要是Worker节点,默认是/tmp目录,可以通过algox.io.tmp.dirs进行配置,这个配置项可以配多个目录。
由于磁盘IO是必不可少的,从性能考虑,IO性能越快越好,建议是SSD,algox.io.tmp.dirs配多个目录有利于并行读写。 从可靠性考虑,需要保证磁盘目录足够大,否则可能会报错。
在容器里部署,就要考虑容器的存储空间是否足够,建议在50G以上,如果不够,需要增加容器存储空间或者挂载磁盘。
挂载磁盘有两种方式,一是直接挂载为/tmp, 第二种是挂载成单独的目录,并且通过algox.io.tmp.dirs进行配置。
IO存储资源部分,对本地部署和集群部署模式是一样的。
注意:如果是挂载网盘,容器意外死亡时,可能会产生垃圾文件,随着时间推移,垃圾越来越多,需要自己写运维脚本进行清理。
3.5 FAQ
问: 集群模式下,Master节点需要部署几个节点?
答: Master建议1-2个即可,Master多个节点只是作为互备,同时只会有一个真正作为集群协调者。由于Master本身不干活,比较稳定。
问: 集群模式下Zookeeper是否要独立部署?
答:默认不需要,当并发规模特别大的时候建议独立部署,如同时有几十上百个任务在运行。
问:如何判断集群部署是否成功?
答:部署完,进入monitor,在注册中心找到master节点,点“algox”按钮,如果正常显示apache flink监控界面,worker节点个数大于0,并有成功运行的Remote Demo 任务。

4 AlgoX开发指南
4.1 原理介绍
AlgoX是一种分布式并行数据迭代计算框架,一种MapReduce框架,遵循Input -> Transform -> Output的编程和运行模式,由Input,Transform,Output三类节点构成了一棵有向图。
编程期生成的Job,是一棵逻辑有向图,运行期每一个节点都可能是并行执行的,在多个Worker线程槽中运行的。

4.2 AlgoX VS Algo
AlgoX和Algo是什么关系,有了Algo为啥还要AlgoX?
两者既有共性也有差异,解决的问题场景不同。
总的来说,Algo是一个内存数据库计算引擎,AlgoX是一个分布式数据库计算引擎,Algo的底层原理是关系数据库算法(SQL引擎),AlgoX的底层原理是MapReduce。虽然,Algo和AlgoX表现出来的能力有些类似,都能做SQL计算,常见的SQL算子如过滤,Join,GroupBySum,排序等,双方都支持,甚至MapReduce的接口两者也都有,但AlgoX是用底层MapReduce能力来支撑SQL能力,重点是分布式算法,而Algo是单节点内存算法。
Algo是嵌入式单节点,主要是单线程运行的。AlgoX是可部署的多节点,多线程运行的,如果是本地伪集群部署,则退化成单节点多线程。
Algo | AlgoX | |
部署和运行模式 | 不可独立部署,单节点 | 多节点,可独立部署 |
编程相似性 | 1. 都具备SQL92所有算法 2. 都有Map和Reduce能力 3. 都支持Input,Input编程接口一样 4. RowMeta,Field,DataType是同一套 | |
编程差异 | 1. 中间结果集DataSet可以遍历 2. 没有Output | 1. 中间结果集DataSetX不可遍历,只能输出到下个DataSetX,最后到Output 2. 必须有Output |
性能 | 大数据量下,AlgoX优于Algo,小数据量下,不一定 | |
在AlgoX任务中可以使用Algo的能力,比如:
1) 把Algo的DataSet作为AlgoX的Input使用
2) 在AlgoX的function中,如reduce中,如果需要处理大量数据,可以借助Algo将Iterator<RowX>转化为algo DataSet,进一步处理,提高性能和可靠性,因为Algo算法性能更佳,并能够控制内存,防止OOM。
4.3 快速入门(示例代码)
创建并提交一个AlgoX任务包括以下步骤:
1) 创建JobSession
2) createInput,jobSession.fromInput生成DataSetX
3) 对DataSetX进行系列转换,调用DataSetX接口,实现业务逻辑
4) 创建Output
5) commit任务,等待返回
// 1. 创建JobSession
JobSession session = AlgoX.createSession(“jobName”, “jobTitle”);
//2. 创建Input
Field fieldId = new Field("id", DataType.IntegerType);
Field fieldKey = new Field("key", DataType.StringType);
Field fieldName = new Field("name", DataType.StringType);
Field fieldType = new Field("type", DataType.IntegerType);
RowMeta rowMeta = new RowMeta(fieldId,
AlgoX开发指南
声明:除非特别标注,否则均为本站原创文章,转载时请以链接形式注明文章出处。如若本站内容侵犯了原著者的合法权益,可联系本站删除。



