Brief description of class still missing. More...
#include <PeerTracker.h>
Public Member Functions | |
void | addFromMasterRoute (const Address &destination, const Address &gateway) |
void | checkDeadPeers (double validityDelay) |
virtual void | event (short type) |
virtual short | eventTypes () |
bool | listen (uint16_t udpPort) |
const Address & | masterAddress () const |
MasterTime | masterTime (const Address &peer) const |
PeerTracker (const char *interface, uint16_t tcpPort) | |
void | removeFromMasterRoute (const Address &destination) |
void | resetFromMasterGateway () |
void | resetToMasterGateway () |
void | setAsMaster () |
void | setLink (const Address &peer, const LinkInfo &link, const MasterTime &peerMasterTime) |
void | setMasterAddress (const Address &masterAddress) |
void | start (const Address &peer, LinkStream *link) |
void | stop (const Address &peer) |
~PeerTracker () | |
Static Public Member Functions | |
static PeerTracker * | instance () |
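Discovers neighbouring peers from UDP beacons and keeps track of the gateways and routes to and from the master.
The tracker owns a datagram socket on which it receives "WAUB" beacons. Each beacon from a known peer refreshes that peer; a beacon from an unknown peer with an address lower than the local one triggers a TCP connection to it (peers with a higher address initiate the link themselves). For every active peer a PeerInfo is kept, from which the best gateways "to master" and "from master" are selected. A single instance exists per process, available through instance().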
PeerTracker::PeerTracker | ( | const char * | interface, |
uint16_t | tcpPort | ||
) |
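Registers this object as the unique instance, creates the UDP socket used to receive peer beacons, enables SO_REUSEADDR on it and allocates the peer table for the local subnet. tcpPort is the TCP port used later to connect to discovered peers. The application exits with code 2 if the socket cannot be created or configured.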
References GpCoreTools::Stream::fileDescriptor(), and GpCoreTools::Stream::setFileDescriptor().
: _routes(interface)
{
  // Register the unique instance returned by instance(); creating a second
  // tracker is a programming error.
  assert(!_self);
  _self=this;
  // Used for rate testing
  _tcpPort=tcpPort;

  // Datagram socket on which beacons are received (bound by listen()).
  setFileDescriptor(socket(AF_INET, SOCK_DGRAM, 0));
  const int fd=fileDescriptor();
  if(fd<=0) {
    Log::write(0, "peer tracker socket: [%i] %s\n", errno, strerror(errno));
    CoreApplication::exit(2);
  }

  int reuseAddress=1;
  if(setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuseAddress, sizeof(reuseAddress)) < 0) {
    Log::write(0, "peer tracker reuseaddr: [%i] %s\n", errno, strerror(errno));
    CoreApplication::exit(2);
  }

  Log::write(0, "seeking for peers in %s/%i\n",
             Address::me().subnet().toString().data(), Address::maskSize());

  // Peer table: one (initially null) slot per address of the subnet,
  // accessed elsewhere through Address::index().
  const int slotCount=1 << (32-Address::maskSize());
  _peers=new PeerInfo *[slotCount];
  memset(_peers, 0, slotCount*sizeof(PeerInfo *));
}
{
  Log::write(8, "PeerTracker::~PeerTracker::begin\n");
  // Release every PeerInfo still registered as active, then the lookup table
  // itself (the table only holds pointers to those same objects).
  for(std::list<PeerInfo *>::iterator peerIt=_activePeers.begin();
      peerIt!=_activePeers.end(); peerIt++) {
    delete *peerIt;
  }
  delete [] _peers;
  Log::write(8, "PeerTracker::~PeerTracker::end\n");
}
void PeerTracker::addFromMasterRoute | ( | const Address & | destination, |
const Address & | gateway | ||
) |
References KernelRoute::add(), RouteTcpHeader::ForwardAddRouteFromMaster, GpCoreTools::Address::index(), GpCoreTools::Address::isValid(), PeerInfo::stream(), GpCoreTools::Address::toString(), GpCoreTools::TcpClientStream::writeNoGateway(), and GpCoreTools::TcpClientStream::writePartialNoGateway().
Referenced by LinkBuffer::bytesAvailable().
{
  Log::write(2, "add 'From Master' route with destination %s (gw %s)\n",
             destination.toString().data(), gateway.toString().data());

  // A route towards the local address itself is never installed.
  if(Address::me()!=destination) {
    _routes.add(destination, gateway);
  }

  // Unless this node is the master, forward the new route to the peer that
  // currently acts as 'from master' gateway.
  if(Address::me()!=_masterAddress) {
    PeerInfo * gatewayPeer=0;
    if(_fromMasterGateway.isValid()) {
      gatewayPeer=_peers[_fromMasterGateway.index()];
    }
    if(gatewayPeer) {
      const LinkStream * link=gatewayPeer->stream();
      RouteTcpHeader header(RouteTcpHeader::ForwardAddRouteFromMaster);
      link->writePartialNoGateway((const char *)&header, sizeof(RouteTcpHeader));
      link->writeNoGateway((const char *)&destination, sizeof(Address));
    } else {
      // Either no valid gateway address or no peer registered for it.
      Log::write(0, "broken route 'from master' to [%s] (add)\n", destination.toString().data());
    }
  }
}
void PeerTracker::checkDeadPeers | ( | double | validityDelay | ) |
References PeerInfo::address(), PeerInfo::lastBeaconDelay(), and stop().
Referenced by DeadPeersTimer::exec().
{
  // Stops every active peer whose last beacon is older than validityDelay.
  //
  // Addresses are collected first and the peers stopped afterwards: stop()
  // erases from _activePeers and deletes the PeerInfo, so it cannot be called
  // while iterating over the list.
  // A std::list replaces the former variable-length array, which is not
  // standard C++ and put an unbounded number of Address objects on the stack.
  std::list<Address> expired;
  for(std::list<PeerInfo *>::iterator it=_activePeers.begin(); it!=_activePeers.end(); ++it) {
    PeerInfo * p=*it;
    if(p->lastBeaconDelay()>validityDelay) {
      expired.push_back(p->address());
    }
  }
  for(std::list<Address>::const_iterator a=expired.begin(); a!=expired.end(); ++a) {
    Log::write(1, "[%s]: timeout\n", a->toString().data());
    stop(*a);
  }
}
void PeerTracker::event | ( | short | type | ) | [virtual] |
Implements GpCoreTools::Stream.
References PeerInfo::beaconReceived(), GpCoreTools::Stream::fileDescriptor(), GpCoreTools::Address::index(), and GpCoreTools::Address::toString().
{
  // Handles a readable beacon socket: reads one datagram and, if it is a
  // "WAUB" beacon from another host, refreshes or creates the matching peer.
  if(!(type & POLLIN)) {
    return;
  }

  sockaddr_in peerAddr;
  socklen_t peerAddrLen=sizeof(sockaddr_in);
  char buffer[4];
  ssize_t received=recvfrom(fileDescriptor(), buffer, sizeof(buffer), 0,
                            (struct sockaddr *) &peerAddr, &peerAddrLen);
  if(received<0) {
    Log::write(1, "peer tracker recvfrom error: %s\n", strerror(errno));
    return;
  }
  // A datagram shorter than the magic leaves part of 'buffer' uninitialized;
  // comparing it would read indeterminate bytes, so it is ignored.
  if(received!=(ssize_t)sizeof(buffer)) {
    return;
  }
  if(strncmp(buffer, "WAUB", 4)!=0) {
    return;
  }

  Address peer(peerAddr.sin_addr.s_addr);
  if(peer!=Address::me()) {
    Leds::flash(2, 100, true);
    int index=peer.index();
    PeerInfo * p=_peers[index];
    if(p) {
      p->beaconReceived();
    } else if(peer<Address::me()) {
      // All indexes above mine will initiate the link themselves, they will discover my own beacon.
      // This is simpler to avoid double connections (from both side).
      Log::write(1, "discovered %s\n",peer.toString().data());
      p=new PeerInfo(peer);
      _peers[index]=p;
      _activePeers.push_back(p);
      p->connect(peer, _tcpPort);
    }
  }
}
virtual short PeerTracker::eventTypes | ( | ) | [inline, virtual] |
Implements GpCoreTools::Stream.
{
  // The tracker only asks to be notified of incoming data (see event()).
  return POLLIN;
}
static PeerTracker* PeerTracker::instance | ( | ) | [inline, static] |
Referenced by LinkBuffer::bytesAvailable(), LinkTimer::exec(), DeadPeersTimer::exec(), LinkStream::hungUp(), and LinkStream::LinkStream().
{
  // _self is set by the constructor, which asserts that it was not set before.
  return _self;
}
bool PeerTracker::listen | ( | uint16_t | udpPort | ) |
References GpCoreTools::Stream::fileDescriptor().
Referenced by main().
{
  // Binds the beacon socket to udpPort on all local interfaces.
  // Returns false (after logging errno) if bind() fails, true otherwise.
  sockaddr_in addr;
  // Clear the whole structure first so that padding (sin_zero) is not passed
  // to bind() uninitialized.
  memset(&addr, 0, sizeof(addr));
  addr.sin_family=AF_INET;
  addr.sin_port=htons(udpPort);
  addr.sin_addr.s_addr=htonl(INADDR_ANY);
  if(bind(fileDescriptor(), (sockaddr *)&addr, sizeof(sockaddr_in))<0) {
    Log::write(0, "peer tracker bind: [%i] %s\n", errno, strerror(errno));
    return false;
  }
  return true;
}
const Address& PeerTracker::masterAddress | ( | ) | const [inline] |
Referenced by setMasterAddress().
{
  // Last address stored by setAsMaster() or setMasterAddress().
  return _masterAddress;
}
MasterTime PeerTracker::masterTime | ( | const Address & | peer | ) | const |
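Returns the current master time as it must be reported to peer. If peer is the current gateway to the master, the time to master is replaced by an invalid value (1e99); if peer is the current gateway from the master, the time from master is replaced by 1e99. This avoids cyclic time assignments between a node and its own gateways.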
References MasterTime::setTimeFromMaster(), and MasterTime::setTimeToMaster().
Referenced by LinkBuffer::bytesAvailable().
{
  // Value reported in place of a real time for a direction in which 'peer'
  // is already our own gateway.
  const double invalidTime=1e99;

  MasterTime reported=_masterTime;
  if(peer==_toMasterGateway) {
    reported.setTimeToMaster(invalidTime);
  }
  if(peer==_fromMasterGateway) {
    reported.setTimeFromMaster(invalidTime);
  }
  return reported;
}
void PeerTracker::removeFromMasterRoute | ( | const Address & | destination | ) |
References RouteTcpHeader::ForwardRemoveRouteFromMaster, GpCoreTools::Address::index(), GpCoreTools::Address::isValid(), KernelRoute::removeDestination(), PeerInfo::stream(), GpCoreTools::Address::toString(), GpCoreTools::TcpClientStream::writeNoGateway(), and GpCoreTools::TcpClientStream::writePartialNoGateway().
Referenced by LinkBuffer::bytesAvailable(), and stop().
{
  Log::write(2, "remove 'From Master' route with destination %s\n", destination.toString().data());
  _routes.removeDestination(destination);

  // Unless this node is the master, forward the removal to the peer that
  // currently acts as 'from master' gateway.
  if(Address::me()!=_masterAddress) {
    PeerInfo * gatewayPeer=0;
    if(_fromMasterGateway.isValid()) {
      gatewayPeer=_peers[_fromMasterGateway.index()];
    }
    if(gatewayPeer) {
      const LinkStream * link=gatewayPeer->stream();
      RouteTcpHeader header(RouteTcpHeader::ForwardRemoveRouteFromMaster);
      link->writePartialNoGateway((const char *)&header, sizeof(RouteTcpHeader));
      link->writeNoGateway((const char *)&destination, sizeof(Address));
    } else {
      // Either no valid gateway address or no peer registered for it.
      Log::write(0, "broken route 'from master' to [%s] (remove up)\n", destination.toString().data());
    }
  }
}
void PeerTracker::resetFromMasterGateway | ( | ) |
References PeerInfo::address(), PeerInfo::masterTime(), and MasterTime::timeFromMaster().
Referenced by setLink(), setMasterAddress(), and stop().
{
  Log::write(1, "reset gateway from master\n");

  // Select the active peer announcing the smallest time from master.
  // With no peer below 1e99 the gateway stays a default-constructed Address.
  double bestTime=1e99;
  Address bestGateway;
  for(std::list<PeerInfo *>::iterator peerIt=_activePeers.begin();
      peerIt!=_activePeers.end(); peerIt++) {
    PeerInfo * candidate=*peerIt;
    const double candidateTime=candidate->masterTime().timeFromMaster();
    if(candidateTime<bestTime) {
      bestGateway=candidate->address();
      bestTime=candidateTime;
    }
  }
  setFromMasterGateway(bestTime, bestGateway);
}
void PeerTracker::resetToMasterGateway | ( | ) |
References PeerInfo::address(), PeerInfo::masterTime(), and MasterTime::timeToMaster().
Referenced by setLink(), setMasterAddress(), and stop().
{
  Log::write(1, "reset gateway to master\n");

  // Select the active peer announcing the smallest time to master.
  // With no peer below 1e99 the gateway stays a default-constructed Address.
  double bestTime=1e99;
  Address bestGateway;
  for(std::list<PeerInfo *>::iterator peerIt=_activePeers.begin();
      peerIt!=_activePeers.end(); peerIt++) {
    PeerInfo * candidate=*peerIt;
    const double candidateTime=candidate->masterTime().timeToMaster();
    if(candidateTime<bestTime) {
      bestGateway=candidate->address();
      bestTime=candidateTime;
    }
  }
  setToMasterGateway(bestTime, bestGateway);
}
void PeerTracker::setAsMaster | ( | ) |
References GpCoreTools::Address::isValid(), MasterTime::setTimeFromMaster(), and MasterTime::setTimeToMaster().
Referenced by main().
{
  // Must be called while no master address is known yet.
  assert(!_masterAddress.isValid());

  // The master is at zero distance from itself in both directions.
  _masterTime.setTimeToMaster(0.0);
  _masterTime.setTimeFromMaster(0.0);

  _masterAddress=Address::me();
  Log::write(0, "I'm the Master\n");
}
void PeerTracker::setLink | ( | const Address & | peer, |
const LinkInfo & | link, | ||
const MasterTime & | peerMasterTime | ||
) |
References PeerInfo::calculate(), GpCoreTools::Address::index(), PeerInfo::masterTime(), resetFromMasterGateway(), resetToMasterGateway(), PeerInfo::setLink(), PeerInfo::setPeerMasterTime(), MasterTime::setTimeFromMaster(), MasterTime::setTimeToMaster(), MasterTime::timeFromMaster(), and MasterTime::timeToMaster().
Referenced by LinkBuffer::bytesAvailable().
{
  PeerInfo * p=_peers[peer.index()];
  assert(p);

  // Store the new measurements and let the peer recompute its master time.
  p->setLink(link);
  p->setPeerMasterTime(peerMasterTime);
  p->calculate();

  // ---- direction 'to master' ----
  const double peerTimeToMaster=p->masterTime().timeToMaster();
  if(_toMasterGateway==peer) {
    Log::write(5, "'to master': current=%.3lg new=%.3lg\n", _masterTime.timeToMaster(), peerTimeToMaster);
    if(peerTimeToMaster>_masterTime.timeToMaster()) {
      // Our best link gave bad results, maybe another link is suitable for gateway
      resetToMasterGateway();
    } else {
      // Just a better flow but no change on routing
      _masterTime.setTimeToMaster(peerTimeToMaster);
    }
  } else if(peerTimeToMaster<_masterTime.timeToMaster()) {
    // It was not the best link but it will be...
    setToMasterGateway(peerTimeToMaster, peer);
  }

  // ---- direction 'from master' ----
  // Read only now, after the 'to master' update above has completed.
  const double peerTimeFromMaster=p->masterTime().timeFromMaster();
  if(_fromMasterGateway==peer) {
    Log::write(5, "'from master': current=%.3lg new=%.3lg\n", _masterTime.timeFromMaster(), peerTimeFromMaster);
    if(peerTimeFromMaster>_masterTime.timeFromMaster()) {
      // Our best link gave bad results, maybe another link is suitable for gateway
      resetFromMasterGateway();
    } else {
      // Just a better flow but no change on routing
      _masterTime.setTimeFromMaster(peerTimeFromMaster);
    }
  } else if(peerTimeFromMaster<_masterTime.timeFromMaster()) {
    // It was not the best link but it will be...
    setFromMasterGateway(peerTimeFromMaster, peer);
  }
}
void PeerTracker::setMasterAddress | ( | const Address & | masterAddress | ) |
References GpCoreTools::Address::isValid(), masterAddress(), resetFromMasterGateway(), resetToMasterGateway(), and GpCoreTools::Address::toString().
Referenced by LinkBuffer::bytesAvailable().
{
  // Invalid addresses are ignored.
  if(!masterAddress.isValid()) {
    return;
  }
  // Nothing happens unless the master really changes.
  if(masterAddress!=_masterAddress) {
    _masterAddress=masterAddress;
    // Both gateways are selected again among the active peers.
    resetToMasterGateway();
    resetFromMasterGateway();
    Log::write(0, "master address changed to %s\n", _masterAddress.toString().data());
    updatePeers();
  }
}
void PeerTracker::start | ( | const Address & | peer, |
LinkStream * | link | ||
) |
References GpCoreTools::Address::index(), and PeerInfo::setStream().
Referenced by LinkStream::LinkStream().
void PeerTracker::stop | ( | const Address & | peer | ) |
References PeerInfo::address(), KernelRoute::destination(), GpCoreTools::Address::index(), removeFromMasterRoute(), resetFromMasterGateway(), resetToMasterGateway(), and GpCoreTools::Address::toString().
Referenced by checkDeadPeers(), LinkTimer::exec(), and LinkStream::hungUp().
{
  // Removes a peer: drops the routes that go through it, unregisters and
  // deletes its PeerInfo, then selects new master gateways if the peer was
  // one of them. Does nothing if the peer is not registered.

  // Work on a private copy of the address: it is still needed after the
  // PeerInfo has been deleted.
  // NOTE(review): guards against a caller passing a reference that lives
  // inside the PeerInfo destroyed below - confirm whether any caller does.
  const Address target=peer;

  int index=target.index();
  Log::write(7, "stop peer with index: %i\n", index);
  PeerInfo * p=_peers[index];
  if(!p) {
    return;
  }

  // Locate the peer in the active list; a registered peer must be in it.
  std::list<PeerInfo *>::iterator it;
  for(it=_activePeers.begin(); it!=_activePeers.end(); ++it) {
    if((*it)->address()==target) {
      break;
    }
  }
  assert(it!=_activePeers.end());

  // Remove all routes related to the closing peer
  Address destination;
  while((destination=_routes.destination(target)).isValid()) {
    removeFromMasterRoute(destination);
  }

  _activePeers.erase(it);
  // size() returns an unsigned size type; convert explicitly to match "%i".
  Log::write(7, "number of active peers: %i\n", static_cast<int>(_activePeers.size()));
  for(it=_activePeers.begin(); it!=_activePeers.end(); ++it) {
    Log::write(7, "  [%s]\n",(*it)->address().toString().data());
  }

  _peers[index]=0;
  delete p;
  Log::write(7, "peer %s removed\n", target.toString().data());

  // Peer was my connection to master... check for another route
  if(target==_toMasterGateway) {
    resetToMasterGateway();
  }
  if(target==_fromMasterGateway) {
    resetFromMasterGateway();
  }
  Log::write(7, "route to/from master updated\n");
}