Brief description of class still missing. More...
#include <LinkBuffer.h>
Public Member Functions | |
virtual int | bytesAvailable (char *buffer, int byteCount) |
LinkBuffer (const Address &peer, int fd) | |
void | startMeasure (RouteTcpHeader::BlockType type) |
void | startTest () |
~LinkBuffer () |
Brief description of class still missing.
Full description of class still missing
Block types:
0 Negotiation: decides which side is going to calculate the test results
1 Reset ballast factor to default and start test
2 Keep ballast factor as is and start test
3 Increase ballast factor and start test
4 First test block, sent by peer
5 Second test block, sent to peer
6 Raw results, sent by peer
7 Link info, sent to peer
8 Share pseudo time with master
LinkBuffer::LinkBuffer | ( | const Address & | peer, |
int | fd | ||
) |
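Constructs a link buffer on file descriptor fd, stores peer as the remote address of this link, and attaches the test timer and the measure timer to this buffer.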
: DynamicBuffer(fd),      // the base class receives the descriptor
  _testTimer(this),       // both timers are constructed with a pointer to this buffer
  _measureTimer(this)
{
  // Keep the remote address; it is used to tag log lines and is passed to
  // PeerTracker calls elsewhere in this class.
  _peer=peer;
}
References GpCoreTools::Address::toString().
{
  // Trace the end of this link, tagged with the peer address.
  // The first argument (1) is presumably the log verbosity level; confirm
  // against Log::write.
  Log::write(1,
             "[%s]: close link\n",
             _peer.toString().data());
}
int LinkBuffer::bytesAvailable | ( | char * | buffer, |
int | byteCount | ||
) | [virtual] |
Implements GpCoreTools::DynamicBuffer.
References PeerTracker::addFromMasterRoute(), Flowmeter::addPacket(), Flowmeter::ballastFactor(), RouteTcpHeader::ForwardAddRouteFromMaster, RouteTcpHeader::ForwardRemoveRouteFromMaster, RouteTcpHeader::InfoToLeader, RouteTcpHeader::InfoToSecond, PeerTracker::instance(), WaranCore::TcpHeader::isCompatible(), WaranCore::TcpHeader::isValid(), PeerTracker::masterTime(), RouteTcpHeader::MeasureIncreaseBallast, RouteTcpHeader::MeasureKeepBallast, Flowmeter::measurement(), RouteTcpHeader::MeasureResetBallast, RouteTcpHeader::MeasureResultToLeader, PeerTracker::removeFromMasterRoute(), GpCoreTools::Average::reset(), ROUTE_TCP_VERSION, RouteTcpHeader::SendLoadToLeader, RouteTcpHeader::SendLoadToSecond, GpCoreTools::DynamicBuffer::sendNoGateway(), GpCoreTools::DynamicBuffer::sendPartialNoGateway(), Flowmeter::setBallastFactor(), PeerTracker::setLink(), PeerTracker::setMasterAddress(), Flowmeter::setReceiveByteCount(), GpCoreTools::Timer::start(), Flowmeter::startGlobal(), startMeasure(), Flowmeter::stopGlobal(), LinkInfo::swap(), RouteTcpHeader::TestLeaderNegociation, RouteTcpHeader::TestNow, RouteTcpHeader::time(), LinkInfo::toLog(), GpCoreTools::Address::toString(), and WaranCore::TcpHeader::type().
{ if(byteCount<(int)sizeof(RouteTcpHeader)) { return 0; } // Check block header const RouteTcpHeader * hb=reinterpret_cast<const RouteTcpHeader *>(buffer); std::string peerTime=hb->time(); if(!hb->isValid()) { Log::write(2, "[%s%s]: bad header\n", _peer.toString().data(), peerTime.data()); return 1; // shift by one until matching a good header } if(!hb->isCompatible(ROUTE_TCP_VERSION)) { Log::write(2, "[%s:%05hu]: bad block version\n", _peer.toString().data(), peerTime.data()); return 0; } int readBytes=sizeof(RouteTcpHeader); buffer+=sizeof(RouteTcpHeader); byteCount-=sizeof(RouteTcpHeader); switch(static_cast<RouteTcpHeader::BlockType>(hb->type())) { case RouteTcpHeader::TestLeaderNegociation: // task negociations if(rand()%2==1) { // Ok I take it Log::write(2, "[%s%s]: ok thanks!\n", _peer.toString().data(), peerTime.data()); _rateToPeer.reset(); _rateFromPeer.reset(); _latency.reset(); _flowmeter.setBallastFactor(16); startMeasure(RouteTcpHeader::MeasureResetBallast); } else { Log::write(2, "[%s%s]: after you...\n", _peer.toString().data(), peerTime.data()); RouteTcpHeader hb(RouteTcpHeader::TestLeaderNegociation); sendNoGateway((const char *)&hb, sizeof(RouteTcpHeader)); } return readBytes; case RouteTcpHeader::MeasureResetBallast: // second _flowmeter.setBallastFactor(16); _flowmeter.startGlobal(); sendTestBlock(RouteTcpHeader::SendLoadToLeader); return readBytes; case RouteTcpHeader::MeasureKeepBallast: // leader _flowmeter.startGlobal(); sendTestBlock(RouteTcpHeader::SendLoadToLeader); return readBytes; case RouteTcpHeader::MeasureIncreaseBallast: // second _flowmeter.setBallastFactor(_flowmeter.ballastFactor()*2); Log::write(2, "[%s%s]: increase ballast factor to %i\n", _peer.toString().data(), peerTime.data(), _flowmeter.ballastFactor()); _flowmeter.startGlobal(); sendTestBlock(RouteTcpHeader::SendLoadToLeader); return readBytes; case RouteTcpHeader::SendLoadToLeader: { // leader byteCount-=sizeof(int); if(byteCount<0) { return 0; } const int& 
blockSize=*reinterpret_cast<const int *>(buffer); _flowmeter.setReceiveByteCount(blockSize); _flowmeter.addPacket(byteCount); if(byteCount<blockSize) { return 0; } else { sendTestBlock(RouteTcpHeader::SendLoadToSecond); return readBytes+sizeof(int)+byteCount; } } break; case RouteTcpHeader::SendLoadToSecond: { // second byteCount-=sizeof(int); if(byteCount<0) { return 0; } const int& blockSize=*reinterpret_cast<const int *>(buffer); _flowmeter.setReceiveByteCount(blockSize); _flowmeter.addPacket(byteCount); if(byteCount<blockSize) { return 0; } else { _flowmeter.stopGlobal(); RouteTcpHeader hb(RouteTcpHeader::MeasureResultToLeader); sendPartialNoGateway((const char *)&hb, sizeof(RouteTcpHeader)); sendNoGateway((const char *)&_flowmeter.measurement(), sizeof(FlowMeasurement)); return readBytes+sizeof(int)+byteCount; } } break; case RouteTcpHeader::MeasureResultToLeader: { // leader if(byteCount<(int)sizeof(FlowMeasurement)) { return 0; } _flowmeter.stopGlobal(); switch(addMeasurement(*reinterpret_cast<const FlowMeasurement *>(buffer))) { case Accepted: _measureTimer.start(); break; case Commit: { _linkInfo.toLog(_peer); MasterTime t=PeerTracker::instance()->masterTime(_peer); RouteTcpHeader hb(RouteTcpHeader::InfoToSecond); sendPartialNoGateway((const char *)&hb, sizeof(RouteTcpHeader)); sendPartialNoGateway((const char *)&_linkInfo, sizeof(LinkInfo)); sendPartialNoGateway((const char *)&t, sizeof(MasterTime)); sendNoGateway((const char *)&PeerTracker::instance()->masterAddress(), sizeof(Address)); } break; case IncreaseBallast1: _flowmeter.setBallastFactor(_flowmeter.ballastFactor()*2); Log::write(2, "[%s%s]: increase ballast factor to %i\n", _peer.toString().data(), peerTime.data(), _flowmeter.ballastFactor()); startMeasure(RouteTcpHeader::MeasureKeepBallast); break; case IncreaseBallast2: startMeasure(RouteTcpHeader::MeasureIncreaseBallast); break; case IncreaseBothBallast: _flowmeter.setBallastFactor(_flowmeter.ballastFactor()*2); Log::write(2, "[%s%s]: increase 
ballast factor to %i\n", _peer.toString().data(), peerTime.data(), _flowmeter.ballastFactor()); startMeasure(RouteTcpHeader::MeasureIncreaseBallast); break; } return readBytes+sizeof(FlowMeasurement); } break; case RouteTcpHeader::InfoToSecond: {// Link test finished, received updated information from leader if(byteCount<(int)(sizeof(LinkInfo)+sizeof(MasterTime)+sizeof(Address))) { return 0; } _linkInfo=*reinterpret_cast<const LinkInfo *>(buffer); _linkInfo.swap(); _linkInfo.toLog(_peer); const MasterTime& peerMasterTime=*reinterpret_cast<const MasterTime *>(buffer+sizeof(LinkInfo)); const Address& masterAddress=*reinterpret_cast<const Address *>(buffer+sizeof(LinkInfo)+sizeof(MasterTime)); PeerTracker::instance()->setMasterAddress(masterAddress); PeerTracker::instance()->setLink(_peer, _linkInfo, peerMasterTime); MasterTime t=PeerTracker::instance()->masterTime(_peer); RouteTcpHeader hb(RouteTcpHeader::InfoToLeader); sendPartialNoGateway((const char *)&hb, sizeof(RouteTcpHeader)); sendPartialNoGateway((const char *)&t, sizeof(MasterTime)); sendNoGateway((const char *)&PeerTracker::instance()->masterAddress(), sizeof(Address)); return readBytes+sizeof(LinkInfo)+sizeof(MasterTime)+sizeof(Address); } break; case RouteTcpHeader::InfoToLeader: { // master if(byteCount<(int)(sizeof(MasterTime)+sizeof(Address))) { return 0; } const MasterTime& peerMasterTime=*reinterpret_cast<const MasterTime *>(buffer); const Address& masterAddress=*reinterpret_cast<const Address *>(buffer+sizeof(MasterTime)); PeerTracker::instance()->setMasterAddress(masterAddress); PeerTracker::instance()->setLink(_peer, _linkInfo, peerMasterTime); _testTimer.start(); return readBytes+sizeof(MasterTime)+sizeof(Address); } break; case RouteTcpHeader::TestNow: return readBytes; case RouteTcpHeader::ForwardAddRouteFromMaster: { // forward add 'FromMaster' route to Master if(byteCount<(int)sizeof(Address)) { return 0; } const Address& destination=*reinterpret_cast<const Address *>(buffer); Log::write(0, 
"[%s%s]: received add route 'FromMaster' to [%s]\n", _peer.toString().data(), peerTime.data(), destination.toString().data()); PeerTracker::instance()->addFromMasterRoute(destination, _peer); return readBytes+sizeof(Address); } break; case RouteTcpHeader::ForwardRemoveRouteFromMaster: { // forward remove up 'FromMaster' route to Master if(byteCount<(int)sizeof(Address)) { return 0; } const Address& destination=*reinterpret_cast<const Address *>(buffer); Log::write(0, "[%s%s]: received remove up route 'FromMaster' to [%s]\n", _peer.toString().data(), peerTime.data(), destination.toString().data()); PeerTracker::instance()->removeFromMasterRoute(destination); return readBytes+sizeof(Address); } break; default: break; } Log::write(2, "[%s%s]: bad block type: %hhu\n", _peer.toString().data(), peerTime.data(), hb->type()); return 0; }
void LinkBuffer::startMeasure | ( | RouteTcpHeader::BlockType | type | ) |
References GpCoreTools::DynamicBuffer::sendNoGateway(), and Flowmeter::startGlobal().
Referenced by bytesAvailable(), and LinkMeasureTimer::exec().
{
  // The flowmeter is started first, then a header-only block of the
  // requested type is sent to the peer.
  _flowmeter.startGlobal();

  RouteTcpHeader request(type);
  const char * rawRequest=reinterpret_cast<const char *>(&request);
  sendNoGateway(rawRequest, sizeof(request));
}
void LinkBuffer::startTest | ( | ) |
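Called by the test timer (LinkTestTimer::exec()) and by LinkStream::connect(). Logs the start of the test and sends a TestLeaderNegociation block to the peer.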
References GpCoreTools::DynamicBuffer::sendNoGateway(), RouteTcpHeader::TestLeaderNegociation, and GpCoreTools::Address::toString().
Referenced by LinkStream::connect(), and LinkTestTimer::exec().
{
  // Trace the start of a new test round for this peer
  Log::write(2,
             "[%s]: start test\n",
             _peer.toString().data());

  // A test starts with a header-only negotiation block sent to the peer
  RouteTcpHeader negotiation(RouteTcpHeader::TestLeaderNegociation);
  const char * rawNegotiation=reinterpret_cast<const char *>(&negotiation);
  sendNoGateway(rawNegotiation, sizeof(negotiation));
}