1    	/****************************************************************************
2    	 *   Copyright (C) 2006-2010 by Jason Ansel, Kapil Arya, and Gene Cooperman *
3    	 *   jansel@csail.mit.edu, kapil@ccs.neu.edu, gene@ccs.neu.edu              *
4    	 *                                                                          *
5    	 *   This file is part of the dmtcp/src module of DMTCP (DMTCP:dmtcp/src).  *
6    	 *                                                                          *
7    	 *  DMTCP:dmtcp/src is free software: you can redistribute it and/or        *
8    	 *  modify it under the terms of the GNU Lesser General Public License as   *
9    	 *  published by the Free Software Foundation, either version 3 of the      *
10   	 *  License, or (at your option) any later version.                         *
11   	 *                                                                          *
12   	 *  DMTCP:dmtcp/src is distributed in the hope that it will be useful,      *
13   	 *  but WITHOUT ANY WARRANTY; without even the implied warranty of          *
14   	 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the           *
15   	 *  GNU Lesser General Public License for more details.                     *
16   	 *                                                                          *
17   	 *  You should have received a copy of the GNU Lesser General Public        *
18   	 *  License along with DMTCP:dmtcp/src.  If not, see                        *
19   	 *  <http://www.gnu.org/licenses/>.                                         *
20   	 ****************************************************************************/
21   	
22   	#include "dmtcpworker.h"
23   	#include "constants.h"
24   	#include  "../jalib/jconvert.h"
25   	#include  "../jalib/jalloc.h"
26   	#include "dmtcpmessagetypes.h"
27   	#include <stdlib.h>
28   	#include "mtcpinterface.h"
29   	#include <unistd.h>
30   	#include "sockettable.h"
31   	#include  "../jalib/jsocket.h"
32   	#include <map>
33   	#include "kernelbufferdrainer.h"
34   	#include  "../jalib/jfilesystem.h"
35   	#include "syscallwrappers.h"
36   	#include "protectedfds.h"
37   	#include "connectionidentifier.h"
38   	#include "connectionmanager.h"
39   	#include "connectionstate.h"
40   	#include "dmtcp_coordinator.h"
41   	#include "sysvipc.h"
42   	#include <signal.h>
43   	#include <pthread.h>
44   	#include <sys/types.h>
45   	#include <sys/stat.h>
46   	#include <fcntl.h>
47   	#include <sys/wait.h>
48   	#include <sys/time.h>
49   	#include <sys/resource.h>
50   	#include <sys/personality.h>
51   	
52   	
/* Read-write lock initializers.
 *
 * Defines PTHREAD_RWLOCK_PREFER_WRITER_RECURSIVE_INITIALIZER_NP, a static
 * initializer for a pthread_rwlock_t whose kind field is set to
 * PTHREAD_RWLOCK_PREFER_WRITER_NP.  Where that field sits inside the
 * initializer depends on the word size and, on 32-bit targets, on byte order.
 * Only defined when __USE_GNU is set.
 *
 * NOTE(review): this presumably mirrors glibc's internal pthread_rwlock_t
 * layout -- confirm against the target glibc before relying on it.
 * NOTE(review): theWrapperExecutionLock below is initialized with
 * PTHREAD_RWLOCK_WRITER_NONRECURSIVE_INITIALIZER_NP instead; this macro is
 * not referenced in the code visible here.
 */
#ifdef __USE_GNU
# if __WORDSIZE == 64
#  define PTHREAD_RWLOCK_PREFER_WRITER_RECURSIVE_INITIALIZER_NP \
 { { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,					      \
       PTHREAD_RWLOCK_PREFER_WRITER_NP } }
# else
#  if __BYTE_ORDER == __LITTLE_ENDIAN
#   define PTHREAD_RWLOCK_PREFER_WRITER_RECURSIVE_INITIALIZER_NP \
 { { 0, 0, 0, 0, 0, 0, PTHREAD_RWLOCK_PREFER_WRITER_NP, \
     0, 0, 0, 0 } }
#  else
#   define PTHREAD_RWLOCK_PREFER_WRITER_RECURSIVE_INITIALIZER_NP \
 { { 0, 0, 0, 0, 0, 0, 0, 0, 0, PTHREAD_RWLOCK_PREFER_WRITER_NP,\
     0 } }
#  endif
# endif
#endif
71   	
72   	
// Recursive mutex taken by the checkpoint thread after SUSPEND in
// waitForStage1Suspend() and released in waitForStage2Checkpoint().
// NOTE(review): any other holders are outside this view.
static pthread_mutex_t theCkptCanStart = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;
// Serializes the checkpoint thread's coordinator traffic against
// ~DmtcpWorker() while exit() is in progress; see ~DmtcpWorker() and
// interruptCkpthread().
static pthread_mutex_t destroyDmtcpWorker = PTHREAD_MUTEX_INITIALIZER;

/*
 * theWrapperExecutionLock is used to make the checkpoint safe by making sure
 *   that no user thread is executing any DMTCP wrapper code when it receives
 *   the checkpoint signal.
 * How it works:
 *   On entering a DMTCP wrapper, the user thread acquires the read lock,
 *     and releases it before leaving the wrapper.
 *   When the checkpoint thread wants to send the SUSPEND signal to user
 *     threads, it must acquire the write lock. It is blocked until all the
 *     existing read locks held by user threads have been released. NOTE that
 *     this is a WRITER-PREFERRED lock.
 *
 * There is a corner case too -- a newly created thread that has not been
 *   initialized yet needs extra handling:
 *   A counter of newly created, uninitialized threads is kept
 *     (unInitializedThreadCount), protected by a mutex
 *     (unInitializedThreadCountLock).
 *   The calling thread (parent) increments the counter before calling clone.
 *   The newly created child thread decrements the counter at the end of
 *     its initialization in MTCP/DMTCP.
 *   After acquiring the write lock, the checkpoint thread waits until the
 *     number of uninitialized threads is zero. At that point, no thread is
 *     executing in the clone wrapper and it is safe to checkpoint.
 *
 * XXX: Currently this protection is provided only for the clone wrapper; it
 * should be extended to other calls as well.           -- KAPIL
 */
static pthread_rwlock_t theWrapperExecutionLock = PTHREAD_RWLOCK_WRITER_NONRECURSIVE_INITIALIZER_NP;
static pthread_mutex_t unInitializedThreadCountLock = PTHREAD_MUTEX_INITIALIZER;
static int unInitializedThreadCount = 0;
// Computation group received with the most recent DMT_DO_SUSPEND message;
// waitForStage1Suspend() uses it to name the signature file.
static dmtcp::UniquePid compGroup;

// static dmtcp::KernelBufferDrainer* theDrainer = NULL;
// Per-checkpoint connection state: recreated in waitForStage1Suspend() and
// used by the later checkpoint stages.
static dmtcp::ConnectionState* theCheckpointState = NULL;

#ifdef EXTERNAL_SOCKET_HANDLING
// Results of the socket peer lookup, and the connections the coordinator
// reported as having unknown (external) peers.  waitForStage1Suspend() waits
// until externalTcpConnections is empty before accepting another SUSPEND.
static dmtcp::vector <dmtcp::TcpConnectionInfo> theTcpConnections;
dmtcp::vector <dmtcp::ConnectionIdentifier> externalTcpConnections;
static bool _waitingForExternalSocketsToClose = false;
#endif

// NOTE(review): not referenced in the code visible here.
static int theRestorePort = RESTORE_PORT_START;

// Raised by the exit() wrapper (see ~DmtcpWorker()); presumably what
// exitInProgress() reports.
bool dmtcp::DmtcpWorker::_exitInProgress = false;

// Re-exec helpers used by the constructor: one for DMTCP's own tools (run
// without checkpointing) and one for ssh (remote command run under DMTCP).
void processDmtcpCommands(dmtcp::string programName);
static void processSshCommand(dmtcp::string programName);
123  	
124  	void dmtcp::DmtcpWorker::useAlternateCoordinatorFd(){
125  	  _coordinatorSocket = jalib::JSocket( PROTECTEDFD( 4 ) );
126  	}
127  	
// Out-of-class definition of the static constant used as the buffer size
// below; its value comes from the in-class initializer (presumably in
// dmtcpworker.h).
const unsigned int dmtcp::DmtcpWorker::ld_preload_c_len;
// Fixed-size storage (no malloc) for DMTCP's own LD_PRELOAD entry, filled by
// restoreUserLDPRELOAD().
char dmtcp::DmtcpWorker::ld_preload_c[dmtcp::DmtcpWorker::ld_preload_c_len];

