实践案例 | 高性能亿级数据集成方案,又快又准确!
小编推荐
大企内部业务系统繁多,由于业务需求,常需要进行异构系统间的数据集成。当集成数据量庞大,达亿级时,采取常规的集成方案往往数据同步效率较低。那么,如何才能高效完成数据同步呢?
本期亿级数据集成性能优化方案告诉你答案,基于金蝶云·苍穹集成服务云,综合应用水平分表、微服务等能力,让数据集成准确、可靠的同时保障高性能!
案例撰稿人:戴松。
1 业务背景
某集团A旗下有3000多家组织需要进行快报、季报业务,其报表业务数据支撑依托EAS、SAP、Oracle等多种来源系统。
客户要求统一在星瀚系统开展合并报表业务,统一合并规则,实现不同期次各级单体报表和各级合并报表线上化集中、规范管理。对此,需要把各个系统的科目余额、现金流量、账龄、固定资产、租赁负债、辅助账等搭建合并报表的业务数据集成到星瀚系统。
在项目中,客户的科目余额数据量达亿级,每一期数据量有将近1500万+。最初,采取常规的集成步骤【数据集成方案】→【启动方案】→【服务流程】同步数据,但发现数据同步效率较低,采集单个组织、期间数据时,当中间表数据量有百万级时,整个数据集成流程要么执行3小时以上,也可能无法执行完,要么直接卡死造成系统整体性能下降,无法满足客户高性能指标要求。
客户希望在确保数据集成准确、可靠的同时保障高性能:单家组织数据同步5分钟内完成,全组织数据同步4小时内。
2 解决方案
首先,需明确的是,数据集成的整体流程中,中间表和多维中间表数据,在重复同步时,是通过先删再同步的方式。因为项目上,数据经过清洗、导入、转换、导出等步骤最终展示在报表中,并不会作为其他业务单据关联使用。
2.1 方案整体思路
针对客户的高性能指标要求,下面,以业务数据中的科目余额表、多维中间表为例,具体介绍方案:
数据表层面优化
a. 创建有效索引,遵循索引规则;
b. 中间表进行水平分表。
集成云层面优化
a. 集成云采取KDB连接方式;
b. 整体上使用服务流程脚本,不使用数据集成方案、启动方案;
c. 脚本批量保存数据;
d. 脚本处理关联字段;
e. 值转换规则,缓存转换结果。
SQL层面优化
a. 复杂SQL拆解查询,降低笛卡尔积;
b. 源表数据量同步中间表优化,合并同类项。
代码层面优化
a. 苍穹二开微服务处理复杂业务逻辑;
b. 逆向思维,以目标为导向,生成目标需要的多维中间表数据。
综上,优化的整体思路可概括为:走索引、优化数据量、降低笛卡尔积、减少数据库访问次数。
2.2 方案实现步骤
方案的整体流程如下图所示:
图-方案整体流程
其中,集成云中同步数据主要是在服务流程中通过脚本节点处理,基本上不使用启动方案,因为启动方案关联的数据集成方案在同步数据时,是通过候选键查询表中数据是否存在,来判断是新增还是修改。当表中数据量较大时,会造成数据同步效率低下,所以方案中都是通过服务流程脚本节点实现的逻辑。
因此,准备阶段只需要创建数据源就行,不需要创建集成对象、数据集成方案、启动方案。
准备阶段
首先,在【集成管理】→【连接管理】→【连接配置】中,新增KDB、EAS系统、Oracle、SAP-HANA的连接类型,如下图:
图-新增连接类型
在【集成管理】→【连接管理】→【数据源管理】中,新增连接类型对应的数据源,如下图:
图-新增数据源
具体性能优化从以下层面展开:
数据表层面
1、创建有效索引
根据业务剖析,可得出业务数据是按照组织+期间从多样化的异构系统中同步数据的规律,因此,创建有效组合索引(组织+期间+来源系统),遵循如下索引规则:
a. 字段值重复率越小越靠前定义,例如:值重复率:组织<期间<来源系统,所以组合索引定义是顺序是(`forgunitno`,`fperiodno`,`fssno`);
b. 一张表上不要冗余创建过多的索引(建议不要超过5个);
c. 组合索引字段不超过5个;
d. 索引字段尽可能创建在字段长度较小的列上。
图-创建有效索引
2、水平分表
随着业务增长,科目余额表和多维中间表的数据量将会越来越大,必然成为系统性能和业务效率的瓶颈。对此,苍穹提供了水平分表功能,通过表单的水平切分存储,减少单个物理表的数据量,提高增删改查性能,有效满足在线大数据量存储性能的要求。路径:【系统服务云】→【配置工具】→【水平分表】→【分片配置】。
配置流程主要设置分片属性,分片策略默认映射策略即可满足开发使用,分片策略参数可全部默认,分表配置完成后,原表数据迁移至新表,原表名t_isv_xxx在数据库中将不存在,生成的新表t_isv_xxx$n(n=1,2,3....n)和映射表t_isv_xxx$map,t_isv_xxx$map是分片属性与n的映射关系表;t_isv_xxx$n表是否存在取决于分片属性对应的数据是否存在而动态创建。
图-水平分表配置
集成云层面
1、KDB连接方式
KDB的连接方式详情见:准备阶段。
在方案中使用KDB连接的数据源,有以下几点优势:
a. 原理上是通过SQL对数据直接进行数据库级别操作,所以性能上会比元数据更快;
b. 支持水平分表操作,即当单据做了水平分表配置后,原表名不存在时,通过KDB连接数据源操作表时,依然可以通过操作原表名实现,苍穹框架将提供底层支持。
2、使用服务流程,不使用启动方案
对于科目余额表全链路采集流程,数据同步不通过【启动方案】&【数据集成方案】实现,原因有两方面:
一方面是因为【启动方案】&【数据集成方案】目标对象与源对象的映射关系需要创建候选键,候选键相当于是表数据的唯一索引,在数据同步至表中时,会先根据候选键字段查询表中是否存在数据,然后执行update或insert语句;
另一方面是当【数据集成方案】中使用脚本转换处理关联字段时,查询SQL,将极大提高数据库的访问次数,增加数据库压力;当业务数据量有一定体量的时候,数据同步会很慢。
最终采取【集成管理】→【服务流程】方案,如下图所示,没有调用数据集成节点,实现数据全链路同步。
图-数据同步服务流程
3、脚本批量保存数据
分批处理保存数据,按组织分批处理调用函数:
图-分批处理保存数据
函数dealData的逻辑,如下述流程图:
图-函数dealData处理逻辑
4、脚本处理关联字段
场景描述:数据同步时,为了将外部系统关联表id字段转为编码和名称,通常是将表join关联查询;当join的关联表数据体量比较小时,这样固然可以实现需求,但当join的关联表数据量很大时,会造成查询效率非常慢。
解决方案:结合优化点SQL层面·复杂SQL拆解查询,将复杂join关联查询拆解为多个查询SQL,构建数据结构以关联id为项的List(数组)和为key的Map,使用流程变量接收。
List对象作为业务数据查询SQL的关联条件(where 关联id字段 in 条件);
Map对象作为查询结果遍历时,关联id转编码和名称。
图-脚本处理关联字段
常规map使用:通过map.propname使用,详见下图。
图-常规map使用
升级map使用:通过map[propname]使用,方案中均采取该方式,详见下图。
图-升级map使用
5、值转换规则
场景描述:从外部系统同步的业务数据中包括组织编码或产权码信息,需要转换成星瀚的关联id字段。
解决方案:可通过值转换规则,加缓存转换结果,减少数据库访问次数。
图-值转换规则配置
SQL层面
1、复杂SQL拆解查询
场景描述:根据业务需求同步EAS科目余额表数据,关联了会计科目表的id,业务上需要查询到科目编码和科目名称信息,但经分析发现会计科目表有1500万+体量的数据量;join查询时会导致笛卡尔积很大,查询速率非常慢。
解决方案:通过拆解复杂SQL,预置查询表数据,转换成需要的数据结构对象,使用流程变量接收,在后续节点中使用,减少复杂SQL的关联表语句,降低笛卡尔积,如下图:
图-复杂SQL拆解查询
2、源表数据量同步优化
在满足业务需求的前提下,优化同步源数据量,通过SQL按照业务进行字段的 group by语句合并同类项,sum金额字段。以某组织某期间为例,优化前后效果如下:
未使用group by:从外部系统同步科目余额表至中间表数据量有9万+,从中间表同步至多维中间表(经列转行、父级科目汇总、根据审计类型分别复制一套数据、导入、转换、导出...)数据量翻了20倍以上,达到200万+;
使用group by:同步至中间表数据量5000+,同步至多维中间表数据量40万+。
图-源表数据量同步优化
代码层面
1、苍穹二开微服务
牵扯复杂业务逻辑,很难通过脚本实现时,可调用二开微服务,转向java代码处理。在【集成管理】→【集成元数据】→【API登记】→【苍穹微服务登记】中,注册微服务:
图-注册微服务
创建接口微服务类:
package kd.cmhk.bcm.service; import java.util.List; public interface SyncDataToBcmService { void toBcmTableData(String syncSys, String syncPeriod, List<String> syncOrgArr); void toBcmTableData2V(String syncSys, String syncPeriod, List<String> syncOrgArr,Long schemeId); }
实现接口微服务类:
public class SyncDataToBcmServiceImpl implements SyncDataToBcmService { private static final Log LOGGER = LogFactory.getLog(SyncDataToBcmServiceImpl.class); @Override public void toBcmTableData(String syncSys, String syncPeriod, List<String> syncOrgArr) { //判断入参非空 ...省略 //同步科目余额数据...省略 //同步现金流量数据...省略 //同步账龄数据...省略 } @Override public void toBcmTableData2V(String syncSys, String syncPeriod, List<String> syncOrgArr, Long schemeId) { //判断入参非空...省略 //同步科目余额数据...省略 //同步现金流量数据...省略 //同步账龄数据...省略 } }
创建微服务工厂类:
public class ServiceFactory { private static Map<String, String> serviceMap = new HashMap<>(); static { serviceMap.put("AccountMappingService", "kd.cmhk.bcm.service.impl.AccountMappingServiceImpl"); serviceMap.put("SyncDataToBcmService", "kd.cmhk.bcm.service.impl.SyncDataToBcmServiceImpl"); } public static Object getService(String serviceName) { String className = serviceMap.get(serviceName); if (className == null) { throw new RuntimeException(String.format("%s对应的服务实现未找到", serviceName)); } return TypesContainer.getOrRegisterSingletonInstance(className); } }
逆向思维
场景描述:从中间表同步数据至多维中间表(经列转行、父级科目汇总、根据审计类型分别复制一套数据、导入、转换、导出...),根据组织、科目、期间、维度、变动类型、审计类型等信息唯一确定一条数据,生成了大量的数据,其中包括很多合并报表不使用的数据。
图-中间表列表
解决方案:采用逆向思维,以目标为导向,查询合并报表配置的科目与变动类型的成员映射关系,来控制生成多维中间表的数据情况。经验证,同步至多维中间表数据量由40万+缩减到6万+。
代码逻辑:查询科目与变动类型的成员映射关系,并构建全局变量accountCTSet,以科目为key,变动类型为集合的对象。
private Map<String, Set<String>> accountCTSet; private Map<String, Set<String>> resultAccCTSet; public SyncDataToBcmServiceImpl() { accountCTSet = new HashMap<>(); resultAccCTSet = new HashMap<>(); } private void getAccountCTSet(Long schemeId) { accountCTSet = new HashMap<>(); resultAccCTSet = new HashMap<>(); //拼接sql查询科目与金额类型的映射关系 String selectSql = " select distinct acct.fnumber account,changetype.fnumber changetype from ( select distinct m.fid,src.fsrcmembnumber fnumber from t_bcm_isgroupmap m join t_bcm_isscheme s on s.fid = m.fschemeid join t_bcm_isgroupsrcmapentry src on m.fid = src.fid where s.fid = ? and src.fdimensionid in (select fid from t_bcm_isbaseentlist l where l.FSCHEMEID = ? and l.fnumber='faccount' ) and m.fdimmapid in (select distinct map.fid from t_bcm_isdimmap map join t_bcm_isdimmapsrcentry srcmap on map.fid = srcmap.fid join t_bcm_isscheme s on s.fid = map.fschemeid join t_bcm_isbaseentlist l on l.fid = srcmap.fdimensionid where s.fid = ? and l.fnumber in('fchangetype','faccount') group by map.fid having count(1) >=2)) as acct join ( select distinct m.fid,src.fsrcmembnumber fnumber from t_bcm_isgroupmap m join t_bcm_isscheme s on s.fid = m.fschemeid join t_bcm_isgroupsrcmapentry src on m.fid = src.fid where s.fid = ? and src.fdimensionid in (select fid from t_bcm_isbaseentlist l where l.FSCHEMEID = ? and l.fnumber='fchangetype' ) m.fdimmapid in (select distinct map.fid from t_bcm_isdimmap map join t_bcm_isdimmapsrcentry srcmap on map.fid = srcmap.fid join t_bcm_isscheme s on s.fid = map.fschemeid join t_bcm_isbaseentlist l on l.fid = srcmap.fdimensionid where s.fid = ? and l.fnumber in('fchangetype','faccount') group by map.fid having count(1) >=2) ) as changetype on acct.fid = changetype.fid order by acct.fnumber "; DataSet dataSet = DB.queryDataSet("algoKey_Scheme", DBRoute.of("bcm"), selectSql, new Object[]{schemeId, schemeId, schemeId, schemeId, schemeId, schemeId}); while (dataSet.hasNext()) { Row row = dataSet.next(); String account = row.getString("account"); String changetype = row.getString("changetype"); if (StringUtils.isEmpty(account)) { continue; } if ("*".equals(account)) { continue; } if (account.lastIndexOf("%") != -1) { continue; } if (account.matches("[a-zA-Z]+")) { continue; } int lastIndexOf = account.lastIndexOf("_"); if (lastIndexOf != -1) { account = account.substring(lastIndexOf + 1); } else { lastIndexOf = account.lastIndexOf("%"); if (lastIndexOf != -1) { account = account.substring(0, lastIndexOf); } } Set<String> changetypes = accountCTSet.get(account); if (changetypes == null) { changetypes = new HashSet<>(); changetypes.add(changetype); accountCTSet.put(account, changetypes); } else { changetypes.add(changetype); accountCTSet.put(account, changetypes); } } dataSet.close(); }
调用逻辑:查询中间表数据,遍历结果集,根据科目编码查询变动类型集合,科目编码(例如:三级科目1001.02.03)一级一级截取向上寻找,将匹配的科目和变动类型结果存进变量resultAccCTSet中,以供后续科目优先从变量中匹配,提高匹配性能。
@Override public void toBcmTableData2V(String syncSys, String syncPeriod, List<String> syncOrgArr, Long schemeId) { //判断入参非空resultAccCTSet judgeParamNonNull(syncSys, syncPeriod, syncOrgArr); //同步科目余额数据 syncAsistBalance2V(syncSys, syncPeriod, syncOrgArr, schemeId); ... } private void syncAsistBalance2V(String syncSys, String syncPeriod, List<String> syncOrgArr, Long schemeId) { ...构建查询中间表:科目余额表数据的过滤条件 ...省略 for (String orgNo : syncOrgArr) { //科目余额表查询过滤条件 QFilter filter = new QFilter("periodno", QCP.equals, syncPeriod); filter.and(new QFilter("orgunitno", QCP.equals, orgNo)); filter.and(new QFilter("ssno", QCP.equals, syncSys)); DataSet dataSet = ORM.create().queryDataSet("algoKey_Balance", "8b4t_min_assistbalance", "id,ssno,orgunitno,periodno,accountno,currencyno,orginalcno,beginbalancefor," + "debitfor,creditfor,yearpnlfor,endbalancefor,beginbalancelocal,debitlocal,creditlocal,yearpnllocal,endbalancelocal,yeardebitfor,yeardebitlocal,yearcreditfor,yearcreditlocal," + "assistgrpno1,assistgrpno2,assistgrpno3,assistgrpno4,assistgrpno5,assistgrpno6,assistgrpno7,assistgrpname7,assistgrpno8,assistgrpno9,assistgrpnoa,assistgrpnob,assistgrpnoc,assistgrpnod," + "assistgrpnoe,assistgrpnof,assistgrpnog,assistgrpnoh,assistgrpnoi,assistgrpnoj,assistgrpnok,billtype,accoccurfor,accoccurlocal,originaccountno", new QFilter[]{filter}, "id asc"); if (dataSet.isEmpty()) { continue; } DataSet copy = dataSet.copy(); try { Map<String, BigDecimal> results = new HashMap<>(); StringBuilder item = new StringBuilder(100); while (copy.hasNext()) { ...省略 Set<String> amtTypeSet; if ("3".equals(billType)) { amtTypeSet = new HashSet<>(); amtTypeSet.add("end_oc"); amtTypeSet.add("end_bk"); } else { amtTypeSet = resultAccCTSet.get(accountno); if (amtTypeSet == null || amtTypeSet.isEmpty()) { getAccountCT(accountno, accountno); amtTypeSet = resultAccCTSet.get(accountno); } } ...省略 } // 解析map,并保存数据 resolveMapSave(results); } catch (KDException e) { e.printStackTrace(); } finally { dataSet.close(); } } } private void getAccountCT(String origAccount, String accountno) { int indexOf = accountno.lastIndexOf("."); if (indexOf != -1) { getAccountCT(origAccount, accountno.substring(0, accountno.lastIndexOf("."))); } //根据科目查询对应的 Set<String> accountCT = accountCTSet.get(accountno.replace(".", "")); if (accountCT != null && !accountCT.isEmpty()) { Set<String> resultAccCT = resultAccCTSet.get(origAccount); if (resultAccCT == null || resultAccCT.isEmpty()) { resultAccCT = new HashSet<>(); } resultAccCT.addAll(accountCT); resultAccCTSet.put(origAccount, resultAccCT); } }
2.3 方案实现效果
根据上述步骤,数据集成的速率、系统的稳定性大大提升,性能优化效果远超客户期望:
图-方案实现效果
3 方案的可复用价值
行业的普适程度
很多行业由于业务体系越来越庞大,使用的业务系统也越来越多,在业务系统的数据量成一定体量时,客户不仅关注如何有效管理和整合这些业务系统,也关注数据集成的高性能指标。因此,该方案同样适用于有相应高性能数据集成需求的行业。
对客户的价值
系统性能稳定,报表数据快速响应,业务顺利进行;
统一平台入口,业务流程标准化;
技术标准和规范,对业务扩展友好。
4 注意事项
1、同步数据时,使用脚本对数据进行数据库级别的处理,性能更好、同步速率更快;
2、使用KDB连接类型的数据源,支持分片分表的数据表操作,不支持group by嵌数据库函数;
*(解决思路):可先在子查询中优先处理查询函数的结果,在外层进行group by操作。
3、脚本中查询的数据量不宜过大,过大可能会造成OOM,应合理循环分批查询;
4、集成云脚本中的function函数体,不能直接使用流程变量,需要作为函数的入参才可使用。
相关资料
实践案例 | 异构系统间千万级水平分表数据高效同步:
https://vip.kingdee.com/link/s/ld3T0
微服务开发与注册指导文档:
https://vip.kingdee.com/link/s/ld3Ti
#往期推荐#
更多精彩内容,“码”上了解!↓
实践案例 | 高性能亿级数据集成方案,又快又准确!
本文2024-09-23 00:50:21发表“云苍穹知识”栏目。
本文链接:https://wenku.my7c.com/article/kingdee-cangqiong-142064.html