TCPSessionApp.cc

Go to the documentation of this file.
00001 //
00002 // Copyright 2004 Andras Varga
00003 //
00004 // This library is free software, you can redistribute it and/or modify
00005 // it under  the terms of the GNU Lesser General Public License
00006 // as published by the Free Software Foundation;
00007 // either version 2 of the License, or any later version.
00008 // The library is distributed in the hope that it will be useful,
00009 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00010 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
00011 // See the GNU Lesser General Public License for more details.
00012 //
00013 
00014 
00015 #include "TCPSessionApp.h"
00016 #include "IPAddressResolver.h"
00017 
00018 
// Module registration for TCPSessionApp via the Define_Module macro
// (presumably makes the class instantiable by name from NED -- confirm against the OMNeT++ manual).
00019 Define_Module(TCPSessionApp);
00020 
00021 
00022 void TCPSessionApp::parseScript(const char *script)
00023 {
00024     const char *s = script;
00025     while (*s)
00026     {
00027         Command cmd;
00028 
00029         // parse time
00030         while (isspace(*s)) s++;
00031         if (!*s || *s==';') break;
00032         const char *s0 = s;
00033         cmd.tSend = strtod(s,&const_cast<char *&>(s));
00034         if (s==s0)
00035             throw cRuntimeError("syntax error in script: simulation time expected");
00036 
00037         // parse number of bytes
00038         while (isspace(*s)) s++;
00039         if (!isdigit(*s))
00040             throw cRuntimeError("syntax error in script: number of bytes expected");
00041         cmd.numBytes = atoi(s);
00042         while (isdigit(*s)) s++;
00043 
00044         // add command
00045         commands.push_back(cmd);
00046 
00047         // skip delimiter
00048         while (isspace(*s)) s++;
00049         if (!*s) break;
00050         if (*s!=';')
00051             throw cRuntimeError("syntax error in script: separator ';' missing");
00052         s++;
00053         while (isspace(*s)) s++;
00054     }
00055 }
00056 
00057 void TCPSessionApp::count(cMessage *msg)
00058 {
00059     if(msg->isPacket())
00060     {
00061         if (msg->getKind()==TCP_I_DATA || msg->getKind()==TCP_I_URGENT_DATA)
00062         {
00063             packetsRcvd++;
00064             bytesRcvd+=PK(msg)->getByteLength();
00065         }
00066         else
00067         {
00068             EV << "TCPSessionApp received unknown message (kind:" << msg->getKind() << ", name:" << msg->getName() << ")\n";
00069         }
00070     }
00071     else
00072     {
00073         indicationsRcvd++;
00074     }
00075 }
00076 
00077 void TCPSessionApp::waitUntil(simtime_t t)
00078 {
00079     if (simTime()>=t)
00080         return;
00081 
00082     cMessage *timeoutMsg = new cMessage("timeout");
00083     scheduleAt(t, timeoutMsg);
00084     cMessage *msg=NULL;
00085     while ((msg=receive())!=timeoutMsg)
00086     {
00087         count(msg);
00088         socket.processMessage(msg);
00089     }
00090     delete timeoutMsg;
00091 }
00092 
00093 void TCPSessionApp::activity()
00094 {
00095     packetsRcvd = indicationsRcvd = 0;
00096     bytesRcvd = bytesSent = 0;
00097     WATCH(packetsRcvd);
00098     WATCH(bytesRcvd);
00099     WATCH(indicationsRcvd);
00100 
00101     // parameters
00102     const char *address = par("address");
00103     int port = par("port");
00104     const char *connectAddress = par("connectAddress");
00105     int connectPort = par("connectPort");
00106 
00107     bool active = par("active");
00108     simtime_t tOpen = par("tOpen");
00109     simtime_t tSend = par("tSend");
00110     long sendBytes = par("sendBytes");
00111     simtime_t tClose = par("tClose");
00112 
00113     const char *script = par("sendScript");
00114     parseScript(script);
00115     if (sendBytes>0 && commands.size()>0)
00116         throw cRuntimeError("cannot use both sendScript and tSend+sendBytes");
00117 
00118     socket.setOutputGate(gate("tcpOut"));
00119 
00120     // open
00121     waitUntil(tOpen);
00122 
00123     socket.bind(*address ? IPvXAddress(address) : IPvXAddress(), port);
00124 
00125     EV << "issuing OPEN command\n";
00126     if (ev.isGUI()) getDisplayString().setTagArg("t",0, active?"connecting":"listening");
00127 
00128     if (active)
00129         socket.connect(IPAddressResolver().resolve(connectAddress), connectPort);
00130     else
00131         socket.listenOnce();
00132 
00133     // wait until connection gets established
00134     while (socket.getState()!=TCPSocket::CONNECTED)
00135     {
00136         socket.processMessage(receive());
00137         if (socket.getState()==TCPSocket::SOCKERROR)
00138             return;
00139     }
00140 
00141     EV << "connection established, starting sending\n";
00142     if (ev.isGUI()) getDisplayString().setTagArg("t",0,"connected");
00143 
00144     // send
00145     if (sendBytes>0)
00146     {
00147         waitUntil(tSend);
00148         EV << "sending " << sendBytes << " bytes\n";
00149         cPacket *msg = new cPacket("data1");
00150         msg->setByteLength(sendBytes);
00151         bytesSent += sendBytes;
00152         socket.send(msg);
00153     }
00154     for (CommandVector::iterator i=commands.begin(); i!=commands.end(); ++i)
00155     {
00156         waitUntil(i->tSend);
00157         EV << "sending " << i->numBytes << " bytes\n";
00158         cPacket *msg = new cPacket("data1");
00159         msg->setByteLength(i->numBytes);
00160         bytesSent += i->numBytes;
00161         socket.send(msg);
00162     }
00163 
00164     // close
00165     if (tClose>=0)
00166     {
00167         waitUntil(tClose);
00168         EV << "issuing CLOSE command\n";
00169         if (ev.isGUI()) getDisplayString().setTagArg("t",0,"closing");
00170         socket.close();
00171     }
00172 
00173     // wait until peer closes too and all data arrive
00174     for (;;)
00175     {
00176         cMessage *msg = receive();
00177         count(msg);
00178         socket.processMessage(msg);
00179     }
00180 }
00181 
00182 void TCPSessionApp::finish()
00183 {
00184     EV << getFullPath() << ": received " << bytesRcvd << " bytes in " << packetsRcvd << " packets\n";
00185     recordScalar("bytesRcvd", bytesRcvd);
00186     recordScalar("bytesSent", bytesSent);
00187 }