作者: Wolf Geek 转载请说明出处

上一回,主要介绍了有关WifiDisplay设备连接和建立数据流的流程,这一回将接着向底层前进。由于涉及的内容较多,这里仅仅理清一个大概的头绪,细节的部分将不再展开,如果有什么错误的地方我会及时更正。

当Source端通过RemoteDisplay.cpp的构造函数注册了Wifidisplay处理线程,并且ANetworkSession初始化了通信所用的数据管道并且开始监听数据流变化后,Source端将通过函数mSource->start(iface)开始建立RTSP连接并且向Sink端传递数据流。接下来,将具体分析其流程。mSource->start(iface)的具体实现在以下文件,

frameworks/av/media/libstagefright/wifi-display/source/WifiDisplaySource.cpp

status_t WifiDisplaySource::start(const char *iface) {    CHECK_EQ(mState, INITIALIZED);    sp<AMessage> msg = new AMessage(kWhatStart, id());    msg->setString("iface", iface);    sp<AMessage> response;    status_t err = msg->postAndAwaitResponse(&response);    if (err != OK) {        return err;    }    if (!response->findInt32("err", &err)) {        err = OK;    }    return err;}

该函数首先通过CHECK_EQ来判断当前Source端状态是否为 INITIALIZED,如果是将通过 AMessage创建 标识为kWhatStart的消息,用于在onMessageReceived处理分支中进行匹配,msg->setString(“iface”,iface)用于在传递消息过程中携带网络地址端口信息, msg->postAndAwaitResponse用于返回相应结果。这种方式在Android的流媒体类中相当常见,是一种异步消息处理框架。与该框架相关的类主要有ALooper、AHandler、ALooperRoster等,具体请见这里。

接下来,我们来看看当Source端接收到kWhatStart的消息后做何种处理,

