实践案例 | 高性能亿级数据集成方案,又快又准确!

栏目:云苍穹知识作者:金蝶来源:金蝶云社区发布:2024-09-23浏览:1

实践案例 | 高性能亿级数据集成方案,又快又准确!

小编推荐


大企内部业务系统繁多,由于业务需求,常需要进行异构系统间的数据集成。当集成数据量庞大,达亿级时,采取常规的集成方案往往数据同步效率较低。那么,如何才能高效完成数据同步呢?


本期亿级数据集成性能优化方案告诉你答案,基于金蝶云·苍穹集成服务云,综合应用水平分表、微服务等能力,让数据集成准确、可靠的同时保障高性能


案例撰稿人:戴松。



1 业务背景


某集团A旗下有3000多家组织需要进行快报、季报业务,其报表业务数据支撑依托EAS、SAP、Oracle等多种来源系统。


客户要求统一在星瀚系统开展合并报表业务,统一合并规则,实现不同期次各级单体报表和各级合并报表线上化集中、规范管理。对此,需要把各个系统的科目余额、现金流量、账龄、固定资产、租赁负债、辅助账等搭建合并报表的业务数据集成到星瀚系统。


在项目中,客户的科目余额数据量达亿级,每一期数据量有将近1500万+。最初,采取常规的集成步骤【数据集成方案】→【启动方案】→【服务流程】同步数据,但发现数据同步效率较低,采集单个组织、期间数据时,当中间表数据量有百万级时,整个数据集成流程要么执行3小时以上,也可能无法执行完,要么直接卡死造成系统整体性能下降,无法满足客户高性能指标要求。


客户希望在确保数据集成准确、可靠的同时保障高性能:单家组织数据同步5分钟内完成,全组织数据同步4小时内


2 解决方案


首先,需明确的是,数据集成的整体流程中,中间表和多维中间表数据,在重复同步时,是通过先删再同步的方式。因为项目上,数据经过清洗、导入、转换、导出等步骤最终展示在报表中,并不会作为其他业务单据关联使用。


2.1 方案整体思路


针对客户的高性能指标要求,下面,以业务数据中的科目余额表、多维中间表为例,具体介绍方案:


  • 数据表层面优化


a. 创建有效索引,遵循索引规则;

b. 中间表进行水平分表


  • 集成云层面优化


a. 集成云采取KDB连接方式;

b. 整体上使用服务流程脚本,不使用数据集成方案、启动方案;

c. 脚本批量保存数据;

d. 脚本处理关联字段;

e. 值转换规则,缓存转换结果。


  • SQL层面优化


a. 复杂SQL拆解查询,降低笛卡尔积;

b. 源表数据量同步中间表优化,合并同类项。


  • 代码层面优化


a. 苍穹二开微服务处理复杂业务逻辑;

b. 逆向思维,以目标为导向,生成目标需要的多维中间表数据。

综上,优化的整体思路可概括为:走索引、优化数据量、降低笛卡尔积、减少数据库访问次数。


2.2 方案实现步骤


方案的整体流程如下图所示:


上传图片

图-方案整体流程


其中,集成云中同步数据主要是在服务流程中通过脚本节点处理,基本上不使用启动方案,因为启动方案关联的数据集成方案在同步数据时,是通过候选键查询表中数据是否存在,来判断是新增还是修改。当表中数据量较大时,会造成数据同步效率低下,所以方案中都是通过服务流程脚本节点实现的逻辑。


因此,准备阶段只需要创建数据源就行,不需要创建集成对象、数据集成方案、启动方案。


  • 准备阶段


首先,在【集成管理】→【连接管理】→【连接配置】中,新增KDB、EAS系统、Oracle、SAP-HANA的连接类型,如下图:


上传图片

图-新增连接类型


在【集成管理】→【连接管理】→【数据源管理】中,新增连接类型对应的数据源,如下图:


上传图片

图-新增数据源


具体性能优化从以下层面展开:


  • 数据表层面


1、创建有效索引


根据业务剖析,可得出业务数据是按照组织+期间从多样化的异构系统中同步数据的规律,因此,创建有效组合索引(组织+期间+来源系统),遵循如下索引规则:


a. 字段值重复率越小越靠前定义,例如:值重复率:组织<期间<来源系统,所以组合索引定义是顺序是(`forgunitno`,`fperiodno`,`fssno`);