// Set to true by waitForStage1Suspend() once the checkpoint thread has
// finished initializing; the DmtcpWorker constructor polls it before
// returning to the user program.
// NOTE(review): plain bool written by one thread and polled by another with
// no synchronization -- consider a volatile/atomic type.
bool _checkpointThreadInitialized = false;
132  	void restoreUserLDPRELOAD()
133  	{
134  	  // We have now successfully used LD_PRELOAD to execute prior to main()
135  	  // Next, hide our value of LD_PRELOAD, in a global variable.
136  	  // At checkpoint and restart time, we will no longer need our LD_PRELOAD.
137  	  // We will need it in only one place:
138  	  //  when the user application makes an exec call:
139  	  //   If anybody calls our execwrapper, we will reset LD_PRELOAD then.
140  	  //   If they directly call _real_execve to get libc symbol, they will
141  	  //   not be part of DMTCP computation.
142  	  // This has the advantage that our value of LD_PRELOAD will always come
143  	  //   before any paths set by user application.
144  	  // Also, bash likes to keep its own envp, but we will interact with bash only
145  	  //   within the exec wrapper.
146  	  // NOTE:  If the user called exec("ssh ..."), we currently catch this in
147  	  //   DmtcpWorker() due to LD_PRELOAD, unset LD_PRELOAD, and edit this into
148  	  //   exec("dmtcp_checkpoint --ssh-slave ... ssh ..."), and re-execute.
149  	  //   This way, we will unset LD_PRELOAD here and now, instead of at that time.
150  	  char * preload =  getenv("LD_PRELOAD");
151  	  char * preload_rest = strstr(preload, ":");
152  	  if (preload_rest) {
153  	    *preload_rest = '\0'; // Now preload is just our preload string
154  	    preload_rest++;
155  	  }
156  	  JTRACE("LD_PRELOAD")(preload);
157  	  JASSERT(strlen(preload) < dmtcp::DmtcpWorker::ld_preload_c_len)
158  		 (preload) (dmtcp::DmtcpWorker::ld_preload_c_len)
159  		 .Text("preload string is longer than ld_preload_c_len");
160  	  strcpy(dmtcp::DmtcpWorker::ld_preload_c, preload);  // Don't malloc
161  	  if (preload_rest) {
162  	    setenv("LD_PRELOAD", preload_rest, 1);
163  	  } else {
164  	    _dmtcp_unsetenv("LD_PRELOAD");
165  	  }
166  	}
167  	
168  	#include "../../mtcp/mtcp.h" //for MTCP_DEFAULT_SIGNAL
169  	
170  	// This shold be visible to library only.  DmtcpWorker will call
171  	//   this to initialize tmp (ckpt signal) at startup time.  This avoids
172  	//   any later calls to getenv(), at which time the user app may have
173  	//   a wrapper around getenv, modified environ, or other tricks.
174  	//   (Matlab needs this or else it segfaults on restart, and bash plays
175  	//   similar tricks with maintaining its own environmnet.)
176  	// Used in mtcpinterface.cpp and signalwrappers.cpp.
177  	__attribute__ ((visibility ("hidden")))
178  	int _determineMtcpSignal(){
179  	  // this mimics the MTCP logic for determining signal number found in
180  	  // mtcp_init()
181  	  int sig = MTCP_DEFAULT_SIGNAL;
182  	  char* endp = NULL;
183  	  static const char* tmp = getenv("MTCP_SIGCKPT");
184  	  if(tmp != NULL){
185  	      sig = strtol(tmp, &endp, 0);
186  	      if((errno != 0) || (tmp == endp))
187  	        sig = MTCP_DEFAULT_SIGNAL;
188  	      if(sig < 1 || sig > 31)
189  	        sig = MTCP_DEFAULT_SIGNAL;
190  	  }
191  	  return sig;
192  	}
193  	
194  	//called before user main()
195  	//workerhijack.cpp initializes a static variable theInstance to DmtcpWorker obj
196  	dmtcp::DmtcpWorker::DmtcpWorker ( bool enableCheckpointing )
197  	    :_coordinatorSocket ( PROTECTEDFD ( 1 ) )
198  	    ,_restoreSocket ( PROTECTEDFD ( 3 ) )
199  	{
200  	  if ( !enableCheckpointing ) return;
201  	
202  	  WorkerState::setCurrentState( WorkerState::UNKNOWN);
203  	
204  	  /* DO NOT PUT ANYTHING BEFORE THE FOLLOWING BLOCK OF CODE (#ifdef .... #endif) */
205  	#ifdef DEBUG
206  	  /* Disable Jassert Logging */
207  	  dmtcp::UniquePid::ThisProcess(true);
208  	
209  	  dmtcp::ostringstream o;
210  	  o << dmtcp::UniquePid::getTmpDir() << "/jassertlog." << dmtcp::UniquePid::ThisProcess()
211  	    << "_" << jalib::Filesystem::GetProgramName();
212  	  JASSERT_INIT (o.str());
213  	
214  	  JTRACE ( "recalculated process UniquePid..." ) ( dmtcp::UniquePid::ThisProcess() );
215  	#endif
216  	
217  	  //This is called for side effect only.  Force this function to call
218  	  // getenv("MTCP_SIGCKPT") now and cache it to avoid getenv calls later.
219  	  _determineMtcpSignal();
220  	
221  	#ifdef __i386__
222  	  // Match work begun in dmtcpPrepareForExec()
223  	# if 0
224  	  if (getenv("DMTCP_ADDR_COMPAT_LAYOUT")) {
225  	    _dmtcp_unsetenv("DMTCP_ADDR_COMPAT_LAYOUT");
226  	    // DMTCP had set ADDR_COMPAT_LAYOUT.  Now unset it.
227  	    personality( (unsigned long)personality(0xffffffff) ^ ADDR_COMPAT_LAYOUT );
228  	    JTRACE( "unsetting ADDR_COMPAT_LAYOUT" );
229  	  }
230  	# else
231  	  { char * rlim_cur_char = getenv("DMTCP_RLIMIT_STACK");
232  	    if ( rlim_cur_char != NULL ) {
233  	      struct rlimit rlim;
234  	      getrlimit(RLIMIT_STACK, &rlim);
235  	      rlim.rlim_cur = atol(rlim_cur_char);
236  	      JTRACE ( "rlim_cur for RLIMIT_STACK being restored." ) ( rlim.rlim_cur );
237  	      setrlimit(RLIMIT_STACK, &rlim);
238  	      _dmtcp_unsetenv("DMTCP_RLIMIT_STACK");
239  	    }
240  	  }
241  	# endif
242  	#endif
243  	
244  	  if ( getenv(ENV_VAR_UTILITY_DIR) == NULL ) {
245  	    JNOTE ( "\n **** Not checkpointing this process,"
246  	            " due to missing environment var ****" )
247  	          ( getenv(ENV_VAR_UTILITY_DIR) )
248  	          ( jalib::Filesystem::GetProgramName() );
249  	    return;
250  	  }
251  	  if (! getenv(ENV_VAR_QUIET))
252  	    setenv(ENV_VAR_QUIET, "0", 0);
253  	  jassert_quiet = *getenv(ENV_VAR_QUIET) - '0';
254  	
255  	
256  	  JTRACE ( "dmtcphijack.so:  Running " ) ( jalib::Filesystem::GetProgramName() )                                         ( getenv ( "LD_PRELOAD" ) );
257  	  JTRACE ( "dmtcphijack.so:  Child of pid " ) ( getppid() );
258  	
259  	  dmtcp::string programName = jalib::Filesystem::GetProgramName();
260  	
261  	  if ( programName == "dmtcp_coordinator"  ||
262  	       programName == "dmtcp_checkpoint"   ||
263  	       programName == "dmtcp_restart"      ||
264  	       programName == "dmtcp_command"      ||
265  	       programName == "mtcp_restart" ) {
266  	    processDmtcpCommands(programName);
267  	  }
268  	  if ( programName == "ssh" ) {
269  	    processSshCommand(programName);
270  	  }
271  	
272  	  WorkerState::setCurrentState ( WorkerState::RUNNING );
273  	
274  	  const char* serialFile = getenv( ENV_VAR_SERIALFILE_INITIAL );
275  	  if ( serialFile != NULL )
276  	  {
277  	    JTRACE ( "loading initial socket table from file..." ) ( serialFile );
278  	
279  	    jalib::JBinarySerializeReader rd ( serialFile );
280  	    UniquePid::serialize ( rd );
281  	    KernelDeviceToConnection::instance().serialize ( rd );
282  	
283  	#ifdef PID_VIRTUALIZATION
284  	    VirtualPidTable::instance().serialize ( rd );
285  	    VirtualPidTable::instance().postExec();
286  	#endif
287  	    SysVIPC::instance().serialize ( rd );
288  	
289  	#ifdef DEBUG
290  	    JTRACE ( "initial socket table:" );
291  	    KernelDeviceToConnection::instance().dbgSpamFds();
292  	#endif
293  	
294  	    _dmtcp_unsetenv ( ENV_VAR_SERIALFILE_INITIAL );
295  	  }
296  	  else
297  	  {
298  	    JTRACE ( "root of processes tree, checking for pre-existing sockets" );
299  	
300  	#ifdef PID_VIRTUALIZATION
301  	    if ( getenv( ENV_VAR_ROOT_PROCESS ) != NULL ) {
302  	      dmtcp::VirtualPidTable::instance().setRootOfProcessTree();
303  	      _dmtcp_unsetenv( ENV_VAR_ROOT_PROCESS );
304  	    }
305  	#endif
306  	
307  	    ConnectionList::instance().scanForPreExisting();
308  	  }
309  	
310  	  connectToCoordinatorWithHandshake();
311  	
312  	  WorkerState::setCurrentState ( WorkerState::RUNNING );
313  	
314  	  /* Acquire the lock here, so that the checkpoint-thread won't be able to
315  	   * process CHECKPOINT request until we are done with initializeMtcpEngine()
316  	   */
317  	  WRAPPER_EXECUTION_DISABLE_CKPT();
318  	  initializeMtcpEngine();
319  	  WRAPPER_EXECUTION_ENABLE_CKPT();
320  	
321  	  /* 
322  	   * Now wait for Checkpoint Thread to finish initialization 
323  	   * XXX: This should be the last thing in this constructor
324  	   */
325  	  while (!_checkpointThreadInitialized) {
326  	    struct timespec sleepTime = {0, 10*1000*1000};
327  	    nanosleep(&sleepTime, NULL);
328  	  }
329  	}
330  	
331  	void dmtcp::DmtcpWorker::cleanupWorker()
332  	{
333  	  pthread_rwlock_t newLock = PTHREAD_RWLOCK_WRITER_NONRECURSIVE_INITIALIZER_NP;
334  	  theWrapperExecutionLock = newLock;
335  	
336  	  pthread_mutex_t newCountLock = PTHREAD_MUTEX_INITIALIZER;
337  	  unInitializedThreadCountLock = newCountLock;
338  	
339  	  pthread_mutex_t newDestroyDmtcpWorker = PTHREAD_MUTEX_INITIALIZER;
340  	  destroyDmtcpWorker = newDestroyDmtcpWorker;
341  	
342  	  unInitializedThreadCount = 0;
343  	  WorkerState::setCurrentState( WorkerState::UNKNOWN);
344  	  JTRACE ( "disconnecting from dmtcp coordinator" );
345  	  _coordinatorSocket.close();
346  	}
347  	
348  	void dmtcp::DmtcpWorker::interruptCkpthread()
349  	{
350  	  if (pthread_mutex_trylock(&destroyDmtcpWorker) == EBUSY) {
351  	    killCkpthread();
352  	    JASSERT(pthread_mutex_lock(&destroyDmtcpWorker) == 0) (JASSERT_ERRNO);
353  	  }
354  	}
355  	
356  	//called after user main()
357  	dmtcp::DmtcpWorker::~DmtcpWorker()
358  	{
359  	
360  	  if( exitInProgress() ){
361  	    /*
362  	     * Exit race fixed. If the user threads calls exit(), ~DmtcpWorker() is
363  	     * called.  Now if the ckpt-thread is trying to use DmtcpWorker object
364  	     * while it is being destroyed, there is a problem.
365  	     *
366  	     * The fix here is to raise the flag exitInProgress in the exit() system
367  	     * call wrapper. Later in ~DmtcpWorker() we check if the flag has been
368  	     * raised or not.  If the exitInProgress flag has been raised, it closes
369  	     * the coordinator socket and tries to acquire destroyDmtcpWorker mutex.
370  	     *
371  	     * The ckpt-thread tries to acquire the destroyDmtcpWorker mutex before
372  	     * writing/reading any message to/from coordinator socket while the user
373  	     * threads are running (i.e. messages like DMT_SUSPEND, DMT_SUSPENDED
374  	     * etc.)_. If it fails to acquire the lock, it verifies that the
375  	     * exitInProgress has been raised and performs pthread_exit().
376  	     *
377  	     * As obvious, once the user threads have been suspended the ckpt-thread
378  	     *  releases the destroyDmtcpWorker() mutex and continues normal execution.
379  	     */
380  	    JTRACE ( "exit() in progress, disconnecting from dmtcp coordinator" );
381  	    _coordinatorSocket.close();
382  	    interruptCkpthread();
383  	  }
384  	  cleanupWorker();
385  	}
386  	
387  	void processDmtcpCommands(dmtcp::string programName)
388  	{
389  	  JASSERT ( programName == "dmtcp_coordinator"  ||
390  	      programName == "dmtcp_checkpoint"   ||
391  	      programName == "dmtcp_restart"      ||
392  	      programName == "dmtcp_command"      ||
393  	      programName == "mtcp_restart" );
394  	
395  	  //make sure coordinator connection is closed
396  	  _real_close ( PROTECTEDFD ( 1 ) );
397  	
398  	  //get program args
399  	  dmtcp::vector<dmtcp::string> args = jalib::Filesystem::GetProgramArgs();
400  	
401  	  //now repack args
402  	  char** argv = new char*[args.size() + 1];
403  	  memset ( argv, 0, sizeof ( char* ) * ( args.size() + 1 ) );
404  	
405  	  for ( size_t i=0; i< args.size(); ++i ) {
406  	    argv[i] = ( char* ) args[i].c_str();
407  	  }
408  	
409  	  JNOTE ( "re-running without checkpointing" ) ( programName );
410  	
411  	  //now re-call the command
412  	  restoreUserLDPRELOAD();
413  	  _real_execvp ( jalib::Filesystem::GetProgramPath().c_str(), argv );
414  	
415  	  //should be unreachable
416  	  JASSERT ( false ) (jalib::Filesystem::GetProgramPath()) ( argv[0] )
417  	    ( JASSERT_ERRNO ) .Text ( "exec() failed" );
418  	}
419  	
420  	static void processSshCommand(dmtcp::string programName)
421  	{
422  	  JASSERT ( jalib::Filesystem::GetProgramName() == "ssh" );
423  	  //make sure coordinator connection is closed
424  	  _real_close ( PROTECTEDFD ( 1 ) );
425  	
426  	  //get prog args
427  	  dmtcp::vector<dmtcp::string> args = jalib::Filesystem::GetProgramArgs();
428  	  JASSERT ( args.size() >= 3 ) ( args.size() ).Text ( "ssh must have at least 3 args to be wrapped (ie: ssh host cmd)" );
429  	
430  	  //find command part
431  	  size_t commandStart = 2;
432  	  for ( size_t i = 1; i < args.size(); ++i )
433  	  {
434  	    if ( args[i][0] != '-' )
435  	    {
436  	      commandStart = i + 1;
437  	      break;
438  	    }
439  	  }
440  	  JASSERT ( commandStart < args.size() && args[commandStart][0] != '-' )
441  	    ( commandStart ) ( args.size() ) ( args[commandStart] )
442  	    .Text ( "failed to parse ssh command line" );
443  	
444  	  //find the start of the command
445  	  dmtcp::string& cmd = args[commandStart];
446  	
447  	
448  	  const char * coordinatorAddr      = getenv ( ENV_VAR_NAME_ADDR );
449  	  const char * coordinatorPortStr   = getenv ( ENV_VAR_NAME_PORT );
450  	  const char * sigckpt              = getenv ( ENV_VAR_SIGCKPT );
451  	  const char * compression          = getenv ( ENV_VAR_COMPRESSION );
452  	  const char * ckptOpenFiles        = getenv ( ENV_VAR_CKPT_OPEN_FILES );
453  	  const char * ckptDir              = getenv ( ENV_VAR_CHECKPOINT_DIR );
454  	  const char * tmpDir               = getenv ( ENV_VAR_TMPDIR );
455  	  jassert_quiet                     = *getenv ( ENV_VAR_QUIET ) - '0';
456  	
457  	  //modify the command
458  	
459  	  //dmtcp::string prefix = "env ";
460  	
461  	  dmtcp::string prefix = DMTCP_CHECKPOINT_CMD " --ssh-slave ";
462  	
463  	
464  	  if ( coordinatorAddr != NULL )
465  	    prefix += dmtcp::string() + "--host " + coordinatorAddr    + " ";
466  	  if ( coordinatorPortStr != NULL )
467  	    prefix += dmtcp::string() + "--port " + coordinatorPortStr + " ";
468  	  if ( sigckpt != NULL )
469  	    prefix += dmtcp::string() + "--mtcp-checkpoint-signal "    + sigckpt + " ";
470  	  if ( ckptDir != NULL )
471  	    prefix += dmtcp::string() + "--ckptdir " + ckptDir         + " ";
472  	  if ( tmpDir != NULL )
473  	    prefix += dmtcp::string() + "--tmpdir " + tmpDir           + " ";
474  	  if ( ckptOpenFiles != NULL )
475  	    prefix += dmtcp::string() + "--checkpoint-open-files"      + " ";
476  	
477  	  if ( compression != NULL ) {
478  	    if ( strcmp ( compression, "0" ) == 0 )
479  	      prefix += "--no-gzip ";
480  	    else
481  	      prefix += "--gzip ";
482  	  }
483  	
484  	  // process command
485  	  size_t semipos, pos;
486  	  size_t actpos = dmtcp::string::npos;
487  	  for(semipos = 0; (pos = cmd.find(';',semipos+1)) != dmtcp::string::npos;
488  	      semipos = pos, actpos = pos);
489  	
490  	  if( actpos > 0 && actpos != dmtcp::string::npos ){
491  	    cmd = cmd.substr(0,actpos+1) + prefix + cmd.substr(actpos+1);
492  	  } else {
493  	    cmd = prefix + cmd;
494  	  }
495  	
496  	  //now repack args
497  	  dmtcp::string newCommand = "";
498  	  char** argv = new char*[args.size() +2];
499  	  memset ( argv,0,sizeof ( char* ) * ( args.size() +2 ) );
500  	
501  	  for ( size_t i=0; i< args.size(); ++i )
502  	  {
503  	    argv[i] = ( char* ) args[i].c_str();
504  	    newCommand += args[i] + ' ';
505  	  }
506  	
507  	  JNOTE ( "re-running SSH with checkpointing" ) ( newCommand );
508  	
509  	  restoreUserLDPRELOAD();
510  	  //now re-call ssh
511  	  _real_execvp ( argv[0], argv );
512  	
513  	  //should be unreachable
514  	  JASSERT ( false ) ( cmd ) ( JASSERT_ERRNO ).Text ( "exec() failed" );
515  	}
516  	
517  	
518  	const dmtcp::UniquePid& dmtcp::DmtcpWorker::coordinatorId() const
519  	{
520  	  return _coordinatorId;
521  	}
522  	
523  	
/*
 * Send DMT_OK (with this worker's current state) to the coordinator, then
 * block until the coordinator sends a message of the given `type`.
 *   signalStr - human-readable name of the awaited message (tracing only).
 *   type      - the message type expected next.  Any other type, after the
 *               skips below, fails the final JASSERT.
 *
 * Special cases:
 *   - DMT_KILL_PEER makes the process _exit(0).
 *   - When waiting for DMT_DO_REFILL, DMT_RESTORE_WAITING and
 *     DMT_FORCE_RESTART messages are skipped.
 *   - For DMT_DO_SUSPEND, the destroyDmtcpWorker mutex is acquired first and
 *     is still held on return; waitForStage2Checkpoint() releases it.  If the
 *     mutex cannot be taken, or exit() is found to be in progress, this
 *     thread calls pthread_exit().  On success, the peer count and
 *     computation group carried by the message are stored in
 *     theCheckpointState and compGroup.
 */
