1 /****************************************************************************
2 * Copyright (C) 2006-2010 by Jason Ansel, Kapil Arya, and Gene Cooperman *
3 * jansel@csail.mit.edu, kapil@ccs.neu.edu, gene@ccs.neu.edu *
4 * *
5 * This file is part of the dmtcp/src module of DMTCP (DMTCP:dmtcp/src). *
6 * *
7 * DMTCP:dmtcp/src is free software: you can redistribute it and/or *
8 * modify it under the terms of the GNU Lesser General Public License as *
9 * published by the Free Software Foundation, either version 3 of the *
10 * License, or (at your option) any later version. *
11 * *
12 * DMTCP:dmtcp/src is distributed in the hope that it will be useful, *
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
15 * GNU Lesser General Public License for more details. *
16 * *
17 * You should have received a copy of the GNU Lesser General Public *
18 * License along with DMTCP:dmtcp/src. If not, see *
19 * <http://www.gnu.org/licenses/>. *
20 ****************************************************************************/
21
22 #include "connectionrewirer.h"
23 #include "dmtcpmessagetypes.h"
24 #include "syscallwrappers.h"
25
26
27
28 void dmtcp::ConnectionRewirer::onData ( jalib::JReaderInterface* sock )
29 {
30 JASSERT ( sock->bytesRead() == sizeof ( DmtcpMessage ) ) ( sock->bytesRead() ) ( sizeof ( DmtcpMessage ) );
31 DmtcpMessage& msg = * ( DmtcpMessage* ) sock->buffer();
32 msg.assertValid();
33
34 if ( msg.type == DMT_FORCE_RESTART )
35 {
36 JTRACE ( "got DMT_FORCE_RESTART, exiting ConnectionRewirer" ) ( _pendingOutgoing.size() ) ( _pendingIncoming.size() );
37 _pendingIncoming.clear();
38 _pendingOutgoing.clear();
39 finishup();
40 return;
41 }
42
43 JASSERT ( msg.type==DMT_RESTORE_WAITING ) ( msg.type ).Text ( "unexpected message" );
44
45 // Find returns iterator 'i' w/ 0 or more elts, with first elt matching key.
46 iterator i = _pendingOutgoing.find ( msg.restorePid );
47
48 if ( i == _pendingOutgoing.end() )
49 {
50 // 'i' is an iterator over 0 elements.
51 JTRACE ( "got RESTORE_WAITING MESSAGE [not used]" ) ( msg.restorePid ) ( _pendingOutgoing.size() ) ( _pendingIncoming.size() );
52 }
53 else
54 {
55 // 'i' is an iterator over 1 element.
56 JTRACE ( "got RESTORE_WAITING MESSAGE, reconnecting..." )
57 ( msg.restorePid ) ( msg.restorePort ) ( msg.restoreAddrlen ) ( _pendingOutgoing.size() ) ( _pendingIncoming.size() );
58 const dmtcp::vector<int>& fds = i->second;
59 JASSERT ( fds.size() > 0 );
60 int fd0 = fds[0];
61
62 jalib::JSocket remote = jalib::JSocket::Create();
63 remote.changeFd ( fd0 );
64 errno = 0;
65 JASSERT ( remote.connect ( ( sockaddr* ) &msg.restoreAddr,msg.restoreAddrlen,msg.restorePort ) )
66 ( msg.restorePid ) ( msg.restorePort ) ( JASSERT_ERRNO )
67 .Text ( "failed to restore connection" );
68
69 {
70 DmtcpMessage peerMsg;
71 peerMsg.type = DMT_RESTORE_RECONNECTED;
72 peerMsg.restorePid = msg.restorePid;
73 addWrite ( new jalib::JChunkWriter ( remote, ( char* ) &peerMsg,sizeof ( peerMsg ) ) );
74 }
75
76 for ( size_t n = 1; n<fds.size(); ++n )
77 {
78 JTRACE ( "restoring extra fd" ) ( fd0 ) ( fds[n] );
79 JASSERT ( _real_dup2 ( fd0,fds[n] ) == fds[n] ) ( fd0 ) ( fds[n] ) ( msg.restorePid )
80 .Text ( "dup2() failed" );
81 }
82 _pendingOutgoing.erase ( i );
83 }
84
85 if ( pendingCount() ==0 ) finishup();
86 #ifdef DEBUG
87 else debugPrint();
88 #endif
89 }
90
91 void dmtcp::ConnectionRewirer::onConnect ( const jalib::JSocket& sock, const struct sockaddr* /*remoteAddr*/,socklen_t /*remoteLen*/ )
92 {
93 jalib::JSocket remote = sock;
94 DmtcpMessage msg;
95 msg.poison();
96 remote >> msg;
97 msg.assertValid();
98 JASSERT ( msg.type == DMT_RESTORE_RECONNECTED ) ( msg.type ).Text ( "unexpected message" );
99
100 iterator i = _pendingIncoming.find ( msg.restorePid );
101
|
Event past_the_end: |
Function "end" creates an iterator. |
|
Event tested_end: |
"i" testing equal to "this->_pendingIncoming.end()". |
| Also see events: |
[deref_iterator] |
|
At conditional (1): "i.operator !=(this->_pendingIncoming.end())": Taking false branch.
|
102 JASSERT ( i != _pendingIncoming.end() ) ( msg.restorePid )
103 .Text ( "got unexpected incoming restore request" );
104
|
Event deref_iterator: |
Dereferencing iterator "i" though it is already past the end of its container. |
| Also see events: |
[past_the_end][tested_end] |
105 const dmtcp::vector<int>& fds = i->second;
106 JASSERT ( fds.size() > 0 );
107 int fd0 = fds[0];
108
109 remote.changeFd ( fd0 );
110
111 JTRACE ( "restoring incoming connection" ) ( msg.restorePid ) ( fd0 ) ( fds.size() );
112
113 for ( size_t i = 1; i<fds.size(); ++i )
114 {
115 JTRACE ( "restoring extra fd" ) ( fd0 ) ( fds[i] );
116 JASSERT ( _real_dup2 ( fd0,fds[i] ) == fds[i] ) ( fd0 ) ( fds[i] ) ( msg.restorePid )
117 .Text ( "dup2() failed" );
118 }
119
120 _pendingIncoming.erase ( i );
121
122
123 if ( pendingCount() ==0 ) finishup();
124 #ifdef DEBUG
125 else debugPrint();
126 #endif
127 }
128
129 void dmtcp::ConnectionRewirer::finishup()
130 {
131 JTRACE ( "finishup begin" ) ( _listenSockets.size() ) ( _dataSockets.size() );
132 //i expect both sizes above to be 1
133 //close the restoreSocket
134 for ( size_t i=0; i<_listenSockets.size(); ++i )
135 _listenSockets[i].close();
136 //poison the coordinator socket listener
137 for ( size_t i=0; i<_dataSockets.size(); ++i )
138 _dataSockets[i]->socket() = -1;
139
140 // JTRACE("finishup end");
141 }
142
143 void dmtcp::ConnectionRewirer::onDisconnect ( jalib::JReaderInterface* sock )
144 {
145 JASSERT ( sock->socket().sockfd() < 0 )
146 .Text ( "dmtcp_coordinator disconnected" );
147 }
148
149 int dmtcp::ConnectionRewirer::coordinatorFd() const
150 {
151 return _coordinatorFd;
152 }
153
154
155 void dmtcp::ConnectionRewirer::setCoordinatorFd ( const int& theValue )
156 {
157 _coordinatorFd = theValue;
158 }
159
160 void dmtcp::ConnectionRewirer::doReconnect()
161 {
162 if ( pendingCount() > 0 ) monitorSockets();
163 }
164
165 void dmtcp::ConnectionRewirer::registerIncoming ( const ConnectionIdentifier& local
166 , const dmtcp::vector<int>& fds )
167 {
168 _pendingIncoming[local] = fds;
169
170 JTRACE ( "announcing pending incoming" ) ( local );
171 DmtcpMessage msg;
172 msg.type = DMT_RESTORE_WAITING;
173 msg.restorePid = local;
174
175 JASSERT ( _coordinatorFd > 0 );
176 addWrite ( new jalib::JChunkWriter ( _coordinatorFd , ( char* ) &msg, sizeof ( DmtcpMessage ) ) );
177 }
178
179 void dmtcp::ConnectionRewirer::registerOutgoing ( const ConnectionIdentifier& remote
180 , const dmtcp::vector<int>& fds )
181 {
182 _pendingOutgoing[remote] = fds;
183 }
184
185 void dmtcp::ConnectionRewirer::debugPrint() const
186 {
187 JASSERT_STDERR << "Pending Incoming:\n";
188 for ( const_iterator i = _pendingIncoming.begin(); i!=_pendingIncoming.end(); ++i )
189 {
190 JASSERT_STDERR << i->first << " numFds=" << i->second.size() << " firstFd=" << i->second[0] << '\n';
191 }
192 JASSERT_STDERR << "Pending Outgoing:\n";
193 for ( const_iterator i = _pendingOutgoing.begin(); i!=_pendingOutgoing.end(); ++i )
194 {
195 JASSERT_STDERR << i->first << " numFds=" << i->second.size() << " firstFd=" << i->second[0] << '\n';
196 }
197 }