在 Android 上使用 RxNetty
在 Android 上使用 RxNetty
Netty是由JBOSS提供的一个Java开源框架,是一个支持TCP/UDP/HTTP等网络协议的通信框架,和Mina类似,广泛应用于RPC框架。RxNetty则是支持RxJava的Netty开源框架,现在我们来看一下在Android上如何使用RxNetty。
添加RxNetty
在 Android Studio 中添加 RxNetty 的依赖:
把RxNetty的tcp包加入到依赖,直接这样编译会有两个问题,第一个问题是jar重复:
com.android.build.api.transform.TransformException: com.android.builder.packaging.DuplicateFileException: Duplicate files copied in APK THIRD-PARTYFile1: C:\Users\XXX.gradle\caches\modules-2\files-2.1\org.openjdk.jmh\jmh-core\1.11.2\f4f8cd9874f5cdbc272b715a381c57e65f67ddf2\jmh-core-1.11.2.jarFile2: C:\Users\XXX.gradle\caches\modules-2\files-2.1\org.openjdk.jmh\jmh-generator-annprocess\1.11.2\72d854bf76ba5e59596d4c887a6de48e7003bee2\jmh-generator-annprocess-1.11.2.jar
解决办法:
dependencies { ... compile('io.reactivex:rxnetty-tcp:0.5.2-RC1') { exclude group: 'org.openjdk.jmh' } ...}
另一个问题是引用的netty包中META-INF/下的部分文件重复。
解决办法:
packagingOptions { ... exclude 'META-INF/INDEX.LIST' exclude 'META-INF/BenchmarkList' exclude 'META-INF/io.netty.versions.properties' exclude 'META-INF/CompilerHints' ... }
到这里RxNetty就成功添加到项目模块中了。接下来看看到底如何使用RxNetty。
如何使用
拿TCP协议举例,用过Netty的都清楚创建连接的步骤:
workerGroup = new NioEventLoopGroup(); Bootstrap boot = new Bootstrap(); boot.group(workerGroup) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast("decoder", new MessageDecoder()); p.addLast("encoder", new MessageEncoder()); p.addLast("handler", new MessageHandler()); } }); ChannelFuture f = boot.connect("localhost", 8888).syncUninterruptibly(); channel = f.channel();
自定义的协议需要我们自己实现编码解码Handler,还有最后处理数据的MessageHandler
:
@Sharablepublic class MessageHandler extends SimpleChannelInboundHandler<Message> { @Override public void messageReceived(ChannelHandlerContext ctx, Message msg) throws Exception { //处理消息 }}
在RxNetty中可以不实现MessageHandler
,因为通过注册的观察者可以得到最终解码后的协议对象。
下面是RxNetty创建连接的方法:
Connection mConnection; public Observable connect(final String url, final int port) { return Observable.create(new Observable.OnSubscribe() { @Override public void call(final Subscriber<? super Boolean> subscriber) { TcpClient.newClient(url, port).addChannelHandlerLast("decoder", new Func0() { @Override public ChannelHandler call() { return new StringDecoder(); } }).addChannelHandlerLast("encoder", new Func0() { @Override public ChannelHandler call() { return new StringEncoder(); } }).createConnectionRequest().subscribe(new Observer>() { @Override public void onCompleted() { subscriber.onCompleted(); } @Override public void onError(Throwable e) { subscriber.onError(e); } @Override public void onNext(Connection connection) { mConnection = connection; subscriber.onNext(true); } }); } }); }
上面的TCP客户端创建了一个字符串解码器、一个字符串编码器,然后创建链接,在链接创建成功后把链接对象connection
保存到mConnection
方便后面发送数据,同时通知订阅者socket连接成功。
在Android中不能在UI线程创建网络链接,就连
InetSocketAddress
类都不能在UI线程中创建,TcpClient.newClient(url, port)...createConnectionRequest()
本身是一个Observable
,但是由于方法newClient(url, port)
中创建了InetSocketAddress
类,Android严苛模式会报异常,所以上面创建链接的TcpClient方法在外层又包裹了一个Observable
,让它运行在IO线程等其它非UI线程才可以正常创建socket链接。
用来接收数据、发送数据的方法同样返回一个Observable,代码如下:
public Observable receive() { if (mConnection != null) { return mConnection.getInput(); } return null; } public Observable send(String s) { return mConnection.writeString(Observable.just(s)); }
测试上面方法的客户端代码:
public void rxNettyClientTest() { connect("localhost", 60000).subscribe(new Observer() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { //reconnect Observable.timer(1, TimeUnit.SECONDS).subscribe(new Action1() { @Override public void call(Long aLong) { if (mConnection != null) mConnection.closeNow(); rxNettyClientTest(); } }); System.out.println("reconnect"); } @Override public void onNext(Boolean aBoolean) { //send data send("hello world!").subscribe(new Action1() { @Override public void call(Void aVoid) { System.out.println("send success!"); } }); //receive data receive().subscribe(new Observer() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { //reconnect Observable.timer(1, TimeUnit.SECONDS).subscribe(new Action1() { @Override public void call(Long aLong) { if (mConnection != null) mConnection.closeNow(); rxNettyClientTest(); } }); System.out.println("reconnect"); } @Override public void onNext(String s) { System.out.println("receive:" + s); } }); } }); }
上面的代码包涵了读、写数据和重连等主要功能。
然后是创建服务端的代码:
public void rxNettyServerTest() { TcpServer server; server = TcpServer.newServer(60000).addChannelHandlerLast("string-decoder", new Func0() { @Override public ChannelHandler call() { return new StringDecoder(); } }).addChannelHandlerLast("string-encoder", new Func0() { @Override public ChannelHandler call() { return new StringEncoder(); } }).start(new ConnectionHandler() { @Override public Observable handle(Connection newConnection) { return newConnection.writeStringAndFlushOnEach( newConnection.getInput().map(new Func1() { @Override public String call(String s) { System.out.println("receive:" + s); return "echo=> " + s; } })); } }); server.awaitShutdown(); }
服务端代码比较简单,直接echo客户端发来的数据。
关于线程,在Android中处理网络需要subscribeOn(Schedulers.io())
,如果需要在UI线程展示则observeOn(AndroidSchedulers.mainThread())
。
最后,在Android上使用RxNetty大多数是因为没有合适的socket客户端框架,RxNetty也支持Http协议,Android上的Http协议的可选框架比较多,所以就不在这里介绍了,想要了解的可以到这里RxNetty。
更多相关文章
- 如何在Android中用好多线程
- Instrumentation 框架简介
- android 动画框架
- android 多媒体框架
- Android 学习系列 - Java 多线程
- Android 判断当前线程是否是UI主线程
- Android 线程
- Android ORM框架 GreenDao 的使用详解