如何通过KDTX实现分布式事务(最终一致性)
1 业务背景
分布式事务就是指事务的参与者、支持事务的服务器、资源服务器以及事务管理器分别位于不同的分布式系统的不同节点之上。通俗的说就是某个操作中的事务跨了多个库,或者是跨了多个服务节点而产生了分布式事务,所以一般来讲就是分库分表或者微服务化导致了分布式事务的产生。而本地事务的所有操作都是在某一个库中进行。
苍穹平台提供的分布式事务解决方案,简称KDTX((Kingdee Distributed Transaction)。KDTX目前的方案是两种,自研TCC框架和最终一致。
最终一致模式适用于资源不需要回滚,只需数据最终一致的情景。如,清理数据失败,记录日志表等等,通过重试让最后可以完成,实现弱一致性。
最终一致模式和TCC模式的不同之处在于,它不能回滚,如果某个阶段出现问题,只能通过重试的方式让事务一直往下走,直到成功为止。
本案例主要是介绍下KDTX的最终一致实现方案。
2 案例实现
本次实现的需求是,在二开的单据上点击保存时,同时保存财务库的单据。
因二开的单据是存在二开库中,与财务库的单据分属不同数据库,如采用本地事务保存的话,系统会报错提示:一个事务内不能同时写入多个数据库,所以需要改造成通过分布式事务来实现。
1 构建测试数据
分别在财务云的应付应用和二开云的应用下创建两张测试单据。
2 在二开的单据的保存操作上绑定操作插件:
3 编写操作插件
package bidt.fys.test.plugin.op; import kd.bos.db.DBRoute; import kd.bos.entity.plugin.AbstractOperationServicePlugIn; import kd.bos.entity.plugin.args.BeginOperationTransactionArgs; import kd.bos.entity.plugin.args.EndOperationTransactionArgs; import kd.bos.kdtx.sdk.session.ec.ECGlobalSession; public class TestSaveFiBillOpPlg extends AbstractOperationServicePlugIn { @Override public void beginOperationTransaction(BeginOperationTransactionArgs e) { ECGlobalSession.begin("TestSave", DBRoute.of("secd")); } @Override public void endOperationTransaction(EndOperationTransactionArgs e) { ecSavebill(); } public void ecSavebill(){ ECGlobalSession.register( "fi", "apext", "TestSave", null, "ap"); ECGlobalSession.txCommit(); } }
4 编写微服务插件,serviceFactory的路径应该按照上面插件ECGlobalSession.register的注册第一个云参数和第二个应用参数来构建,遵循这个格式(kd.云参数.应用参数.servicehelper.ServiceFactory)。
package kd.fi.apext.servicehelper; import kd.bos.dataentity.TypesContainer; import kd.bos.dataentity.resource.ResManager; import kd.bos.instance.Instance; import kd.bos.logging.Log; import kd.bos.logging.LogFactory; import java.util.HashMap; import java.util.Map; public class ServiceFactory { private static Map<String, String> serviceMap = new HashMap(); private static Log log = LogFactory.getLog("ServiceFactory"); public ServiceFactory() { } public static void putService(String serviceName, String serviceImpl) { serviceMap.put(serviceName, serviceImpl); } public static Object getService(String serviceName) { String className = (String)serviceMap.get(serviceName); if (className == null) { String appName = Instance.getAppName(); throw new RuntimeException(String.format(ResManager.loadKDString("%1$s对应的服务实现在%2$s未找到", "ServiceFactory_0", "bos-core-api", new Object[0]), serviceName, appName)); } else { return TypesContainer.getOrRegisterSingletonInstance(className); } } static { serviceMap.put("TestSave", "bidt.fys.test.plugin.service.TestSave"); } }
5 编写实现方法插件
这里实现的是最终一致的方案,需要继承kd.bos.kdtx.sdk.ext.provider.BaseECService,实现doExecute方法,
也可以继承kd.bos.kdtx.sdk.api.EventualConsistencyService,实现execute方法。
package bidt.fys.test.plugin.service; import kd.bos.dataentity.entity.DynamicObject; import kd.bos.kdtx.common.invoke.DtxResponse; import kd.bos.kdtx.sdk.ext.provider.BaseECService; import kd.bos.orm.ORM; import kd.bos.servicehelper.BusinessDataServiceHelper; import kd.bos.servicehelper.operation.SaveServiceHelper; public class TestSave extends BaseECService { public void savebill(){ DynamicObject newObject = BusinessDataServiceHelper.newDynamicObject("bidt_fi_billtest"); newObject.set("billno", "test"); newObject.set("billstatus", "A"); ORM orm = ORM.create(); SaveServiceHelper.save(new DynamicObject[]{newObject}); } @Override protected DtxResponse doExecute(Object o, Object o1) throws Exception { savebill(); return null; } }
6 业务场景配置
场景编码需要和ECGlobalSession.register注册的第三个参数保持一致
7 测试验证,将以上步骤配置好就可以新增一条数据验证下
同时也可以分布式事务查询里面看到成功的日志
3 注意事项
在本案例中注册的代码中,ECGlobalSession.register( "fi", "apext", "TestSave", null, "ap");
注册的是以apext应用注册的,私有云部署的时候,需要在容器节点添加apext到appid上,如下图所示位置:
否则会因为appId没有配置路由到其他节点,导致发生找不到类的报错。
如何通过KDTX实现分布式事务(最终一致性)
本文2024-09-23 00:17:42发表“云苍穹知识”栏目。
本文链接:https://wenku.my7c.com/article/kingdee-cangqiong-138551.html