Skip to content

Commit

Permalink
Cant 32 (#22)
Browse files Browse the repository at this point in the history
* force serialize ip reconnections for anagate: one ip after another, for many modules CANT-32

* unlock mutex when skipping to avoid block

* quickfix on master for OPCUA-1430

* small cleanups
  • Loading branch information
meeludwig authored Jun 5, 2019
1 parent f619ae0 commit 0caee76
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 7 deletions.
1 change: 1 addition & 0 deletions CanInterface/include/CCanAccess.h
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,7 @@ class CCanAccess
}

LOG(Log::TRC, _lh) << __FUNCTION__ << " m_sBusName= " << m_sBusName;

vector<string> stringVector;
istringstream nameSS(name);
string temporalString;
Expand Down
81 changes: 74 additions & 7 deletions CanInterfaceImplementations/anagate/AnaCanScan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,13 @@
using namespace CanModule;
using namespace std;

boost::mutex anagateReconnectMutex;

/* static */ bool AnaCanScan::s_isCanHandleInUseArray[256];
/* static */ AnaInt32 AnaCanScan::s_canHandleArray[256];
/* static */ bool AnaCanScan::s_logItRegisteredAnagate = false;
/* static */ Log::LogComponentHandle AnaCanScan::s_logItHandleAnagate = 0;

/* static */ std::map<string,bool> AnaCanScan::reconnectInProgress_map;

#define MLOGANA(LEVEL,THIS) LOG(Log::LEVEL, AnaCanScan::s_logItHandleAnagate) << __FUNCTION__ << " " << " bus= " << THIS->getBusName() << " "

Expand Down Expand Up @@ -91,6 +92,30 @@ AnaCanScan::~AnaCanScan()
LOG(Log::TRC, AnaCanScan::s_logItHandleAnagate ) << "Anagate Can Scan component closed successfully";
}

/* static */ void AnaCanScan::setIpReconnectInProgress( string ip, bool flag ){
std::map<string,bool>::iterator it = AnaCanScan::reconnectInProgress_map.find( ip );

if ( flag ){
// mark as in progress if not yet exists/marked
if ( it == AnaCanScan::reconnectInProgress_map.end() ) {
AnaCanScan::reconnectInProgress_map.insert ( std::pair<string,bool>( ip, true ) );
}
} else {
// erase existing if still exists/marked
if ( it != AnaCanScan::reconnectInProgress_map.end() ) {
AnaCanScan::reconnectInProgress_map.erase( it );
}
}
}

/* static */ bool AnaCanScan::isIpReconnectInProgress( string ip ){
std::map<string,bool>::iterator it = AnaCanScan::reconnectInProgress_map.find( ip );
if ( it == AnaCanScan::reconnectInProgress_map.end() )
return( false );
else
return( true );
}