b. 一张表上不要冗余创建过多的索引(建议不要超过5个);

c. 组合索引字段不超过5个;

d. 索引字段尽可能创建在字段长度较小的列上。


上传图片

图-创建有效索引


2、水平分表


随着业务增长,科目余额表和多维中间表的数据量将会越来越大,必然成为系统性能和业务效率的瓶颈。对此,苍穹提供了水平分表功能,通过表单的水平切分存储,减少单个物理表的数据量,提高增删改查性能,有效满足在线大数据量存储性能的要求。路径:【系统服务云】→【配置工具】→【水平分表】→【分片配置】。


配置流程主要设置分片属性,分片策略默认映射策略即可满足开发使用,分片策略参数可全部默认,分表配置完成后,原表数据迁移至新表,原表名t_isv_xxx在数据库中将不存在,生成的新表t_isv_xxx$n(n=1,2,3....n)和映射表t_isv_xxx$map,t_isv_xxx$map是分片属性与n的映射关系表;t_isv_xxx$n表是否存在取决于分片属性对应的数据是否存在而动态创建。


上传图片

上传图片

图-水平分表配置


  • 集成云层面


1、KDB连接方式


KDB的连接方式详情见:准备阶段。

在方案中使用KDB连接的数据源,有以下几点优势:

a. 原理上是通过SQL对数据直接进行数据库级别操作,所以性能上会比元数据更快;

b. 支持水平分表操作,即当单据做了水平分表配置后,原表名不存在时,通过KDB连接数据源操作表时,依然可以通过操作原表名实现,苍穹框架将提供底层支持


2、使用服务流程,不使用启动方案


对于科目余额表全链路采集流程,数据同步不通过【启动方案】&【数据集成方案】实现,原因有两方面:

一方面是因为【启动方案】&【数据集成方案】目标对象与源对象的映射关系需要创建候选键,候选键相当于是表数据的唯一索引,在数据同步至表中时,会先根据候选键字段查询表中是否存在数据,然后执行update或insert语句;

另一方面是当【数据集成方案】中使用脚本转换处理关联字段时,查询SQL,将极大提高数据库的访问次数,增加数据库压力;当业务数据量有一定体量的时候,数据同步会很慢。

最终采取【集成管理】→【服务流程】方案,如下图所示,没有调用数据集成节点,实现数据全链路同步。


上传图片

图-数据同步服务流程


3、脚本批量保存数据


分批处理保存数据,按组织分批处理调用函数:


上传图片

图-分批处理保存数据


函数dealData的逻辑,如下述流程图:


上传图片

图-函数dealData处理逻辑


4、脚本处理关联字段


场景描述:数据同步时,为了将外部系统关联表id字段转为编码和名称,通常是将表join关联查询;当join的关联表数据体量比较小时,这样固然可以实现需求,但当join的关联表数据量很大时,会造成查询效率非常慢。

解决方案:结合优化点SQL层面·复杂SQL拆解查询,将复杂join关联查询拆解为多个查询SQL,构建数据结构以关联id为项的List(数组)和为key的Map,使用流程变量接收。

List对象作为业务数据查询SQL的关联条件(where 关联id字段  in  条件);

Map对象作为查询结果遍历时,关联id转编码和名称。


上传图片

图-脚本处理关联字段


常规map使用:通过map.propname使用,详见下图。


上传图片

图-常规map使用


升级map使用:通过map[propname]使用,方案中均采取该方式,详见下图。


上传图片

图-升级map使用


5、值转换规则


场景描述:从外部系统同步的业务数据中包括组织编码或产权码信息,需要转换成星瀚的关联id字段。

解决方案:可通过值转换规则,加缓存转换结果,减少数据库访问次数。


上传图片

图-值转换规则配置


  • SQL层面


1、复杂SQL拆解查询


场景描述:根据业务需求同步EAS科目余额表数据,关联了会计科目表的id,业务上需要查询到科目编码科目名称信息,但经分析发现会计科目表有1500万+体量的数据量;join查询时会导致笛卡尔积很大,查询速率非常慢。

解决方案:通过拆解复杂SQL,预置查询表数据,转换成需要的数据结构对象,使用流程变量接收,在后续节点中使用,减少复杂SQL的关联表语句,降低笛卡尔积,如下图:


上传图片

图-复杂SQL拆解查询


