1 /****************************************************************************
2 * Copyright (C) 2006-2010 by Jason Ansel, Kapil Arya, and Gene Cooperman *
3 * jansel@csail.mit.edu, kapil@ccs.neu.edu, gene@ccs.neu.edu *
4 * *
5 * This file is part of the dmtcp/src module of DMTCP (DMTCP:dmtcp/src). *
6 * *
7 * DMTCP:dmtcp/src is free software: you can redistribute it and/or *
8 * modify it under the terms of the GNU Lesser General Public License as *
9 * published by the Free Software Foundation, either version 3 of the *
10 * License, or (at your option) any later version. *
11 * *
12 * DMTCP:dmtcp/src is distributed in the hope that it will be useful, *
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
15 * GNU Lesser General Public License for more details. *
16 * *
17 * You should have received a copy of the GNU Lesser General Public *
18 * License along with DMTCP:dmtcp/src. If not, see *
19 * <http://www.gnu.org/licenses/>. *
20 ****************************************************************************/
21
22 #include "dmtcpworker.h"
23 #include "constants.h"
24 #include "../jalib/jconvert.h"
25 #include "../jalib/jalloc.h"
26 #include "dmtcpmessagetypes.h"
27 #include <stdlib.h>
28 #include "mtcpinterface.h"
29 #include <unistd.h>
30 #include "sockettable.h"
31 #include "../jalib/jsocket.h"
32 #include <map>
33 #include "kernelbufferdrainer.h"
34 #include "../jalib/jfilesystem.h"
35 #include "syscallwrappers.h"
36 #include "protectedfds.h"
37 #include "connectionidentifier.h"
38 #include "connectionmanager.h"
39 #include "connectionstate.h"
40 #include "dmtcp_coordinator.h"
41 #include "sysvipc.h"
42 #include <signal.h>
43 #include <pthread.h>
44 #include <sys/types.h>
45 #include <sys/stat.h>
46 #include <fcntl.h>
47 #include <sys/wait.h>
48 #include <sys/time.h>
49 #include <sys/resource.h>
50 #include <sys/personality.h>
51
52
/* Read-write lock initializers.
 *
 * Defines a static brace-initializer for a pthread_rwlock_t whose only
 * non-zero field is PTHREAD_RWLOCK_PREFER_WRITER_NP (writer preference).
 * The position of that field inside the initializer depends on the build:
 * 64-bit, 32-bit little-endian and 32-bit big-endian each get their own
 * layout below. Only defined when __USE_GNU is set.
 *
 * NOTE(review): these layouts appear to mirror the internal pthread_rwlock_t
 * layout of a particular glibc version -- confirm against the glibc in use.
 * The macro is not referenced in the visible part of this file
 * (theWrapperExecutionLock uses glibc's
 * PTHREAD_RWLOCK_WRITER_NONRECURSIVE_INITIALIZER_NP); confirm whether it is
 * still needed.
 */
#ifdef __USE_GNU
# if __WORDSIZE == 64
/* 64-bit: the flag is the 11th element. */
# define PTHREAD_RWLOCK_PREFER_WRITER_RECURSIVE_INITIALIZER_NP \
  { { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, \
      PTHREAD_RWLOCK_PREFER_WRITER_NP } }
# else
#  if __BYTE_ORDER == __LITTLE_ENDIAN
/* 32-bit little-endian: the flag is the 7th element. */
#   define PTHREAD_RWLOCK_PREFER_WRITER_RECURSIVE_INITIALIZER_NP \
  { { 0, 0, 0, 0, 0, 0, PTHREAD_RWLOCK_PREFER_WRITER_NP, \
      0, 0, 0, 0 } }
#  else
/* 32-bit big-endian: the flag is the 10th element. */
#   define PTHREAD_RWLOCK_PREFER_WRITER_RECURSIVE_INITIALIZER_NP \
  { { 0, 0, 0, 0, 0, 0, 0, 0, 0, PTHREAD_RWLOCK_PREFER_WRITER_NP,\
      0 } }
#  endif
# endif
#endif
71
72
// Taken (pthread_mutex_lock) by the checkpoint thread in
// waitForStage1Suspend() after DMT_DO_SUSPEND arrives, and released in
// waitForStage2Checkpoint(). Initialized as a recursive mutex.
static pthread_mutex_t theCkptCanStart = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;
// Coordinates the exiting user thread (~DmtcpWorker / interruptCkpthread)
// with the checkpoint thread (waitForCoordinatorMsg and
// waitForStage2Checkpoint); see the explanation in ~DmtcpWorker().
static pthread_mutex_t destroyDmtcpWorker = PTHREAD_MUTEX_INITIALIZER;

/*
 * WrapperProtectionLock is used to make the checkpoint safe by making sure
 * that no user-thread is executing any DMTCP wrapper code when it receives
 * the checkpoint signal.
 * Working:
 * On entering the wrapper in DMTCP, the user-thread acquires the read lock,
 * and releases it before leaving the wrapper.
 * When the Checkpoint-thread wants to send the SUSPEND signal to user
 * threads, it must acquire the write lock. It is blocked until all the
 * existing read-locks by user threads have been released. NOTE that this
 * is a WRITER-PREFERRED lock.
 *
 * There is a corner case too -- the newly created thread that has not been
 * initialized yet; we need to take some extra efforts for that.
 * Here are the steps to handle the newly created uninitialized thread:
 * A counter for the number of newly created uninitialized threads is kept.
 * The counter is made thread safe by using a mutex.
 * The calling thread (parent) increments the counter before calling clone.
 * The newly created child thread decrements the counter at the end of
 * initialization in MTCP/DMTCP.
 * After acquiring the Write lock, the checkpoint thread waits until the
 * number of uninitialized threads is zero. At that point, no thread is
 * executing in the clone wrapper and it is safe to do a checkpoint.
 *
 * XXX: Currently this security is provided only for the clone wrapper; this
 * should be extended to other calls as well. -- KAPIL
 */
static pthread_rwlock_t theWrapperExecutionLock = PTHREAD_RWLOCK_WRITER_NONRECURSIVE_INITIALIZER_NP;
// Protects unInitializedThreadCount.
static pthread_mutex_t unInitializedThreadCountLock = PTHREAD_MUTEX_INITIALIZER;
// Number of created-but-not-yet-initialized threads (see comment above).
static int unInitializedThreadCount = 0;
// Computation group id taken from the DMT_DO_SUSPEND message in
// waitForCoordinatorMsg(); used to name the signature file.
static dmtcp::UniquePid compGroup;

// static dmtcp::KernelBufferDrainer* theDrainer = NULL;
// Connection state of the checkpoint in progress: (re)created in
// waitForStage1Suspend(), deleted in waitForStage3Refill().
static dmtcp::ConnectionState* theCheckpointState = NULL;

#ifdef EXTERNAL_SOCKET_HANDLING
// TCP connection info gathered for the peer-lookup stage.
static dmtcp::vector <dmtcp::TcpConnectionInfo> theTcpConnections;
// Connections the coordinator reported as DMT_UNKNOWN_PEER (external peer).
// waitForStage1Suspend() waits until this list becomes empty.
dmtcp::vector <dmtcp::ConnectionIdentifier> externalTcpConnections;
// Set when external connections were found; cleared in
// waitForStage1Suspend() after they have closed.
static bool _waitingForExternalSocketsToClose = false;
#endif

// Last port tried when opening the restore listen socket in restoreSockets().
static int theRestorePort = RESTORE_PORT_START;

bool dmtcp::DmtcpWorker::_exitInProgress = false;

void processDmtcpCommands(dmtcp::string programName);
static void processSshCommand(dmtcp::string programName);
123
124 void dmtcp::DmtcpWorker::useAlternateCoordinatorFd(){
125 _coordinatorSocket = jalib::JSocket( PROTECTEDFD( 4 ) );
126 }
127
// Out-of-class definitions of DmtcpWorker's static members: the buffer
// length constant (a definition is needed if it is ODR-used) and the buffer
// into which restoreUserLDPRELOAD() copies DMTCP's own LD_PRELOAD entry.
const unsigned int dmtcp::DmtcpWorker::ld_preload_c_len;
char dmtcp::DmtcpWorker::ld_preload_c[dmtcp::DmtcpWorker::ld_preload_c_len];

