如何解决本地大批量数据的更新,和后台的同步,讲解socket的IPC和socket的通讯
Android的功夫,在Android之外。
这句话我很认同,Android Framework只不过是对底层系统的封装,要想深入理解它,必须熟悉JNI、读得懂C++、理解Java虚拟机、Linux系统甚至汇编、指令集等等。但是并不意味这你作为一个开发就一定都得会这些,我相信等你学完上述那些知识,可能已经换一个操作平台了,当然如果你真的掌握了这些,下一代操作系统是什么已经不重要了。今天我们要讲的知识点虽然是为了解决当下要解决的问题,但是追根溯源其实是java,甚至是网络协议,C++,等层面的知识,但知识都是触类旁通的。
说这个问题首先我先说下这个业务的使用场景。随着互联网的发展进入了下半场,有以前的app大而且多的局面满满的走向精而细的划分,每一个app的如何基于大数据统计用户行为是衡量一款产品的优劣标准之一,因为这些数据驱动老板、产品、市场、运营的业务决策,深度了解你的用户行为,评估营销效果,优化产品体验,提升运营效率,在探索不同业务的关键行为中,洞察指标背后掩藏的故事。对产品的定位和改进是非常重要的因素,接下来我就围绕我们开发无埋点统计中遇到的问题跟大家交流产品的统计数据遇到的解决方案:
- 本博客不介绍如何实现无埋点统计,这里我讲的是数据的处理,至于无埋点如何插桩,这个点不是今天我们要讨论的,以后有机会可以跟大家详细的分享安卓AOP的相关知识点
- 如何实现后台与前端的界面统一,可以再web端动态的控制选择埋点,这个在以前的博文稍微有介绍:就是频繁的截屏
- 基于第二个问题,频繁的截屏必然产生大量的数据,数据如何进行通讯
今天的文章就是围绕第三个问题进行的拓展和延伸
本地截屏的大量数据如何进行传输:
思路:
- 因为产品在开发完版本以后让老板、产品、市场、运营可以再后端web端动态控制,他们不懂程序,只知道图形化的操作,那么我们需要把这个app界面的图形化操作传到服务器,让其进行选择,可以通过本地的adb命令Socket的IPC进程通信完成但是这里的局限性是手机和服务器是不同的机器,如何通信?
- 可能有人说socket可以支持TCP/IP通信,是的,没问题,如果仅仅是这个问题那么也没有必要写博客了,今天主要是要根据这个我们能不能做成socket如何通过IPC完成数据的上传工作
围绕这个问题之前我们先回顾下一些知识点:
首先说下线程间通信的几种方式:
1:使用管道流Pipes
“管道”是java.io包的一部分。它是Java的特性,而不是Android特有的。一条“管道”为两个线程建立一个单向的通道。生产者负责写数据,消费者负责读取数据。
public class SecAct extends Activity { private static final String TAG = "PipeExampleActivity"; private EditText editText; PipedReader r; PipedWriter w; private Thread workerThread; @Override protected void onCreate(@Nullable Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_sec); r = new PipedReader(); w = new PipedWriter(); try { w.connect(r); } catch (IOException e) { e.printStackTrace(); } editText = (EditText) findViewById(R.id.edit_text); editText.addTextChangedListener(new TextWatcher() { @Override public void beforeTextChanged(CharSequence charSequence, int start, int count, int after) { } @Override public void onTextChanged(CharSequence charSequence, int start, int before, int count) { try { if (count > before) { w.write(charSequence.subSequence(before, count).toString()); } } catch (IOException e) { e.printStackTrace(); } } @Override public void afterTextChanged(Editable editable) { } }); workerThread = new Thread(new TextHandlerTask(r)); workerThread.start(); } @Override protected void onDestroy() { super.onDestroy(); workerThread.interrupt(); try { r.close(); w.close(); } catch (IOException e) { } } private static class TextHandlerTask implements Runnable { private final PipedReader reader; public TextHandlerTask(PipedReader reader) { this.reader = reader; } @Override public void run() { while (true) { try { int i; while ((i = reader.read()) != -1) { char c = (char) i; Log.d(TAG, "char = " + c); } } catch (IOException e) { e.printStackTrace(); } } } }}
2:共享内存
说到这个我们就得聊到java的内存模型了。
java内存模型是什么呢?它规范了java虚拟机如何与计算机内存的协同工作
- 堆为JVM内所有的线程共享,存在内存中所有的对象和数组数据
- 栈为每个线程所有,栈中存放了当前方法的调用信息以及基本数据类型和引用数据类型的数据
java中的堆,堆在虚拟机启动的时候创建。堆占用的内存是垃圾回收器回收,不用我们手动回收。
JVM没有规定死使用哪种回收机制,不同的虚拟机可以实现不同的回收算法。
堆中包含了java程序创建的所有的对象, 不论是哪个线程。
一个对象的成员变量随着这个对象自身存放在堆上。不管这个成员变量是基本类型还是引用类型。
java中的栈
栈在线程创建的时候创建,它和C语言的栈类似,在一个方法中,你创建的局部变量和部分结果都会保存在栈中,并在方法调用和返回中起作用。当前的栈只对当前的线程可见,即便两个线程执行同样的代码,这两个线程仍然会在自己的线程栈中创建一个本地副本。
因此每一个线程拥有每个本地变量的独有版本。
栈中保存方法调用栈、基本类型的数据、以及对象的引用。
计算机中的内存、寄存器、缓存
一个现代计算机通常由两个或者多个 CPU,每个 CPU 都包含一系列的寄存器,CPU 在寄存器上执行操作的速度远大于在主存上执行的速度。
每个 CPU 可能还有一个 CPU 缓存层。CPU 访问缓存层的速度快于访问主存的速度,但通常比访问内部寄存器的速度还要慢一点。
通常情况下,当一个 CPU 需要读取主存时,它会将主存的部分读到 CPU 缓存中。它甚至可能将缓存中的部分内容读到它的内部寄存器中,然后在寄存器中执行操作。
当 CPU 需要将结果写回到主存中去时,它会将内部寄存器的值刷新到缓存中,然后在某个时间点将值刷新回主存。
多线程可能出现的问题
通过上述介绍,我们可以知道,如果多个线程共享一个对象,每个线程在自己的栈中会有对象的副本。
如果线程 A 对对象中的某个变量进行修改后还没来得及写回主存,线程 B 也对该变量进行了修改,那最后刷新回主内存后的值一定和期望的值不一致。
就好比我和你同时开发同一模块代码,我下笔如有神不一会儿搞定了注册登录并且提交,你没有从服务器拉代码就蒙头狂写,最后一 pull 代码,就会发现自己写的好多都跟服务器上的冲突了!
竞态条件与临界区
当多个线程操作同一资源时,如果对资源的访问顺序敏感,就称存在竞态条件。导致竞态条件发生的代码区称作临界区。
在临界区中使用适当的同步就可以避免竞态条件,比如 synchronized, 显式锁和原子操作类等。
内存可见性
我写的代码你无法立即看到,这就是所谓的“内存可见性”问题。
为了让线程 A 对变量做的修改线程 B 立即可以看到,我们可以使用 volatile 修饰变量或者对修改操作使用同步。
当线程访问某一个对象时候值的时候:
首先通过对象的引用找到对应在堆内存的变量的值;
然后把堆内存变量的具体值 load 到线程工作内存中,建立一个变量副本;
之后线程就不再和对象在堆内存变量值有任何关系,而是直接修改副本变量的值,在修改完之后也不会立即同步修改共享堆内存中该变量的值;
直到某一个时刻(线程退出之前),自动把线程变量副本的值回写到对象在堆中变量。这样在堆中的对象的值就产生变化了。
多个线程共享同一份内存,就是说,一个变量可以同时被多个线程所访问。这里要特别注意同步和原子操作的问题。
synchronized(this) { while(isConditionFullfilled == false) { wait(); } notify();}
因为如果不加同步或者原子性可能会出现,这个线程修改了,另外一个线程没法读到。
3:使用Hander和Message
上面讲的是线程间通信的问题,接下来再来说说进程间通信的问题。而且今天主要给大家讲解android有哪些可以跨进程通信的机制,android系统归根到底还是一个Linux系统,Linux系统有着非常成熟完善的跨进程通信的机制,比如:管道,System V,Socket等
linux下的进程通信手段基本上是从Unix平台上的进程通信手段继承而来的
下面分别讲解这几个,以及为什么有了这些还要共有跨进程机制还有哪些特有的机制.
- 管道:
- 共享内存,System V(https://blog.csdn.net/qq_35535992/article/details/52926543)
- Socket
Socket是目前用于最广泛的进程间通信的机制,他与其他Linux通信机制不同的地方在于除了可以用于单机内的进程间通信以外,还可以用于不同机器的进程间通信,但是Socket本身不支持同时等待或者超时处理,所以他不能直接用来多进程之间的相互实时通信。在我们开发的项目中使用的Socket的进程通信方法是,建立一个进程专门用于通讯服务器(Server)来中转各个进程间的通信,它首先启动一个用来监视连接要求的listening Socket,并把它的描述(Descriptor)号加入到一个事先定义好的fd_set的集合中,这个fd_set的集合用来存放listening Socket和后来生成的通信Socket的描述号。Server运用system call select来实时检查是否有数据到达这个集合中的任何一个socket,如果有数据到达listening Socket,则这一定是客户端发起的连接请求,于是生成一个新的通信Socket与该客户端连接,将生成的Socket描述号加入到fd_set的集合中,将客户端的ID号和与之对应的Socket的描述号记录在ID登记表中。如果有数据到达某个通信Socket,则这一定是某个客户端发起的通信请求,读出数据并取出收信客户端ID号,在ID登记表中找到与之对应的Socket描述号,将数据通过对应Socket传送到收信客户端。
其他各个进程作为作为客户端,(client)。客户端的动作是首先建立通信Socket连接服务器端,然后通过通信Socket进行送信和收信。
首先给出Server端的程序,在这里假设有两个客户端要进行实时通信,ClientA向ClientB发送字符1,ClientB向ClientA发送字符2。
#include #include #include #include #include #include #include #include int main(){ int rcd ; struct sockaddr_un server_sockaddr ; int backlog ; ushort ci ; int watch_fd_list[3] ; fd_set catch_fd_set ; fd_set watchset ; int new_cli_fd ; int maxfd; int socklen ,server_len; struct sockaddr_un cli_sockaddr ; struct { char module_id ; /* Module ID */ int cli_sock_fd ; /* Socket ID */ } cli_info_t[2] ; for (ci=0;ci<=1;ci++) cli_info_t[ci].cli_sock_fd=-1; for (ci=0;ci<=2;ci++) watch_fd_list[ci]=-1; int server_sockfd,client_sockfd; server_sockfd = socket( AF_UNIX, SOCK_STREAM, 0 ) ; server_sockaddr.sun_family = AF_UNIX ; strcpy( server_sockaddr.sun_path, "server_socket" ) ; server_len=sizeof(server_sockaddr); rcd = bind( server_sockfd, ( struct sockaddr * )&server_sockaddr, server_len ) ; backlog = 5 ; rcd = listen( server_sockfd, backlog ) ; printf("SERVER::Server is waitting on socket=%d \n",server_sockfd); watch_fd_list[0]=server_sockfd; FD_ZERO( &watchset ) ; FD_SET( server_sockfd, &watchset ) ; maxfd=watch_fd_list[0];
在上面的程序中,Server生成listening Socket(server_sockfd),初始化Socket监视集合(watchset),并将listening Socket放入Socket监视集合中。
while (1){char ch;int fd;int nread; catch_fd_set=watchset;rcd = select( maxfd+1, &catch_fd_set, NULL, NULL, (struct timeval *)0 ) ;
在上面的程序中,Server运用系统调用函数 select来实时检查是否有数据到达Socket监视集合中的任何一个socket。
if ( rcd < 0 ) { printf("SERVER::Server 5 \n"); exit(1);}if ( FD_ISSET( server_sockfd, &catch_fd_set ) ) { socklen = sizeof( cli_sockaddr ) ; new_cli_fd = accept( server_sockfd, ( struct sockaddr * ) &( cli_sockaddr ), &socklen ) ; printf(" SERVER::open communication with Client %s on socket %d\n", cli_sockaddr.sun_path,new_cli_fd); for (ci=1;ci<=2;ci++){ if(watch_fd_list[ci] != -1) continue; else{ watch_fd_list[ci] = new_cli_fd; break; } } FD_SET(new_cli_fd , &watchset ) ; if ( maxfd < new_cli_fd ) { maxfd = new_cli_fd ; } for ( ci=0;ci<=1;ci++){ if(cli_info_t[ci].cli_sock_fd == -1) { cli_info_t[ci].module_id=cli_sockaddr.sun_path[0]; cli_info_t[ci].cli_sock_fd=new_cli_fd; break; } } continue; }
在上面的程序中,Server运用系统调用函数FD_ISSET来检查是否有客户端的连接请求到达Listening Socket, 如果返回值大于0,Server生成一个新的通信Socket (new_cli_fd)与客户端连接。将新生成的通信Socket放入Socket监视集合中(FD_SET)。将客户端的信息(ID号和Socket描述号)保存在注册表cli_info_t中
for ( ci = 1; ci<=2 ; ci++ ) { int dst_fd = -1 ; char dst_module_id; char src_module_id; int i; if (watch_fd_list[ ci ]==-1) continue; if ( !FD_ISSET( watch_fd_list[ ci ], &catch_fd_set ) ) { continue ; } ioctl(watch_fd_list[ ci ],FIONREAD,&nread); if (nread==0){ continue; } read( watch_fd_list[ ci ], &dst_module_id, 1 ) ; for (i=0;i<=1;i++){ if(cli_info_t[i].module_id == dst_module_id) dst_fd= cli_info_t[i].cli_sock_fd; if(cli_info_t[i].cli_sock_fd==watch_fd_list[ ci ]) src_module_id= cli_info_t[i].module_id; } read( watch_fd_list[ ci ], &ch, 1 ) ; printf("SERVER::char=%c to Client %c on socket%d\n",ch, dst_module_id,dst_fd); write(dst_fd,&src_module_id, 1 ) ; write(dst_fd,&ch, 1 ) ; } } }
在上面的程序中,如果有数据到达某个通信Socket,Server则读出数据并取出收信客户端ID号。在ID登记表中找到收信客户端对应的Socket描述号。并将数据通过对应Socket传送到收信客户端
给出客户端 ClientA的程序
ClientB的程序只需将 char dst_module_id='B'; 改为char dst_module_id='A'; char ch='1'; 改为char char ch='2';既可。
#include #include #include #include #include #include #include int main(){ int client_sockfd; int len; struct sockaddr_un server_sockaddr,cli_sockaddr; int result; char dst_module_id='B'; char ch='1'; char src_module_id; client_sockfd= socket(AF_UNIX,SOCK_STREAM,0); cli_sockaddr.sun_family = AF_UNIX ; strcpy( cli_sockaddr.sun_path, "A" ) ; bind(client_sockfd,(struct sockaddr * )&cli_sockaddr, sizeof( cli_sockaddr ) ) ; server_sockaddr.sun_family=AF_UNIX; strcpy( server_sockaddr.sun_path, "server_socket" ) ; len=sizeof(server_sockaddr); result = connect(client_sockfd,( struct sockaddr * )&server_sockaddr,len); if (result <0){ printf("ClientA::error on connecting \n"); exit(1); } printf("ClientA::succeed in connecting with server\n"); sleep(10); write(client_sockfd,&dst_module_id,1); write(client_sockfd,&ch,1); read (client_sockfd,&src_module_id,1); read (client_sockfd,&ch,1); printf("ClientA::char from Client %c =%c\n", src_module_id,ch); close (client_sockfd); }
下面是样本程序的执行结果
[root@zhou test]# ./server &[3] 4301[root@zhou test]# SERVER::Server is waitting on socket=3./clientA & ./clientB &[4] 4302[5] 4303ClientA::succeed in connecting with server SERVER::open communication with Client A on socket 4[root@zhou test]# SERVER::open communication with Client B on socket 5ClientB::succeed in connecting with serverSERVER::char=1 to Client B on socket5ClientB::char from Client A =1SERVER::char=2 to Client A on socket4ClientA::char from Client B =2
为什么使用
AF_UNIX Socket服务器呢?
- 它更容易,因为端口不能被其他任何东西使用
- 减少开销
- 侦听端口往往会使USB绑定在某些设备上无法使用
- 更好的隔离; 在设备上运行的常规应用程序无法连接到抽象Socket,也无法通过网络连接
- 使所有端口免费供其他程序使用
- 某些设备(例如三星)不允许您在/ data / local / tmp中创建常规Socket文件,因此无法使用Socket文件
为什么不用AF_INET而使用AF_UNIX?
TCP/IP四层模型的通信原理
发送方和依赖方依赖IP:port来标识,将本地的socket绑定到对应的IP端口上,发送数据时,指定IP端口,经过Internet,可以通过此IP端口最终找到接收方,接收数据时,可以从对方的数据包中找到对方的ip,
发送方通过系统调用send()将原始数据发送到操作系统内核缓冲区中。内核缓冲区从上到下依次经过TCP层、IP层、链路层的编码,分别添加对应的头部信息,经过网卡将一个数据包发送到网络中。经过网络路由到接收方的网卡。网卡通过系统中断将数据包通知到接收方的操作系统,再沿着发送方编码的反方向进行解码,即依次经过链路层、IP层、TCP层去除头部、检查校验等,最终将原始数据上报到接收方进程。
AF_UNIX域socket的通信过程。
典型的本地ipc,类似于管道,依赖路径名标识发送方和接收方,即发送数据时,指定接收方绑定的路径名,操作系统根据该路径名直接找到对应的接收方,并将原始数据直接拷贝到接受方的内核缓冲区,并上报给接受方的进程进行处理,同样接收方可以从收到的数据包获取发送方的路径名,并通过此路径向其发送数据。
相同点:
操作系统提供的接口socket(),bind(),connect(),accept(),send(),recv(),以及用来对其进行多路复用事件检测的select(),poll(),epoll()都是完全相同的。收发数据的过程中,上层应用感知不到底层的差别。
不同点:
1:建立的socket传递的地址域不同,以及bind()的地址结构稍有区别
socket传递不同的地址域AF_INET和AF_UNIX
bind的地址结构分别为sockaddr_in(指定IP端口)和sockaddr_un(指定的路径名)
2:AF_INET需要经过多个协议的编解码,消耗系统的cpu,并且数据传输需要经过网卡,收到网卡宽带的限制。
AF_UNIX数据到达内核缓冲区,由内核根据指定路径名找到接收方socket对应的内核缓冲区,直接将数据拷贝过去,不经过协议层的编解码,节省cpu,并且不经过网卡,因此不受网卡宽带的限制。
3:AF_UNIX的传输速率远远的大于AF_INET
4:AF_INET不可以作为本机的跨进程通信,同样的可以用于不同机器的通信,其就是为了在不同机器间进行网络互联传递数据而生,而AF_UNIX仅仅可以用于本机内进程间的通信。
使用场景:
AF_UNIX由于其对系统cpu的较少消耗,不受限于网卡带宽,及高效的传递速率,本机通信则首选AF_UNIX域。
AF_INET则用于跨机器之间的通信。
这里其实我有个拓展就是我们安卓的设备也可以变相的完成不同机器的通信?如果去操作呢?
我们可以通过端口的转发完成本地pc机器链接到手机设备,完成数据传输,但是这个必须得通过usb接口
然后在把这个pc的端口开发出去和服务器链接可以完成这个变相的进程间通信
使用TCP端口进行forward之外,我们还可以使用unix domain socket进行forward:
$ adb forward localfilesystem:socket dev:/dev/block/mmcblk0p6
好啦,基本上完成了我们本地的数据传输。完成了这些以后产品交接完以后我们可以让老板、产品、市场、运营在局域网内开放本地端口就可以做数据的传输,当然这里至于如何开放端口等一些命令,我们会在手机端的链接的时候通过脚本自动完成,无需担心:
更多相关文章
- 一句话锁定MySQL数据占用元凶
- Android(安卓)使用ListView的A-Z字母排序功能实现联系人模块
- 【Android个人理解(一)】通过Looper与Handle的关系,理解Looper工作
- android 前后台保活 实现定位数据定时上传并展示轨迹 (上)
- Android客户端与服务器端的json数据交互(很详细)
- Android(安卓)Handler 消息机制(解惑篇)
- Android(安卓)Studio之ListView的用法(上)
- android 扫码设备获取扫码回调内容实践
- Unity3D链接Android手机端数据库