void dmtcp::DmtcpWorker::waitForCoordinatorMsg(dmtcp::string signalStr,
                                               DmtcpMessageType type )
{
  if ( type == DMT_DO_SUSPEND ) {
    // A user thread inside ~DmtcpWorker() may already hold this mutex (see
    // interruptCkpthread()); in that case this thread must not touch the
    // coordinator socket and simply exits.
    if ( pthread_mutex_trylock(&destroyDmtcpWorker) != 0 ) {
      JTRACE ( "User thread is performing exit()."
               " ckpt thread exit()ing as well" );
      pthread_exit(NULL);
    }
    if ( exitInProgress() ) {
      JASSERT(pthread_mutex_unlock(&destroyDmtcpWorker)==0)(JASSERT_ERRNO);
      pthread_exit(NULL);
    }
  }

  dmtcp::DmtcpMessage msg;

  msg.type = DMT_OK;
  msg.state = WorkerState::currentState();
  _coordinatorSocket << msg;

  JTRACE ( "waiting for " + signalStr + " Signal" );

  do {
    // presumably marks msg invalid so that assertValid() catches an
    // incomplete read -- confirm against DmtcpMessage
    msg.poison();
    _coordinatorSocket >> msg;

    // exit() may have started while this thread was blocked on the socket.
    if ( type == DMT_DO_SUSPEND && exitInProgress() ) {
      JASSERT(pthread_mutex_unlock(&destroyDmtcpWorker)==0)(JASSERT_ERRNO);
      pthread_exit(NULL);
    }

    msg.assertValid();

    if ( msg.type == DMT_KILL_PEER ) {
      JTRACE ( "Received KILL Message from coordinator, exiting" );
      _exit ( 0 );
    }

    // The ckpt thread can receive multiple DMT_RESTORE_WAITING or
    // DMT_FORCE_RESTART messages while waiting for a DMT_DO_REFILL message, we
    // need to ignore them and wait for the DMT_DO_REFILL message to arrive.
    if ( type != DMT_DO_REFILL ) {
      break;
    }

    // NOTE(review): the `type == DMT_DO_REFILL` test below is redundant given
    // the break above.
  } while ( type == DMT_DO_REFILL &&
            ( msg.type == DMT_RESTORE_WAITING || msg.type == DMT_FORCE_RESTART ) );

  JASSERT ( msg.type == type ) ( msg.type );

  // Coordinator sends some computation information along with the SUSPEND
  // message. Extracting that.
  if ( type == DMT_DO_SUSPEND ) {
    JTRACE ( "Computation information" ) ( msg.compGroup ) ( msg.params[0] );
    JASSERT ( theCheckpointState != NULL );
    theCheckpointState->numPeers(msg.params[0]);
    theCheckpointState->compGroup(msg.compGroup);
    compGroup = msg.compGroup;
  }
}
585  	
/*
 * First checkpoint stage, run by the checkpoint thread while the user threads
 * keep running.  It:
 *   - on the first call only, hides DMTCP's LD_PRELOAD entry and tells the
 *     DmtcpWorker constructor that the checkpoint thread is initialized;
 *   - if a computation group is known, writes a signature file
 *     "<tmpdir>/<compGroup>-<ppid>" containing this process's real pid;
 *   - replaces theCheckpointState with a fresh ConnectionState;
 *   - waits for the coordinator's DMT_DO_SUSPEND message, then makes it safe
 *     to suspend the user threads.
 *
 * On return this thread holds theCkptCanStart, the write side of
 * theWrapperExecutionLock, and destroyDmtcpWorker (taken inside
 * waitForCoordinatorMsg()).  waitForStage2Checkpoint() releases all three.
 */
