Commit 274c0a83 authored by shengjh's avatar shengjh 🇨🇳
Browse files

Rewrite the making packet process from inet2.99 to inet4.

parent f0bc5a6b
......@@ -13,9 +13,10 @@
// along with this program. If not, see http://www.gnu.org/licenses/.
//
import inet.networklayer.common.L3Address;
import inet.common.packet.chunk.Chunk;
packet AppMessage {
class AppMessage extends inet::FieldsChunk
{
inet::L3Address destAddr;
inet::L3Address srcAddr;
simtime_t msgCreationTime;
......
......@@ -22,6 +22,7 @@
#include "inet/networklayer/common/NetworkInterface.h"
#include "inet/networklayer/common/InterfaceTable.h"
#include "inet/networklayer/ipv4/Ipv4InterfaceData.h"
#include "inet/networklayer/common/FragmentationTag_m.h"
namespace inet {
Define_Module(WorkloadSynthesizer);
......@@ -155,8 +156,7 @@ void WorkloadSynthesizer::initialize(int stage)
cModule* parentHost = this->getParentModule();
if (strcmp(parentHost->getName(), "host") != 0) {
throw cRuntimeError("'%s': Not a valid parent module type. Expected"
" \"HostBase\" for parent module type.",
parentHost->getName());
" \"HostBase\" for parent module type.", parentHost->getName());
}
parentHostIdx = parentHost->getIndex();
......@@ -245,13 +245,14 @@ void WorkloadSynthesizer::initialize(int stage)
MsgSizeDistributions::InterArrivalDist::INTERARRIVAL_IN_FILE;
} else if (strcmp(par("interArrivalDist").stringValue(),
"facebook_key_value") == 0){
interArrivalDist =
MsgSizeDistributions::InterArrivalDist::FACEBOOK_PARETO;
interArrivalDist = MsgSizeDistributions::InterArrivalDist::FACEBOOK_PARETO;
} else {
throw cRuntimeError("'%s': Not a valid Interarrival Distribution",
par("interArrivalDist").stringValue());
}
double avgRate = par("loadFactor").doubleValue() * nicLinkSpeed;
msgSizeGenerator = new MsgSizeDistributions(distFileName.c_str(),
maxDataBytesPerPkt, interArrivalDist, distSelector, avgRate,
......@@ -353,13 +354,9 @@ void WorkloadSynthesizer::parseAndProcessXMLConfig()
}
}
void WorkloadSynthesizer::handleMessageWhenUp(cMessage *msg) {
//do nothing;
}
void
WorkloadSynthesizer::handleSelfMessage(cMessage *msg)
WorkloadSynthesizer::handleMessageWhenUp(cMessage *msg)
{
if (msg->isSelfMessage()) {
ASSERT(msg == selfMsg);
......@@ -378,7 +375,7 @@ WorkloadSynthesizer::handleSelfMessage(cMessage *msg)
(int)selfMsg->getKind());
}
} else {
processRcvdMsg(check_and_cast<AppMessage*>(msg));
processRcvdMsg(check_and_cast<Packet*>(msg));
}
}
......@@ -412,14 +409,18 @@ WorkloadSynthesizer::sendMsg()
}
char msgName[100];
sprintf(msgName, "WorkloadSynthesizerMsg-%d", numSent);
AppMessage *appMessage = new AppMessage(msgName);
appMessage->setByteLength(sendMsgSize);
appMessage->setDestAddr(destAddrs);
appMessage->setSrcAddr(srcAddress);
appMessage->setMsgCreationTime(appMessage->getCreationTime());
appMessage->setTransportSchedDelay(appMessage->getCreationTime());
emit(sentMsgSignal, appMessage);
send(appMessage, "transportOut");
Packet *packet = new Packet();
packet->addTag<FragmentationReq>()->setDontFragment(true);
const auto &payload = makeShared<AppMessage>();
// appMessage->setByteLength(sendMsgSize);
payload->setChunkLength(B(sendMsgSize));
payload->setDestAddr(destAddrs);
payload->setSrcAddr(srcAddress);
payload->setMsgCreationTime(simTime());
payload->setTransportSchedDelay(simTime());
packet->insertAtBack(payload);
emit(sentMsgSignal, packet);
send(packet, "transportOut");
numSent++;
}
......@@ -479,8 +480,7 @@ void
WorkloadSynthesizer::setupNextSend()
{
double nextSendInterval;
msgSizeGenerator->getSizeAndInterarrival(sendMsgSize, nextDestHostId,
nextSendInterval);
msgSizeGenerator->getSizeAndInterarrival(sendMsgSize, nextDestHostId, nextSendInterval);
simtime_t nextSendTime = nextSendInterval + simTime();
if (sendMsgSize < 0 || nextSendTime > stopTime) {
selfMsg->setKind(STOP);
......@@ -492,13 +492,16 @@ WorkloadSynthesizer::setupNextSend()
}
void
WorkloadSynthesizer::processRcvdMsg(cPacket* msg)
WorkloadSynthesizer::processRcvdMsg(Packet* pk)
{
AppMessage* rcvdMsg = check_and_cast<AppMessage*>(msg);
emit(rcvdMsgSignal, rcvdMsg);
//AppMessage* rcvdMsg = check_and_cast<AppMessage*>(msg);
ASSERT(pk);
emit(rcvdMsgSignal, pk);
auto rcvdMsg = pk->peekAtFront<AppMessage>();
simtime_t completionTime = simTime() - rcvdMsg->getMsgCreationTime();
emit(msgE2EDelaySignal, completionTime);
uint64_t msgByteLen = (uint64_t)(rcvdMsg->getByteLength());
uint64_t msgByteLen = (uint64_t)(B(rcvdMsg->getChunkLength()).get());
EV_INFO << "Received a message of length " << msgByteLen
<< "Bytes" << endl;
......@@ -548,7 +551,7 @@ WorkloadSynthesizer::processRcvdMsg(cPacket* msg)
mesgStats.transportSchedDelay = rcvdMsg->getTransportSchedDelay();
emit(mesgStatsSignal, &mesgStats);
delete rcvdMsg;
delete pk;
numReceived++;
}
......@@ -556,7 +559,7 @@ WorkloadSynthesizer::processRcvdMsg(cPacket* msg)
* correct ip assignments based on the config.xml file.
*/
double
WorkloadSynthesizer::idealMsgEndToEndDelay(AppMessage* rcvdMsg)
WorkloadSynthesizer::idealMsgEndToEndDelay(Ptr<const AppMessage> rcvdMsg)
{
int totalBytesTranmitted = 0;
inet::L3Address srcAddr = rcvdMsg->getSrcAddr();
......@@ -573,9 +576,8 @@ WorkloadSynthesizer::idealMsgEndToEndDelay(AppMessage* rcvdMsg)
// rcvdMsg. These bytes include all headers and ethernet overhead bytes per
// frame.
int lastPartialFrameLen = 0;
int numFullEthFrame = rcvdMsg->getByteLength() / maxDataBytesPerEthFrame;
uint32_t lastPartialFrameData =
rcvdMsg->getByteLength() % maxDataBytesPerEthFrame;
int numFullEthFrame = B(rcvdMsg->getChunkLength()).get() / maxDataBytesPerEthFrame;
uint32_t lastPartialFrameData = B(rcvdMsg->getChunkLength()).get() % maxDataBytesPerEthFrame;
totalBytesTranmitted = numFullEthFrame *
(MAX_ETHERNET_PAYLOAD_BYTES + ETHERNET_HDR_SIZE +
......
......@@ -23,8 +23,10 @@
#include "dctrans/common/Minimal.h"
#include "dctrans/common/DCTRANSDefs.h"
#include "inet/common/INETDefs.h"
#include "inet/networklayer/common/L3Address.h"
#include "inet/applications/base/ApplicationBase.h"
#include "inet/common/packet/Packet.h"
#include "MsgSizeDistributions.h"
#include "AppMessage_m.h"
......@@ -111,7 +113,7 @@ class WorkloadSynthesizer : public ApplicationBase
virtual void initialize(int stage) override;
virtual int numInitStages() const override { return NUM_INIT_STAGES; }
virtual void handleMessageWhenUp(cMessage *msg) override;
virtual void handleSelfMessage(cMessage *msg);
//virtual void handleSelfMessage(cMessage *msg);
// Lifecycle methods
virtual void handleStartOperation(LifecycleOperation *operation) override;
......@@ -125,12 +127,12 @@ class WorkloadSynthesizer : public ApplicationBase
void processStart();
void processSend();
void processStop();
void processRcvdMsg(cPacket* msg);
void processRcvdMsg(Packet* msg);
void sendMsg();
void setupNextSend();
void parseAndProcessXMLConfig();
void registerTemplatedStats(const char* msgSizeRanges);
double idealMsgEndToEndDelay(AppMessage* rcvdMsg);
double idealMsgEndToEndDelay(Ptr<const AppMessage> rcvdMsg);
};
class MesgStats : public cObject, noncopyable
......
......@@ -15,7 +15,7 @@ public:
HomaPktHelper();
static uint32_t getHomaPktHeaderSize(PktType pktType);
static uint32_t getBytesOnWire(uint32_t numDataBytes, PktType homaPktType);
virtual ~HomaPktHelper();
~HomaPktHelper();
};
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment