在 Android 上使用 RxNetty

Netty是由JBOSS提供的一个Java开源框架,是一个支持TCP/UDP/HTTP等网络协议的通信框架,和Mina类似,广泛应用于RPC框架。RxNetty则是支持RxJava的Netty开源框架,现在我们来看一下在Android上如何使用RxNetty。

添加RxNetty

在 Android Studio 中添加 RxNetty 的依赖:
在 Android 上使用 RxNetty_第1张图片
把RxNetty的tcp包加入到依赖,直接这样编译会有两个问题,第一个问题是jar重复:

com.android.build.api.transform.TransformException: com.android.builder.packaging.DuplicateFileException: Duplicate files copied in APK THIRD-PARTYFile1: C:\Users\XXX.gradle\caches\modules-2\files-2.1\org.openjdk.jmh\jmh-core\1.11.2\f4f8cd9874f5cdbc272b715a381c57e65f67ddf2\jmh-core-1.11.2.jarFile2: C:\Users\XXX.gradle\caches\modules-2\files-2.1\org.openjdk.jmh\jmh-generator-annprocess\1.11.2\72d854bf76ba5e59596d4c887a6de48e7003bee2\jmh-generator-annprocess-1.11.2.jar

解决办法:

dependencies {  ...  compile('io.reactivex:rxnetty-tcp:0.5.2-RC1') {    exclude group: 'org.openjdk.jmh'  }  ...}

另一个问题是引用的netty包中META-INF/下的部分文件重复。
解决办法:

  packagingOptions {    ...    exclude 'META-INF/INDEX.LIST'    exclude 'META-INF/BenchmarkList'    exclude 'META-INF/io.netty.versions.properties'    exclude 'META-INF/CompilerHints'    ...  }

到这里RxNetty就成功添加到项目模块中了。接下来看看到底如何使用RxNetty。

如何使用

拿TCP协议举例,用过Netty的都清楚创建连接的步骤:

        workerGroup = new NioEventLoopGroup();        Bootstrap boot = new Bootstrap();        boot.group(workerGroup)            .channel(NioSocketChannel.class)            .option(ChannelOption.TCP_NODELAY, true)            .handler(new ChannelInitializer() {              @Override public void initChannel(SocketChannel ch) throws Exception {                ChannelPipeline p = ch.pipeline();                p.addLast("decoder", new MessageDecoder());                p.addLast("encoder", new MessageEncoder());                p.addLast("handler", new MessageHandler());              }            });        ChannelFuture f =            boot.connect("localhost", 8888).syncUninterruptibly();        channel = f.channel();

自定义的协议需要我们自己实现编码解码Handler,还有最后处理数据的MessageHandler

@Sharablepublic class MessageHandler extends SimpleChannelInboundHandler<Message> {    @Override    public void messageReceived(ChannelHandlerContext ctx, Message msg)            throws Exception {            //处理消息    }}

在RxNetty中可以不实现MessageHandler,因为通过注册的观察者可以得到最终解码后的协议对象。
下面是RxNetty创建连接的方法:

  Connection mConnection;  public Observable connect(final String url, final int port) {    return Observable.create(new Observable.OnSubscribe() {      @Override public void call(final Subscriber<? super Boolean> subscriber) {        TcpClient.newClient(url, port).addChannelHandlerLast("decoder",            new Func0() {              @Override public ChannelHandler call() {                return new StringDecoder();              }            }).addChannelHandlerLast("encoder", new Func0() {          @Override public ChannelHandler call() {            return new StringEncoder();          }        }).createConnectionRequest().subscribe(new Observer>() {          @Override public void onCompleted() {            subscriber.onCompleted();          }          @Override public void onError(Throwable e) {            subscriber.onError(e);          }          @Override public void onNext(Connection connection) {            mConnection = connection;            subscriber.onNext(true);          }        });      }    });  }

上面的TCP客户端创建了一个字符串解码器、一个字符串编码器,然后创建链接,在链接创建成功后把链接对象connection保存到mConnection方便后面发送数据,同时通知订阅者socket连接成功。

在Android中不能在UI线程创建网络链接,就连InetSocketAddress类都不能在UI线程中创建,TcpClient.newClient(url, port)...createConnectionRequest()本身是一个Observable,但是由于方法newClient(url, port)中创建了InetSocketAddress类,Android严苛模式会报异常,所以上面创建链接的TcpClient方法在外层又包裹了一个Observable,让它运行在IO线程等其它非UI线程才可以正常创建socket链接。

用来接收数据、发送数据的方法同样返回一个Observable,代码如下:

