1 /****************************************************************************
2 * Copyright (C) 2006-2010 by Jason Ansel, Kapil Arya, and Gene Cooperman *
3 * jansel@csail.mit.edu, kapil@ccs.neu.edu, gene@ccs.neu.edu *
4 * *
5 * This file is part of the dmtcp/src module of DMTCP (DMTCP:dmtcp/src). *
6 * *
7 * DMTCP:dmtcp/src is free software: you can redistribute it and/or *
8 * modify it under the terms of the GNU Lesser General Public License as *
9 * published by the Free Software Foundation, either version 3 of the *
10 * License, or (at your option) any later version. *
11 * *
12 * DMTCP:dmtcp/src is distributed in the hope that it will be useful, *
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
15 * GNU Lesser General Public License for more details. *
16 * *
17 * You should have received a copy of the GNU Lesser General Public *
18 * License along with DMTCP:dmtcp/src. If not, see *
19 * <http://www.gnu.org/licenses/>. *
20 ****************************************************************************/
21
22 #include "connectionstate.h"
23 #include "constants.h"
24 #include "dmtcpmessagetypes.h"
25 #include "syslogcheckpointer.h"
26 #include "signalmanager.h"
27 #include "dmtcpworker.h"
28 #include "connectionrewirer.h"
29
30 dmtcp::ConnectionState::ConnectionState ( const ConnectionToFds& ctfd )
31 : _conToFds ( ctfd )
|
Event uninit_member: |
Non-static class member _numPeers is not initialized in this constructor nor in any functions that it calls. |
| Also see events: |
[member_decl] |
32 {}
33
34 void dmtcp::ConnectionState::deleteDupFileConnections()
35 {
36 ConnectionList& connections = ConnectionList::instance();
37
38 // i is a list of all File Connections, including those which were dup()'d
39 for ( ConnectionList::iterator i = connections.begin()
40 ; i != connections.end()
41 ; ++i ) {
42 if ( i->second->conType() != Connection::FILE )
43 continue;
44 FileConnection* fileConI = (FileConnection*) i->second;
45
46 // Search ahead with j, and erase any dup()'s of i of type Connection::FILE
47 ConnectionList::iterator prevJ = i;
48 ConnectionList::iterator j = prevJ;
49 j++;
50 for ( ; j != connections.end() ; prevJ = j, j++ ) {
51 FileConnection* fileConJ = (FileConnection*) j->second;
52 if ( j->second->conType() != Connection::FILE )
53 continue;
54 else if ( fileConJ->isDupConnection( *fileConI, _conToFds ) ) {
55 JTRACE ("dup()'s file connections found, merging them")
56 ( i->first ) ( j->first );
57 for ( size_t st = 0; st < _conToFds[j->first].size(); st++ )
58 _conToFds[i->first].push_back ( _conToFds[j->first][st] );
59 JTRACE("Deleting dup()'d file connection") (j->first);
60 _conToFds.erase( fileConJ->id() );
61 // ConnectionList::iterator j = connections.erase ( j );
62 connections.erase ( j ); // returns next position after old j pos
63 j = prevJ; // old position of j is now undefined, so back up to prev j
64 }
65 }
66 }
67
68 for ( ConnectionToFds::iterator cfIt = _conToFds.begin();
69 cfIt != _conToFds.end();
70 ++cfIt ) {
71 JTRACE("ConToFds")(cfIt->first);
72 }
73 }
74
75 void dmtcp::ConnectionState::deleteStaleConnections()
76 {
77 ConnectionList& connections = ConnectionList::instance();
78
79 //build list of stale connections
80 dmtcp::vector<ConnectionIdentifier> staleConnections;
81 for ( ConnectionList::iterator i = connections.begin()
82 ; i!= connections.end()
83 ; ++i )
84 {
85 if ( _conToFds[i->first].size() == 0 )
86 staleConnections.push_back ( i->first );
87 }
88
89 //delete all the stale connections
90 for ( size_t i=0; i<staleConnections.size(); ++i )
91 {
92 JTRACE ( "deleting stale connection" ) ( staleConnections[i] );
93 connections.erase ( staleConnections[i] );
94 }
95 }
96
97 void dmtcp::ConnectionState::preLockSaveOptions()
98 {
99 SignalManager::saveSignals();
100 SyslogCheckpointer::stopService();
101
102 // build fd table with stale connections included
103 _conToFds = ConnectionToFds ( KernelDeviceToConnection::instance() );
104
105 // Save Options for each Fd (We need to do it here instead of
106 // preCheckpointLock because we want to restore the correct owner in
107 // postcheckpoint).
108 ConnectionList& connections = ConnectionList::instance();
109 for ( ConnectionList::iterator i = connections.begin()
110 ; i!= connections.end()
111 ; ++i ) {
112 if ( _conToFds[i->first].size() == 0 ) continue;
113
114 ( i->second )->saveOptions ( _conToFds[i->first] );
115 }
116 }
117
118 void dmtcp::ConnectionState::preCheckpointLock()
119 {
120 ConnectionList& connections = ConnectionList::instance();
121 for ( ConnectionList::iterator i = connections.begin()
122 ; i!= connections.end()
123 ; ++i ) {
124 if ( _conToFds[i->first].size() == 0 ) continue;
125
126 ( i->second )->doLocking ( _conToFds[i->first] );
127 }
128 }
129
#ifdef EXTERNAL_SOCKET_HANDLING
// Drops stale connections, then forwards the peer lookup to every TCP
// connection, passing its fds and the caller's conInfoTable.
//
// conInfoTable: table handed through to each TcpConnection.
void dmtcp::ConnectionState::preCheckpointPeerLookup( dmtcp::vector<TcpConnectionInfo>& conInfoTable )
{
  deleteStaleConnections();

  ConnectionList& conList = ConnectionList::instance();
  for ( ConnectionList::iterator it = conList.begin(); it != conList.end(); ++it ) {
    Connection* con = it->second;
    if ( con->conType() != Connection::TCP ) {
      continue;
    }
    TcpConnection* tcp = static_cast<TcpConnection*> ( con );
    tcp->preCheckpointPeerLookup ( _conToFds[it->first], conInfoTable );
  }
}
#endif
147
148 void dmtcp::ConnectionState::preCheckpointDrain()
149 {
150 deleteStaleConnections();
151 ConnectionList& connections = ConnectionList::instance();
152
153 //initialize the drainer
154 for ( ConnectionList::iterator i = connections.begin()
155 ; i!= connections.end()
156 ; ++i )
157 {
158 if ( _conToFds[i->first].size() > 0 )
159 {
160 ( i->second )->preCheckpoint ( _conToFds[i->first], _drain );
161 }
162 }
163
164 //this will block until draining is complete
165 _drain.monitorSockets ( DRAINER_CHECK_FREQ );
166
167 //handle disconnected sockets
168 const dmtcp::vector<ConnectionIdentifier>& discn = _drain.getDisconnectedSockets();
169 for(size_t i=0; i<discn.size(); ++i){
170 const ConnectionIdentifier& id = discn[i];
171 TcpConnection& con = connections[id].asTcp();
172 dmtcp::vector<int>& fds = _conToFds[discn[i]];
173 JASSERT(fds.size()>0);
174 JTRACE("recreating disconnected socket")(fds[0])(id);
175
176 //reading from the socket, and taking the error, resulted in an implicit close().
177 //we will create a new, broken socket that is not closed
178
179 con.onError();
180 static ConnectionRewirer ignored;
181 con.restore(fds, ignored); //restoring a TCP_ERROR connection makes a dead socket
182 KernelDeviceToConnection::instance().redirect(fds[0], id);
183 }
184
185 //re build fd table without stale connections and with disconnects
186 _conToFds = ConnectionToFds ( KernelDeviceToConnection::instance() );
187
188 deleteDupFileConnections();
189 }
190
191 void dmtcp::ConnectionState::preCheckpointHandshakes(const UniquePid& coordinator)
192 {
193 ConnectionList& connections = ConnectionList::instance();
194
195 //must send first to avoid deadlock
196 //we are relying on OS buffers holding our message without blocking
197 for ( ConnectionList::iterator i = connections.begin()
198 ; i!= connections.end()
199 ; ++i )
200 {
201 const dmtcp::vector<int>& fds = _conToFds[i->first];
202 Connection* con = i->second;
203 if ( fds.size() > 0 ){
204 con->doSendHandshakes(fds, coordinator);
205 }
206 }
207
208 //now receive
209 for ( ConnectionList::iterator i = connections.begin()
210 ; i!= connections.end()
211 ; ++i )
212 {
213 const dmtcp::vector<int>& fds = _conToFds[i->first];
214 Connection* con = i->second;
215 if ( fds.size() > 0 ){
216 con->doRecvHandshakes(fds, coordinator);
217 }
218 }
219 }
220
221 void dmtcp::ConnectionState::outputDmtcpConnectionTable(int fd)
222 {
223 //write out the *.dmtcp file
224 //dmtcp::string serialFile = dmtcp::UniquePid::dmtcpCheckpointFilename();
225 //JTRACE ( "Writing *.dmtcp checkpoint file" );
226 jalib::JBinarySerializeWriterRaw wr ( "mtcp-file-prefix", fd );
227
228 wr & _compGroup;
229 wr & _numPeers;
230 _conToFds.serialize ( wr );
231
232 #ifdef PID_VIRTUALIZATION
233 dmtcp::VirtualPidTable::instance().refresh( );
234 dmtcp::VirtualPidTable::instance().serialize( wr );
235 #endif
236 }
237
238
239 void dmtcp::ConnectionState::postCheckpoint( bool isRestart )
240 {
241 _drain.refillAllSockets();
242
243 ConnectionList& connections = ConnectionList::instance();
244 for ( ConnectionList::iterator i= connections.begin()
245 ; i!= connections.end()
246 ; ++i )
247 {
248 if ( _conToFds[i->first].size() <= 0 )
249 JWARNING(false) ( i->first.conId() ) .Text ( "WARNING:: stale connections should be gone by now" );
250
251 if ( _conToFds[i->first].size() == 0 ) continue;
252
253 ( i->second )->postCheckpoint ( _conToFds[i->first], isRestart );
254 }
255
256 SyslogCheckpointer::restoreService();
257 SignalManager::restoreSignals();
258 }
259
260 void dmtcp::ConnectionState::postRestart()
261 {
262 ConnectionList& connections = ConnectionList::instance();
263
264 // Two part restoreOptions. See the comments in doReconnect()
265 // Part 1: Restore options for all but Pseudo-terminal slaves
266 for ( ConnectionList::iterator i= connections.begin()
267 ; i!= connections.end()
268 ; ++i )
269 {
270 JWARNING ( _conToFds[i->first].size() > 0 ).Text ( "stale connections should be gone by now" );
271 if ( _conToFds[i->first].size() == 0 ) continue;
272
273 Connection *c = i->second;
274
275 if ( ( i->second )->conType() == Connection::PTY &&
276 ( ( (PtyConnection*) (i->second) )->ptyType() == PtyConnection::PTY_SLAVE ||
277 ( (PtyConnection*) (i->second) )->ptyType() == PtyConnection::PTY_BSD_SLAVE ) ) { }
278 else {
279 ( i->second )->restoreOptions ( _conToFds[i->first] );
280 }
281 }
282
283 // Part 2: Restore options for all Pseudo-terminal slaves
284 for ( ConnectionList::iterator i= connections.begin()
285 ; i!= connections.end()
286 ; ++i )
287 {
288 if ( _conToFds[i->first].size() == 0 ) continue;
289
290 Connection *c = i->second;
291
292 if ( ( i->second )->conType() == Connection::PTY &&
293 ( ( (PtyConnection*) (i->second) )->ptyType() == PtyConnection::PTY_SLAVE ||
294 ( (PtyConnection*) (i->second) )->ptyType() == PtyConnection::PTY_BSD_SLAVE ) ) {
295 ( i->second )->restoreOptions ( _conToFds[i->first] );
296 }
297 }
298
299 KernelDeviceToConnection::instance().dbgSpamFds();
300
301 //fix our device table to match the new world order
302 KernelDeviceToConnection::instance() = KernelDeviceToConnection ( _conToFds );
303 }
304
305 void dmtcp::ConnectionState::doReconnect ( jalib::JSocket& coordinator, jalib::JSocket& restoreListen )
306 {
307 _rewirer.addDataSocket ( new jalib::JChunkReader ( coordinator,sizeof ( DmtcpMessage ) ) );
308 _rewirer.addListenSocket ( restoreListen );
309 _rewirer.setCoordinatorFd ( coordinator.sockfd() );
310
311 ConnectionList& connections = ConnectionList::instance();
312
313 // Here we modify the restore algorithm by splitting it in two parts. In the
314 // first part we restore all the connection except the PTY_SLAVE types and in
315 // the second part we restore only PTY_SLAVE connections. This is done to
316 // make sure that by the time we are trying to restore a PTY_SLAVE
317 // connection, its corresponding PTY_MASTER connection has already been
318 // restored.
319 // UPDATE: We also restore the files for which the we didn't have the lock in
320 // second iteration along with PTY_SLAVEs
321 // Part 1: Restore all but Pseudo-terminal slaves and file connection which
322 // were not checkpointed
323 for ( ConnectionList::iterator i= connections.begin()
324 ; i!= connections.end()
325 ; ++i )
326 {
327 JASSERT ( _conToFds[i->first].size() > 0 ).Text ( "stale connections should be gone by now" );
328
329 if ( (i->second)->restoreInSecondIteration() == false ){
330 // if ( ( i->second )->conType() == Connection::PTY &&
331 // ( ( (PtyConnection*) (i->second) )->ptyType() == PtyConnection::PTY_SLAVE ||
332 // ( (PtyConnection*) (i->second) )->ptyType() == PtyConnection::PTY_BSD_SLAVE ) ) { }
333 // else {
334 ( i->second )->restore ( _conToFds[i->first], _rewirer );
335 }
336 }
337
338 // Part 2: Restore all Pseudo-terminal slaves and file connections that were
339 // not checkpointed.
340 for ( ConnectionList::iterator i= connections.begin()
341 ; i!= connections.end()
342 ; ++i )
343 {
344 JASSERT ( _conToFds[i->first].size() > 0 ).Text ( "stale connections should be gone by now" );
345
346 if ( ( i->second )->restoreInSecondIteration() == true ) {
347 // if ( ( i->second )->conType() == Connection::PTY &&
348 // ( ( (PtyConnection*) (i->second) )->ptyType() == PtyConnection::PTY_SLAVE ||
349 // ( (PtyConnection*) (i->second) )->ptyType() == PtyConnection::PTY_BSD_SLAVE ) ) {
350 ( i->second )->restore ( _conToFds[i->first], _rewirer );
351 }
352 }
353 _rewirer.doReconnect();
354 }