Commit f9e7ec11 authored by shengjh's avatar shengjh 🇨🇳
Browse files

Fix destAddress param in omnetpp.ini. But something wrong in the signals while running.

parent 4dc9319c
......@@ -4,6 +4,7 @@
*_sm.h
*_sm.cc
dctransport
dctransport_dbg
.qtenvrc
......
[General]
simtime-scale=-15
#simtime-scale=-15
sim-time-limit = 100000ms
warmup-period = 0ms
record-eventlog = false
**.statistic-recording = false
**.scalar-recording = false
**.vector-recording = false
debug-on-errors = true
network=dcn
######################## Homa Transport Parameters ###############################
#rttBytes is computed as sending one full packet in one way and a grant in the other way.
......@@ -46,7 +46,9 @@ dcn.hostSwTurnAroundTime = 0.5us
dcn.hostNicSxThinkTime = 0.5us
dcn.switchFixDelay = 0.25us
dcn.isFabricCutThrough = false
**.ipv4.arp.typename = "GlobalArp"
*.host[*].numTrafficGeneratorApp = 1
**.host[*].app[0].destAddresses = moduleListByPath("**.host[*]")
**.realLoadFactor = 0.3
**.host[*].**.workloadType = "FACEBOOK_KEY_VALUE"
**.host[*].app[*].msgSizeRanges = "1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 18 20 22 25 28 31 35 39 44 49 55 61 68 76 85 95 107 119 133 149 167 186 208 233 260 291 325 364 406 454 508 568 635 710 794 887 992 1109 1240 1386 1550 1733 1937 2166 2421 2707 3027 3384 3783 4229 4729 5286 5910 6608 7388 8259 9234 10324 11542 12904 16129 25198 49208 96093 293173 572511 1000000"
......
......@@ -23,9 +23,9 @@ MsgSizeDistributions::MsgSizeDistributions(const char* distFileName,
int callerHostId)
: msgSizeProbDistVector()
, msgSizeDestInterarrivalQueue()
, SEED(0)
, sizeDistSelector(sizeDistSelector)
, interArrivalDist(interArrivalDist)
, SEED(0)
, avgMsgSize(0.0)
, avgInterArrivalTime(0.0)
, maxDataBytesPerPkt(maxDataBytesPerPkt)
......@@ -120,8 +120,7 @@ MsgSizeDistributions::getSizeAndInterarrival(int &msgSize, int &destHostId,
getFacebookSizeInterarrival(msgSize, nextInterarrivalTime);
return;
case DistributionChoice::SIZE_IN_FILE:
getInfileSizeInterarrivalDest(msgSize, destHostId,
nextInterarrivalTime);
getInfileSizeInterarrivalDest(msgSize, destHostId, nextInterarrivalTime);
return;
default:
msgSize = -1;
......
......@@ -53,6 +53,7 @@ WorkloadSynthesizer::WorkloadSynthesizer()
WorkloadSynthesizer::~WorkloadSynthesizer()
{
delete msgSizeGenerator;
cancelAndDelete(selfMsg);
}
void WorkloadSynthesizer::registerTemplatedStats(const char* msgSizeRanges)
......@@ -161,7 +162,6 @@ void WorkloadSynthesizer::initialize(int stage)
" \"HostBase\" for parent module type.", parentHost->getName());
}
parentHostIdx = parentHost->getIndex();
// Initialize the msgSizeGenerator
const char* workLoadType = par("workloadType").stringValue();
MsgSizeDistributions::DistributionChoice distSelector;
......@@ -298,9 +298,9 @@ void WorkloadSynthesizer::handleCrashOperation(LifecycleOperation *operation) {
void WorkloadSynthesizer::parseAndProcessXMLConfig()
{
// determine if this app is also a sender or only a receiver
const char* isSenderParam =
xmlConfig->getElementByPath("isSender")->getNodeValue();
const char* isSenderParam = xmlConfig->getElementByPath("isSender")->getNodeValue();
if (strcmp(isSenderParam, "false") == 0) {
isSender = false;
return;
} else if (strcmp(isSenderParam, "true") == 0) {
......@@ -315,27 +315,27 @@ void WorkloadSynthesizer::parseAndProcessXMLConfig()
// dest host be chosen randomly among all the possible dest hosts. Positive
// dest ids for hosts this app will send to and negative for hosts this app
// doesn't send to.
std::string destIdsStr =
std::string(xmlConfig->getElementByPath("destIds")->getNodeValue());
std::string destIdsStr = std::string(xmlConfig->getElementByPath("destIds")->getNodeValue());
std::stringstream destIdsStream(destIdsStr);
int id;
std::unordered_set<int> destHostIds;
while (destIdsStream >> id) {
if (id == -1) {
//Normally run into here.
destHostIds.clear();
destHostIds.insert(id);
break;
}
destHostIds.insert(id);
}
const char *destAddrs = par("destAddresses");
cStringTokenizer tokenizer(destAddrs);
const char *token;
while((token = tokenizer.nextToken()) != nullptr) {
cModule* mod = getSimulation()->getModuleByPath(token);
inet::L3Address result;
inet::L3AddressResolver().tryResolve(token, result);
L3Address result;
L3AddressResolver().tryResolve(token, result);
if (result.isUnspecified()) {
EV_ERROR << "cannot resolve destination address: "
<< token << endl;
......@@ -346,6 +346,7 @@ void WorkloadSynthesizer::parseAndProcessXMLConfig()
if (destHostIds.count(-1)) {
// Don't include this host (ie loopback) to the set of possible
// destination hosts
if (mod->getIndex() == parentHostIdx) {
continue;
}
......@@ -353,8 +354,10 @@ void WorkloadSynthesizer::parseAndProcessXMLConfig()
continue;
}
}
destAddresses.push_back(result);
}
}
......@@ -369,6 +372,7 @@ WorkloadSynthesizer::handleMessageWhenUp(cMessage *msg)
processStart();
break;
case SEND:
cout<<"Run into send"<<endl;
processSend();
break;
case STOP:
......@@ -468,7 +472,6 @@ WorkloadSynthesizer::processStart()
scheduleAt(stopTime, selfMsg);
return;
}
selfMsg->setKind(SEND);
setupNextSend();
}
......@@ -497,6 +500,7 @@ WorkloadSynthesizer::setupNextSend()
return;
}
ASSERT(selfMsg->getKind() == SelfMsgKinds::SEND);
//cout<<nextSendTime<<endl;
scheduleAt(nextSendTime, selfMsg);
}
......
......@@ -49,7 +49,7 @@ HomaMsgSizeFilter::receiveSignal(cResultFilter *prev, simtime_t_cref t,
outMsg = &( transport->sxController.getOutboundMsgMap()->at(homaChunk->getMsgId()) );
ASSERT((transport->getLocalAddr() == homaChunk->getDestAddr()
&& pktType == PktType::GRANT)
|| (transport->getLocalAddr() == homaPkt->getSrcAddr()
|| (transport->getLocalAddr() == homaChunk->getSrcAddr()
&& pktType == PktType::SCHED_DATA));
msgSize = outMsg->getMsgSize();
}
......
......@@ -377,11 +377,11 @@ HomaTransport::SendController::SendController(HomaTransport* transport)
, sentPktDuration(SIMTIME_ZERO)
, outboundMsgMap()
, rxAddrMsgMap()
, unschedByteAllocator(NULL)
, prioResolver(NULL)
, unschedByteAllocator(nullptr)
, prioResolver(nullptr)
, outbndMsgSet()
, transport(transport)
, homaConfig(NULL)
, homaConfig(nullptr)
, activePeriodStart(SIMTIME_ZERO)
, sentBytesPerActivePeriod(0)
, sumGrantsInGap(0)
......@@ -621,7 +621,7 @@ HomaTransport::SendController::sendOrQueue(cMessage* msg)
// When this function is called highPrioMsg to send a grant packet.
sxPkt = dynamic_cast<Packet*>(msg);
if (sxPkt) {
ASSERT(sxPkt->getPktType() == PktType::GRANT);
ASSERT(sxPkt->peekAtFront<HomaPkt>()->getPktType() == PktType::GRANT);
if (transport->sendTimer->isScheduled()) {
// NIC tx link is busy sending another packet
// EV << "Grant timer is scheduled! Grant to " <<
......@@ -1339,7 +1339,7 @@ HomaTransport::ReceiveScheduler::processReceivedPkt(Packet* rxPkt)
// check and update states for oversubscription time and bytes.
if (schedSenders->numSenders > schedSenders->numToGrant) {
// Already in an oversubcription period
ASSERT(inOversubPeriod && oversubPeriodStop==MAXTIME && oversubPeriodStart < timeNow - pktDuration);
ASSERT(inOversubPeriod && oversubPeriodStop==SIMTIME_MAX && oversubPeriodStart < timeNow - pktDuration);
rcvdBytesPerOversubPeriod += pktLenOnWire;
} else if (oversubPeriodStop != SIMTIME_MAX) {
// an oversubscription period has recently been marked ended and we need
......@@ -1425,7 +1425,7 @@ HomaTransport::ReceiveScheduler::processReceivedPkt(Packet* rxPkt)
} else if (!inOversubPeriod) {
// mark the start of a oversubcription period
ASSERT(rcvdBytesPerOversubPeriod == 0 &&
oversubPeriodStart == SIMTIME_MAX && oversubPeriodStop == MAXTIME);
oversubPeriodStart == SIMTIME_MAX && oversubPeriodStop == SIMTIME_MAX);
inOversubPeriod = true;
oversubPeriodStart = timeNow - pktDuration;
oversubPeriodStop = SIMTIME_MAX;
......@@ -1749,8 +1749,8 @@ HomaTransport::ReceiveScheduler::SenderState::handleInboundPkt(Packet* rxPkt)
totalOutstandingBytesSignal, rxScheduler->getInflightBytes());
}
ASSERT(rxPkt->getMsgId() == inboundMesg->msgIdAtSender &&
rxPkt->getSrcAddr().toIPv4() == senderAddr);
ASSERT(homaChunk->getMsgId() == inboundMesg->msgIdAtSender &&
homaChunk->getSrcAddr().toIpv4() == senderAddr);
// Append the unsched data to the inboundMesg and update variables
// tracking inflight bytes and total bytes
......
......@@ -791,7 +791,7 @@ public:
virtual void initialize();
virtual void handleMessage(cMessage *msg);
virtual void finish();
const inet::L3Address& getLocalAddr() {return localAddr;}
const inet::L3Address& getLocalAddr() const {return localAddr;}
void handleRecvdPkt(Packet* ptk);
void processStart();
void testAndEmitStabilitySignal();
......
......@@ -123,7 +123,7 @@ void
PriorityResolver::setPrioCutOffs()
{
prioCutOffs.clear();
const WorkloadEstimator::CdfVector* vecToUse = NULL;
const WorkloadEstimator::CdfVector* vecToUse = nullptr;
if (prioResMode == PrioResolutionMode::EXPLICIT) {
prioCutOffs = homaConfig->explicitUnschedPrioCutoff;
if (prioCutOffs.back() != UINT32_MAX) {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment