Android与Java AIO实现简单Echo服务器与客户端
16lz
2022-06-10
写完了NIO就来了AIO(NIO 2.0)吧~
Android需要sdk version 26及以上才支持了AIO。
AIO的优势是异步,全部的收发消息都是通过回调的方式来实现。
Server端,纯Java代码:
public class EchoServer { public static final int MAX_SIZE = 256; // max size 256 public static Charset mCharSet = Charset.forName("UTF-8"); //encode and decode charset public static void main(String args[]) { AsyncServerThread serverHandler = new AsyncServerThread(8999); new Thread(serverHandler).start(); }}
public class AsyncServerThread implements Runnable { private AsynchronousServerSocketChannel mServerChannel; private static CountDownLatch serverStatus = new CountDownLatch(1); public AsyncServerThread(int port) { try { mServerChannel = AsynchronousServerSocketChannel.open(); mServerChannel.bind(new InetSocketAddress(port)); } catch (IOException e) { e.printStackTrace(); } } @Override public void run() { mServerChannel.accept(this, new AcceptHandler()); try { // 在闭锁到达结束状态之前,这扇门一直是关闭着的,不允许任何线程通过,当到达结束状态时,这扇门会打开并允许所有的线程通过。且当门打开了,就永远保持打开状态。 serverStatus.await(); } catch (InterruptedException e) { e.printStackTrace(); } } public AsynchronousServerSocketChannel getServerChannel() { return mServerChannel; }}
AcceptHandler负责处理客户端连接
public class AcceptHandler implements CompletionHandler { @Override public void completed(AsynchronousSocketChannel asynchronousSocketChannel, AsyncServerThread asyncServerThread) { AsynchronousServerSocketChannel serverChannel = asyncServerThread.getServerChannel(); serverChannel.accept(asyncServerThread, this); try { System.out.println("connected from: " + asynchronousSocketChannel.getRemoteAddress()); } catch (IOException e) { e.printStackTrace(); } //创建新的Buffer ByteBuffer buffer = ByteBuffer.allocate(EchoServer.MAX_SIZE); //异步读 第三个参数为接收消息回调的业务Handler asynchronousSocketChannel.read(buffer, buffer, new ReadHandler(asynchronousSocketChannel)); } @Override public void failed(Throwable throwable, AsyncServerThread asyncServerThread) { }}
ReadHandler负责处理收到的消息。
public class ReadHandler implements CompletionHandler { private AsynchronousSocketChannel mChannel; public ReadHandler(AsynchronousSocketChannel channel) { this.mChannel = channel; } @Override public void completed(Integer integer, ByteBuffer byteBuffer) { //flip操作 byteBuffer.flip(); //根据 byte[] message = new byte[byteBuffer.remaining()]; byteBuffer.get(message); String data = new String(message, EchoServer.mCharSet); System.out.println("received: " + data); String response = "response:" + data; //向客户端发送消息 doWrite(response); } @Override public void failed(Throwable throwable, ByteBuffer byteBuffer) { } //发送消息 private void doWrite(String result) { System.out.println("send back: " + result); byte[] bytes = result.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); //异步写数据 参数与前面的read一样 mChannel.write(writeBuffer, writeBuffer, new CompletionHandler() { @Override public void completed(Integer result, ByteBuffer buffer) { //如果没有发送完,就继续发送直到完成 if (buffer.hasRemaining()) { mChannel.write(buffer, buffer, this); } else { //创建新的Buffer ByteBuffer readBuffer = ByteBuffer.allocate(1024); //异步读 第三个参数为接收消息回调的业务Handler mChannel.read(readBuffer, readBuffer, new ReadHandler(mChannel)); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { mChannel.close(); } catch (IOException e) { e.printStackTrace(); } } }); }}
到这里服务端就完成了。代码会比Java NIO看起来更好理解一些。都是基于回调的方式来处理。
Client,Android来实现:
public class MainActivity extends AppCompatActivity { private static final String TAG = MainActivity.class.getSimpleName(); public static final int MAX_SIZE = 256; // max size 256 public static Charset mCharSet = Charset.forName("UTF-8"); //encode and decode charset private EditText mIpEt; private EditText mPortEt; private Button mConnBtn; private TextView mScreenTv; private EditText mInputEt; private Button mSendBtn; private SocketThread mSocketThread; private static Handler mMainHandler; public static final int MSG_CONNECT = 0x001; public static final int MSG_RECEIVE = 0x002; public static final int MSG_SEND_ERROR = 0x003; public static final String DATA_RECEIVE = "data_receive"; @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_main); mIpEt = findViewById(R.id.main_ip_et); mPortEt = findViewById(R.id.main_port_et); mConnBtn = findViewById(R.id.main_connect_btn); mScreenTv = findViewById(R.id.main_screen_tv); mInputEt = findViewById(R.id.main_input_et); mSendBtn = findViewById(R.id.main_send_btn); // defalut value. Change it to your own server ip mIpEt.setText("172.16.62.65"); mPortEt.setText("8999"); mConnBtn.setOnClickListener(new View.OnClickListener() { @Override public void onClick(View v) { String ip = mIpEt.getText().toString(); String port = mPortEt.getText().toString(); if (TextUtils.isEmpty(ip) || TextUtils.isEmpty(port)) { Toast.makeText(MainActivity.this, "ip or port is null", Toast.LENGTH_SHORT).show(); } else { connectToServer(ip, Integer.valueOf(port)); } } }); mSendBtn.setOnClickListener(new View.OnClickListener() { @Override public void onClick(View v) { String data = mInputEt.getText().toString(); if (!TextUtils.isEmpty(data)) { mSocketThread.sendMsgToServer(data); } } }); // TODO handler may cause memory leaks mMainHandler = new Handler() { @Override public void handleMessage(Message msg) { switch (msg.what) { case MSG_CONNECT: Toast.makeText(MainActivity.this, "Connect to Server Success", Toast.LENGTH_SHORT).show(); mConnBtn.setText("Connected"); mConnBtn.setEnabled(false); break; case MSG_RECEIVE: Bundle data = msg.getData(); String dataStr = data.getString(DATA_RECEIVE); Log.i(TAG, "received data:" + dataStr); CharSequence originData = mScreenTv.getText(); String result = originData + "\n" + dataStr; mScreenTv.setText(result); break; case MSG_SEND_ERROR: Toast.makeText(MainActivity.this, "Send Error, Connection may be Closed", Toast.LENGTH_SHORT).show(); break; } } }; } private void connectToServer(String ip, int port) { mSocketThread = new SocketThread(ip, port); mSocketThread.start(); } private static class SocketThread extends Thread { private String mIp; private int mPort; private AsynchronousSocketChannel mClientChannel; private CountDownLatch mLatch; public SocketThread(String ip, int port) { this.mIp = ip; this.mPort = port; try { mClientChannel = AsynchronousSocketChannel.open(); } catch (IOException e) { e.printStackTrace(); } } @Override public void run() { mLatch = new CountDownLatch(1); AsyncClientHandler asyncClientHandler = new AsyncClientHandler(mClientChannel, mMainHandler); mClientChannel.connect(new InetSocketAddress(mIp, mPort), asyncClientHandler, asyncClientHandler); try { mLatch.await(); } catch (InterruptedException e1) { e1.printStackTrace(); } try { mClientChannel.close(); } catch (IOException e) { e.printStackTrace(); } } public void sendMsgToServer(final String data) { new Thread(new Runnable() { @Override public void run() { byte[] bytes = data.getBytes(mCharSet); ByteBuffer dataBuffer = ByteBuffer.allocate(bytes.length); dataBuffer.put(bytes); dataBuffer.flip(); Log.i(TAG, "send data" + data); //输出到通道 mClientChannel.write(dataBuffer, dataBuffer, new CompletionHandler() { @Override public void completed(Integer result, ByteBuffer buffer) { //如果没有发送完,就继续发送直到完成 if (buffer.hasRemaining()) { mClientChannel.write(buffer, buffer, this); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { mMainHandler.sendEmptyMessage(MSG_SEND_ERROR); mClientChannel.close(); } catch (IOException e) { e.printStackTrace(); } } }); } }).start(); } }}
布局文件:
<?xml version="1.0" encoding="utf-8"?>
客户端的连接处理:
public class AsyncClientHandler implements CompletionHandler { private static final String TAG = AsyncClientHandler.class.getSimpleName(); private AsynchronousSocketChannel mClientChannel; private Handler mMainHandler; public AsyncClientHandler(AsynchronousSocketChannel clientChannel, Handler mainHandler) { mClientChannel = clientChannel; mMainHandler = mainHandler; } @Override public void completed(Void result, AsyncClientHandler attachment) { Log.i(TAG, "connect to server success!"); mMainHandler.sendEmptyMessage(MainActivity.MSG_CONNECT); ByteBuffer buffer = ByteBuffer.allocate(MainActivity.MAX_SIZE); mClientChannel.read(buffer, buffer, new ReadHandler(mClientChannel, mMainHandler)); } @Override public void failed(Throwable exc, AsyncClientHandler attachment) { Log.i(TAG, "connect to server failed!"); exc.printStackTrace(); try { mClientChannel.close(); } catch (IOException e) { e.printStackTrace(); } }}
收消息处理:
public class ReadHandler implements CompletionHandler { private static final String TAG = "ReadHandler"; private AsynchronousSocketChannel mClientChannel; private Handler mMainHandler; public ReadHandler(AsynchronousSocketChannel clientChannel, Handler mainHandler) { mMainHandler = mainHandler; mClientChannel = clientChannel; } @Override public void completed(Integer integer, ByteBuffer byteBuffer) { //flip操作 byteBuffer.flip(); //根据 byte[] data = new byte[byteBuffer.remaining()]; byteBuffer.get(data); String dataStr = new String(data, MainActivity.mCharSet); Log.i(TAG, "received: " + dataStr); Message message = mMainHandler.obtainMessage(); message.what = MainActivity.MSG_RECEIVE; Bundle bundle = new Bundle(); bundle.putString(MainActivity.DATA_RECEIVE, dataStr); message.setData(bundle); mMainHandler.sendMessage(message); ByteBuffer buffer = ByteBuffer.allocate(MainActivity.MAX_SIZE); mClientChannel.read(buffer, buffer, new ReadHandler(mClientChannel, mMainHandler)); } @Override public void failed(Throwable throwable, ByteBuffer byteBuffer) { }}
大功告成。
试验一下~
Server端打印的日志
更多相关文章
- Only the original thread that created a view hierarchy can t
- Android事件分发机制
- Android(安卓)Lint gradle错误处理
- Android——使用 Broastcast 实现进程间通讯
- Android(安卓)按键处理(驱动层到上层)架构
- Android消息处理机制
- Android(安卓)BroadcastReceiver介绍
- A20红外遥控器与Android功能的适配
- Android的Btimap处理大图片解决方法