1 /****************************************************************************
2 * Copyright (C) 2006-2010 by Jason Ansel, Kapil Arya, and Gene Cooperman *
3 * jansel@csail.mit.edu, kapil@ccs.neu.edu, gene@ccs.neu.edu *
4 * *
5 * This file is part of the dmtcp/src module of DMTCP (DMTCP:dmtcp/src). *
6 * *
7 * DMTCP:dmtcp/src is free software: you can redistribute it and/or *
8 * modify it under the terms of the GNU Lesser General Public License as *
9 * published by the Free Software Foundation, either version 3 of the *
10 * License, or (at your option) any later version. *
11 * *
12 * DMTCP:dmtcp/src is distributed in the hope that it will be useful, *
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
15 * GNU Lesser General Public License for more details. *
16 * *
17 * You should have received a copy of the GNU Lesser General Public *
18 * License along with DMTCP:dmtcp/src. If not, see *
19 * <http://www.gnu.org/licenses/>. *
20 ****************************************************************************/
21
22 /****************************************************************************
23 * Coordinator code logic: *
24 * main calls monitorSockets, which acts as a top level event loop. *
25 * monitorSockets calls: onConnect, onData, onDisconnect, onTimeoutInterval*
26 * when client or dmtcp_command talks to coordinator. *
27 * onConnect and onData receive a socket parameter reads msg and passes to: *
28 * handleUserCommand, which takes single char arg ('s', 'c', 'k', 'q', ...)*
29 * handleUserCommand calls broadcastMessage to send data back *
30 * any message sent by broadcastMessage takes effect only on returning *
31 * back up to top level monitorSockets *
32 * Hence, even for checkpoint, handleUserCommand just changes state, *
33 * broadcasts an initial checkpoint command, and then returns to top *
 *    level.  Replies from clients then drive further state changes.         *
35 * The prefix command 'b' (blocking) from dmtcp_command modifies behavior *
36 * of 'c' so that the reply to dmtcp_command happens only when clients *
37 * are back in RUNNING state. *
38 * The states for a worker (client) are: *
39 * Checkpoint: RUNNING -> SUSPENDED -> FD_LEADER_ELECTION -> DRAINED *
40 * -> CHECKPOINTED -> REFILLED -> RUNNING *
41 * Restart: RESTARTING -> CHECKPOINTED -> REFILLED -> RUNNING *
42 * If debugging, set gdb breakpoint on: *
43 * dmtcp::DmtcpCoordinator::onConnect *
44 * dmtcp::DmtcpCoordinator::onData *
45 * dmtcp::DmtcpCoordinator::handleUserCommand *
46 * dmtcp::DmtcpCoordinator::broadcastMessage *
47 ****************************************************************************/
48
49 #include "dmtcp_coordinator.h"
50 #include "constants.h"
51 #include "protectedfds.h"
52 #include "../jalib/jconvert.h"
53 #include "dmtcpmessagetypes.h"
54 #include "dmtcpworker.h"
55 #include <stdio.h>
56 #include <unistd.h>
57 #include <sys/stat.h>
58 #include "../jalib/jtimer.h"
59 #include <algorithm>
60 #include <sys/wait.h>
61 #include <sys/types.h>
62 #include <sys/stat.h>
63 #include <fcntl.h>
64 #undef min
65 #undef max
66
67
// Starts at -1; presumably the coordinator's listening port, assigned
// outside this section -- TODO confirm.
static int thePort = -1;
// Only declared here; presumably the program's argv[0] -- TODO confirm.
static char *argv0;

// Text written to JASSERT_STDERR by the 'h' / 'H' / '?' user command.
static const char* theHelpMessage =
  "COMMANDS:\n"
  " l : List connected nodes\n"
  " s : Print status message\n"
  " c : Checkpoint all nodes\n"
  " f : Force a restart even if there are missing nodes"
  " (debugging only)\n"
  " k : Kill all nodes\n"
  " q : Kill all nodes and quit\n"
  " ? : Show this message\n\n";
82
// Command-line usage text for dmtcp_coordinator.
// Fix: the --tmpdir default named a non-existent variable "TMDPIR"; the
// conventional environment variable is TMPDIR.
static const char* theUsage =
  "USAGE: \n"
  " dmtcp_coordinator [OPTIONS] [port]\n\n"
  "OPTIONS:\n"
  " --port, -p, (environment variable DMTCP_PORT):\n"
  " Port to listen on (default: 7779)\n"
  " --ckptdir, -c, (environment variable DMTCP_CHECKPOINT_DIR):\n"
  " Directory to store dmtcp_restart_script.sh (default: ./)\n"
  " --tmpdir, -t, (environment variable DMTCP_TMPDIR):\n"
  " Directory to store temporary files (default: env var TMPDIR or /tmp)\n"
  " --exit-on-last\n"
  " Exit automatically when last client disconnects\n"
  " --background\n"
  " Run silently in the background (mutually exclusive with --batch)\n"
  " --batch\n"
  " Run in batch mode (mutually exclusive with --background)\n"
  " The checkpoint interval is set to 3600 seconds (1 hr) by default\n"
  " --interval, -i, (environment variable DMTCP_CHECKPOINT_INTERVAL):\n"
  " Time in seconds between automatic checkpoints\n"
  " (default: 0, disabled)\n"
  "COMMANDS:\n"
  " (type '?<return>' at runtime for list)\n\n"
  "See http://dmtcp.sf.net/ for more information.\n"
;
107
108
// Shebang line and editing hints for a generated bash restart script
// (the text itself describes the script as launching the restarts).
static const char* theRestartScriptHeader =
  "#!/bin/bash \n"
  "set -m # turn on job control\n"
  "\n"
  "#This script launches all the restarts in the background.\n"
  "#Suggestions for editing:\n"
  "# 1. For those processes executing on the localhost, "
  "remove 'ssh <hostname>' from the start of the line. \n"
  "# 2. If using ssh, verify that ssh does not require passwords "
  "or other prompts.\n"
  "# 3. Verify that the dmtcp_restart command is in your path "
  "on all hosts.\n"
  "# 4. Verify DMTCP_HOST and DMTCP_PORT match the location of "
  "the dmtcp_coordinator.\n"
  "# If necessary, add 'DMTCP_PORT=<dmtcp_coordinator port>' "
  "after 'DMTCP_HOST=<...>'.\n"
  "# 5. Remove the '&' from a line if that process reads STDIN.\n"
  "# If multiple processes read STDIN then prefix the line with "
  "'xterm -hold -e' and put '&' at the end of the line.\n"
  "# 6. Processes on same host can be restarted with single "
  "dmtcp_restart command.\n"
  "\n\n";
123
// Shell fragment defining $usage_str, the help text of the generated
// restart script.
// Fix: "--disable-batch" was documented with the short option "-b", but
// the argument handler generated alongside it treats "-b" as "--batch"
// and accepts only the long spelling "--disable-batch".
static const char* theRestartScriptUsage =
  "usage_str='USAGE:\n"
  " dmtcp_restart_script.sh [OPTIONS]\n\n"
  "OPTIONS:\n"
  " --host, -h, (environment variable DMTCP_HOST):\n"
  " Hostname where dmtcp_coordinator is running\n"
  " --port, -p, (environment variable DMTCP_PORT):\n"
  " Port where dmtcp_coordinator is running\n"
  " --hostfile <arg0> :\n"
  " Provide a hostfile (One host per line, \"#\" indicates comments)\n"
  " --restartdir, -d, (environment variable DMTCP_RESTART_DIR):\n"
  " Directory to read checkpoint images from\n"
  " --batch, -b:\n"
  " Enable batch mode for dmtcp_restart\n"
  " --disable-batch:\n"
  " Disable batch mode for dmtcp_restart (if previously enabled)\n"
  " --interval, -i, (environment variable DMTCP_CHECKPOINT_INTERVAL):\n"
  " Time in seconds between automatic checkpoints\n"
  " (Default: Use pre-checkpoint value)\n"
  " --help:\n"
  " Print this message\'\n\n\n"
