常见问题.二开案例.实现操作乐观锁

栏目:云星空知识作者:金蝶来源:金蝶云社区发布:2024-09-16浏览:1

常见问题.二开案例.实现操作乐观锁

【场景】针对同一个单据,同一个时间点正在执行两个操作,容易导致的数据异常问题。 场景1:工作流任务处理界面连点,导致审核和保存并发,保存后执行成功,导致的反写快照覆盖问题 场景2:第三方系统,往星空同步时没有等上一个请求完成,传了保存和审核,由于网络乱序的原因导致审核先到达服务端执行 场景3:api审核接口没有网控,api 和pc用户同时处理一个单据 场景4:执行计划和 pc用户同时处理一个单据 【数据案例】 ![image.webp](/download/0100400bff3304594ccb85480b37bb0e9652.webp) 【本地模拟操作在事务提交前并发】 ```csharp using Kingdee.BOS.Core.DynamicForm.PlugIn; using Kingdee.BOS.Core.DynamicForm.PlugIn.Args; namespace DynamicFormPlugIn.Operation { public static class ConcurrentHandler { /* * 模拟并发的开关,可以不关注 */ public static System.Threading.AutoResetEvent ConcurrentBlock = new System.Threading.AutoResetEvent(false); } public class TestOpPlugIn_ConcurrentWaitOne : AbstractOperationServicePlugIn { /// <summary> /// 在事务内结束事件 /// </summary> /// <param name="e"></param> public override void EndOperationTransaction(EndOperationTransactionArgs e) { //等待其他请求放行,这里是删除操作,目的是让反审核操作完成校验 ConcurrentHandler.ConcurrentBlock.WaitOne(); } } public class TestOpPlugIn_ConcurrentSet : AbstractOperationServicePlugIn { /// <summary> /// 在事务内开始事件,已经执行完单据的校验 /// </summary> /// <param name="e"></param> public override void BeginOperationTransaction(BeginOperationTransactionArgs e) { //放行并发操作,让其提交事务,这里是反审核操作 ConcurrentHandler.ConcurrentBlock.Set(); } } } ``` 【方案】 通过事务内更新做乐观锁控制; [EF/EFCore 中RowVersion与ConcurrencyToken的比较](https://www.cnblogs.com/qianxingmu/p/13376164.html) ![image.webp](/download/0100da124b8eb6ad49f69411be73095f3f0c.webp) ```csharp using Kingdee.BOS; using Kingdee.BOS.App.Data; using Kingdee.BOS.Core.DynamicForm.PlugIn; using Kingdee.BOS.Core.DynamicForm.PlugIn.Args; using Kingdee.BOS.Core.Metadata.FormElement; using Kingdee.BOS.Util; using System; using System.Collections.Generic; using System.Linq; namespace DynamicFormPlugIn.Operation { [Kingdee.BOS.Util.HotUpdate] [System.ComponentModel.Description("CAS操作校验——避免并发操作导致的数据异常问题")] public class CasValidate_OpServicePlugIn : AbstractOperationServicePlugIn { /* * 单据操作CAS校验,控制同一个时间节点仅有一个请求更新该单据 * 说明:所有互斥操作都应该一起挂设,如保存、提交、审核、反审核、删除 * * question 0: * 为什么不用网控控制? * answer 0: * 很多的并发场景根本不会开启网控,如单据详情界面操作不校验网控、api、界面连点、二开插件或者执行计划等 * * question 1: * 使用该方案能完全杜绝数据异常问题? * answer 1: * 不能,因为版本号不是在传入的数据包中,而是独立的字段,仅能控制并发; * 假定两个用户同时获取单据, 用户1先修改A字段保存, 用户2在用户1保存后用旧的数据包保存还是会有问题(还需要有脏标记) */ private CasValidate casValidate; private Guid guid = Guid.NewGuid(); public override void OnPrepareOperationServiceOption(OnPrepareOperationServiceEventArgs e) { var pkType = this.BusinessInfo.GetForm().PkFieldType; casValidate = new CasValidate(Context, this.BusinessInfo.GetForm().Id, pkType); } /// <summary> /// 事务前事件,读取数据关联的版本 /// </summary> /// <param name="e"></param> public override void BeforeExecuteOperationTransaction(BeforeExecuteOperationTransaction e) { var pkIds = e.SelectedRows.Select(x => x.DataEntity[0]).ToList(); casValidate.PrepareVersion(pkIds); } public override void BeginOperationTransaction(BeginOperationTransactionArgs e) { var pkIds = e.DataEntitys.Select(x => x[0]).ToList(); WriteOpLog(this.Context, this.BusinessInfo.GetForm().Id, pkIds, this.FormOperation.OperationName + ".BeginOperationTransaction." + guid.ToString().SubStr(0, 4)); } /// <summary> /// /// </summary> /// <param name="e"></param> public override void EndOperationTransaction(EndOperationTransactionArgs e) { var pkIds = e.DataEntitys.Select(x => x[0]).ToList(); WriteOpLog(this.Context, this.BusinessInfo.GetForm().Id, pkIds, this.FormOperation.OperationName + ".EndOperationTransaction." + guid.ToString().SubStr(0, 4)); casValidate.ValidateVersion(pkIds); } /// <summary> /// /// </summary> /// <param name="ctx"></param> /// <param name="formId"></param> /// <param name="pkIds"></param> public static void WriteOpLog(Context ctx, string formId, List<object> pkIds, string desc) { DateTime dateTime = DateTime.Now; Kingdee.BOS.Core.Log.LogObject logObj = new Kingdee.BOS.Core.Log.LogObject() { SubSystemId = "CasValidate", EncodeId = formId, Description = string.Format("desc:{0}, billIds:{1}", desc, string.Join(",", pkIds)) }; Kingdee.BOS.ServiceHelper.LogServiceHelper.WriteLog(ctx, logObj); } } public class CasValidate { private readonly Context Ctx; private readonly string FormId; private readonly Dictionary<object, int> VersionMap; private readonly EnumPkFieldType PkFieldType; private readonly string CasTableName; public CasValidate(Context ctx, string formId, EnumPkFieldType pkType) { Ctx = ctx; FormId = formId; PkFieldType = pkType; VersionMap = new Dictionary<object, int>(); CasTableName = string.Format("CasValidate_{0}", formId); Init(); } /// <summary> /// 检查当前业务对象是否已创建CAS校验表,没有的话补充创建一个 /// </summary> private void Init() { bool isExists = DBUtils.IsExistTable(Ctx, CasTableName); if (isExists) return; string pkSql = string.Empty; switch (PkFieldType) { case EnumPkFieldType.STRING: pkSql = "VARCHAR(36) DEFAULT ' '"; break; case EnumPkFieldType.LONG: pkSql = "BIGINT DEFAULT 0"; break; case EnumPkFieldType.INT: pkSql = "INT DEFAULT 0"; break; } string createTableSql = string.Format("CREATE TABLE {0} (FID {1} NOT NULL,FVERSION INT DEFAULT 0 NOT NULL);", CasTableName, pkSql); string createPkSql = string.Format("EXEC P_ALTERPK 'PK_Cas_{0}', '{1}', 'FID', '1'", FormId, CasTableName); List<string> sqlList = new List<string>() { createTableSql, createPkSql }; DBUtils.ExecuteBatch(Ctx, sqlList, 1); } /// <summary> /// 获取正在操作的单据的CAS版本号 /// </summary> /// <param name="pkIds"></param> public void PrepareVersion(List<object> pkIds) { if (pkIds == null || pkIds.Count == 0) return; pkIds = pkIds.Distinct().ToList(); //<0>查询已有的 PrepareVersionInner(pkIds); //<1>为没有的记录创建默认版本 CreateNoExistsDefaultVersion(pkIds); } private KDDbType GetSingleDbType() { KDDbType kddbType; switch (PkFieldType) { case EnumPkFieldType.STRING: kddbType = KDDbType.AnsiString; break; case EnumPkFieldType.LONG: kddbType = KDDbType.Int64; break; case EnumPkFieldType.INT: kddbType = KDDbType.Int32; break; } return KDDbType.Int32; } /// <summary> /// 获取版本号 /// </summary> /// <param name="pkIds"></param> private void PrepareVersionInner(List<object> pkIds) { string filter = string.Empty; List<SqlParam> sqlParams = new List<SqlParam>(); if (pkIds.Count == 1) { filter = " FID = @FID"; sqlParams.Add(new SqlParam("@FID", GetSingleDbType(), pkIds[0])); } else if (pkIds.Count <= 50) { string inBillIds = string.Empty; switch (PkFieldType) { case EnumPkFieldType.STRING: inBillIds = string.Join(",", pkIds.Select(x => string.Format("'{0}'", x))); break; case EnumPkFieldType.LONG: case EnumPkFieldType.INT: inBillIds = string.Join(",", pkIds); break; } filter = string.Format(" FID IN ({0})", inBillIds); } else { KDDbType kddbType = KDDbType.udt_inttable; switch (PkFieldType) { case EnumPkFieldType.STRING: kddbType = KDDbType.udt_varchartable; break; case EnumPkFieldType.LONG: case EnumPkFieldType.INT: kddbType = KDDbType.udt_inttable; break; } string cardSql = StringUtils.GetSqlWithCardinality(pkIds.Count, "@FID", kddbType == KDDbType.udt_inttable ? 1 : 2, false); filter = string.Format(" EXISTS ( {0} WHERE b.FID = {1} ) ", cardSql, "FID"); sqlParams.Add(new SqlParam("@FID", kddbType, pkIds)); } string sql = string.Format("SELECT FID, FVERSION FROM {0} WHERE {1}", CasTableName, filter); using (var dr = DBUtils.ExecuteReader(Ctx, sql, sqlParams)) { while (dr.Read()) { object pk = dr["FID"]; int version = Convert.ToInt32(dr["FVERSION"]); VersionMap[pk] = version; } } } /// <summary> /// 不存在版本号的记录创建默认版本号 /// </summary> /// <param name="pkIds"></param> private void CreateNoExistsDefaultVersion(List<object> pkIds) { if (pkIds.Count == VersionMap.Count) return; List<SqlObject> sqlObjects = new List<SqlObject>(); string strSql = string.Format(@"/*dialect*/IF NOT EXISTS (SELECT 1 FROM {0} WHERE FID = @FID) INSERT INTO {0}(FID,FVERSION) VALUES(@FID, 0);", CasTableName); foreach (var pkId in pkIds) { if (VersionMap.ContainsKey(pkId)) continue; SqlObject sqlObj = new SqlObject(strSql, new SqlParam("@FID", GetSingleDbType(), pkId)); sqlObjects.Add(sqlObj); VersionMap[pkId] = 0; } DBUtils.ExecuteBatch(Ctx, sqlObjects); } /// <summary> /// 更新CAS版本号(最终会有行锁,越晚提交越好,但是要在事务内) /// 后续可以改造成支持多行,目前按照单个更新的逻辑最简单 /// </summary> /// <param name="pkIds"></param> public void ValidateVersion(List<object> pkIds) { if (pkIds == null || pkIds.Count == 0) return; pkIds = pkIds.Distinct().ToList(); string strSql = string.Format(@"UPDATE {0} SET FVERSION = FVERSION+1 WHERE FID = @FID AND FVERSION = @OLDVERSION;", CasTableName); //确保被取消操作事务后,CAS校验还是同时成功 using (KDTransactionScope scope = new KDTransactionScope(System.Transactions.TransactionScopeOption.Required)) { foreach (var billVersion in VersionMap) { List<SqlParam> paramList = new List<SqlParam>(); int oldVersion = billVersion.Value; paramList.Add(new SqlParam("@FID", GetSingleDbType(), billVersion.Key)); paramList.Add(new SqlParam("@OLDVERSION", KDDbType.Int32, oldVersion)); int cnt = DBUtils.Execute(Ctx, strSql, paramList); if (cnt != 1) { throw new Exception(string.Format("CAS校验异常,请重新对该单据执行操作, formId:{0}, pkId:{1}", FormId, billVersion)); } } scope.Complete(); } } } } ``` 【效果】 ![20240208 1711.webp](/download/01008ba41feadd0947d480b5eb415013fada.webp)

常见问题.二开案例.实现操作乐观锁

【场景】针对同一个单据,同一个时间点正在执行两个操作,容易导致的数据异常问题。场景1:工作流任务处理界面连点,导致审核和保存并发,...
点击下载文档
确认删除?
回到顶部
客服QQ
  • 客服QQ点击这里给我发消息