// Set to true by the checkpoint thread in waitForStage1Suspend(); the
// DmtcpWorker constructor polls it before returning.
// NOTE(review): a plain bool shared between threads with no synchronization;
// it relies on the nanosleep() call in the polling loop forcing a reload.
bool _checkpointThreadInitialized = false;
132 void restoreUserLDPRELOAD()
133 {
134 // We have now successfully used LD_PRELOAD to execute prior to main()
135 // Next, hide our value of LD_PRELOAD, in a global variable.
136 // At checkpoint and restart time, we will no longer need our LD_PRELOAD.
137 // We will need it in only one place:
138 // when the user application makes an exec call:
139 // If anybody calls our execwrapper, we will reset LD_PRELOAD then.
140 // If they directly call _real_execve to get libc symbol, they will
141 // not be part of DMTCP computation.
142 // This has the advantage that our value of LD_PRELOAD will always come
143 // before any paths set by user application.
144 // Also, bash likes to keep its own envp, but we will interact with bash only
145 // within the exec wrapper.
146 // NOTE: If the user called exec("ssh ..."), we currently catch this in
147 // DmtcpWorker() due to LD_PRELOAD, unset LD_PRELOAD, and edit this into
148 // exec("dmtcp_checkpoint --ssh-slave ... ssh ..."), and re-execute.
149 // This way, we will unset LD_PRELOAD here and now, instead of at that time.
150 char * preload = getenv("LD_PRELOAD");
151 char * preload_rest = strstr(preload, ":");
152 if (preload_rest) {
153 *preload_rest = '\0'; // Now preload is just our preload string
154 preload_rest++;
155 }
156 JTRACE("LD_PRELOAD")(preload);
157 JASSERT(strlen(preload) < dmtcp::DmtcpWorker::ld_preload_c_len)
158 (preload) (dmtcp::DmtcpWorker::ld_preload_c_len)
159 .Text("preload string is longer than ld_preload_c_len");
160 strcpy(dmtcp::DmtcpWorker::ld_preload_c, preload); // Don't malloc
161 if (preload_rest) {
162 setenv("LD_PRELOAD", preload_rest, 1);
163 } else {
164 _dmtcp_unsetenv("LD_PRELOAD");
165 }
166 }
167
168 #include "../../mtcp/mtcp.h" //for MTCP_DEFAULT_SIGNAL
169
170 // This shold be visible to library only. DmtcpWorker will call
171 // this to initialize tmp (ckpt signal) at startup time. This avoids
172 // any later calls to getenv(), at which time the user app may have
173 // a wrapper around getenv, modified environ, or other tricks.
174 // (Matlab needs this or else it segfaults on restart, and bash plays
175 // similar tricks with maintaining its own environmnet.)
176 // Used in mtcpinterface.cpp and signalwrappers.cpp.
177 __attribute__ ((visibility ("hidden")))
178 int _determineMtcpSignal(){
179 // this mimics the MTCP logic for determining signal number found in
180 // mtcp_init()
181 int sig = MTCP_DEFAULT_SIGNAL;
182 char* endp = NULL;
183 static const char* tmp = getenv("MTCP_SIGCKPT");
184 if(tmp != NULL){
185 sig = strtol(tmp, &endp, 0);
186 if((errno != 0) || (tmp == endp))
187 sig = MTCP_DEFAULT_SIGNAL;
188 if(sig < 1 || sig > 31)
189 sig = MTCP_DEFAULT_SIGNAL;
190 }
191 return sig;
192 }
193
//called before user main()
//workerhijack.cpp initializes a static variable theInstance to DmtcpWorker obj
//
// Sets up DMTCP in this process image. In order:
//  - does nothing if enableCheckpointing is false;
//  - caches the checkpoint signal via _determineMtcpSignal();
//  - (i386) restores the RLIMIT_STACK soft limit passed in DMTCP_RLIMIT_STACK;
//  - gives up on checkpointing if ENV_VAR_UTILITY_DIR is unset;
//  - re-execs DMTCP's own utilities (processDmtcpCommands()) and ssh
//    (processSshCommand()); neither returns when exec succeeds;
//  - loads the tables serialized in the ENV_VAR_SERIALFILE_INITIAL file if
//    that variable is set, otherwise treats this process as the root of the
//    process tree and scans for pre-existing connections;
//  - connects to the coordinator, initializes MTCP, and polls until the
//    checkpoint thread sets _checkpointThreadInitialized.
dmtcp::DmtcpWorker::DmtcpWorker ( bool enableCheckpointing )
  :_coordinatorSocket ( PROTECTEDFD ( 1 ) )
  ,_restoreSocket ( PROTECTEDFD ( 3 ) )
{
  if ( !enableCheckpointing ) return;

  WorkerState::setCurrentState( WorkerState::UNKNOWN);

  /* DO NOT PUT ANYTHING BEFORE THE FOLLOWING BLOCK OF CODE (#ifdef .... #endif) */
#ifdef DEBUG
  /* Recalculate this process's UniquePid, then initialize jassert with a
   * per-process log path "<tmpdir>/jassertlog.<UniquePid>_<program>". */
  dmtcp::UniquePid::ThisProcess(true);

  dmtcp::ostringstream o;
  o << dmtcp::UniquePid::getTmpDir() << "/jassertlog." << dmtcp::UniquePid::ThisProcess()
    << "_" << jalib::Filesystem::GetProgramName();
  JASSERT_INIT (o.str());

  JTRACE ( "recalculated process UniquePid..." ) ( dmtcp::UniquePid::ThisProcess() );
#endif

  //This is called for side effect only. Force this function to call
  // getenv("MTCP_SIGCKPT") now and cache it to avoid getenv calls later.
  _determineMtcpSignal();

#ifdef __i386__
  // Match work begun in dmtcpPrepareForExec()
# if 0
  if (getenv("DMTCP_ADDR_COMPAT_LAYOUT")) {
    _dmtcp_unsetenv("DMTCP_ADDR_COMPAT_LAYOUT");
    // DMTCP had set ADDR_COMPAT_LAYOUT. Now unset it.
    personality( (unsigned long)personality(0xffffffff) ^ ADDR_COMPAT_LAYOUT );
    JTRACE( "unsetting ADDR_COMPAT_LAYOUT" );
  }
# else
  // If DMTCP_RLIMIT_STACK is set, apply its value as the RLIMIT_STACK soft
  // limit (hard limit unchanged) and remove the variable.
  // NOTE(review): presumably the user's original value saved by
  // dmtcpPrepareForExec() -- confirm.
  { char * rlim_cur_char = getenv("DMTCP_RLIMIT_STACK");
    if ( rlim_cur_char != NULL ) {
      struct rlimit rlim;
      getrlimit(RLIMIT_STACK, &rlim);
      rlim.rlim_cur = atol(rlim_cur_char);
      JTRACE ( "rlim_cur for RLIMIT_STACK being restored." ) ( rlim.rlim_cur );
      setrlimit(RLIMIT_STACK, &rlim);
      _dmtcp_unsetenv("DMTCP_RLIMIT_STACK");
    }
  }
# endif
#endif

  if ( getenv(ENV_VAR_UTILITY_DIR) == NULL ) {
    JNOTE ( "\n **** Not checkpointing this process,"
            " due to missing environment var ****" )
          ( getenv(ENV_VAR_UTILITY_DIR) )
          ( jalib::Filesystem::GetProgramName() );
    return;
  }
  // Default ENV_VAR_QUIET to "0"; only its first character is used, as a
  // digit, to set the jassert quiet level.
  if (! getenv(ENV_VAR_QUIET))
    setenv(ENV_VAR_QUIET, "0", 0);
  jassert_quiet = *getenv(ENV_VAR_QUIET) - '0';


  JTRACE ( "dmtcphijack.so: Running " ) ( jalib::Filesystem::GetProgramName() ) ( getenv ( "LD_PRELOAD" ) );
  JTRACE ( "dmtcphijack.so: Child of pid " ) ( getppid() );

  dmtcp::string programName = jalib::Filesystem::GetProgramName();

  // DMTCP's own tools are re-executed without checkpointing; ssh is
  // re-executed with its remote command wrapped. Both exec on success.
  if ( programName == "dmtcp_coordinator" ||
       programName == "dmtcp_checkpoint" ||
       programName == "dmtcp_restart" ||
       programName == "dmtcp_command" ||
       programName == "mtcp_restart" ) {
    processDmtcpCommands(programName);
  }
  if ( programName == "ssh" ) {
    processSshCommand(programName);
  }

  WorkerState::setCurrentState ( WorkerState::RUNNING );

  const char* serialFile = getenv( ENV_VAR_SERIALFILE_INITIAL );
  if ( serialFile != NULL )
  {
    // Restore UniquePid, connection, (virtual pid) and SysV IPC tables from
    // the serialized file, in the same order they are read below.
    JTRACE ( "loading initial socket table from file..." ) ( serialFile );

    jalib::JBinarySerializeReader rd ( serialFile );
    UniquePid::serialize ( rd );
    KernelDeviceToConnection::instance().serialize ( rd );

#ifdef PID_VIRTUALIZATION
    VirtualPidTable::instance().serialize ( rd );
    VirtualPidTable::instance().postExec();
#endif
    SysVIPC::instance().serialize ( rd );

#ifdef DEBUG
    JTRACE ( "initial socket table:" );
    KernelDeviceToConnection::instance().dbgSpamFds();
#endif

    _dmtcp_unsetenv ( ENV_VAR_SERIALFILE_INITIAL );
  }
  else
  {
    JTRACE ( "root of processes tree, checking for pre-existing sockets" );

#ifdef PID_VIRTUALIZATION
    if ( getenv( ENV_VAR_ROOT_PROCESS ) != NULL ) {
      dmtcp::VirtualPidTable::instance().setRootOfProcessTree();
      _dmtcp_unsetenv( ENV_VAR_ROOT_PROCESS );
    }
#endif

    ConnectionList::instance().scanForPreExisting();
  }

  connectToCoordinatorWithHandshake();

  // NOTE(review): the state was already set to RUNNING above; this re-asserts
  // it after the coordinator handshake.
  WorkerState::setCurrentState ( WorkerState::RUNNING );

  /* Acquire the lock here, so that the checkpoint-thread won't be able to
   * process CHECKPOINT request until we are done with initializeMtcpEngine()
   */
  WRAPPER_EXECUTION_DISABLE_CKPT();
  initializeMtcpEngine();
  WRAPPER_EXECUTION_ENABLE_CKPT();

  /*
   * Now wait for Checkpoint Thread to finish initialization
   * XXX: This should be the last thing in this constructor
   * (Polls every 10 ms; the flag is set in waitForStage1Suspend().)
   */
  while (!_checkpointThreadInitialized) {
    struct timespec sleepTime = {0, 10*1000*1000};
    nanosleep(&sleepTime, NULL);
  }
}
330
331 void dmtcp::DmtcpWorker::cleanupWorker()
332 {
333 pthread_rwlock_t newLock = PTHREAD_RWLOCK_WRITER_NONRECURSIVE_INITIALIZER_NP;
334 theWrapperExecutionLock = newLock;
335
336 pthread_mutex_t newCountLock = PTHREAD_MUTEX_INITIALIZER;
337 unInitializedThreadCountLock = newCountLock;
338
339 pthread_mutex_t newDestroyDmtcpWorker = PTHREAD_MUTEX_INITIALIZER;
340 destroyDmtcpWorker = newDestroyDmtcpWorker;
341
342 unInitializedThreadCount = 0;
343 WorkerState::setCurrentState( WorkerState::UNKNOWN);
344 JTRACE ( "disconnecting from dmtcp coordinator" );
345 _coordinatorSocket.close();
346 }
347
348 void dmtcp::DmtcpWorker::interruptCkpthread()
349 {
350 if (pthread_mutex_trylock(&destroyDmtcpWorker) == EBUSY) {
351 killCkpthread();
352 JASSERT(pthread_mutex_lock(&destroyDmtcpWorker) == 0) (JASSERT_ERRNO);
353 }
354 }
355
356 //called after user main()
357 dmtcp::DmtcpWorker::~DmtcpWorker()
358 {
359
360 if( exitInProgress() ){
361 /*
362 * Exit race fixed. If the user threads calls exit(), ~DmtcpWorker() is
363 * called. Now if the ckpt-thread is trying to use DmtcpWorker object
364 * while it is being destroyed, there is a problem.
365 *
366 * The fix here is to raise the flag exitInProgress in the exit() system
367 * call wrapper. Later in ~DmtcpWorker() we check if the flag has been
368 * raised or not. If the exitInProgress flag has been raised, it closes
369 * the coordinator socket and tries to acquire destroyDmtcpWorker mutex.
370 *
371 * The ckpt-thread tries to acquire the destroyDmtcpWorker mutex before
372 * writing/reading any message to/from coordinator socket while the user
373 * threads are running (i.e. messages like DMT_SUSPEND, DMT_SUSPENDED
374 * etc.)_. If it fails to acquire the lock, it verifies that the
375 * exitInProgress has been raised and performs pthread_exit().
376 *
377 * As obvious, once the user threads have been suspended the ckpt-thread
378 * releases the destroyDmtcpWorker() mutex and continues normal execution.
379 */
380 JTRACE ( "exit() in progress, disconnecting from dmtcp coordinator" );
381 _coordinatorSocket.close();
382 interruptCkpthread();
383 }
384 cleanupWorker();
385 }
386
387 void processDmtcpCommands(dmtcp::string programName)
388 {
389 JASSERT ( programName == "dmtcp_coordinator" ||
390 programName == "dmtcp_checkpoint" ||
391 programName == "dmtcp_restart" ||
392 programName == "dmtcp_command" ||
393 programName == "mtcp_restart" );
394
395 //make sure coordinator connection is closed
396 _real_close ( PROTECTEDFD ( 1 ) );
397
398 //get program args
399 dmtcp::vector<dmtcp::string> args = jalib::Filesystem::GetProgramArgs();
400
401 //now repack args
402 char** argv = new char*[args.size() + 1];
403 memset ( argv, 0, sizeof ( char* ) * ( args.size() + 1 ) );
404
405 for ( size_t i=0; i< args.size(); ++i ) {
406 argv[i] = ( char* ) args[i].c_str();
407 }
408
409 JNOTE ( "re-running without checkpointing" ) ( programName );
410
411 //now re-call the command
412 restoreUserLDPRELOAD();
413 _real_execvp ( jalib::Filesystem::GetProgramPath().c_str(), argv );
414
415 //should be unreachable
416 JASSERT ( false ) (jalib::Filesystem::GetProgramPath()) ( argv[0] )
417 ( JASSERT_ERRNO ) .Text ( "exec() failed" );
418 }
419
420 static void processSshCommand(dmtcp::string programName)
421 {
422 JASSERT ( jalib::Filesystem::GetProgramName() == "ssh" );
423 //make sure coordinator connection is closed
424 _real_close ( PROTECTEDFD ( 1 ) );
425
426 //get prog args
427 dmtcp::vector<dmtcp::string> args = jalib::Filesystem::GetProgramArgs();
428 JASSERT ( args.size() >= 3 ) ( args.size() ).Text ( "ssh must have at least 3 args to be wrapped (ie: ssh host cmd)" );
429
430 //find command part
431 size_t commandStart = 2;
432 for ( size_t i = 1; i < args.size(); ++i )
433 {
434 if ( args[i][0] != '-' )
435 {
436 commandStart = i + 1;
437 break;
438 }
439 }
440 JASSERT ( commandStart < args.size() && args[commandStart][0] != '-' )
441 ( commandStart ) ( args.size() ) ( args[commandStart] )
442 .Text ( "failed to parse ssh command line" );
443
444 //find the start of the command
445 dmtcp::string& cmd = args[commandStart];
446
447
448 const char * coordinatorAddr = getenv ( ENV_VAR_NAME_ADDR );
449 const char * coordinatorPortStr = getenv ( ENV_VAR_NAME_PORT );
450 const char * sigckpt = getenv ( ENV_VAR_SIGCKPT );
451 const char * compression = getenv ( ENV_VAR_COMPRESSION );
452 const char * ckptOpenFiles = getenv ( ENV_VAR_CKPT_OPEN_FILES );
453 const char * ckptDir = getenv ( ENV_VAR_CHECKPOINT_DIR );
454 const char * tmpDir = getenv ( ENV_VAR_TMPDIR );
455 jassert_quiet = *getenv ( ENV_VAR_QUIET ) - '0';
456
457 //modify the command
458
459 //dmtcp::string prefix = "env ";
460
461 dmtcp::string prefix = DMTCP_CHECKPOINT_CMD " --ssh-slave ";
462
463
464 if ( coordinatorAddr != NULL )
465 prefix += dmtcp::string() + "--host " + coordinatorAddr + " ";
466 if ( coordinatorPortStr != NULL )
467 prefix += dmtcp::string() + "--port " + coordinatorPortStr + " ";
468 if ( sigckpt != NULL )
469 prefix += dmtcp::string() + "--mtcp-checkpoint-signal " + sigckpt + " ";
470 if ( ckptDir != NULL )
471 prefix += dmtcp::string() + "--ckptdir " + ckptDir + " ";
472 if ( tmpDir != NULL )
473 prefix += dmtcp::string() + "--tmpdir " + tmpDir + " ";
474 if ( ckptOpenFiles != NULL )
475 prefix += dmtcp::string() + "--checkpoint-open-files" + " ";
476
477 if ( compression != NULL ) {
478 if ( strcmp ( compression, "0" ) == 0 )
479 prefix += "--no-gzip ";
480 else
481 prefix += "--gzip ";
482 }
483
484 // process command
485 size_t semipos, pos;
486 size_t actpos = dmtcp::string::npos;
487 for(semipos = 0; (pos = cmd.find(';',semipos+1)) != dmtcp::string::npos;
488 semipos = pos, actpos = pos);
489
490 if( actpos > 0 && actpos != dmtcp::string::npos ){
491 cmd = cmd.substr(0,actpos+1) + prefix + cmd.substr(actpos+1);
492 } else {
493 cmd = prefix + cmd;
494 }
495
496 //now repack args
497 dmtcp::string newCommand = "";
498 char** argv = new char*[args.size() +2];
499 memset ( argv,0,sizeof ( char* ) * ( args.size() +2 ) );
500
501 for ( size_t i=0; i< args.size(); ++i )
502 {
503 argv[i] = ( char* ) args[i].c_str();
504 newCommand += args[i] + ' ';
505 }
506
507 JNOTE ( "re-running SSH with checkpointing" ) ( newCommand );
508
509 restoreUserLDPRELOAD();
510 //now re-call ssh
511 _real_execvp ( argv[0], argv );
512
513 //should be unreachable
514 JASSERT ( false ) ( cmd ) ( JASSERT_ERRNO ).Text ( "exec() failed" );
515 }
516
517
518 const dmtcp::UniquePid& dmtcp::DmtcpWorker::coordinatorId() const
519 {
520 return _coordinatorId;
521 }
522
523
// Sends DMT_OK (with the current WorkerState) to the coordinator, then blocks
// until a message of the given 'type' arrives. signalStr is used only in the
// trace message.
//  - DMT_DO_SUSPEND: destroyDmtcpWorker is acquired first and is still held
//    on return (it is released in waitForStage2Checkpoint()). If a user
//    thread is exiting, the calling (checkpoint) thread calls pthread_exit()
//    instead, releasing the mutex if it had taken it.
//  - DMT_KILL_PEER from the coordinator terminates the process with _exit(0).
//  - While waiting for DMT_DO_REFILL, DMT_RESTORE_WAITING and
//    DMT_FORCE_RESTART messages are skipped.
//  - Any other message type fails JASSERT ( msg.type == type ).
//  - On DMT_DO_SUSPEND, the computation group and peer count carried by the
//    message are stored in theCheckpointState and compGroup.
void dmtcp::DmtcpWorker::waitForCoordinatorMsg(dmtcp::string signalStr,
                                               DmtcpMessageType type )
{
  if ( type == DMT_DO_SUSPEND ) {
    // trylock fails if another thread holds destroyDmtcpWorker, which (per
    // ~DmtcpWorker) means a user thread is running exit().
    if ( pthread_mutex_trylock(&destroyDmtcpWorker) != 0 ) {
      JTRACE ( "User thread is performing exit()."
               " ckpt thread exit()ing as well" );
      pthread_exit(NULL);
    }
    // We own the mutex, but exit() may already have been flagged.
    if ( exitInProgress() ) {
      JASSERT(pthread_mutex_unlock(&destroyDmtcpWorker)==0)(JASSERT_ERRNO);
      pthread_exit(NULL);
    }
  }

  dmtcp::DmtcpMessage msg;

  // Tell the coordinator we are ready, reporting our current state.
  msg.type = DMT_OK;
  msg.state = WorkerState::currentState();
  _coordinatorSocket << msg;

  JTRACE ( "waiting for " + signalStr + " Signal" );

  do {
    msg.poison();
    _coordinatorSocket >> msg;

    // exit() may have started while we were blocked in the read above;
    // ~DmtcpWorker() closes the coordinator socket in that case.
    if ( type == DMT_DO_SUSPEND && exitInProgress() ) {
      JASSERT(pthread_mutex_unlock(&destroyDmtcpWorker)==0)(JASSERT_ERRNO);
      pthread_exit(NULL);
    }

    msg.assertValid();

    if ( msg.type == DMT_KILL_PEER ) {
      JTRACE ( "Received KILL Message from coordinator, exiting" );
      _exit ( 0 );
    }

    // The ckpt thread can receive multiple DMT_RESTORE_WAITING or
    // DMT_FORCE_RESTART messages while waiting for a DMT_DO_REFILL message, we
    // need to ignore them and wait for the DMT_DO_REFILL message to arrive.
    if ( type != DMT_DO_REFILL ) {
      break;
    }

  } while ( type == DMT_DO_REFILL &&
            ( msg.type == DMT_RESTORE_WAITING || msg.type == DMT_FORCE_RESTART ) );

  JASSERT ( msg.type == type ) ( msg.type );

  // Coordinator sends some computation information along with the SUSPEND
  // message. Extracting that.
  if ( type == DMT_DO_SUSPEND ) {
    JTRACE ( "Computation information" ) ( msg.compGroup ) ( msg.params[0] );
    JASSERT ( theCheckpointState != NULL );
    theCheckpointState->numPeers(msg.params[0]);
    theCheckpointState->compGroup(msg.compGroup);
    compGroup = msg.compGroup;
  }
}
585
586 void dmtcp::DmtcpWorker::waitForStage1Suspend()
587 {
588 JTRACE ( "running" );
589
590 WorkerState::setCurrentState ( WorkerState::RUNNING );
591
592 /*
593 * Its only use is to inform the user thread (waiting in DmtcpWorker
594 * constructor) that the checkpoint thread has finished initialization. This
595 * is to serialize DmtcpWorker-Constructor(), mtcp_init(), checkpoint-thread
596 * initialization and user main(). As obvious, this is only effective when
597 * the process is being initialized.
598 */
599 if (!_checkpointThreadInitialized) {
600 /*
601 * We should not call this function any higher in the logic because it
602 * calls setenv() and if it is running under bash, then it getenv() will
603 * not work between the call to setenv() and bash main().
604 */
605 restoreUserLDPRELOAD();
606 _checkpointThreadInitialized = true;
607 }
608
609 if ( compGroup != UniquePid() ) {
610 dmtcp::string signatureFile = UniquePid::getTmpDir() + "/"
611 + compGroup.toString() + "-"
612 #ifdef PID_VIRTUALIZATION
613 + jalib::XToString ( _real_getppid() );
614 #else
615 + jalib::XToString ( getppid() );
616 #endif
617 JTRACE("creating signature file") (signatureFile)(_real_getpid());
618 int fd = _real_open ( signatureFile.c_str(), O_CREAT|O_WRONLY, 0600 );
619 JASSERT ( fd != -1 ) ( fd ) ( signatureFile )
620 .Text ( "Unable to create signature file" );
621 dmtcp::string pidstr = jalib::XToString(_real_getpid());
622 // FIXME: This assumes write is small, always completes
623 JASSERT( pidstr.length()+1
624 == write(fd, pidstr.c_str(), pidstr.length()+1) )
625 ( pidstr.length()+1 );
626 _real_close(fd);
627 }
628
629 if ( theCheckpointState != NULL ) {
630 delete theCheckpointState;
631 theCheckpointState = NULL;
632 }
633
634 theCheckpointState = new ConnectionState();
635
636 #ifdef EXTERNAL_SOCKET_HANDLING
637 JASSERT ( _waitingForExternalSocketsToClose == true ||
638 externalTcpConnections.empty() == true );
639
640 while ( externalTcpConnections.empty() == false ) {
641 JTRACE("Waiting for externalSockets toClose")
642 (_waitingForExternalSocketsToClose);
643 sleep ( 1 );
644 }
645 if ( _waitingForExternalSocketsToClose == true ) {
646 DmtcpMessage msg ( DMT_EXTERNAL_SOCKETS_CLOSED );
647 _coordinatorSocket << msg;
648 _waitingForExternalSocketsToClose = false;
649 JTRACE("externalSocketsClosed") (_waitingForExternalSocketsToClose);
650 }
651 #endif
652
653 waitForCoordinatorMsg ( "SUSPEND", DMT_DO_SUSPEND );
654
655 JTRACE ( "got SUSPEND signal, waiting for dmtcp_lock():"
656 " to get synchronized with _runCoordinatorCmd if we use DMTCP API" );
657 _dmtcp_lock();
658 // TODO: may be it is better to move unlock to more appropriate place.
659 // For example after suspending all threads
660 _dmtcp_unlock();
661
662
663 JTRACE ( "got SUSPEND signal, waiting for lock(&theCkptCanStart)" );
664 JASSERT(pthread_mutex_lock(&theCkptCanStart)==0)(JASSERT_ERRNO);
665
666 JTRACE ( "got SUSPEND signal,"
667 " waiting for other threads to exit DMTCP-Wrappers" );
668 JASSERT(pthread_rwlock_wrlock(&theWrapperExecutionLock) == 0)(JASSERT_ERRNO);
669 JTRACE ( "got SUSPEND signal,"
670 " waiting for newly created threads to finish initialization" )
671 (unInitializedThreadCount);
672 waitForThreadsToFinishInitialization();
673
674 JTRACE ( "Starting checkpoint, suspending..." );
675 }
676
677 #ifdef EXTERNAL_SOCKET_HANDLING
678 bool dmtcp::DmtcpWorker::waitForStage2Checkpoint()
679 #else
680 void dmtcp::DmtcpWorker::waitForStage2Checkpoint()
681 #endif
682 {
683 WorkerState::setCurrentState ( WorkerState::SUSPENDED );
684 JTRACE ( "suspended" );
685
686 if ( exitInProgress() ) {
687 JASSERT(pthread_mutex_unlock(&destroyDmtcpWorker)==0)(JASSERT_ERRNO);
688 pthread_exit(NULL);
689 }
690
691 JASSERT(_coordinatorSocket.isValid());
692 JASSERT(pthread_mutex_unlock(&destroyDmtcpWorker)==0)(JASSERT_ERRNO);
693
694 JASSERT(pthread_rwlock_unlock(&theWrapperExecutionLock) == 0)(JASSERT_ERRNO);
695
696 JASSERT(pthread_mutex_unlock(&theCkptCanStart)==0)(JASSERT_ERRNO);
697
698 theCheckpointState->preLockSaveOptions();
699
700 waitForCoordinatorMsg ( "LOCK", DMT_DO_LOCK_FDS );
701
702 JTRACE ( "locking..." );
703 JASSERT ( theCheckpointState != NULL );
704 theCheckpointState->preCheckpointLock();
705 JTRACE ( "locked" );
706
707 /*
708 * Save first 2 * sizeof(pid_t) bytes of each shared memory area and fill it
709 * with all zeros.
710 */
711 SysVIPC::instance().prepareForLeaderElection();
712
713 WorkerState::setCurrentState ( WorkerState::FD_LEADER_ELECTION );
714
715 #ifdef EXTERNAL_SOCKET_HANDLING
716 if ( waitForStage2bCheckpoint() == false ) {
717 return false;
718 }
719 #else
720 waitForCoordinatorMsg ( "DRAIN", DMT_DO_DRAIN );
721 #endif
722
723 JTRACE ( "draining..." );
724 theCheckpointState->preCheckpointDrain();
725 JTRACE ( "drained" );
726
727 /*
728 * write pid at offset 0. Also write pid at offset sizeof(pid_t) if this
729 * process is the creator of this memory area. After the leader election
730 * barrier, the leader of the shared-memory object is the creater of the
731 * object. If the creater process is missing, then the leader process is the
732 * process whose pid is stored at offset 0
733 */
734 SysVIPC::instance().leaderElection();
735
736 WorkerState::setCurrentState ( WorkerState::DRAINED );
737
738 waitForCoordinatorMsg ( "CHECKPOINT", DMT_DO_CHECKPOINT );
739 JTRACE ( "got checkpoint signal" );
740
741 #if HANDSHAKE_ON_CHECKPOINT == 1
742 //handshake is done after one barrier after drain
743 JTRACE ( "beginning handshakes" );
744 theCheckpointState->preCheckpointHandshakes(coordinatorId());
745 JTRACE ( "handshaking done" );
746 #endif
747
748 // JTRACE("writing *.dmtcp file");
749 // theCheckpointState->outputDmtcpConnectionTable();
750
751 #ifdef PID_VIRTUALIZATION
752 dmtcp::VirtualPidTable::instance().preCheckpoint();
753 #endif
754
755 SysVIPC::instance().preCheckpoint();
756
757 #ifdef EXTERNAL_SOCKET_HANDLING
758 return true;
759 #endif
760 }
761
762 #ifdef EXTERNAL_SOCKET_HANDLING
763 bool dmtcp::DmtcpWorker::waitForStage2bCheckpoint()
764 {
765 waitForCoordinatorMsg ( "PEER_LOOKUP", DMT_DO_PEER_LOOKUP );
766 JTRACE ( "Looking up Socket Peers..." );
767 theTcpConnections.clear();
768 theCheckpointState->preCheckpointPeerLookup(theTcpConnections);
769 sendPeerLookupRequest(theTcpConnections);
770 JTRACE ( "Done Socket Peer Lookup" );
771
772
773 WorkerState::setCurrentState ( WorkerState::PEER_LOOKUP_COMPLETE );
774
775 {
776 dmtcp::DmtcpMessage msg;
777
778 msg.type = DMT_OK;
779 msg.state = WorkerState::currentState();
780 _coordinatorSocket << msg;
781
782 JTRACE ( "waiting for DRAIN/RESUME Signal" );
783
784 do {
785 msg.poison();
786 _coordinatorSocket >> msg;
787 msg.assertValid();
788
789 if ( msg.type == DMT_KILL_PEER ) {
790 JTRACE ( "Received KILL Message from coordinator, exiting" );
791 _exit ( 0 );
792 }
793 JTRACE ( "received message" ) (msg.type );
794 if ( msg.type != DMT_UNKNOWN_PEER )
795 break;
796
797 JTRACE ("received DMT_UNKNOWN_PEER message") (msg.conId);
798
799 TcpConnection* con =
800 (TcpConnection*) &( ConnectionList::instance() [msg.conId] );
801 con->markExternal();
802 externalTcpConnections.push_back(msg.conId);
803 _waitingForExternalSocketsToClose = true;
804
805 } while ( msg.type == DMT_UNKNOWN_PEER );
806
807 JASSERT ( msg.type == DMT_DO_DRAIN || msg.type == DMT_DO_RESUME )
808 ( msg.type );
809
810 ConnectionList& connections = ConnectionList::instance();
811
812 // Tcp Accept and Connect connection with PeerType UNKNOWN should be marked as INTERNAL
813 for ( ConnectionList::iterator i = connections.begin()
814 ; i!= connections.end()
815 ; ++i )
816 {
817 Connection* con = i->second;
818 if ( con->conType() == Connection::TCP ) {
819 TcpConnection* tcpCon = (TcpConnection *) con;
820 if ( (tcpCon->tcpType() == TcpConnection::TCP_ACCEPT ||
821 tcpCon->tcpType() == TcpConnection::TCP_CONNECT) &&
822 tcpCon->peerType() == TcpConnection::PEER_UNKNOWN )
823 tcpCon->markInternal();
824 }
825 }
826 if ( msg.type == DMT_DO_RESUME ) {
827 JTRACE ( "Peer Lookup not complete, skipping checkpointing \n\n\n\n\n");
828 return false;
829 }
830
831 JASSERT (msg.type == DMT_DO_DRAIN);
832 }
833 }
834
835 void dmtcp::DmtcpWorker::sendPeerLookupRequest (dmtcp::vector<TcpConnectionInfo>& conInfoTable )
836 {
837 for (int i = 0; i < conInfoTable.size(); ++i) {
838 DmtcpMessage msg;
839 msg.type = DMT_PEER_LOOKUP;
840 msg.localAddr = conInfoTable[i].localAddr();
841 msg.remoteAddr = conInfoTable[i].remoteAddr();
842 msg.localAddrlen = conInfoTable[i].addrlen();
843 msg.conId = conInfoTable[i].conId();
844
845 _coordinatorSocket << msg;
846 }
847 }
848
849 bool dmtcp::DmtcpWorker::waitingForExternalSocketsToClose() {
850 return _waitingForExternalSocketsToClose;
851 }
852 #endif
853
854 void dmtcp::DmtcpWorker::writeCheckpointPrefix ( int fd )
855 {
856 const int len = strlen(DMTCP_FILE_HEADER);
857 JASSERT(write(fd, DMTCP_FILE_HEADER, len)==len);
858
859 theCheckpointState->outputDmtcpConnectionTable(fd);
860 }
861
862 void dmtcp::DmtcpWorker::sendCkptFilenameToCoordinator()
863 {
864 // Tell coordinator to record our filename in the restart script
865 dmtcp::string ckptFilename = dmtcp::UniquePid::checkpointFilename();
866 dmtcp::string hostname = jalib::Filesystem::GetCurrentHostname();
867 JTRACE ( "recording filenames" ) ( ckptFilename ) ( hostname );
868 dmtcp::DmtcpMessage msg;
869 msg.type = DMT_CKPT_FILENAME;
870 msg.extraBytes = ckptFilename.length() +1 + hostname.length() +1;
871 _coordinatorSocket << msg;
872 _coordinatorSocket.writeAll ( ckptFilename.c_str(), ckptFilename.length() +1 );
873 _coordinatorSocket.writeAll ( hostname.c_str(), hostname.length() +1 );
874 }
875
876 void dmtcp::DmtcpWorker::postRestart()
877 {
878 JTRACE("begin postRestart()");
879
880 WorkerState::setCurrentState(WorkerState::RESTARTING);
881 recvCoordinatorHandshake();
882
883 JASSERT ( theCheckpointState != NULL );
884 theCheckpointState->postRestart();
885
886 if ( jalib::Filesystem::GetProgramName() == "screen" )
887 send_sigwinch = 1;
888 // With hardstatus (bottom status line), screen process has diff. size window
889 // Must send SIGWINCH to adjust it.
890 // MTCP will send SIGWINCH to process on restart. This will force 'screen'
891 // to execute ioctl wrapper. The wrapper will report a changed winsize,
892 // so that 'screen' must re-initialize the screen (scrolling resions, etc.).
893 // The wrapper will also send a second SIGWINCH. Then 'screen' will
894 // call ioctl and get the correct window size and resize again.
895 // We can't just send two SIGWINCH's now, since window size has not
896 // changed yet, and 'screen' will assume that there's nothing to do.
897
898 #ifdef PID_VIRTUALIZATION
899 dmtcp::VirtualPidTable::instance().postRestart();
900 #endif
901 SysVIPC::instance().postRestart();
902 }
903
/*
 * Checkpoint stage 3 (refill), worker side.
 *
 * Marks the worker CHECKPOINTED and blocks until the coordinator sends
 * DMT_DO_REFILL. It then runs theCheckpointState->postCheckpoint(isRestart),
 * deletes that state object and resets the pointer to NULL. Finally it
 * calls SysVIPC's postCheckpoint() hook.
 *
 * isRestart: forwarded unchanged to postCheckpoint().
 */
