您好、欢迎来到现金彩票网!
当前位置:多彩彩票 > 非请求分页 >

干货:如何构建一个消息推送平台

发布时间:2019-04-25 00:27 来源:未知 编辑:admin

  B/S架构下很多业务场景下我们需要服务端主动推送消息到客户端,在html5之前一般使用长轮询(除此之外还有iframe流或者Flash Socket)的方式来实现,而长轮询的方式缺点很明显,频繁交互的情况下,大量的连接被建立和释放,并且交互频率受限于两次http的请求间隔,html5开始可以使用websocket全双工的通信协议,在tomcat和jetty都有实现。

  虽然在java1.4以后可以用nio包实现非阻塞的websocket通信,但是使用起来太过底层和复杂;netty封装了nio,使用了大量异步和事件驱动,封装了拆包粘包,实现了网络编程和业务分离,但即便如此,使用和优化netty还是需要深厚的网络编程知识,况且我们需要的仅仅是用户与用户之间的文本消息的推送,这里我们介绍一个基于netty实现的websocket服务端框架,socket.io屏蔽了使用netty的细节,针对聊天场景做了高度封装,并且提供了包含WEBSOCKET、POLLING等多种推送方式的支持,同时支持namespace(命名空间)、broadcast(广播)、room(房间/聊天室)、event(业务事件)、应用层ack机制。

  构建一个统一的文本消息推送平台,可以在此之上构建业务系统,比如:客服系统、内部聊天工具、站内信、事件系统等。基础平台要做到用户管理、状态管理、文本消息发送/推送(群/点对点)、图片/文件推送、未读消息管理、历史消息、会线

  :通用接口http服务层、提供鉴权、发送消息、历史消息、离线消息、会话等相关接口

  :连接层、提供消息推送服务、维护房间、ack消息的上行是走的restful,在这层可以做uniauth(内部的统一登录平台)权限验证、负载均衡等,也让connector更轻量级,依赖更少;connector是有状态的,保持着用户的连接信息,同时要记录用户和connector的映射关系,以用于router层在推送消息时路由到对应的节点,connector使用单独的物理机来运行,因为虚拟主机并不能稳定保证较高并发的长连接数量。

  一个消息推送平台最重要的是高可用性和最终一致性(消息可达),所以消息发送可以做的非常轻量,发送消息时会带上客户端生成的唯一消息id,同时客户端持久化,服务端只需要把消息写到mq即发送成功,然后logic消费mq再做异步批量的消息持久化,从可用性角度来说,发送消息并不依赖websocket,而是通过restful服务,更加高可用和可扩展。消息发送的可靠性,没有依赖于rabbitmq的事务或者publisher confirms,后面介绍专门的机制来保证。

  文本消息,包含图片、文件、语音、文字等子类型。IQ:操作消息,客户端用来做某些特殊动作处理。

  对于点对点消息来说,发送者和接受者都是一个用户id,对于群消息来说,接受者可能是一个SESSION(会话),会话需要提前创建,会话中包含若干的用户,使用socket.io的room功能,很方便的实现一个会话,room就是SESSION的id。

  1.client-A向im-server发送一个消息请求包,即msg:R;

  7.client-A发出消息后,超过设定的某个时间没有收到ack:N,那么我们认为消息没有推送成功(用户离线除外,用户离线,服务端会模拟一个ack:N),需要客户端重发,服务端根据客户端id幂等处理;

  在我们系统,msg:N和ack:N是connector推送,其他报文就是走http的消息发送和ack;其实ack:N可以不需要,服务器可以保存未读消息列表,接受ack:R来判断是否要重发,但是这样可靠性稍差一些,极端情况会造成消息丢失。

  首先我们要确定以什么样的逻辑排列消息的顺序,两个人聊天,首先要保证的是同一个人消息的先后顺序,其次要保证会话内消息的顺序。服务端不能以持久化时生成自增id作为顺序,因为异步消费不能保证顺序(某些mq,如kafka以用户为key可以保证消息顺序性,但也存在重试问题)。从客户端的角度来说,可以通过同步发送保证消息的顺序性,所以服务端可以在发mq之前通过redis生成会话内自增id,推送的时候可能会存在重发和乱序,就需要客户端根据服务端生成的id做幂等和重排序。

  如果不依赖于redis,可以使用服务器时间(精确到秒)+客户端生成的自增id作为排序字段,但这样就要求集群的时间同步在一秒内。

  关于未读消息的存储有两种方案:写扩散和读扩散,写扩散即未读消息在持久化时就针对每个用户保存一份,读扩散就是利用群消息的偏序特性,只保存用户在会话内ack的最后一条消息id,无论是哪种方式,未读消息都只是保存消息的id,未读消息的推送仅在上线或者连接的时候。

  1. 消息持久化后,如果接受用户不在线,那么在user_messages为每个用户新增一条记录;

  1. 未读消息user_messages表或者last_ack_msgid字段的更新很频繁,可以用redis来实现存储;2. ack的频率很高,可以让客户端批量或在一个时间段内ack,减少请求,即使消息重发,也有客户端去重;

  3. 未读消息可能存在消息量很大的情况,可以设置过期时间,过期后即表示该消息已读;大量未读消息不适合做推送,最好通过http分页拉取,甚至可以把拉取请求和ack请求合并。

  8. 业务逻辑不要占用EventLoop线程,启用业务线程池处理,否则可能阻塞I/O;

  9. Tcp层的心跳只能检测连接,不能确定应用可用,所以有了应用层心跳, socket.io默认25s,可以调整为180s,频率过高会占用大量带宽和流量,过低可能会导致连接断开。同时关闭tcp心跳保活SO_KEEPALIVE=false;

  会生成一个sid表示本次连接会话,upgrades:websocket表示客户端可以把协议升级为websocket而不再使用轮询,而客户端也会判断当前浏览器是否支持websocket来决定是否升级

  客户端发送了一个业务上的注册事件给服务端,并接受了一个成功的返回,在websocket没有连接成功前,会一直使用长轮询方式接受服务端推送

  声明:该文观点仅代表作者本人,搜狐号系信息发布平台,搜狐仅提供信息存储空间服务。

http://kamexpress.net/feiqingqiufenye/52.html
锟斤拷锟斤拷锟斤拷QQ微锟斤拷锟斤拷锟斤拷锟斤拷锟斤拷锟斤拷微锟斤拷
关于我们|联系我们|版权声明|网站地图|
Copyright © 2002-2019 现金彩票 版权所有