DDPush开源推送框架源码分析之APPServer到DDPush
DDPush 任意门推送
DDPush是什么
DDPush (Dimension Door Push),任意门推送,是一款开源免费的单机千万级实时信息推送服务器,使用Java语言开发,具有简单、稳定、高性能、高容量等特点,适用于互联网、移动互联网、物联网、Android、智能设备、硬件设备等各种环境。
DDPush可以做什么
移动互联网信息推送
DDPush可实时推送信息到各种Android、Windows等手机和平板(即“透传”),并支持双向通信。DDPush支持自定义信息,信息的格式和内容可由开发者自行定义
IM实时消息系统核心组件
通过集成DDPush,可以开发各种IM实时消息系统,例如:聊天系统、社交App等。
物联网设备控制与交互
DDPush可作为一个实时控制中心,控制物联网中的各种硬件设备(硬件需支持网络通信),与之双向通信。
DDPush有什么优势
开源、免费
DDPush采用Apache License Version 2.0开源协议,可放心使用,只要您保留其许可证信息。
容量高,速度快,要求低
DDPush在线部分主要采用UDP协议(同时支持TCP协议),支撑1000万终端在线的服务器,最少只需要4G内存(不考虑变长自定义信息的情况下),单个主流双核CPU使用率低于75%。即:一部普通PC台式机的配置。
DDPush推送部分采取TCP协议和Java NIO非阻塞网络技术,普通PC可支持至少数千台应用服务器同时长连接推送信息到终端,每秒推送信息的速度在1万条以上
终端设备流量少,省电
采用DDPush,智能手机等终端设备在线一个月(空载的情况下),只需几百KB的上载流量,下载流量甚至可调节到为零。
DDPush提供的Android手机App示例demo,连续在线48小时耗电少于0.5 mAh(使用2G网络GPRS连接,经360省电王测试 >>>详情)
DDPush基于什么技术
DDPush基于自有的二进制网络传输协议(基于TCP和UDP),因此客户端可以支持各种类型的终端设备,包括各种智能手机、平板、智能设备、物联网硬件,和各种终端操作系统(包括: Android, Windows, Linux等)。
DDPush使用Java语言开发,因此服务端可运行在各种操作系统和服务器上。
更多详细介绍请移步DDPush官网:http://www.ddpush.net/
一、准备工作
1、TCP/UDP协议,参见 百度百科 2、Java NIO技术,参见 Java NIO系列教程 3、下文提到的APPServer为我们自己的业务服务器,DDPush指推送服务器
二、工作流程
流程一、APPServer使用TCP模式向DDPush发送消息 流程二、DDPush再通过TCP或UDP( TCP vs UDP)方式透传给终端。
涉及到两个重要的类
1、NIOPushListener 第一步:启动APPServer TCP连接public void initChannel() throws Exception{// 监听TCP连接channel = ServerSocketChannel.open();channel.socket().bind(new InetSocketAddress(port));channel.configureBlocking(false); // 非阻塞// 注册通道, 监听连接事件selector = Selector.open();channel.register(selector, SelectionKey.OP_ACCEPT);System.out.println("NIO TCP Push Listener nio provider: "+selector.provider().getClass().getCanonicalName());}
第二步:run方法,监听通道事件@Overridepublic void run() {try{init();}catch(Exception e){e.printStackTrace();System.exit(1);}System.out.println("push listener port:"+this.port);while(!stoped && selector != null){try{handleEvent();handleTimeout(); // 处理超时的APPServer连接handleChannel(); // 处理通道事件}catch(java.nio.channels.ClosedSelectorException cse){//}catch (java.nio.channels.CancelledKeyException nx) { // }catch(Exception e){e.printStackTrace();}catch(Throwable t){t.printStackTrace();}}closeSelector();stopExecutor();}
第三步、分析handleChannel()方法第2行:selector.select()方法会一直阻塞(wakeup方法会取消阻塞,立即返回)直到通道上有订阅的事件private void handleChannel() throws Exception{if(selector.select() == 0){try{Thread.sleep(1);}catch(Exception e){}return;}Iterator
it = selector.selectedKeys().iterator(); while (it.hasNext()) { SelectionKey key = it.next(); it.remove(); // Is a new connection coming in? APPServer新连接到达 if (key.isAcceptable()) { try{ ServerSocketChannel server = (ServerSocketChannel) key.channel(); // APPServer连接对象 SocketChannel channel = server.accept(); channel.configureBlocking(false); // 非阻塞方式处理(对比ServerSocket得到Socket连接后, 需要开启线程去处理) channel.socket().setSoTimeout(sockTimout); //channel.socket().setReceiveBufferSize(1024); //channel.socket().setSendBufferSize(1024); PushTask task = new PushTask(this, channel); // 像Selector注册读通道 channel.register(selector,SelectionKey.OP_READ, task); }catch(Exception e){ e.printStackTrace(); } } if (key.isReadable() || key.isWritable()) { try{ PushTask task = (PushTask)key.attachment(); if(task == null){//this should never happen cancelKey(key); continue; } task.setKey(key); // 向终端推送消息 executor.execute(task); }catch(Exception e){ e.printStackTrace(); } } }}
第16行:表示有APPServer连接到DDPush了 第20行:channel可以理解为一个APPServer连接对象
第27行:向channel注册读事件,可以理解等待APPServer发消息,同时将task这个对象附件进去(后面会讲解PushTask作用) 第33行:表示准备APPServer发来消息,或消息已经接收完毕准备回应APPServer 第42行:可以理解为向终端推送消息,当然这里面包含读取APPServer发来的消息、响应APPServer等流程
2、PushTask
负责处理每一个连接到DDPush的APPServer,可以理解为一个APPServer对应一个PushTask 第一步、run()方法第14行:判断消息是否接受完毕@Overridepublic synchronized void run() {if(listener == null || channel == null){return;}if(key == null){return;}if(isCancel == true){return;}try{if(writePending == false){if(key.isReadable()){ // 可读状态//read pkg 读取APPServer推送的信息readReq();}else{// do nothing}}else{//has package// try send pkg and place hasPkg=false////register write ops if not enough buffer//if(key.isWritable()){writeRes(); // 将响应信息发给APPServer//}}}catch(Exception e){cancelKey(key);isCancel = true;}catch(Throwable t){cancelKey(key);isCancel = true;}key = null;}
第18行:读取APPServer消息 第28行:响应APPServer 下面重点分析readReq()和writeRes()方法
第二步、readReq()方法第10行:channel.read(buffer)这个方法负责从APPServer读取消息,类似我们从文件读取流一样,每次读取一定的数量,所以我们需要判断何时消息读取完毕 第13行:判断消息是否读取完毕,如果没有读取完毕则return,那么return到哪里呢?回到我们刚刚NIOPushListener的handleChannel()方法,如果消息未完毕,又会触发PushTask的run()方法,继续接受消息,一旦消息接受完毕,代码会走到第19行 第19行:processReq()方法的作用是将APPServer发过来的消息透传给终端(流程二),此方法代码虽然不多,但涉及到东西很多,下篇文章分析到终端连接DDPush的地方会详细讲解/** * 读取APPServer推送的信息 * @throws Exception */private void readReq() throws Exception{if(this.writePending == true){return;}if(channel.read(buffer) < 0){throw new Exception("end of stream");}if(this.calcWritePending() == false){ // 判断消息是否接收完毕return;}else{byte res = 0;try{ // 读取具体信息, 推送给终端processReq();}catch(Exception e){res = 1;}catch(Throwable t){res = -1;}// 响应信息信息(0表示成功)buffer.clear();buffer.limit(1);buffer.put(res);buffer.flip();// 注册写事件registerForWrite(key, true);}lastActive = System.currentTimeMillis();}
第三步、writeReq()方法好了,上面是关于APPServer向DDPush推送消息的源码分析,下篇文章会给大家带来DDPush与终端连接的源码分析,敬请期待!第6行:表示是否已经全部将响应消息发送给APPServer了 第7行:发送响应消息,这个和判断消息是否读取完毕类似,可能会触发多次,所以需要第6行的判断,也就是说PushTask的run()方法可能会被执行多次/** * 响应APPServer * @throws Exception */private void writeRes() throws Exception{if(buffer.hasRemaining()){channel.write(buffer);}else{buffer.clear();buffer.limit(Constant.PUSH_MSG_HEADER_LEN);this.writePending = false;registerForWrite(key, false);}lastActive = System.currentTimeMillis();}
更多相关文章
- Android(安卓)ZXing源码简化
- ios、android 系统字体说明
- [置顶] Android(安卓)WebKit消息处理
- Android(安卓)5.0(Lollipop)中的SurfaceTexture,TextureView, Sur
- Handler 消息传递机制介绍
- Android的Looper类使用的5个要点
- Android(安卓)照相机实现方式
- android (20)
- android 图片加载和缓存开源项目 Picasso