void dmtcp::DmtcpWorker::waitForStage3Refill( bool isRestart )
{
  JTRACE ( "checkpointed" );

  WorkerState::setCurrentState ( WorkerState::CHECKPOINTED );

  waitForCoordinatorMsg ( "REFILL", DMT_DO_REFILL );

  JASSERT ( theCheckpointState != NULL );
  theCheckpointState->postCheckpoint(isRestart);
  // The checkpoint state is owned here and released once refill is done.
  delete theCheckpointState;
  theCheckpointState = NULL;

  SysVIPC::instance().postCheckpoint();
}
919
/*
 * Checkpoint stage 4 (resume), worker side: mark the worker REFILLED, block
 * until the coordinator sends DMT_DO_RESUME, then run SysVIPC's preResume()
 * hook.
 */
void dmtcp::DmtcpWorker::waitForStage4Resume()
{
  JTRACE ( "refilled" );
  WorkerState::setCurrentState ( WorkerState::REFILLED );
  waitForCoordinatorMsg ( "RESUME", DMT_DO_RESUME );
  JTRACE ( "got resume signal" );

  SysVIPC::instance().preResume();
}
929
/*
 * Reload the virtual pid maps from file and restore process-group info.
 * Compiles to a no-op unless PID_VIRTUALIZATION is defined.
 */
