电脑桌面
添加蚂蚁七词文库到电脑桌面
安装后可以在桌面快捷访问

中间件产品_金蝶Apusic分布式消息队列_开发手册.pdf

中间件产品_金蝶Apusic分布式消息队列_开发手册.pdf_第1页
1/13
中间件产品_金蝶Apusic分布式消息队列_开发手册.pdf_第2页
2/13
中间件产品_金蝶Apusic分布式消息队列_开发手册.pdf_第3页
3/13
2000-2023金蝶天燕云计算股份有限公司Apusic2023非经本公司书面许可,任何单位和个人不得擅自摘抄、复制本文档内容的部分或全部,并不得以任何形式传播。和其他金蝶天燕商标均为深圳市金蝶天燕云计算股份有限公司的商标。本文档提及的其他所有商标或注册商标,由各自的所有人拥有。本文档可能含有预测信息,包括但不限于有关未来的财务、运营、产品系列、新技术等信息。由于实践中存在很多不确定因素,可能导致实际结果与预测信息有很大的差别。因此,本文档信息仅供参考,不构成任何要约或承诺。金蝶天燕可能不经通知修改上述信息,恕不另行通知。1金蝶Apusic分布式消息队列_开发手册1概述2mq-客户端使用说明2.1准备工作2.2java-客户端2.21快速入门2.22spring-boot-starter-接入2.23监听器方式消费消息2.24异步发送和接收2.25延迟消息2.26消息重试和死信队列2.3c-客户端2.4go-客户端2.5python-客户端4金蝶Apusic分布式消息队列_开发手册1本手册用于指导用户编写客户端接入ADMQ收发消息,客户端语言支持:Java、C#、Python和Go,其中Java为主要支持语言。下面针对每种语言的使用进行说明。ADMQ是基于topic的发布订阅模型,生产者客户端发送消息到某一个topic上,则所有订阅该topic的消费者组都能收到消息。生产者客户端和ADMQ通信过程主要包含以下流程:客户端连接到任意一个ADMQ计算节点服务发送请求获取topic所在的服务地址,并连接到该服务地址客户端发送消息ADMQ回复消息存储确认的通知客户端继续发送消息消费者客户端和ADMQ通信过程主要包含:客户端连接到任意一个ADMQ计算节点服务发送请求获取topic所在的服务地址,并连接到该服务地址客户端发送订阅请求ADMQ发送消息客户端接收消息并返回消费确认ADMQ继续发送消息手册包含三部分:安装部署、管控台使用以及客户端使用说5金蝶Apusic分布式消息队列_开发手册2mq-2.1ADMQ部署时默认开启了权限认证,客户端接入ADMQ发送和接收消息之前首先需要在管控台上创建用户并配置资源权限。具体请参考:和2.2java-2.21maven2.9.1org.apache.pulsarpulsar-client-admin${pulsar.version}//在管控台用户管理中可获取token内容Stringtoken="exaJe.";//集群的连接地址,可在集群信息中查看。通常为计算节点ip:6650,如果是多个节点则用逗号隔开Stringservice="192.168.1.1:6650,192.168.1.2:6650";ClientBuilderbuilder=PulsarClient.builder().connectionTimeout(5,TimeUnit.MINUTES);builder.authentication(AuthenticationFactory.token(token));builder.serviceUrl("pulsar://"+service);PulsarClientclient=builder.build();System.out.println("客户端创建成功");Consumerconsumer=client.newConsumer(Schema.STRING)//主题名称,格式为:persistent://租户名称/命名空间名称/主题名称.topic("persistent://apusic/ns01/topic01")//订阅名称.subscriptionName("sub-01")//从最早的位置开始消费.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)//订阅模式,包含共享、独占、灾备和按Key共享四种模式.subscriptionType(SubscriptionType.Exclusive).subscribe();System.out.println("消费者创建成功");Producerproducer=client.newProducer(Schema.STRING)//主题名称,格式为:persistent://租户名称/命名空间名称/主题名称.topic("persistent://apusic/ns01/topic01").create();System.out.println("生产者创建成功");6金蝶Apusic分布式消息队列_开发手册for(inti=0;i<10;i++){Stringdata="admq-test-message"+i;MessageIdid=producer.newMessage().value(data).send();System.out.println("sendmessage:"+data+",responseid:"+id);}producer.close();for(inti=0;i<10;i++){Messagemsg=consumer.receive();System.out.println("receivemessage:"+msg.getValue()+",msgid:"+msg.getMessageId());consumer.acknowledge(msg);}consumer.close();client.close();2.22spring-boot-starter-io.github.majuskopulsar-java-spring-boot-starter1.1.2#MQ服务接入地址pulsar.service-url=pulsar://localhost:6650#命名空间名称pulsar.namespace=default#租户名称pulsar.tenant=public#tokenpulsar.token-auth-value=43th4398gh340gf34gj349gh304ghryj34fh生产者配置@ConfigurationpublicclassProducerConfiguration{@BeanpublicProducerFactoryproducerFactory(){returnnewProducerFactory().addProducer("topic01",String.class);}}创建生产者金蝶Apusic分布式消息队列_开发手册7@ServiceclassMyProducer{@AutowiredprivatePulsarTemplateproducer;voidsendHelloWorld()throwsPulsarClientException{//此处的主题必须是上边已经注册过的producer.send("topic01","Helloworld!");}}@ServiceclassMyConsumer{@PulsarConsumer(topic="topic01",clazz=String.class)voidconsume(Stringmsg){System.out.println(msg);}}2.23//在管控台用户管理中可获取token内容Stringtoken="exaJe.";//集群的连接地址,可在集群信息中查看。通常为计算节点ip:6650,如果是多个节点则用逗号隔开Stringservice="192.168.1.1:6650,192.168.1.2:6650";ClientBuilderbuilder=PulsarClient.builder().connectionTimeout(5,TimeUnit.MINUTES);builder.authentication(AuthenticationFactory.token(token));builder.serviceUrl("pulsar://"+service);PulsarClientclient=builder.build();System.out.println("客户端创建成功");client.newConsumer(Schema.STRING).topic(topic).subscriptionType(SubscriptionType.Shared).subscriptionName(subName).messageListener((c,msg)->{try{System.out.println("receivemessage:"+msg.getValue()+",msgid:"+msg.getMessageId());c.acknowledge(msg);}catch(Exceptione){log.error("",e);}}).subscribe();Producerproducer=client.newProducer(Schema.STRING)//主题名称,格式为:persistent://租户名称/命名空间名称/主题名称.topic("persistent://apusic/ns01/topic01")金蝶Apusic分布式消息队列_开发手册8.create();System.out.println("生产者创建成功");for(inti=0;i<10;i++){Stringdata="admq-test-message"+i;MessageIdid=producer.newMessage().value(data).send();System.out.println("sendmessage:"+data+",responseid:"+id);}producer.close();2.24//创建客户端、生产者和消费者for(inti=0;i<10;i++){Stringdata="admq-test-message"+i;producer.newMessage().value(data).sendAsync().whenCompleteAsync((msgId,ex)->{if(ex!=null){//发送失败,需要业务处理}else{System.out.println("sendmessage:"+data+",responseid:"+msgId);}});}for(inti=0;i<10;i++){consumer.receiveAsync().whenCompleteAsync((msg,ex)->{if(ex!=null){}else{System.out.println("receivemessage:"+msg.getValue()+",msgid:"+msg.getMessageId());try{consumer.acknowledge(msg);}catch(PulsarClientExceptione){e.printStackTrace();}}});}2.25支持指定消息在哪个时刻或者延迟多长时间后被消费者消费。Producerproducer=client.newProducer(Schema.STRING)//主题名称,格式为:persistent://租户名称/命名空间名称/主题名称.topic("persistent://apusic/ns01/topic01").create();System.out.println("生产者创建成功");//10秒后消费者才能收到消息producer.newMessage().value("delaymessage").deliverAfter(10,TimeUnit.SECONDS).send();//消费者在指定时刻收到消息longtime=newSimpleDateFormat("yyyy-MM-ddHH:mm:ss").parse("2022-06-1100:00:00").getTime();producer.newMessage().value("delaymessage").deliverAt(time).send();9金蝶Apusic分布式消息队列_开发手册2.26当消息没有被消费者成功消费时,会把消息保存到重试主题中,等待一段时间后会重新发送给消费者,当重试达到一定次数后把消息保存到死信主题中。client.newConsumer(Schema.STRING).topic(topic).subscriptionName("sub01").subscriptionType(SubscriptionType.Shared)//支持重试.enableRetry(true).receiverQueueSize(200).deadLetterPolicy(DeadLetterPolicy.builder()//最大重试次数,超过后进入死信队列.maxRedeliverCount(5)//指定消费失败的消息保存的主题名称。不指定的话使用默认的.retryLetterTopic("persistent://apsuic/ns01/topic-retry")//指定死信队列的主题名称。不指定的话使用默认的.deadLetterTopic("persistent://apusic/ns01/topic-dlq").build()).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();2.3c-引入NuGet程序包:DotPulsarvartoken="eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJhZG1xIn0.ybJge7zTfy_RDdAtB3w6nIPDHPT6-kbB6sNzgPt8sKQ";varclient=PulsarClient.Builder().ServiceUrl(newUri("pulsar://172.20.140.23:6650")).Authentication(AuthenticationFactory.Token(token)).RetryInterval(TimeSpan.FromSeconds(3)).Build();varconsumer=client.NewConsumer().Topic("persistent://public/default/topic-02").SubscriptionName("sub01").Create();Console.WriteLine("startconsumer.");awaitforeach(varmessageinconsumer.Messages()){Console.WriteLine("Received:"+Encoding.UTF8.GetString(message.Data.ToArray()));//确认消息awaitconsumer.AcknowledgeCumulative(message);}varproducer=client.NewProducer().Topic("persistent://public/default/topic-02")金蝶Apusic分布式消息队列_开发手册10.Create();for(inti=0;i<10;i++){vardataStr="c#messagetest"+i;producer.Send(Encoding.UTF8.GetBytes(dataStr));Console.WriteLine("sendmessage:"+dataStr);Thread.Sleep(1000);}2.4go-github.com/apache/pulsar-client-go/pulsarservice:=flag.String("service","172.20.140.23:6650","admqserviceaddress")token:=flag.String("token","-","token")client,err:=pulsar.NewClient(pulsar.ClientOptions{URL:OperationTimeout:ConnectionTimeout:Authentication:})"pulsar://"+*service,60*time.Second,60*time.Second,pulsar.NewAuthenticationToken(*token),funcconsume(clientpulsar.Client,topic*string,subName*string){consumer,err:=client.Subscribe(pulsar.ConsumerOptions{Topic:*topic,SubscriptionName:*subName,SubscriptionInitialPosition:pulsar.SubscriptionPositionLatest,})iferr!=nil{log.Fatal(err)}deferconsumer.Close()ctx:=context.Background()fori:=0;i<10;i++{msg,err:=consumer.Receive(ctx)fmt.Println("Receivemesagge:",msg.ID(),string(msg.Payload()))iferr!=nil{log.Fatal(err)}consumer.Ack(msg)}consumer.Close()}金蝶Apusic分布式消息队列_开发手册11funcproduce(clientpulsar.Client,topic*string,message*string){producer,err:=client.CreateProducer(pulsar.ProducerOptions{Topic:*topic,})iferr!=nil{log.Fatal(err)}ctx:=context.Background()fori:=0;i<10;i++{sendData:=fmt.Sprintf("%s-%d",*message,i)ifmsgId,err:=producer.Send(ctx,&pulsar.ProducerMessage{Payload:[]byte(sendData),});err!=nil{log.Fatal(err)}else{log.Println("Sendmessage:",msgId,sendData)}}iferr!=nil{log.Fatal(err)}producer.Close()}2.5python-pip3installpulsar-client==2.9.1token="eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJhZG1xIn0.ybJge7zTfy_RDdAtB3w6nIPDHPT6-kbB6sNzgPt8sKQ";client=pulsar.Client("pulsar://172.20.140.23:6650",authentication=pulsar.AuthenticationToken(token))defconsumer():consumer=client.subscribe('persistent://public/default/topic04','sub01')whileTrue:msg=consumer.receive()try:print("receivemessage'{}'id='{}'".format(msg.data(),msg.message_id()))consumer.acknowledge(msg)except:#会重新收到消息consumer.negative_acknowledge(msg)金蝶Apusic分布式消息队列_开发手册12defproducer():producer=client.create_producer('persistent://public/default/topic04')foriinrange(10):data='Hello-%d'%iproducer.send(data.encode('utf-8'))print("sendmessage:'{}'".format(data))2000-2023金蝶天燕云计算股份有限公司

1、当您付费下载文档后,您只拥有了使用权限,并不意味着购买了版权,文档只能用于自身使用,不得用于其他商业用途(如 [转卖]进行直接盈利或[编辑后售卖]进行间接盈利)。
2、本站所有内容均由合作方或网友上传,本站不对文档的完整性、权威性及其观点立场正确性做任何保证或承诺!文档内容仅供研究参考,付费前请自行鉴别。
3、如文档内容存在违规,或者侵犯商业秘密、侵犯著作权等,请点击“违规举报”。

碎片内容

中间件产品_金蝶Apusic分布式消息队列_开发手册.pdf

确认删除?
回到顶部
客服QQ
  • 客服QQ点击这里给我发消息
QQ群
  • 答案:my7c点击这里加入QQ群
支持邮箱
微信
  • 微信