1    	/****************************************************************************
2    	 *   Copyright (C) 2006-2010 by Jason Ansel, Kapil Arya, and Gene Cooperman *
3    	 *   jansel@csail.mit.edu, kapil@ccs.neu.edu, gene@ccs.neu.edu              *
4    	 *                                                                          *
5    	 *   This file is part of the dmtcp/src module of DMTCP (DMTCP:dmtcp/src).  *
6    	 *                                                                          *
7    	 *  DMTCP:dmtcp/src is free software: you can redistribute it and/or        *
8    	 *  modify it under the terms of the GNU Lesser General Public License as   *
9    	 *  published by the Free Software Foundation, either version 3 of the      *
10   	 *  License, or (at your option) any later version.                         *
11   	 *                                                                          *
12   	 *  DMTCP:dmtcp/src is distributed in the hope that it will be useful,      *
13   	 *  but WITHOUT ANY WARRANTY; without even the implied warranty of          *
14   	 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the           *
15   	 *  GNU Lesser General Public License for more details.                     *
16   	 *                                                                          *
17   	 *  You should have received a copy of the GNU Lesser General Public        *
18   	 *  License along with DMTCP:dmtcp/src.  If not, see                        *
19   	 *  <http://www.gnu.org/licenses/>.                                         *
20   	 ****************************************************************************/
21   	
22   	/****************************************************************************
23   	 * Coordinator code logic:                                                  *
24   	 * main calls monitorSockets, which acts as a top level event loop.         *
25   	 * monitorSockets calls:  onConnect, onData, onDisconnect, onTimeoutInterval*
26   	 *   when client or dmtcp_command talks to coordinator.                     *
27   	 * onConnect and onData receive a socket parameter reads msg and passes to: *
28   	 *   handleUserCommand, which takes single char arg ('s', 'c', 'k', 'q', ...)*
29   	 * handleUserCommand calls broadcastMessage to send data back               *
30   	 * any message sent by broadcastMessage takes effect only on returning      *
31   	 *   back up to top level monitorSockets                                    *
32   	 * Hence, even for checkpoint, handleUserCommand just changes state,        *
33   	 *   broadcasts an initial checkpoint command, and then returns to top      *
 *   level.  Replies from clients then drive further state changes.         *
35   	 * The prefix command 'b' (blocking) from dmtcp_command modifies behavior   *
36   	 *   of 'c' so that the reply to dmtcp_command happens only when clients    *
37   	 *   are back in RUNNING state.                                             *
38   	 * The states for a worker (client) are:                                    *
39   	 * Checkpoint: RUNNING -> SUSPENDED -> FD_LEADER_ELECTION -> DRAINED        *
40   	 *       	  -> CHECKPOINTED -> REFILLED -> RUNNING		    *
41   	 * Restart:    RESTARTING -> CHECKPOINTED -> REFILLED -> RUNNING	    *
42   	 * If debugging, set gdb breakpoint on:					    *
43   	 *   dmtcp::DmtcpCoordinator::onConnect					    *
44   	 *   dmtcp::DmtcpCoordinator::onData					    *
45   	 *   dmtcp::DmtcpCoordinator::handleUserCommand				    *
46   	 *   dmtcp::DmtcpCoordinator::broadcastMessage				    *
47   	 ****************************************************************************/
48   	
49   	#include "dmtcp_coordinator.h"
50   	#include "constants.h"
51   	#include "protectedfds.h"
52   	#include  "../jalib/jconvert.h"
53   	#include "dmtcpmessagetypes.h"
54   	#include "dmtcpworker.h"
55   	#include <stdio.h>
56   	#include <unistd.h>
57   	#include <sys/stat.h>
58   	#include  "../jalib/jtimer.h"
59   	#include <algorithm>
60   	#include <sys/wait.h>
61   	#include <sys/types.h>
62   	#include <sys/stat.h>
63   	#include <fcntl.h>
64   	#undef min
65   	#undef max
66   	
67   	
// TCP port the coordinator listens on; -1 until it has been set.
static int thePort = -1;
// argv[0] of the coordinator process (assigned outside the visible code).
static char *argv0;
70   	
/* Text printed for the interactive '?' / 'h' command (see handleUserCommand).
 * Lists the single-character commands accepted on the coordinator's stdin. */
static const char* theHelpMessage =
  "COMMANDS:\n"
  "  l : List connected nodes\n"
  "  s : Print status message\n"
  "  c : Checkpoint all nodes\n"
  "  f : Force a restart even if there are missing nodes (debugging only)\n"
  "  k : Kill all nodes\n"
  "  q : Kill all nodes and quit\n"
  "  ? : Show this message\n"
  "\n"
;
82   	
/* Command-line usage text for dmtcp_coordinator.
 * Documents the options, their environment-variable equivalents, and their
 * defaults. */
static const char* theUsage =
  "USAGE: \n"
  "   dmtcp_coordinator [OPTIONS] [port]\n\n"
  "OPTIONS:\n"
  "  --port, -p, (environment variable DMTCP_PORT):\n"
  "      Port to listen on (default: 7779)\n"
  "  --ckptdir, -c, (environment variable DMTCP_CHECKPOINT_DIR):\n"
  "      Directory to store dmtcp_restart_script.sh (default: ./)\n"
  "  --tmpdir, -t, (environment variable DMTCP_TMPDIR):\n"
  "      Directory to store temporary files (default: env var TMPDIR or /tmp)\n"
  "  --exit-on-last\n"
  "      Exit automatically when last client disconnects\n"
  "  --background\n"
  "      Run silently in the background (mutually exclusive with --batch)\n"
  "  --batch\n"
  "      Run in batch mode (mutually exclusive with --background)\n"
  "      The checkpoint interval is set to 3600 seconds (1 hr) by default\n"
  "  --interval, -i, (environment variable DMTCP_CHECKPOINT_INTERVAL):\n"
  "      Time in seconds between automatic checkpoints\n"
  "      (default: 0, disabled)\n"
  "COMMANDS:\n"
  "  (type '?<return>' at runtime for list)\n\n"
  "See http://dmtcp.sf.net/ for more information.\n"
;
107  	
108  	
/* Preamble for the generated restart script (presumably written out by
 * writeRestartScript(), which is not visible here -- confirm).  It turns on
 * bash job control and gives the user editing suggestions as shell
 * comments. */
static const char* theRestartScriptHeader =
  "#!/bin/bash \n"
  "set -m # turn on job control\n\n"
  "#This script launches all the restarts in the background.\n"
  "#Suggestions for editing:\n"
  "#  1. For those processes executing on the localhost, remove 'ssh <hostname>' from the start of the line. \n"
  "#  2. If using ssh, verify that ssh does not require passwords or other prompts.\n"
  "#  3. Verify that the dmtcp_restart command is in your path on all hosts.\n"
  "#  4. Verify DMTCP_HOST and DMTCP_PORT match the location of the dmtcp_coordinator.\n"
  "#     If necessary, add 'DMTCP_PORT=<dmtcp_coordinator port>' after 'DMTCP_HOST=<...>'.\n"
  "#  5. Remove the '&' from a line if that process reads STDIN.\n"
  "#     If multiple processes read STDIN then prefix the line with 'xterm -hold -e' and put '&' at the end of the line.\n"
  "#  6. Processes on same host can be restarted with single dmtcp_restart command.\n\n\n"
;
123  	
/* Shell snippet that assigns the restart script's help text to $usage_str.
 * The value is single-quoted in the generated script, so it must not contain
 * a bare single quote except the closing one.  The option list here must
 * match the parser in theRestartScriptCmdlineArgHandler: only --batch has
 * the short form -b, and --disable-batch has no short form. */
static const char* theRestartScriptUsage =
  "usage_str='USAGE:\n"
  "  dmtcp_restart_script.sh [OPTIONS]\n\n"
  "OPTIONS:\n"
  "  --host, -h, (environment variable DMTCP_HOST):\n"
  "      Hostname where dmtcp_coordinator is running\n"
  "  --port, -p, (environment variable DMTCP_PORT):\n"
  "      Port where dmtcp_coordinator is running\n"
  "  --hostfile <arg0> :\n"
  "      Provide a hostfile (One host per line, \"#\" indicates comments)\n"
  "  --restartdir, -d, (environment variable DMTCP_RESTART_DIR):\n"
  "      Directory to read checkpoint images from\n"
  "  --batch, -b:\n"
  "      Enable batch mode for dmtcp_restart\n"
  "  --disable-batch:\n"
  "      Disable batch mode for dmtcp_restart (if previously enabled)\n"
  "  --interval, -i, (environment variable DMTCP_CHECKPOINT_INTERVAL):\n"
  "      Time in seconds between automatic checkpoints\n"
  "      (Default: Use pre-checkpoint value)\n"
  "  --help:\n"
  "      Print this message\'\n\n\n"
;
146  	
/* Shell snippet that parses the restart script's command-line options.
 * Options without a value (--help, --batch/-b, --disable-batch) are handled
 * first.  Options that take a value consume two positional parameters.
 * Anything else prints $usage_str and exits.
 * "$1" is quoted in every test so that an empty argument cannot break the
 * [ ] syntax ("unary operator expected"). */