2、源表数据量同步优化


在满足业务需求的前提下,优化同步源数据量,通过SQL按照业务进行字段的 group by语句合并同类项,sum金额字段。以某组织某期间为例,优化前后效果如下:

未使用group by:从外部系统同步科目余额表至中间表数据量有9万+,从中间表同步至多维中间表(经列转行、父级科目汇总、根据审计类型分别复制一套数据、导入、转换、导出...)数据量翻了20倍以上,达到200万+

使用group by:同步至中间表数据量5000+,同步至多维中间表数据量40万+


上传图片

图-源表数据量同步优化


  • 代码层面


1、苍穹二开微服务


牵扯复杂业务逻辑,很难通过脚本实现时,可调用二开微服务,转向java代码处理。在【集成管理】→【集成元数据】→【API登记】→【苍穹微服务登记】中,注册微服务:


上传图片

上传图片

图-注册微服务


创建接口微服务类:


package kd.cmhk.bcm.service;
import java.util.List;
public interface SyncDataToBcmService {
    void toBcmTableData(String syncSys, String syncPeriod, List<String> syncOrgArr);
    void toBcmTableData2V(String syncSys, String syncPeriod, List<String> syncOrgArr,Long schemeId);
}



实现接口微服务类:


public class SyncDataToBcmServiceImpl implements SyncDataToBcmService {
    private static final Log LOGGER = LogFactory.getLog(SyncDataToBcmServiceImpl.class);
    @Override
    public void toBcmTableData(String syncSys, String syncPeriod, List<String> syncOrgArr) {
        //判断入参非空 ...省略
        //同步科目余额数据...省略
        //同步现金流量数据...省略
        //同步账龄数据...省略
    }
    @Override
    public void toBcmTableData2V(String syncSys, String syncPeriod, List<String> syncOrgArr, Long schemeId) {
        //判断入参非空...省略
        //同步科目余额数据...省略
        //同步现金流量数据...省略
        //同步账龄数据...省略
    }
}


创建微服务工厂类:


public class ServiceFactory {
    private static Map<String, String> serviceMap = new HashMap<>();
    static {
        serviceMap.put("AccountMappingService", "kd.cmhk.bcm.service.impl.AccountMappingServiceImpl");
        serviceMap.put("SyncDataToBcmService", "kd.cmhk.bcm.service.impl.SyncDataToBcmServiceImpl");
    }
    public static Object getService(String serviceName) {
        String className = serviceMap.get(serviceName);
        if (className == null) {
            throw new RuntimeException(String.format("%s对应的服务实现未找到", serviceName));
        }
        return TypesContainer.getOrRegisterSingletonInstance(className);
    }
}


  • 逆向思维


场景描述:从中间表同步数据至多维中间表(经列转行、父级科目汇总、根据审计类型分别复制一套数据、导入、转换、导出...),根据组织、科目、期间、维度、变动类型、审计类型等信息唯一确定一条数据,生成了大量的数据,其中包括很多合并报表不使用的数据。


上传图片

图-中间表列表


解决方案:采用逆向思维,以目标为导向,查询合并报表配置的科目与变动类型的成员映射关系,来控制生成多维中间表的数据情况。经验证,同步至多维中间表数据量由40万+缩减到6万+


代码逻辑:查询科目与变动类型的成员映射关系,并构建全局变量accountCTSet,以科目为key,变动类型为集合的对象。