/**
* creates and returns anagate can access object
Expand All @@ -103,7 +128,8 @@ extern "C" DLLEXPORTFLAG CCanAccess *getCanBusAccess()


/**
* call back to catch incoming CAN messages for reading
* call back to catch incoming CAN messages for reading. It is called on a handle, which is internally
* registered and managed by the anagate API. If that handle changes, the callback has to be unregistered before I guess.
*/
void WINAPI InternalCallback(AnaUInt32 nIdentifier, const char * pcBuffer, AnaInt32 nBufferLen, AnaInt32 nFlags, AnaInt32 hHandle, AnaInt32 nSeconds, AnaInt32 nMicroseconds)
{
Expand All @@ -119,6 +145,11 @@ void WINAPI InternalCallback(AnaUInt32 nIdentifier, const char * pcBuffer, AnaIn
// canMsgCopy.c_time.tv_usec = nMicroseconds;
canMsgCopy.c_time = convertTimepointToTimeval(currentTimeTimeval());

cout << __FILE__ << " " << __LINE__ << " " << __FUNCTION__
<< " anagate message reception hHandle= " << hHandle
<< " nIdentifier= " << nIdentifier
<< endl;

g_AnaCanScanPointerMap[hHandle]->callbackOnRecieve(canMsgCopy);
g_AnaCanScanPointerMap[hHandle]->statisticsOnRecieve( nBufferLen );
}
Expand Down Expand Up @@ -398,13 +429,14 @@ bool AnaCanScan::sendMessage(short cobID, unsigned char len, unsigned char *mess

/**a sending on one port failed miserably.
* we are pessimistic and assume that the whole module (=ip) with all ports has lost
* power. Therefore we try to reconnect all ports of this ip.
* power. Therefore we try to reconnect all used ports of this ip.
* This is a bit brute, but it will work.
* Nevertheless we should limit our efforts to this module=ip only and not reconnect all other modules
* managed by this task.
* we also avoid reconnecting all ports of an ip several times (for each failed send on a port)
*/
AnaInt32 anaCallReturn0 = AnaCanScan::reconnectAllPorts( m_canIPAddress );
while ( anaCallReturn0 ){
while ( anaCallReturn0 < 0 ){
MLOGANA(WRN, this) << "failed to reconnect all module CAN ports once, keep on trying to reconnect. ip= " << m_canIPAddress;
anaCallReturn0 = AnaCanScan::reconnectAllPorts( m_canIPAddress );
}
Expand Down Expand Up @@ -433,12 +465,40 @@ bool AnaCanScan::sendMessage(short cobID, unsigned char len, unsigned char *mess
* reconnects all ports of a given ip adress=one module. We have one object per CAN port,
* so i.e. for an anagate duo (ports A and B) we have two objects with the same IP but
* with port 0 and 1.So we will have to scan over all objects there because we do not know
* how many CANports a given anagate bridge has.
* how many CANports a given anagate bridge has. We also allow globally only one ongoing reconnect
* to one ip at a time. Theoretically we could have a separate reconnection thread per ip, but that
* looks like overcomplicating the code and the issue. If several anagate modules fail
* they will all reconnect one after the other. That is OK since if all of these modules
* loose power and reboot at the same moment they will also be ready at roughly the same moment
* for reconnect. That means that we have to wait for the first ip to reconnect for a while
* but then all other ips will reconnect rather quickly in the most likely case.
* returns:
* 0 = OK
* -1 = cant open / reconnect CAN ports
* +1 = ignore, since another thread is doing the reconnect already
*/
/* static */ AnaInt32 AnaCanScan::reconnectAllPorts( string ip ){

// protect against several calls on the same ip.
// We need a mutex to force serialize this.
{
anagateReconnectMutex.lock();
if ( AnaCanScan::isIpReconnectInProgress( ip ) ) {
LOG(Log::WRN, AnaCanScan::s_logItHandleAnagate ) << "reconnecting all ports for ip= " << ip
<< " is already in progress, skipping.";

int us = 10000000;
boost::this_thread::sleep(boost::posix_time::microseconds( us ));
anagateReconnectMutex.unlock();

return(1);
}
AnaCanScan::setIpReconnectInProgress( ip, true );
LOG(Log::TRC, AnaCanScan::s_logItHandleAnagate ) << "reconnecting all ports for ip= " << ip
<< " is now in progress.";
anagateReconnectMutex.unlock();
}

int ret = 0;
AnaInt32 anaRet = 0;

Expand All @@ -448,7 +508,6 @@ bool AnaCanScan::sendMessage(short cobID, unsigned char len, unsigned char *mess
bool reconnectFailed = false;
for (std::map<AnaInt32, AnaCanScan*>::iterator it=g_AnaCanScanPointerMap.begin(); it!=g_AnaCanScanPointerMap.end(); it++){
if ( ip == it->second->ipAdress() ){
//LOG(Log::TRC, AnaCanScan::m_lh ) << __FUNCTION__
LOG(Log::TRC, AnaCanScan::s_logItHandleAnagate ) << __FUNCTION__
<< " key= " << it->first
<< " found ip= " << ip
Expand All @@ -470,13 +529,17 @@ bool AnaCanScan::sendMessage(short cobID, unsigned char len, unsigned char *mess
nbCANportsOnModule++;
}
}
LOG(Log::TRC, AnaCanScan::s_logItHandleAnagate) << " CAN bridge at ip= " << ip << " has nbCANportsOnModule= " << nbCANportsOnModule;
LOG(Log::TRC, AnaCanScan::s_logItHandleAnagate) << " CAN bridge at ip= " << ip << " uses nbCANportsOnModule= " << nbCANportsOnModule;

if ( reconnectFailed ) {
LOG(Log::WRN, AnaCanScan::s_logItHandleAnagate ) << " Problem reconnecting CAN ports for ip= " << ip
<< " last ret= " << ret << ". Just abandoning and trying again in 10 secs, module might not be ready yet.";
int us = 10000000;
boost::this_thread::sleep(boost::posix_time::microseconds( us ));

AnaCanScan::setIpReconnectInProgress( ip, false );
LOG(Log::TRC, AnaCanScan::s_logItHandleAnagate ) << "reconnecting all ports for ip= " << ip
<< " cancel.";
return(-1);
}

Expand All @@ -501,6 +564,10 @@ bool AnaCanScan::sendMessage(short cobID, unsigned char len, unsigned char *mess
LOG(Log::TRC, AnaCanScan::s_logItHandleAnagate) << __FUNCTION__ << " " __FILE__ << " " << __LINE__
<< " erasing stale handler " << it->first << " from obj. map";
g_AnaCanScanPointerMap.erase( it->first );

AnaCanScan::setIpReconnectInProgress( ip, false ); // all done, may fail another time
LOG(Log::TRC, AnaCanScan::s_logItHandleAnagate ) << "reconnecting all ports for ip= " << ip
<< " is done and OK.";
}
}
int us = 1000000;
Expand Down
6 changes: 6 additions & 0 deletions CanInterfaceImplementations/anagate/AnaCanScan.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ typedef unsigned long DWORD;
*/
using namespace CanModule;


class AnaCanScan: public CanModule::CCanAccess
{

Expand Down Expand Up @@ -103,6 +104,11 @@ class AnaCanScan: public CanModule::CCanAccess
static void setCanHandleOfPort(int port, AnaInt32 handle) { s_canHandleArray[ port ] = handle; }
static AnaInt32 getCanHandleFromPort(int n) { return s_canHandleArray[n]; }


static std::map<string, bool> reconnectInProgress_map; // could use 1-dim vector but map is faster
static void setIpReconnectInProgress( string ip, bool flag );
static bool isIpReconnectInProgress( string ip );

bool sendErrorCode(AnaInt32);
string ipAdress(){ return(m_canIPAddress );}
int canPortNumber(){ return(m_canPortNumber);}
Expand Down

0 comments on commit 0caee76

Please sign in to comment.