1 /****************************************************************************
2 * Copyright (C) 2006-2010 by Jason Ansel, Kapil Arya, and Gene Cooperman *
3 * jansel@csail.mit.edu, kapil@ccs.neu.edu, gene@ccs.neu.edu *
4 * *
5 * This file is part of the dmtcp/src module of DMTCP (DMTCP:dmtcp/src). *
6 * *
7 * DMTCP:dmtcp/src is free software: you can redistribute it and/or *
8 * modify it under the terms of the GNU Lesser General Public License as *
9 * published by the Free Software Foundation, either version 3 of the *
10 * License, or (at your option) any later version. *
11 * *
12 * DMTCP:dmtcp/src is distributed in the hope that it will be useful, *
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
15 * GNU Lesser General Public License for more details. *
16 * *
17 * You should have received a copy of the GNU Lesser General Public *
18 * License along with DMTCP:dmtcp/src. If not, see *
19 * <http://www.gnu.org/licenses/>. *
20 ****************************************************************************/
21
22 #include "kernelbufferdrainer.h"
23
24 #include "../jalib/jassert.h"
25 #include "constants.h"
26 #include "sockettable.h"
27 #include "../jalib/jbuffer.h"
28 #include "connectionmanager.h"
29 #include "syscallwrappers.h"
30
31 namespace
32 {
33 const char theMagicDrainCookie[] = SOCKET_DRAIN_MAGIC_COOKIE_STR;
34
35 void scaleSendBuffers ( double factor )
36 {
37 //todo resize buffers to avoid blocking
38 }
39
40 }
41
42
43 void dmtcp::KernelBufferDrainer::onConnect ( const jalib::JSocket& sock, const struct sockaddr* remoteAddr,socklen_t remoteLen )
44 {
45 JWARNING ( false ) ( sock.sockfd() ).Text ( "we don't yet support checkpointing non-accepted connections... restore will likely fail.. closing connection" );
46 jalib::JSocket ( sock ).close();
47 }
48
49 void dmtcp::KernelBufferDrainer::onData ( jalib::JReaderInterface* sock )
50 {
51 dmtcp::vector<char>& buffer = _drainedData[sock->socket().sockfd() ];
52 buffer.resize ( buffer.size() + sock->bytesRead() );
53 int startIdx = buffer.size() - sock->bytesRead();
54 memcpy ( &buffer[startIdx],sock->buffer(),sock->bytesRead() );
55 // JTRACE("got buffer chunk")(sock->bytesRead());
56 sock->reset();
57 }
58 void dmtcp::KernelBufferDrainer::onDisconnect ( jalib::JReaderInterface* sock )
59 {
60 int fd;
61 errno = 0;
62 fd = sock->socket().sockfd();
63 //check if this was on purpose
64 if ( fd < 0 ) return;
65 JTRACE ( "found disconnected socket... marking it dead" )
66 (fd)(_reverseLookup[fd])( JASSERT_ERRNO );
67 _disconnectedSockets.push_back(_reverseLookup[fd]);
68 _drainedData.erase ( fd );
69 }
70 void dmtcp::KernelBufferDrainer::onTimeoutInterval()
71 {
72 int count = 0;
73 for ( size_t i = 0; i < _dataSockets.size();++i )
74 {
75 if ( _dataSockets[i]->bytesRead() > 0 ) onData ( _dataSockets[i] );
76 dmtcp::vector<char>& buffer = _drainedData[_dataSockets[i]->socket().sockfd() ];
77 if ( buffer.size() >= sizeof ( theMagicDrainCookie )
78 && memcmp ( &buffer[buffer.size() - sizeof ( theMagicDrainCookie ) ]
79 , theMagicDrainCookie
80 , sizeof ( theMagicDrainCookie ) ) == 0 )
81 {
82 buffer.resize ( buffer.size() - sizeof ( theMagicDrainCookie ) );
83 JTRACE ( "buffer drain complete" ) ( _dataSockets[i]->socket().sockfd() ) ( buffer.size() ) ( ( _dataSockets.size() ) );
84 _dataSockets[i]->socket() = -1; //poison socket
85 }
86 else
87 ++count;
88 }
89
90 if ( count == 0 )
91 {
92 _listenSockets.clear();
93 }else{
94 const static int WARN_INTERVAL_TICKS = (int)(DRAINER_WARNING_FREQ/DRAINER_CHECK_FREQ + 0.5);
95 const static float WARN_INTERVAL_SEC = WARN_INTERVAL_TICKS*DRAINER_CHECK_FREQ;
96 if(_timeoutCount++ > WARN_INTERVAL_TICKS){
97 _timeoutCount=0;
98 for ( size_t i = 0; i < _dataSockets.size();++i ){
99 dmtcp::vector<char>& buffer = _drainedData[_dataSockets[i]->socket().sockfd() ];
100 JWARNING(false)(_dataSockets[i]->socket().sockfd())(buffer.size())(WARN_INTERVAL_SEC)
101 .Text("Still draining socket... perhaps remote host is not running under DMTCP?");
102 #ifdef CERN_CMS
103 JNOTE("\n*** Closing this socket (to database?? ). Please use dmtcpaware to\n"
104 "*** gracefully handle database connections, and re-run.\n"
105 "*** Trying a workaround for now, and hoping it doesn't fail.\n");
106 _real_close(_dataSockets[i]->socket().sockfd());
107 //it does it by creating a socket pair and closing one side
108 int sp[2] = {-1,-1};
109 JASSERT ( _real_socketpair ( AF_UNIX, SOCK_STREAM, 0, sp ) == 0 ) ( JASSERT_ERRNO )
110 .Text ( "socketpair() failed" );
111 JASSERT ( sp[0]>=0 && sp[1]>=0 ) ( sp[0] ) ( sp[1] )
112 .Text ( "socketpair() failed" );
113 _real_close ( sp[1] );
114 JTRACE ( "created dead socket" ) ( sp[0] );
115 _real_dup2(sp[0], _dataSockets[i]->socket().sockfd());
116 #endif
117
118 }
119 }
120 }
121 }
122
123 // void dmtcp::KernelBufferDrainer::drainAllSockets()
124 // {
125 // scaleSendBuffers(2);
126 /*
127 SocketTable& table = SocketTable::instance();
128 for( SocketTable::iterator i = table.begin()
129 ; i != table.end()
130 ; ++i)
131 {
132 switch(i->state())
133 {
134 case SocketEntry::T_LISTEN:
135 // addListenSocket( i->sockfd() );
136 break;
137 case SocketEntry::T_CONNECT:
138 case SocketEntry::T_ACCEPT:
139 if(i->isStillAlive())
140 {
141 JTRACE("will drain socket")(i->sockfd())(i->remoteId().id);
142 _drainedData[i->sockfd()]; // create buffer
143 jalib::JSocket(i->sockfd()) << theMagicDrainCookie;
144 addDataSocket( new jalib::JChunkReader(i->sockfd(),512));
145 }
146 else
147 {
148 JTRACE("FOUND DEAD SOCKET")(i->sockfd());
149 i->setState(SocketEntry::T_ERROR);
150 }
151 break;
152 }
153 }
154 monitorSockets( DRAINER_CHECK_FREQ );
155 */
156 // scaleSendBuffers(0.5);
157 // }
158
159 void dmtcp::KernelBufferDrainer::beginDrainOf ( int fd, const ConnectionIdentifier& id )
160 {
161 // JTRACE("will drain socket")(fd);
162 _drainedData[fd]; // create buffer
163 // this is the simple way: jalib::JSocket(fd) << theMagicDrainCookie;
164 //instead used delayed write in case kernel buffer is full:
165 addWrite ( new jalib::JChunkWriter ( fd, theMagicDrainCookie, sizeof theMagicDrainCookie ) );
166 //now setup a reader:
167 addDataSocket ( new jalib::JChunkReader ( fd,512 ) );
168
169 //insert it in reverse lookup
170 _reverseLookup[fd]=id;
171 }
172
173
174 void dmtcp::KernelBufferDrainer::refillAllSockets()
175 {
176 scaleSendBuffers ( 2 );
177
178 JTRACE ( "refilling socket buffers" ) ( _drainedData.size() );
179
180 //write all buffers out
181 for ( dmtcp::map<int , dmtcp::vector<char> >::iterator i = _drainedData.begin()
182 ;i != _drainedData.end()
183 ;++i )
184 {
185 int size = i->second.size();
186 JWARNING ( size>=0 ) ( size ).Text ( "a failed drain is in our table???" );
187 if ( size<0 ) size=0;
188 DmtcpMessage msg;
189 msg.type = DMT_PEER_ECHO;
190 msg.params[0] = size;
191 jalib::JSocket sock ( i->first );
192 if ( size>0 ) JTRACE ( "requesting repeat buffer..." ) ( sock.sockfd() ) ( size );
193 sock << msg;
194 if ( size>0 ) sock.writeAll ( &i->second[0],size );
195 i->second.clear();
196 }
197
198 // JTRACE("repeating our friends buffers...");
199
200 //read all buffers in
201 for ( dmtcp::map<int , dmtcp::vector<char> >::iterator i = _drainedData.begin()
202 ;i != _drainedData.end()
203 ;++i )
204 {
205 DmtcpMessage msg;
206 msg.poison();
207 jalib::JSocket sock ( i->first );
|
Event tainted_data_argument: |
Calling function "jalib::JSocket &jalib::JSocket::operator >><dmtcp::DmtcpMessage>(dmtcp::DmtcpMessage &)" taints argument "msg". [details] |
| Also see events: |
[var_assign_var][lower_bounds][tainted_data] |
208 sock >> msg;
209
210 msg.assertValid();
|
At conditional (1): "msg.type == 16": Taking true branch.
|
211 JASSERT ( msg.type == DMT_PEER_ECHO ) ( msg.type );
212 int size = msg.params[0];
213 JTRACE ( "repeating buffer back to peer" ) ( size );
|
At conditional (2): "size > 0": Taking true branch.
|
214 if ( size>0 )
215 {
216 //echo it back...
217 jalib::JBuffer tmp ( size );
218 sock.readAll ( tmp,size );
219 sock.writeAll ( tmp,size );
220 }
221 }
222
223 JTRACE ( "buffers refilled" );
224
225
226 scaleSendBuffers ( 0.5 );
227 }