static const char* theRestartScriptCmdlineArgHandler =
  "if [ $# -gt 0 ]; then\n"
  "  while [ $# -gt 0 ]\n"
  "  do\n"
  "    if [ \"$1\" = \"--help\" ]; then\n"
  "      echo \"$usage_str\"\n"
  "      exit\n"
  "    elif [ \"$1\" = \"--batch\" ] || [ \"$1\" = \"-b\" ]; then\n"
  "      maybebatch='--batch'\n"
  "      shift\n"
  "    elif [ \"$1\" = \"--disable-batch\" ]; then\n"
  "      maybebatch=\n"
  "      shift\n"
  "    elif [ $# -ge 2 ]; then\n"
  "      case \"$1\" in \n"
  "        --host|-h)\n"
  "          coord_host=\"$2\";;\n"
  "        --port|-p)\n"
  "          coord_port=\"$2\";;\n"
  "        --hostfile)\n"
  "          hostfile=\"$2\"\n"
  "          if [ ! -f \"$hostfile\" ]; then\n"
  "            echo \"ERROR: hostfile $hostfile not found\"\n"
  "            exit\n"
  "          fi;;\n"
  "        --restartdir|-d)\n"
  "          DMTCP_RESTART_DIR=\"$2\";;\n"
  "        --interval|-i)\n"
  "          checkpoint_interval=\"$2\";;\n"
  "        *)\n"
  "          echo \"$0: unrecognized option \'$1\'. See correct usage below\"\n"
  "          echo \"$usage_str\"\n"
  "          exit;;\n"
  "      esac\n"
  "      shift\n"
  "      shift\n"
  "    else\n"
  "      echo \"$0: Incorrect usage.  See correct usage below\"\n"
  "      echo\n"
  "      echo \"$usage_str\"\n"
  "      exit\n"
  "    fi\n"
  "  done\n"
  "fi\n\n"
;
195  	
// --exit-on-last: onDisconnect issues 'q' when the last client goes away.
static bool exitOnLast = false;
// Set by the 'b' prefix command.  When the checkpoint reaches REFILLED,
// onData writes blockUntilDoneReply to the socket fd blockUntilDoneRemote
// (-1 means no socket recorded) and then resets both variables.
static bool blockUntilDone = false;
static int blockUntilDoneRemote = -1;
static dmtcp::DmtcpMessage blockUntilDoneReply;
#ifdef EXTERNAL_SOCKET_HANDLING
// NOTE(review): numWorkersWithExternalSockets is not referenced in the
// visible code -- confirm whether it is still used.
static int numWorkersWithExternalSockets = 0;
// Workers that were sent DMT_UNKNOWN_PEER for an unresolved socket peer.
// Entries are removed when DMT_EXTERNAL_SOCKETS_CLOSED arrives.
static dmtcp::vector<dmtcp::ConnectionIdentifier> workersWithExternalSockets;
#endif

// The coordinator instance.
static dmtcp::DmtcpCoordinator prog;

/* The coordinator can receive a second checkpoint request while it is still
 * processing the first one.  If the second request arrives after the
 * coordinator has broadcast DMT_DO_SUSPEND but before all workers have
 * replied, the coordinator would send another DMT_DO_SUSPEND.  Workers that
 * have already handled the first DMT_DO_SUSPEND (by suspending all user
 * threads) are waiting for the next message (DMT_DO_LOCK_FDS or
 * DMT_KILL_PEER).  Receiving DMT_DO_SUSPEND instead makes them exit() with
 * an error.
 * To prevent this, the flag "workersRunningAndSuspendMsgSent" records that
 * DMT_DO_SUSPEND has been sent and the coordinator is still waiting for
 * replies.  While the flag is set, the coordinator does not process another
 * checkpoint request.  It is cleared in onData once every worker reports
 * SUSPENDED, and on the first connection of a new computation in onConnect.
*/
static bool workersRunningAndSuspendMsgSent = false;

// Seconds between automatic checkpoints; 0 disables them (see theUsage).
static int theCheckpointInterval = 0;
// --batch mode flag (consumers not visible here).
static bool batchMode = false;

// File descriptor of stdin.  The interactive command socket is identified by
// comparing against it.
const int STDIN_FD = fileno ( stdin );

// Timers: checkpoint is stopped once clients are REFILLED; restart runs from
// the first connection until RESTARTING -> CHECKPOINTED.
JTIMER ( checkpoint );
JTIMER ( restart );

// Per-computation bookkeeping, reset on the first connection (onConnect).
// numPeers == -1 means "unknown".
static dmtcp::UniquePid curCompGroup = dmtcp::UniquePid();
static int numPeers = -1;
static int curTimeStamp = -1;
234  	
235  	namespace
236  	{
237  	  static int theNextClientNumber = 1;
238  	
239  	  class NamedChunkReader : public jalib::JChunkReader
240  	  {
241  	    public:
242  	      NamedChunkReader ( const jalib::JSocket& sock
243  	                         ,const dmtcp::UniquePid& identity
244  	                         ,dmtcp::WorkerState state
245  	                         ,const struct sockaddr * remote
246  	                         ,socklen_t len
247  	                         ,int restorePort )
248  	          : jalib::JChunkReader ( sock,sizeof ( dmtcp::DmtcpMessage ) )
249  	          , _identity ( identity )
250  	          , _clientNumber ( theNextClientNumber++ )
251  	          , _state ( state )
252  	          , _addrlen ( len )
253  	          , _restorePort ( restorePort )
254  	      {
255  	        memset ( &_addr, 0, sizeof _addr );
256  	        memcpy ( &_addr, remote, len );
257  	      }
258  	      const dmtcp::UniquePid& identity() const { return _identity;}
259  	      int clientNumber() const { return _clientNumber; }
260  	      dmtcp::WorkerState state() const { return _state; }
261  	      const struct sockaddr_storage* addr() const { return &_addr; }
262  	      socklen_t addrlen() const { return _addrlen; }
263  	      int restorePort() const { return _restorePort; }
264  	      void setState ( dmtcp::WorkerState value ) { _state = value; }
265  	      void progname(dmtcp::string pname){ _progname = pname; }
266  	      dmtcp::string progname(void) const { return _progname; }
267  	      void hostname(dmtcp::string hname){ _hostname = hname; }
268  	      dmtcp::string hostname(void) const { return _hostname; }
269  	    private:
270  	      dmtcp::UniquePid _identity;
271  	      int _clientNumber;
272  	      dmtcp::WorkerState _state;
273  	      struct sockaddr_storage _addr;
274  	      socklen_t               _addrlen;
275  	      int _restorePort;
276  	      dmtcp::string _hostname;
277  	      dmtcp::string _progname;
278  	  };
279  	}
280  	
#ifdef EXTERNAL_SOCKET_HANDLING
/* For every peer-lookup request that was never matched, send DMT_UNKNOWN_PEER
 * (carrying that request's conId) back to the requesting worker.  Each such
 * worker is recorded once in workersWithExternalSockets.  The pending lookup
 * list is cleared afterwards. */
void dmtcp::DmtcpCoordinator::sendUnIdentifiedPeerNotifications()
{
  for ( _socketPeerLookupMessagesIterator pending = _socketPeerLookupMessages.begin();
        pending != _socketPeerLookupMessages.end();
        ++pending ) {
    DmtcpMessage notice (DMT_UNKNOWN_PEER);
    notice.conId = pending->conId;
    jalib::JSocket worker(_workerSocketTable[pending->from]);
    worker << notice;

    const bool alreadyRecorded =
      std::find ( workersWithExternalSockets.begin(),
                  workersWithExternalSockets.end(),
                  pending->from ) != workersWithExternalSockets.end();
    if ( !alreadyRecorded ) {
      workersWithExternalSockets.push_back ( pending->from );
    }
  }
  _socketPeerLookupMessages.clear();
}
#endif
309  	
/*
 * Execute a single-character coordinator command.
 *
 * cmd   - the command character.  Letters are accepted in either case;
 *         whitespace is ignored.  'b' is a prefix: it sets blockUntilDone so
 *         the reply to a following 'c' is deferred until the clients are
 *         back in the RUNNING state.
 * reply - if non-NULL, params[0] and params[1] receive the result: NOERROR by
 *         default, an ERROR_* code on failure, or a command-specific value
 *         (for example the peer count for 'c' and 's').  If NULL, the values
 *         are written to a static scratch array, and 's' prints its status to
 *         stdout instead.
 *
 * Most commands only queue broadcasts, which take effect after control
 * returns to the socket event loop.  'q' is the exception: it pushes the
 * queued writes out, closes every socket, and calls exit(0).
 */