;
146
// Shell fragment that parses the generated restart script's command line.
// Fix: every "$1" inside a [ ... ] test is now quoted.  Unquoted, an empty
// argument or one containing whitespace made `[` fail with a syntax error
// instead of falling through to the usage message.  The "-o" compound test
// is split into two tests joined by "||", which stays unambiguous for
// operands such as "!" or "(".
static const char* theRestartScriptCmdlineArgHandler =
  "if [ $# -gt 0 ]; then\n"
  " while [ $# -gt 0 ]\n"
  " do\n"
  " if [ \"$1\" = \"--help\" ]; then\n"
  " echo \"$usage_str\"\n"
  " exit\n"
  " elif [ \"$1\" = \"--batch\" ] || [ \"$1\" = \"-b\" ]; then\n"
  " maybebatch='--batch'\n"
  " shift\n"
  " elif [ \"$1\" = \"--disable-batch\" ]; then\n"
  " maybebatch=\n"
  " shift\n"
  " elif [ $# -ge 2 ]; then\n"
  " case \"$1\" in \n"
  " --host|-h)\n"
  " coord_host=\"$2\";;\n"
  " --port|-p)\n"
  " coord_port=\"$2\";;\n"
  " --hostfile)\n"
  " hostfile=\"$2\"\n"
  " if [ ! -f \"$hostfile\" ]; then\n"
  " echo \"ERROR: hostfile $hostfile not found\"\n"
  " exit\n"
  " fi;;\n"
  " --restartdir|-d)\n"
  " DMTCP_RESTART_DIR=$2;;\n"
  " --interval|-i)\n"
  " checkpoint_interval=$2;;\n"
  " *)\n"
  " echo \"$0: unrecognized option \'$1\'. See correct usage below\"\n"
  " echo \"$usage_str\"\n"
  " exit;;\n"
  " esac\n"
  " shift\n"
  " shift\n"
  " elif [ \"$1\" = \"--help\" ]; then\n"
  " echo \"$usage_str\"\n"
  " exit\n"
  " else\n"
  " echo \"$0: Incorrect usage. See correct usage below\"\n"
  " echo\n"
  " echo \"$usage_str\"\n"
  " exit\n"
  " fi\n"
  " done\n"
  "fi\n\n"
;
195
196 static bool exitOnLast = false;
197 static bool blockUntilDone = false;
198 static int blockUntilDoneRemote = -1;
199 static dmtcp::DmtcpMessage blockUntilDoneReply;
200 #ifdef EXTERNAL_SOCKET_HANDLING
201 static int numWorkersWithExternalSockets = 0;
202 static dmtcp::vector<dmtcp::ConnectionIdentifier> workersWithExternalSockets;
203 #endif
204
205 static dmtcp::DmtcpCoordinator prog;
206
207 /* The coordinator can receive a second checkpoint request while processing the
208 * first one. If the second request comes at a point where the coordinator has
209 * broadcasted DMTCP_DO_SUSPEND message but the workers haven't replied, the
210 * coordinator sends another DMTCP_DO_SUSPEND message. The workers having
211 * replied to the first DMTCP_DO_SUSPEND message (by suspending all the user
212 * threads) are waiting for the next message (DMT_DO_LOCK_FDS or
213 * DMT_KILL_PEER), however they receive DMT_DO_SUSPEND message and thus exit()
214 * indicating an error.
215 * The fix to this problem is to introduce a global
216 * variable "workersRunningAndSuspendMsgSent" which, as the name implies,
217 * indicates that the DMT_DO_SUSPEND message has been sent and the coordinator
218 * is waiting for replies from the workers. If this variable is set, the
219 * coordinator will not process another checkpoint request.
220 */
221 static bool workersRunningAndSuspendMsgSent = false;
222
223 static int theCheckpointInterval = 0;
224 static bool batchMode = false;
225
226 const int STDIN_FD = fileno ( stdin );
227
228 JTIMER ( checkpoint );
229 JTIMER ( restart );
230
231 static dmtcp::UniquePid curCompGroup = dmtcp::UniquePid();
232 static int numPeers = -1;
233 static int curTimeStamp = -1;
234
235 namespace
236 {
237 static int theNextClientNumber = 1;
238
239 class NamedChunkReader : public jalib::JChunkReader
240 {
241 public:
242 NamedChunkReader ( const jalib::JSocket& sock
243 ,const dmtcp::UniquePid& identity
244 ,dmtcp::WorkerState state
245 ,const struct sockaddr * remote
246 ,socklen_t len
247 ,int restorePort )
248 : jalib::JChunkReader ( sock,sizeof ( dmtcp::DmtcpMessage ) )
249 , _identity ( identity )
250 , _clientNumber ( theNextClientNumber++ )
251 , _state ( state )
252 , _addrlen ( len )
253 , _restorePort ( restorePort )
254 {
255 memset ( &_addr, 0, sizeof _addr );
256 memcpy ( &_addr, remote, len );
257 }
258 const dmtcp::UniquePid& identity() const { return _identity;}
259 int clientNumber() const { return _clientNumber; }
260 dmtcp::WorkerState state() const { return _state; }
261 const struct sockaddr_storage* addr() const { return &_addr; }
262 socklen_t addrlen() const { return _addrlen; }
263 int restorePort() const { return _restorePort; }
264 void setState ( dmtcp::WorkerState value ) { _state = value; }
265 void progname(dmtcp::string pname){ _progname = pname; }
266 dmtcp::string progname(void) const { return _progname; }
267 void hostname(dmtcp::string hname){ _hostname = hname; }
268 dmtcp::string hostname(void) const { return _hostname; }
269 private:
270 dmtcp::UniquePid _identity;
271 int _clientNumber;
272 dmtcp::WorkerState _state;
273 struct sockaddr_storage _addr;
274 socklen_t _addrlen;
275 int _restorePort;
276 dmtcp::string _hostname;
277 dmtcp::string _progname;
278 };
279 }
280
#ifdef EXTERNAL_SOCKET_HANDLING
/*
 * For every peer-lookup message that is still pending, send DMT_UNKNOWN_PEER
 * for that connection id to the worker that issued the lookup, and make sure
 * that worker is listed (once) in workersWithExternalSockets.  The pending
 * list is emptied afterwards.
 */
