文档中心 > 开发文档

消息服务使用介绍

更新时间:2018/07/24 访问次数:7840

消息服务是开放平台为提高应用API调用效率而推出的一种主动推送服务( From速卖通),推送内容包括(速卖通交易、商品、退款等信息),基于该推送服务,应用获取速卖通数据不需再不停轮询API,仅需在接收到速卖通推送的消息时调用API获取即可,大大提高API调用效率和降低API使用费用。

From速卖通:即速卖通向外推送速卖通的交易、商品、退款等官方消息。

那么如何使用消息服务呢? 请看以下是消息服务 From速卖通的详细使用说明。

 

From通消息服使用

应用订阅消

进入ISV控制台,在“应用管理->消息服务->订阅消息”页面,选择需要的消息进行订阅,点击相应消息后面的“订阅”即可

  

订阅消息成功,可以在“我的订阅”中查看已经成功订阅的消息。如果需要取消消息的订阅,直接点击 “取消订阅”。点击消息名称可以查看每个消息返回的详细字段信息。

注意:如果该消息没有权限,则说明应用未开通相关API调用权限,通过点击“申请权限”,进入申请相应的权限包。

 

给用户开通消

调用taobao.tmc.user.permit接口给用户(即速卖通商家)开通,可以选择只给用户开通部分消息类型,也可全部开通。具体可看该API 入参说明。

备注:

  • 给用户开通消息前提是用户已经给应用授权,如未授权,请参考获取用户授权说明。
  • 取消用户的消息服务调用taobao.tmc.user.cancel接口。
  • (注:这个API暂时还不支持速卖通业务)可以通过接口taobao.tmc.user.get获取用户已开通消息,入参必须输入is_valid,topics,modified来判断用户授权消息是否成功
  • 消息服务API文档:点击这里查看

 

码实现接收消息

正式环境服务地址:ws://mc.api.taobao.com/

接收消息,实现方式有两种: 通过SDK接收消息 、 通过API接收消息 ,推荐采用SDK接收消息。

SDK接收消息

目前支持JAVA与.NET语言,其它语言建议采用API接收消息。通过SDK接收消息只需要关注业务的处理,不需要操心消息重发、确认、长连接的重连等操作,SDK会自动处理好一切。

JAVA接口使用说明

public interface MessageHandler {

 

         /**

          * 消息通道客户端收到消息后,会回调该方法处理具体的业务,处理结果可以通过以下两种方式来表述:

          * <ul>

          * <li>抛出异常或设置status.fail()表明消息处理失败,需要消息通道服务端重发

          * <li>不抛出异常,也没有设置status信息,则表明消息处理成功,消息通道服务端不会再投递此消息

          *

          * @param message 消息内容

          * @param status 处理结果,如果调用status.fail(),消息通道将会择机重发消息;否则,消息通道认为消息处理成功

          * @throws Exception 消息处理失败,消息通道将会择机重发消息

          */

         public void onMessage(Message message, MessageStatus status) throws Exception;

 

}

JAVA使用代码示例

TmcClient client = new TmcClient("app_key", "app_secret", "default"); // 关于default参考消息分组说明

client.setMessageHandler(new MessageHandler() {

         public void onMessage(Message message, MessageStatus status) {

                  try {

                           System.out.println(message.getContent());

                           System.out.println(message.getTopic());

                  } catch (Exception e) {

                           e.printStackTrace();

                           status.fail(); // 消息处理失败回滚,服务端需要重发

          // 重试注意:不是所有的异常都需要系统重试。

          // 对于字段不全、主键冲突问题,导致写DB异常,不可重试,否则消息会一直重发

          // 对于,由于网络问题,权限问题导致的失败,可重试。

          // 重试时间 5分钟不等,不要滥用,否则会引起雪崩

                  }

         }

});

client.connect("ws://mc.api.taobao.com"); // 消息环境地址:

注: 采用Java main方法在Eclipse里面运行上面的代码测试时,请在client.connect()后面加上Thread.sleep让main线程等待一段 时间结束,以便观察消息的实时接收情况,否则main线程结束后,TMC长连接也会跟着断开。如果是在web服务器上运行上面的代码,则不用在 client.connect()后面加任何Thread.sleep代码,也不需要在外面包一层while(true)循环,因为web服务器上的主线 程只要服务器不关闭都是不会结束的,TMC的长连接会一直保持。

C#使用示例代码

TmcClient client = new TmcClient("appkey", "appsecret", "default"); // 关于default参考消息分组说明

client.OnMessage += (s, e) =>

{

    try

    {

        Console.WriteLine(e.Message.Topic);

        Console.WriteLine(e.Message.Content);

        // 默认不抛出异常则认为消息处理成功

    }

    catch (Exception exp)

    {

        Console.WriteLine(exp.StackTrace);

        e.Fail(); // 消息处理失败回滚,服务端需要重发

        // 重试注意:不是所有的异常都需要系统重试。

        //对于字段不全、主键冲突问题,导致写DB异常,不可重试,否则消息会一直重发

        // 对于,由于网络问题,权限问题导致的失败,可重试。

        // 重试时间 5分钟不等,不要滥用,否则会引起雪崩

    }

};

client.Connect("ws://mc.api.taobao.com/"); // 消息环境地址:

注: 采用C# Main方法在VS控制台工程里面运行上面的代码测试时,请在client.Connect后面加上Console.Read()或 Thread.Sleep让main线程暂时不结束,以便观察消息的实时接收情况,否则Main线程结束后,TMC长连接也会跟着断开。如果是在IIS服 务器或C#应用程序里面运行上面的代码,则不用在client.Connect后面加任何等待的代码,也不需要在外面包一层while(true)循环,只要保持IIS服务器或C#应用程序不关闭,TMC的长连接会一直保持。

API接收消息

提供API接收消息的目的是那种对多线程和长连接处理不方便的语言使用的,比如PHP、Python,这些语言官方暂时没有提供SDK,可以通过下面两个API配合使用也能达到接收和确认消息的目的。 推荐尽量用SDK方式,如果必须使用API,建议调用taobao.tmc.messages.consume接口时尽量不要并发或并发量不要太大,API使用存在实时性不是很高的情况,如果实时性要求高建议还是用SDK。

基本步骤:

  • 首先消费消息:API名称:taobao.tmc.messages.consume消息消费后,指针自动后移,下次调用自动获取到未消费过的消息,但是消费确认后的消息无法再次获取。
  • 然后确认消息:API名称:taobao.tmc.messages.confirm获取消息后,如果不确认,消息服务会选择时机重发,重发次数由消息服务控制,如果消息3天内都没有被确认将会被删除。

JAVA示例代码

TaobaoClient client = new DefaultTaobaoClient("http://gw.api.taobao.com/router/rest", "app_key", "app_secret", "json");

do {

         long quantity = 100L;

         TmcMessagesConsumeResponse rsp = null;

         do {

                  TmcMessagesConsumeRequest req = new TmcMessagesConsumeRequest();

                  req.setQuantity(quantity);

                  req.setGroupName("default");

                  rsp = client.execute(req);

                  if (rsp.isSuccess() && rsp.getMessages() != null) {

                           for (TmcMessage msg : rsp.getMessages()) {

                                     // handle message

                                     System.out.println(msg.getContent());

                                     System.out.println(msg.getTopic());

                                     // confirm message

                                     TmcMessagesConfirmRequest cReq = new TmcMessagesConfirmRequest();

                                     cReq.setGroupName("default");

                                     cReq.setsMessageIds(String.valueOf(msg.getId()));

                                     TmcMessagesConfirmResponse cRsp = client.execute(cReq);

                                     System.out.println(cRsp.getBody());

                           }

                  }

                  System.out.println(rsp.getBody());

         } while (rsp != null && rsp.isSuccess() && rsp.getMessages() != null && rsp.getMessages().size() == quantity);

         Thread.sleep(1000L);

} while (true);