void dmtcp::DmtcpWorker::restoreVirtualPidTable()
{
#ifdef PID_VIRTUALIZATION
  dmtcp::VirtualPidTable::instance().readPidMapsFromFile();
  dmtcp::VirtualPidTable::instance().restoreProcessGroupInfo();
#endif
}
937
938 void dmtcp::DmtcpWorker::restoreSockets(ConnectionState& coordinator,
939 dmtcp::UniquePid compGroup,
940 int numPeers,
941 int& coordTstamp)
942 {
943 JTRACE ( "restoreSockets begin" );
944
945 theRestorePort = RESTORE_PORT_START;
946
947 //open up restore socket
948 {
949 jalib::JSocket restoreSocket ( -1 );
950 while ( !restoreSocket.isValid() && theRestorePort < RESTORE_PORT_STOP )
951 {
952 restoreSocket = jalib::JServerSocket ( jalib::JSockAddr::ANY, ++theRestorePort );
953 JTRACE ( "open listen socket attempt" ) ( theRestorePort );
954 }
955 JASSERT ( restoreSocket.isValid() ) ( RESTORE_PORT_START ).Text ( "failed to open listen socket" );
956 restoreSocket.changeFd ( _restoreSocket.sockfd() );
957 JTRACE ( "opening listen sockets" ) ( _restoreSocket.sockfd() ) ( restoreSocket.sockfd() );
958 _restoreSocket = restoreSocket;
959 }
960
961 //reconnect to our coordinator
962 connectToCoordinatorWithoutHandshake();
963 sendCoordinatorHandshake(jalib::Filesystem::GetProgramName(),
964 compGroup, numPeers, DMT_RESTART_PROCESS);
965 recvCoordinatorHandshake(&coordTstamp);
966 JTRACE("Connected to coordinator")(coordTstamp);
967
968 // finish sockets restoration
969 coordinator.doReconnect ( _coordinatorSocket,_restoreSocket );
970
971 JTRACE ( "sockets restored!" );
972
973 }
974
/*
 * Lock theCkptCanStart. Callers (e.g. connectAndSendUserCommand) use it to
 * prevent checkpoints from starting until delayCheckpointsUnlock() is called.
 * Blocks until the mutex is acquired; any lock error fails the JASSERT.
 * NOTE(review): pthread_mutex_lock reports errors via its return value, not
 * errno, so JASSERT_ERRNO may print an unrelated errno.
 */
