电脑桌面
添加蚂蚁七词文库到电脑桌面
安装后可以在桌面快捷访问

记录HR项目中使用zato对接魔学院培训系统消息转发到钉钉

来源:金蝶云社区作者:金蝶2024-09-166

记录HR项目中使用zato对接魔学院培训系统消息转发到钉钉

客户使用了魔学院、金蝶sHR,移动端使用的是钉钉,shr这边实现了和魔学院的组织人员对接、PC端单点登陆对接,没有实现消息对接,魔学院也不支持再将消息与第三方对接,需要人力系统想办法进行对接。

需求是期望培训提醒消息能推送到钉钉。刚好最近在研究zato,相关的对接逻辑通过研究钉钉和魔学院的接口文档,大致了解,感觉不难,实际操作的时候各种奇奇怪怪的问题。真的是看起来容易,做起来难,断断续续做了有几个月的时间,从zato安装、尝试写第一个service,部署centos环境,梳理对接逻辑,一点点编写对接代码,本地调试,服务器部署等等。个人对sql比较熟悉,python也是简单学过,过程中遇到的坑基本上都是网上检索到的。

进入正题:

实现效果:

3b0ba2d0b2e29c8e5ac00851298619b.webp

zato安装不再赘述,网上很多教程,建议参考zato.io里面相关的介绍来做,会比较官方一些。

对接分为三个部分:

  1. 订阅魔学院消息。这部分主要过程为获取魔学院accesstoken,发送订阅请求,接收返回的加密数据,解密数据,检查是否为check_url事件,是,则返回加密的success,结束。

  2. 接收魔学院消息转发钉钉。这部分主要过程是被动接收魔学院发送来的消息,解密消息,根据消息内容获取用户手机号、姓名,存数据库,获取钉钉accesstoken,根据手机号获取用户钉钉id,发送消息后接收回执存库。

  3. 从钉钉消息单点登陆到魔学院。

代码主要分为两部分:魔学院部分和钉钉部分。

魔学院部分:

# -*- coding: utf-8 -*-
# zato: ide-deploy=True

# Zato
from zato.server.service import Service
from dingcrypt import DingCallbackCrypto3
from contextlib import closing
from zato.common.odb.api import SQLConnectionPool
from zato.common.util.api import get_engine_url,get_component_name
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from urllib.parse import urlparse,parse_qsl,quote,urlencode
import json
import sys
sys.path.append(r'/opt/zato/env/qs-1/server1/pickup/incoming/services')
import os
from accesstoken import accesstokencache
from zato.common import DATA_FORMAT
#print(Service)
# ##############################################################################

class GetMXYAccessToken(Service):
    """ Returns details of a user by the person's ID.
    """
    name = 'api.mxy.gettoken'

    def handle(self):
        params = {'corpid':'xxxx', 'corpsecret':'xxxx'}
        conn= self.out.rest['mxyAccessToken'].conn
        #获取请求参数
        rqparam=self.request.payload
        #先从缓存加载token数据
        json_dict=accesstokencache.get_value('mxy')
        #加载不成功或者token过期则刷新token
        if(json_dict==None or rqparam['movement']=='refresh'):
            response=conn.get(self.cid, params)
            tokenDict=response.data
            print(tokenDict)
            accesstokencache.set_value('mxy',tokenDict['results']['access_token'],tokenDict['results']['expires_in'])
            json_dict=accesstokencache.get_value('mxy')
        self.response.payload = json_dict


class HandleSubscribe(Service):
    name='api.mxy.handlesubscribe'
    def handle(self):
        conn=self.out.rest['mxySubscribe'].conn
        #self.logger.info(self.invoke('api.mxy.gettoken'))
        refresh_data={'movement':'refresh'}
        get_data={'movement':'get'}
        #内部调用获取token服务
        token=self.invoke('api.mxy.gettoken',get_data,DATA_FORMAT.JSON)
        params = {'access_token':token, 'channel':'message','url':'https://baidu.com'}
        response=conn.get(self.cid, params)
        result=response.data
        #返回错误码为401重新获取token
        if(result['errcode']==401):
            token=self.invoke('api.mxy.gettoken',refresh_data,DATA_FORMAT.JSON)
            params = {'access_token':token, 'channel':'message','url':'https://baidu.com'}
            result=conn.get(self.cid, params)