C#示例代码

ITopClient client = new DefaultTopClient("http://gw.api.taobao.com/router/rest", "app_key", "app_secret", "json");

do

{

    long quantity = 100L;

    TmcMessagesConsumeResponse rsp = null;

    do

    {

        TmcMessagesConsumeRequest req = new TmcMessagesConsumeRequest();

        req.GroupName = "default";

        req.Quantity = quantity;

        rsp = client.Execute(req);

        if (!rsp.IsError && rsp.Messages != null)

        {

            foreach (TmcMessage msg in Messages)

            {

                // handle message 

                Console.WriteLine(msg.Topic);

                Console.WriteLine(msg.Content);

                // confirm message 

                TmcMessagesConfirmRequest cReq = new TmcMessagesConfirmRequest();

                cReq.GroupName = "default";

                cReq.SMessageIds = msg.Id.ToString();

                TmcMessagesConfirmResponse cRsp = client.Execute(cReq);

                Console.WriteLine(cRsp.Body);

            }

        }

        Console.WriteLine(rsp.Body);

    } while (rsp != null && !rsp.IsError && rsp.Messages != null && rsp.Messages.Count == quantity);

    Thread.Sleep(new TimeSpan(0, 0, 1));

} while (true);

注:通过API拉取消息的平均RT在网络好的情况下能达到10毫秒左右,每次消费得到的消息为空时,务必暂停至少1秒才去执行下一次循环拉取消息,否则会产生很多无谓的请求,白白浪费服务端的资源,以及应用自身的API流量包。

消息分使用介