  public Observable receive() {    if (mConnection != null) {      return mConnection.getInput();    }    return null;  }  public Observable send(String s) {    return mConnection.writeString(Observable.just(s));  }

测试上面方法的客户端代码:

  public void rxNettyClientTest() {    connect("localhost", 60000).subscribe(new Observer() {      @Override public void onCompleted() {      }      @Override public void onError(Throwable e) {        //reconnect        Observable.timer(1, TimeUnit.SECONDS).subscribe(new Action1() {          @Override public void call(Long aLong) {            if (mConnection != null) mConnection.closeNow();            rxNettyClientTest();          }        });        System.out.println("reconnect");      }      @Override public void onNext(Boolean aBoolean) {        //send data        send("hello world!").subscribe(new Action1() {          @Override public void call(Void aVoid) {            System.out.println("send success!");          }        });        //receive data        receive().subscribe(new Observer() {          @Override public void onCompleted() {          }          @Override public void onError(Throwable e) {            //reconnect            Observable.timer(1, TimeUnit.SECONDS).subscribe(new Action1() {              @Override public void call(Long aLong) {                if (mConnection != null) mConnection.closeNow();                rxNettyClientTest();              }            });            System.out.println("reconnect");          }          @Override public void onNext(String s) {            System.out.println("receive:" + s);          }        });      }    });  }

上面的代码包涵了读、写数据和重连等主要功能。
然后是创建服务端的代码:

  public void rxNettyServerTest() {    TcpServer server;    server = TcpServer.newServer(60000).addChannelHandlerLast("string-decoder",        new Func0() {          @Override public ChannelHandler call() {            return new StringDecoder();          }        }).addChannelHandlerLast("string-encoder", new Func0() {      @Override public ChannelHandler call() {        return new StringEncoder();      }    }).start(new ConnectionHandler() {      @Override public Observable handle(Connection newConnection) {        return newConnection.writeStringAndFlushOnEach(            newConnection.getInput().map(new Func1() {              @Override public String call(String s) {                System.out.println("receive:" + s);                return "echo=> " + s;              }            }));      }    });    server.awaitShutdown();  }

服务端代码比较简单,直接echo客户端发来的数据。
关于线程,在Android中处理网络需要subscribeOn(Schedulers.io()),如果需要在UI线程展示则observeOn(AndroidSchedulers.mainThread())

最后,在Android上使用RxNetty大多数是因为没有合适的socket客户端框架,RxNetty也支持Http协议,Android上的Http协议的可选框架比较多,所以就不在这里介绍了,想要了解的可以到这里RxNetty。

更多相关文章

  1. 如何在Android中用好多线程
  2. Instrumentation 框架简介
  3. android 动画框架
  4. android 多媒体框架
  5. Android 学习系列 - Java 多线程
  6. Android 判断当前线程是否是UI主线程
  7. Android 线程
  8. Android ORM框架 GreenDao 的使用详解

随机推荐

  1. 流程控制之for循环
  2. 必看!java后端,亮剑诛仙(最全知识点)
  3. 解析Java横死之谜,气定神闲看花开花落
  4. 给我1万字,也讲不清Java内存排查。1万不行
  5. 集合类型内置方法
  6. Python与用户交互
  7. 想把Java代码写的更漂亮么?了解一下Try吧
  8. 将java进程转移到“解剖台”之前,法医都干
  9. 字典类型
  10. 字典类型内置方法