void dmtcp::DmtcpCoordinator::handleUserCommand(char cmd, DmtcpMessage* reply /*= NULL*/)
{
  int * replyParams;
  if(reply!=NULL){
    replyParams = reply->params;
  }else{
    // Scratch space so the code below can always write reply values.
    static int dummy[sizeof(reply->params)/sizeof(int)];
    replyParams = dummy;
  }

  JASSERT(sizeof(reply->params)/sizeof(int) >= 2); //this should be compiled out
  //default reply is 0
  replyParams[0] = NOERROR;
  replyParams[1] = NOERROR;

  switch ( cmd ){
  case 'b': case 'B':  // prefix blocking command, prior to checkpoint command
    blockUntilDone = true;
    replyParams[0] = 0;  // reply from prefix command will be ignored
    break;
  case 'c': case 'C':
    // On success, reply with the number of peers being checkpointed.
    // Otherwise report ERROR_NOT_RUNNING_STATE in both slots.
    if(startCheckpoint()){
      replyParams[0] = getStatus().numPeers;
    }else{
      replyParams[0] = ERROR_NOT_RUNNING_STATE;
      replyParams[1] = ERROR_NOT_RUNNING_STATE;
    }
    break;
  case 'i': case 'I':
    // Re-arm the timer from theCheckpointInterval, which the caller is
    // expected to have updated beforehand.
    setTimeoutInterval ( theCheckpointInterval );
    JNOTE ( "CheckpointInterval Updated" ) ( theCheckpointInterval );
    break;
  case 'l': case 'L':
  case 't': case 'T':
    // Print every connected client to stderr; the stdin socket is skipped.
    JASSERT_STDERR << "Client List:\n";
    JASSERT_STDERR << "#, PROG[PID]@HOST, DMTCP-UNIQUEPID, STATE\n";
    for ( dmtcp::vector<jalib::JReaderInterface*>::iterator i = _dataSockets.begin()
            ;i!= _dataSockets.end()
            ;++i )
    {
      if ( ( *i )->socket().sockfd() != STDIN_FD )
      {
        const NamedChunkReader& cli = *((NamedChunkReader*)(*i));
        JASSERT_STDERR << cli.clientNumber()
                       << ", " << cli.progname() << "["  << cli.identity().pid() << "]@"  << cli.hostname()
                       << ", " << cli.identity()
                       << ", " << cli.state().toString()
                       << '\n';
      }
    }
    break;
  case 'f': case 'F':
    JNOTE ( "forcing restart..." );
    broadcastMessage ( DMT_FORCE_RESTART );
    break;
  case 'q': case 'Q':
  {
    JNOTE ( "Killing all connected Peers ..." );
    broadcastMessage ( DMT_KILL_PEER );
    /* Call to broadcastMessage only puts the messages into the write queue.
     * We actually want the messages to be written out to the respective sockets
     * so that we can then close the sockets and exit gracefully.  The following
     * loop is taken from the implementation of monitorSocket() implementation
     * in jsocket.cpp.
     *
     * Once the messages have been written out, the coordinator closes all the
     * connections and calls exit().
     *
     * NOTE(review): each writer gets a single writeOnce() call, so a partial
     * write would not be retried before exit -- confirm this is acceptable.
     */
    for ( size_t i=0; i<_writes.size(); ++i )
    {
      int fd = _writes[i]->socket().sockfd();
      if ( fd >= 0 ) {
        _writes[i]->writeOnce();
      }
    }
    JASSERT_STDERR << "DMTCP coordinator exiting... (per request)\n";
    for ( dmtcp::vector<jalib::JReaderInterface*>::iterator i = _dataSockets.begin()
        ; i!= _dataSockets.end()
        ; ++i )
    {
      (*i)->socket().close();
    }
    for ( dmtcp::vector<jalib::JSocket>::iterator i = _listenSockets.begin()
        ; i!= _listenSockets.end()
        ; ++i )
    {
      i->close();
    }
    JTRACE ("Exiting ...");
    exit ( 0 );
    break;
  }
  case 'k': case 'K':
    JNOTE ( "Killing all connected Peers..." );
    //XXX: What happens if a 'k' command is followed by a 'c' command before
    //     the *real* broadcast takes place?         --Kapil
    broadcastMessage ( DMT_KILL_PEER );
    break;
  case 'h': case 'H': case '?':
    JASSERT_STDERR << theHelpMessage;
    break;
  case 's': case 'S':
    {
      // "running" means every client is unanimously in the RUNNING state.
      CoordinatorStatus s = getStatus();
      bool running= s.minimumStateUnanimous && s.minimumState==WorkerState::RUNNING;
      if(reply==NULL){
        printf("Status...\n");
        printf("NUM_PEERS=%d\n", s.numPeers);
        printf("RUNNING=%s\n", (running?"yes":"no"));
        fflush(stdout);
        if(!running) JTRACE("raw status")(s.minimumState)(s.minimumStateUnanimous);
      }else{
        replyParams[0]=s.numPeers;
        replyParams[1]=running;
      }
    }
    break;
  case ' ': case '\t': case '\n': case '\r':
    //ignore whitespace
    break;
  default:
    JTRACE("unhandled user command")(cmd);
    replyParams[0] = ERROR_INVALID_COMMAND;
    replyParams[1] = ERROR_INVALID_COMMAND;
  }
  return;
}
437  	
438  	void dmtcp::DmtcpCoordinator::onData ( jalib::JReaderInterface* sock )
439  	{
440  	  if ( sock->socket().sockfd() == STDIN_FD )
441  	  {
442  	    handleUserCommand(sock->buffer()[0]);
443  	    return;
444  	  }
445  	  else
446  	  {
447  	    NamedChunkReader * client= ( NamedChunkReader* ) sock;
448  	    DmtcpMessage& msg = * ( DmtcpMessage* ) sock->buffer();
449  	    msg.assertValid();
450  	    char * extraData = 0;
451  	
452  	    if ( msg.extraBytes > 0 )
453  	    {
454  	      extraData = new char[msg.extraBytes];
455  	      sock->socket().readAll ( extraData, msg.extraBytes );
456  	    }
457  	
458  	    switch ( msg.type )
459  	    {
460  	      case DMT_OK:
461  	      {
462  	        WorkerState oldState = client->state();
463  	        client->setState ( msg.state );
464  	        WorkerState newState = minimumState();
465  	
466  	        JTRACE ("got DMT_OK message")( msg.from )( msg.state )( oldState )( newState );
467  	
468  	        if ( oldState == WorkerState::RUNNING
469  	                && newState == WorkerState::SUSPENDED )
470  	        {
471  	          // All the workers are in SUSPENDED state, now it is safe to reset this flag.
472  	          workersRunningAndSuspendMsgSent = false;
473  	
474  	          JNOTE ( "locking all nodes" );
475  	          broadcastMessage ( DMT_DO_LOCK_FDS );
476  	        }
477  	#ifdef EXTERNAL_SOCKET_HANDLING
478  	        if ( oldState == WorkerState::SUSPENDED
479  	                && newState == WorkerState::FD_LEADER_ELECTION )
480  	        {
481  	          JNOTE ( "performing peerlookup for all sockets" );
482  	          broadcastMessage ( DMT_DO_PEER_LOOKUP );
483  	        }
484  	        if ( oldState == WorkerState::FD_LEADER_ELECTION
485  	                && newState == WorkerState::PEER_LOOKUP_COMPLETE )
486  	        {
487  	          if ( _socketPeerLookupMessages.empty() ) {
488  	            JNOTE ( "draining all nodes" );
489  	            broadcastMessage ( DMT_DO_DRAIN );
490  	          } else {
491  	            sendUnIdentifiedPeerNotifications();
492  	            JNOTE ( "Not all socket peers were Identified, resuming computation without checkpointing" );
493  	            broadcastMessage ( DMT_DO_RESUME );
494  	          }
495  	        }
496  	        if ( oldState == WorkerState::PEER_LOOKUP_COMPLETE
497  	                && newState == WorkerState::DRAINED )
498  	        {
499  	          JNOTE ( "checkpointing all nodes" );
500  	          broadcastMessage ( DMT_DO_CHECKPOINT );
501  	        }
502  	#else
503  	        if ( oldState == WorkerState::SUSPENDED
504  	                && newState == WorkerState::FD_LEADER_ELECTION )
505  	        {
506  	          JNOTE ( "draining all nodes" );
507  	          broadcastMessage ( DMT_DO_DRAIN );
508  	        }
509  	        if ( oldState == WorkerState::FD_LEADER_ELECTION
510  	                && newState == WorkerState::DRAINED )
511  	        {
512  	          JNOTE ( "checkpointing all nodes" );
513  	          broadcastMessage ( DMT_DO_CHECKPOINT );
514  	        }
515  	#endif
516  	        if ( oldState == WorkerState::DRAINED
517  	                && newState == WorkerState::CHECKPOINTED )
518  	        {
519  	          JNOTE ( "refilling all nodes" );
520  	          broadcastMessage ( DMT_DO_REFILL );
521  	          writeRestartScript();
522  	        }
523  	        if ( oldState == WorkerState::RESTARTING
524  	                && newState == WorkerState::CHECKPOINTED )
525  	        {
526  	          JTRACE ( "resetting _restoreWaitingMessages" )
527  	          ( _restoreWaitingMessages.size() );
528  	          _restoreWaitingMessages.clear();
529  	
530  	          JTIMER_STOP ( restart );
531  	
532  	          JNOTE ( "refilling all nodes (after checkpoint)" );
533  	          broadcastMessage ( DMT_DO_REFILL );
534  	        }
535  	        if ( oldState == WorkerState::CHECKPOINTED
536  	                && newState == WorkerState::REFILLED )
537  	        {
538  	          JNOTE ( "restarting all nodes" );
539  	          broadcastMessage ( DMT_DO_RESUME );
540  	
541  	          JTIMER_STOP ( checkpoint );
542  	
543  	          setTimeoutInterval( theCheckpointInterval );
544  	
545  	          if (blockUntilDone) {
546  	          JNOTE ( "replying to dmtcp_command:  we're done" );
547  		    // These were set in dmtcp::DmtcpCoordinator::onConnect in this file
548  		    jalib::JSocket remote ( blockUntilDoneRemote );
549  	            remote << blockUntilDoneReply;
550  	            remote.close();
551  	            blockUntilDone = false;
552  	            blockUntilDoneRemote = -1;
553  	          }
554  	        }
555  	        break;
556  	      }
557  	      case DMT_RESTORE_WAITING:
558  	      {
559  	        DmtcpMessage restMsg = msg;
560  	        restMsg.type = DMT_RESTORE_WAITING;
561  	        memcpy ( &restMsg.restoreAddr,client->addr(),client->addrlen() );
562  	        restMsg.restoreAddrlen = client->addrlen();
563  	        restMsg.restorePort = client->restorePort();
564  	        JASSERT ( restMsg.restorePort > 0 ) ( restMsg.restorePort ) ( client->identity() );
565  	        JASSERT ( restMsg.restoreAddrlen > 0 ) ( restMsg.restoreAddrlen ) ( client->identity() );
566  	        JASSERT ( restMsg.restorePid != ConnectionIdentifier::Null() ) ( client->identity() );
567  	        JTRACE ( "broadcasting RESTORE_WAITING" )( restMsg.restorePid )( restMsg.restoreAddrlen )( restMsg.restorePort );
568  	        _restoreWaitingMessages.push_back ( restMsg );
569  	        broadcastMessage ( restMsg );
570  	        break;
571  	      }
572  	      case DMT_CKPT_FILENAME:
573  	      {
574  	        JASSERT ( extraData!=0 ).Text ( "extra data expected with DMT_CKPT_FILENAME message" );
575  	        dmtcp::string ckptFilename;
576  	        dmtcp::string hostname;
577  	        ckptFilename = extraData;
578  	        hostname = extraData + ckptFilename.length() + 1;
579  	
580  	        JTRACE ( "recording restart info" ) ( ckptFilename ) ( hostname );
581  	        _restartFilenames[hostname].push_back ( ckptFilename );
582  	      }
583  	      break;
584  	      case DMT_USER_CMD:  // dmtcpaware API being used
585  	        {
586  	          JTRACE("got user command from client")(msg.params[0])(client->identity());
587  		  // Checkpointing commands should always block, to prevent
588  		  //   dmtcpaware checkpoint call from returning prior to checkpoint.
589  		  if (msg.params[0] == 'c')
590  	            handleUserCommand( 'b', NULL );
591  	          DmtcpMessage reply;
592  	          reply.type = DMT_USER_CMD_RESULT;
593  	          if (msg.params[0] == 'i' &&  msg.theCheckpointInterval > 0 ) {
594  	            theCheckpointInterval = msg.theCheckpointInterval;
595  	          }
596  	          handleUserCommand( msg.params[0], &reply );
597  	          sock->socket() << reply;
598  	          //alternately, we could do the write without blocking:
599  	          //addWrite(new jalib::JChunkWriter(sock->socket(), (char*)&msg, sizeof(DmtcpMessage)));
600  	        }
601  	        break;
602  	#ifdef EXTERNAL_SOCKET_HANDLING
603  	      case DMT_PEER_LOOKUP:
604  	      {
605  	        JTRACE ( "received PEER_LOOKUP msg" ) ( msg.conId );
606  	        JASSERT ( msg.localAddrlen > 0 ) ( msg.localAddrlen ) ( client->identity() );
607  	        _socketPeerLookupMessagesIterator i;
608  	        bool foundPeer = false;
609  	        for ( i = _socketPeerLookupMessages.begin();
610  	              i != _socketPeerLookupMessages.end();
611  	              ++i ) {
612  	          if ( ( msg.localAddrlen == i->localAddrlen ) &&
613  	               ( memcmp ( (void*) &msg.localAddr,
614  	                          (void*) &(i->remoteAddr),
615  	                          msg.localAddrlen ) == 0 ) ) {
616  	            _socketPeerLookupMessages.erase(i);
617  	            foundPeer = true;
618  	            break;
619  	          }
620  	        }
621  	        if ( !foundPeer ) {
622  	          _socketPeerLookupMessages.push_back(msg);
623  	          _workerSocketTable[msg.from] = sock->socket().sockfd();
624  	        }
625  	      }
626  	      break;
627  	      case DMT_EXTERNAL_SOCKETS_CLOSED:
628  	      {
629  	        vector<dmtcp::ConnectionIdentifier>::iterator i;
630  	        for ( i  = workersWithExternalSockets.begin();
631  	              i != workersWithExternalSockets.end();
632  	              ++i) {
633  	          if ( *i == msg.from ) {
634  	            break;
635  	          }
636  	        }
637  	        JASSERT ( i != workersWithExternalSockets.end() ) ( msg.from )
638  	          .Text ( "DMT_EXTERNAL_SOCKETS_CLOSED msg received from worker"
639  			  " but it never had one" );
640  	
641  	        workersWithExternalSockets.erase(i);
642  	        JTRACE ("(Known) External Sockets closed by worker") (msg.from);
643  	
644  	        client->setState ( msg.state );
645  	
646  	        if (workersWithExternalSockets.empty() == true) {
647  	          JTRACE ( "External Sockets on all workers are closed now."
648  			   "  Trying to checkpoint." );
649  	          handleUserCommand('c');
650  	        }
651  	      }
652  	      break;
653  	#endif
654  	      default:
655  	        JASSERT ( false ) ( msg.from ) ( msg.type )
656  			.Text ( "unexpected message from worker" );
657  	    }
658  	
659  	    delete[] extraData;
660  	  }
661  	}
662  	
663  	void dmtcp::DmtcpCoordinator::onDisconnect ( jalib::JReaderInterface* sock )
664  	{
665  	  if ( sock->socket().sockfd() == STDIN_FD )
666  	  {
667  	    JTRACE ( "stdin closed" );
668  	  }
669  	  else
670  	  {
671  	    NamedChunkReader& client = * ( ( NamedChunkReader* ) sock );
672  	    JNOTE ( "client disconnected" ) ( client.identity() );
673  	
674  	    CoordinatorStatus s = getStatus();
675  	    if( s.numPeers <= 1 ){
676  	      if(exitOnLast){
677  	        JNOTE ("last client exited, shutting down..");
678  	        handleUserCommand('q');
679  	      }
680  	    }
681  	
682  	//         int clientNumber = ((NamedChunkReader*)sock)->clientNumber();
683  	//         JASSERT(clientNumber >= 0)(clientNumber);
684  	//         _table.removeClient(clientNumber);
685  	  }
686  	}
687  	
688  	void dmtcp::DmtcpCoordinator::onConnect ( const jalib::JSocket& sock,
689  	                                          const struct sockaddr* remoteAddr,
690  	                                          socklen_t remoteLen )
691  	{
692  	  // If no client is connected to Coordinator, then there can be only zero data
693  	  // sockets OR there can be one data socket and that should be STDIN.
694  	  if ( _dataSockets.size() == 0 ||
695  	       ( _dataSockets.size() == 1
696  		 && _dataSockets[0]->socket().sockfd() == STDIN_FD ) )
697  	  {
698  	      //this is the first connection, do some initializations
699  	      workersRunningAndSuspendMsgSent = false;
700  	
701  	      setTimeoutInterval( theCheckpointInterval );
702  	
703  	     // drop current computation group to 0
704  	      curCompGroup = dmtcp::UniquePid(0,0,0);
705  	      curTimeStamp = 0; // Drop timestamp to 0
706  	      numPeers = -1; // Drop number of peers to unknown
707  	
708  	      JTRACE ( "resetting _restoreWaitingMessages" )
709  	        ( _restoreWaitingMessages.size() );
710  	      _restoreWaitingMessages.clear();
711  	
712  	      JTIMER_START ( restart );
713  	  }
714  	
715  	  jalib::JSocket remote ( sock );
716  	  dmtcp::DmtcpMessage hello_remote;
717  	  hello_remote.poison();
718  	  JTRACE("Reading from incoming connection...");
719  	  remote >> hello_remote;
720  	  hello_remote.assertValid();
721  	
722  	  if ( hello_remote.type == DMT_USER_CMD ) {
723  	    processDmtUserCmd ( hello_remote, remote );
724  	    return;
725  	  } else if ( hello_remote.type == DMT_RESTART_PROCESS ) {
726  	    if ( validateDmtRestartProcess ( hello_remote, remote ) == false )
727  	      return;
728  	  } else if ( hello_remote.type == DMT_HELLO_COORDINATOR ) {
729  	    if ( validateWorkerProcess ( hello_remote, remote ) == false )
730  	      return;
731  	  } else {
732  	    JASSERT ( false ) .Text ( "Connect request from Unknown Remote Process Type" );
733  	  }
734  	
735  	  JNOTE ( "worker connected" ) ( hello_remote.from );
736  	
737  	  if ( hello_remote.theCheckpointInterval > 0 ) {
738  	    int oldInterval = theCheckpointInterval;
739  	    theCheckpointInterval = hello_remote.theCheckpointInterval;
740  	    setTimeoutInterval ( theCheckpointInterval );
741  	    JNOTE ( "CheckpointInterval Updated" ) ( oldInterval )
742  		  ( theCheckpointInterval );
743  	  }
744  	//     _table[hello_remote.from.pid()].setState(hello_remote.state);
745  	
746  	  NamedChunkReader * ds = new NamedChunkReader (
747  	      sock
748  	      ,hello_remote.from.pid()
749  	      ,hello_remote.state
750  	      ,remoteAddr
751  	      ,remoteLen
752  	      ,hello_remote.restorePort );
753  	
754  	  if( hello_remote.extraBytes > 0 ){
755  	    char* extraData = new char[hello_remote.extraBytes];
756  	    remote.readAll(extraData, hello_remote.extraBytes);
757  	    dmtcp::string hostname = extraData;
758  	    dmtcp::string progname = extraData + hostname.length() + 1;
759  	    ds->progname(progname);
760  	    ds->hostname(hostname);
761  	    delete [] extraData;
762  	  }
763  	
764  	
765  	  //add this client as a chunk reader
766  	  // in this case a 'chunk' is sizeof(DmtcpMessage)
767  	  addDataSocket ( ds );
768  	
769  	  if ( hello_remote.state == WorkerState::RESTARTING
770  	          &&  _restoreWaitingMessages.size() >0 )
771  	  {
772  	    JTRACE ( "updating missing broadcasts for new connection" )
773  	    ( hello_remote.from.pid() )
774  	    ( _restoreWaitingMessages.size() );
775  	    for ( size_t i=0; i<_restoreWaitingMessages.size(); ++i )
776  	    {
777  	      addWrite (
778  	          new jalib::JChunkWriter ( sock
779  	                                    , ( char* ) &_restoreWaitingMessages[i]
780  	                                    , sizeof ( DmtcpMessage ) )
781  	      );
782  	    }
783  	  }
784  	
785  	  JTRACE( "END" )
786  	  ( _dataSockets.size() ) ( _dataSockets[0]->socket().sockfd() == STDIN_FD );
787  	}
788  	
789  	//     WorkerNode& node = _table[hello_remote.from.pid()];
790  	//     node.setClientNumer( ds->clientNumber() );
791  	  /*
792  	      if(hello_remote.state == WorkerState::RESTARTING)
793  	      {
794  	          node.setAddr(remoteAddr, remoteLen);
795  	          node.setRestorePort(hello_remote.restorePort);
796  	
797  	          JASSERT(node.addrlen() > 0)(node.addrlen());
798  	          JASSERT(node.restorePort() > 0)(node.restorePort());
799  	          DmtcpMessage msg;
800  	          msg.type = DMT_RESTORE_WAITING;
801  	          memcpy(&msg.restoreAddr,node.addr(),node.addrlen());
802  	          msg.restoreAddrlen = node.addrlen();
803  	          msg.restorePid.id = node.id();
804  	          msg.restorePort = node.restorePort();
805  	          broadcastMessage( msg );
806  	      }*/
807  	//}
808  	
809  	void dmtcp::DmtcpCoordinator::processDmtUserCmd( DmtcpMessage& hello_remote,
810  							 jalib::JSocket& remote )
811  	{
812  	  //dmtcp_command doesn't handshake (it is antisocial)
813  	  JTRACE("got user command from dmtcp_command")(hello_remote.params[0]);
814  	  DmtcpMessage reply;
815  	  reply.type = DMT_USER_CMD_RESULT;
816  	  // if previous 'b' blocking prefix command had set blockUntilDone
817  	  if (blockUntilDone && blockUntilDoneRemote == -1  &&
818  	      hello_remote.params[0] == 'c') {
819  	    // Reply will be done in dmtcp::DmtcpCoordinator::onData in this file.
820  	    blockUntilDoneRemote = remote.sockfd();
821  	    blockUntilDoneReply = reply;
822  	    handleUserCommand( hello_remote.params[0], &reply );
823  	  } else if ( (hello_remote.params[0] == 'i' || hello_remote.params[1] == 'I')
824  	               && hello_remote.theCheckpointInterval > 0 ) {
825  	    theCheckpointInterval = hello_remote.theCheckpointInterval;
826  	    handleUserCommand( hello_remote.params[0], &reply );
827  	    remote << reply;
828  	    remote.close();
829  	  } else {
830  	    handleUserCommand( hello_remote.params[0], &reply );
831  	    remote << reply;
832  	    remote.close();
833  	  }
834  	  return;
835  	}
836  	
837  	bool dmtcp::DmtcpCoordinator::validateDmtRestartProcess
838  		 ( DmtcpMessage& hello_remote, jalib::JSocket& remote )
839  	{
840  	  // This is dmtcp_restart process, connecting to get timestamp
841  	  // and set current compGroup.
842  	
843  	  JASSERT ( hello_remote.params[0] > 0 );
844  	
845  	  dmtcp::DmtcpMessage hello_local ( dmtcp::DMT_RESTART_PROCESS_REPLY );
846  	
847  	  if( curCompGroup == dmtcp::UniquePid(0,0,0) ){
848  	    JASSERT ( minimumState() == WorkerState::UNKNOWN )
849  	      .Text ( "Coordinator should be idle at this moment" );
850  	    // Coordinator is free at this moment - setup all the things
851  	    curCompGroup = hello_remote.compGroup;
852  	    numPeers = hello_remote.params[0];
853  	    curTimeStamp = time(NULL);
854  	    hello_local.params[1] = 1;
855  	    JNOTE ( "FIRST dmtcp_restart connection.  Set numPeers. Generate timestamp" )
856  	      ( numPeers ) ( curTimeStamp ) ( curCompGroup );
857  	  } else if ( curCompGroup != hello_remote.compGroup ) {
858  	    // Coordinator already serving some other computation group - reject this process.
859  	    JNOTE ("Reject incoming dmtcp_restart connection"
860  	           " since it is not from current computation")
861  	      ( curCompGroup ) ( hello_remote.compGroup );
862  	    hello_local.type = dmtcp::DMT_REJECT;
863  	    remote << hello_local;
864  	    remote.close();
865  	    return false;
866  	  } else if ( numPeers != hello_remote.params[0] ) {
867  	    // Sanity check
868  	    JNOTE  ( "Invalid numPeers reported by dmtcp_restart process, Rejecting" )
869  	      ( numPeers ) ( hello_remote.params[0] );
870  	
871  	    hello_local.type = dmtcp::DMT_REJECT;
872  	    remote << hello_local;
873  	    remote.close();
874  	    return false;
875  	  } else {
876  	    // This is a second or higher dmtcp_restart process connecting to the coordinator.
877  	    // FIXME: Should the following be a JASSERT instead?      -- Kapil
878  	    JWARNING ( minimumState() == WorkerState::RESTARTING );
879  	    hello_local.params[1] = 0;
880  	  }
881  	
882  	  // Sent generated timestamp in local massage for dmtcp_restart process.
883  	  hello_local.params[0] = curTimeStamp;
884  	
885  	  remote << hello_local;
886  	
887  	  return true;
888  	}
889  	
890  	bool dmtcp::DmtcpCoordinator::validateWorkerProcess
891  		 ( DmtcpMessage& hello_remote, jalib::JSocket& remote )
892  	{
893  	  dmtcp::DmtcpMessage hello_local ( dmtcp::DMT_HELLO_WORKER );
894  	
895  	  if ( hello_remote.state == WorkerState::RESTARTING ) {
896  	    if ( minimumState() != WorkerState::RESTARTING &&
897  	         minimumState() != WorkerState::CHECKPOINTED ) {
898  	      JNOTE ("Computation not in RESTARTING or CHECKPOINTED state."
899  		     "  Reject incoming restarting computation process.")
900  	        ( curCompGroup ) ( hello_remote.compGroup );
901  	      hello_local.type = dmtcp::DMT_REJECT;
902  	      remote << hello_local;
903  	      remote.close();
904  	      return false;
905  	    } else if ( hello_remote.compGroup != curCompGroup) {
906  	      JNOTE ("Reject incoming restarting computation process"
907  		     " since it is not from current computation")
908  	        ( curCompGroup ) ( hello_remote.compGroup );
909  	      hello_local.type = dmtcp::DMT_REJECT;
910  	      remote << hello_local;
911  	      remote.close();
912  	      return false;
913  	    }
914  	    // dmtcp_restart already connected and compGroup created.
915  	    // Computation process connection
916  	    JASSERT ( curTimeStamp != 0 );
917  	
918  	    JTRACE("Connection from (restarting) computation process")
919  	      ( curCompGroup ) ( hello_remote.compGroup ) ( minimumState() );
920  	
921  	    remote << hello_local;
922  	
923  	  } else if ( hello_remote.state == WorkerState::RUNNING ) {
924  	    CoordinatorStatus s = getStatus();
925  	    // If some of the processes are not in RUNNING state OR if the SUSPEND
926  	    // message has been sent, REJECT.
927  	    if ( s.numPeers > 0 &&
928  	         ( s.minimumState != WorkerState::RUNNING ||
929  	           s.minimumStateUnanimous == false       ||
930  	           workersRunningAndSuspendMsgSent == true) ) {
931  	      JNOTE  ( "Current Computation not in RUNNING state."
932  		       "  Refusing to accept new connections.")
933  	        ( curCompGroup ) ( hello_remote.from.pid() );
934  	      hello_local.type = dmtcp::DMT_REJECT;
935  	      remote << hello_local;
936  	      remote.close();
937  	      return false;
938  	    } else if ( hello_remote.compGroup != UniquePid() ) {
939  	      // New Process trying to connect to Coordinator but already has compGroup
940  	      JNOTE  ( "New Process, but already has computation group.  Rejecting" );
941  	      hello_local.type = dmtcp::DMT_REJECT;
942  	      remote << hello_local;
943  	      remote.close();
944  	      return false;
945  	    } else {
946  	      // If first process, create the new computation group
947  	      if ( curCompGroup == UniquePid(0,0,0) ) {
948  	        // Connection of new computation.
949  	        curCompGroup = hello_remote.from.pid();
950  	        curTimeStamp = 0;
951  	        numPeers = -1;
952  	        JTRACE ( "First process connected.  Creating new computation group" )
953  		       (curCompGroup );
954  	      } else {
955  	        JTRACE ( "New Process Connected" ) ( hello_remote.from.pid() );
956  	      }
957  	      remote << hello_local;
958  	    }
959  	  } else {
960  	    JASSERT ( false ) .Text ( "Invalid Worker Type" );
961  	    return false;
962  	  }
963  	
964  	  return true;
965  	}
966  	
967  	void dmtcp::DmtcpCoordinator::onTimeoutInterval()
968  	{
969  	  if ( theCheckpointInterval > 0 )
970  	    startCheckpoint();
971  	}
972  	
973  	
974  	bool dmtcp::DmtcpCoordinator::startCheckpoint()
975  	{
976  	  CoordinatorStatus s = getStatus();
977  	  if ( s.minimumState == WorkerState::RUNNING
978  	       && !workersRunningAndSuspendMsgSent )
979  	  {
980  	    JTIMER_START ( checkpoint );
981  	    _restartFilenames.clear();
982  	    JNOTE ( "starting checkpoint, suspending all nodes" )( s.numPeers );
983  	    // Pass number of connected peers to all clients
984  	    broadcastMessage ( DMT_DO_SUSPEND , curCompGroup, getStatus().numPeers );
985  	
986  	    // Suspend Message has been sent but the workers are still in running
987  	    // state.  If the coordinator receives another checkpoint request from user
988  	    // at this point, it should fail.
989  	    workersRunningAndSuspendMsgSent = true;
990  	    return true;
991  	  }
992  	
993  	  if (s.numPeers > 0) {
994  	    JTRACE ( "delaying checkpoint, workers not ready" ) ( s.minimumState )
995  		   ( s.numPeers );
996  	  }
997  	  return false;
998  	}
999  	
1000 	dmtcp::DmtcpWorker& dmtcp::DmtcpWorker::instance()
1001 	{
1002 	  JASSERT ( false ).Text ( "This method is only available on workers" );
1003 	  return * ( ( DmtcpWorker* ) 0 );
1004 	}
1005 	
1006 	/*
1007 	  Can cause conflict with method of same signature in dmtcpworker.cpp.
1008 	  What was the purpose of this method? -- Praveen
1009 	*/
1010 	const dmtcp::UniquePid& dmtcp::DmtcpWorker::coordinatorId() const
1011 	{
1012 	  JASSERT ( false ).Text ( "This method is only available on workers" );
Event null_reference: NULL reference is not allowed
Event caretline: ^
1013 	  return * ( ( UniquePid* ) 0 );
1014 	}
1015 	
1016 	void dmtcp::DmtcpCoordinator::broadcastMessage ( DmtcpMessageType type,
1017 	    dmtcp::UniquePid compGroup = dmtcp::UniquePid(), int param1 = -1 )
1018 	{
1019 	  DmtcpMessage msg;
1020 	  msg.type = type;
1021 	  if( param1 > 0 ){
1022 	    msg.params[0] = param1;
1023 	    msg.compGroup = compGroup;
1024 	  }
1025 	  broadcastMessage ( msg );
1026 	  JTRACE ("sending message")( type );
1027 	}
1028 	
1029 	void dmtcp::DmtcpCoordinator::broadcastMessage ( const DmtcpMessage& msg )
1030 	{
1031 	  for ( dmtcp::vector<jalib::JReaderInterface*>::iterator i
1032 		= _dataSockets.begin() ; i!= _dataSockets.end() ; i++ )
1033 	  {
1034 	    if ( ( *i )->socket().sockfd() != STDIN_FD )
1035 	      addWrite ( new jalib::JChunkWriter ( ( *i )->socket(),
1036 						   ( char* ) &msg,
1037 						   sizeof ( DmtcpMessage ) ) );
1038 	  }
1039 	}
1040 	
1041 	dmtcp::DmtcpCoordinator::CoordinatorStatus dmtcp::DmtcpCoordinator::getStatus() const
1042 	{
1043 	  CoordinatorStatus status;
1044 	  const static int INITIAL = WorkerState::_MAX;
1045 	  int m = INITIAL;
1046 	  int count = 0;
1047 	  bool unanimous = true;
1048 	  for ( const_iterator i = _dataSockets.begin()
1049 	      ; i != _dataSockets.end()
1050 	      ; ++i )
1051 	  {
1052 	    if ( ( *i )->socket().sockfd() != STDIN_FD )
1053 	    {
1054 	      int cliState = ((NamedChunkReader*)*i)->state().value();
1055 	      count++;
1056 	      unanimous = unanimous && (m==cliState || m==INITIAL);
1057 	      if ( cliState < m ) m = cliState;
1058 	    }
1059 	  }
1060 	
1061 	  status.minimumState = ( m==INITIAL ? WorkerState::UNKNOWN
1062 				  : (WorkerState::eWorkerState)m );
1063 	  if( status.minimumState == WorkerState::CHECKPOINTED && count < numPeers ){
1064 	    JTRACE("minimal state counted as CHECKPOINTED but not all processes"
1065 		   " are connected yet.  So we wait.") ( numPeers ) ( count );
1066 	    status.minimumState = WorkerState::RESTARTING;
1067 	  }
1068 	  status.minimumStateUnanimous = unanimous;
1069 	  status.numPeers = count;
1070 	  return status;
1071 	}
1072 	
1073 	void dmtcp::DmtcpCoordinator::writeRestartScript()
1074 	{
1075 	  const char* dir = getenv ( ENV_VAR_CHECKPOINT_DIR );
1076 	  if(dir==NULL) dir = ".";
1077 	  dmtcp::ostringstream o1, o2;
1078 	  dmtcp::string filename, uniqueFilename;
1079 	
1080 	  o1 << dmtcp::string(dir) << "/"
1081 	     << RESTART_SCRIPT_BASENAME << RESTART_SCRIPT_EXT;
1082 	  filename = o1.str();
1083 	
1084 	  o2 << dmtcp::string(dir) << "/"
1085 	     << RESTART_SCRIPT_BASENAME << "_" << curCompGroup << RESTART_SCRIPT_EXT;
1086 	  uniqueFilename = o2.str();
1087 	
1088 	  const bool isSingleHost = (_restartFilenames.size() == 1);
1089 	
1090 	  dmtcp::map< dmtcp::string, dmtcp::vector<dmtcp::string> >::const_iterator host;
1091 	  dmtcp::vector<dmtcp::string>::const_iterator file;
1092 	
1093 	  char hostname[80];
1094 	  gethostname ( hostname, 80 );
1095 	
1096 	  JTRACE ( "writing restart script" ) ( uniqueFilename );
1097 	
1098 	  FILE* fp = fopen ( uniqueFilename.c_str(),"w" );
1099 	  JASSERT ( fp!=0 )(JASSERT_ERRNO)( uniqueFilename )
1100 		  .Text ( "failed to open file" );
1101 	
1102 	  fprintf ( fp, "%s", theRestartScriptHeader );
1103 	  fprintf ( fp, "%s", theRestartScriptUsage );
1104 	
1105 	  fprintf ( fp, "coord_host=$"ENV_VAR_NAME_ADDR"\n"
1106 	                "if test -z \"$" ENV_VAR_NAME_ADDR "\"; then\n"
1107 	                "  coord_host=%s\nfi\n\n", hostname );
1108 	  fprintf ( fp, "coord_port=$"ENV_VAR_NAME_PORT"\n"
1109 	                "if test -z \"$" ENV_VAR_NAME_PORT "\"; then\n"
1110 	                "  coord_port=%d\nfi\n\n", thePort );
1111 	  fprintf ( fp, "checkpoint_interval=$"ENV_VAR_CKPT_INTR"\n"
1112 	                "if test -z \"$" ENV_VAR_CKPT_INTR "\"; then\n"
1113 	                "  checkpoint_interval=%d\nfi\n\n", theCheckpointInterval );
1114 	  if ( batchMode )
1115 	    fprintf ( fp, "maybebatch='--batch'\n\n" );
1116 	  else
1117 	    fprintf ( fp, "maybebatch=\n\n" );
1118 	
1119 	  fprintf ( fp, "# Number of hosts in the computation = %zd\n",
1120 		    _restartFilenames.size() );
1121 	  fprintf ( fp, "# Number of processes in the computation = %d\n\n",
1122 		    getStatus().numPeers );
1123 	
1124 	  if ( isSingleHost ) {
1125 	    JTRACE ( "Single HOST");
1126 	
1127 	    host=_restartFilenames.begin();
1128 	    dmtcp::ostringstream o;
1129 	    for ( file=host->second.begin(); file!=host->second.end(); ++file ) {
1130 	      o << " " << *file;
1131 	    }
1132 	
1133 	    fprintf ( fp, "%s", theRestartScriptCmdlineArgHandler );
1134 	    fprintf ( fp, "DMTCP_RESTART=dmtcp_restart\n" );
1135 	    fprintf ( fp, "which dmtcp_restart > /dev/null \\\n" \
1136 			  " || DMTCP_RESTART=`dirname %s`/dmtcp_restart\n\n", argv0 );
1137 	    fprintf ( fp, "if [ ! -z \"$DMTCP_RESTART_DIR\" ]; then\n"
1138 	                  "  new_ckpt_names=\"\"\n"
1139 	                  "  names=\"%s\"\n"
1140 	                  "  for tmp in $names; do\n"
1141 	                  "    new_ckpt_names=\"$DMTCP_RESTART_DIR/`basename $tmp` $new_ckpt_names\"\n"
1142 	                  "  done\n"
1143 	    	            "fi\n", o.str().c_str());
1144 	
1145 	    fprintf ( fp,
1146 	              "if [ ! -z \"$maybebatch\" ]; then\n"
1147 	              "  if [ ! -z \"$DMTCP_RESTART_DIR\" ]; then\n"
1148 	              "    exec $DMTCP_RESTART $maybebatch $maybejoin --interval \"$checkpoint_interval\"\\\n"
1149 	              "       $new_ckpt_names\n"
1150 	              "  else\n"
1151 	              "    exec $DMTCP_RESTART $maybebatch $maybejoin --interval \"$checkpoint_interval\"\\\n"
1152 	              "       %s\n"
1153 	              "  fi\n"
1154 	              "else\n"
1155 	              "  if [ ! -z \"$DMTCP_RESTART_DIR\" ]; then\n"
1156 	              "    exec $DMTCP_RESTART --host \"$coord_host\" --port \"$coord_port\" $maybebatch\\\n"
1157 	              "      $maybejoin --interval \"$checkpoint_interval\"\\\n"
1158 	              "        $new_ckpt_names\n"
1159 	              "  else\n"
1160 	              "    exec $DMTCP_RESTART --host \"$coord_host\" --port \"$coord_port\" $maybebatch\\\n"
1161 	              "      $maybejoin --interval \"$checkpoint_interval\"\\\n"
1162 	              "        %s\n"
1163 	              "  fi\n"
1164 	              "fi\n", o.str().c_str(), o.str().c_str() );
1165 	  }
1166 	  else
1167 	  {
1168 	    fprintf ( fp, "%s",
1169 	              "worker_ckpts_regexp=\'[^:]*::[ \\t\\n]*\\([^ \\t\\n]\\+\\)[ \\t\\n]*:\\([a-z]\\+\\):[ \\t\\n]*\\([^:]\\+\\)\'\n\n"
1170 	              "# SYNTAX:\n"
1171 	              "#  :: <HOST> :<MODE>: <CHECKPOINT_IMAGE> ...\n"
1172 	              "# Host names and filenames must not include \':\'\n"
1173 	              "# At most one fg (foreground) mode allowed; it must be last.\n"
1174 	              "# \'maybexterm\' and \'maybebg\' are set from <MODE>.\n"
1175 	              "worker_ckpts=\'" );
1176 	
1177 	    for ( host=_restartFilenames.begin(); host!=_restartFilenames.end(); ++host )
1178 	    {
1179 	      fprintf ( fp, "\n :: %s :bg:", host->first.c_str() );
1180 	      for ( file=host->second.begin(); file!=host->second.end(); ++file )
1181 	      {
1182 	        fprintf ( fp," %s", file->c_str() );
1183 	      }
1184 	    }
1185 	
1186 	    fprintf ( fp, "%s", "\n\'\n\n\n" );
1187 	
1188 	
1189 	    fprintf ( fp, "%s", theRestartScriptCmdlineArgHandler );
1190 	
1191 	    fprintf ( fp, "%s",
1192 	              "worker_hosts=\\\n"
1193 	              "`echo $worker_ckpts | sed -e \'s/\'\"$worker_ckpts_regexp\"\'/\\1 /g\'`\n"
1194 	              "restart_modes=\\\n"
1195 	              "`echo $worker_ckpts | sed -e \'s/\'\"$worker_ckpts_regexp\"\'/: \\2/g\'`\n"
1196 	              "ckpt_files_groups=\\\n"
1197 	              "`echo $worker_ckpts | sed -e \'s/\'\"$worker_ckpts_regexp\"\'/: \\3/g\'`\n"
1198 	              "\n"
1199 	              "if [ ! -z \"$hostfile\" ]; then\n"
1200 	              "  worker_hosts=`cat \"$hostfile\" | sed -e \'s/#.*//\' -e \'s/[ \\t\\r]*//\' -e \'/^$/ d\'`\n"
1201 	              "fi\n\n"
1202 	
1203 	              "localhost_ckpt_files_group=\n\n"
1204 	
1205 	              "num_worker_hosts=`echo $worker_hosts | wc -w`\n\n"
1206 	
1207 	              "maybejoin=\n"
1208 	              "if [ \"$num_worker_hosts\" != \"1\" ]; then\n"
1209 	              "  maybejoin='--join'\n"
1210 	              "fi\n\n"
1211 	
1212 	              "for worker_host in $worker_hosts\n"
1213 	              "do\n\n"
1214 	              "  ckpt_files_group=`echo $ckpt_files_groups | sed -e \'s/[^:]*:[ \\t\\n]*\\([^:]*\\).*/\\1/\'`\n"
1215 	              "  ckpt_files_groups=`echo $ckpt_files_groups | sed -e \'s/[^:]*:[^:]*//\'`\n"
1216 	              "\n"
1217 	              "  mode=`echo $restart_modes | sed -e \'s/[^:]*:[ \\t\\n]*\\([^:]*\\).*/\\1/\'`\n"
1218 	              "  restart_modes=`echo $restart_modes | sed -e \'s/[^:]*:[^:]*//\'`\n\n"
1219 	              "  maybexterm=\n"
1220 	              "  maybebg=\n"
1221 	              "  case $mode in\n"
1222 	              "    bg) maybebg=\'bg\';;\n"
1223 	              "    xterm) maybexterm=xterm;;\n"
1224 	              "    fg) ;;\n"
1225 	              "    *) echo \"WARNING: Unknown Mode\";;\n"
1226 	              "  esac\n\n"
1227 	              "  if [ -z \"$ckpt_files_group\" ]; then\n"
1228 	              "    break;\n"
1229 	              "  fi\n\n"
1230 	
1231 	              "  new_ckpt_files_group=\"\"\n"
1232 	              "  for tmp in $ckpt_files_group\n"
1233 	              "  do\n"
1234 	              "      if  [ ! -z \"$DMTCP_RESTART_DIR\" ]; then\n"
1235 	              "        tmp=$DMTCP_RESTART_DIR/`basename $tmp`\n"
1236 	              "      fi\n"
1237 	              "      new_ckpt_files_group=\"$new_ckpt_files_group $tmp\"\n"
1238 	              "  done\n\n"
1239 	
1240 	              "  if [ `hostname` == \"$worker_host\" -o \"$num_worker_hosts\" == \"1\" ]; then\n"
1241 	              "    localhost_ckpt_files_group=\"$new_ckpt_files_group\"\n"
1242 	              "    continue\n"
1243 	              "  fi\n\n"
1244 	
1245 	              "  if [ -z $maybebg ]; then\n"
1246 	              "    $maybexterm /usr/bin/ssh -t \"$worker_host\" \\\n"
1247 	              "      "DMTCP_RESTART_CMD" --host \"$coord_host\" --port \"$coord_port\" $maybebatch\\\n"
1248 	              "        --join --interval \"$checkpoint_interval\" $new_ckpt_files_group\n"
1249 	              "  else\n"
1250 	              "    $maybexterm /usr/bin/ssh \"$worker_host\" \\\n"
1251 	              // In OpenMPI 1.4, without this (sh -c ...), orterun hangs at the
1252 	              // end of the computation until user presses enter key.
1253 	              "      \"/bin/sh -c \'"DMTCP_RESTART_CMD" --host $coord_host --port $coord_port $maybebatch\\\n"
1254 	              "        --join --interval \"$checkpoint_interval\" $new_ckpt_files_group\'\" &\n"
1255 	              "  fi\n\n"
1256 	              "done\n\n");
1257 	
1258 	    fprintf ( fp, "DMTCP_RESTART=dmtcp_restart\n" );
1259 	    fprintf ( fp, "which dmtcp_restart > /dev/null \\\n" \
1260 			  " || DMTCP_RESTART=`dirname %s`/dmtcp_restart\n\n", argv0 );
1261 	
1262 	    fprintf ( fp, "%s",
1263 	              "if [ -n \"$localhost_ckpt_files_group\" ]; then\n"
1264 	              "exec dmtcp_restart --host \"$coord_host\" --port \"$coord_port\" $maybebatch\\\n"
1265 	              "  $maybejoin --interval \"$checkpoint_interval\" $localhost_ckpt_files_group\n"
1266 	              "fi\n\n"
1267 	
1268 	
1269 	              "#wait for them all to finish\n"
1270 	              "wait\n");
1271 	  }
1272 	
1273 	  fclose ( fp );
1274 	  {
1275 	    /* Set execute permission for user. */
1276 	    struct stat buf;
1277 	    stat ( uniqueFilename.c_str(), &buf );
1278 	    chmod ( uniqueFilename.c_str(), buf.st_mode | S_IXUSR );
1279 	    // Create a symlink from
1280 	    //   dmtcp_restart_script.sh -> dmtcp_restart_script_<curCompId>.sh
1281 	    unlink ( filename.c_str() );
1282 	    // FIXME:  Handle error case of symlink()
1283 	    JWARNING( 0 == symlink ( uniqueFilename.c_str(), filename.c_str() ) );
1284 	  }
1285 	  _restartFilenames.clear();
1286 	}
1287 	
1288 	static void SIGINTHandler(int signum)
1289 	{
1290 	  prog.handleUserCommand('q');
1291 	}
1292 	
1293 	static void setupSIGINTHandler()
1294 	{
1295 	  struct sigaction action;
1296 	  action.sa_handler = SIGINTHandler;
1297 	  sigemptyset ( &action.sa_mask );
1298 	  action.sa_flags = 0;
1299 	  sigaction ( SIGINT, &action, NULL );
1300 	}
1301 	
/* Drop the first remaining command-line argument (expects argc/argv in
 * scope).  Wrapped in do/while(0) so it expands to a single statement and
 * stays correct inside an unbraced if/else. */
