Public Member Functions
LinkBuffer Class Reference

Brief description of class still missing. More...

#include <LinkBuffer.h>

Inheritance diagram for LinkBuffer:
GpCoreTools::DynamicBuffer

List of all members.

Public Member Functions

virtual int bytesAvailable (char *buffer, int byteCount)
 LinkBuffer (const Address &peer, int fd)
void startMeasure (RouteTcpHeader::BlockType type)
void startTest ()
 ~LinkBuffer ()

Detailed Description

Brief description of class still missing.

Full description of class still missing

Block types:

0 Negotiation, which side is going to calculate test results
1 Reset ballast factor to default and start test
2 Keep ballast factor as is and start test
3 Increase ballast factor and start test
4 First test block sent by peer
5 Second test block sent to peer
6 Raw results sent by peer
7 Link info sent to peer
8 Share pseudo time to master


Constructor & Destructor Documentation

LinkBuffer::LinkBuffer ( const Address & peer,
int  fd 
)

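Passes fd to the DynamicBuffer base class, attaches the test and measure timers to this object and stores peer as the address of the remote side.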

    // Pass fd to the DynamicBuffer base and attach both timers to this object.
    : DynamicBuffer(fd),
    _testTimer(this),
    _measureTimer(this)
{
  // Keep the peer address; the other members use it to tag their log messages.
  _peer=peer;
}

References GpCoreTools::Address::toString().

{
  // Log that the link is being closed, tagged with the peer address
  // (the leading 1 is presumably the verbosity level of Log::write — confirm).
  Log::write(1, "[%s]: close link\n", _peer.toString().data());
}

Member Function Documentation

int LinkBuffer::bytesAvailable ( char *  buffer,
int  byteCount 
) [virtual]

Implements GpCoreTools::DynamicBuffer.

References PeerTracker::addFromMasterRoute(), Flowmeter::addPacket(), Flowmeter::ballastFactor(), RouteTcpHeader::ForwardAddRouteFromMaster, RouteTcpHeader::ForwardRemoveRouteFromMaster, RouteTcpHeader::InfoToLeader, RouteTcpHeader::InfoToSecond, PeerTracker::instance(), WaranCore::TcpHeader::isCompatible(), WaranCore::TcpHeader::isValid(), PeerTracker::masterTime(), RouteTcpHeader::MeasureIncreaseBallast, RouteTcpHeader::MeasureKeepBallast, Flowmeter::measurement(), RouteTcpHeader::MeasureResetBallast, RouteTcpHeader::MeasureResultToLeader, PeerTracker::removeFromMasterRoute(), GpCoreTools::Average::reset(), ROUTE_TCP_VERSION, RouteTcpHeader::SendLoadToLeader, RouteTcpHeader::SendLoadToSecond, GpCoreTools::DynamicBuffer::sendNoGateway(), GpCoreTools::DynamicBuffer::sendPartialNoGateway(), Flowmeter::setBallastFactor(), PeerTracker::setLink(), PeerTracker::setMasterAddress(), Flowmeter::setReceiveByteCount(), GpCoreTools::Timer::start(), Flowmeter::startGlobal(), startMeasure(), Flowmeter::stopGlobal(), LinkInfo::swap(), RouteTcpHeader::TestLeaderNegociation, RouteTcpHeader::TestNow, RouteTcpHeader::time(), LinkInfo::toLog(), GpCoreTools::Address::toString(), and WaranCore::TcpHeader::type().

{
  /*
    Parses at most one protocol block found at the beginning of buffer and reacts to it.

    buffer    - bytes received from the peer that are not yet processed
    byteCount - number of valid bytes in buffer

    Returns 0 when nothing is consumed (block still incomplete, incompatible version or
    unknown block type), 1 to skip a single byte while searching for a valid header, or
    the length of the processed block (header plus payload).
    NOTE(review): the return value is presumably the number of bytes the caller drops
    from its buffer — confirm against GpCoreTools::DynamicBuffer.
  */
  if(byteCount<(int)sizeof(RouteTcpHeader)) {
    return 0; // header not yet complete
  }
  // Check block header
  const RouteTcpHeader * hb=reinterpret_cast<const RouteTcpHeader *>(buffer);
  std::string peerTime=hb->time();
  if(!hb->isValid()) {
    Log::write(2, "[%s%s]: bad header\n", _peer.toString().data(), peerTime.data());
    return 1; // shift by one until matching a good header
  }
  if(!hb->isCompatible(ROUTE_TCP_VERSION)) {
    // Both arguments are C strings: the format must use two %s like every other message
    // of this function (it used to be "%05hu", which is undefined behaviour for a char *).
    Log::write(2, "[%s%s]: bad block version\n", _peer.toString().data(), peerTime.data());
    return 0;
  }
  // From here on, buffer and byteCount describe the payload that follows the header
  int readBytes=sizeof(RouteTcpHeader);
  buffer+=sizeof(RouteTcpHeader);
  byteCount-=sizeof(RouteTcpHeader);

  switch(static_cast<RouteTcpHeader::BlockType>(hb->type())) {
  case RouteTcpHeader::TestLeaderNegociation: // task negociations
    // Coin toss: either accept the leader role or send the negotiation back to the peer
    if(rand()%2==1) { // Ok I take it
      Log::write(2, "[%s%s]: ok thanks!\n", _peer.toString().data(), peerTime.data());
      _rateToPeer.reset();
      _rateFromPeer.reset();
      _latency.reset();
      _flowmeter.setBallastFactor(16);
      startMeasure(RouteTcpHeader::MeasureResetBallast);
    } else {
      Log::write(2, "[%s%s]: after you...\n", _peer.toString().data(), peerTime.data());
      RouteTcpHeader hb(RouteTcpHeader::TestLeaderNegociation);
      sendNoGateway((const char *)&hb, sizeof(RouteTcpHeader));
    }
    return readBytes;
  case RouteTcpHeader::MeasureResetBallast: // second
    _flowmeter.setBallastFactor(16);
    _flowmeter.startGlobal();
    sendTestBlock(RouteTcpHeader::SendLoadToLeader);
    return readBytes;
  // NOTE(review): originally labelled "leader", but like its two sibling cases this one
  // answers with SendLoadToLeader, so it presumably runs on the second side — confirm.
  case RouteTcpHeader::MeasureKeepBallast: // leader
    _flowmeter.startGlobal();
    sendTestBlock(RouteTcpHeader::SendLoadToLeader);
    return readBytes;
  case RouteTcpHeader::MeasureIncreaseBallast: // second
    _flowmeter.setBallastFactor(_flowmeter.ballastFactor()*2);
    Log::write(2, "[%s%s]: increase ballast factor to %i\n",
               _peer.toString().data(), peerTime.data(), _flowmeter.ballastFactor());
    _flowmeter.startGlobal();
    sendTestBlock(RouteTcpHeader::SendLoadToLeader);
    return readBytes;
  case RouteTcpHeader::SendLoadToLeader: { // leader
      // Payload starts with an int holding the announced block size
      byteCount-=sizeof(int);
      if(byteCount<0) {
        return 0;
      }
      const int& blockSize=*reinterpret_cast<const int *>(buffer);
      _flowmeter.setReceiveByteCount(blockSize);
      _flowmeter.addPacket(byteCount);
      if(byteCount<blockSize) {
        return 0; // load block not yet complete
      } else {
        // Load fully received: send our own load block back to the second side
        sendTestBlock(RouteTcpHeader::SendLoadToSecond);
        return readBytes+sizeof(int)+byteCount;
      }
    }
    break;
  case RouteTcpHeader::SendLoadToSecond: { // second
      // Payload starts with an int holding the announced block size
      byteCount-=sizeof(int);
      if(byteCount<0) {
        return 0;
      }
      const int& blockSize=*reinterpret_cast<const int *>(buffer);
      _flowmeter.setReceiveByteCount(blockSize);
      _flowmeter.addPacket(byteCount);
      if(byteCount<blockSize) {
        return 0; // load block not yet complete
      } else {
        // Load fully received: stop the flowmeter and report its measurement to the leader
        _flowmeter.stopGlobal();
        RouteTcpHeader hb(RouteTcpHeader::MeasureResultToLeader);
        sendPartialNoGateway((const char *)&hb, sizeof(RouteTcpHeader));
        sendNoGateway((const char *)&_flowmeter.measurement(), sizeof(FlowMeasurement));
        return readBytes+sizeof(int)+byteCount;
      }
    }
    break;
  case RouteTcpHeader::MeasureResultToLeader: { // leader
      if(byteCount<(int)sizeof(FlowMeasurement)) {
        return 0;
      }
      _flowmeter.stopGlobal();
      // The outcome of addMeasurement() decides how the test goes on
      switch(addMeasurement(*reinterpret_cast<const FlowMeasurement *>(buffer))) {
      case Accepted:
        _measureTimer.start();
        break;
      case Commit: {
          // Send link information, master time and master address to the second side
          _linkInfo.toLog(_peer);
          MasterTime t=PeerTracker::instance()->masterTime(_peer);
          RouteTcpHeader hb(RouteTcpHeader::InfoToSecond);
          sendPartialNoGateway((const char *)&hb, sizeof(RouteTcpHeader));
          sendPartialNoGateway((const char *)&_linkInfo, sizeof(LinkInfo));
          sendPartialNoGateway((const char *)&t, sizeof(MasterTime));
          sendNoGateway((const char *)&PeerTracker::instance()->masterAddress(), sizeof(Address));
        }
        break;
      case IncreaseBallast1:
        // Double the local ballast factor only, the peer keeps its own
        _flowmeter.setBallastFactor(_flowmeter.ballastFactor()*2);
        Log::write(2, "[%s%s]: increase ballast factor to %i\n",
                   _peer.toString().data(), peerTime.data(), _flowmeter.ballastFactor());
        startMeasure(RouteTcpHeader::MeasureKeepBallast);
        break;
      case IncreaseBallast2:
        // Ask the peer to double its ballast factor, the local one is unchanged
        startMeasure(RouteTcpHeader::MeasureIncreaseBallast);
        break;
      case IncreaseBothBallast:
        // Double the local ballast factor and ask the peer to do the same
        _flowmeter.setBallastFactor(_flowmeter.ballastFactor()*2);
        Log::write(2, "[%s%s]: increase ballast factor to %i\n",
                   _peer.toString().data(), peerTime.data(), _flowmeter.ballastFactor());
        startMeasure(RouteTcpHeader::MeasureIncreaseBallast);
        break;
      }
      return readBytes+sizeof(FlowMeasurement);
    }
    break;
  case RouteTcpHeader::InfoToSecond: {// Link test finished, received updated information from leader
      // Payload layout: LinkInfo, then MasterTime, then Address
      if(byteCount<(int)(sizeof(LinkInfo)+sizeof(MasterTime)+sizeof(Address))) {
        return 0;
      }
      _linkInfo=*reinterpret_cast<const LinkInfo *>(buffer);
      _linkInfo.swap();
      _linkInfo.toLog(_peer);
      const MasterTime& peerMasterTime=*reinterpret_cast<const MasterTime *>(buffer+sizeof(LinkInfo));
      const Address& masterAddress=*reinterpret_cast<const Address *>(buffer+sizeof(LinkInfo)+sizeof(MasterTime));
      PeerTracker::instance()->setMasterAddress(masterAddress);
      PeerTracker::instance()->setLink(_peer, _linkInfo, peerMasterTime);
      // Answer with our own master time and master address
      MasterTime t=PeerTracker::instance()->masterTime(_peer);
      RouteTcpHeader hb(RouteTcpHeader::InfoToLeader);
      sendPartialNoGateway((const char *)&hb, sizeof(RouteTcpHeader));
      sendPartialNoGateway((const char *)&t, sizeof(MasterTime));
      sendNoGateway((const char *)&PeerTracker::instance()->masterAddress(), sizeof(Address));
      return readBytes+sizeof(LinkInfo)+sizeof(MasterTime)+sizeof(Address);
    }
    break;
  case RouteTcpHeader::InfoToLeader: { // master
      // Payload layout: MasterTime, then Address
      if(byteCount<(int)(sizeof(MasterTime)+sizeof(Address))) {
        return 0;
      }
      const MasterTime& peerMasterTime=*reinterpret_cast<const MasterTime *>(buffer);
      const Address& masterAddress=*reinterpret_cast<const Address *>(buffer+sizeof(MasterTime));
      PeerTracker::instance()->setMasterAddress(masterAddress);
      PeerTracker::instance()->setLink(_peer, _linkInfo, peerMasterTime);
      _testTimer.start();
      return readBytes+sizeof(MasterTime)+sizeof(Address);
    }
    break;
  case RouteTcpHeader::TestNow:
    // Header-only block, nothing to do except consuming it
    return readBytes;
  case RouteTcpHeader::ForwardAddRouteFromMaster: { // forward add 'FromMaster' route to Master
      if(byteCount<(int)sizeof(Address)) {
        return 0;
      }
      const Address& destination=*reinterpret_cast<const Address *>(buffer);
      Log::write(0, "[%s%s]: received add route 'FromMaster' to [%s]\n",
                 _peer.toString().data(), peerTime.data(), destination.toString().data());
      PeerTracker::instance()->addFromMasterRoute(destination, _peer);
      return readBytes+sizeof(Address);
    }
    break;
  case RouteTcpHeader::ForwardRemoveRouteFromMaster: { // forward remove up 'FromMaster' route to Master
      if(byteCount<(int)sizeof(Address)) {
        return 0;
      }
      const Address& destination=*reinterpret_cast<const Address *>(buffer);
      Log::write(0, "[%s%s]: received remove up route 'FromMaster' to [%s]\n",
                 _peer.toString().data(), peerTime.data(), destination.toString().data());
      PeerTracker::instance()->removeFromMasterRoute(destination);
      return readBytes+sizeof(Address);
    }
    break;
  default:
    break;
  }
  // Only block types without a case above reach this point
  Log::write(2, "[%s%s]: bad block type: %hhu\n", _peer.toString().data(), peerTime.data(), hb->type());
  return 0;
}

Called only through test timer

References GpCoreTools::DynamicBuffer::sendNoGateway(), RouteTcpHeader::TestLeaderNegociation, and GpCoreTools::Address::toString().

Referenced by LinkStream::connect(), and LinkTestTimer::exec().

{
  // Trace the start of the test, tagged with the peer address.
  Log::write(2, "[%s]: start test\n", _peer.toString().data());

  // Open the leader negotiation by sending a header-only block to the peer.
  const RouteTcpHeader negotiation(RouteTcpHeader::TestLeaderNegociation);
  sendNoGateway(reinterpret_cast<const char *>(&negotiation), sizeof(negotiation));
}

The documentation for this class was generated from the following files:
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Defines