1    	/****************************************************************************
2    	 *   Copyright (C) 2006-2010 by Jason Ansel, Kapil Arya, and Gene Cooperman *
3    	 *   jansel@csail.mit.edu, kapil@ccs.neu.edu, gene@ccs.neu.edu              *
4    	 *                                                                          *
5    	 *   This file is part of the dmtcp/src module of DMTCP (DMTCP:dmtcp/src).  *
6    	 *                                                                          *
7    	 *  DMTCP:dmtcp/src is free software: you can redistribute it and/or        *
8    	 *  modify it under the terms of the GNU Lesser General Public License as   *
9    	 *  published by the Free Software Foundation, either version 3 of the      *
10   	 *  License, or (at your option) any later version.                         *
11   	 *                                                                          *
12   	 *  DMTCP:dmtcp/src is distributed in the hope that it will be useful,      *
13   	 *  but WITHOUT ANY WARRANTY; without even the implied warranty of          *
14   	 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the           *
15   	 *  GNU Lesser General Public License for more details.                     *
16   	 *                                                                          *
17   	 *  You should have received a copy of the GNU Lesser General Public        *
18   	 *  License along with DMTCP:dmtcp/src.  If not, see                        *
19   	 *  <http://www.gnu.org/licenses/>.                                         *
20   	 ****************************************************************************/
21   	
22   	#include "kernelbufferdrainer.h"
23   	
24   	#include  "../jalib/jassert.h"
25   	#include "constants.h"
26   	#include "sockettable.h"
27   	#include  "../jalib/jbuffer.h"
28   	#include "connectionmanager.h"
29   	#include "syscallwrappers.h"
30   	
31   	namespace
32   	{
33   	  const char theMagicDrainCookie[] = SOCKET_DRAIN_MAGIC_COOKIE_STR;
34   	
35   	  void scaleSendBuffers ( double factor )
36   	  {
37   	    //todo resize buffers to avoid blocking
38   	  }
39   	
40   	}
41   	
42   	
43   	void dmtcp::KernelBufferDrainer::onConnect ( const jalib::JSocket& sock, const struct sockaddr* remoteAddr,socklen_t remoteLen )
44   	{
45   	  JWARNING ( false ) ( sock.sockfd() ).Text ( "we don't yet support checkpointing non-accepted connections... restore will likely fail.. closing connection" );
46   	  jalib::JSocket ( sock ).close();
47   	}
48   	
49   	void dmtcp::KernelBufferDrainer::onData ( jalib::JReaderInterface* sock )
50   	{
51   	  dmtcp::vector<char>& buffer = _drainedData[sock->socket().sockfd() ];
52   	  buffer.resize ( buffer.size() + sock->bytesRead() );
53   	  int startIdx = buffer.size() - sock->bytesRead();
54   	  memcpy ( &buffer[startIdx],sock->buffer(),sock->bytesRead() );
55   	//     JTRACE("got buffer chunk")(sock->bytesRead());
56   	  sock->reset();
57   	}
58   	void dmtcp::KernelBufferDrainer::onDisconnect ( jalib::JReaderInterface* sock )
59   	{
60   	  int fd;
61   	  errno = 0;
62   	  fd = sock->socket().sockfd();
63   	  //check if this was on purpose
64   	  if ( fd < 0 ) return;
65   	  JTRACE ( "found disconnected socket... marking it dead" )
66   	      (fd)(_reverseLookup[fd])( JASSERT_ERRNO );
67   	  _disconnectedSockets.push_back(_reverseLookup[fd]);
68   	  _drainedData.erase ( fd );
69   	}
70   	void dmtcp::KernelBufferDrainer::onTimeoutInterval()
71   	{
72   	  int count = 0;
73   	  for ( size_t i = 0; i < _dataSockets.size();++i )
74   	  {
75   	    if ( _dataSockets[i]->bytesRead() > 0 ) onData ( _dataSockets[i] );
76   	    dmtcp::vector<char>& buffer = _drainedData[_dataSockets[i]->socket().sockfd() ];
77   	    if ( buffer.size() >= sizeof ( theMagicDrainCookie )
78   		 && memcmp ( &buffer[buffer.size() - sizeof ( theMagicDrainCookie ) ]
79   	                     , theMagicDrainCookie
80   	                     , sizeof ( theMagicDrainCookie ) ) == 0 )
81   	    {
82   	      buffer.resize ( buffer.size() - sizeof ( theMagicDrainCookie ) );
83   	      JTRACE ( "buffer drain complete" ) ( _dataSockets[i]->socket().sockfd() ) ( buffer.size() ) ( ( _dataSockets.size() ) );
84   	      _dataSockets[i]->socket() = -1; //poison socket
85   	    }
86   	    else
87   	      ++count;
88   	  }
89   	
90   	  if ( count == 0 )
91   	  {
92   	    _listenSockets.clear();
93   	  }else{
94   	    const static int WARN_INTERVAL_TICKS = (int)(DRAINER_WARNING_FREQ/DRAINER_CHECK_FREQ + 0.5);
95   	    const static float WARN_INTERVAL_SEC = WARN_INTERVAL_TICKS*DRAINER_CHECK_FREQ;
96   	    if(_timeoutCount++ > WARN_INTERVAL_TICKS){
97   	      _timeoutCount=0;
98   	      for ( size_t i = 0; i < _dataSockets.size();++i ){
99   	        dmtcp::vector<char>& buffer = _drainedData[_dataSockets[i]->socket().sockfd() ];
100  	        JWARNING(false)(_dataSockets[i]->socket().sockfd())(buffer.size())(WARN_INTERVAL_SEC)
101  	                 .Text("Still draining socket... perhaps remote host is not running under DMTCP?");
102  	#ifdef CERN_CMS
103  	        JNOTE("\n*** Closing this socket (to database?? ).  Please use dmtcpaware to\n"
104  	              "***  gracefully handle database connections, and re-run.\n"
105  	              "***  Trying a workaround for now, and hoping it doesn't fail.\n");
106  	        _real_close(_dataSockets[i]->socket().sockfd());
107  		//it does it by creating a socket pair and closing one side
108  		int sp[2] = {-1,-1};
109  		JASSERT ( _real_socketpair ( AF_UNIX, SOCK_STREAM, 0, sp ) == 0 ) ( JASSERT_ERRNO )
110  			.Text ( "socketpair() failed" );
111  		JASSERT ( sp[0]>=0 && sp[1]>=0 ) ( sp[0] ) ( sp[1] )
112  			.Text ( "socketpair() failed" );
113  		_real_close ( sp[1] );
114  		JTRACE ( "created dead socket" ) ( sp[0] );
115  		_real_dup2(sp[0], _dataSockets[i]->socket().sockfd());
116  	#endif
117  	
118  	      }
119  	    }
120  	  }
121  	}
122  	
123  	// void dmtcp::KernelBufferDrainer::drainAllSockets()
124  	// {
125  	//     scaleSendBuffers(2);
126  	/*
127  	    SocketTable& table = SocketTable::instance();
128  	    for(  SocketTable::iterator i = table.begin()
129  	        ; i != table.end()
130  	        ; ++i)
131  	    {
132  	        switch(i->state())
133  	        {
134  	            case SocketEntry::T_LISTEN:
135  	//                 addListenSocket( i->sockfd() );
136  	                break;
137  	            case SocketEntry::T_CONNECT:
138  	            case SocketEntry::T_ACCEPT:
139  	                if(i->isStillAlive())
140  	                {
141  	                    JTRACE("will drain socket")(i->sockfd())(i->remoteId().id);
142  	                    _drainedData[i->sockfd()]; // create buffer
143  	                    jalib::JSocket(i->sockfd()) << theMagicDrainCookie;
144  	                    addDataSocket( new jalib::JChunkReader(i->sockfd(),512));
145  	                }
146  	                else
147  	                {
148  	                    JTRACE("FOUND DEAD SOCKET")(i->sockfd());
149  	                    i->setState(SocketEntry::T_ERROR);
150  	                }
151  	                break;
152  	        }
153  	    }
154  	    monitorSockets( DRAINER_CHECK_FREQ );
155  	  */
156  	//     scaleSendBuffers(0.5);
157  	// }
158  	
159  	void dmtcp::KernelBufferDrainer::beginDrainOf ( int fd, const ConnectionIdentifier& id )
160  	{
161  	//     JTRACE("will drain socket")(fd);
162  	  _drainedData[fd]; // create buffer
163  	// this is the simple way:  jalib::JSocket(fd) << theMagicDrainCookie;
164  	  //instead used delayed write in case kernel buffer is full:
165  	  addWrite ( new jalib::JChunkWriter ( fd, theMagicDrainCookie, sizeof theMagicDrainCookie ) );
166  	  //now setup a reader:
167  	  addDataSocket ( new jalib::JChunkReader ( fd,512 ) );
168  	
169  	  //insert it in reverse lookup
170  	  _reverseLookup[fd]=id;
171  	}
172  	
173  	
174  	void dmtcp::KernelBufferDrainer::refillAllSockets()
175  	{
176  	  scaleSendBuffers ( 2 );
177  	
178  	  JTRACE ( "refilling socket buffers" ) ( _drainedData.size() );
179  	
180  	  //write all buffers out
181  	  for ( dmtcp::map<int , dmtcp::vector<char> >::iterator i = _drainedData.begin()
182  	          ;i != _drainedData.end()
183  	          ;++i )
184  	  {
185  	    int size = i->second.size();
186  	    JWARNING ( size>=0 ) ( size ).Text ( "a failed drain is in our table???" );
187  	    if ( size<0 ) size=0;
188  	    DmtcpMessage msg;
189  	    msg.type = DMT_PEER_ECHO;
190  	    msg.params[0] = size;
191  	    jalib::JSocket sock ( i->first );
192  	    if ( size>0 ) JTRACE ( "requesting repeat buffer..." ) ( sock.sockfd() ) ( size );
193  	    sock << msg;
194  	    if ( size>0 ) sock.writeAll ( &i->second[0],size );
195  	    i->second.clear();
196  	  }
197  	
198  	//     JTRACE("repeating our friends buffers...");
199  	
200  	  //read all buffers in
201  	  for ( dmtcp::map<int , dmtcp::vector<char> >::iterator i = _drainedData.begin()
202  	          ;i != _drainedData.end()
203  	          ;++i )
204  	  {
205  	    DmtcpMessage msg;
206  	    msg.poison();
207  	    jalib::JSocket sock ( i->first );
Event tainted_data_argument: Calling function "jalib::JSocket &jalib::JSocket::operator >><dmtcp::DmtcpMessage>(dmtcp::DmtcpMessage &)" taints argument "msg". [details]
Also see events: [var_assign_var][lower_bounds][tainted_data]
208  	    sock >> msg;
209  	
210  	    msg.assertValid();
At conditional (1): "msg.type == 16": Taking true branch.
211  	    JASSERT ( msg.type == DMT_PEER_ECHO ) ( msg.type );
Event var_assign_var: Assigning: "msg.params[0]" = "size". Both are now tainted.
Also see events: [tainted_data_argument][lower_bounds][tainted_data]
212  	    int size = msg.params[0];
213  	    JTRACE ( "repeating buffer back to peer" ) ( size );
Event lower_bounds: Checking lower bounds of signed scalar "size" by "size > 0".
Also see events: [tainted_data_argument][var_assign_var][tainted_data]
At conditional (2): "size > 0": Taking true branch.
214  	    if ( size>0 )
215  	    {
216  	      //echo it back...
Event tainted_data: Passing tainted variable "size" to a tainted sink. [details]
Also see events: [tainted_data_argument][var_assign_var][lower_bounds]
217  	      jalib::JBuffer tmp ( size );
218  	      sock.readAll ( tmp,size );
219  	      sock.writeAll ( tmp,size );
220  	    }
221  	  }
222  	
223  	  JTRACE ( "buffers refilled" );
224  	
225  	
226  	  scaleSendBuffers ( 0.5 );
227  	}