private Map<String, Set<String>> accountCTSet;
private Map<String, Set<String>> resultAccCTSet;
public SyncDataToBcmServiceImpl() {
        accountCTSet = new HashMap<>();
        resultAccCTSet = new HashMap<>();
}
 private void getAccountCTSet(Long schemeId) {
        accountCTSet = new HashMap<>();
        resultAccCTSet = new HashMap<>();
        //拼接sql查询科目与金额类型的映射关系
        String selectSql = " select distinct acct.fnumber account,changetype.fnumber changetype from ( select distinct m.fid,src.fsrcmembnumber fnumber from t_bcm_isgroupmap m join t_bcm_isscheme s on s.fid = m.fschemeid join t_bcm_isgroupsrcmapentry src on m.fid = src.fid where s.fid = ? and src.fdimensionid in (select fid from t_bcm_isbaseentlist l where  l.FSCHEMEID = ? and  l.fnumber='faccount' ) and m.fdimmapid in (select distinct map.fid from t_bcm_isdimmap map join t_bcm_isdimmapsrcentry srcmap on map.fid = srcmap.fid join t_bcm_isscheme s on s.fid = map.fschemeid join t_bcm_isbaseentlist l on l.fid = srcmap.fdimensionid where s.fid = ? and l.fnumber in('fchangetype','faccount')  group by map.fid having count(1) >=2)) as acct join ( select distinct m.fid,src.fsrcmembnumber fnumber from t_bcm_isgroupmap m join t_bcm_isscheme s on s.fid = m.fschemeid join t_bcm_isgroupsrcmapentry src on m.fid = src.fid where s.fid = ?  and src.fdimensionid in (select fid from t_bcm_isbaseentlist l where  l.FSCHEMEID = ? and  l.fnumber='fchangetype' ) m.fdimmapid in (select distinct map.fid from t_bcm_isdimmap map join t_bcm_isdimmapsrcentry srcmap on map.fid = srcmap.fid join t_bcm_isscheme s on s.fid = map.fschemeid join t_bcm_isbaseentlist l on l.fid = srcmap.fdimensionid where s.fid = ? and l.fnumber in('fchangetype','faccount')  group by map.fid having count(1) >=2) ) as changetype on acct.fid = changetype.fid order by acct.fnumber ";
        DataSet dataSet = DB.queryDataSet("algoKey_Scheme", DBRoute.of("bcm"), selectSql, new Object[]{schemeId, schemeId, schemeId, schemeId, schemeId, schemeId});
        while (dataSet.hasNext()) {
            Row row = dataSet.next();
            String account = row.getString("account");
            String changetype = row.getString("changetype");
            if (StringUtils.isEmpty(account)) {
                continue;
            }
            if ("*".equals(account)) {
                continue;
            }
            if (account.lastIndexOf("%") != -1) {
                continue;
            }
            if (account.matches("[a-zA-Z]+")) {
                continue;
            }
            int lastIndexOf = account.lastIndexOf("_");
            if (lastIndexOf != -1) {
                account = account.substring(lastIndexOf + 1);
            } else {
                lastIndexOf = account.lastIndexOf("%");
                if (lastIndexOf != -1) {
                    account = account.substring(0, lastIndexOf);
                }
            }
            Set<String> changetypes = accountCTSet.get(account);
            if (changetypes == null) {
                changetypes = new HashSet<>();
                changetypes.add(changetype);
                accountCTSet.put(account, changetypes);
            } else {
                changetypes.add(changetype);
                accountCTSet.put(account, changetypes);
            }
        }
        dataSet.close();
 }



调用逻辑:查询中间表数据,遍历结果集,根据科目编码查询变动类型集合,科目编码(例如:三级科目1001.02.03)一级一级截取向上寻找,将匹配的科目和变动类型结果存进变量resultAccCTSet中,以供后续科目优先从变量中匹配,提高匹配性能。