void WifiDisplaySource::onMessageReceived(const sp<AMessage> &msg) {    switch (msg->what()) {        case kWhatStart:        {            uint32_t replyID;            CHECK(msg->senderAwaitsResponse(&replyID));            AString iface;            CHECK(msg->findString("iface", &iface));            status_t err = OK;            ssize_t colonPos = iface.find(":");  //寻找“:”所在位置            unsigned long port;            if (colonPos >= 0) {                const char *s = iface.c_str() + colonPos + 1;                char *end;                port = strtoul(s, &end, 10);  //得到port号                if (end == s || *end != '\0' || port > 65535) {                    err = -EINVAL;                } else {                    iface.erase(colonPos, iface.size() - colonPos);                  }            } else {                port = kWifiDisplayDefaultPort;            }            if (err == OK) {                if (inet_aton(iface.c_str(), &mInterfaceAddr) != 0) {  //将IP地址转化为32位的网络序列IP地址                    sp<AMessage> notify = new AMessage(kWhatRTSPNotify, id());//建立标识为 kWhatRTSPNotify的消息作为参数传递                    err = mNetSession->createRTSPServer(                            mInterfaceAddr, port, notify, &mSessionID);                } else {                    err = -EINVAL;                }            }            if (err == OK) {                mState = AWAITING_CLIENT_CONNECTION;            }            sp<AMessage> response = new AMessage;            response->setInt32("err", err);            response->postReply(replyID);            break;        }      ...   }}

首先,可以看到当Source端接收到消息标识为 kWhatStart的消息后,消息指针msg会通过函数msg->senderAwaitsResponse(&replyID)获取对应于postAndAwaitResponse函数的响应标识,并把处理中的错误信息作为消息载体通过response->postReply(replyID)传递回start(iface)函数。然后,该处理函数将接收到的网络地址端口信息iface拆分为IP地址和端口两个部分,并且利用 createRTSPServer创建RTSP服务端,函数会返回相应的Session编号。如果RTSP服务端创建成功,则将Source端状态更改为 AWAITING_CLIENT_CONNECTION,表示等待客户端连接。

接着看创建RTSP服务端具体做了哪些动作,

frameworks/av/media/libstagefright/wifi-display/ANetworkSession.cpp

status_t ANetworkSession::createRTSPServer(        const struct in_addr &addr, unsigned port,        const sp<AMessage> notify, int32_t *sessionID) {    return createClientOrServer(            kModeCreateRTSPServer,            &addr,            port,            NULL /* remoteHost */,            0 /* remotePort */,            notify,            sessionID);}

可以看到函数createRTSPServer具体又调用了 createClientOrServer函数。在此类中,与建立管道数据流相关的函数都会调用该函数,它们分别是createRTSPClient、createRTSPServer、createUDPSession、createTCPDatagramSession等函数。

其中可以不用关注函数createTCPDatagramSession,这是因为Sink端默认选择了UDP方式进行传输。具体起关键性作用的是在WifiDisplaySink.h头文件中的static const bool sUseTCPInterleaving = false变量 ,具体而言,该变量为false就导致Sink端不会向Source端发送“Transport: RTP/AVP/TCP”这样的请求,这样在Source端就不会将Sender类中的mTransportMode变量设置为TRANSPORT_TCP或者 TRANSPORT_TCP_INTERLEAVED,所以在Sender类中最终并不会调用函数createTCPDatagramSession。

接下来,将重点看看函数createClientOrServer,其中与建立RTSP服务端无关的步骤先省略不看。

status_t ANetworkSession::createClientOrServer(        Mode mode,        const struct in_addr *localAddr,        unsigned port,        const char *remoteHost,        unsigned remotePort,        const sp<AMessage> notify,        int32_t *sessionID) {    Mutex::Autolock autoLock(mLock);    *sessionID = 0;    status_t err = OK;    int s, res;    sp<Session> session;    s = socket(            AF_INET,            (mode == kModeCreateUDPSession) ? SOCK_DGRAM : SOCK_STREAM,            0);    //建立类型为流套接字的socket  ...    if (mode == kModeCreateRTSPServer            || mode == kModeCreateTCPDatagramSessionPassive) {        const int yes = 1;        res = setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));//允许socket和一个已在使用中的地址捆绑    err = MakeSocketNonBlocking(s);  //设置socket为非阻塞方式    struct sockaddr_in addr;    memset(addr.sin_zero, 0, sizeof(addr.sin_zero));    addr.sin_family = AF_INET;    if (mode == kModeCreateRTSPClient            || mode == kModeCreateTCPDatagramSessionActive) {     ...    } else if (localAddr != NULL) {        addr.sin_addr = *localAddr;        addr.sin_port = htons(port);    } else {        ...    }    if (mode == kModeCreateRTSPClient            || mode == kModeCreateTCPDatagramSessionActive) {       ...           } else {        res = bind(s, (const struct sockaddr *)&addr, sizeof(addr));//socket与sockaddr结构体指针绑定,sockaddr对应与iface        if (res == 0) {  //绑定成功            if (mode == kModeCreateRTSPServer                    || mode == kModeCreateTCPDatagramSessionPassive) {                res = listen(s, 4);  //作为服务端开始监听rtsp连接请求            } else {                  ...                }            }        }    }     ...    Session::State state;    switch (mode) {        ...        case kModeCreateRTSPServer:            state = Session::LISTENING_RTSP;   //设置Session状态为 LISTENING_RTSP            break;       ...    }    session = new Session(            mNextSessionID++,            state,            s,            notify);    //创建一个session对象,sessionID加1    ...    mSessions.add(session->sessionID(), session); //将该对象加入vector结构中保存    interrupt();  //向管道写端写数据    *sessionID = session->sessionID();  //由指针带出当前sessionID    goto bail;...bail:    return err;}

可以看到建立RTSP服务端的步骤没什么特别的地方,无非是建立socket,绑定地址然后监听等步骤,只不过为了标识不同的请求和区分当前状态,这里用Session结构体和相应的状态来管理对应的socket。

回到RemoteDisplay.cpp的构造函数中,可以看到在Source端调用函数mSource->start(iface)之前就通过mNetSession->start()开启了ANetworkSession线程,接下来看一下mNetSession->start()究竟干了什么事。


status_t ANetworkSession::start(){   ...   int res =pipe(mPipeFd);  //建立读写管道,控制threadLoop的执行   if (res != 0) {        mPipeFd[0] = mPipeFd[1] = -1;        return -errno;               }   mThread = new NetworkThread(this); //构造ANetworkSession的内部结构线程   status_t err = mThread->run("ANetworkSession", ANDROID_PRIORITY_AUDIO);//开启该线程,将会调用NetworkThread线程中threadLoop,进一步会调用AnetworkSession::threadLoop()   ...   return OK;}

可以看到ANetworkSession采用了管道的方式来控制AnetworkSession::threadLoop()的执行,

void ANetworkSession::threadLoop() {    fd_set rs, ws;    FD_ZERO(&rs);    FD_ZERO(&ws);    FD_SET(mPipeFd[0], &rs);    int maxFd = mPipeFd[0];    {      ...        for (size_t i = 0; i < mSessions.size(); ++i) {            const sp<Session> &session = mSessions.valueAt(i);            int s = session->socket();    //遍历并获取vector结构中保存的socket            ...            if (session->wantsToRead()) {  //判断当前session状态是否需要读                FD_SET(s, &rs);                if (s > maxFd) {                    maxFd = s;                }            }            if (session->wantsToWrite()) {//判断当前session状态是否需要写                FD_SET(s, &ws);                if (s > maxFd) {                    maxFd = s;                }            }        }    }        int res = select(maxFd + 1, &rs, &ws, NULL, NULL ); //阻塞查看是否有socket可读写          ...    if (FD_ISSET(mPipeFd[0], &rs)) {        char c;        ssize_t n;        do {                    n = read(mPipeFd[0], &c, 1);  //只有当管道中有数值时才跳出循环,即类中有其他函数调用了 interrupt函数        } while (n < 0 && errno == EINTR);        ...        --res;       }    {        ...         List<sp<Session> > sessionsToAdd;        for (size_t i = mSessions.size(); res > 0 && i-- > 0;) { //res>0判断是否有socket资源可进行读或写            const sp<Session> &session = mSessions.valueAt(i);            int s = session->socket();                           if (FD_ISSET(s, &rs) || FD_ISSET(s, &ws)) {                --res;            }            if (FD_ISSET(s, &rs)) {                if (session->isRTSPServer() || session->isTCPDatagramServer()) {//如果当前状态Session状态为LISTENING_RTSP或LISTENING_TCP_DGRAMS执行下列操作                    struct sockaddr_in remoteAddr;                    socklen_t remoteAddrLen = sizeof(remoteAddr);                    int clientSocket = accept(                            s, (struct sockaddr *)&remoteAddr, &remoteAddrLen);//从处于listen状态的流套接字s的客户连接请求队列中取出排在最前的一个客户请求建立新的socket通道                    if (clientSocket >= 0) {                        status_t err = MakeSocketNonBlocking(clientSocket);                         ...                            sp<Session> clientSession =                                new Session(                                        mNextSessionID++,                                        Session::CONNECTED,                                        clientSocket,                                        session->getNotificationMessage());//把所建立RTSP连接的本地地址、客户地址以及端口等信息通过AMessage发送到Source端                            clientSession->setIsRTSPConnection(                                    session->isRTSPServer());  //mIsRTSPConnection变量设为false                  sessionsToAdd.push_back(clientSession);  //将该Session加入到 sessionsToAdd队列尾部                    }                         ...                 } else {                    status_t err = session->readMore(); //在建立UDP连接或者RTSP连接已建立的状况且该socket可读,接收相应socket传来的信息,同时通过AMessage的形式与Source或Sink端做数据交换并且通知其做相应处理                }            }            if (FD_ISSET(s, &ws)) {                status_t err = session->writeMore();  //对有写需求的Session,并且该socket是可写的情况下,向UDP或RTSP连接的另一端发送由Souce或Sink端相应请求中获得的数据          ...             }        }        while (!sessionsToAdd.empty()) {            sp<Session> session = *sessionsToAdd.begin();            sessionsToAdd.erase(sessionsToAdd.begin());            mSessions.add(session->sessionID(), session);  //按队列顺序把相关Session加入vector结构中保存        }    }}} 

该 threadLoop()函数首先完成了Sink端RTSP客户端连接请求的接收,其次还负责完成在Source端和Sink端之间的RTSP连接和UDP连接的相关socket读写通信等工作。了解了ANetworkSession线程是如何管理Source端和Sink端之间数据通信的过程后,再次回到Wifi Display的开启流程上来。在threadLoop()函数中RTSP服务端会从处于listen状态的流套接字s的客户连接请求队列中取出排在最前的一个客户请求建立新的socket通道。可以看到这个连接请求由rtsp的客户端发出,具体而言,该请求是由createClientOrServer函数中相应的connect函数完成,

frameworks/av/media/libstagefright/wifi-display/ANetworkSession.cpp

if (mode == kModeCreateRTSPClient            || mode == kModeCreateTCPDatagramSessionActive) {        struct hostent *ent= gethostbyname(remoteHost);        addr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;        addr.sin_port = htons(remotePort);        in_addr_t x = ntohl(addr.sin_addr.s_addr);        ALOGI("connecting socket %d to %d.%d.%d.%d:%d",              s,              (x >> 24),              (x >> 16) & 0xff,              (x >> 8) & 0xff,              x & 0xff,              ntohs(addr.sin_port));        res = connect(s, (const struct sockaddr *)&addr, sizeof(addr));    }

到这里,关注的焦点转向到createRTSPClient也就是Rtsp客户端建立的过程,而客户端的建立是由Sink端完成的,因此开始分析Sink端程序。

客户端主程序目前只有测试程序,由于没有设备并没有进行相关测试,该程序编译好只能以命令方式运行,命令有两个选项作为选择,调用格式如下,

./wfd -c xx.xx.xx.xx:port

./wfd -u uri

第一条命令用于将Sink端连接到Source端,需要知道Source端的IP地址及Port端口,第二条命令用于测试sink端连接到rtsp uri地址的效果。这里只来看第一条命令,执行后其会调用以下关键的几个函数,

frameworks/av/media/libstagefright/wifi-display/wfd.cpp

    sp<ANetworkSession> session = new ANetworkSession;     session->start();    sp<WifiDisplaySink> sink = new WifiDisplaySink(session);    looper->registerHandler(sink);    sink->start(connectToHost.c_str(), connectToPort);     looper->start(true);

可以看到,Sink端的启动与Source端没有什么区别,依旧是开启如同RemoteDisplay.cpp中的那几个线程,下面来关注一下WifiDisplaySink线程的启动。首先是用构造函数new出一个操作对象,以下是构造函数,

WifiDisplaySink::WifiDisplaySink(        const sp<ANetworkSession> &netSession,        const sp<ISurfaceTexture> &surfaceTex)    : mState(UNDEFINED),      mNetSession(netSession),      mSurfaceTex(surfaceTex),      mSessionID(0),      mNextCSeq(1) {}

上面为WifiDisplaySink的构造函数,可以看到调用时并没有去初始化第二个参数,默认就会被置为空,这是为用户自己构造Sink端程序界面提供的参数。后面会看到在TunnelRenderer类中会直接创建Surface对象供填充Sink端播放器使用。也就是说,如果要写自己的Sink端应用,可以将应用层要作为显示Source端数据流的Surface直接用来填充该构造函数使用。之后就是利用构造出的对象去启动Sink端主线程,

 void WifiDisplaySink::start(const char *sourceHost, int32_t sourcePort) {    sp<AMessage> msg = new AMessage(kWhatStart, id());    msg->setString("sourceHost", sourceHost);    msg->setInt32("sourcePort", sourcePort);    msg->post();}            

又看到了熟悉的异步消息处理框架AMessage类,这里就直接去看onMessageReceived的相应处理,

void WifiDisplaySink::onMessageReceived(const sp<AMessage> &msg) {    switch (msg->what()) {        case kWhatStart:        {            int32_t sourcePort;                  ...                CHECK(msg->findString("sourceHost", &mRTSPHost));                CHECK(msg->findInt32("sourcePort", &sourcePort));            sp<AMessage> notify = new AMessage(kWhatRTSPNotify, id());            status_t err = mNetSession->createRTSPClient(                    mRTSPHost.c_str(), sourcePort, notify, &mSessionID);//建立RTSP客户端            CHECK_EQ(err, (status_t)OK);            mState = CONNECTING;            break;        }        ...     }   }

到此,终于看到了Sink端调用ANetworkSession类中的createRTSPClient函数去建立Rtsp客户端。具体就是调用上面提到的connect函数将流套接字s连接至由命令指定的source端网络地址,并且将当前Session状态置为Session::CONNECTING。需要注意的是,只要调用了createClientOrServer的函数都会根据当前不同状态创建session并且加入到mSessions这一Vector结构中以供threadLoop和sendRequest等函数使用。当建立完RTSP服务端和客户端后,与RTSP服务端对应的session是需要读的,相应socket被加入到读文件描述符集合rs中;与RTSP客户端对应的session是需要写的,相应socket被加入到写文件描述符集合ws中。

Rtsp服务端在监听到客户端连接请求后,在threadLoop中将调用accept函数接收该请求并创建新的clientSocket。如果clientSocket可以被置为非阻塞状态,则通过其创建clientSession,具体就是调用Session的构造函数sp<Session> clientSession =new Session(mNextSessionID++,Session::CONNECTED,clientSocket,session->getNotificationMessage()),

ANetworkSession::Session::Session(        int32_t sessionID,        State state,        int s,        const sp<AMessage> notify)    : mSessionID(sessionID),      mState(state),      mIsRTSPConnection(false),      mSocket(s),      mNotify(notify),      mSawReceiveFailure(false),      mSawSendFailure(false) {  if (mState == CONNECTED) {           struct sockaddr_in localAddr;        socklen_t localAddrLen = sizeof(localAddr);        int res = getsockname(                mSocket, (struct sockaddr *)&localAddr, &localAddrLen);//根据clientSocket获得本地网络地址        CHECK_GE(res, 0);        struct sockaddr_in remoteAddr;        socklen_t remoteAddrLen = sizeof(remoteAddr);        res = getpeername(                mSocket, (struct sockaddr *)&remoteAddr, &remoteAddrLen);//根据clientSocket获得连接到Rtsp服务端的客户端网络地址        CHECK_GE(res, 0);        in_addr_t addr = ntohl(localAddr.sin_addr.s_addr);        AString localAddrString = StringPrintf(                "%d.%d.%d.%d",                (addr >> 24),                (addr >> 16) & 0xff,                (addr >> 8) & 0xff,                addr & 0xff);        addr = ntohl(remoteAddr.sin_addr.s_addr);        AString remoteAddrString = StringPrintf(                "%d.%d.%d.%d",                (addr >> 24),                (addr >> 16) & 0xff,                (addr >> 8) & 0xff,                addr & 0xff);        sp<AMessage> msg = mNotify->dup();  //利用AMessage进行通知        msg->setInt32("sessionID", mSessionID);        msg->setInt32("reason", kWhatClientConnected);  //通知Source端相关信息        msg->setString("server-ip", localAddrString.c_str());        msg->setInt32("server-port", ntohs(localAddr.sin_port));        msg->setString("client-ip", remoteAddrString.c_str());        msg->setInt32("client-port", ntohs(remoteAddr.sin_port));        msg->post();    }  }

可以看到,在创建clientSession时,session的状态被置为 CONNECTED,随后会根据clientSocket获得本地网络地址和连接到Rtsp服务端的客户端网络地址。在构造函数的最后会将相关信息通知Source端。此外,clientSession
Source端在创建RTSP服务端后就一直处于AWAITING_CLIENT_CONNECTION的状态,并且等待接收kWhatRTSPNotify类型的消息。当clientSession创建并向Source端发送通知消息后,Source端就在查询reason类型为kWhatClientConnected的消息。
frameworks/av/media/libstagefright/wifi-display/source/WifiDisplaySource.cpp

 case ANetworkSession::kWhatClientConnected:                {                    int32_t sessionID;                    CHECK(msg->findInt32("sessionID", &sessionID));                    if (mClientSessionID > 0) {  // mClientSessionID初始为0                        ALOGW("A client tried to connect, but we already "                              "have one.");                        mNetSession->destroySession(sessionID);                          break;                    }                    CHECK_EQ(mState, AWAITING_CLIENT_CONNECTION);  //检查状态                    CHECK(msg->findString("client-ip", &mClientInfo.mRemoteIP));//接收clientSession传来的信息                    CHECK(msg->findString("server-ip", &mClientInfo.mLocalIP));                    if (mClientInfo.mRemoteIP == mClientInfo.mLocalIP) {                      //出于安全考虑不接收由本地网络地址发来的连接                        mNetSession->destroySession(sessionID);                        break;                    }                    CHECK(msg->findInt32(                                "server-port", &mClientInfo.mLocalPort));                    mClientInfo.mPlaybackSessionID = -1;                    mClientSessionID = sessionID;                    ALOGI("We now have a client (%d) connected.", sessionID);                    mState = AWAITING_CLIENT_SETUP;                    status_t err = sendM1(sessionID);                    CHECK_EQ(err, (status_t)OK);                    break;                }

在Source端,onMessageReceived函数会接收由clientSession发送的通知消息,并且将Source端状态置为AWAITING_CLIENT_SETUP。与此同时,其会将clientSession对应的sessionID作为参数标识向RTSP服务端发送OPTIONS请求。

status_t WifiDisplaySource::sendM1(int32_t sessionID) {    AString request = "OPTIONS * RTSP/1.0\r\n";    AppendCommonResponse(&request, mNextCSeq);  //添加基本消息,如时间、消息序号等等    request.append(            "Require: org.wfa.wfd1.0\r\n"            "\r\n");    status_t err =        mNetSession->sendRequest(sessionID, request.c_str(), request.size());//向ANetworkSession发送数据等待threadLoop处理    if (err != OK) {        return err;    }    registerResponseHandler(            sessionID, mNextCSeq, &WifiDisplaySource::onReceiveM1Response);//使用函数指针返回状态信息    ++mNextCSeq;  //消息序号标识递增    return OK;}


进一步,ANetworkSession类中的sendRequest函数无非是将消息保存在mOutBuffer或者mOutDatagrams供writeMore函数使用。具体而言,当sendRequest函数调用完毕时,threadLoop中的select函数发现写文件描述符集合ws有写变化,即有socket可写。因而会调用session->writeMore()函数,该session对应于RTSP客户端,所处状态仍旧为Session::CONNECTING。因此,在调用writeMore函数时,会执行以下语句,

status_t ANetworkSession::Session::writeMore() {...if (mState == CONNECTING) {        int err;        socklen_t optionLen = sizeof(err);        CHECK_EQ(getsockopt(mSocket, SOL_SOCKET, SO_ERROR, &err, &optionLen), 0);        CHECK_EQ(optionLen, (socklen_t)sizeof(err));        if (err != 0) {            notifyError(kWhatError, -err, "Connection failed");            mSawSendFailure = true;            return -err;        }        mState = CONNECTED;        notify(kWhatConnected);        return OK;    }    CHECK_EQ(mState, CONNECTED);    CHECK(!mOutBuffer.empty());    ssize_t n;    do {        n = send(mSocket, mOutBuffer.c_str(), mOutBuffer.size(), 0); //客户端向服务端发送OPTIONS请求    } while (n < 0 && errno == EINTR);    status_t err = OK;    if (n > 0) {        mOutBuffer.erase(0, n);    } else if (n < 0) {        err = -errno;    } else if (n == 0) {        err = -ECONNRESET;    }    if (err != OK) {        notifyError(true , err, "Send failed.");        mSawSendFailure = true;    }    return err;}

由于RTSP客户端所处状态为Session::CONNECTING,因此执行if中的语句。这里首先获得相关socket的错误信息,如果没有任何错误信息,则更改RTSP客户端相应Session状态为 CONNECTED,并且会通知Sink端 kWhatConnected信息将Sink端的状态置为CONNECTED。之后,threadLoop函数会将clientSession加入到mSessions集合中,并等待接收由客户端发送来的请求信息。也就是在下一次threadLoop执行时,clientSession将会调用readMore函数从mSocket中接收由客户端发送的请求信息,具体过程如下,

status_t ANetworkSession::Session::readMore() {    ...    char tmp[512];    ssize_t n;    do {        n = recv(mSocket, tmp, sizeof(tmp), 0);   //接收客户端请求信息    } while (n < 0 && errno == EINTR);    status_t err = OK;    if (n > 0) {        mInBuffer.append(tmp, n);      } else if (n < 0) {        err = -errno;    } else {        err = -ECONNRESET;    }    if (!mIsRTSPConnection) {               ...    } else {        for (;;) {            size_t length;            if (mInBuffer.size() > 0 && mInBuffer.c_str()[0] == '$') {//接收到类型为PlaybackSession::kWhatBinaryData且头部请求信息为'$'才会进入此判断体                        ...                             }            sp<ParsedMessage> msg =                ParsedMessage::Parse(                        mInBuffer.c_str(), mInBuffer.size(), err != OK, &length);//解析处理RTSP消息            if (msg == NULL) {                break;            }            sp<AMessage> notify = mNotify->dup();            notify->setInt32("sessionID", mSessionID);            notify->setInt32("reason", kWhatData);  //向Sink端发送类型为kWhatData的消息请求            notify->setObject("data", msg);            notify->post();             ...            mInBuffer.erase(0, length);            if (err != OK) {                break;            }        }    }    if (err != OK) {        notifyError(false /* send */, err, "Recv failed.");        mSawReceiveFailure = true;    }    return err;}

该函数会首先接收客户端请求信息,然后利用解析类将信息解析成Sink端能够识别处理的信息,并且向Sink端发送类型为kWhatData的消息请求。现在先来看一下Sink端在接收到消息后是如何进行处理的,

void WifiDisplaySink::onMessageReceived(const sp<AMessage> &msg) {         ...  case ANetworkSession::kWhatData:                {                   onReceiveClientData(msg);  //调用消息处理函数                                  break;                }          ...}void WifiDisplaySink::onReceiveClientData(const sp<AMessage> &msg) {     int32_t sessionID;    CHECK(msg->findInt32("sessionID", &sessionID));  //获得发送消息的sessionID    sp<RefBase> obj;    CHECK(msg->findObject("data", &obj));  //获得发送信息的object对象    sp<ParsedMessage> data =        static_cast<ParsedMessage *>(obj.get());  //根据object对象获取解析后的数据    ALOGV("session %d received '%s'",          sessionID, data->debugString().c_str());    AString method;    AString uri;    data->getRequestField(0, &method);        int32_t cseq;    if (!data->findInt32("cseq", &cseq)) {     //获取send时加入的消息序号        sendErrorResponse(sessionID, "400 Bad Request", -1 );        return ERROR_MALFORMED;    }    if (method.startsWith("RTSP/")) {  //如果消息以"RTSP/"开头        ResponseID id;           id.mSessionID = sessionID;        id.mCSeq = cseq;        ssize_t index = mResponseHandlers.indexOfKey(id);  //根据 ResponseID获取注册时的mResponseHandlers在KeyedVector<ResponseID, HandleRTSPResponseFunc>这个数据结构中的位置        if (index < 0) {            ALOGW("Received unsolicited server response, cseq %d", cseq);            return ERROR_MALFORMED;        }        HandleRTSPResponseFunc func = mResponseHandlers.valueAt(index); //根据位置获取注册的回复函数        mResponseHandlers.removeItemsAt(index);        status_t err = (this->*func)(sessionID, data);  //填充函数指针         //判断回复中是否有错误信息        CHECK_EQ(err, (status_t)OK);    } else {        AString version;        data->getRequestField(2, &version);        if (!(version == AString("RTSP/1.0"))) {  //判断RTSP协议版本是否正确            sendErrorResponse(sessionID, "505 RTSP Version not supported", cseq);            return;        }        if (method == "OPTIONS") {            onOptionsRequest(sessionID, cseq, data);        } else if (method == "GET_PARAMETER") {            onGetParameterRequest(sessionID, cseq, data);        } else if (method == "SET_PARAMETER") {            onSetParameterRequest(sessionID, cseq, data);        } else {            sendErrorResponse(sessionID, "405 Method Not Allowed", cseq);        }    }}

首先,Sink端接收函数类型为kWhatData的消息请求,会调用onReceiveClientData函数进行消息处理。该函数在获得发送信息的object对象、sessionID(此时的sessionID对应于clientSession)等信息后,根据object对象获取解析后的数据。如果解析后的消息以"RTSP/"开头,注册的回复函数中也没有错误信息,那么就填充之前利用registerResponseHandler函数注册的函数指针,否则就匹配处理方法类型调用相关处理函数。由于函数sendM1发送的是 OPTIONS请求,这样会直接去匹配处理方法类型,从而会调用onOptionsRequest函数。

void WifiDisplaySink::onOptionsRequest(        int32_t sessionID,        int32_t cseq,        const sp<ParsedMessage> &data) {    AString response = "RTSP/1.0 200 OK\r\n";    AppendCommonResponse(&response, cseq);    response.append("Public: org.wfa.wfd1.0, GET_PARAMETER, SET_PARAMETER\r\n");    response.append("\r\n");    status_t err = mNetSession->sendRequest(sessionID, response.c_str());    CHECK_EQ(err, (status_t)OK);    err = sendM2(sessionID);    CHECK_EQ(err, (status_t)OK);}

该函数会将Sink端支持的方法GET_PARAMETER, SET_PARAMETER发送至ANetworkSession。以采取上面的那种readMore,writeMore之间socket通信的方式将Sink端支持的方法通知给Source端,Source端依旧采取形如Sink端的消息接收处理方式进行处理,

void WifiDisplaySource::onMessageReceived(const sp<AMessage> &msg) {         ...  case ANetworkSession::kWhatData:                {                    status_t err = onReceiveClientData(msg);  //调用消息处理函数                    if (err != OK) {                        mClient->onDisplayError(                                IRemoteDisplayClient::kDisplayErrorUnknown);                    }                    break;                }          ...}status_t WifiDisplaySource::onReceiveClientData(const sp<AMessage> &msg) {       ...        if (method.startsWith("RTSP/")) {  //如果消息以"RTSP/"开头        ResponseID id;           id.mSessionID = sessionID;        id.mCSeq = cseq;        ssize_t index = mResponseHandlers.indexOfKey(id);          ...        HandleRTSPResponseFunc func = mResponseHandlers.valueAt(index);         mResponseHandlers.removeItemsAt(index);        status_t err = (this->*func)(sessionID, data);  //填充函数指针         if (err != OK) {            ALOGW("Response handler for session %d, cseq %d returned "                  "err %d (%s)",                  sessionID, cseq, err, strerror(-err));            return err;        }          ...        return OK;    }    status_t err;    if (method == "OPTIONS") {        err = onOptionsRequest(sessionID, cseq, data);   匹配处理方法类型    } else if (method == "SETUP") {        err = onSetupRequest(sessionID, cseq, data);    } else if (method == "PLAY") {        err = onPlayRequest(sessionID, cseq, data);    } else if (method == "PAUSE") {        err = onPauseRequest(sessionID, cseq, data);    } else if (method == "TEARDOWN") {        err = onTeardownRequest(sessionID, cseq, data);    } else if (method == "GET_PARAMETER") {        err = onGetParameterRequest(sessionID, cseq, data);    } else if (method == "SET_PARAMETER") {        err = onSetParameterRequest(sessionID, cseq, data);    } else {        sendErrorResponse(sessionID, "405 Method Not Allowed", cseq);        err = ERROR_UNSUPPORTED;    }    return err;}

首先,Source端接收函数类型为kWhatData的消息请求,会调用onReceiveClientData函数进行消息处理。该函数在获得发送信息的object对象、sessionID(此时的sessionID对应于clientSession)等信息后,根据object对象获取解析后的数据。由于此时消息以"RTSP/"开头,则会填充函数sendM1 中利用registerResponseHandler函数注册的函数指针onReceiveM1Response。如果回复信息中没有错误信息,即RTSP服务端返回的状态码为200,则表示RTSP服务端提供Sink端需要的那些RTSP方法。

与此同时,Sink端还会接下去执行,也就是调用sendM2函数,该函数如同sendM1函数,依旧是发送OPTIONS请求,只不过方向正好相反,是由Sink端发送Source端接收。具体流程与上面的流程一致,Source端在接收到OPTIONS请求请求后,会同样调用以下函数。

status_t WifiDisplaySource::onOptionsRequest(        int32_t sessionID,        int32_t cseq,        const sp<ParsedMessage> &data) {    int32_t playbackSessionID;    sp<PlaybackSession> playbackSession =        findPlaybackSession(data, &playbackSessionID);    if (playbackSession != NULL) {        playbackSession->updateLiveness();    }    AString response = "RTSP/1.0 200 OK\r\n";      AppendCommonResponse(&response, cseq);    response.append(            "Public: org.wfa.wfd1.0, SETUP, TEARDOWN, PLAY, PAUSE, "            "GET_PARAMETER, SET_PARAMETER\r\n");       response.append("\r\n");    status_t err = mNetSession->sendRequest(sessionID, response.c_str());    if (err == OK) {        err = sendM3(sessionID);    }    return err;}

该函数会将Source端支持的所有方法通知给Sink端。同样的,Sink端也会填充类似的状态信息函数onReceiveM2Response。 如果回复信息中没有错误信息,即RTSP服务端返回的状态码为200,则表示RTSP服务端提供Source端需要的那些RTSP方法。

接下来,Source端会因为发送onOptionsRequest请求成功而继续调用sendM3函数以GET_PARAMETER方法发送获得Sink端所支持媒体流的内容保护(HDCP)支持性、视频格式信息、音频编码格式信息以及rtp客户端端口等信息的请求。Sink端则会调用onGetParameterRequest处理函数回复Sink端支持的相关信息。目前代码里可以看到Sink端并不支持内容保护特性。接下来,基本流程符合RTSP在进行媒体流通信的流程,这里就不做太多介绍。当流程调用到函数sendM5时,Source端会向Sink端发送SETUP命令,Sink端调用sendSetup启动sink端RTP线程,其中会利用createUDPSession创建UDP连接来传递音视频数据流。之后Source端会调用onSetupRequest函数做相应响应,如开启与编码Source端、打包相关的PlaybackSession线程,此过程算是Source端的核心代码,涉及到的内容较多如SurfaceFlinger等,希望有机会详细展开来介绍。当Setup流程结束后,Sink端通过sendPlay函数发送PLAY请求,Source端调用onPlayRequest函数做相应响应。如果playbackSession能够正常播放,则通过调用finishPlay函数完成开始播放的最后一些任务,如向Source端发送kWhatSessionEstablished消息,调用IRemoteDisplayClient的onDisplayConnected函数向应用层提供Wifi Display连接状态回调。

  mClient->onDisplayConnected(                            mClientInfo.mPlaybackSession->getSurfaceTexture(),                            mClientInfo.mPlaybackSession->width(),                            mClientInfo.mPlaybackSession->height(),                            mUsingHDCP                                ? IRemoteDisplayClient::kDisplayFlagSecure                                : 0);

该函数执行成功后,还会将Source状态由ABOUT_TO_PLAY改变为PLAYING。

当Sink端接收到Source端的数据流后,会调用/av/media/libstagefright/wifi-display/sink/RTPSink.cpp

下的parseRTP函数向TunnelRenderer类发送kWhatQueueBuffer消息,使其通过调用以下函数

/av/media/libstagefright/wifi-display/sink/TunnelRenderer.cpp

  mStreamSource->doSomeWork();

更新Sink端测试播放器PlayerClient在内存中的相关数据。具体流程放在下一回去做分析。


更多相关文章

  1. Android 的消息机制(Handler消息传递机制)
  2. Android下使用dlopen函数动态调用.so链接库
  3. Android 异步消息处理机制的几种实现
  4. Android 异步消息处理机制(Handler 、 Looper 、MessageQueue)源码
  5. Android消息机制之ThreadLocal浅析
  6. Android消息循环机制源码深入理解
  7. Android 调试桥(adb)是多种用途的工具,该工具可以帮助你你管理设

随机推荐

  1. Android(安卓)4.4.2 exfat 移植
  2. Android界面编程——对话框控件(四)
  3. Drawable分类
  4. Activity生命周期详解
  5. 简述Android消息机制及解惑
  6. android SQLite 鎬荤粨
  7. Android基础开发之手势识别
  8. Android实现视频播放的3种实现方式
  9. Android应用项目绑定appcom_v7打包时,出现
  10. Android(安卓)秒级编译方案-Freeline安装