AlgoX开发指南
1 AlgoX介绍
AlgoX是金蝶云.苍穹的分布式计算框架,用于解决海量数据计算场景。基于MapReduce原理,分布式部署,通过多台计算节点并行计算,实现弹性计算能力,使得性能可以伸缩扩展。
互联网行业流行的分布式计算框架有4种:
1) Apache Hadoop MapReduce框架,这是Hadoop生态原生的MapReduce框架,在Hadoop技术刚刚开始流行时,曾经被大量使用,由于编写困难和性能不佳,逐渐产生了更优秀的框架, 网站:https://hadoop.apache.org
2) Apache Storm, Apache Storm是一个分布式实时大数据处理系统。简化了流数据的可靠处理,像 Hadoop 一样实现实时批处理。Storm 很简单,可用于任意编程语言。Apache Storm 采用 Clojure 开发。网站:https://storm.apache.org
3) Apache Spark ,是专为大规模数据处理而设计的快速通用的计算引擎。Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架,Spark,拥有Hadoop MapReduce所具有的优点。网站:https://spark.apache.org。
4) Apache Flink,是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。Flink以数据并行和流水线方式执行任意流数据程序,Flink的流水线运行时系统可以执行批处理和流处理程序。此外,Flink的运行时本身也支持迭代算法的执行。网站:https://flink.apache.org。
以上描述来自网上或者百度百科,从应用上说,都可以提供MaReduce分布式计算能力,它们之间的区别本文不做详述,有兴趣的读者可以自行学习。
金蝶云苍穹AlgoX研发的目的就是采用MapReduce思想和技术,利用分布式的能力,使得性能可扩展,提升大数据查询计算和分析的能力,用以解决ERP中复杂计算、报表分析的场景。
传统的ERP产品,计算分析能力一般都依赖关系数据库,一方面数据库容易形成瓶颈,性能无法扩展,其次大量使用数据库做计算分析,会影响其他功能的日常操作。
2 AlgoX架构
AlgoX是apache flink的基础上,进行封装和优化,提供了:
1) AlgoX API,开发者基于AlgoX API就能开发Flink程序,无需使用Flink原生的API进行开发,可以说,AlgoX API是Flink在苍穹上的应用API。如提供了DB、ORM、Redis等与苍穹交互的API。
2) 云原生容器部署能力,与苍穹微服务集群部署进行融合,Flink Master(JobManager)和Worker(TaskManager)作为苍穹微服务的应用,方便AlgoX集群的部署,而不需要学习Flink原生的部署方法。
3) 监控的集成,Flink的web monitor集成到苍穹monitor和产品界面中。
3 AlgoX集群部署
AlgoX计算集群,有两个角色,Master和Worker,客户端程序通过AlgoX JobSession将任务提交给AlgoX Master,AlgoX Master将任务进行分解并分发到Worker节点中去运行。所以:
1) Job Client(JobSession),代码和运行是在调用方
2) Master,集群协调者,负责资源管理、任务分发管理和监控
3) Worker,集群真正的任务运行者,MapReduce算子都是在worker节点中跑的。
AlgoX支持两种集群部署模式,本地(伪集群)模式和集群模式。
本地部署模式,即伪集群,JobClient、Master、Worker三种角色都在同一个应用节点容器中运行,即在调用方应用节点运行,这种模式下,AlgoX集群对开发者或用户是透明隐藏的。如财务总账节点如果运行algox任务,那么全部代码都运行在总账节点下。
苍穹默认情况下,AlgoX就是本地部署模式运行的。
集群部署模式,即真集群,JobClient、Master、Worker三种角色运行在不同的容器应用,JobClient运行在调用方应用节点上,这个无需部署。Master和Worker分别独立部署,构成了一个计算集群,如AlgoX Master建议的appName为algox-master,AlgoX Worker建议的appName为algox-worker。master节点启用1个或2个容器即可,worker节点根据情况部署。
3.1 线程槽与并行度
一个AlgoX集群能运行多少个AlgoX任务,是无限的还是受限的,这要从Flink的资源分配机制说起。AlgoX(Flink)把一个Worker节点定义成一个资源池,称为槽池(SlotPool),其中一个Slot就是一个可运行的线程,对应Java中的线程。每个Worker节点有多少个槽是固定的,通过配置,启动时就确定了,运行期不可调整。
假设一个Worker节点有N个线程槽,集群总共有M个Worker节点,那么整个集群就有M*N个线程槽。
每个Job任务运行时如何分配资源的,需要多少个槽?这个可以简单地看成任务并行度(Parallelism),一个任务的并行度是全局设置的。
假设系统设置任务并行度为4,那么可以认为整个集群同时能够跑M*N/4个任务。当然,这并不是精确的,内部更复杂,有一些例外情况,但在资源评估是可以简单这样计算。
3.2 集群配置参数
客户端配置(本地和集群模式通用) | ||||||||||||
配置项 | 作用 | 本地模式 | 集群模式 | |||||||||
algox.client.invokemode | 客户端调用模式 配置在应用端,支持不同应用可以用不同模式,如fi用本地模式,cal用集群模式。一般设为全局开关。 未来algox将会支持多集群部署,不同场景配置到不同algox集群 | local (默认) | remote | |||||||||
algox.client.job.parallelism | 任务并行度 | 默认值4 有些版本是8 | 默认值4 有些版本是8 | |||||||||
本地模式配置 应用端配置: algox.client.invokemode=local 本地模式可以不用配,默认就是 以下配置项也是在应用端配置: | ||||||||||||
配置项 | 作用 | 单位 | 默认值 | |||||||||
algox.master.memory | 分配个AlgoX Master使用的最大堆内存 一般不需要修改 | M | 100 | |||||||||
algox.worker.memory | 分配个AlgoX Worker使用的最大堆内存 job任务真正运行在worker中,这个内存越大,发生磁盘IO的机率就越小,提升性能 | M | 2048 | |||||||||
algox.worker.threads | worker的线程槽数量 | 64 | ||||||||||
algox.io.tmp.dirs | 临时文件目录,worker计算时用该临时文件做交换或者外排序 可以设置多个:/a,/b, 设置多个可以提高IO并行能力,提升性能 | /tmp | ||||||||||
algox.master.jobstore.cacheCount | master缓存的job最大个数,超过就删除旧文件,监控使用 | 2000 | ||||||||||
algox.master.jobstore.cacheSize | master缓存的job最大磁盘大小,超过就删除旧文件,监控使用 | M | 200 | |||||||||
algox.master.jobstore.expireTime | master缓存的job文件过期时间,超过就删除,监控使用 | 小时 | 24 | |||||||||
algox.web.refresh.interval | web监控自动刷新频率 | 毫秒 | 10000 | |||||||||
集群模式配置 应用端配置: algox.client.invokemode=remote 公共是指Master和Worker节点都要配置 | ||||||||||||
配置项 | 作用 | 单位 | 默认值 | |||||||||
公共 | algox.cluster.name | algox集群名,可选,默认为苍穹集群名 | ||||||||||
algox.cluster.zookeeper | 协调AlgoX集群,可选,默认为configUrl | |||||||||||
algox.cluster.zkRootPath | zookeeper根路径,可选 | / | ||||||||||
algox.network.heartbeat.interval | master和worker之间心跳间隔 | 毫秒 | 10000 | |||||||||
algox.network.heartbeat.timeout | master和worker之间心跳超时,超过超时时间后,master认为worker已死,可能导致任务失败 | 毫秒 | 300000 | |||||||||
algox.network.rpc.connection.timeout | worker节点之间rpc通讯socket连接超时 | 秒 | 60 | |||||||||
algox.network.rpc.read.timeout | worker节点之间rpc通讯socket读取超时 | 秒 | 180 | |||||||||
algox.network.request.timeout | worer下游节点等待上游节点传输数据超时 | 毫秒 | 180000 | |||||||||
Master | algox.master.enable | 必须,值为true | ||||||||||
algox.master.memory | 分配个AlgoX Master使用的最大堆内存 一般不需要修改 | M | 100 | |||||||||
algox.master.jobstore.cacheCount | master缓存的job最大个数,超过就删除旧文件,监控使用 | 2000 | ||||||||||
algox.master.jobstore.cacheSize | master缓存的job最大磁盘大小,超过就删除旧文件,监控使用 | M | 200 | |||||||||
algox.master.jobstore.expireTime | master缓存的job文件过期时间,超过就删除,监控使用 | 小时 | 24 | |||||||||
algox.web.refresh.interval | web监控自动刷新频率 | 毫秒 | 10000 | |||||||||
Worker | algox.worker.enable | 必须,值为true | ||||||||||
algox.worker.memory | 分配个AlgoX Worker使用的最大堆内存 job任务真正运行在worker中,这个内存越大,发生磁盘IO的机率就越小,提升性能 | M | 4096 | |||||||||
algox.worker.threads | worker的线程槽数量 | 32 | ||||||||||
algox.io.tmp.dirs | 临时文件目录,worker计算时用该临时文件做交换或者外排序 可以设置多个:/a,/b, 设置多个可以提高IO并行能力,提升性能 | /tmp | ||||||||||
3.3 内存和CPU资源
无论是用本地模式还是集群模式,都是需要消耗内存和CPU的。如果是本地模式,跟业务应用部署在一起,内存和CPU是和应用代码共用的,需要考虑互相影响。
本地模式,默认AlgoX内存最大使用algox.worker.memory(默认2G),应用节点必须预留出2G堆内存给AlgoX。
集群部署,Master节点内存的使用不多,依赖于Job对象的大小,以及并发的个数,一般Master节点设为6G堆内存即可。Worker节点是真正运行Job的节点,Java堆内存建议10G及以上,AlgoX内存(algox.worker.memory)配为6G。
AlgoX内存(algox.worker.memory)只是给AlgoX内部使用,包括内部排序,Map,Join,Shuffle等算法。但是开发人员自己写的Function(如ReduceFuction,MapFunction等),内部创建的java对象,并不受algox管控,由jvm分配。从这个角度看,开发人员要保证自己的Function不要OOM,AlgoX框架不能保证业务代码本身的可靠性,AlgoX框架内存管理机制可以保证algox本身的算法不会OOM。所以业务代码尽量使用algox原生算法。
AlgoX内部算法对CPU要求较高,因此CPU建议2个以上,最好4个。
本地模式适合于小数据量低并发的客户,但数据规模和并发变大后,本地模式可能撑不住,导致性能变慢,槽资源不足报错,或者干脆cpu100%卡死。本地模式下,可以通过增加相应的应用节点,来应对性能压力。
但应用规模、数据规模和并发变大后,最好是部署单独的AlgoX集群,达到资源专用,并通过扩容Worker节点应对性能压力。
3.4 IO存储资源
对于大数据量计算来说,再大的内存都可能不够用,这时候就需要采用磁盘作为交换,如操作系统的swap。AlgoX也是如此。
AlgoX对磁盘的使用主要是Worker节点,默认是/tmp目录,可以通过algox.io.tmp.dirs进行配置,这个配置项可以配多个目录。
由于磁盘IO是必不可少的,从性能考虑,IO性能越快越好,建议是SSD,algox.io.tmp.dirs配多个目录有利于并行读写。 从可靠性考虑,需要保证磁盘目录足够大,否则可能会报错。
在容器里部署,就要考虑容器的存储空间是否足够,建议在50G以上,如果不够,需要增加容器存储空间或者挂载磁盘。
挂载磁盘有两种方式,一是直接挂载为/tmp, 第二种是挂载成单独的目录,并且通过algox.io.tmp.dirs进行配置。
IO存储资源部分,对本地部署和集群部署模式是一样的。
注意:如果是挂载网盘,容器意外死亡时,可能会产生垃圾文件,随着时间推移,垃圾越来越多,需要自己写运维脚本进行清理。
3.5 FAQ
问: 集群模式下,Master节点需要部署几个节点?
答: Master建议1-2个即可,Master多个节点只是作为互备,同时只会有一个真正作为集群协调者。由于Master本身不干活,比较稳定。
问: 集群模式下Zookeeper是否要独立部署?
答:默认不需要,当并发规模特别大的时候建议独立部署,如同时有几十上百个任务在运行。
问:如何判断集群部署是否成功?
答:部署完,进入monitor,在注册中心找到master节点,点“algox”按钮,如果正常显示apache flink监控界面,worker节点个数大于0,并有成功运行的Remote Demo 任务。
4 AlgoX开发指南
4.1 原理介绍
AlgoX是一种分布式并行数据迭代计算框架,一种MapReduce框架,遵循Input -> Transform -> Output的编程和运行模式,由Input,Transform,Output三类节点构成了一棵有向图。
编程期生成的Job,是一棵逻辑有向图,运行期每一个节点都可能是并行执行的,在多个Worker线程槽中运行的。
4.2 AlgoX VS Algo
AlgoX和Algo是什么关系,有了Algo为啥还要AlgoX?
两者既有共性也有差异,解决的问题场景不同。
总的来说,Algo是一个内存数据库计算引擎,AlgoX是一个分布式数据库计算引擎,Algo的底层原理是关系数据库算法(SQL引擎),AlgoX的底层原理是MapReduce。虽然,Algo和AlgoX表现出来的能力有些类似,都能做SQL计算,常见的SQL算子如过滤,Join,GroupBySum,排序等,双方都支持,甚至MapReduce的接口两者也都有,但AlgoX是用底层MapReduce能力来支撑SQL能力,重点是分布式算法,而Algo是单节点内存算法。
Algo是嵌入式单节点,主要是单线程运行的。AlgoX是可部署的多节点,多线程运行的,如果是本地伪集群部署,则退化成单节点多线程。
Algo | AlgoX | |
部署和运行模式 | 不可独立部署,单节点 | 多节点,可独立部署 |
编程相似性 | 1. 都具备SQL92所有算法 2. 都有Map和Reduce能力 3. 都支持Input,Input编程接口一样 4. RowMeta,Field,DataType是同一套 | |
编程差异 | 1. 中间结果集DataSet可以遍历 2. 没有Output | 1. 中间结果集DataSetX不可遍历,只能输出到下个DataSetX,最后到Output 2. 必须有Output |
性能 | 大数据量下,AlgoX优于Algo,小数据量下,不一定 |
在AlgoX任务中可以使用Algo的能力,比如:
1) 把Algo的DataSet作为AlgoX的Input使用
2) 在AlgoX的function中,如reduce中,如果需要处理大量数据,可以借助Algo将Iterator<RowX>转化为algo DataSet,进一步处理,提高性能和可靠性,因为Algo算法性能更佳,并能够控制内存,防止OOM。
4.3 快速入门(示例代码)
创建并提交一个AlgoX任务包括以下步骤:
1) 创建JobSession
2) createInput,jobSession.fromInput生成DataSetX
3) 对DataSetX进行系列转换,调用DataSetX接口,实现业务逻辑
4) 创建Output
5) commit任务,等待返回
// 1. 创建JobSession
JobSession session = AlgoX.createSession(“jobName”, “jobTitle”);
//2. 创建Input
Field fieldId = new Field("id", DataType.IntegerType);
Field fieldKey = new Field("key", DataType.StringType);
Field fieldName = new Field("name", DataType.StringType);
Field fieldType = new Field("type", DataType.IntegerType);
RowMeta rowMeta = new RowMeta(fieldId, fieldKey, fieldName, fieldType);
String dbKey = "basedata";
String sql = "select fid,fkey,fname,ftype from t_bas_operation";
DbInput input = new DbInput("algox.test", dbKey, sql, null, rowMeta);
//数据转换
DataSetX dataset = session.fromInput(input);
//3. 调用filter
dataset = dataset.filter("type=1");
//4. 创建Output
sql = "insert into t_test11 values(?,?,?,?)";
DbOutput output = new DbOutput(dbKey, sql, rowMeta);
dataset.output(output);
//6. 提交Job
try{
session.commit(60, TimeUnit.SECONDS);
}catch(Exception e){
throw e;
}
4.4 API说明
4.4.1 创建任务JobSession
JobSession session = AlgoX.createSession(jobName, jobTitle);
jobTitle用来监控显示, jobName用来监控,打印日志,并且可以用来独立配置jobName的行为,比如为某个jobName独立部署一个AlgoX集群,所以,jobName不能是随机字符串。
4.4.2 JobSession
JobSession就是一个Job任务,由session开始创建源头节点DataSetX,即数据源(DataSource),最后commit任务。
//创建Input DataSetX
DataSetX fromInput(Input input)
//创建Input DataSetX,多个Input生成并行取数,提高取数性能
DataSetX fromInput(Input… inputs)
//提交当前job,同步等待timeout
//CommitTimeoutException:提交任务超时
//RunningTimeoutException:AlgoX集群执行超时
void commit(int timeout, TimeUnit timeUnit) throws CommitTimeoutException, RunningTimeoutException
//返回DataSetOutput创建的Algo DataSet,非AlgoX DataSetX
DataSet readDataSet(String id)
注意,fromInput最好是传入数组,这样才能充分发挥并行能力,但这些Input必须是同样的表头,因为fromInput创建出来的DataSetX是同一个,逻辑上等同于union了多个Input。但是Input数组也不能太大,因为任务并行度是有限的,建议Input数组大小为16或32。如果只有一个Input,那么该DataSetX运行时是无法并行取数的,虽然AlgoX任务是可并行的,那是后续transform节点可能会并行,Input DataSetX节点是否并行,依赖于Input的个数。
4.4.3 Input
Input用来创建数据源,是Job的源头,有以下几种:
接口/对象类 | 构造函数 | 作用 |
kd.bos.algo.Input | 接口 | |
DbInput | DbInput(String algoKey, String routeKey, String sql, Object[] params, RowMeta rowMeta) | DB sql查询 |
OrmInput | OrmInput(String algoKey, String entityName, String selectFields, QFilter[] filters) | Orm查询 |
DataSetInput | DataSetInput(DataSet dataSet) | 将Algo DataSet作为Input |
CollectionInput | CollectionInput(RowMeta rowMeta, Collection<Object[]> collection) | 集合对象 |
CustomizedInput | 抽象类,需自己实现取数逻辑 | 自定义Input |
以上接口都比较直观。DbInput类似于Db.queryDataSet(), OrmInput类似于Orm.queryDataSet(),相信大家都比较熟悉不做介绍。
CustomizedInput提供了自定义Input的能力,接口如下:
public interface CustomizedInput extends Input{
//数据获取迭代器
public abstract Iterator<Object[]> createIterator();
//数据迭代完调用,做资源清理
public abstract void close();
}
4.4.4 DataSetX API
Job任务创建的有向图中,除了末端Output,所有节点都是DataSetX。
源头是通过Input创建,这时创建的DataSetX称为Input DataSetX,或者DataSource DataSetX。
DataSetX经过不断迭代,最后output生成Output DataSetX,Output DataSetX不可继续迭代。
因此,DataSetX接口包括了AlgoX主体的功能。
4.4.4.1 DataSetX API列表
特别说明: 如果支持表达式的地方,表达式语法与Algo的表达式一样。
方法 | 说明 |
RowMeta getRowMeta() | 得到DataSetX的RowMeta |
void output(Output output) | 输出到Output |
基础转换API | |
DataSetX select(String... fields) | 选择字段,类似: select a,b,c,支持别名,如fid as id,或fid id 目前不支持表达式,如果有表达式需求,请使用map函数,性能更好 关系数据库对应算法: project,投影 |
DataSetX filter(String expr) DataSetX filter(String expr, Map<String, Object> params) | 过滤,类似sql中where,expr支持写表达式 expr中可以写变量,变量值通过params传递,比如”a>b”, params是map,key包含a和b 与Algo的filter功能和用法类似 |
DataSetX filter(FilterFunction func) | 过滤,通过FilterFunction自己实现过滤逻辑 |
DataSetX orderBy(String... fields) | 过滤,field不支持表达式 |
DataSetX top(int n) | top n |
DataSetX distinct(String... fields) | distinct,fields可以为空 |
DataSetX addFields(Field[] fields, Object[] values) | 增加常量字段,fields为字段,values是值 |
DataSetX removeFields(String... fields) | 删除字段 |
DataSetX union(DataSetX dataSet) | union操作 |
MapReduce转换API MapReduce思想上指先分组GroupBy,再合并(Reduce)。 也可以直接在DataSetX上reduce,这时候整个DataSetX是一个分组。 map和flatMap函数没有Reduce方法,也属于退化的MapReduce。 以下是DataSetX自身的方法 | |
DataSetX map(MapFunction func) | map映射,一对一映射,一行RowX生成新的一行RowX |
DataSetX flatMap(FlatMapFunction func) | flatMap映射,一对多映射,一行RowX生成新的N行RowX,N可以为0 |
DataSetX reduceGroup(GroupReduceFunction func) | 对整个DataSetX做reduce操作,开发需要实现GroupReduceFunction |
DataSetX combineGroup(GroupCombineFunction func) | 对整个DataSetX做combine操作 combine类似于reduce,后续介绍 |
DataSetX combineReduceGroup(GroupCombineReduceFunction func) | 对整个DataSetX做combine和reduce操作 combine类似于reduce,后续介绍 |
DataSetX sum(String field) DataSetX sum(String field, String alias) DataSetX max(String field) DataSetX max(String field, String alias) DataSetX min(String field) DataSetX min(String field, String alias) DataSetX count(String field) DataSetX count(String field, String alias) | 对整个DataSetX进行汇总,生成的DataSetX只有一行记录 类似sql的select sum(field) from t, 没有group by 注意:没有avg聚合函数,分布式下,avg一般都不支持 |
MapReduce转换API:GroupBy及Grouper上的API | |
DataSetX方法: Grouper groupBy(String... fields) | 根据fields groupBy分组,生成Grouper对象 field不支持表达式 Grouper只是分组 |
Grouper方法: DataSetX reduceGroup(GroupReduceFunction func) | 对分组做reduce操作,开发需要实现GroupReduceFunction |
Grouper方法: DataSetX combineGroup(GroupCombineFunction func) | 对分组做combine操作 combine类似于reduce,后续介绍 |
Grouper方法: DataSetX combineReduceGroup(GroupCombineReduceFunction func) | 对分组做combine和reduce操作 combine类似于reduce,后续介绍 |
Grouper方法: DataSetX sum(String field) DataSetX sum(String field, String alias) DataSetX max(String field) DataSetX max(String field, String alias)< 上一篇:苍穹跨库查询超能力,超级查询来了下一篇:第三方用户映射表 本文2024-09-23 00:33:48发表“云苍穹知识”栏目。 您需要登录后才可以发表评论, 登录登录 或者 注册 最新文档 热门文章 阅读排行确认删除? |