Android与Java AIO实现简单Echo服务器与客户端
16lz
2021-01-23
写完了NIO,接下来就来看看AIO(NIO 2.0)吧~
Android需要SDK version 26及以上才支持AIO。
AIO的优势是异步,全部的收发消息都是通过回调的方式来实现。
Server端,纯Java代码:
public class EchoServer { public static final int MAX_SIZE = 256; // max size 256 public static Charset mCharSet = Charset.forName("UTF-8"); //encode and decode charset public static void main(String args[]) { AsyncServerThread serverHandler = new AsyncServerThread(8999); new Thread(serverHandler).start(); }}
public class AsyncServerThread implements Runnable { private AsynchronousServerSocketChannel mServerChannel; private static CountDownLatch serverStatus = new CountDownLatch(1); public AsyncServerThread(int port) { try { mServerChannel = AsynchronousServerSocketChannel.open(); mServerChannel.bind(new InetSocketAddress(port)); } catch (IOException e) { e.printStackTrace(); } } @Override public void run() { mServerChannel.accept(this, new AcceptHandler()); try { // 在闭锁到达结束状态之前,这扇门一直是关闭着的,不允许任何线程通过,当到达结束状态时,这扇门会打开并允许所有的线程通过。且当门打开了,就永远保持打开状态。 serverStatus.await(); } catch (InterruptedException e) { e.printStackTrace(); } } public AsynchronousServerSocketChannel getServerChannel() { return mServerChannel; }}
AcceptHandler负责处理客户端连接
public class AcceptHandler implements CompletionHandler { @Override public void completed(AsynchronousSocketChannel asynchronousSocketChannel, AsyncServerThread asyncServerThread) { AsynchronousServerSocketChannel serverChannel = asyncServerThread.getServerChannel(); serverChannel.accept(asyncServerThread, this); try { System.out.println("connected from: " + asynchronousSocketChannel.getRemoteAddress()); } catch (IOException e) { e.printStackTrace(); } //创建新的Buffer ByteBuffer buffer = ByteBuffer.allocate(EchoServer.MAX_SIZE); //异步读 第三个参数为接收消息回调的业务Handler asynchronousSocketChannel.read(buffer, buffer, new ReadHandler(asynchronousSocketChannel)); } @Override public void failed(Throwable throwable, AsyncServerThread asyncServerThread) { }}
ReadHandler负责处理收到的消息。
public class ReadHandler implements CompletionHandler { private AsynchronousSocketChannel mChannel; public ReadHandler(AsynchronousSocketChannel channel) { this.mChannel = channel; } @Override public void completed(Integer integer, ByteBuffer byteBuffer) { //flip操作 byteBuffer.flip(); //根据 byte[] message = new byte[byteBuffer.remaining()]; byteBuffer.get(message); String data = new String(message, EchoServer.mCharSet); System.out.println("received: " + data); String response = "response:" + data; //向客户端发送消息 doWrite(response); } @Override public void failed(Throwable throwable, ByteBuffer byteBuffer) { } //发送消息 private void doWrite(String result) { System.out.println("send back: " + result); byte[] bytes = result.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); //异步写数据 参数与前面的read一样 mChannel.write(writeBuffer, writeBuffer, new CompletionHandler() { @Override public void completed(Integer result, ByteBuffer buffer) { //如果没有发送完,就继续发送直到完成 if (buffer.hasRemaining()) { mChannel.write(buffer, buffer, this); } else { //创建新的Buffer ByteBuffer readBuffer = ByteBuffer.allocate(1024); //异步读 第三个参数为接收消息回调的业务Handler mChannel.read(readBuffer, readBuffer, new ReadHandler(mChannel)); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { mChannel.close(); } catch (IOException e) { e.printStackTrace(); } } }); }}
到这里服务端就完成了。代码会比Java NIO看起来更好理解一些。都是基于回调的方式来处理。
Client,Android来实现:
public class MainActivity extends AppCompatActivity { private static final String TAG = MainActivity.class.getSimpleName(); public static final int MAX_SIZE = 256; // max size 256 public static Charset mCharSet = Charset.forName("UTF-8"); //encode and decode charset private EditText mIpEt; private EditText mPortEt; private Button mConnBtn; private TextView mScreenTv; private EditText mInputEt; private Button mSendBtn; private SocketThread mSocketThread; private static Handler mMainHandler; public static final int MSG_CONNECT = 0x001; public static final int MSG_RECEIVE = 0x002; public static final int MSG_SEND_ERROR = 0x003; public static final String DATA_RECEIVE = "data_receive"; @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_main); mIpEt = findViewById(R.id.main_ip_et); mPortEt = findViewById(R.id.main_port_et); mConnBtn = findViewById(R.id.main_connect_btn); mScreenTv = findViewById(R.id.main_screen_tv); mInputEt = findViewById(R.id.main_input_et); mSendBtn = findViewById(R.id.main_send_btn); // defalut value. 
Change it to your own server ip mIpEt.setText("172.16.62.65"); mPortEt.setText("8999"); mConnBtn.setOnClickListener(new View.OnClickListener() { @Override public void onClick(View v) { String ip = mIpEt.getText().toString(); String port = mPortEt.getText().toString(); if (TextUtils.isEmpty(ip) || TextUtils.isEmpty(port)) { Toast.makeText(MainActivity.this, "ip or port is null", Toast.LENGTH_SHORT).show(); } else { connectToServer(ip, Integer.valueOf(port)); } } }); mSendBtn.setOnClickListener(new View.OnClickListener() { @Override public void onClick(View v) { String data = mInputEt.getText().toString(); if (!TextUtils.isEmpty(data)) { mSocketThread.sendMsgToServer(data); } } }); // TODO handler may cause memory leaks mMainHandler = new Handler() { @Override public void handleMessage(Message msg) { switch (msg.what) { case MSG_CONNECT: Toast.makeText(MainActivity.this, "Connect to Server Success", Toast.LENGTH_SHORT).show(); mConnBtn.setText("Connected"); mConnBtn.setEnabled(false); break; case MSG_RECEIVE: Bundle data = msg.getData(); String dataStr = data.getString(DATA_RECEIVE); Log.i(TAG, "received data:" + dataStr); CharSequence originData = mScreenTv.getText(); String result = originData + "\n" + dataStr; mScreenTv.setText(result); break; case MSG_SEND_ERROR: Toast.makeText(MainActivity.this, "Send Error, Connection may be Closed", Toast.LENGTH_SHORT).show(); break; } } }; } private void connectToServer(String ip, int port) { mSocketThread = new SocketThread(ip, port); mSocketThread.start(); } private static class SocketThread extends Thread { private String mIp; private int mPort; private AsynchronousSocketChannel mClientChannel; private CountDownLatch mLatch; public SocketThread(String ip, int port) { this.mIp = ip; this.mPort = port; try { mClientChannel = AsynchronousSocketChannel.open(); } catch (IOException e) { e.printStackTrace(); } } @Override public void run() { mLatch = new CountDownLatch(1); AsyncClientHandler asyncClientHandler = new 
AsyncClientHandler(mClientChannel, mMainHandler); mClientChannel.connect(new InetSocketAddress(mIp, mPort), asyncClientHandler, asyncClientHandler); try { mLatch.await(); } catch (InterruptedException e1) { e1.printStackTrace(); } try { mClientChannel.close(); } catch (IOException e) { e.printStackTrace(); } } public void sendMsgToServer(final String data) { new Thread(new Runnable() { @Override public void run() { byte[] bytes = data.getBytes(mCharSet); ByteBuffer dataBuffer = ByteBuffer.allocate(bytes.length); dataBuffer.put(bytes); dataBuffer.flip(); Log.i(TAG, "send data" + data); //输出到通道 mClientChannel.write(dataBuffer, dataBuffer, new CompletionHandler() { @Override public void completed(Integer result, ByteBuffer buffer) { //如果没有发送完,就继续发送直到完成 if (buffer.hasRemaining()) { mClientChannel.write(buffer, buffer, this); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { mMainHandler.sendEmptyMessage(MSG_SEND_ERROR); mClientChannel.close(); } catch (IOException e) { e.printStackTrace(); } } }); } }).start(); } }}
布局文件:
<?xml version="1.0" encoding="utf-8"?>
客户端的连接处理:
/**
 * Completion handler for the client's asynchronous connect. It is passed to
 * {@code connect} both as the attachment and as the handler.
 */
public class AsyncClientHandler implements CompletionHandler<Void, AsyncClientHandler> {

    private static final String TAG = AsyncClientHandler.class.getSimpleName();

    private AsynchronousSocketChannel mClientChannel;
    private Handler mMainHandler;

    /**
     * @param clientChannel the channel being connected
     * @param mainHandler   handler that receives {@code MainActivity.MSG_*} notifications
     */
    public AsyncClientHandler(AsynchronousSocketChannel clientChannel, Handler mainHandler) {
        mClientChannel = clientChannel;
        mMainHandler = mainHandler;
    }

    /**
     * Connection established: notifies the main handler and starts the first read.
     *
     * @param result     always {@code null} for a connect
     * @param attachment the attachment passed to {@code connect}
     */
    @Override
    public void completed(Void result, AsyncClientHandler attachment) {
        Log.i(TAG, "connect to server success!");
        mMainHandler.sendEmptyMessage(MainActivity.MSG_CONNECT);

        ByteBuffer buffer = ByteBuffer.allocate(MainActivity.MAX_SIZE);
        mClientChannel.read(buffer, buffer, new ReadHandler(mClientChannel, mMainHandler));
    }

    /**
     * Connection failed: logs the cause and closes the channel.
     *
     * @param exc        the cause of the failure
     * @param attachment the attachment passed to {@code connect}
     */
    @Override
    public void failed(Throwable exc, AsyncClientHandler attachment) {
        // Log at error level with the throwable so the stack trace reaches logcat.
        Log.e(TAG, "connect to server failed!", exc);
        try {
            mClientChannel.close();
        } catch (IOException e) {
            Log.e(TAG, "failed to close client channel", e);
        }
    }
}
收消息处理:
public class ReadHandler implements CompletionHandler { private static final String TAG = "ReadHandler"; private AsynchronousSocketChannel mClientChannel; private Handler mMainHandler; public ReadHandler(AsynchronousSocketChannel clientChannel, Handler mainHandler) { mMainHandler = mainHandler; mClientChannel = clientChannel; } @Override public void completed(Integer integer, ByteBuffer byteBuffer) { //flip操作 byteBuffer.flip(); //根据 byte[] data = new byte[byteBuffer.remaining()]; byteBuffer.get(data); String dataStr = new String(data, MainActivity.mCharSet); Log.i(TAG, "received: " + dataStr); Message message = mMainHandler.obtainMessage(); message.what = MainActivity.MSG_RECEIVE; Bundle bundle = new Bundle(); bundle.putString(MainActivity.DATA_RECEIVE, dataStr); message.setData(bundle); mMainHandler.sendMessage(message); ByteBuffer buffer = ByteBuffer.allocate(MainActivity.MAX_SIZE); mClientChannel.read(buffer, buffer, new ReadHandler(mClientChannel, mMainHandler)); } @Override public void failed(Throwable throwable, ByteBuffer byteBuffer) { }}
大功告成。
试验一下~
Server端打印的日志
更多相关文章
- 在Android上使用Http客户端的选择(译文)
- Android仿人人客户端(v5.7.1)——网络模块时序图
- Android消息循环机制
- Android 飞行模式的设置(打开/关闭飞行模式,获取飞行状态状态)
- android 新闻系统客户端,服务器端采用JSP Servlet 实现
- Android 页面自动跳转方法(比如进入app的广告,通过Timer计时器,通过
- android 如何在状态栏上增加一个icon