#define shift do { argc--; argv++; } while (0)
1303 	
1304 	int main ( int argc, char** argv )
1305 	{
1306 	  argv0 = argv[0];
1307 	  dmtcp::DmtcpMessage::setDefaultCoordinator ( dmtcp::UniquePid::ThisProcess() );
1308 	
1309 	  //parse port
1310 	  thePort = DEFAULT_PORT;
1311 	  const char* portStr = getenv ( ENV_VAR_NAME_PORT );
1312 	  if ( portStr != NULL ) thePort = jalib::StringToInt ( portStr );
1313 	
1314 	  bool background = false;
1315 	
1316 	  shift;
1317 	  while(argc > 0){
1318 	    dmtcp::string s = argv[0];
1319 	    if(s=="-h" || s=="--help"){
1320 	      fprintf(stderr, theUsage, DEFAULT_PORT);
1321 	      return 1;
1322 	    }else if(s=="--exit-on-last"){
1323 	      exitOnLast = true;
1324 	      shift;
1325 	    }else if(s=="--background"){
1326 	      background = true;
1327 	      shift;
1328 	    }else if(s=="--batch"){
1329 	      batchMode = true;
1330 	      shift;
1331 	    }else if(argc>1 && (s == "-i" || s == "--interval")){
1332 	      setenv(ENV_VAR_CKPT_INTR, argv[1], 1);
1333 	      shift; shift;
1334 	    }else if(argc>1 && (s == "-p" || s == "--port")){
1335 	      thePort = jalib::StringToInt( argv[1] );
1336 	      shift; shift;
1337 	    }else if(argc>1 && (s == "-c" || s == "--ckptdir")){
1338 	      setenv(ENV_VAR_CHECKPOINT_DIR, argv[1], 1);
1339 	      shift; shift;
1340 	    }else if(argc>1 && (s == "-t" || s == "--tmpdir")){
1341 	      setenv(ENV_VAR_TMPDIR, argv[1], 1);
1342 	      shift; shift;
1343 	    }else if(argc == 1){ //last arg can be port
1344 	      thePort = jalib::StringToInt( argv[0] );
1345 	      shift;
1346 	    }else{
1347 	      fprintf(stderr, theUsage, DEFAULT_PORT);
1348 	      return 1;
1349 	    }
1350 	  }
1351 	
1352 	  JASSERT ( ! (background && batchMode) )
1353 	    .Text ( "--background and --batch can't be specified together");
1354 	
1355 	  dmtcp::UniquePid::setTmpDir(getenv(ENV_VAR_TMPDIR));
1356 	
1357 	#ifdef DEBUG
1358 	  /* Disable Jassert Logging */
1359 	  dmtcp::UniquePid::ThisProcess(true);
1360 	
1361 	  dmtcp::ostringstream o;
1362 	  o << dmtcp::UniquePid::getTmpDir() << "/jassertlog."
1363 	    << dmtcp::UniquePid::ThisProcess();
1364 	  JASSERT_INIT(o.str());
1365 	  JTRACE ( "New DMTCP coordinator starting." );
1366 	
1367 	  JTRACE ( "recalculated process UniquePid..." )
1368 		 ( dmtcp::UniquePid::ThisProcess() );
1369 	#endif
1370 	
1371 	  if ( thePort < 0 )
1372 	  {
1373 	    fprintf(stderr, theUsage, DEFAULT_PORT);
1374 	    return 1;
1375 	  }
1376 	
1377 	  jalib::JServerSocket* sock;
1378 	  /*Test if the listener socket is already open*/
1379 	  if ( fcntl(PROTECTEDFD(1), F_GETFD) != -1 ) {
1380 	    sock = new jalib::JServerSocket ( PROTECTEDFD(1) );
1381 	    JASSERT ( sock->port() != -1 ) .Text ( "Invalid listener socket" );
1382 	    JTRACE ( "Using already created listener socker" ) ( sock->port() );
1383 	  } else {
1384 	
1385 	    errno = 0;
1386 	    sock = new jalib::JServerSocket ( jalib::JSockAddr::ANY, thePort );
1387 	    JASSERT ( sock->isValid() ) ( thePort ) ( JASSERT_ERRNO )
1388 	      .Text ( "Failed to create listen socket."
1389 	       "\nIf msg is \"Address already in use\", this may be an old coordinator."
1390 	       "\nKill default coordinator and try again:  dmtcp_command -q"
1391 	       "\nIf that fails, \"pkill -9 dmtcp_coord\","
1392 		" and try again in a minute or so." );
1393 	  }
1394 	
1395 	  thePort = sock->port();
1396 	
1397 	  if ( batchMode && getenv ( ENV_VAR_CKPT_INTR ) == NULL ) {
1398 	    setenv(ENV_VAR_CKPT_INTR, "3600", 1);
1399 	  }
1400 	  //parse checkpoint interval
1401 	  const char* interval = getenv ( ENV_VAR_CKPT_INTR );
1402 	  if ( interval != NULL )
1403 	    theCheckpointInterval = jalib::StringToInt ( interval );
1404 	
1405 	#if 0
1406 	  JASSERT_STDERR <<
1407 	    "dmtcp_coordinator starting..." <<
1408 	    "\n    Port: " << thePort <<
1409 	    "\n    Checkpoint Interval: ";
1410 	  if(theCheckpointInterval==0)
1411 	    JASSERT_STDERR << "disabled (checkpoint manually instead)";
1412 	  else
1413 	    JASSERT_STDERR << theCheckpointInterval;
1414 	  JASSERT_STDERR  <<
1415 	    "\n    Exit on last client: " << exitOnLast << "\n";
1416 	#else
1417 	    fprintf(stderr, "dmtcp_coordinator starting..."
1418 	    "\n    Port: %d"
1419 	    "\n    Checkpoint Interval: ", thePort);
1420 	  if(theCheckpointInterval==0)
1421 	    fprintf(stderr, "disabled (checkpoint manually instead)");
1422 	  else
1423 	    fprintf(stderr, "%d", theCheckpointInterval);
1424 	  fprintf(stderr, "\n    Exit on last client: %d\n", exitOnLast);
1425 	#endif
1426 	
1427 	  if(background){
1428 	    JASSERT_STDERR  << "Backgrounding...\n";
1429 	    JASSERT(dup2(open("/dev/null",O_RDWR), 0)==0);
1430 	    fflush(stdout);
1431 	    JASSERT(close(1)==0);
1432 	    JASSERT(open("/dev/null", O_WRONLY)==1);
1433 	    fflush(stderr);
1434 	    if (close(2) != 0 || dup2(1,2) != 2)
1435 	      JASSERT(false) .Text( "Can't print to stderr");
1436 	    close(JASSERT_STDERR_FD);
1437 	    dup2(2, JASSERT_STDERR_FD);
1438 	    if(fork()>0){
1439 	      JTRACE ( "Parent Exiting after fork()" );
1440 	      exit(0);
1441 	    }
1442 	    pid_t sid = setsid();
1443 	  } else if ( batchMode ) {
1444 	    JASSERT_STDERR  << "Going into Batch Mode...\n";
1445 	    close(0);
1446 	    close(1);
1447 	    close(2);
1448 	    close(JASSERT_STDERR_FD);
1449 	
1450 	    JASSERT(open("/dev/null", O_WRONLY)==0);
1451 	
1452 	    JASSERT(dup2(0, 1) == 1);
1453 	    JASSERT(dup2(0, 2) == 2);
1454 	    JASSERT(dup2(0, JASSERT_STDERR_FD) == JASSERT_STDERR_FD);
1455 	
1456 	  } else {
1457 	    JASSERT_STDERR  <<
1458 	      "Type '?' for help." <<
1459 	      "\n\n";
1460 	  }
1461 	
1462 	  /* We setup the signal handler for SIGINT so that it would send the
1463 	   * DMT_KILL_PEER message to all the connected peers before exiting.
1464 	   */
1465 	  setupSIGINTHandler();
1466 	  prog.addListenSocket ( *sock );
1467 	  if(!background && !batchMode)
1468 	    prog.addDataSocket ( new jalib::JChunkReader ( STDIN_FD , 1 ) );
1469 	
1470 	  // FIXME: Should we use a default checkpoint interval (1 hour in this case)
1471 	  //        even if the user has not explicitely requested it.
1472 	  if ( theCheckpointInterval <= 0 ) theCheckpointInterval = 3600;
1473 	  prog.monitorSockets ( theCheckpointInterval );
1474 	  return 0;
1475 	}