@Override
public void toBcmTableData2V(String syncSys, String syncPeriod, List<String> syncOrgArr, Long schemeId) {
 //判断入参非空resultAccCTSet
 judgeParamNonNull(syncSys, syncPeriod, syncOrgArr);
 //同步科目余额数据
 syncAsistBalance2V(syncSys, syncPeriod, syncOrgArr, schemeId);
 ...
}
private void syncAsistBalance2V(String syncSys, String syncPeriod, List<String> syncOrgArr, Long schemeId) {
 ...构建查询中间表:科目余额表数据的过滤条件 ...省略
 for (String orgNo : syncOrgArr) {
  //科目余额表查询过滤条件
  QFilter filter = new QFilter("periodno", QCP.equals, syncPeriod);
  filter.and(new QFilter("orgunitno", QCP.equals, orgNo));
  filter.and(new QFilter("ssno", QCP.equals, syncSys));
  DataSet dataSet = ORM.create().queryDataSet("algoKey_Balance", "8b4t_min_assistbalance", "id,ssno,orgunitno,periodno,accountno,currencyno,orginalcno,beginbalancefor," +
    "debitfor,creditfor,yearpnlfor,endbalancefor,beginbalancelocal,debitlocal,creditlocal,yearpnllocal,endbalancelocal,yeardebitfor,yeardebitlocal,yearcreditfor,yearcreditlocal," +
    "assistgrpno1,assistgrpno2,assistgrpno3,assistgrpno4,assistgrpno5,assistgrpno6,assistgrpno7,assistgrpname7,assistgrpno8,assistgrpno9,assistgrpnoa,assistgrpnob,assistgrpnoc,assistgrpnod," +
    "assistgrpnoe,assistgrpnof,assistgrpnog,assistgrpnoh,assistgrpnoi,assistgrpnoj,assistgrpnok,billtype,accoccurfor,accoccurlocal,originaccountno", new QFilter[]{filter}, "id asc");
  if (dataSet.isEmpty()) {
   continue;
  }
  DataSet copy = dataSet.copy();
  try {
   Map<String, BigDecimal> results = new HashMap<>();
   StringBuilder item = new StringBuilder(100);
   while (copy.hasNext()) {
    ...省略
    Set<String> amtTypeSet;
    if ("3".equals(billType)) {
     amtTypeSet = new HashSet<>();
     amtTypeSet.add("end_oc");
     amtTypeSet.add("end_bk");
    } else {
     amtTypeSet = resultAccCTSet.get(accountno);
     if (amtTypeSet == null || amtTypeSet.isEmpty()) {
      getAccountCT(accountno, accountno);
      amtTypeSet = resultAccCTSet.get(accountno);
     }
    }
    ...省略
   }
   // 解析map,并保存数据
   resolveMapSave(results);
  } catch (KDException e) {
   e.printStackTrace();
  } finally {
   dataSet.close();
  }
 }
}
private void getAccountCT(String origAccount, String accountno) {
 int indexOf = accountno.lastIndexOf(".");
 if (indexOf != -1) {
  getAccountCT(origAccount, accountno.substring(0, accountno.lastIndexOf(".")));
 }
 //根据科目查询对应的
 Set<String> accountCT = accountCTSet.get(accountno.replace(".", ""));
 if (accountCT != null && !accountCT.isEmpty()) {
  Set<String> resultAccCT = resultAccCTSet.get(origAccount);
  if (resultAccCT == null || resultAccCT.isEmpty()) {
   resultAccCT = new HashSet<>();
  }
  resultAccCT.addAll(accountCT);
  resultAccCTSet.put(origAccount, resultAccCT);
 }
}




2.3 方案实现效果


根据上述步骤,数据集成的速率、系统的稳定性大大提升,性能优化效果远超客户期望


上传图片

图-方案实现效果


3 方案的可复用价值


行业的普适程度


很多行业由于业务体系越来越庞大,使用的业务系统也越来越多,在业务系统的数据量成一定体量时,客户不仅关注如何有效管理和整合这些业务系统,也关注数据集成的高性能指标。因此,该方案同样适用于有相应高性能数据集成需求的行业。


对客户的价值


  • 系统性能稳定,报表数据快速响应,业务顺利进行;

  • 统一平台入口,业务流程标准化;

  • 技术标准和规范,对业务扩展友好。


4 注意事项


1、同步数据时,使用脚本对数据进行数据库级别的处理,性能更好、同步速率更快;

2、使用KDB连接类型的数据源,支持分片分表的数据表操作,不支持group by嵌数据库函数;

*(解决思路):可先在子查询中优先处理查询函数的结果,在外层进行group by操作。


上传图片


3、脚本中查询的数据量不宜过大,过大可能会造成OOM,应合理循环分批查询;


上传图片


4、集成云脚本中的function函数体,不能直接使用流程变量,需要作为函数的入参才可使用。


相关资料


实践案例 | 异构系统间千万级水平分表数据高效同步:

https://vip.kingdee.com/link/s/ld3T0


微服务开发与注册指导文档:

https://vip.kingdee.com/link/s/ld3Ti




#往期推荐#


实践案例 | 高效集成SAP HANA数据库,集成结果可视化

实践案例 | 多业务系统间的高效数据互通方案

实践案例 | 多异构系统间高效系统集成方案

实践案例 | 打破信息孤岛,OA系统集成的方案落地与实践


更多精彩内容,“码”上了解!↓


实践案例 | 高性能亿级数据集成方案,又快又准确!

小编推荐大企内部业务系统繁多,由于业务需求,常需要进行异构系统间的数据集成。当集成数据量庞大,达亿级时,采取常规的集成方案往往数据...
点击下载文档
确认删除?
回到顶部
客服QQ
  • 客服QQ点击这里给我发消息