(注:按用户进行消息分组暂时还不支持速卖通业务,AE用户目前全部使用default分组。可以根据topic做分组路由,见taobao.tmc.topic.group.add

用户数量很大需要多台机器组成一个集群来接收消息,或者对商家进行隔离独立接收消息的时候。不管是SDK还是API都可以通过多连接接收消息。

消息服务支持多连接有两种方式:Ⅰ、创建多个用户分组,每个用户分组建立一个连接。Ⅱ、同一个分组建立多个连接

  • 创建分组:调用接口taobao.tmc.group.add创建自定义分组。注:消息服务会为应用创建一个default分组,没有分配到指定分组【group】的用户都属于default分组,通过SDK或API消费消息时,如果不指定分组,就表示采用默认分组连接。
  • 删除分组:调用接口taobao.tmc.group.delete删除指定的分组或分组下的用户。注:每个应用最多创建50个分组,每个分组用户数不限。

 

消息服见问题

什么是分组,是否需要分组

(注:按用户进行消息分组暂时还不支持速卖通业务,AE用户目前全部使用default分组。可以根据topic做分组路由,见taobao.tmc.topic.group.add
消息分组是用户消息隔离的一种方式,组内用户的消息只会发送到相同组名的连接上。同一个组支持多个连接,同一组的消息,随机发送到组内的某一个连接。如果要用户的类型对消息区别对待,比如优先保证付费用户,然后再保证免费用户,就可以通过消息分组来接收不同用户的消息。每个应用最多创建50个分组,每个分组用户数不限。

什么是多连接接收消息,如何建立多个连接
多连接收消息是指同一分组内ISV服务器与TOP的消息服务器建立多个连接来收消息。多链接是对同一个分组而言,消息在下发时随机选择从分组内的多个连接中选择一个连接下发消息。多链接有的随机下发消息的功能,可以用同一分组多连接来实现集群,负载功能。

建立多链接只需要用相同的代码重新启动一个TmcClient实例。可在同一个ISV服务器上,也可在不同的ISV服务器上,建立同一个分组的多链接。

什么情况下使用多连接
消息服务的服务端有消息堆积的功能,它看的是你客户端的处理能力,只要能处理他就给你发,处理不了就堆积在服务端,一般情况下不需要建立多连接,单个连接就能把机器的网卡跑满。新消息服务的多连接,更多的是应用在用户分组,或者做集群部署的场景下。

消息重发逻辑是怎么样的
对于断开连接(如应用挂了)情况,服务端会堆积消息,等应用重新连接进来后,再把堆积的消息顺序推送给客户端。一条消息从诞生开始,如果应用一直不接收,服 务端最长保留时间为3天,超过3天会自动清除。对于连接正常,但消息处理失败的情况,服务端会最快隔10分钟进行第一次重发,如果应用一直处理失败,服务 端会一直定时重发,直到消息被清理为止。

PHPjson_decode整形溢出问题
PHP 5.3版本以下,json_decode依赖于操作系统的位数来解释数字,在32位系统上最大只支持2^32的数字解释,在64位的系统上最大支持 2^64的数字的解释。由于消息服务的消息ID超过32位系统的最大值,如果没有升级到PHP 5.3版本以上,就会由于确认了错误的消息ID,导致消息重复投递。解决方案是:1. 升级PHP到5.3以上;2. 把应用部署到64位的系统上;3. 把JSON消息里面的数字通过正则等手段替换为字符串。

消息的断开和心跳测试
客户端要直接断开消息:TmcClient.close(); 心跳测试是否正常连接:TmcClient.isOnline();

消息服务会有延时吗
为用户开通消息服务taobao.tmc.user.permit后需要10秒才能生效。使用中消息基本没有延迟,都会在1秒内收到。如果有消息堆积或者程序处理不及时,就会有延时。延时时间与程序处理能力有关。为用户取消消息服务taobao.tmc.user.cancel后1秒内生效,取消后,堆积的消息会继续发送,新的消息不会发送。

商品消息message.getContent()中的nick为空正常吗?我怎么判断该消息属于哪个店铺?
商品消息中,是有nick为空的情况的。可以用个外层获取到message.getUserNick()或message.getUserId()。

消息服务,用户到期了,消息还会不会收到?
消息服务推送的判断有两个条件:1、是用户授权是否在有效期内;2、是用户有没有开通消息服务(toabao.tmc.user.permit)。只有二者同时满足才会推送。相反如果用户授权到期就不会推送。另外用户授权到期一个月以内,用户的开通关系还会保存,一个月以后会清除。如果在一个月以内用户重新授权,就不需要重新为用户开通消息服务。

获取消息后,如果不确认,消息服务会选择时机重发,重发次数由消息服务控制,目前会重发多少次?
消息服务每十分钟查一次未处理的消息,然后择机发送,如果消息3天内都没有被确认将会被删除

消息没有收到,如何确认是不是消息服务漏掉了?
通过日常反馈,未出现消息服务漏消息的情况,一般是ISV程序处理未收到消息或者程序处理能力导致消息阻塞。排查消息可以从以下方面确认:
* 首先确认授权(SessionKey)是否有效;
* 调用taobao.tmc.user.get确认当前用户以及开通的消息,返回参数传入topics;调用TmcClient.isOnline()测试心跳是否正常连接。若以上排查不出结果可以提交问题到支持中心附上:AppKey、用户nick、消息状态、消息大概时间、订单的tid、商品的num_iid。

客户端配置参数注意事项
.NET SDK:ReconnectIntervalSeconds重连时间,标识TmcClient断开时重连的时间间隔。此值必须>10s,如果此值太小会出现链接不上的情况,原因是服务端如果检测到500ms内重连,会断开新的链接。

time outtimelocaltime消息相关字段说明

  • time 是消息产生时间
  • outtime 是消息的本次推送时间
  • localtime 是本机的时间
  • outtime - time 表示服务端的处理或重发延迟时长
  • localtime - outtime 表示本机的时间与TOP时间差,或网络延迟,或收到消息后处理的延迟

消息服务报错isp.system-error: unknown errors,isv.tmc-switch-off: appkey,the app do not enable messaging-channel feature
应用未订阅(开通)消息服务,就使用TmcClient来接收消息。

FAQ

关于此文档暂时还没有FAQ
返回
顶部