1    	/****************************************************************************
2    	 *   Copyright (C) 2006-2010 by Jason Ansel, Kapil Arya, and Gene Cooperman *
3    	 *   jansel@csail.mit.edu, kapil@ccs.neu.edu, gene@ccs.neu.edu              *
4    	 *                                                                          *
5    	 *   This file is part of the dmtcp/src module of DMTCP (DMTCP:dmtcp/src).  *
6    	 *                                                                          *
7    	 *  DMTCP:dmtcp/src is free software: you can redistribute it and/or        *
8    	 *  modify it under the terms of the GNU Lesser General Public License as   *
9    	 *  published by the Free Software Foundation, either version 3 of the      *
10   	 *  License, or (at your option) any later version.                         *
11   	 *                                                                          *
12   	 *  DMTCP:dmtcp/src is distributed in the hope that it will be useful,      *
13   	 *  but WITHOUT ANY WARRANTY; without even the implied warranty of          *
14   	 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the           *
15   	 *  GNU Lesser General Public License for more details.                     *
16   	 *                                                                          *
17   	 *  You should have received a copy of the GNU Lesser General Public        *
18   	 *  License along with DMTCP:dmtcp/src.  If not, see                        *
19   	 *  <http://www.gnu.org/licenses/>.                                         *
20   	 ****************************************************************************/
21   	
22   	#include "connectionrewirer.h"
23   	#include "dmtcpmessagetypes.h"
24   	#include "syscallwrappers.h"
25   	
26   	
27   	
28   	void dmtcp::ConnectionRewirer::onData ( jalib::JReaderInterface* sock )
29   	{
30   	  JASSERT ( sock->bytesRead() == sizeof ( DmtcpMessage ) ) ( sock->bytesRead() ) ( sizeof ( DmtcpMessage ) );
31   	  DmtcpMessage& msg = * ( DmtcpMessage* ) sock->buffer();
32   	  msg.assertValid();
33   	
34   	  if ( msg.type == DMT_FORCE_RESTART )
35   	  {
36   	    JTRACE ( "got DMT_FORCE_RESTART, exiting ConnectionRewirer" ) ( _pendingOutgoing.size() ) ( _pendingIncoming.size() );
37   	    _pendingIncoming.clear();
38   	    _pendingOutgoing.clear();
39   	    finishup();
40   	    return;
41   	  }
42   	
43   	  JASSERT ( msg.type==DMT_RESTORE_WAITING ) ( msg.type ).Text ( "unexpected message" );
44   	
45   	  // Find returns iterator 'i' w/ 0 or more elts, with first elt matching key.
46   	  iterator i = _pendingOutgoing.find ( msg.restorePid );
47   	
48   	  if ( i == _pendingOutgoing.end() )
49   	  {
50   	    // 'i' is an iterator over 0 elements.
51   	    JTRACE ( "got RESTORE_WAITING MESSAGE [not used]" ) ( msg.restorePid ) ( _pendingOutgoing.size() ) ( _pendingIncoming.size() );
52   	  }
53   	  else
54   	  {
55   	    // 'i' is an iterator over 1 element.
56   	    JTRACE ( "got RESTORE_WAITING MESSAGE, reconnecting..." )
57   	    ( msg.restorePid ) ( msg.restorePort ) ( msg.restoreAddrlen ) ( _pendingOutgoing.size() ) ( _pendingIncoming.size() );
58   	    const dmtcp::vector<int>& fds = i->second;
59   	    JASSERT ( fds.size() > 0 );
60   	    int fd0 = fds[0];
61   	
62   	    jalib::JSocket remote = jalib::JSocket::Create();
63   	    remote.changeFd ( fd0 );
64   	    errno = 0;
65   	    JASSERT ( remote.connect ( ( sockaddr* ) &msg.restoreAddr,msg.restoreAddrlen,msg.restorePort ) )
66   	    ( msg.restorePid ) ( msg.restorePort ) ( JASSERT_ERRNO )
67   	    .Text ( "failed to restore connection" );
68   	
69   	    {
70   	      DmtcpMessage peerMsg;
71   	      peerMsg.type = DMT_RESTORE_RECONNECTED;
72   	      peerMsg.restorePid = msg.restorePid;
73   	      addWrite ( new jalib::JChunkWriter ( remote, ( char* ) &peerMsg,sizeof ( peerMsg ) ) );
74   	    }
75   	
76   	    for ( size_t n = 1; n<fds.size(); ++n )
77   	    {
78   	      JTRACE ( "restoring extra fd" ) ( fd0 ) ( fds[n] );
79   	      JASSERT ( _real_dup2 ( fd0,fds[n] ) == fds[n] ) ( fd0 ) ( fds[n] ) ( msg.restorePid )
80   	      .Text ( "dup2() failed" );
81   	    }
82   	    _pendingOutgoing.erase ( i );
83   	  }
84   	
85   	  if ( pendingCount() ==0 ) finishup();
86   	#ifdef DEBUG
87   	  else debugPrint();
88   	#endif
89   	}
90   	
91   	void dmtcp::ConnectionRewirer::onConnect ( const jalib::JSocket& sock,  const struct sockaddr* /*remoteAddr*/,socklen_t /*remoteLen*/ )
92   	{
93   	  jalib::JSocket remote = sock;
94   	  DmtcpMessage msg;
95   	  msg.poison();
96   	  remote >> msg;
97   	  msg.assertValid();
98   	  JASSERT ( msg.type == DMT_RESTORE_RECONNECTED ) ( msg.type ).Text ( "unexpected message" );
99   	
100  	  iterator i = _pendingIncoming.find ( msg.restorePid );
101  	
Event past_the_end: Function "end" creates an iterator.
Event tested_end: "i" testing equal to "this->_pendingIncoming.end()".
Also see events: [deref_iterator]
At conditional (1): "i.operator !=(this->_pendingIncoming.end())": Taking false branch.
102  	  JASSERT ( i != _pendingIncoming.end() ) ( msg.restorePid )
103  	  .Text ( "got unexpected incoming restore request" );
104  	
Event deref_iterator: Dereferencing iterator "i" though it is already past the end of its container.
Also see events: [past_the_end][tested_end]
105  	  const dmtcp::vector<int>& fds = i->second;
106  	  JASSERT ( fds.size() > 0 );
107  	  int fd0 = fds[0];
108  	
109  	  remote.changeFd ( fd0 );
110  	
111  	  JTRACE ( "restoring incoming connection" ) ( msg.restorePid ) ( fd0 ) ( fds.size() );
112  	
113  	  for ( size_t i = 1; i<fds.size(); ++i )
114  	  {
115  	    JTRACE ( "restoring extra fd" ) ( fd0 ) ( fds[i] );
116  	    JASSERT ( _real_dup2 ( fd0,fds[i] ) == fds[i] ) ( fd0 ) ( fds[i] ) ( msg.restorePid )
117  	    .Text ( "dup2() failed" );
118  	  }
119  	
120  	  _pendingIncoming.erase ( i );
121  	
122  	
123  	  if ( pendingCount() ==0 ) finishup();
124  	#ifdef DEBUG
125  	  else debugPrint();
126  	#endif
127  	}
128  	
129  	void dmtcp::ConnectionRewirer::finishup()
130  	{
131  	  JTRACE ( "finishup begin" ) ( _listenSockets.size() ) ( _dataSockets.size() );
132  	  //i expect both sizes above to be 1
133  	  //close the restoreSocket
134  	  for ( size_t i=0; i<_listenSockets.size(); ++i )
135  	    _listenSockets[i].close();
136  	  //poison the coordinator socket listener
137  	  for ( size_t i=0; i<_dataSockets.size(); ++i )
138  	    _dataSockets[i]->socket() = -1;
139  	
140  	//     JTRACE("finishup end");
141  	}
142  	
143  	void dmtcp::ConnectionRewirer::onDisconnect ( jalib::JReaderInterface* sock )
144  	{
145  	  JASSERT ( sock->socket().sockfd() < 0 )
146  	  .Text ( "dmtcp_coordinator disconnected" );
147  	}
148  	
149  	int dmtcp::ConnectionRewirer::coordinatorFd() const
150  	{
151  	  return _coordinatorFd;
152  	}
153  	
154  	
155  	void dmtcp::ConnectionRewirer::setCoordinatorFd ( const int& theValue )
156  	{
157  	  _coordinatorFd = theValue;
158  	}
159  	
160  	void dmtcp::ConnectionRewirer::doReconnect()
161  	{
162  	  if ( pendingCount() > 0 ) monitorSockets();
163  	}
164  	
165  	void dmtcp::ConnectionRewirer::registerIncoming ( const ConnectionIdentifier& local
166  	        , const dmtcp::vector<int>& fds )
167  	{
168  	  _pendingIncoming[local] = fds;
169  	
170  	  JTRACE ( "announcing pending incoming" ) ( local );
171  	  DmtcpMessage msg;
172  	  msg.type = DMT_RESTORE_WAITING;
173  	  msg.restorePid = local;
174  	
175  	  JASSERT ( _coordinatorFd > 0 );
176  	  addWrite ( new jalib::JChunkWriter ( _coordinatorFd , ( char* ) &msg, sizeof ( DmtcpMessage ) ) );
177  	}
178  	
179  	void dmtcp::ConnectionRewirer::registerOutgoing ( const ConnectionIdentifier& remote
180  	        , const dmtcp::vector<int>& fds )
181  	{
182  	  _pendingOutgoing[remote] = fds;
183  	}
184  	
185  	void dmtcp::ConnectionRewirer::debugPrint() const
186  	{
187  	  JASSERT_STDERR << "Pending Incoming:\n";
188  	  for ( const_iterator i = _pendingIncoming.begin(); i!=_pendingIncoming.end(); ++i )
189  	  {
190  	    JASSERT_STDERR << i->first << " numFds=" << i->second.size() << " firstFd=" << i->second[0] << '\n';
191  	  }
192  	  JASSERT_STDERR << "Pending Outgoing:\n";
193  	  for ( const_iterator i = _pendingOutgoing.begin(); i!=_pendingOutgoing.end(); ++i )
194  	  {
195  	    JASSERT_STDERR << i->first << " numFds=" << i->second.size() << " firstFd=" << i->second[0] << '\n';
196  	  }
197  	}