void dmtcp::DmtcpWorker::waitForStage1Suspend()
{
  JTRACE ( "running" );

  WorkerState::setCurrentState ( WorkerState::RUNNING );

  /*
   * Its only use is to inform the user thread (waiting in DmtcpWorker
   * constructor) that the checkpoint thread has finished initialization. This
   * is to serialize DmtcpWorker-Constructor(), mtcp_init(), checkpoint-thread
   * initialization and user main(). As obvious, this is only effective when
   * the process is being initialized.
   */
  if (!_checkpointThreadInitialized) {
    /*
     * We should not call this function any higher in the logic because it
     * calls setenv(), and if we are running under bash, getenv() will not
     * work between the call to setenv() and bash's main().
     */
    restoreUserLDPRELOAD();
    _checkpointThreadInitialized = true;
  }

  if ( compGroup != UniquePid() ) {
    dmtcp::string signatureFile = UniquePid::getTmpDir() + "/"
                                + compGroup.toString() + "-"
#ifdef PID_VIRTUALIZATION
                                + jalib::XToString ( _real_getppid() );
#else
                                + jalib::XToString ( getppid() );
#endif
    JTRACE("creating signature file") (signatureFile)(_real_getpid());
    // NOTE(review): no O_TRUNC -- if the file already holds a longer value,
    // bytes after the new NUL-terminated pid remain.
    int fd = _real_open ( signatureFile.c_str(), O_CREAT|O_WRONLY, 0600 );
    JASSERT ( fd != -1 ) ( fd ) ( signatureFile )
      .Text ( "Unable to create signature file" );
    // Write the pid as text, including its terminating NUL.
    dmtcp::string pidstr = jalib::XToString(_real_getpid());
    // FIXME: This assumes write is small, always completes
    JASSERT( pidstr.length()+1
	     == write(fd, pidstr.c_str(), pidstr.length()+1) )
      ( pidstr.length()+1 );
    _real_close(fd);
  }

  // Start every checkpoint with a fresh connection state.
  if ( theCheckpointState != NULL ) {
    delete theCheckpointState;
    theCheckpointState = NULL;
  }

  theCheckpointState = new ConnectionState();

#ifdef EXTERNAL_SOCKET_HANDLING
  JASSERT ( _waitingForExternalSocketsToClose == true ||
             externalTcpConnections.empty() == true );

  // Poll once per second until every connection marked external is gone.
  while ( externalTcpConnections.empty() == false ) {
    JTRACE("Waiting for externalSockets toClose")
          (_waitingForExternalSocketsToClose);
    sleep ( 1 );
  }
  if ( _waitingForExternalSocketsToClose == true ) {
    DmtcpMessage msg ( DMT_EXTERNAL_SOCKETS_CLOSED );
    _coordinatorSocket << msg;
    _waitingForExternalSocketsToClose = false;
    JTRACE("externalSocketsClosed") (_waitingForExternalSocketsToClose);
  }
#endif

  waitForCoordinatorMsg ( "SUSPEND", DMT_DO_SUSPEND );

  JTRACE ( "got SUSPEND signal, waiting for dmtcp_lock():"
	   " to get synchronized with _runCoordinatorCmd if we use DMTCP API" );
  _dmtcp_lock();
  // TODO: may be it is better to move unlock to more appropriate place.
  // For example after suspending all threads
  _dmtcp_unlock();


  JTRACE ( "got SUSPEND signal, waiting for lock(&theCkptCanStart)" );
  JASSERT(pthread_mutex_lock(&theCkptCanStart)==0)(JASSERT_ERRNO);

  // Taking the write lock blocks until no user thread holds the read lock,
  // i.e. none is inside a protected DMTCP wrapper.
  JTRACE ( "got SUSPEND signal,"
           " waiting for other threads to exit DMTCP-Wrappers" );
  JASSERT(pthread_rwlock_wrlock(&theWrapperExecutionLock) == 0)(JASSERT_ERRNO);
  JTRACE ( "got SUSPEND signal,"
           " waiting for newly created threads to finish initialization" )
         (unInitializedThreadCount);
  waitForThreadsToFinishInitialization();

  JTRACE ( "Starting checkpoint, suspending..." );
}
676  	
677  	#ifdef EXTERNAL_SOCKET_HANDLING
678  	bool dmtcp::DmtcpWorker::waitForStage2Checkpoint()
679  	#else
680  	void dmtcp::DmtcpWorker::waitForStage2Checkpoint()
681  	#endif
682  	{
683  	  WorkerState::setCurrentState ( WorkerState::SUSPENDED );
684  	  JTRACE ( "suspended" );
685  	
686  	  if ( exitInProgress() ) {
687  	    JASSERT(pthread_mutex_unlock(&destroyDmtcpWorker)==0)(JASSERT_ERRNO);
688  	    pthread_exit(NULL);
689  	  }
690  	
691  	  JASSERT(_coordinatorSocket.isValid());
692  	  JASSERT(pthread_mutex_unlock(&destroyDmtcpWorker)==0)(JASSERT_ERRNO);
693  	
694  	  JASSERT(pthread_rwlock_unlock(&theWrapperExecutionLock) == 0)(JASSERT_ERRNO);
695  	
696  	  JASSERT(pthread_mutex_unlock(&theCkptCanStart)==0)(JASSERT_ERRNO);
697  	
698  	  theCheckpointState->preLockSaveOptions();
699  	
700  	  waitForCoordinatorMsg ( "LOCK", DMT_DO_LOCK_FDS );
701  	
702  	  JTRACE ( "locking..." );
703  	  JASSERT ( theCheckpointState != NULL );
704  	  theCheckpointState->preCheckpointLock();
705  	  JTRACE ( "locked" );
706  	
707  	  /*
708  	   * Save first 2 * sizeof(pid_t) bytes of each shared memory area and fill it
709  	   * with all zeros.
710  	   */
711  	  SysVIPC::instance().prepareForLeaderElection();
712  	
713  	  WorkerState::setCurrentState ( WorkerState::FD_LEADER_ELECTION );
714  	
715  	#ifdef EXTERNAL_SOCKET_HANDLING
716  	  if ( waitForStage2bCheckpoint() == false ) {
717  	    return false;
718  	  }
719  	#else
720  	  waitForCoordinatorMsg ( "DRAIN", DMT_DO_DRAIN );
721  	#endif
722  	
723  	  JTRACE ( "draining..." );
724  	  theCheckpointState->preCheckpointDrain();
725  	  JTRACE ( "drained" );
726  	
727  	  /*
728  	   * write pid at offset 0. Also write pid at offset sizeof(pid_t) if this
729  	   * process is the creator of this memory area. After the leader election
730  	   * barrier, the leader of the shared-memory object is the creater of the
731  	   * object. If the creater process is missing, then the leader process is the
732  	   * process whose pid is stored at offset 0
733  	   */
734  	  SysVIPC::instance().leaderElection();
735  	
736  	  WorkerState::setCurrentState ( WorkerState::DRAINED );
737  	
738  	  waitForCoordinatorMsg ( "CHECKPOINT", DMT_DO_CHECKPOINT );
739  	  JTRACE ( "got checkpoint signal" );
740  	
741  	#if HANDSHAKE_ON_CHECKPOINT == 1
742  	  //handshake is done after one barrier after drain
743  	  JTRACE ( "beginning handshakes" );
744  	  theCheckpointState->preCheckpointHandshakes(coordinatorId());
745  	  JTRACE ( "handshaking done" );
746  	#endif
747  	
748  	//   JTRACE("writing *.dmtcp file");
749  	//   theCheckpointState->outputDmtcpConnectionTable();
750  	
751  	#ifdef PID_VIRTUALIZATION
752  	  dmtcp::VirtualPidTable::instance().preCheckpoint();
753  	#endif
754  	
755  	  SysVIPC::instance().preCheckpoint();
756  	
757  	#ifdef EXTERNAL_SOCKET_HANDLING
758  	  return true;
759  	#endif
760  	}
761  	
762  	#ifdef EXTERNAL_SOCKET_HANDLING
763  	bool dmtcp::DmtcpWorker::waitForStage2bCheckpoint()
764  	{
765  	  waitForCoordinatorMsg ( "PEER_LOOKUP", DMT_DO_PEER_LOOKUP );
766  	  JTRACE ( "Looking up Socket Peers..." );
767  	  theTcpConnections.clear();
768  	  theCheckpointState->preCheckpointPeerLookup(theTcpConnections);
769  	  sendPeerLookupRequest(theTcpConnections);
770  	  JTRACE ( "Done Socket Peer Lookup" );
771  	
772  	
773  	  WorkerState::setCurrentState ( WorkerState::PEER_LOOKUP_COMPLETE );
774  	
775  	  {
776  	    dmtcp::DmtcpMessage msg;
777  	
778  	    msg.type = DMT_OK;
779  	    msg.state = WorkerState::currentState();
780  	    _coordinatorSocket << msg;
781  	
782  	    JTRACE ( "waiting for DRAIN/RESUME Signal" );
783  	
784  	    do {
785  	      msg.poison();
786  	      _coordinatorSocket >> msg;
787  	      msg.assertValid();
788  	
789  	      if ( msg.type == DMT_KILL_PEER ) {
790  	        JTRACE ( "Received KILL Message from coordinator, exiting" );
791  	        _exit ( 0 );
792  	      }
793  	      JTRACE ( "received message" ) (msg.type );
794  	      if ( msg.type != DMT_UNKNOWN_PEER )
795  	        break;
796  	
797  	      JTRACE ("received DMT_UNKNOWN_PEER message") (msg.conId);
798  	
799  	      TcpConnection* con =
800  	        (TcpConnection*) &( ConnectionList::instance() [msg.conId] );
801  	      con->markExternal();
802  	      externalTcpConnections.push_back(msg.conId);
803  	      _waitingForExternalSocketsToClose = true;
804  	
805  	    } while ( msg.type == DMT_UNKNOWN_PEER );
806  	
807  	    JASSERT ( msg.type == DMT_DO_DRAIN || msg.type == DMT_DO_RESUME )
808  	            ( msg.type );
809  	
810  	    ConnectionList& connections = ConnectionList::instance();
811  	
812  	    // Tcp Accept and Connect connection with PeerType UNKNOWN should be marked as INTERNAL
813  	    for ( ConnectionList::iterator i = connections.begin()
814  	        ; i!= connections.end()
815  	        ; ++i )
816  	    {
817  	      Connection* con =  i->second;
818  	      if ( con->conType() == Connection::TCP ) {
819  	        TcpConnection* tcpCon = (TcpConnection *) con;
820  	        if ( (tcpCon->tcpType() == TcpConnection::TCP_ACCEPT ||
821  	             tcpCon->tcpType() == TcpConnection::TCP_CONNECT) &&
822  	             tcpCon->peerType() == TcpConnection::PEER_UNKNOWN )
823  	          tcpCon->markInternal();
824  	      }
825  	    }
826  	    if ( msg.type == DMT_DO_RESUME ) {
827  	      JTRACE ( "Peer Lookup not complete, skipping checkpointing \n\n\n\n\n");
828  	      return false;
829  	    }
830  	
831  	    JASSERT (msg.type == DMT_DO_DRAIN);
832  	  }
833  	}
834  	
835  	void dmtcp::DmtcpWorker::sendPeerLookupRequest (dmtcp::vector<TcpConnectionInfo>& conInfoTable )
836  	{
837  	  for (int i = 0; i < conInfoTable.size(); ++i) {
838  	    DmtcpMessage msg;
839  	    msg.type = DMT_PEER_LOOKUP;
840  	    msg.localAddr    = conInfoTable[i].localAddr();
841  	    msg.remoteAddr   = conInfoTable[i].remoteAddr();
842  	    msg.localAddrlen = conInfoTable[i].addrlen();
843  	    msg.conId        = conInfoTable[i].conId();
844  	
845  	    _coordinatorSocket << msg;
846  	  }
847  	}
848  	
849  	bool dmtcp::DmtcpWorker::waitingForExternalSocketsToClose() {
850  	  return _waitingForExternalSocketsToClose;
851  	}
852  	#endif
853  	
854  	void dmtcp::DmtcpWorker::writeCheckpointPrefix ( int fd )
855  	{
856  	  const int len = strlen(DMTCP_FILE_HEADER);
857  	  JASSERT(write(fd, DMTCP_FILE_HEADER, len)==len);
858  	
859  	  theCheckpointState->outputDmtcpConnectionTable(fd);
860  	}
861  	
862  	void dmtcp::DmtcpWorker::sendCkptFilenameToCoordinator()
863  	{
864  	  // Tell coordinator to record our filename in the restart script
865  	  dmtcp::string ckptFilename = dmtcp::UniquePid::checkpointFilename();
866  	  dmtcp::string hostname = jalib::Filesystem::GetCurrentHostname();
867  	  JTRACE ( "recording filenames" ) ( ckptFilename ) ( hostname );
868  	  dmtcp::DmtcpMessage msg;
869  	  msg.type = DMT_CKPT_FILENAME;
870  	  msg.extraBytes = ckptFilename.length() +1 + hostname.length() +1;
871  	  _coordinatorSocket << msg;
872  	  _coordinatorSocket.writeAll ( ckptFilename.c_str(), ckptFilename.length() +1 );
873  	  _coordinatorSocket.writeAll ( hostname.c_str(),     hostname.length() +1 );
874  	}
875  	
876  	void dmtcp::DmtcpWorker::postRestart()
877  	{
878  	  JTRACE("begin postRestart()");
879  	
880  	  WorkerState::setCurrentState(WorkerState::RESTARTING);
881  	  recvCoordinatorHandshake();
882  	
883  	  JASSERT ( theCheckpointState != NULL );
884  	  theCheckpointState->postRestart();
885  	
886  	  if ( jalib::Filesystem::GetProgramName() == "screen" )
887  	    send_sigwinch = 1;
888  	  // With hardstatus (bottom status line), screen process has diff. size window
889  	  // Must send SIGWINCH to adjust it.
890  	  // MTCP will send SIGWINCH to process on restart.  This will force 'screen'
891  	  // to execute ioctl wrapper.  The wrapper will report a changed winsize,
892  	  // so that 'screen' must re-initialize the screen (scrolling resions, etc.).
893  	  // The wrapper will also send a second SIGWINCH.  Then 'screen' will
894  	  // call ioctl and get the correct window size and resize again.
895  	  // We can't just send two SIGWINCH's now, since window size has not
896  	  // changed yet, and 'screen' will assume that there's nothing to do. 
897  	
898  	#ifdef PID_VIRTUALIZATION
899  	  dmtcp::VirtualPidTable::instance().postRestart();
900  	#endif
901  	  SysVIPC::instance().postRestart();
902  	}
903  	
904  	void dmtcp::DmtcpWorker::waitForStage3Refill( bool isRestart )
905  	{
906  	  JTRACE ( "checkpointed" );
907  	
908  	  WorkerState::setCurrentState ( WorkerState::CHECKPOINTED );
909  	
910  	  waitForCoordinatorMsg ( "REFILL", DMT_DO_REFILL );
911  	
912  	  JASSERT ( theCheckpointState != NULL );
913  	  theCheckpointState->postCheckpoint(isRestart);
914  	  delete theCheckpointState;
915  	  theCheckpointState = NULL;
916  	
917  	  SysVIPC::instance().postCheckpoint();
918  	}
919  	
920  	void dmtcp::DmtcpWorker::waitForStage4Resume()
921  	{
922  	  JTRACE ( "refilled" );
923  	  WorkerState::setCurrentState ( WorkerState::REFILLED );
924  	  waitForCoordinatorMsg ( "RESUME", DMT_DO_RESUME );
925  	  JTRACE ( "got resume signal" );
926  	
927  	  SysVIPC::instance().preResume();
928  	}
929  	
930  	void dmtcp::DmtcpWorker::restoreVirtualPidTable()
931  	{
932  	#ifdef PID_VIRTUALIZATION
933  	  dmtcp::VirtualPidTable::instance().readPidMapsFromFile();
934  	  dmtcp::VirtualPidTable::instance().restoreProcessGroupInfo();
935  	#endif
936  	}
937  	
938  	void dmtcp::DmtcpWorker::restoreSockets(ConnectionState& coordinator,
939  	                                        dmtcp::UniquePid compGroup,
940  	                                        int              numPeers,
941  	                                        int&             coordTstamp)
942  	{
943  	  JTRACE ( "restoreSockets begin" );
944  	
945  	  theRestorePort = RESTORE_PORT_START;
946  	
947  	  //open up restore socket
948  	  {
949  	    jalib::JSocket restoreSocket ( -1 );
950  	    while ( !restoreSocket.isValid() && theRestorePort < RESTORE_PORT_STOP )
951  	    {
952  	      restoreSocket = jalib::JServerSocket ( jalib::JSockAddr::ANY, ++theRestorePort );
953  	      JTRACE ( "open listen socket attempt" ) ( theRestorePort );
954  	    }
955  	    JASSERT ( restoreSocket.isValid() ) ( RESTORE_PORT_START ).Text ( "failed to open listen socket" );
956  	    restoreSocket.changeFd ( _restoreSocket.sockfd() );
957  	    JTRACE ( "opening listen sockets" ) ( _restoreSocket.sockfd() ) ( restoreSocket.sockfd() );
958  	    _restoreSocket = restoreSocket;
959  	  }
960  	
961  	  //reconnect to our coordinator
962  	  connectToCoordinatorWithoutHandshake();
963  	  sendCoordinatorHandshake(jalib::Filesystem::GetProgramName(),
964  				   compGroup, numPeers, DMT_RESTART_PROCESS);
965  	  recvCoordinatorHandshake(&coordTstamp);
966  	  JTRACE("Connected to coordinator")(coordTstamp);
967  	
968  	  // finish sockets restoration
969  	  coordinator.doReconnect ( _coordinatorSocket,_restoreSocket );
970  	
971  	  JTRACE ( "sockets restored!" );
972  	
973  	}
974  	
975  	void dmtcp::DmtcpWorker::delayCheckpointsLock(){
976  	  JASSERT(pthread_mutex_lock(&theCkptCanStart)==0)(JASSERT_ERRNO);
977  	}
978  	
979  	void dmtcp::DmtcpWorker::delayCheckpointsUnlock(){
980  	  JASSERT(pthread_mutex_unlock(&theCkptCanStart)==0)(JASSERT_ERRNO);
981  	}
982  	
983  	// XXX: Handle deadlock error code
984  	// NOTE: Don't do any fancy stuff in this wrapper which can cause the process to go into DEADLOCK
985  	bool dmtcp::DmtcpWorker::wrapperExecutionLockLock()
986  	{
987  	#ifdef PTRACE 
988  	  return false;
989  	#endif
Event unreachable: This code cannot be reached: "int saved_errno = *__errno_...".
990  	  int saved_errno = errno;
991  	  bool lockAcquired = false;
992  	  if ( dmtcp::WorkerState::currentState() == dmtcp::WorkerState::RUNNING ) {
993  	    int retVal = pthread_rwlock_rdlock(&theWrapperExecutionLock);
994  	    if ( retVal != 0 && retVal != EDEADLK ) {
995  	      perror ( "ERROR DmtcpWorker::wrapperExecutionLockLock: Failed to acquire lock" );
996  	      _exit(1);
997  	    }
998  	    // retVal should always be 0 (success) here.
999  	    lockAcquired = retVal == 0 ? true : false;
1000 	  }
1001 	  errno = saved_errno;
1002 	  return lockAcquired;
1003 	}
1004 	
1005 	// NOTE: Don't do any fancy stuff in this wrapper which can cause the process to go into DEADLOCK
1006 	void dmtcp::DmtcpWorker::wrapperExecutionLockUnlock()
1007 	{
1008 	  int saved_errno = errno;
1009 	  if ( dmtcp::WorkerState::currentState() != dmtcp::WorkerState::RUNNING ) {
1010 	    printf ( "ERROR: DmtcpWorker::wrapperExecutionLockUnlock: This process is not in \n"
1011 	             "RUNNING state and yet this thread managed to acquire the wrapperExecutionLock.\n"
1012 	             "This should not be happening, something is wrong." );
1013 	    _exit(1);
1014 	  }
1015 	  if ( pthread_rwlock_unlock(&theWrapperExecutionLock) != 0) {
1016 	    perror ( "ERROR DmtcpWorker::wrapperExecutionLockUnlock: Failed to release lock" );
1017 	    _exit(1);
1018 	    }
1019 	  errno = saved_errno;
1020 	}
1021 	
1022 	void dmtcp::DmtcpWorker::waitForThreadsToFinishInitialization()
1023 	{
1024 	  while (unInitializedThreadCount != 0) {
1025 	    struct timespec sleepTime = {0, 10*1000*1000};
1026 	    nanosleep(&sleepTime, NULL);
1027 	  }
1028 	}
1029 	
1030 	void dmtcp::DmtcpWorker::incrementUninitializedThreadCount()
1031 	{
1032 	  int saved_errno = errno;
1033 	  if ( dmtcp::WorkerState::currentState() == dmtcp::WorkerState::RUNNING ) {
1034 	    JASSERT(pthread_mutex_lock(&unInitializedThreadCountLock) == 0) (JASSERT_ERRNO);
1035 	    unInitializedThreadCount++;
1036 	    //JTRACE(":") (unInitializedThreadCount);
1037 	    JASSERT(pthread_mutex_unlock(&unInitializedThreadCountLock) == 0) (JASSERT_ERRNO);
1038 	  }
1039 	  errno = saved_errno;
1040 	}
1041 	
1042 	void dmtcp::DmtcpWorker::decrementUninitializedThreadCount()
1043 	{
1044 	  int saved_errno = errno;
1045 	  if ( dmtcp::WorkerState::currentState() == dmtcp::WorkerState::RUNNING ) {
1046 	    JASSERT(pthread_mutex_lock(&unInitializedThreadCountLock) == 0) (JASSERT_ERRNO);
1047 	    JASSERT(unInitializedThreadCount > 0) (unInitializedThreadCount);
1048 	    unInitializedThreadCount--;
1049 	    //JTRACE(":") (unInitializedThreadCount);
1050 	    JASSERT(pthread_mutex_unlock(&unInitializedThreadCountLock) == 0) (JASSERT_ERRNO);
1051 	  }
1052 	  errno = saved_errno;
1053 	}
1054 	
1055 	void dmtcp::DmtcpWorker::connectAndSendUserCommand(char c, int* result /*= NULL*/)
1056 	{
1057 	  //prevent checkpoints from starting
1058 	  delayCheckpointsLock();
1059 	  {
1060 	    if ( tryConnectToCoordinator() == false ) {
1061 	      *result = DmtcpCoordinator::ERROR_COORDINATOR_NOT_FOUND;
1062 	      return;
1063 	    }
1064 	    sendUserCommand(c,result);
1065 	    _coordinatorSocket.close();
1066 	  }
1067 	  delayCheckpointsUnlock();
1068 	}
1069 	
1070 	//tell the coordinator to run given user command
1071 	void dmtcp::DmtcpWorker::sendUserCommand(char c, int* result /*= NULL*/)
1072 	{
1073 	  DmtcpMessage msg,reply;
1074 	
1075 	  //send
1076 	  msg.type = DMT_USER_CMD;
1077 	  msg.params[0] = c;
1078 	
1079 	  if (c == 'i') {
1080 	    const char* interval = getenv ( ENV_VAR_CKPT_INTR );
1081 	    if ( interval != NULL )
1082 	      msg.theCheckpointInterval = jalib::StringToInt ( interval );
1083 	  }
1084 	
1085 	  _coordinatorSocket << msg;
1086 	
1087 	  //the coordinator will violently close our socket...
1088 	  if(c=='q' || c=='Q'){
1089 	    result[0]=0;
1090 	    return;
1091 	  }
1092 	
1093 	  //receive REPLY
1094 	  reply.poison();
1095 	  _coordinatorSocket >> reply;
1096 	  reply.assertValid();
1097 	  JASSERT ( reply.type == DMT_USER_CMD_RESULT );
1098 	
1099 	  if(result!=NULL){
1100 	    memcpy( result, reply.params, sizeof(reply.params) );
1101 	  }
1102 	}
1103 	
1104 	
1105 	/*!
1106 	    \fn dmtcp::DmtcpWorker::connectToCoordinator()
1107 	 */
1108 	bool dmtcp::DmtcpWorker::tryConnectToCoordinator()
1109 	{
1110 	  return connectToCoordinator ( false );
1111 	}
1112 	
1113 	void dmtcp::DmtcpWorker::connectToCoordinatorWithoutHandshake()
1114 	{
1115 	  connectToCoordinator ( );
1116 	}
1117 	
1118 	void dmtcp::DmtcpWorker::connectToCoordinatorWithHandshake()
1119 	{
1120 	  connectToCoordinator ( );
1121 	  JTRACE("CONNECT TO coordinator, trying to handshake");
1122 	  sendCoordinatorHandshake(jalib::Filesystem::GetProgramName());
1123 	  recvCoordinatorHandshake();
1124 	}
1125 	
1126 	bool dmtcp::DmtcpWorker::connectToCoordinator(bool dieOnError /*= true*/)
1127 	{
1128 	
1129 	  const char * coordinatorAddr = getenv ( ENV_VAR_NAME_ADDR );
1130 	  const char * coordinatorPortStr = getenv ( ENV_VAR_NAME_PORT );
1131 	  dmtcp::UniquePid zeroGroup;
1132 	
1133 	  if ( coordinatorAddr == NULL ) coordinatorAddr = DEFAULT_HOST;
1134 	  int coordinatorPort = coordinatorPortStr==NULL ? DEFAULT_PORT : jalib::StringToInt ( coordinatorPortStr );
1135 	
1136 	  jalib::JSocket oldFd = _coordinatorSocket;
1137 	
1138 	  _coordinatorSocket = jalib::JClientSocket ( coordinatorAddr,coordinatorPort );
1139 	
1140 	  if ( ! _coordinatorSocket.isValid() && ! dieOnError ) {
1141 	    return false;
1142 	  }
1143 	
1144 	  JASSERT ( _coordinatorSocket.isValid() )
1145 	    ( coordinatorAddr ) ( coordinatorPort )
1146 	    .Text ( "Failed to connect to DMTCP coordinator" );
1147 	
1148 	  JTRACE ( "connected to dmtcp coordinator, no handshake" )
1149 	    ( coordinatorAddr ) ( coordinatorPort );
1150 	
1151 	  if ( oldFd.isValid() )
1152 	  {
1153 	    JTRACE ( "restoring old coordinatorsocket fd" )
1154 	      ( oldFd.sockfd() ) ( _coordinatorSocket.sockfd() );
1155 	
1156 	    _coordinatorSocket.changeFd ( oldFd.sockfd() );
1157 	  }
1158 	  return true;
1159 	}
1160 	
1161 	void dmtcp::DmtcpWorker::sendCoordinatorHandshake(const dmtcp::string& progname,
1162 	                                                  UniquePid compGroup /*= UniquePid()*/,
1163 	                                                  int np /*= -1*/,
1164 	                                                  DmtcpMessageType msgType /*= DMT_HELLO_COORDINATOR*/)
1165 	{
1166 	  JTRACE("sending coordinator handshake")(UniquePid::ThisProcess());
1167 	
1168 	  dmtcp::string hostname = jalib::Filesystem::GetCurrentHostname();
1169 	  dmtcp::DmtcpMessage hello_local;
1170 	  hello_local.type = msgType;
1171 	  hello_local.params[0] = np;
1172 	  hello_local.compGroup = compGroup;
1173 	  hello_local.restorePort = theRestorePort;
1174 	
1175 	  const char* interval = getenv ( ENV_VAR_CKPT_INTR );
1176 	  if ( interval != NULL )
1177 	    hello_local.theCheckpointInterval = jalib::StringToInt ( interval );
1178 	
1179 	  hello_local.extraBytes = hostname.length() + 1 + progname.length() + 1;
1180 	  _coordinatorSocket << hello_local;
1181 	  _coordinatorSocket.writeAll( hostname.c_str(),hostname.length()+1);
1182 	  _coordinatorSocket.writeAll( progname.c_str(),progname.length()+1);
1183 	}
1184 	
1185 	void dmtcp::DmtcpWorker::recvCoordinatorHandshake( int *param1 ){
1186 	  JTRACE("receiving coordinator handshake");
1187 	
1188 	  dmtcp::DmtcpMessage hello_remote;
1189 	  hello_remote.poison();
1190 	  _coordinatorSocket >> hello_remote;
1191 	  hello_remote.assertValid();
1192 	
1193 	  if ( param1 == NULL )
1194 	    JASSERT ( hello_remote.type == dmtcp::DMT_HELLO_WORKER ) ( hello_remote.type );
1195 	  else
1196 	    JASSERT ( hello_remote.type == dmtcp::DMT_RESTART_PROCESS_REPLY ) ( hello_remote.type );
1197 	
1198 	  _coordinatorId = hello_remote.coordinator;
1199 	  DmtcpMessage::setDefaultCoordinator ( _coordinatorId );
1200 	  if( param1 ){
1201 	    *param1 = hello_remote.params[0];
1202 	  }
1203 	  JTRACE("Coordinator handshake RECEIVED!!!!!");
1204 	}
1205 	
1206 	void dmtcp::DmtcpWorker::startCoordinatorIfNeeded(int modes, int isRestart){
1207 	  const static int CS_OK = 91;
1208 	  const static int CS_NO = 92;
1209 	  int coordinatorStatus = -1;
1210 	
1211 	  if (modes & COORD_BATCH) {
1212 	    startNewCoordinator ( modes, isRestart );
1213 	    return;
1214 	  }
1215 	  //fork a child process to probe the coordinator
1216 	  if (fork()==0) {
1217 	    //fork so if we hit an error parent won't die
1218 	    dup2(2,1);                          //copy stderr to stdout
1219 	    dup2(open("/dev/null",O_RDWR), 2);  //close stderr
1220 	    int result[DMTCPMESSAGE_NUM_PARAMS];
1221 	    dmtcp::DmtcpCoordinatorAPI coordinatorAPI;
1222 	    {
1223 	      if ( coordinatorAPI.tryConnectToCoordinator() == false ) {
1224 	        JTRACE("Coordinator not found.  Will try to start a new one.");
1225 	        _real_exit(1);
1226 	      }
1227 	    }
1228 	
1229 	    coordinatorAPI.sendUserCommand('s',result);
1230 	    coordinatorAPI._coordinatorSocket.close();
1231 	
1232 	    // result[0] == numPeers of coord;  bool result[1] == computation is running
1233 	    if(result[0]==0 || result[1] ^ isRestart){
1234 	      if(result[0] != 0) {
1235 	        int num_processes = result[0];
1236 	        JTRACE("Joining existing computation.") (num_processes);
1237 	      }
1238 	      _real_exit(CS_OK);
1239 	    }else{
1240 	      JTRACE("Existing computation not in a running state," \
1241 		     " perhaps checkpoint in progress?");
1242 	      _real_exit(CS_NO);
1243 	    }
1244 	  }
1245 	  errno = 0;
1246 	  // FIXME:  wait() could return -1 if a signal happened before child exits
1247 	  JASSERT(::wait(&coordinatorStatus)>0)(JASSERT_ERRNO);
1248 	  JASSERT(WIFEXITED(coordinatorStatus));
1249 	
1250 	  //is coordinator running?
1251 	  if (WEXITSTATUS(coordinatorStatus) != CS_OK) {
1252 	    //is coordinator in funny state?
1253 	    if(WEXITSTATUS(coordinatorStatus) == CS_NO){
1254 	      JASSERT (false) (isRestart)
1255 		 .Text ("Coordinator in a funny state.  Peers exist, not restarting," \
1256 			"\n but not in a running state.  Checkpointing?" \
1257 			"\n Or maybe restarting and running with peers existing?");
1258 	    }else{
1259 	      JTRACE("Bad result found for coordinator.  Try a new one.");
1260 	    }
1261 	
1262 	    JTRACE("Coordinator not found.  Starting a new one.");
1263 	    startNewCoordinator ( modes, isRestart );
1264 	
1265 	  }else{
1266 	    if (modes & COORD_FORCE_NEW) {
1267 	      JTRACE("Forcing new coordinator.  --new-coordinator flag given.");
1268 	      startNewCoordinator ( modes, isRestart );
1269 	      return;
1270 	    }
1271 	    JASSERT( modes & COORD_JOIN )
1272 	      .Text("Coordinator already running, but '--new' flag was given.");
1273 	  }
1274 	}
1275 	
1276 	void dmtcp::DmtcpWorker::startNewCoordinator(int modes, int isRestart)
1277 	{
1278 	  int coordinatorStatus = -1;
1279 	  //get location of coordinator
1280 	  const char * coordinatorAddr = getenv ( ENV_VAR_NAME_ADDR );
1281 	  if(coordinatorAddr==NULL) coordinatorAddr = DEFAULT_HOST;
1282 	  const char * coordinatorPortStr = getenv ( ENV_VAR_NAME_PORT );
1283 	  int coordinatorPort = coordinatorPortStr==NULL ? DEFAULT_PORT
1284 	                                                 : jalib::StringToInt(coordinatorPortStr);
1285 	
1286 	  dmtcp::string s=coordinatorAddr;
1287 	  if(s!="localhost" && s!="127.0.0.1" && s!=jalib::Filesystem::GetCurrentHostname()){
1288 	    JASSERT(false)
1289 	      .Text("Won't automatically start coordinator because DMTCP_HOST is set to a remote host.");
1290 	    _real_exit(1);
1291 	  }
1292 	
1293 	  if ( modes & COORD_BATCH || modes & COORD_FORCE_NEW ) {
1294 	    // Create a socket and bind it to an unused port.
1295 	    jalib::JServerSocket coordinatorListenerSocket ( jalib::JSockAddr::ANY, 0 );
1296 	    errno = 0;
1297 	    JASSERT ( coordinatorListenerSocket.isValid() )
1298 	      ( coordinatorListenerSocket.port() ) ( JASSERT_ERRNO )
1299 	      .Text ( "Failed to create listen socket."
1300 	          "\nIf msg is \"Address already in use\", this may be an old coordinator."
1301 	          "\nKill other coordinators and try again in a minute or so." );
1302 	    // Now dup the sockfd to
1303 	    coordinatorListenerSocket.changeFd(PROTECTEDFD(1));
1304 	
1305 	    dmtcp::string coordPort= jalib::XToString(coordinatorListenerSocket.port());
1306 	    setenv ( ENV_VAR_NAME_PORT, coordPort.c_str(), 1 );
1307 	  }
1308 	
1309 	  JTRACE("Starting a new coordinator automatically.") (coordinatorPortStr);
1310 	
1311 	  if(fork()==0){
1312 	    dmtcp::string coordinator = jalib::Filesystem::FindHelperUtility("dmtcp_coordinator");
1313 	    char *modeStr = (char *)"--background";
1314 	    if ( modes & COORD_BATCH ) {
1315 	      modeStr = (char *)"--batch";
1316 	    }
1317 	    char * args[] = {
1318 	      (char*)coordinator.c_str(),
1319 	      (char*)"--exit-on-last",
1320 	      modeStr,
1321 	      NULL
1322 	    };
1323 	    execv(args[0], args);
1324 	    JASSERT(false)(coordinator)(JASSERT_ERRNO).Text("exec(dmtcp_coordinator) failed");
1325 	  } else {
1326 	    _real_close ( PROTECTEDFD (1) );
1327 	  }
1328 	
1329 	  errno = 0;
1330 	
1331 	  if ( modes & COORD_BATCH ) {
1332 	    // FIXME: If running in batch Mode, we sleep here for 5 seconds to let
1333 	    // the coordinator get started up.  We need to fix this in future.
1334 	    sleep(5);
1335 	  } else {
1336 	    JASSERT(::wait(&coordinatorStatus)>0)(JASSERT_ERRNO);
1337 	
1338 	    JASSERT(WEXITSTATUS(coordinatorStatus) == 0)
1339 	      .Text("Failed to start coordinator, port already in use.  You may use a different port by running with \'-p 12345\'\n");
1340 	  }
1341 	}
1342 	
1343 	//to allow linking without mtcpinterface
1344 	void __attribute__ ((weak)) dmtcp::initializeMtcpEngine()
1345 	{
1346 	  JASSERT(false).Text("should not be called");
1347 	}
1348 	
1349 	void __attribute__ ((weak)) dmtcp::killCkpthread()
1350 	{
1351 	  JASSERT(false).Text("should not be called");
1352 	}