class MessageRecive(Service):
    
    name='api.mxy.messagereceive'
    def mxyMessegeInsertTB(self,param):
        existTemplate='''SELECT EXISTS (
        SELECT FROM 
        pg_tables
        WHERE 
        tablename  = 't_mxy_message'
        );'''
        tableTemplate='''create table t_mxy_message(
        id int not null PRIMARY KEY generated always as identity,
	    messagetext text,
        sender text,
        userid text,
        phonelist text,
        username text,
        title text,
        messagecontent text,
        contentType text,
        linkurlh5 text,
        dduserid text,
        easuserid text,
        ddtaskid text
        );'''
        insertTemplate='insert into t_mxy_message(messagetext,sender,userid,title,messagecontent,contentType,linkurlh5,phonelist,username) values(:messagetext,:sender,:userid,:title,:messagecontent,:contentType,:linkurlh5,:phonelist,:username) '
        parameters=param
        self.logger.info(parameters)
        dburl=get_engine_url(self.out.sql.get('shrDev').config)
        self.logger.info(dburl)
        self.logger.info(self.out.sql.get('shrDev').config)
        engine=create_engine(dburl)
        Session=sessionmaker(bind=engine)
        with closing(Session()) as session:
            if(False==session.execute(existTemplate).first()[0]):
                session.execute(tableTemplate)
                session.commit()
            session.execute(insertTemplate,parameters)
            session.commit()

    def mxyUserInfoGet(self,userid):
        conn=self.out.rest['mxyGetUserInfo'].conn
        refresh_data={'movement':'refresh'}
        get_data={'movement':'get'}
        #内部调用获取token服务
        token=self.invoke('api.mxy.gettoken',get_data,DATA_FORMAT.JSON)
        params={'access_token':token,'userid':userid}
        response=conn.get(self.cid,params)
        result=response.data
        if(result['errcode']==401):
            token=self.invoke('api.mxy.gettoken',refresh_data,DATA_FORMAT.JSON)
            response=conn.get(self.cid, params)
        result={'mobile':response.data['results']['mobile'],'name':response.data['results']['name']}

        return result
        
    def handle(self):
        mxycrypt=DingCallbackCrypto3("xxx", "xxx", "xxx",1)
        self.logger.info(self.request.payload)
        self.logger.info(self.request.payload.decode('utf-8'))
        rcData=dict(parse_qsl(self.request.payload.decode('utf-8')))
        self.logger.info(rcData)
        rcParam=self.request.http.params
        self.logger.info(rcData['encrypt'])
        self.logger.info(self.request.http.params)
        self.logger.info(rcParam.signature)
        self.logger.info(rcParam.timestamp)
        self.logger.info(rcParam.nonce)
        decryptResult=mxycrypt.getDecryptMsg(rcParam.signature,rcParam.timestamp,rcParam.nonce,rcData['encrypt'])
        self.logger.info(decryptResult)
        self.logger.info(isinstance(decryptResult,str))
        checkUrlDict='{"eventType":"check_url"}'
        
        if decryptResult==checkUrlDict:
            json_dict=mxycrypt.getEncryptedMap("success")
            self.response.payload=json_dict
            return
        jsonResult=json.loads(decryptResult)
        phoneList=[]
        nameList=[]
        for userid in jsonResult['userid']:
            phoneList.append(self.mxyUserInfoGet(userid)['mobile'])
            nameList.append(self.mxyUserInfoGet(userid)['name'])
        param={'messagetext':decryptResult,'sender':jsonResult['sender'],'userid':','.join(jsonResult['userid']),'title':jsonResult['title'],'messagecontent':jsonResult['content'],'contentType':jsonResult['contentType'],'linkurlh5':jsonResult['linkUrl']['h5'],'phonelist':','.join(phoneList),'username':','.join(nameList)}
        self.mx

记录HR项目中使用zato对接魔学院培训系统消息转发到钉钉

客户使用了魔学院、金蝶sHR,移动端使用的是钉钉,shr这边实现了和魔学院的组织人员对接、PC端单点登陆对接,没有实现消息对接,魔学院也不...
点击下载文档文档为doc格式

声明:除非特别标注,否则均为本站原创文章,转载时请以链接形式注明文章出处。如若本站内容侵犯了原著者的合法权益,可联系本站删除。

已经是第一篇
确认删除?
回到顶部
客服QQ
  • 客服QQ点击这里给我发消息
QQ群
  • 答案:my7c点击这里加入QQ群
支持邮箱
微信
  • 微信