Android串口丢包重传协议:一种支持流式传输的可靠重传协议
前言
最近在做一款产品,主要功能是Android接收到音频数据后,通过串口实时发送给MCU处理,要求延迟小、速度快,传输稳定性有保障。我们都知道串口是一种不可靠的传输方式,不能保障传输数据的完整性,在产品测试过程中也发现会低概率出现数据丢包现象,所以需要设计一种支持流式传输的可靠传输协议。产品需要Android串口跟MCU串口通信传输音频流,每次传输大概1M左右,MCU接收到音频后需要立刻处理,所以需要尽可能快地接收并处理数据,不能每接收一帧都回传校验结果,只有出现错误帧时才通知Android,请求重发错误帧。如果一次性传输1M,体积过大、传输时间过长,一旦数据有误就需要重传整个1M数据,延时太大,所以采用拆包方式,每次发送10K数据。最终设计了一种支持流式传输且支持重传的串口协议:支持的传输数据大小是1B ~ 1G,数据帧大小为10K,小于10K按照实际大小发送,大于10K则拆包发送,比如100K分成10包发送。正常发送时不必回传结果,只有出错才回传错误帧的index,比如发送到第5包出错,则回传给Android"第5帧错误",Android从第5包开始重新发送数据。
协议大概流程图如图所示。
既然是数据传输必然有传输格式协议,使用简单的TLV协议即可。
传输协议简介如下:
包结构
Header:包头
Payload:有效载荷,Module_id+Msg_id+Data
Checksum:校验和,校验数据为Header + Payload
StartFlag:固定值0xFFAA5500,每一个数据包固定的开始序列
Handle:未使用,默认0
Version:0x01 协议版本,第一个版本
Length: payload区域的长度+ Checksum字段
Opcode:
Opcode 方法定义 描述
0x01 request Request需要和response配合使用,每个request都必须有一个response的应答。Request可以由主设备(主机)发起,也可以由从设备(翻译机、副机)发起。
0x02 response 回复对应的request的response
0x03 indication 主设备推送消息给从设备
0x04 notification 从设备推送消息给主设备
下面就是代码讲解:
协议对外提供 send() 和 flush() 两个接口,以及一个回调接口 OnSendCallback。
以传输一个文件为例:
send 将文件保存到缓存中,此时并没有发送。//发送缓存大小可修改,当前定义4M
flush 将缓存文件拆包通过串口发送 ,是真正的发送文件
先放demo 的文件
package com.dfxh.wang.serialport_test;import android.Manifest;import android.content.BroadcastReceiver;import android.content.Context;import android.content.Intent;import android.content.IntentFilter;import android.content.pm.PackageManager;import android.media.MediaPlayer;import android.os.BatteryManager;import android.os.Build;import android.os.Bundle;import android.os.Environment;import android.os.Handler;import android.os.Message;import android.util.Log;import android.view.KeyEvent;import android.view.View;import android.widget.Button;import android.widget.EditText;import android.widget.Spinner;import android.widget.TextView;import android.widget.Toast;import androidx.annotation.RequiresApi;import androidx.appcompat.app.AppCompatActivity;import org.json.JSONException;import org.json.JSONObject;import java.io.BufferedReader;import java.io.File;import java.io.FileInputStream;import java.io.FileReader;import java.math.BigInteger;import java.nio.ByteBuffer;import java.security.MessageDigest;import java.util.Date;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;import static android.text.InputType.TYPE_CLASS_NUMBER;public class MainActivity extends AppCompatActivity implements SerialPortUtils.OnSendCallback,View.OnClickListener { // Used to load the 'native-lib' library on application startup. 
static { System.loadLibrary("native-lib"); } private final String TAG = "MainActivity"; private int looptime = 100; private Context mContext; private boolean flag = true; private String mFileName = "/sdcard/receive.bin"; private String mSendFileName = "/sdcard/send.bin"; private String mSendFileName1 = "/sdcard/test.txt"; String path ="/storage/emulated/0/testFolder/testFile"; private CustomServiceDataHelper mDtaHelper = new CustomServiceDataHelper(); public static SerialPortUtils serialPortUtils; private static Lock lock = new ReentrantLock(); private long receiveFileLength = 0; private ExecutorService singleThreadExecutor; private int send_index = 1; private int sendtime = 0; BatteryManager batteryManager; private boolean charging = false; private boolean no_enough_space_flag = false; private Handler mHandler = new Handler() { @Override public void handleMessage(Message msg) { switch (msg.what) { case 2://通知发送方错误帧id // serialPortUtils.sendErorPackIndex(); break; case 3://通知发送方错误帧id serialPortUtils.sendErorPackIndex(); break; case 7: //重新发送错误数据包 short index = (short) msg.arg1; ReSendErrorData(index); Log.d(TAG, "mHandler resend data index= " + index); break; case 8: Log.d("SendData", "mHandler send new data ====="); if ((sendtime <1000)&&(!no_enough_space_flag)){ Log.d(TAG, "send file time : " + sendtime); // startSendBinFile1(sendtime); startSendBinFile(); sendtime++; }else { Log.d(TAG, "no_enough_space_flag: " + no_enough_space_flag); } break; case 11: led_red_on(); break; case 12: led_green_on(); break; } } }; @RequiresApi(api = Build.VERSION_CODES.LOLLIPOP) @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); mContext = this; setContentView(R.layout.activity_main); Log.e(TAG,"MainActivity onCreate ===================="); TextView tv = findViewById(R.id.lv); serialPortUtils = new SerialPortUtils(mHandler,this); serialPortUtils.setSendCallback(this); singleThreadExecutor = Executors.newSingleThreadExecutor(); Button 
open_bt = (Button) findViewById(R.id.open_bt); open_bt.setOnClickListener(this); Button send_bt = (Button) findViewById(R.id.send_bt); send_bt.setOnClickListener(this); Button close_bt = (Button) findViewById(R.id.close); close_bt.setOnClickListener(this); IntentFilter mIntentFilter = new IntentFilter(); mIntentFilter.addAction(Intent.ACTION_BATTERY_CHANGED); mIntentFilter.addAction(Intent.ACTION_BATTERY_LOW); mIntentFilter.addAction(Intent.ACTION_BATTERY_OKAY); mIntentFilter.addAction(Intent.ACTION_POWER_CONNECTED); mIntentFilter.addAction(Intent.ACTION_POWER_DISCONNECTED); registerReceiver(mIntentReceiver, mIntentFilter); batteryManager = (BatteryManager)getSystemService(BATTERY_SERVICE); //new StatuThread().start(); if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.M) { // Android M Permission check if (this.checkSelfPermission(Manifest.permission.READ_PHONE_STATE) != PackageManager.PERMISSION_GRANTED) { requestPermissions(new String[]{Manifest.permission.READ_PHONE_STATE}, 1); } } } public void unregister() { if (mIntentReceiver != null) { this.unregisterReceiver(mIntentReceiver); } } private BroadcastReceiver mIntentReceiver = new BroadcastReceiver() { @Override public void onReceive(Context context, Intent intent) { if (intent != null) { int status = 0; String action = intent.getAction(); int level = intent.getIntExtra(BatteryManager.EXTRA_LEVEL, -1); int scale = intent.getIntExtra(BatteryManager.EXTRA_SCALE, -1); float batteryPct = level / (float)scale; Log.e(TAG, "level --> " + level); switch (action) { case Intent.ACTION_BATTERY_CHANGED://电量发生改变 Log.e(TAG, "BatteryBroadcastReceiver --> onReceive--> ACTION_BATTERY_CHANGED"); //Toast.makeText(mContext,"电量变化",Toast.LENGTH_SHORT).show(); if(charging){ if(level < 95){ status = 11; }else{ status = 12; } }else { if(0 < level && level < 10){ status = 11; }else{ status = 12; } } Toast.makeText(mContext,"充电状态 " + charging + "值: " + level,Toast.LENGTH_SHORT).show(); break; case Intent.ACTION_BATTERY_LOW://电量低 
Log.e(TAG, "BatteryBroadcastReceiver --> onReceive--> ACTION_BATTERY_LOW"); status = 9; break; case Intent.ACTION_BATTERY_OKAY://电量充满 Log.e(TAG, "BatteryBroadcastReceiver --> onReceive--> ACTION_BATTERY_OKAY"); status = 12; break; case Intent.ACTION_POWER_CONNECTED://接通电源 Log.e(TAG, "BatteryBroadcastReceiver --> onReceive--> ACTION_POWER_CONNECTED"); if(level < 95){ status = 11; }else{ status = 12; } charging = true; Toast.makeText(mContext,"充电中,电量是" + level,Toast.LENGTH_SHORT).show(); break; case Intent.ACTION_POWER_DISCONNECTED://拔出电源 Log.e(TAG, "BatteryBroadcastReceiver --> onReceive--> ACTION_POWER_DISCONNECTED"); charging = false; if(0 < level && level < 10){ status = 11; }else{ status = 12; } // Toast.makeText(mContext,"未充电" + level,Toast.LENGTH_SHORT).show(); break; } Message msg = new Message(); msg.what = status; mHandler.sendMessage(msg); } } }; @Override protected void onResume() { super.onResume(); serialPortUtils.openSerialPort(); } private void readBinfile(){ File file = new File(mSendFileName); FileInputStream fileInputStream = null; try { byte[] buf = new byte[1024*512]; int length = -1; fileInputStream = new FileInputStream(file); while(((length = fileInputStream.read(buf)) != -1)&&(no_enough_space_flag!=true)){ if(serialPortUtils.send(buf,length)==false){ no_enough_space_flag = true; break; } // PrintfInfo(buf,length,"buf"); Log.d("SendData", "send_index= " + send_index); Log.d("SendData", toHexString(buf,length)); send_index++; serialPortUtils.flush(); } fileInputStream.close(); // serialPortUtils.sendFileEndFlag(); send_index =0; Log.d("SendData", "发送完毕====="); Thread.sleep(3000); }catch (Exception e){ e.printStackTrace(); } } @Override public void OnResult(int ret) { } @Override public void Onread(CommonResponseData helper) { Log.d(TAG, "Onread Opcode=====" + helper.getOpcode()); Log.d(TAG, "Onread ModuleId=====" + helper.getModuleId()); Log.d(TAG, "Onread MsgId=====" + helper.getMsgId()); if(helper.getData() != null){ Log.d(TAG, "Onread=====" 
+ toHexString(helper.getData(),helper.getData().length)); } } class ReadFileThread implements Runnable{ int num; public ReadFileThread(int num) { super(); this.num = num; } @Override public void run() { System.out.println(Thread.currentThread().getName()+":"+num); try { readBinfile(); }catch (Exception e){ e.printStackTrace(); } Message msg = new Message(); msg.what = 8; Log.d("SendData", "ReadFileThread 发送= " + sendtime); mHandler.sendMessage(msg); } } private void startSendData(){ Runnable runnable = new Runnable() { @Override public void run() { String str = "1111111111"; byte[] bytes = str.getBytes(); // serialPortUtils.sendData(9,0,2,3); serialPortUtils.sendData(9,bytes.length,2,3,bytes); } }; singleThreadExecutor.execute(runnable); } private void startSendFile(){ send_index =0; Runnable runnable = new Runnable() { @Override public void run() { File file = new File(mSendFileName1); FileInputStream fileInputStream = null; try { byte[] buf = new byte[512]; int length = -1; String line = null; FileReader fr = new FileReader(mSendFileName1); BufferedReader bufr = new BufferedReader(fr); while((line = bufr.readLine()) != null){ Log.d(TAG, "line= " + line); byte[] tt = line.getBytes(); Log.d(TAG, toHexString(tt,tt.length)); serialPortUtils.send(tt,tt.length); // PrintfInfo(buf,length,"buf"); // Log.d("SendData", "send_index= " + send_index); //Log.d("SendData", toHexString(buf,length)); //send_index++; } serialPortUtils.flush(); bufr.close(); fr.close(); Thread.sleep(1); }catch (Exception e){ e.printStackTrace(); } } }; singleThreadExecutor.execute(runnable); } private void startSendBinFile(){ send_index =0; Runnable runnable = new Runnable() { @Override public void run() { File file = new File(mSendFileName); FileInputStream fileInputStream = null; try { byte[] buf = new byte[1024*512]; // byte[] buf = new byte[512]; int length = -1; fileInputStream = new FileInputStream(file); Log.d("SendData", "no_enough_space_flag + " + no_enough_space_flag); while(((length = 
fileInputStream.read(buf)) != -1)&&(no_enough_space_flag!=true)){ // if(((length = fileInputStream.read(buf)) != -1)&&(no_enough_space_flag!=true)){ if(serialPortUtils.send(buf,length)==false){ no_enough_space_flag = true; // return; break; } //PrintfInfo(buf,length,"bufggg"); Log.d("SendData", "send_index= " + send_index); // Log.d("SendData", toHexString(buf,length)); send_index++; serialPortUtils.flush(); } fileInputStream.close(); if(serialPortUtils.send_err_index == -1){ Log.d("SendData", "============================"); serialPortUtils.sendFileEndFlag(); } send_index =0; Log.d("SendData", "发送完毕====="); Thread.sleep(500); Message msg = new Message(); msg.what = 8; if((no_enough_space_flag!=true)&&(serialPortUtils.send_err_index == -1)){ Log.d("SendData", "startSendBinFile send new frame====="); // mHandler.sendMessage(msg); } }catch (Exception e){ e.printStackTrace(); } } }; singleThreadExecutor.execute(runnable); } public String toHexString(byte[] byteArray,int len) { final StringBuilder hexString = new StringBuilder(""); if (byteArray == null || len <= 0) return null; for (int i = 0; i < len; i++) { int v = byteArray[i] & 0xFF; String hv = Integer.toHexString(v); if (hv.length() < 2) { hexString.append(0); } hexString.append(hv); } return hexString.toString().toLowerCase(); } private class StatuThread extends Thread { @RequiresApi(api = Build.VERSION_CODES.LOLLIPOP) @Override public void run() { super.run(); while (flag){ int battery = batteryManager.getIntProperty(BatteryManager.BATTERY_PROPERTY_CAPACITY); Log.d(TAG, "battery ============================ " + battery); if(!charging){ Log.d(TAG, "send battery: " + battery); int status = 0; if(battery > 10 ){ status = 12; }else{ status = 11; } Message msg = new Message(); msg.what = status; mHandler.sendMessage(msg); } try { Thread.sleep(10000); }catch (Exception e){ e.printStackTrace(); } } } } private void ReSendErrorData(short index){ final short i = index; Runnable runnable = new Runnable() { @Override 
public void run() { try { Log.d(TAG, "ReSendErrorData index===== " + i); serialPortUtils.reSendPackect(i); serialPortUtils.sendFileEndFlag(); Thread.sleep(500); Message msg = new Message(); msg.what = 8; Log.d("PPTV", "ReSendErrorData reSend_Err_Frame_Index===== " + serialPortUtils.send_err_index); if(serialPortUtils.send_err_index == -1){ Log.d("SendData", "ReSendErrorData send new frame====="); // mHandler.sendMessage(msg); } }catch (Exception e){ e.printStackTrace(); } } }; singleThreadExecutor.execute(runnable); } public static long convertBytesToLong(byte[] b) { ByteBuffer buf = ByteBuffer.wrap(b); return buf.getLong(); } private void stopsend(){ no_enough_space_flag = true; serialPortUtils.clearstatus(); } boolean check_md5(){ File receive = new File(mFileName); if(!receive.exists()){ Log.d(TAG,"file not exist"); return true; } File send = new File("/sdcard/send.bin"); if(!getFileMD5(receive).equals(getFileMD5(send))){ Log.d("check_md5","false"); //start_play_thread(this); return true; } Log.d("check_md5","success"); return true; } public static String getFileMD5(File file) { if (!file.isFile()) { return null; } MessageDigest digest = null; FileInputStream in = null; byte buffer[] = new byte[1024]; int len; try { digest = MessageDigest.getInstance("MD5"); in = new FileInputStream(file); while ((len = in.read(buffer, 0, 1024)) != -1) { digest.update(buffer, 0, len); } in.close(); } catch (Exception e) { e.printStackTrace(); return null; } BigInteger bigInt = new BigInteger(1, digest.digest()); return bigInt.toString(16); } public void led_off(){ JSONObject send_json = new JSONObject(); try { send_json.put("Power_Led_Red","off"); send_json.put("Power_Led_Green","off"); send_json.put("Translate_Led_Green","off"); send_json.put("Bt_Led_Blue","off"); byte[] data = send_json.toString().getBytes(); serialPortUtils.sendData(1,data.length,17,1,data); // byte[] send = mDtaHelper.getSendData(1,data.length,17,1,data); } catch (JSONException e) { e.printStackTrace(); } } 
public void led_red_on(){ JSONObject send_json = new JSONObject(); try { send_json.put("Power_Led_Green","off"); send_json.put("Power_Led_Red","on"); byte[] data = send_json.toString().getBytes(); serialPortUtils.sendData(1,data.length,17,1,data); }catch (JSONException e) { e.printStackTrace(); } } public void led_red_off(){ JSONObject send_json = new JSONObject(); try { send_json.put("Power_Led_Red","off"); byte[] data = send_json.toString().getBytes(); serialPortUtils.sendData(1,data.length,17,1,data); }catch (JSONException e) { e.printStackTrace(); } } public void led_green_on(){ JSONObject send_json = new JSONObject(); try { send_json.put("Power_Led_Red","off"); send_json.put("Power_Led_Green","on"); byte[] data = send_json.toString().getBytes(); serialPortUtils.sendData(1,data.length,17,1,data); }catch (JSONException e) { e.printStackTrace(); } } public void led_green_off(){ JSONObject send_json = new JSONObject(); try { send_json.put("Power_Led_Green","off"); byte[] data = send_json.toString().getBytes(); serialPortUtils.sendData(1,data.length,17,1,data); }catch (JSONException e) { e.printStackTrace(); } } public void led_on(){ JSONObject send_json = new JSONObject(); try { send_json.put("Power_Led_Red","on"); send_json.put("Power_Led_Green","on"); send_json.put("Translate_Led_Green","on"); send_json.put("Bt_Led_Blue","on"); byte[] data = send_json.toString().getBytes(); // byte[] send = mDtaHelper.getSendData(1,data.length,17,1,data); serialPortUtils.sendData(1,data.length,17,1,data); } catch (JSONException e) { e.printStackTrace(); } } @Override public void onClick(View view) { switch (view.getId()) { case R.id.open_bt: no_enough_space_flag = false; // startSendBinFile(); // startSendData(); //startSendFile(); // led_on(); led_red_on(); break; case R.id.send_bt: // stopsend(); led_green_on(); break; case R.id.close: // stopsend(); // delete("/sdcard",false,".bin"); led_off(); break; default: } } public void delete(String dirPath, boolean isPrefix, String 
regEx) { singleThreadExecutor.execute(new DeleteRunnable(dirPath, isPrefix, regEx)); } public void PrintfInfo(byte[] data,int len,String str) { System.out.format( str + " data is ====\n"); for (int i = 0; i < len; i++) { System.out.format("%02x", data[i]); } System.out.format("\n"); System.out.format(str +" end ============\n"); } @Override protected void onDestroy() { super.onDestroy(); serialPortUtils.closeSerialPort(); }}
send 函数实现如下,主要功能就是保存数据到mOutputBuffer 缓存
/**
 * Buffers {@code len} bytes of {@code data} in {@code mOutputBuffer}. Nothing is written to the
 * serial port here; the buffered data is transmitted by {@code flush()}.
 *
 * @param data source bytes
 * @param len  number of bytes to take from the start of {@code data}
 * @return {@code true} if the bytes were buffered; {@code false} if the arguments are invalid or
 *     the buffer does not have {@code len} bytes of space left
 */
public synchronized boolean send(byte[] data, int len) {
    streamMode = true;
    if (data == null || len < 0 || len > data.length) {
        Log.e(TAG, " invalid send arguments, len=" + len);
        return false;
    }
    // ">=" rather than ">": a chunk that exactly fills the remaining space is still accepted.
    if (mOutputBuffer.remaining() >= len) {
        mOutputBuffer.put(data, 0, len);
        Log.e(TAG, "position after send msg : " + mOutputBuffer.position());
        return true;
    }
    Log.e(TAG, " not enough space,stop");
    return false;
}
而 mOutputBuffer 是在串口初始化(openSerialPort)时创建的。下面先给出精简后的 MainActivity,它在 onResume() 中调用 openSerialPort() 完成初始化:
public class MainActivity extends AppCompatActivity implements SerialPortUtils.OnSendCallback,View.OnClickListener { // Used to load the 'native-lib' library on application startup. static { System.loadLibrary("native-lib"); } private final String TAG = "MainActivity"; private int looptime = 100; private Context mContext; private boolean flag = true; private String mFileName = "/sdcard/receive.bin"; private String mSendFileName = "/sdcard/send.bin"; private String mSendFileName1 = "/sdcard/test.txt"; String path ="/storage/emulated/0/testFolder/testFile"; private CustomServiceDataHelper mDtaHelper = new CustomServiceDataHelper(); public static SerialPortUtils serialPortUtils; private static Lock lock = new ReentrantLock(); private long receiveFileLength = 0; private ExecutorService singleThreadExecutor; private int send_index = 1; private int sendtime = 0; BatteryManager batteryManager; private boolean charging = false; private boolean no_enough_space_flag = false; private Handler mHandler = new Handler() { @Override public void handleMessage(Message msg) { switch (msg.what) { case 2://通知发送方错误帧id // serialPortUtils.sendErorPackIndex(); break; case 3://通知发送方错误帧id serialPortUtils.sendErorPackIndex(); break; case 7: //重新发送错误数据包 short index = (short) msg.arg1; ReSendErrorData(index); Log.d(TAG, "mHandler resend data index= " + index); break; case 8: Log.d("SendData", "mHandler send new data ====="); if ((sendtime <1000)&&(!no_enough_space_flag)){ Log.d(TAG, "send file time : " + sendtime); // startSendBinFile1(sendtime); startSendBinFile(); sendtime++; }else { Log.d(TAG, "no_enough_space_flag: " + no_enough_space_flag); } break; } } }; @RequiresApi(api = Build.VERSION_CODES.LOLLIPOP) @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); mContext = this; setContentView(R.layout.activity_main); Log.e(TAG,"MainActivity onCreate ===================="); TextView tv = findViewById(R.id.lv); serialPortUtils = new 
SerialPortUtils(mHandler,this); serialPortUtils.setSendCallback(this); singleThreadExecutor = Executors.newSingleThreadExecutor(); Button open_bt = (Button) findViewById(R.id.open_bt); open_bt.setOnClickListener(this); Button send_bt = (Button) findViewById(R.id.send_bt); send_bt.setOnClickListener(this); Button close_bt = (Button) findViewById(R.id.close); close_bt.setOnClickListener(this); IntentFilter mIntentFilter = new IntentFilter(); if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.M) { // Android M Permission check if (this.checkSelfPermission(Manifest.permission.READ_PHONE_STATE) != PackageManager.PERMISSION_GRANTED) { requestPermissions(new String[]{Manifest.permission.READ_PHONE_STATE}, 1); } } } @Override protected void onResume() { super.onResume(); serialPortUtils.openSerialPort(); } @Override public void OnResult(int ret) { } @Override public void Onread(CommonResponseData helper) { Log.d(TAG, "Onread Opcode=====" + helper.getOpcode()); Log.d(TAG, "Onread ModuleId=====" + helper.getModuleId()); Log.d(TAG, "Onread MsgId=====" + helper.getMsgId()); if(helper.getData() != null){ Log.d(TAG, "Onread=====" + toHexString(helper.getData(),helper.getData().length)); } } private void startSendBinFile(){ send_index =0; Runnable runnable = new Runnable() { @Override public void run() { File file = new File(mSendFileName); FileInputStream fileInputStream = null; try { byte[] buf = new byte[1024*512]; int length = -1; fileInputStream = new FileInputStream(file); while(((length = fileInputStream.read(buf)) != -1)&&(no_enough_space_flag!=true)) if(serialPortUtils.send(buf,length)==false){ no_enough_space_flag = true; break; } //PrintfInfo(buf,length,"bufggg"); Log.d("SendData", "send_index= " + send_index); // Log.d("SendData", toHexString(buf,length)); send_index++; serialPortUtils.flush(); } fileInputStream.close(); if(serialPortUtils.send_err_index == -1){ Log.d("SendData", "============================"); serialPortUtils.sendFileEndFlag(); } send_index =0; 
Log.d("SendData", "发送完毕====="); Thread.sleep(500); Message msg = new Message(); msg.what = 8; if((no_enough_space_flag!=true)&&(serialPortUtils.send_err_index == -1)){ Log.d("SendData", "startSendBinFile send new frame====="); // mHandler.sendMessage(msg); } }catch (Exception e){ e.printStackTrace(); } } }; singleThreadExecutor.execute(runnable); } protected void onResume() { super.onResume(); serialPortUtils.openSerialPort(); } private void ReSendErrorData(short index){ final short i = index; Runnable runnable = new Runnable() { @Override public void run() { try { Log.d(TAG, "ReSendErrorData index===== " + i); serialPortUtils.reSendPackect(i); serialPortUtils.sendFileEndFlag(); Thread.sleep(500); Message msg = new Message(); msg.what = 8; Log.d("PPTV", "ReSendErrorData reSend_Err_Frame_Index===== " + serialPortUtils.send_err_index); if(serialPortUtils.send_err_index == -1){ Log.d("SendData", "ReSendErrorData send new frame====="); // mHandler.sendMessage(msg); } }catch (Exception e){ e.printStackTrace(); } } }; singleThreadExecutor.execute(runnable); } @Override public void onClick(View view) { switch (view.getId()) { case R.id.open_bt: no_enough_space_flag = false; startSendBinFile(); break; default: } } }
串口初始化
// Invoked from MainActivity.onResume(); opens the port and starts the receive thread.
serialPortUtils.openSerialPort();
public SerialPort openSerialPort() { try { serialPort = new SerialPort(new File(path), baudrate, 0, 1000); threadStatus = false; //线程状态 mOutputBuffer = ByteBuffer.allocateDirect(DEF_OUT_BUF_SIZE);//发送缓存 mInputBuffer = ByteBuffer.allocateDirect(DEF_INPUT_BUF_SIZE);//接收缓存 inputStream = serialPort.getInputStream();//串口输入流 outputStream = serialPort.getOutputStream();//串口输出流 mInputStream = new BufferedInputStream(inputStream,1024*1024*4);//串口输入缓存 mOutputStream = new BufferedOutputStream(outputStream,1024*1024*4);//串口输出缓存 mReadThread = new ReadThread(); mReadThread.start();//接收线程,用于接收MCU发送信息 } catch (IOException e) { Log.e(TAG, "openSerialPort: 打开串口异常:" + e.toString()); return serialPort; } Log.d(TAG, "openSerialPort: 打开串口"); return serialPort; }
这里以发送文件为例,测试如何处理发送数据。测试中连续发送1000次,全部发送成功,说明协议很稳定。
private void startSendBinFile(){//发送测试文件 send_index =0; Runnable runnable = new Runnable() { @Override public void run() { File file = new File(mSendFileName); FileInputStream fileInputStream = null; try { byte[] buf = new byte[1024*512];//每次读取512K int length = -1; fileInputStream = new FileInputStream(file); Log.d("SendData", "no_enough_space_flag + " + no_enough_space_flag); while(((length = fileInputStream.read(buf)) != -1)&&(no_enough_space_flag!=true)) if(serialPortUtils.send(buf,length)==false){//读取到mOutputBuffer 缓存中 no_enough_space_flag = true; break; } send_index++; serialPortUtils.flush(); //发送文件 } fileInputStream.close(); if(serialPortUtils.send_err_index == -1){ Log.d("SendData", "============================"); serialPortUtils.sendFileEndFlag(); } send_index =0; Log.d("SendData", "发送完毕====="); Thread.sleep(500); Message msg = new Message(); msg.what = 8; if((no_enough_space_flag!=true)&&(serialPortUtils.send_err_index == -1)){ Log.d("SendData", "startSendBinFile send new frame====="); } }catch (Exception e){ e.printStackTrace(); } } }; singleThreadExecutor.execute(runnable); }
使用一个单线程的线程池(singleThreadExecutor)来发送文件,好处是所有发送任务串行执行,可以避免多个线程同时写串口而导致串口数据错乱。
public synchronized boolean send(byte [] data,int len){ streamMode = true; if(mOutputBuffer.remaining() > len){ mOutputBuffer.put(data,0,len); //数据放到缓存中 Log.e(TAG,"position after send msg : " + mOutputBuffer.position()); return true; }else{ Log.e(TAG," not enough space,stop"); return false; } }
public void flush(){ Log.d(TAG, "flush: data position: " + mOutputBuffer.position()); mOutputBuffer.flip(); send_buff_index = 0; OutputDataLen = mOutputBuffer.limit(); Log.d(TAG, "flip: position: " + mOutputBuffer.position()); Log.d(TAG, "flip: limit: " + mOutputBuffer.limit()); int total = mOutputBuffer.limit()/FRAME_SIZE; if(mOutputBuffer.limit()%FRAME_SIZE > 0){ total++; } Log.d(TAG, "total: " + total); while(mOutputBuffer.hasRemaining()){ if(send_err_index > -1){//这个数据值代表错误帧index,一旦有错误帧就会被改变。再接收线程中被改变 break; } int left = mOutputBuffer.remaining(); int datlen = FRAME_SIZE; if(left < FRAME_SIZE ){ datlen = left; } byte[] bytes = new byte[datlen]; mOutputBuffer.get(bytes); Log.e(TAG, "flush send_buff_index=" + send_buff_index); sendSerialData(bytes,bytes.length,send_buff_index++); try { // Thread.sleep(1000); }catch (Exception e){ e.printStackTrace(); } } }
组包数据
private synchronized void sendSerialData(byte[] bytes,int length,short index){ //byte[] send = mDtaHelper.PackectData(7,length,1,index,bytes); byte[] send = mDtaHelper.PackectData(streamOpcode,index,length,streamMouduleId,streamMsgId,bytes); sendSerialPort(send); }
public byte[] PackectData(int opcode, short index ,int length, int module_id, int msg_id, byte[] payload) { byte[] data = new byte[length + HEAD_LENGTH + CustomServiceDataHelper.PAYLOAD_MINIMUM_SIZE]; setStartFlag(0xFFAA5500); // start flag:4 setIndex((short) index); setHandle((short)0); // handle:2, now fill 0 setVersion((byte)1); // version:1 setOpcode((byte)opcode); // opcode:1 setLength(length+CustomServiceDataHelper.PAYLOAD_MINIMUM_SIZE); // length:4, length=header+payload setModuleId((byte)module_id); // module id:1 setMsgId((byte)msg_id); // message id:1 setData(payload, length); DataSerialize(data); // payload(include checksum) return data; }
按照之前通信协议进行组包。然后通过串口发送
下面是Android串口接收MCU返回信息的处理:如果MCU返回错误,就重新发送数据。
/** * 单开一线程,来读数据 */ private class ReadThread extends Thread { @Override public void run() { super.run(); mInputBuffer.clear(); int len; int sleepTime = 0; android.os.Process.setThreadPriority(android.os.Process.THREAD_PRIORITY_MORE_FAVORABLE); //判断进程是否在运行,更安全的结束进程 while (!threadStatus){ try { if ((mInputStream.available() > 20)||(( mInputStream.available() > 0)&&(sleepTime >3))) {//超过20字节读取数据 sleepTime = 0; byte[] mRecvBuffer = new byte[10240]; len = mInputStream.read(mRecvBuffer); receoveProcessZmodem(mRecvBuffer,len);//解析数据 }else if(sleepTime >200000){//超时清空接收buffer Log.d(TAG,"clear buff"); mUsedlen = 0; sleepTime =0; mSendCallback.OnResult(clear_recv_buff); }else { // Log.d(TAG,"没有数据休息一下"); sleepTime++; Thread.sleep(1); } }catch (Exception e){ e.printStackTrace(); } } } }
/**
 * Passes a freshly read chunk to the frame parser, together with the shared receive helper.
 *
 * @param buffer bytes read from the serial port
 * @param size   number of valid bytes in {@code buffer}
 */
private void receoveProcessZmodem(byte[] buffer, int size) {
    // The parser's status code is not used by this caller.
    dataDeserialize1(buffer, size, mReceiveDtaHelper);
}
/**
 * Appends newly received bytes to the parse buffer {@code mParseDate} and runs the frame state
 * machine ({@code gemCommFSM}) over it:
 * E_FIND_HEADER -> E_CHECK_LENGTH -> E_CHECK_CRC -> E_RCV_COMPLETE -> E_FIND_HEADER ...
 * Each complete, checksum-valid frame is handed to {@code pareRecvSerialData(helper)}.
 *
 * @param array   bytes just read from the serial port
 * @param readlen number of valid bytes in {@code array}
 * @param helper  receives the parsed header fields, payload and checksum
 * @return the last status set while parsing: {@code noComplete}, {@code lengthError},
 *     {@code crcError}, {@code UnkonwnErr}, or the initial {@code receiveComplete}.
 *     NOTE(review): after a frame is delivered the loop re-enters E_FIND_HEADER, so the return
 *     value reflects the last state visited, not whether a frame was delivered.
 */
public int dataDeserialize1(byte[] array, int readlen,CommonResponseData helper) {
    Log.d(TAG, "dataDeserialize1 | pre mUsedlen: " + mUsedlen);
    Log.d(TAG, "recv array | readlen: " + readlen );
    int errcode = receiveComplete;
    boolean flag = true;
    try {
        System.arraycopy(array, 0, mParseDate, mUsedlen, readlen);// copy the received bytes into the parse buffer mParseDate
    } catch (Exception ex) {
        // The copy failed (e.g. the new bytes do not fit): start over with an empty buffer.
        mParseDate = new byte[mParseDate.length];
        mUsedlen = 0;
        flag = false;
        errcode = noComplete;
    }
    // NOTE(review): this also runs after a failed copy, leaving mUsedlen == readlen although
    // the buffer was just reset — confirm this is intended.
    mUsedlen += readlen;
    if (mUsedlen - HEAD_LENGTH < 0) { /* not enough data for a header yet */
        Log.d(TAG, "return mUsedlen1 " + mUsedlen);
        return noComplete;
    }
    // Reader over the parse buffer; its cursor (in.mIndex) is reset by hand whenever consumed
    // bytes are shifted out of mParseDate.
    ByteArrayInStream in = new ByteArrayInStream(mParseDate);
    Log.d(TAG, "mParseDate:" + toHexString(mParseDate,mUsedlen));
    while (flag) {
        switch (gemCommFSM) {
            case E_FIND_HEADER:
                Log.d(TAG, "------goto E_FIND_HEADER====");
                int index = findHeader(mParseDate, mUsedlen);
                if (index == -1) {// no frame header found, try again on the next call
                    flag = false;
                    errcode = noComplete;
                    gemCommFSM = State.E_FIND_HEADER;
                }else{
                    if (index > 0) {
                        // Drop the bytes in front of the header so the frame starts at offset 0.
                        System.arraycopy(mParseDate, index, mParseDate, 0, mUsedlen - index);
                        mUsedlen -= index;
                    }
                    try {
                        // Header fields, in wire order.
                        mStartFlag = in.readInt();
                        helper.setStartFlag(mStartFlag);
                        mIndex = in.readShort();
                        helper.setIndex(mIndex);
                        mHandle = in.readShort();
                        helper.setHandle(mHandle);
                        mVersion = in.readByte();
                        helper.setVersion(mVersion);
                        mOpcode = in.readByte();
                        helper.setOpcode(mOpcode);
                        mLength = in.readInt();
                        helper.setLength(mLength);
                    } catch (IOException e) {
                        Log.d(TAG, e.getMessage().toString());
                    }
                    gemCommFSM = State.E_CHECK_LENGTH;
                }
                break;
            case E_CHECK_LENGTH:
                Log.d(TAG, "------goto E_CHECK_LENGTH====");
                if (mLength > mParseDate.length) { // declared length cannot fit the buffer: treat as corrupted and discard everything
                    mParseDate = new byte[mParseDate.length];
                    Log.e(TAG, "return give up: " + mUsedlen + ", mLength = " + mLength);
                    mUsedlen = 0;
                    flag = false;
                    errcode = lengthError;
                    gemCommFSM = State.E_FIND_HEADER;
                }else if (mUsedlen - HEAD_LENGTH < mLength) { /* frame body not fully received yet */
                    Log.d(TAG, "return mUsedlen2 " + mUsedlen + ", mLength = " + mLength);
                    flag = false;
                    errcode = noComplete;
                    // The next call starts again at E_FIND_HEADER with a new reader, so the
                    // header is parsed again once more data has arrived.
                    gemCommFSM = State.E_FIND_HEADER;
                }else {
                    try {
                        mModuleId = in.readByte();
                        helper.setModuleId(mModuleId);
                        mMsgId = in.readByte();
                        helper.setMsgId(mMsgId);
                        if (mLength - PAYLOAD_MINIMUM_SIZE > 0) {
                            mData = new byte[mLength - PAYLOAD_MINIMUM_SIZE];
                            mData = in.readBytes(mLength - PAYLOAD_MINIMUM_SIZE);
                            helper.setData(mData);// hand the payload to helper, the structure passed to the callback
                            mDataLen = mData.length;
                        }
                        mChecksum = in.readByte();
                        helper.setChecksum(mChecksum);
                    }catch (Exception e){
                        Log.d(TAG, e.getMessage().toString());
                    }
                    gemCommFSM = State.E_CHECK_CRC;
                }
                break;
            case E_CHECK_CRC:
                recv_buff_index=helper.getIndex();
                Log.d(TAG, "------goto E_CHECK_CRC====");
                if((recv_err_index > -1)&&(recv_buff_index !=recv_err_index)){
                    // An error frame is pending and this is not that frame: skip it.
                    Log.d(TAG, "------stop recv data " + recv_buff_index);
                    // NOTE(review): helper.getData() is dereferenced without a null check here,
                    // while mData is only assigned when the frame has payload bytes — confirm.
                    Log.d(TAG, "------stop recv data is " + toHexString(helper.getData(),helper.getData().length));
                    System.arraycopy(mParseDate, in.mIndex, mParseDate, 0, mUsedlen - in.mIndex);
                    mUsedlen -= in.mIndex;
                    in.mIndex =0;
                    gemCommFSM = State.E_FIND_HEADER;
                }else{
                    // Checksum over the frame bytes read so far, excluding the checksum byte.
                    int resultCheckSum = crc8_atm(mParseDate, in.mIndex - 1);
                    if (mChecksum != resultCheckSum) {
                        Log.d(TAG, "data occur mutation!");
                        Log.d(TAG, "show crc error data: " + toHexString(mData,mData.length));
                        flag = false;
                        // Drop the bad frame and stop parsing for this call.
                        System.arraycopy(mParseDate, in.mIndex, mParseDate, 0, mUsedlen - in.mIndex);
                        mUsedlen -= in.mIndex;
                        in.mIndex =0;
                        errcode = crcError;
                        gemCommFSM = State.E_FIND_HEADER;
                    }else{
                        gemCommFSM = State.E_RCV_COMPLETE;
                    }
                }
                break;
            case E_RCV_COMPLETE:
                // Remove the consumed frame from the buffer, then dispatch it.
                System.arraycopy(mParseDate, in.mIndex, mParseDate, 0, mUsedlen - in.mIndex);
                mUsedlen -= in.mIndex;
                in.mIndex =0;
                Log.d(TAG, "return mUsedlen3 " + mUsedlen);
                if (mLength - PAYLOAD_MINIMUM_SIZE > 0) {
                }
                pareRecvSerialData(helper);
                gemCommFSM = State.E_FIND_HEADER;
                break;
            default:
                Log.d(TAG, "------goto default====");
                flag = false;
                mUsedlen =0;
                errcode = UnkonwnErr;
                gemCommFSM = State.E_FIND_HEADER;
        }
    }
    return errcode;
}
pareRecvSerialData处理MCU发送回来的错误原因以及错误帧id
private void pareRecvSerialData(CommonResponseData dataHelper){ Log.d(TAG,"pareRecvSerialData frame index ===" + dataHelper.getIndex()); Log.d(TAG,"pareRecvSerialData dataHelper.getOpcode()===" + dataHelper.getOpcode()); if(dataHelper.getOpcode() == 8){ //回复帧类型 int moduleid = dataHelper.getModuleId(); if(moduleid == 1){ int total = dataHelper.getMsgId(); Log.e(TAG,"pareRecvSerialData data length: " + total); }else if(moduleid == 2){ short errIndex = dataHelper.getIndex();//错误帧index if((errIndex == send_err_index)&&(sendRepeatTime ==2)){//超过两次就禁止重发 Log.e(TAG,"pareRecvSerialData send over 2 time ,stop send: " + errIndex); send_err_index = -1; send_stop=true; sendRepeatTime = 0; mOutputBuffer.clear(); }else {//通知有数据错误 sendRepeatTime++; send_err_index = dataHelper.getIndex();//此标志会停止flush函数发送数据。只有不等于-1 就停止发送 sendHandlerMsg(7,send_err_index); } } } }
MainActivity 收到数据错误的通知(消息 7)后,会重新发送错误帧:
case 7: // re-send the erroneous data packet
    // msg.arg1 is narrowed to the short frame index passed on to ReSendErrorData
    short index = (short) msg.arg1;
    ReSendErrorData(index);
    Log.d(TAG, "mHandler resend data index= " + index);
    break;
/**
 * Queues a retransmission on the single-thread executor: re-sends from frame {@code index},
 * waits 500 ms, and posts handler message 8 if no error index is pending afterwards
 * ({@code send_err_index == -1}).
 */
private void ReSendErrorData(short index) {
    final short errorIndex = index;
    singleThreadExecutor.execute(new Runnable() {
        @Override
        public void run() {
            try {
                Log.d(TAG, "ReSendErrorData index===== " + errorIndex);
                serialPortUtils.reSendPackect(errorIndex);
                Thread.sleep(500);
                if (serialPortUtils.send_err_index == -1) {
                    Message next = new Message();
                    next.what = 8;
                    mHandler.sendMessage(next);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    });
}
重发数据会从错误帧开始重新发送数据。
/**
 * Re-sends the buffered data starting at frame {@code index} and continuing to the end of the
 * data recorded by the last flush() ({@code OutputDataLen}). The loop stops early when
 * {@code send_stop} is set. Afterwards {@code send_stop} is reset and the buffer is cleared.
 *
 * <p>An index that is negative or lies beyond the buffered data is logged and nothing is sent.
 * Previously such an index either threw {@code IllegalArgumentException} from
 * {@code Buffer.position} or was silently clamped, depending on the buffer's current limit.
 *
 * @param index first frame to re-send; frame {@code i} starts at byte {@code i * FRAME_SIZE}
 */
public synchronized void reSendPackect(short index) {
    Log.e(TAG, "reSendPackect start==========--------------------------- " + index);
    final int position = index * FRAME_SIZE;
    if (index >= 0 && position <= OutputDataLen) {
        send_buff_index = index;
        // Limit first, so the position is validated against the real data length and not
        // against whatever limit an earlier flip()/clear() left behind.
        mOutputBuffer.limit(OutputDataLen);
        mOutputBuffer.position(position);
        while (mOutputBuffer.hasRemaining()) {
            if (send_stop) {
                break;
            }
            int left = mOutputBuffer.remaining();
            Log.e(TAG, "reSendPackect remaining" + left);
            byte[] bytes = new byte[Math.min(left, FRAME_SIZE)];
            mOutputBuffer.get(bytes);
            sendSerialData(bytes, bytes.length, send_buff_index++);
        }
    } else {
        Log.e(TAG, "reSendPackect: index " + index + " is outside the buffered data ("
                + OutputDataLen + " bytes), nothing re-sent");
    }
    send_stop = false;
    mOutputBuffer.clear();
}
完成数据重发。。。有时间把整个app 工程贡献出来,有需要的可以私信找我要
更多相关文章
- android中自定义数据类型在两个activity间的传递
- Android菜鸟的成长笔记(22)——Android进程间传递复杂数据(AIDL)
- [android盈利模式探索]我也分享一下我Android的收入数据
- win7与android设备通过蓝牙串口的连接方法
- Android串口盒子+扫码枪开发
- Android开发之Jsoup解析webView加载数据
- android常见编译错误
- Android Fragment内嵌Fragment页面不刷新数据问题