void dmtcp::DmtcpCoordinator::sendUnIdentifiedPeerNotifications()
{
  _socketPeerLookupMessagesIterator pending = _socketPeerLookupMessages.begin();
  while ( pending != _socketPeerLookupMessages.end() ) {
    // Notify the worker over the socket recorded for it at lookup time.
    DmtcpMessage msg (DMT_UNKNOWN_PEER);
    msg.conId = pending->conId;
    jalib::JSocket remote(_workerSocketTable[pending->from]);
    remote << msg;

    // Add the worker to the list unless it is already there.
    bool alreadyListed = false;
    vector<dmtcp::ConnectionIdentifier>::iterator w;
    for ( w = workersWithExternalSockets.begin();
          w != workersWithExternalSockets.end() && !alreadyListed;
          ++w ) {
      if ( *w == pending->from ) {
        alreadyListed = true;
      }
    }
    if ( !alreadyListed ) {
      workersWithExternalSockets.push_back ( pending->from );
    }

    ++pending;
  }
  _socketPeerLookupMessages.clear();
}
#endif
309
310 void dmtcp::DmtcpCoordinator::handleUserCommand(char cmd, DmtcpMessage* reply /*= NULL*/)
311 {
312 int * replyParams;
313 if(reply!=NULL){
314 replyParams = reply->params;
315 }else{
316 static int dummy[sizeof(reply->params)/sizeof(int)];
317 replyParams = dummy;
318 }
319
320 JASSERT(sizeof(reply->params)/sizeof(int) >= 2); //this should be compiled out
321 //default reply is 0
322 replyParams[0] = NOERROR;
323 replyParams[1] = NOERROR;
324
325 switch ( cmd ){
326 case 'b': case 'B': // prefix blocking command, prior to checkpoint command
327 blockUntilDone = true;
328 replyParams[0] = 0; // reply from prefix command will be ignored
329 break;
330 case 'c': case 'C':
331 if(startCheckpoint()){
332 replyParams[0] = getStatus().numPeers;
333 }else{
334 replyParams[0] = ERROR_NOT_RUNNING_STATE;
335 replyParams[1] = ERROR_NOT_RUNNING_STATE;
336 }
337 break;
338 case 'i': case 'I':
339 setTimeoutInterval ( theCheckpointInterval );
340 JNOTE ( "CheckpointInterval Updated" ) ( theCheckpointInterval );
341 break;
342 case 'l': case 'L':
343 case 't': case 'T':
344 JASSERT_STDERR << "Client List:\n";
345 JASSERT_STDERR << "#, PROG[PID]@HOST, DMTCP-UNIQUEPID, STATE\n";
346 for ( dmtcp::vector<jalib::JReaderInterface*>::iterator i = _dataSockets.begin()
347 ;i!= _dataSockets.end()
348 ;++i )
349 {
350 if ( ( *i )->socket().sockfd() != STDIN_FD )
351 {
352 const NamedChunkReader& cli = *((NamedChunkReader*)(*i));
353 JASSERT_STDERR << cli.clientNumber()
354 << ", " << cli.progname() << "[" << cli.identity().pid() << "]@" << cli.hostname()
355 << ", " << cli.identity()
356 << ", " << cli.state().toString()
357 << '\n';
358 }
359 }
360 break;
361 case 'f': case 'F':
362 JNOTE ( "forcing restart..." );
363 broadcastMessage ( DMT_FORCE_RESTART );
364 break;
365 case 'q': case 'Q':
366 {
367 JNOTE ( "Killing all connected Peers ..." );
368 broadcastMessage ( DMT_KILL_PEER );
369 /* Call to broadcastMessage only puts the messages into the write queue.
370 * We actually want the messages to be written out to the respective sockets
371 * so that we can then close the sockets and exit gracefully. The following
372 * loop is taken from the implementation of monitorSocket() implementation
373 * in jsocket.cpp.
374 *
375 * Once the messages have been written out, the coordinator closes all the
376 * connections and calls exit().
377 */
378 for ( size_t i=0; i<_writes.size(); ++i )
379 {
380 int fd = _writes[i]->socket().sockfd();
381 if ( fd >= 0 ) {
382 _writes[i]->writeOnce();
383 }
384 }
385 JASSERT_STDERR << "DMTCP coordinator exiting... (per request)\n";
386 for ( dmtcp::vector<jalib::JReaderInterface*>::iterator i = _dataSockets.begin()
387 ; i!= _dataSockets.end()
388 ; ++i )
389 {
390 (*i)->socket().close();
391 }
392 for ( dmtcp::vector<jalib::JSocket>::iterator i = _listenSockets.begin()
393 ; i!= _listenSockets.end()
394 ; ++i )
395 {
396 i->close();
397 }
398 JTRACE ("Exiting ...");
399 exit ( 0 );
400 break;
401 }
402 case 'k': case 'K':
403 JNOTE ( "Killing all connected Peers..." );
404 //XXX: What happens if a 'k' command is followed by a 'c' command before
405 // the *real* broadcast takes place? --Kapil
406 broadcastMessage ( DMT_KILL_PEER );
407 break;
408 case 'h': case 'H': case '?':
409 JASSERT_STDERR << theHelpMessage;
410 break;
411 case 's': case 'S':
412 {
413 CoordinatorStatus s = getStatus();
414 bool running= s.minimumStateUnanimous && s.minimumState==WorkerState::RUNNING;
415 if(reply==NULL){
416 printf("Status...\n");
417 printf("NUM_PEERS=%d\n", s.numPeers);
418 printf("RUNNING=%s\n", (running?"yes":"no"));
419 fflush(stdout);
420 if(!running) JTRACE("raw status")(s.minimumState)(s.minimumStateUnanimous);
421 }else{
422 replyParams[0]=s.numPeers;
423 replyParams[1]=running;
424 }
425 }
426 break;
427 case ' ': case '\t': case '\n': case '\r':
428 //ignore whitespace
429 break;
430 default:
431 JTRACE("unhandled user command")(cmd);
432 replyParams[0] = ERROR_INVALID_COMMAND;
433 replyParams[1] = ERROR_INVALID_COMMAND;
434 }
435 return;
436 }
437
438 void dmtcp::DmtcpCoordinator::onData ( jalib::JReaderInterface* sock )
439 {
440 if ( sock->socket().sockfd() == STDIN_FD )
441 {
442 handleUserCommand(sock->buffer()[0]);
443 return;
444 }
445 else
446 {
447 NamedChunkReader * client= ( NamedChunkReader* ) sock;
448 DmtcpMessage& msg = * ( DmtcpMessage* ) sock->buffer();
449 msg.assertValid();
450 char * extraData = 0;
451
452 if ( msg.extraBytes > 0 )
453 {
454 extraData = new char[msg.extraBytes];
455 sock->socket().readAll ( extraData, msg.extraBytes );
456 }
457
458 switch ( msg.type )
459 {
460 case DMT_OK:
461 {
462 WorkerState oldState = client->state();
463 client->setState ( msg.state );
464 WorkerState newState = minimumState();
465
466 JTRACE ("got DMT_OK message")( msg.from )( msg.state )( oldState )( newState );
467
468 if ( oldState == WorkerState::RUNNING
469 && newState == WorkerState::SUSPENDED )
470 {
471 // All the workers are in SUSPENDED state, now it is safe to reset this flag.
472 workersRunningAndSuspendMsgSent = false;
473
474 JNOTE ( "locking all nodes" );
475 broadcastMessage ( DMT_DO_LOCK_FDS );
476 }
477 #ifdef EXTERNAL_SOCKET_HANDLING
478 if ( oldState == WorkerState::SUSPENDED
479 && newState == WorkerState::FD_LEADER_ELECTION )
480 {
481 JNOTE ( "performing peerlookup for all sockets" );
482 broadcastMessage ( DMT_DO_PEER_LOOKUP );
483 }
484 if ( oldState == WorkerState::FD_LEADER_ELECTION
485 && newState == WorkerState::PEER_LOOKUP_COMPLETE )
486 {
487 if ( _socketPeerLookupMessages.empty() ) {
488 JNOTE ( "draining all nodes" );
489 broadcastMessage ( DMT_DO_DRAIN );
490 } else {
491 sendUnIdentifiedPeerNotifications();
492 JNOTE ( "Not all socket peers were Identified, resuming computation without checkpointing" );
493 broadcastMessage ( DMT_DO_RESUME );
494 }
495 }
496 if ( oldState == WorkerState::PEER_LOOKUP_COMPLETE
497 && newState == WorkerState::DRAINED )
498 {
499 JNOTE ( "checkpointing all nodes" );
500 broadcastMessage ( DMT_DO_CHECKPOINT );
501 }
502 #else
503 if ( oldState == WorkerState::SUSPENDED
504 && newState == WorkerState::FD_LEADER_ELECTION )
505 {
506 JNOTE ( "draining all nodes" );
507 broadcastMessage ( DMT_DO_DRAIN );
508 }
509 if ( oldState == WorkerState::FD_LEADER_ELECTION
510 && newState == WorkerState::DRAINED )
511 {
512 JNOTE ( "checkpointing all nodes" );
513 broadcastMessage ( DMT_DO_CHECKPOINT );
514 }
515 #endif
516 if ( oldState == WorkerState::DRAINED
517 && newState == WorkerState::CHECKPOINTED )
518 {
519 JNOTE ( "refilling all nodes" );
520 broadcastMessage ( DMT_DO_REFILL );
521 writeRestartScript();
522 }
523 if ( oldState == WorkerState::RESTARTING
524 && newState == WorkerState::CHECKPOINTED )
525 {
526 JTRACE ( "resetting _restoreWaitingMessages" )
527 ( _restoreWaitingMessages.size() );
528 _restoreWaitingMessages.clear();
529
530 JTIMER_STOP ( restart );
531
532 JNOTE ( "refilling all nodes (after checkpoint)" );
533 broadcastMessage ( DMT_DO_REFILL );
534 }
535 if ( oldState == WorkerState::CHECKPOINTED
536 && newState == WorkerState::REFILLED )
537 {
538 JNOTE ( "restarting all nodes" );
539 broadcastMessage ( DMT_DO_RESUME );
540
541 JTIMER_STOP ( checkpoint );
542
543 setTimeoutInterval( theCheckpointInterval );
544
545 if (blockUntilDone) {
546 JNOTE ( "replying to dmtcp_command: we're done" );
547 // These were set in dmtcp::DmtcpCoordinator::onConnect in this file
548 jalib::JSocket remote ( blockUntilDoneRemote );
549 remote << blockUntilDoneReply;
550 remote.close();
551 blockUntilDone = false;
552 blockUntilDoneRemote = -1;
553 }
554 }
555 break;
556 }
557 case DMT_RESTORE_WAITING:
558 {
559 DmtcpMessage restMsg = msg;
560 restMsg.type = DMT_RESTORE_WAITING;
561 memcpy ( &restMsg.restoreAddr,client->addr(),client->addrlen() );
562 restMsg.restoreAddrlen = client->addrlen();
563 restMsg.restorePort = client->restorePort();
564 JASSERT ( restMsg.restorePort > 0 ) ( restMsg.restorePort ) ( client->identity() );
565 JASSERT ( restMsg.restoreAddrlen > 0 ) ( restMsg.restoreAddrlen ) ( client->identity() );
566 JASSERT ( restMsg.restorePid != ConnectionIdentifier::Null() ) ( client->identity() );
567 JTRACE ( "broadcasting RESTORE_WAITING" )( restMsg.restorePid )( restMsg.restoreAddrlen )( restMsg.restorePort );
568 _restoreWaitingMessages.push_back ( restMsg );
569 broadcastMessage ( restMsg );
570 break;
571 }
572 case DMT_CKPT_FILENAME:
573 {
574 JASSERT ( extraData!=0 ).Text ( "extra data expected with DMT_CKPT_FILENAME message" );
575 dmtcp::string ckptFilename;
576 dmtcp::string hostname;
577 ckptFilename = extraData;
578 hostname = extraData + ckptFilename.length() + 1;
579
580 JTRACE ( "recording restart info" ) ( ckptFilename ) ( hostname );
581 _restartFilenames[hostname].push_back ( ckptFilename );
582 }
583 break;
584 case DMT_USER_CMD: // dmtcpaware API being used
585 {
586 JTRACE("got user command from client")(msg.params[0])(client->identity());
587 // Checkpointing commands should always block, to prevent
588 // dmtcpaware checkpoint call from returning prior to checkpoint.
589 if (msg.params[0] == 'c')
590 handleUserCommand( 'b', NULL );
591 DmtcpMessage reply;
592 reply.type = DMT_USER_CMD_RESULT;
593 if (msg.params[0] == 'i' && msg.theCheckpointInterval > 0 ) {
594 theCheckpointInterval = msg.theCheckpointInterval;
595 }
596 handleUserCommand( msg.params[0], &reply );
597 sock->socket() << reply;
598 //alternately, we could do the write without blocking:
599 //addWrite(new jalib::JChunkWriter(sock->socket(), (char*)&msg, sizeof(DmtcpMessage)));
600 }
601 break;
602 #ifdef EXTERNAL_SOCKET_HANDLING
603 case DMT_PEER_LOOKUP:
604 {
605 JTRACE ( "received PEER_LOOKUP msg" ) ( msg.conId );
606 JASSERT ( msg.localAddrlen > 0 ) ( msg.localAddrlen ) ( client->identity() );
607 _socketPeerLookupMessagesIterator i;
608 bool foundPeer = false;
609 for ( i = _socketPeerLookupMessages.begin();
610 i != _socketPeerLookupMessages.end();
611 ++i ) {
612 if ( ( msg.localAddrlen == i->localAddrlen ) &&
613 ( memcmp ( (void*) &msg.localAddr,
614 (void*) &(i->remoteAddr),
615 msg.localAddrlen ) == 0 ) ) {
616 _socketPeerLookupMessages.erase(i);
617 foundPeer = true;
618 break;
619 }
620 }
621 if ( !foundPeer ) {
622 _socketPeerLookupMessages.push_back(msg);
623 _workerSocketTable[msg.from] = sock->socket().sockfd();
624 }
625 }
626 break;
627 case DMT_EXTERNAL_SOCKETS_CLOSED:
628 {
629 vector<dmtcp::ConnectionIdentifier>::iterator i;
630 for ( i = workersWithExternalSockets.begin();
631 i != workersWithExternalSockets.end();
632 ++i) {
633 if ( *i == msg.from ) {
634 break;
635 }
636 }
637 JASSERT ( i != workersWithExternalSockets.end() ) ( msg.from )
638 .Text ( "DMT_EXTERNAL_SOCKETS_CLOSED msg received from worker"
639 " but it never had one" );
640
641 workersWithExternalSockets.erase(i);
642 JTRACE ("(Known) External Sockets closed by worker") (msg.from);
643
644 client->setState ( msg.state );
645
646 if (workersWithExternalSockets.empty() == true) {
647 JTRACE ( "External Sockets on all workers are closed now."
648 " Trying to checkpoint." );
649 handleUserCommand('c');
650 }
651 }
652 break;
653 #endif
654 default:
655 JASSERT ( false ) ( msg.from ) ( msg.type )
656 .Text ( "unexpected message from worker" );
657 }
658
659 delete[] extraData;
660 }
661 }
662
663 void dmtcp::DmtcpCoordinator::onDisconnect ( jalib::JReaderInterface* sock )
664 {
665 if ( sock->socket().sockfd() == STDIN_FD )
666 {
667 JTRACE ( "stdin closed" );
668 }
669 else
670 {
671 NamedChunkReader& client = * ( ( NamedChunkReader* ) sock );
672 JNOTE ( "client disconnected" ) ( client.identity() );
673
674 CoordinatorStatus s = getStatus();
675 if( s.numPeers <= 1 ){
676 if(exitOnLast){
677 JNOTE ("last client exited, shutting down..");
678 handleUserCommand('q');
679 }
680 }
681
682 // int clientNumber = ((NamedChunkReader*)sock)->clientNumber();
683 // JASSERT(clientNumber >= 0)(clientNumber);
684 // _table.removeClient(clientNumber);
685 }
686 }
687
688 void dmtcp::DmtcpCoordinator::onConnect ( const jalib::JSocket& sock,
689 const struct sockaddr* remoteAddr,
690 socklen_t remoteLen )
691 {
692 // If no client is connected to Coordinator, then there can be only zero data
693 // sockets OR there can be one data socket and that should be STDIN.
694 if ( _dataSockets.size() == 0 ||
695 ( _dataSockets.size() == 1
696 && _dataSockets[0]->socket().sockfd() == STDIN_FD ) )
697 {
698 //this is the first connection, do some initializations
699 workersRunningAndSuspendMsgSent = false;
700
701 setTimeoutInterval( theCheckpointInterval );
702
703 // drop current computation group to 0
704 curCompGroup = dmtcp::UniquePid(0,0,0);
705 curTimeStamp = 0; // Drop timestamp to 0
706 numPeers = -1; // Drop number of peers to unknown
707
708 JTRACE ( "resetting _restoreWaitingMessages" )
709 ( _restoreWaitingMessages.size() );
710 _restoreWaitingMessages.clear();
711
712 JTIMER_START ( restart );
713 }
714
715 jalib::JSocket remote ( sock );
716 dmtcp::DmtcpMessage hello_remote;
717 hello_remote.poison();
718 JTRACE("Reading from incoming connection...");
719 remote >> hello_remote;
720 hello_remote.assertValid();
721
722 if ( hello_remote.type == DMT_USER_CMD ) {
723 processDmtUserCmd ( hello_remote, remote );
724 return;
725 } else if ( hello_remote.type == DMT_RESTART_PROCESS ) {
726 if ( validateDmtRestartProcess ( hello_remote, remote ) == false )
727 return;
728 } else if ( hello_remote.type == DMT_HELLO_COORDINATOR ) {
729 if ( validateWorkerProcess ( hello_remote, remote ) == false )
730 return;
731 } else {
732 JASSERT ( false ) .Text ( "Connect request from Unknown Remote Process Type" );
733 }
734
735 JNOTE ( "worker connected" ) ( hello_remote.from );
736
737 if ( hello_remote.theCheckpointInterval > 0 ) {
738 int oldInterval = theCheckpointInterval;
739 theCheckpointInterval = hello_remote.theCheckpointInterval;
740 setTimeoutInterval ( theCheckpointInterval );
741 JNOTE ( "CheckpointInterval Updated" ) ( oldInterval )
742 ( theCheckpointInterval );
743 }
744 // _table[hello_remote.from.pid()].setState(hello_remote.state);
745
746 NamedChunkReader * ds = new NamedChunkReader (
747 sock
748 ,hello_remote.from.pid()
749 ,hello_remote.state
750 ,remoteAddr
751 ,remoteLen
752 ,hello_remote.restorePort );
753
754 if( hello_remote.extraBytes > 0 ){
755 char* extraData = new char[hello_remote.extraBytes];
756 remote.readAll(extraData, hello_remote.extraBytes);
757 dmtcp::string hostname = extraData;
758 dmtcp::string progname = extraData + hostname.length() + 1;
759 ds->progname(progname);
760 ds->hostname(hostname);
761 delete [] extraData;
762 }
763
764
765 //add this client as a chunk reader
766 // in this case a 'chunk' is sizeof(DmtcpMessage)
767 addDataSocket ( ds );
768
769 if ( hello_remote.state == WorkerState::RESTARTING
770 && _restoreWaitingMessages.size() >0 )
771 {
772 JTRACE ( "updating missing broadcasts for new connection" )
773 ( hello_remote.from.pid() )
774 ( _restoreWaitingMessages.size() );
775 for ( size_t i=0; i<_restoreWaitingMessages.size(); ++i )
776 {
777 addWrite (
778 new jalib::JChunkWriter ( sock
779 , ( char* ) &_restoreWaitingMessages[i]
780 , sizeof ( DmtcpMessage ) )
781 );
782 }
783 }
784
785 JTRACE( "END" )
786 ( _dataSockets.size() ) ( _dataSockets[0]->socket().sockfd() == STDIN_FD );
787 }
788
789 // WorkerNode& node = _table[hello_remote.from.pid()];
790 // node.setClientNumer( ds->clientNumber() );
791 /*
792 if(hello_remote.state == WorkerState::RESTARTING)
793 {
794 node.setAddr(remoteAddr, remoteLen);
795 node.setRestorePort(hello_remote.restorePort);
796
797 JASSERT(node.addrlen() > 0)(node.addrlen());
798 JASSERT(node.restorePort() > 0)(node.restorePort());
799 DmtcpMessage msg;
800 msg.type = DMT_RESTORE_WAITING;
801 memcpy(&msg.restoreAddr,node.addr(),node.addrlen());
802 msg.restoreAddrlen = node.addrlen();
803 msg.restorePid.id = node.id();
804 msg.restorePort = node.restorePort();
805 broadcastMessage( msg );
806 }*/
807 //}
808
809 void dmtcp::DmtcpCoordinator::processDmtUserCmd( DmtcpMessage& hello_remote,
810 jalib::JSocket& remote )
811 {
812 //dmtcp_command doesn't handshake (it is antisocial)
813 JTRACE("got user command from dmtcp_command")(hello_remote.params[0]);
814 DmtcpMessage reply;
815 reply.type = DMT_USER_CMD_RESULT;
816 // if previous 'b' blocking prefix command had set blockUntilDone
817 if (blockUntilDone && blockUntilDoneRemote == -1 &&
818 hello_remote.params[0] == 'c') {
819 // Reply will be done in dmtcp::DmtcpCoordinator::onData in this file.
820 blockUntilDoneRemote = remote.sockfd();
821 blockUntilDoneReply = reply;
822 handleUserCommand( hello_remote.params[0], &reply );
823 } else if ( (hello_remote.params[0] == 'i' || hello_remote.params[1] == 'I')
824 && hello_remote.theCheckpointInterval > 0 ) {
825 theCheckpointInterval = hello_remote.theCheckpointInterval;
826 handleUserCommand( hello_remote.params[0], &reply );
827 remote << reply;
828 remote.close();
829 } else {
830 handleUserCommand( hello_remote.params[0], &reply );
831 remote << reply;
832 remote.close();
833 }
834 return;
835 }
836
837 bool dmtcp::DmtcpCoordinator::validateDmtRestartProcess
838 ( DmtcpMessage& hello_remote, jalib::JSocket& remote )
839 {
840 // This is dmtcp_restart process, connecting to get timestamp
841 // and set current compGroup.
842
843 JASSERT ( hello_remote.params[0] > 0 );
844
845 dmtcp::DmtcpMessage hello_local ( dmtcp::DMT_RESTART_PROCESS_REPLY );
846
847 if( curCompGroup == dmtcp::UniquePid(0,0,0) ){
848 JASSERT ( minimumState() == WorkerState::UNKNOWN )
849 .Text ( "Coordinator should be idle at this moment" );
850 // Coordinator is free at this moment - setup all the things
851 curCompGroup = hello_remote.compGroup;
852 numPeers = hello_remote.params[0];
853 curTimeStamp = time(NULL);
854 hello_local.params[1] = 1;
855 JNOTE ( "FIRST dmtcp_restart connection. Set numPeers. Generate timestamp" )
856 ( numPeers ) ( curTimeStamp ) ( curCompGroup );
857 } else if ( curCompGroup != hello_remote.compGroup ) {
858 // Coordinator already serving some other computation group - reject this process.
859 JNOTE ("Reject incoming dmtcp_restart connection"
860 " since it is not from current computation")
861 ( curCompGroup ) ( hello_remote.compGroup );
862 hello_local.type = dmtcp::DMT_REJECT;
863 remote << hello_local;
864 remote.close();
865 return false;
866 } else if ( numPeers != hello_remote.params[0] ) {
867 // Sanity check
868 JNOTE ( "Invalid numPeers reported by dmtcp_restart process, Rejecting" )
869 ( numPeers ) ( hello_remote.params[0] );
870
871 hello_local.type = dmtcp::DMT_REJECT;
872 remote << hello_local;
873 remote.close();
874 return false;
875 } else {
876 // This is a second or higher dmtcp_restart process connecting to the coordinator.
877 // FIXME: Should the following be a JASSERT instead? -- Kapil
878 JWARNING ( minimumState() == WorkerState::RESTARTING );
879 hello_local.params[1] = 0;
880 }
881
882 // Sent generated timestamp in local massage for dmtcp_restart process.
883 hello_local.params[0] = curTimeStamp;
884
885 remote << hello_local;
886
887 return true;
888 }
889
890 bool dmtcp::DmtcpCoordinator::validateWorkerProcess
891 ( DmtcpMessage& hello_remote, jalib::JSocket& remote )
892 {
893 dmtcp::DmtcpMessage hello_local ( dmtcp::DMT_HELLO_WORKER );
894
895 if ( hello_remote.state == WorkerState::RESTARTING ) {
896 if ( minimumState() != WorkerState::RESTARTING &&
897 minimumState() != WorkerState::CHECKPOINTED ) {
898 JNOTE ("Computation not in RESTARTING or CHECKPOINTED state."
899 " Reject incoming restarting computation process.")
900 ( curCompGroup ) ( hello_remote.compGroup );
901 hello_local.type = dmtcp::DMT_REJECT;
902 remote << hello_local;
903 remote.close();
904 return false;
905 } else if ( hello_remote.compGroup != curCompGroup) {
906 JNOTE ("Reject incoming restarting computation process"
907 " since it is not from current computation")
908 ( curCompGroup ) ( hello_remote.compGroup );
909 hello_local.type = dmtcp::DMT_REJECT;
910 remote << hello_local;
911 remote.close();
912 return false;
913 }
914 // dmtcp_restart already connected and compGroup created.
915 // Computation process connection
916 JASSERT ( curTimeStamp != 0 );
917
918 JTRACE("Connection from (restarting) computation process")
919 ( curCompGroup ) ( hello_remote.compGroup ) ( minimumState() );
920
921 remote << hello_local;
922
923 } else if ( hello_remote.state == WorkerState::RUNNING ) {
924 CoordinatorStatus s = getStatus();
925 // If some of the processes are not in RUNNING state OR if the SUSPEND
926 // message has been sent, REJECT.
927 if ( s.numPeers > 0 &&
928 ( s.minimumState != WorkerState::RUNNING ||
929 s.minimumStateUnanimous == false ||
930 workersRunningAndSuspendMsgSent == true) ) {
931 JNOTE ( "Current Computation not in RUNNING state."
932 " Refusing to accept new connections.")
933 ( curCompGroup ) ( hello_remote.from.pid() );
934 hello_local.type = dmtcp::DMT_REJECT;
935 remote << hello_local;
936 remote.close();
937 return false;
938 } else if ( hello_remote.compGroup != UniquePid() ) {
939 // New Process trying to connect to Coordinator but already has compGroup
940 JNOTE ( "New Process, but already has computation group. Rejecting" );
941 hello_local.type = dmtcp::DMT_REJECT;
942 remote << hello_local;
943 remote.close();
944 return false;
945 } else {
946 // If first process, create the new computation group
947 if ( curCompGroup == UniquePid(0,0,0) ) {
948 // Connection of new computation.
949 curCompGroup = hello_remote.from.pid();
950 curTimeStamp = 0;
951 numPeers = -1;
952 JTRACE ( "First process connected. Creating new computation group" )
953 (curCompGroup );
954 } else {
955 JTRACE ( "New Process Connected" ) ( hello_remote.from.pid() );
956 }
957 remote << hello_local;
958 }
959 } else {
960 JASSERT ( false ) .Text ( "Invalid Worker Type" );
961 return false;
962 }
963
964 return true;
965 }
966
967 void dmtcp::DmtcpCoordinator::onTimeoutInterval()
968 {
969 if ( theCheckpointInterval > 0 )
970 startCheckpoint();
971 }
972
973
974 bool dmtcp::DmtcpCoordinator::startCheckpoint()
975 {
976 CoordinatorStatus s = getStatus();
977 if ( s.minimumState == WorkerState::RUNNING
978 && !workersRunningAndSuspendMsgSent )
979 {
980 JTIMER_START ( checkpoint );
981 _restartFilenames.clear();
982 JNOTE ( "starting checkpoint, suspending all nodes" )( s.numPeers );
983 // Pass number of connected peers to all clients
984 broadcastMessage ( DMT_DO_SUSPEND , curCompGroup, getStatus().numPeers );
985
986 // Suspend Message has been sent but the workers are still in running
987 // state. If the coordinator receives another checkpoint request from user
988 // at this point, it should fail.
989 workersRunningAndSuspendMsgSent = true;
990 return true;
991 }
992
993 if (s.numPeers > 0) {
994 JTRACE ( "delaying checkpoint, workers not ready" ) ( s.minimumState )
995 ( s.numPeers );
996 }
997 return false;
998 }
999
1000 dmtcp::DmtcpWorker& dmtcp::DmtcpWorker::instance()
1001 {
1002 JASSERT ( false ).Text ( "This method is only available on workers" );
1003 return * ( ( DmtcpWorker* ) 0 );
1004 }
1005
1006 /*
1007 Can cause conflict with method of same signature in dmtcpworker.cpp.
1008 What was the purpose of this method? -- Praveen
1009 */
1010 const dmtcp::UniquePid& dmtcp::DmtcpWorker::coordinatorId() const
1011 {
1012 JASSERT ( false ).Text ( "This method is only available on workers" );
1013 return * ( ( UniquePid* ) 0 );
1014 }
1015
1016 void dmtcp::DmtcpCoordinator::broadcastMessage ( DmtcpMessageType type,
1017 dmtcp::UniquePid compGroup = dmtcp::UniquePid(), int param1 = -1 )
1018 {
1019 DmtcpMessage msg;
1020 msg.type = type;
1021 if( param1 > 0 ){
1022 msg.params[0] = param1;
1023 msg.compGroup = compGroup;
1024 }
1025 broadcastMessage ( msg );
1026 JTRACE ("sending message")( type );
1027 }
1028
1029 void dmtcp::DmtcpCoordinator::broadcastMessage ( const DmtcpMessage& msg )
1030 {
1031 for ( dmtcp::vector<jalib::JReaderInterface*>::iterator i
1032 = _dataSockets.begin() ; i!= _dataSockets.end() ; i++ )
1033 {
1034 if ( ( *i )->socket().sockfd() != STDIN_FD )
1035 addWrite ( new jalib::JChunkWriter ( ( *i )->socket(),
1036 ( char* ) &msg,
1037 sizeof ( DmtcpMessage ) ) );
1038 }
1039 }
1040
1041 dmtcp::DmtcpCoordinator::CoordinatorStatus dmtcp::DmtcpCoordinator::getStatus() const
1042 {
1043 CoordinatorStatus status;
1044 const static int INITIAL = WorkerState::_MAX;
1045 int m = INITIAL;
1046 int count = 0;
1047 bool unanimous = true;
1048 for ( const_iterator i = _dataSockets.begin()
1049 ; i != _dataSockets.end()
1050 ; ++i )
1051 {
1052 if ( ( *i )->socket().sockfd() != STDIN_FD )
1053 {
1054 int cliState = ((NamedChunkReader*)*i)->state().value();
1055 count++;
1056 unanimous = unanimous && (m==cliState || m==INITIAL);
1057 if ( cliState < m ) m = cliState;
1058 }
1059 }
1060
1061 status.minimumState = ( m==INITIAL ? WorkerState::UNKNOWN
1062 : (WorkerState::eWorkerState)m );
1063 if( status.minimumState == WorkerState::CHECKPOINTED && count < numPeers ){
1064 JTRACE("minimal state counted as CHECKPOINTED but not all processes"
1065 " are connected yet. So we wait.") ( numPeers ) ( count );
1066 status.minimumState = WorkerState::RESTARTING;
1067 }
1068 status.minimumStateUnanimous = unanimous;
1069 status.numPeers = count;
1070 return status;
1071 }
1072
1073 void dmtcp::DmtcpCoordinator::writeRestartScript()
1074 {
1075 const char* dir = getenv ( ENV_VAR_CHECKPOINT_DIR );
|
At conditional (1): "dir == NULL": Taking true branch.
|
1076 if(dir==NULL) dir = ".";
1077 dmtcp::ostringstream o1, o2;
1078 dmtcp::string filename, uniqueFilename;
1079
1080 o1 << dmtcp::string(dir) << "/"
1081 << RESTART_SCRIPT_BASENAME << RESTART_SCRIPT_EXT;
1082 filename = o1.str();
1083
1084 o2 << dmtcp::string(dir) << "/"
1085 << RESTART_SCRIPT_BASENAME << "_" << curCompGroup << RESTART_SCRIPT_EXT;
1086 uniqueFilename = o2.str();
1087
|
At conditional (2): "this->_restartFilenames.size() == 1UL": Taking true branch.
|
1088 const bool isSingleHost = (_restartFilenames.size() == 1);
1089
1090 dmtcp::map< dmtcp::string, dmtcp::vector<dmtcp::string> >::const_iterator host;
1091 dmtcp::vector<dmtcp::string>::const_iterator file;
1092
1093 char hostname[80];
1094 gethostname ( hostname, 80 );
1095
1096 JTRACE ( "writing restart script" ) ( uniqueFilename );
1097
1098 FILE* fp = fopen ( uniqueFilename.c_str(),"w" );
|
At conditional (3): "fp != NULL": Taking true branch.
|
1099 JASSERT ( fp!=0 )(JASSERT_ERRNO)( uniqueFilename )
1100 .Text ( "failed to open file" );
1101
1102 fprintf ( fp, "%s", theRestartScriptHeader );
1103 fprintf ( fp, "%s", theRestartScriptUsage );
1104
1105 fprintf ( fp, "coord_host=$"ENV_VAR_NAME_ADDR"\n"
1106 "if test -z \"$" ENV_VAR_NAME_ADDR "\"; then\n"
1107 " coord_host=%s\nfi\n\n", hostname );
1108 fprintf ( fp, "coord_port=$"ENV_VAR_NAME_PORT"\n"
1109 "if test -z \"$" ENV_VAR_NAME_PORT "\"; then\n"
1110 " coord_port=%d\nfi\n\n", thePort );
1111 fprintf ( fp, "checkpoint_interval=$"ENV_VAR_CKPT_INTR"\n"
1112 "if test -z \"$" ENV_VAR_CKPT_INTR "\"; then\n"
1113 " checkpoint_interval=%d\nfi\n\n", theCheckpointInterval );
|
At conditional (4): "batchMode": Taking true branch.
|
1114 if ( batchMode )
1115 fprintf ( fp, "maybebatch='--batch'\n\n" );
1116 else
1117 fprintf ( fp, "maybebatch=\n\n" );
1118
1119 fprintf ( fp, "# Number of hosts in the computation = %zd\n",
1120 _restartFilenames.size() );
1121 fprintf ( fp, "# Number of processes in the computation = %d\n\n",
1122 getStatus().numPeers );
1123
|
At conditional (5): "isSingleHost": Taking true branch.
|
1124 if ( isSingleHost ) {
1125 JTRACE ( "Single HOST");
1126
1127 host=_restartFilenames.begin();
1128 dmtcp::ostringstream o;
|
At conditional (6): "__gnu_cxx::operator !=<std::basic_string<char, std::char_traits<char>, dmtcp::DmtcpAlloc<char> > const *, std::vector<std::basic_string<char, std::char_traits<char>, dmtcp::DmtcpAlloc<char> >, dmtcp::DmtcpAlloc<std::basic_string<char, std::char_traits<char>, dmtcp::DmtcpAlloc<char> > > > >(file, host.operator ->()->second.end())": Taking false branch.
|
1129 for ( file=host->second.begin(); file!=host->second.end(); ++file ) {
1130 o << " " << *file;
1131 }
1132
1133 fprintf ( fp, "%s", theRestartScriptCmdlineArgHandler );
1134 fprintf ( fp, "DMTCP_RESTART=dmtcp_restart\n" );
1135 fprintf ( fp, "which dmtcp_restart > /dev/null \\\n" \
1136 " || DMTCP_RESTART=`dirname %s`/dmtcp_restart\n\n", argv0 );
1137 fprintf ( fp, "if [ ! -z \"$DMTCP_RESTART_DIR\" ]; then\n"
1138 " new_ckpt_names=\"\"\n"
1139 " names=\"%s\"\n"
1140 " for tmp in $names; do\n"
1141 " new_ckpt_names=\"$DMTCP_RESTART_DIR/`basename $tmp` $new_ckpt_names\"\n"
1142 " done\n"
1143 "fi\n", o.str().c_str());
1144
1145 fprintf ( fp,
1146 "if [ ! -z \"$maybebatch\" ]; then\n"
1147 " if [ ! -z \"$DMTCP_RESTART_DIR\" ]; then\n"
1148 " exec $DMTCP_RESTART $maybebatch $maybejoin --interval \"$checkpoint_interval\"\\\n"
1149 " $new_ckpt_names\n"
1150 " else\n"
1151 " exec $DMTCP_RESTART $maybebatch $maybejoin --interval \"$checkpoint_interval\"\\\n"
1152 " %s\n"
1153 " fi\n"
1154 "else\n"
1155 " if [ ! -z \"$DMTCP_RESTART_DIR\" ]; then\n"
1156 " exec $DMTCP_RESTART --host \"$coord_host\" --port \"$coord_port\" $maybebatch\\\n"
1157 " $maybejoin --interval \"$checkpoint_interval\"\\\n"
1158 " $new_ckpt_names\n"
1159 " else\n"
1160 " exec $DMTCP_RESTART --host \"$coord_host\" --port \"$coord_port\" $maybebatch\\\n"
1161 " $maybejoin --interval \"$checkpoint_interval\"\\\n"
1162 " %s\n"
1163 " fi\n"
1164 "fi\n", o.str().c_str(), o.str().c_str() );
1165 }
1166 else
1167 {
1168 fprintf ( fp, "%s",
1169 "worker_ckpts_regexp=\'[^:]*::[ \\t\\n]*\\([^ \\t\\n]\\+\\)[ \\t\\n]*:\\([a-z]\\+\\):[ \\t\\n]*\\([^:]\\+\\)\'\n\n"
1170 "# SYNTAX:\n"
1171 "# :: <HOST> :<MODE>: <CHECKPOINT_IMAGE> ...\n"
1172 "# Host names and filenames must not include \':\'\n"
1173 "# At most one fg (foreground) mode allowed; it must be last.\n"
1174 "# \'maybexterm\' and \'maybebg\' are set from <MODE>.\n"
1175 "worker_ckpts=\'" );
1176
1177 for ( host=_restartFilenames.begin(); host!=_restartFilenames.end(); ++host )
1178 {
1179 fprintf ( fp, "\n :: %s :bg:", host->first.c_str() );
1180 for ( file=host->second.begin(); file!=host->second.end(); ++file )
1181 {
1182 fprintf ( fp," %s", file->c_str() );
1183 }
1184 }
1185
1186 fprintf ( fp, "%s", "\n\'\n\n\n" );
1187
1188
1189 fprintf ( fp, "%s", theRestartScriptCmdlineArgHandler );
1190
1191 fprintf ( fp, "%s",
1192 "worker_hosts=\\\n"
1193 "`echo $worker_ckpts | sed -e \'s/\'\"$worker_ckpts_regexp\"\'/\\1 /g\'`\n"
1194 "restart_modes=\\\n"
1195 "`echo $worker_ckpts | sed -e \'s/\'\"$worker_ckpts_regexp\"\'/: \\2/g\'`\n"
1196 "ckpt_files_groups=\\\n"
1197 "`echo $worker_ckpts | sed -e \'s/\'\"$worker_ckpts_regexp\"\'/: \\3/g\'`\n"
1198 "\n"
1199 "if [ ! -z \"$hostfile\" ]; then\n"
1200 " worker_hosts=`cat \"$hostfile\" | sed -e \'s/#.*//\' -e \'s/[ \\t\\r]*//\' -e \'/^$/ d\'`\n"
1201 "fi\n\n"
1202
1203 "localhost_ckpt_files_group=\n\n"
1204
1205 "num_worker_hosts=`echo $worker_hosts | wc -w`\n\n"
1206
1207 "maybejoin=\n"
1208 "if [ \"$num_worker_hosts\" != \"1\" ]; then\n"
1209 " maybejoin='--join'\n"
1210 "fi\n\n"
1211
1212 "for worker_host in $worker_hosts\n"
1213 "do\n\n"
1214 " ckpt_files_group=`echo $ckpt_files_groups | sed -e \'s/[^:]*:[ \\t\\n]*\\([^:]*\\).*/\\1/\'`\n"
1215 " ckpt_files_groups=`echo $ckpt_files_groups | sed -e \'s/[^:]*:[^:]*//\'`\n"
1216 "\n"
1217 " mode=`echo $restart_modes | sed -e \'s/[^:]*:[ \\t\\n]*\\([^:]*\\).*/\\1/\'`\n"
1218 " restart_modes=`echo $restart_modes | sed -e \'s/[^:]*:[^:]*//\'`\n\n"
1219 " maybexterm=\n"
1220 " maybebg=\n"
1221 " case $mode in\n"
1222 " bg) maybebg=\'bg\';;\n"
1223 " xterm) maybexterm=xterm;;\n"
1224 " fg) ;;\n"
1225 " *) echo \"WARNING: Unknown Mode\";;\n"
1226 " esac\n\n"
1227 " if [ -z \"$ckpt_files_group\" ]; then\n"
1228 " break;\n"
1229 " fi\n\n"
1230
1231 " new_ckpt_files_group=\"\"\n"
1232 " for tmp in $ckpt_files_group\n"
1233 " do\n"
1234 " if [ ! -z \"$DMTCP_RESTART_DIR\" ]; then\n"
1235 " tmp=$DMTCP_RESTART_DIR/`basename $tmp`\n"
1236 " fi\n"
1237 " new_ckpt_files_group=\"$new_ckpt_files_group $tmp\"\n"
1238 " done\n\n"
1239
1240 " if [ `hostname` == \"$worker_host\" -o \"$num_worker_hosts\" == \"1\" ]; then\n"
1241 " localhost_ckpt_files_group=\"$new_ckpt_files_group\"\n"
1242 " continue\n"
1243 " fi\n\n"
1244
1245 " if [ -z $maybebg ]; then\n"
1246 " $maybexterm /usr/bin/ssh -t \"$worker_host\" \\\n"
1247 " "DMTCP_RESTART_CMD" --host \"$coord_host\" --port \"$coord_port\" $maybebatch\\\n"
1248 " --join --interval \"$checkpoint_interval\" $new_ckpt_files_group\n"
1249 " else\n"
1250 " $maybexterm /usr/bin/ssh \"$worker_host\" \\\n"
1251 // In OpenMPI 1.4, without this (sh -c ...), orterun hangs at the
1252 // end of the computation until user presses enter key.
1253 " \"/bin/sh -c \'"DMTCP_RESTART_CMD" --host $coord_host --port $coord_port $maybebatch\\\n"
1254 " --join --interval \"$checkpoint_interval\" $new_ckpt_files_group\'\" &\n"
1255 " fi\n\n"
1256 "done\n\n");
1257
1258 fprintf ( fp, "DMTCP_RESTART=dmtcp_restart\n" );
1259 fprintf ( fp, "which dmtcp_restart > /dev/null \\\n" \
1260 " || DMTCP_RESTART=`dirname %s`/dmtcp_restart\n\n", argv0 );
1261
1262 fprintf ( fp, "%s",
1263 "if [ -n \"$localhost_ckpt_files_group\" ]; then\n"
1264 "exec dmtcp_restart --host \"$coord_host\" --port \"$coord_port\" $maybebatch\\\n"
1265 " $maybejoin --interval \"$checkpoint_interval\" $localhost_ckpt_files_group\n"
1266 "fi\n\n"
1267
1268
1269 "#wait for them all to finish\n"
1270 "wait\n");
1271 }
1272
1273 fclose ( fp );
1274 {
1275 /* Set execute permission for user. */
1276 struct stat buf;
|
Event fs_check_call: |
Calling function "stat" to perform check on "uniqueFilename.c_str()". |
| Also see events: |
[toctou] |
1277 stat ( uniqueFilename.c_str(), &buf );
|
Event toctou: |
Calling function "chmod" that uses "uniqueFilename.c_str()" after a check function. This can cause a time-of-check, time-of-use race condition. |
| Also see events: |
[fs_check_call] |
1278 chmod ( uniqueFilename.c_str(), buf.st_mode | S_IXUSR );
1279 // Create a symlink from
1280 // dmtcp_restart_script.sh -> dmtcp_restart_script_<curCompId>.sh
1281 unlink ( filename.c_str() );
1282 // FIXME: Handle error case of symlink()
1283 JWARNING( 0 == symlink ( uniqueFilename.c_str(), filename.c_str() ) );
1284 }
1285 _restartFilenames.clear();
1286 }
1287
1288 static void SIGINTHandler(int signum)
1289 {
1290 prog.handleUserCommand('q');
1291 }
1292
1293 static void setupSIGINTHandler()
1294 {
1295 struct sigaction action;
1296 action.sa_handler = SIGINTHandler;
1297 sigemptyset ( &action.sa_mask );
1298 action.sa_flags = 0;
1299 sigaction ( SIGINT, &action, NULL );
1300 }
1301
1302 #define shift argc--; argv++
1303
1304 int main ( int argc, char** argv )
1305 {
1306 argv0 = argv[0];
1307 dmtcp::DmtcpMessage::setDefaultCoordinator ( dmtcp::UniquePid::ThisProcess() );
1308
1309 //parse port
1310 thePort = DEFAULT_PORT;
1311 const char* portStr = getenv ( ENV_VAR_NAME_PORT );
1312 if ( portStr != NULL ) thePort = jalib::StringToInt ( portStr );
1313
1314 bool background = false;
1315
1316 shift;
1317 while(argc > 0){
1318 dmtcp::string s = argv[0];
1319 if(s=="-h" || s=="--help"){
1320 fprintf(stderr, theUsage, DEFAULT_PORT);
1321 return 1;
1322 }else if(s=="--exit-on-last"){
1323 exitOnLast = true;
1324 shift;
1325 }else if(s=="--background"){
1326 background = true;
1327 shift;
1328 }else if(s=="--batch"){
1329 batchMode = true;
1330 shift;
1331 }else if(argc>1 && (s == "-i" || s == "--interval")){
1332 setenv(ENV_VAR_CKPT_INTR, argv[1], 1);
1333 shift; shift;
1334 }else if(argc>1 && (s == "-p" || s == "--port")){
1335 thePort = jalib::StringToInt( argv[1] );
1336 shift; shift;
1337 }else if(argc>1 && (s == "-c" || s == "--ckptdir")){
1338 setenv(ENV_VAR_CHECKPOINT_DIR, argv[1], 1);
1339 shift; shift;
1340 }else if(argc>1 && (s == "-t" || s == "--tmpdir")){
1341 setenv(ENV_VAR_TMPDIR, argv[1], 1);
1342 shift; shift;
1343 }else if(argc == 1){ //last arg can be port
1344 thePort = jalib::StringToInt( argv[0] );
1345 shift;
1346 }else{
1347 fprintf(stderr, theUsage, DEFAULT_PORT);
1348 return 1;
1349 }
1350 }
1351
1352 JASSERT ( ! (background && batchMode) )
1353 .Text ( "--background and --batch can't be specified together");
1354
1355 dmtcp::UniquePid::setTmpDir(getenv(ENV_VAR_TMPDIR));
1356
1357 #ifdef DEBUG
1358 /* Disable Jassert Logging */
1359 dmtcp::UniquePid::ThisProcess(true);
1360
1361 dmtcp::ostringstream o;
1362 o << dmtcp::UniquePid::getTmpDir() << "/jassertlog."
1363 << dmtcp::UniquePid::ThisProcess();
1364 JASSERT_INIT(o.str());
1365 JTRACE ( "New DMTCP coordinator starting." );
1366
1367 JTRACE ( "recalculated process UniquePid..." )
1368 ( dmtcp::UniquePid::ThisProcess() );
1369 #endif
1370
1371 if ( thePort < 0 )
1372 {
1373 fprintf(stderr, theUsage, DEFAULT_PORT);
1374 return 1;
1375 }
1376
1377 jalib::JServerSocket* sock;
1378 /*Test if the listener socket is already open*/
1379 if ( fcntl(PROTECTEDFD(1), F_GETFD) != -1 ) {
1380 sock = new jalib::JServerSocket ( PROTECTEDFD(1) );
1381 JASSERT ( sock->port() != -1 ) .Text ( "Invalid listener socket" );
1382 JTRACE ( "Using already created listener socker" ) ( sock->port() );
1383 } else {
1384
1385 errno = 0;
1386 sock = new jalib::JServerSocket ( jalib::JSockAddr::ANY, thePort );
1387 JASSERT ( sock->isValid() ) ( thePort ) ( JASSERT_ERRNO )
1388 .Text ( "Failed to create listen socket."
1389 "\nIf msg is \"Address already in use\", this may be an old coordinator."
1390 "\nKill default coordinator and try again: dmtcp_command -q"
1391 "\nIf that fails, \"pkill -9 dmtcp_coord\","
1392 " and try again in a minute or so." );
1393 }
1394
1395 thePort = sock->port();
1396
1397 if ( batchMode && getenv ( ENV_VAR_CKPT_INTR ) == NULL ) {
1398 setenv(ENV_VAR_CKPT_INTR, "3600", 1);
1399 }
1400 //parse checkpoint interval
1401 const char* interval = getenv ( ENV_VAR_CKPT_INTR );
1402 if ( interval != NULL )
1403 theCheckpointInterval = jalib::StringToInt ( interval );
1404
1405 #if 0
1406 JASSERT_STDERR <<
1407 "dmtcp_coordinator starting..." <<
1408 "\n Port: " << thePort <<
1409 "\n Checkpoint Interval: ";
1410 if(theCheckpointInterval==0)
1411 JASSERT_STDERR << "disabled (checkpoint manually instead)";
1412 else
1413 JASSERT_STDERR << theCheckpointInterval;
1414 JASSERT_STDERR <<
1415 "\n Exit on last client: " << exitOnLast << "\n";
1416 #else
1417 fprintf(stderr, "dmtcp_coordinator starting..."
1418 "\n Port: %d"
1419 "\n Checkpoint Interval: ", thePort);
1420 if(theCheckpointInterval==0)
1421 fprintf(stderr, "disabled (checkpoint manually instead)");
1422 else
1423 fprintf(stderr, "%d", theCheckpointInterval);
1424 fprintf(stderr, "\n Exit on last client: %d\n", exitOnLast);
1425 #endif
1426
1427 if(background){
1428 JASSERT_STDERR << "Backgrounding...\n";
1429 JASSERT(dup2(open("/dev/null",O_RDWR), 0)==0);
1430 fflush(stdout);
1431 JASSERT(close(1)==0);
1432 JASSERT(open("/dev/null", O_WRONLY)==1);
1433 fflush(stderr);
1434 if (close(2) != 0 || dup2(1,2) != 2)
1435 JASSERT(false) .Text( "Can't print to stderr");
1436 close(JASSERT_STDERR_FD);
1437 dup2(2, JASSERT_STDERR_FD);
1438 if(fork()>0){
1439 JTRACE ( "Parent Exiting after fork()" );
1440 exit(0);
1441 }
1442 pid_t sid = setsid();
1443 } else if ( batchMode ) {
1444 JASSERT_STDERR << "Going into Batch Mode...\n";
1445 close(0);
1446 close(1);
1447 close(2);
1448 close(JASSERT_STDERR_FD);
1449
1450 JASSERT(open("/dev/null", O_WRONLY)==0);
1451
1452 JASSERT(dup2(0, 1) == 1);
1453 JASSERT(dup2(0, 2) == 2);
1454 JASSERT(dup2(0, JASSERT_STDERR_FD) == JASSERT_STDERR_FD);
1455
1456 } else {
1457 JASSERT_STDERR <<
1458 "Type '?' for help." <<
1459 "\n\n";
1460 }
1461
1462 /* We setup the signal handler for SIGINT so that it would send the
1463 * DMT_KILL_PEER message to all the connected peers before exiting.
1464 */
1465 setupSIGINTHandler();
1466 prog.addListenSocket ( *sock );
1467 if(!background && !batchMode)
1468 prog.addDataSocket ( new jalib::JChunkReader ( STDIN_FD , 1 ) );
1469
1470 // FIXME: Should we use a default checkpoint interval (1 hour in this case)
1471 // even if the user has not explicitely requested it.
1472 if ( theCheckpointInterval <= 0 ) theCheckpointInterval = 3600;
1473 prog.monitorSockets ( theCheckpointInterval );
1474 return 0;
1475 }