Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015 #include "TCPSessionApp.h"
00016 #include "IPAddressResolver.h"
00017
00018
// Module registration for TCPSessionApp via the Define_Module macro
// (macro is defined outside this file; presumably OMNeT++ -- TODO confirm).
00019 Define_Module(TCPSessionApp);
00020
00021
00022 void TCPSessionApp::parseScript(const char *script)
00023 {
00024 const char *s = script;
00025 while (*s)
00026 {
00027 Command cmd;
00028
00029
00030 while (isspace(*s)) s++;
00031 if (!*s || *s==';') break;
00032 const char *s0 = s;
00033 cmd.tSend = strtod(s,&const_cast<char *&>(s));
00034 if (s==s0)
00035 throw cRuntimeError("syntax error in script: simulation time expected");
00036
00037
00038 while (isspace(*s)) s++;
00039 if (!isdigit(*s))
00040 throw cRuntimeError("syntax error in script: number of bytes expected");
00041 cmd.numBytes = atoi(s);
00042 while (isdigit(*s)) s++;
00043
00044
00045 commands.push_back(cmd);
00046
00047
00048 while (isspace(*s)) s++;
00049 if (!*s) break;
00050 if (*s!=';')
00051 throw cRuntimeError("syntax error in script: separator ';' missing");
00052 s++;
00053 while (isspace(*s)) s++;
00054 }
00055 }
00056
00057 void TCPSessionApp::count(cMessage *msg)
00058 {
00059 if(msg->isPacket())
00060 {
00061 if (msg->getKind()==TCP_I_DATA || msg->getKind()==TCP_I_URGENT_DATA)
00062 {
00063 packetsRcvd++;
00064 bytesRcvd+=PK(msg)->getByteLength();
00065 }
00066 else
00067 {
00068 EV << "TCPSessionApp received unknown message (kind:" << msg->getKind() << ", name:" << msg->getName() << ")\n";
00069 }
00070 }
00071 else
00072 {
00073 indicationsRcvd++;
00074 }
00075 }
00076
00077 void TCPSessionApp::waitUntil(simtime_t t)
00078 {
00079 if (simTime()>=t)
00080 return;
00081
00082 cMessage *timeoutMsg = new cMessage("timeout");
00083 scheduleAt(t, timeoutMsg);
00084 cMessage *msg=NULL;
00085 while ((msg=receive())!=timeoutMsg)
00086 {
00087 count(msg);
00088 socket.processMessage(msg);
00089 }
00090 delete timeoutMsg;
00091 }
00092
00093 void TCPSessionApp::activity()
00094 {
00095 packetsRcvd = indicationsRcvd = 0;
00096 bytesRcvd = bytesSent = 0;
00097 WATCH(packetsRcvd);
00098 WATCH(bytesRcvd);
00099 WATCH(indicationsRcvd);
00100
00101
00102 const char *address = par("address");
00103 int port = par("port");
00104 const char *connectAddress = par("connectAddress");
00105 int connectPort = par("connectPort");
00106
00107 bool active = par("active");
00108 simtime_t tOpen = par("tOpen");
00109 simtime_t tSend = par("tSend");
00110 long sendBytes = par("sendBytes");
00111 simtime_t tClose = par("tClose");
00112
00113 const char *script = par("sendScript");
00114 parseScript(script);
00115 if (sendBytes>0 && commands.size()>0)
00116 throw cRuntimeError("cannot use both sendScript and tSend+sendBytes");
00117
00118 socket.setOutputGate(gate("tcpOut"));
00119
00120
00121 waitUntil(tOpen);
00122
00123 socket.bind(*address ? IPvXAddress(address) : IPvXAddress(), port);
00124
00125 EV << "issuing OPEN command\n";
00126 if (ev.isGUI()) getDisplayString().setTagArg("t",0, active?"connecting":"listening");
00127
00128 if (active)
00129 socket.connect(IPAddressResolver().resolve(connectAddress), connectPort);
00130 else
00131 socket.listenOnce();
00132
00133
00134 while (socket.getState()!=TCPSocket::CONNECTED)
00135 {
00136 socket.processMessage(receive());
00137 if (socket.getState()==TCPSocket::SOCKERROR)
00138 return;
00139 }
00140
00141 EV << "connection established, starting sending\n";
00142 if (ev.isGUI()) getDisplayString().setTagArg("t",0,"connected");
00143
00144
00145 if (sendBytes>0)
00146 {
00147 waitUntil(tSend);
00148 EV << "sending " << sendBytes << " bytes\n";
00149 cPacket *msg = new cPacket("data1");
00150 msg->setByteLength(sendBytes);
00151 bytesSent += sendBytes;
00152 socket.send(msg);
00153 }
00154 for (CommandVector::iterator i=commands.begin(); i!=commands.end(); ++i)
00155 {
00156 waitUntil(i->tSend);
00157 EV << "sending " << i->numBytes << " bytes\n";
00158 cPacket *msg = new cPacket("data1");
00159 msg->setByteLength(i->numBytes);
00160 bytesSent += i->numBytes;
00161 socket.send(msg);
00162 }
00163
00164
00165 if (tClose>=0)
00166 {
00167 waitUntil(tClose);
00168 EV << "issuing CLOSE command\n";
00169 if (ev.isGUI()) getDisplayString().setTagArg("t",0,"closing");
00170 socket.close();
00171 }
00172
00173
00174 for (;;)
00175 {
00176 cMessage *msg = receive();
00177 count(msg);
00178 socket.processMessage(msg);
00179 }
00180 }
00181
00182 void TCPSessionApp::finish()
00183 {
00184 EV << getFullPath() << ": received " << bytesRcvd << " bytes in " << packetsRcvd << " packets\n";
00185 recordScalar("bytesRcvd", bytesRcvd);
00186 recordScalar("bytesSent", bytesSent);
00187 }