void dmtcp::DmtcpWorker::delayCheckpointsLock(){
  JASSERT(pthread_mutex_lock(&theCkptCanStart)==0)(JASSERT_ERRNO);
}
978
/*
 * Unlock theCkptCanStart, the mutex taken by delayCheckpointsLock(), so that
 * checkpoints may start again. Any unlock error fails the JASSERT.
 * NOTE(review): pthread_mutex_unlock reports errors via its return value,
 * not errno, so JASSERT_ERRNO may print an unrelated errno.
 */
void dmtcp::DmtcpWorker::delayCheckpointsUnlock(){
  JASSERT(pthread_mutex_unlock(&theCkptCanStart)==0)(JASSERT_ERRNO);
}
982
983 // XXX: Handle deadlock error code
984 // NOTE: Don't do any fancy stuff in this wrapper which can cause the process to go into DEADLOCK
985 bool dmtcp::DmtcpWorker::wrapperExecutionLockLock()
986 {
987 #ifdef PTRACE
988 return false;
989 #endif
990 int saved_errno = errno;
991 bool lockAcquired = false;
992 if ( dmtcp::WorkerState::currentState() == dmtcp::WorkerState::RUNNING ) {
993 int retVal = pthread_rwlock_rdlock(&theWrapperExecutionLock);
994 if ( retVal != 0 && retVal != EDEADLK ) {
995 perror ( "ERROR DmtcpWorker::wrapperExecutionLockLock: Failed to acquire lock" );
996 _exit(1);
997 }
998 // retVal should always be 0 (success) here.
999 lockAcquired = retVal == 0 ? true : false;
1000 }
1001 errno = saved_errno;
1002 return lockAcquired;
1003 }
1004
1005 // NOTE: Don't do any fancy stuff in this wrapper which can cause the process to go into DEADLOCK
1006 void dmtcp::DmtcpWorker::wrapperExecutionLockUnlock()
1007 {
1008 int saved_errno = errno;
1009 if ( dmtcp::WorkerState::currentState() != dmtcp::WorkerState::RUNNING ) {
1010 printf ( "ERROR: DmtcpWorker::wrapperExecutionLockUnlock: This process is not in \n"
1011 "RUNNING state and yet this thread managed to acquire the wrapperExecutionLock.\n"
1012 "This should not be happening, something is wrong." );
1013 _exit(1);
1014 }
1015 if ( pthread_rwlock_unlock(&theWrapperExecutionLock) != 0) {
1016 perror ( "ERROR DmtcpWorker::wrapperExecutionLockUnlock: Failed to release lock" );
1017 _exit(1);
1018 }
1019 errno = saved_errno;
1020 }
1021
1022 void dmtcp::DmtcpWorker::waitForThreadsToFinishInitialization()
1023 {
1024 while (unInitializedThreadCount != 0) {
1025 struct timespec sleepTime = {0, 10*1000*1000};
1026 nanosleep(&sleepTime, NULL);
1027 }
1028 }
1029
1030 void dmtcp::DmtcpWorker::incrementUninitializedThreadCount()
1031 {
1032 int saved_errno = errno;
1033 if ( dmtcp::WorkerState::currentState() == dmtcp::WorkerState::RUNNING ) {
1034 JASSERT(pthread_mutex_lock(&unInitializedThreadCountLock) == 0) (JASSERT_ERRNO);
1035 unInitializedThreadCount++;
1036 //JTRACE(":") (unInitializedThreadCount);
1037 JASSERT(pthread_mutex_unlock(&unInitializedThreadCountLock) == 0) (JASSERT_ERRNO);
1038 }
1039 errno = saved_errno;
1040 }
1041
1042 void dmtcp::DmtcpWorker::decrementUninitializedThreadCount()
1043 {
1044 int saved_errno = errno;
1045 if ( dmtcp::WorkerState::currentState() == dmtcp::WorkerState::RUNNING ) {
1046 JASSERT(pthread_mutex_lock(&unInitializedThreadCountLock) == 0) (JASSERT_ERRNO);
1047 JASSERT(unInitializedThreadCount > 0) (unInitializedThreadCount);
1048 unInitializedThreadCount--;
1049 //JTRACE(":") (unInitializedThreadCount);
1050 JASSERT(pthread_mutex_unlock(&unInitializedThreadCountLock) == 0) (JASSERT_ERRNO);
1051 }
1052 errno = saved_errno;
1053 }
1054
1055 void dmtcp::DmtcpWorker::connectAndSendUserCommand(char c, int* result /*= NULL*/)
1056 {
1057 //prevent checkpoints from starting
1058 delayCheckpointsLock();
1059 {
1060 if ( tryConnectToCoordinator() == false ) {
1061 *result = DmtcpCoordinator::ERROR_COORDINATOR_NOT_FOUND;
1062 return;
1063 }
1064 sendUserCommand(c,result);
1065 _coordinatorSocket.close();
1066 }
1067 delayCheckpointsUnlock();
1068 }
1069
1070 //tell the coordinator to run given user command
1071 void dmtcp::DmtcpWorker::sendUserCommand(char c, int* result /*= NULL*/)
1072 {
1073 DmtcpMessage msg,reply;
1074
1075 //send
1076 msg.type = DMT_USER_CMD;
1077 msg.params[0] = c;
1078
1079 if (c == 'i') {
1080 const char* interval = getenv ( ENV_VAR_CKPT_INTR );
1081 if ( interval != NULL )
1082 msg.theCheckpointInterval = jalib::StringToInt ( interval );
1083 }
1084
1085 _coordinatorSocket << msg;
1086
1087 //the coordinator will violently close our socket...
1088 if(c=='q' || c=='Q'){
1089 result[0]=0;
1090 return;
1091 }
1092
1093 //receive REPLY
1094 reply.poison();
1095 _coordinatorSocket >> reply;
1096 reply.assertValid();
1097 JASSERT ( reply.type == DMT_USER_CMD_RESULT );
1098
1099 if(result!=NULL){
1100 memcpy( result, reply.params, sizeof(reply.params) );
1101 }
1102 }
1103
1104
1105 /*!
1106 \fn dmtcp::DmtcpWorker::connectToCoordinator()
1107 */
1108 bool dmtcp::DmtcpWorker::tryConnectToCoordinator()
1109 {
1110 return connectToCoordinator ( false );
1111 }
1112
1113 void dmtcp::DmtcpWorker::connectToCoordinatorWithoutHandshake()
1114 {
1115 connectToCoordinator ( );
1116 }
1117
1118 void dmtcp::DmtcpWorker::connectToCoordinatorWithHandshake()
1119 {
1120 connectToCoordinator ( );
1121 JTRACE("CONNECT TO coordinator, trying to handshake");
1122 sendCoordinatorHandshake(jalib::Filesystem::GetProgramName());
1123 recvCoordinatorHandshake();
1124 }
1125
1126 bool dmtcp::DmtcpWorker::connectToCoordinator(bool dieOnError /*= true*/)
1127 {
1128
1129 const char * coordinatorAddr = getenv ( ENV_VAR_NAME_ADDR );
1130 const char * coordinatorPortStr = getenv ( ENV_VAR_NAME_PORT );
1131 dmtcp::UniquePid zeroGroup;
1132
1133 if ( coordinatorAddr == NULL ) coordinatorAddr = DEFAULT_HOST;
1134 int coordinatorPort = coordinatorPortStr==NULL ? DEFAULT_PORT : jalib::StringToInt ( coordinatorPortStr );
1135
1136 jalib::JSocket oldFd = _coordinatorSocket;
1137
1138 _coordinatorSocket = jalib::JClientSocket ( coordinatorAddr,coordinatorPort );
1139
1140 if ( ! _coordinatorSocket.isValid() && ! dieOnError ) {
1141 return false;
1142 }
1143
1144 JASSERT ( _coordinatorSocket.isValid() )
1145 ( coordinatorAddr ) ( coordinatorPort )
1146 .Text ( "Failed to connect to DMTCP coordinator" );
1147
1148 JTRACE ( "connected to dmtcp coordinator, no handshake" )
1149 ( coordinatorAddr ) ( coordinatorPort );
1150
1151 if ( oldFd.isValid() )
1152 {
1153 JTRACE ( "restoring old coordinatorsocket fd" )
1154 ( oldFd.sockfd() ) ( _coordinatorSocket.sockfd() );
1155
1156 _coordinatorSocket.changeFd ( oldFd.sockfd() );
1157 }
1158 return true;
1159 }
1160
1161 void dmtcp::DmtcpWorker::sendCoordinatorHandshake(const dmtcp::string& progname,
1162 UniquePid compGroup /*= UniquePid()*/,
1163 int np /*= -1*/,
1164 DmtcpMessageType msgType /*= DMT_HELLO_COORDINATOR*/)
1165 {
1166 JTRACE("sending coordinator handshake")(UniquePid::ThisProcess());
1167
1168 dmtcp::string hostname = jalib::Filesystem::GetCurrentHostname();
1169 dmtcp::DmtcpMessage hello_local;
1170 hello_local.type = msgType;
1171 hello_local.params[0] = np;
1172 hello_local.compGroup = compGroup;
1173 hello_local.restorePort = theRestorePort;
1174
1175 const char* interval = getenv ( ENV_VAR_CKPT_INTR );
1176 if ( interval != NULL )
1177 hello_local.theCheckpointInterval = jalib::StringToInt ( interval );
1178
1179 hello_local.extraBytes = hostname.length() + 1 + progname.length() + 1;
1180 _coordinatorSocket << hello_local;
1181 _coordinatorSocket.writeAll( hostname.c_str(),hostname.length()+1);
1182 _coordinatorSocket.writeAll( progname.c_str(),progname.length()+1);
1183 }
1184
1185 void dmtcp::DmtcpWorker::recvCoordinatorHandshake( int *param1 ){
1186 JTRACE("receiving coordinator handshake");
1187
1188 dmtcp::DmtcpMessage hello_remote;
1189 hello_remote.poison();
1190 _coordinatorSocket >> hello_remote;
1191 hello_remote.assertValid();
1192
1193 if ( param1 == NULL )
1194 JASSERT ( hello_remote.type == dmtcp::DMT_HELLO_WORKER ) ( hello_remote.type );
1195 else
1196 JASSERT ( hello_remote.type == dmtcp::DMT_RESTART_PROCESS_REPLY ) ( hello_remote.type );
1197
1198 _coordinatorId = hello_remote.coordinator;
1199 DmtcpMessage::setDefaultCoordinator ( _coordinatorId );
1200 if( param1 ){
1201 *param1 = hello_remote.params[0];
1202 }
1203 JTRACE("Coordinator handshake RECEIVED!!!!!");
1204 }
1205
1206 void dmtcp::DmtcpWorker::startCoordinatorIfNeeded(int modes, int isRestart){
1207 const static int CS_OK = 91;
1208 const static int CS_NO = 92;
1209 int coordinatorStatus = -1;
1210
1211 if (modes & COORD_BATCH) {
1212 startNewCoordinator ( modes, isRestart );
1213 return;
1214 }
1215 //fork a child process to probe the coordinator
1216 if (fork()==0) {
1217 //fork so if we hit an error parent won't die
1218 dup2(2,1); //copy stderr to stdout
|
Event negative_return_fn: |
Function "open("/dev/null", 2)" returns a negative number. |
|
Event negative_returns: |
"open("/dev/null", 2)" is passed to a parameter that cannot be negative. |
1219 dup2(open("/dev/null",O_RDWR), 2); //close stderr
1220 int result[DMTCPMESSAGE_NUM_PARAMS];
1221 dmtcp::DmtcpCoordinatorAPI coordinatorAPI;
1222 {
1223 if ( coordinatorAPI.tryConnectToCoordinator() == false ) {
1224 JTRACE("Coordinator not found. Will try to start a new one.");
1225 _real_exit(1);
1226 }
1227 }
1228
1229 coordinatorAPI.sendUserCommand('s',result);
1230 coordinatorAPI._coordinatorSocket.close();
1231
1232 // result[0] == numPeers of coord; bool result[1] == computation is running
1233 if(result[0]==0 || result[1] ^ isRestart){
1234 if(result[0] != 0) {
1235 int num_processes = result[0];
1236 JTRACE("Joining existing computation.") (num_processes);
1237 }
1238 _real_exit(CS_OK);
1239 }else{
1240 JTRACE("Existing computation not in a running state," \
1241 " perhaps checkpoint in progress?");
1242 _real_exit(CS_NO);
1243 }
1244 }
1245 errno = 0;
1246 // FIXME: wait() could return -1 if a signal happened before child exits
1247 JASSERT(::wait(&coordinatorStatus)>0)(JASSERT_ERRNO);
1248 JASSERT(WIFEXITED(coordinatorStatus));
1249
1250 //is coordinator running?
1251 if (WEXITSTATUS(coordinatorStatus) != CS_OK) {
1252 //is coordinator in funny state?
1253 if(WEXITSTATUS(coordinatorStatus) == CS_NO){
1254 JASSERT (false) (isRestart)
1255 .Text ("Coordinator in a funny state. Peers exist, not restarting," \
1256 "\n but not in a running state. Checkpointing?" \
1257 "\n Or maybe restarting and running with peers existing?");
1258 }else{
1259 JTRACE("Bad result found for coordinator. Try a new one.");
1260 }
1261
1262 JTRACE("Coordinator not found. Starting a new one.");
1263 startNewCoordinator ( modes, isRestart );
1264
1265 }else{
1266 if (modes & COORD_FORCE_NEW) {
1267 JTRACE("Forcing new coordinator. --new-coordinator flag given.");
1268 startNewCoordinator ( modes, isRestart );
1269 return;
1270 }
1271 JASSERT( modes & COORD_JOIN )
1272 .Text("Coordinator already running, but '--new' flag was given.");
1273 }
1274 }
1275
1276 void dmtcp::DmtcpWorker::startNewCoordinator(int modes, int isRestart)
1277 {
1278 int coordinatorStatus = -1;
1279 //get location of coordinator
1280 const char * coordinatorAddr = getenv ( ENV_VAR_NAME_ADDR );
1281 if(coordinatorAddr==NULL) coordinatorAddr = DEFAULT_HOST;
1282 const char * coordinatorPortStr = getenv ( ENV_VAR_NAME_PORT );
1283 int coordinatorPort = coordinatorPortStr==NULL ? DEFAULT_PORT
1284 : jalib::StringToInt(coordinatorPortStr);
1285
1286 dmtcp::string s=coordinatorAddr;
1287 if(s!="localhost" && s!="127.0.0.1" && s!=jalib::Filesystem::GetCurrentHostname()){
1288 JASSERT(false)
1289 .Text("Won't automatically start coordinator because DMTCP_HOST is set to a remote host.");
1290 _real_exit(1);
1291 }
1292
1293 if ( modes & COORD_BATCH || modes & COORD_FORCE_NEW ) {
1294 // Create a socket and bind it to an unused port.
1295 jalib::JServerSocket coordinatorListenerSocket ( jalib::JSockAddr::ANY, 0 );
1296 errno = 0;
1297 JASSERT ( coordinatorListenerSocket.isValid() )
1298 ( coordinatorListenerSocket.port() ) ( JASSERT_ERRNO )
1299 .Text ( "Failed to create listen socket."
1300 "\nIf msg is \"Address already in use\", this may be an old coordinator."
1301 "\nKill other coordinators and try again in a minute or so." );
1302 // Now dup the sockfd to
1303 coordinatorListenerSocket.changeFd(PROTECTEDFD(1));
1304
1305 dmtcp::string coordPort= jalib::XToString(coordinatorListenerSocket.port());
1306 setenv ( ENV_VAR_NAME_PORT, coordPort.c_str(), 1 );
1307 }
1308
1309 JTRACE("Starting a new coordinator automatically.") (coordinatorPortStr);
1310
1311 if(fork()==0){
1312 dmtcp::string coordinator = jalib::Filesystem::FindHelperUtility("dmtcp_coordinator");
1313 char *modeStr = (char *)"--background";
1314 if ( modes & COORD_BATCH ) {
1315 modeStr = (char *)"--batch";
1316 }
1317 char * args[] = {
1318 (char*)coordinator.c_str(),
1319 (char*)"--exit-on-last",
1320 modeStr,
1321 NULL
1322 };
1323 execv(args[0], args);
1324 JASSERT(false)(coordinator)(JASSERT_ERRNO).Text("exec(dmtcp_coordinator) failed");
1325 } else {
1326 _real_close ( PROTECTEDFD (1) );
1327 }
1328
1329 errno = 0;
1330
1331 if ( modes & COORD_BATCH ) {
1332 // FIXME: If running in batch Mode, we sleep here for 5 seconds to let
1333 // the coordinator get started up. We need to fix this in future.
1334 sleep(5);
1335 } else {
1336 JASSERT(::wait(&coordinatorStatus)>0)(JASSERT_ERRNO);
1337
1338 JASSERT(WEXITSTATUS(coordinatorStatus) == 0)
1339 .Text("Failed to start coordinator, port already in use. You may use a different port by running with \'-p 12345\'\n");
1340 }
1341 }
1342
1343 //to allow linking without mtcpinterface
// Weak fallback definition: any strong definition linked in (e.g. from
// mtcpinterface) takes precedence. Reaching this stub is an error and fails
// the JASSERT.
void __attribute__ ((weak)) dmtcp::initializeMtcpEngine()
{
  JASSERT(false).Text("should not be called");
}
1348
// Weak fallback definition: any strong definition linked in (e.g. from
// mtcpinterface) takes precedence. Reaching this stub is an error and fails
// the JASSERT.
void __attribute__ ((weak)) dmtcp::killCkpthread()
{
  JASSERT(false).Text("should not be called");
}