1 /****************************************************************************
2 * Copyright (C) 2006-2008 by Jason Ansel *
3 * jansel@csail.mit.edu *
4 * *
5 * This file is part of the JALIB module of DMTCP (DMTCP:dmtcp/jalib). *
6 * *
7 * DMTCP:dmtcp/jalib is free software: you can redistribute it and/or *
8 * modify it under the terms of the GNU Lesser General Public License as *
9 * published by the Free Software Foundation, either version 3 of the *
10 * License, or (at your option) any later version. *
11 * *
12 * DMTCP:dmtcp/src is distributed in the hope that it will be useful, *
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
15 * GNU Lesser General Public License for more details. *
16 * *
17 * You should have received a copy of the GNU Lesser General Public *
18 * License along with DMTCP:dmtcp/src. If not, see *
19 * <http://www.gnu.org/licenses/>. *
20 ****************************************************************************/
21
22 #define _BSD_SOURCE
23
24 #include "jsocket.h"
25
26 #include <stdio.h>
27 #include <sys/types.h>
28 #include <sys/socket.h>
29 #include <netinet/in.h>
30 #include <netdb.h>
31 #include <stdlib.h>
32 #include <strings.h>
33 #include <string.h>
34 #include <unistd.h>
35 #include "jassert.h"
36 #include <errno.h>
37 #include <algorithm>
38 #include <set>
39
40 #ifndef DMTCP
41 # define DECORATE_FN(fn) ::fn
42 #else
43 # include "syscallwrappers.h"
44 # define DECORATE_FN(fn) ::_real_ ## fn
45 #endif
46
47
48 const jalib::JSockAddr jalib::JSockAddr::ANY ( NULL );
49
50 jalib::JSockAddr::JSockAddr ( const char* hostname )
51 {
52 memset ( &_addr, 0, sizeof ( _addr ) );
53 _addr.sin_family = AF_INET;
54 if ( hostname == NULL )
55 {
56 _addr.sin_addr.s_addr = INADDR_ANY;
57 }
58 else
59 {
60 struct hostent *server = gethostbyname ( hostname );
61 JWARNING ( server != NULL ) ( hostname ).Text ( "No such host" );
62 if ( server != NULL )
63 {
64 JASSERT ( ( int ) sizeof ( _addr.sin_addr.s_addr ) <= server->h_length )
65 ( sizeof ( _addr.sin_addr.s_addr ) )
66 ( server->h_length );
67
68 memcpy ( &_addr.sin_addr.s_addr,
69 server->h_addr,
70 server->h_length );
71 }
72 }
73 }
74
75 jalib::JSocket::JSocket()
76 {
77 _sockfd = DECORATE_FN ( socket ) ( AF_INET, SOCK_STREAM, 0 );
78 }
79
80
81 bool jalib::JSocket::connect ( const JSockAddr& addr, int port )
82 {
83 return JSocket::connect ( ( sockaddr* ) &addr._addr, sizeof ( addr._addr ), port );
84 }
85
86
87 bool jalib::JSocket::connect ( const struct sockaddr *addr, socklen_t addrlen, int port )
88 {
89 struct sockaddr_storage addrbuf;
90 memset ( &addrbuf,0,sizeof ( addrbuf ) );
|
At conditional (1): "addrlen <= sizeof (addrbuf) /*128*/": Taking false branch.
|
91 JASSERT ( addrlen <= sizeof ( addrbuf ) ) ( addrlen ) ( sizeof ( addrbuf ) );
|
Event overrun-buffer-arg: |
Overrunning struct type struct sockaddr_storage of size 128 bytes by passing it to a function which indexes it with argument "addrlen" at byte position 128. |
92 memcpy ( &addrbuf,addr,addrlen );
93 JWARNING ( addrlen == sizeof ( sockaddr_in ) ) ( addrlen ) ( sizeof ( sockaddr_in ) ).Text ( "may not be correct socket type" );
94 ( ( sockaddr_in* ) &addrbuf )->sin_port = htons ( port );
95 return DECORATE_FN ( connect ) ( _sockfd, ( sockaddr* ) &addrbuf, addrlen ) == 0;
96 }
97
98 bool jalib::JSocket::bind ( const JSockAddr& addr, int port )
99 {
100 struct sockaddr_in addrbuf = addr._addr;
101 addrbuf.sin_port = htons ( port );
102 return bind ( ( sockaddr* ) &addrbuf, sizeof ( addrbuf ) );
103 }
104
105 bool jalib::JSocket::bind ( const struct sockaddr *addr, socklen_t addrlen )
106 {
107 return DECORATE_FN ( bind ) ( _sockfd, addr, addrlen ) == 0;
108 }
109
110 bool jalib::JSocket::listen ( int backlog/* = 32*/ )
111 {
112 return DECORATE_FN ( listen ) ( _sockfd, backlog ) == 0;
113 }
114
115 jalib::JSocket jalib::JSocket::accept ( struct sockaddr_storage* remoteAddr,socklen_t* remoteLen )
116 {
117 if ( remoteAddr == NULL || remoteLen == NULL )
118 return JSocket ( DECORATE_FN ( accept ) ( _sockfd,NULL,NULL ) );
119 else
120 return JSocket ( DECORATE_FN ( accept ) ( _sockfd, ( sockaddr* ) remoteAddr, remoteLen ) );
121 }
122
123 void jalib::JSocket::enablePortReuse()
124 {
125 int one = 1;
126 //These options will hopefully reduce address already in use errors
127 #ifdef SO_REUSEADDR
128 if (setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) < 0){
129 JWARNING(false)(JASSERT_ERRNO).Text("setsockopt(SO_REUSEADDR) failed");
130 }
131 #endif
132 #ifdef SO_REUSEPORT
133 if (setsockopt(_sockfd, SOL_SOCKET, SO_REUSEPORT, &one, sizeof(one)) < 0){
134 JWARNING(false)(JASSERT_ERRNO).Text("setsockopt(SO_REUSEPORT) failed");
135 }
136 #endif
137 }
138
139 bool jalib::JSocket::close()
140 {
141 if ( !isValid() ) return false;
142 int ret = DECORATE_FN(close) ( _sockfd );
143 _sockfd = -1;
144 return ret==0;
145 }
146
147 ssize_t jalib::JSocket::read ( char* buf, size_t len )
148 {
149 return ::read ( _sockfd,buf,len );
150 }
151
152 ssize_t jalib::JSocket::write ( const char* buf, size_t len )
153 {
154 return ::write ( _sockfd,buf,len );
155 }
156
157 ssize_t jalib::JSocket::readAll ( char* buf, size_t len )
158 {
159 int origLen = len;
160 while ( len > 0 )
161 {
162 fd_set rfds;
163 struct timeval tv;
164 int retval;
165
166 int tmp_sockfd = _sockfd;
167 if ( tmp_sockfd == -1 ) {
168 return -1;
169 }
170
171 /* Watch stdin (fd 0) to see when it has input. */
172 FD_ZERO ( &rfds );
173 FD_SET ( tmp_sockfd, &rfds );
174
175 tv.tv_sec = 120;
176 tv.tv_usec = 0;
177
178 retval = select ( tmp_sockfd+1, &rfds, NULL, NULL, &tv );
179 /* Don't rely on the value of tv now! */
180
181
182 if ( retval == -1 )
183 {
184 if ( errno == EBADF ) {
185 JWARNING (false) .Text ( "Socket already closed" );
186 return -1;
187 } else if( errno != EINTR ){
188 JWARNING ( retval >= 0 ) ( tmp_sockfd ) ( JASSERT_ERRNO ).Text ( "select() failed" );
189 return -1;
190 }
191 }
192 else if ( retval )
193 {
194 errno = 0;
195 ssize_t cnt = read ( buf, len );
196 if ( cnt < 0 && errno != EAGAIN && errno != EINTR )
197 {
198 JWARNING(cnt>=0)(sockfd())(cnt)(len)(JASSERT_ERRNO).Text( "JSocket read failure" );
199 return -1;
200 }
201 if (cnt == 0)
202 {
203 JWARNING(cnt!=0)(sockfd())(origLen)(len).Text( "JSocket needed to read origLen chars,\n still needs to read len chars, but EOF reached" );
204 return -1;
205 }
206 if ( cnt > 0 )
207 {
208 buf += cnt;
209 len -= cnt;
210 }
211 }
212 else
213 {
214 JTRACE ( "still waiting for data" ) ( tmp_sockfd ) ( len );
215 }
216 }
217 return origLen;
218 }
219
220 ssize_t jalib::JSocket::writeAll ( const char* buf, size_t len )
221 {
222 int origLen = len;
223 while ( len > 0 )
224 {
225 fd_set wfds;
226 struct timeval tv;
227 int retval;
228
229 int tmp_sockfd = _sockfd;
230 if ( tmp_sockfd == -1 ) {
231 return -1;
232 }
233
234 /* Watch stdin (fd 0) to see when it has input. */
235 FD_ZERO ( &wfds );
236 FD_SET ( tmp_sockfd, &wfds );
237
238 /* Wait up to five seconds. */
239 tv.tv_sec = 30;
240 tv.tv_usec = 0;
241
242 retval = select ( tmp_sockfd+1, NULL, &wfds, NULL, &tv );
243 /* Don't rely on the value of tv now! */
244
245
246 if ( retval == -1 )
247 {
248 if ( errno == EBADF ) {
249 JWARNING (false) .Text ( "Socket already closed" );
250 return -1;
251 }
252 JWARNING ( retval >= 0 ) ( tmp_sockfd ) ( JASSERT_ERRNO ).Text ( "select() failed" );
253 return -1;
254 }
255 else if ( retval )
256 {
257 errno = 0;
258 ssize_t cnt = write ( buf, len );
259 if ( cnt <= 0 && errno != EAGAIN && errno != EINTR )
260 {
261 JWARNING ( cnt > 0 ) ( cnt ) ( len ) ( JASSERT_ERRNO ).Text ( "JSocket read failure" );
262 return -1;
263 }
264 if ( cnt > 0 )
265 {
266 buf += cnt;
267 len -= cnt;
268 }
269 }
270 else
271 {
272 JTRACE ( "still waiting for data" ) ( tmp_sockfd ) ( len );
273 }
274 }
275 return origLen;
276 }
277
278 bool jalib::JSocket::isValid() const
279 {
280 return _sockfd >= 0;
281 }
282
283
284
285
286 /*!
287 \fn jalib::JChunkReader::readAvailable()
288 */
289 bool jalib::JChunkReader::readOnce()
290 {
291 if ( !ready() )
292 {
293 ssize_t cnt = _sock.read ( _buffer + _read, _length - _read );
294 if ( cnt <= 0 && errno != EAGAIN && errno != EINTR )
295 _hadError = true;
296 if ( cnt > 0 )
297 _read += cnt;
298 }
299 return ready();
300 }
301
302 /*!
303 \fn jalib::JChunkWriter::isDone()
304 */
305 bool jalib::JChunkWriter::isDone()
306 {
307 return _length <= _sent;
308 }
309
310
311 /*!
312 \fn jalib::JChunkWriter::writeOnce()
313 */
314 bool jalib::JChunkWriter::writeOnce()
315 {
316 if ( !isDone() )
317 {
318 ssize_t cnt = _sock.write ( _buffer + _sent, _length - _sent );
319 if ( cnt <= 0 && errno != EAGAIN && errno != EINTR )
320 _hadError = true;
321 if ( cnt > 0 )
322 _sent += cnt;
323 }
324 return isDone();
325 }
326
327
328 /*!
329 \fn jalib::JChunkWriter::hadError()
330 */
331 bool jalib::JChunkWriter::hadError()
332 {
333 return _hadError || !_sock.isValid();
334 }
335
336
337
338 /*!
339 \fn jalib::JChunkReader::reset()
340 */
341 void jalib::JChunkReader::reset()
342 {
343 memset ( _buffer,0,_length );
344 _read = 0;
345 }
346
347
348 /*!
349 \fn jalib::JChunkReader::readAll()
350 */
351 void jalib::JChunkReader::readAll()
352 {
353 while ( !ready() ) readOnce();
354 }
355
356 /*!
357 \fn jalib::JChunkReader::JChunkReader(JSocket sock, int chunkSize);
358 */
359 jalib::JChunkReader::JChunkReader ( JSocket sock, int chunkSize )
360 : JReaderInterface ( sock )
361 , _buffer ( new char[chunkSize] )
362 , _length ( chunkSize )
363 , _read ( 0 )
364 , _hadError ( false )
365 {
366 memset ( _buffer,0,_length );
367 }
368
369
370 /*!
371 \fn jalib::JChunkReader::JChunkReader(const JChunkReader& that)
372 */
373 jalib::JChunkReader::JChunkReader ( const JChunkReader& that )
374 : JReaderInterface ( that )
375 , _buffer ( new char[that._length] )
376 , _length ( that._length )
377 , _read ( that._read )
378 , _hadError ( that._hadError )
379 {
380 memcpy ( _buffer,that._buffer,_length );
381 }
382
383
384 /*!
385 \fn jalib::JChunkReader::~JChunkReader()
386 */
387 jalib::JChunkReader::~JChunkReader()
388 {
389 delete [] _buffer;
390 _buffer = 0;
391 }
392
393
394 /*!
395 \fn jalib::JChunkReader::operator=(const JChunkReader& that)
396 */
397 jalib::JChunkReader& jalib::JChunkReader::operator= ( const JChunkReader& that )
398 {
399 if ( this == &that ) return *this;;
400 delete [] _buffer;
401 _buffer = 0;
402 new ( this ) JChunkReader ( that );
403 return *this;
404 }
405
406
407
408
409 /*!
410 \fn jalib::JSocket::changeFd(int newFd)
411 */
412 void jalib::JSocket::changeFd ( int newFd )
413 {
414 if ( _sockfd == newFd ) return;
415 JASSERT ( newFd == DECORATE_FN(dup2) ( _sockfd, newFd ) )
416 ( _sockfd ) ( newFd ).Text ( "dup2 failed" );
417 close();
418 _sockfd = newFd;
419 }
420
421
422 void jalib::JMultiSocketProgram::addDataSocket ( JReaderInterface* sock )
423 {
424 _dataSockets.push_back ( sock );
425 }
426
427 void jalib::JMultiSocketProgram::addListenSocket ( const JSocket& sock )
428 {
429 _listenSockets.push_back ( sock );
430 }
431
432
433 void jalib::JMultiSocketProgram::addWrite ( JWriterInterface* write )
434 {
435 _writes.push_back ( write );
436 }
437
438 void jalib::JMultiSocketProgram::setTimeoutInterval ( double dblTimeout )
439 {
440 int tSec = ( int ) dblTimeout;
441 int tMs = ( int ) ( 1000000.0 * ( dblTimeout - tSec ) );
442 timeoutInterval.tv_sec = tSec;
443 timeoutInterval.tv_usec = tMs;
444 timeoutEnabled = dblTimeout > 0 && timerisset ( &timeoutInterval );
445
446 JASSERT ( gettimeofday ( &stoptime,NULL ) ==0 );
447 timeradd ( &timeoutInterval,&stoptime,&stoptime );
448 }
449
450 void jalib::JMultiSocketProgram::monitorSockets ( double dblTimeout )
451 {
452 /*
453 int tSec = ( int ) dblTimeout;
454 int tMs = ( int ) ( 1000000.0 * ( dblTimeout - tSec ) );
455 const struct timeval timeoutInterval = {tSec,tMs};
456 bool timeoutEnabled = dblTimeout > 0 && timerisset ( &timeoutInterval );
457
458 struct timeval stoptime={0,0};
459 struct timeval timeoutBuf=timeoutInterval;
460 struct timeval * timeout = timeoutEnabled ? &timeoutBuf : NULL;
461 JASSERT ( gettimeofday ( &stoptime,NULL ) ==0 );
462 timeradd ( &timeoutInterval,&stoptime,&stoptime );
463 */
464 struct timeval tmptime={0,0};
465 struct timeval timeoutBuf;
466 struct timeval * timeout;
467
468 setTimeoutInterval ( dblTimeout );
469
470 timeoutBuf=timeoutInterval;
471 timeout = timeoutEnabled ? &timeoutBuf : NULL;
472
473 IntSet closedFds;
474 fd_set rfds;
475 fd_set wfds;
476 int maxFd;
477 size_t i;
478 for ( ;; )
479 {
480 closedFds.clear();
481 maxFd = -1;
482 FD_ZERO ( &rfds );
483 FD_ZERO ( &wfds );
484
485 //collect listen fds in rfds, cleanup dead sockets
486 for ( i=0; i<_listenSockets.size(); ++i )
487 {
488 if ( _listenSockets[i].isValid() )
489 {
490 FD_SET ( _listenSockets[i].sockfd(), &rfds );
491 maxFd = std::max ( maxFd,_listenSockets[i].sockfd() );
492 }
493 else
494 {
495 _listenSockets[i].close();
496 //socket is dead... remove it
497 JTRACE ( "listen socket failure" ) ( i );
498 //swap with last
499 _listenSockets[i] = _listenSockets[_listenSockets.size()-1];
500 _listenSockets.pop_back();
501 i--;
502 }
503 }
504
505 //collect data fds in rfds, cleanup dead sockets
506 for ( i=0; i<_dataSockets.size(); ++i )
507 {
508 if ( !_dataSockets[i]->hadError() )
509 {
510 FD_SET ( _dataSockets[i]->socket().sockfd(), &rfds );
511 maxFd = std::max ( maxFd,_dataSockets[i]->socket().sockfd() );
512 }
513 else
514 {
515 closedFds.insert(_dataSockets[i]->socket().sockfd());
516 //socket is dead... remove it
517 //JTRACE ( "disconnect" ) ( i ) ( _dataSockets[i]->socket().sockfd() );
518 onDisconnect ( _dataSockets[i] );
519 _dataSockets[i]->socket().close();
520
521 delete _dataSockets[i];
522 _dataSockets[i] = 0;
523 //swap with last
524 _dataSockets[i] = _dataSockets[_dataSockets.size()-1];
525 _dataSockets.pop_back();
526 i--;
527 }
528 }
529
530 //collect all writes in wfds, cleanup finished/dead
531 for ( i=0; i<_writes.size(); ++i )
532 {
533 if ( !_writes[i]->hadError()
534 && !_writes[i]->isDone()
535 && closedFds.find(_writes[i]->socket().sockfd())==closedFds.end() )
536 {
537 FD_SET ( _writes[i]->socket().sockfd(), &wfds );
538 maxFd = std::max ( maxFd,_writes[i]->socket().sockfd() );
539 }
540 else
541 {
542 //socket is or write done... pop it
543 delete _writes[i];
544 _writes[i] = 0;
545 //swap with last
546 _writes[i] = _writes[_writes.size()-1];
547 _writes.pop_back();
548 i--;
549 }
550 }
551
552 if ( maxFd == -1 )
553 {
554 //JTRACE ( "no sockets left" );
555 return;
556 }
557
558 //this will block till we have some work to do
559 int retval = select ( maxFd+1, &rfds, &wfds, NULL, timeout );
560
561 if ( retval == -1 )
562 {
563 JWARNING ( retval != -1 ) ( maxFd ) ( retval ) ( JASSERT_ERRNO ).Text ( "select failed" );
564 return;
565 }
566 else if ( retval > 0 )
567 {
568
569 //write all data
570 for ( i=0; i<_writes.size(); ++i )
571 {
572 int fd = _writes[i]->socket().sockfd();
573 if ( fd >= 0 && FD_ISSET ( fd, &wfds ) )
574 {
575 // JTRACE("writing data")(_writes[i]->socket().sockfd());
576 _writes[i]->writeOnce();
577 }
578 }
579
580
581 //read all new data
582 for ( i=0; i<_dataSockets.size(); ++i )
583 {
584 int fd = _dataSockets[i]->socket().sockfd();
585 if ( fd >= 0 && FD_ISSET ( fd, &rfds ) )
586 {
587 // JTRACE("receiving data")(i)(_dataSockets[i].socket().sockfd());
588 if ( _dataSockets[i]->readOnce() )
589 {
590 onData ( _dataSockets[i] );
591 _dataSockets[i]->reset();
592 }
593 }
594 }
595
596
597 //accept all new connections
598 for ( i=0; i<_listenSockets.size(); ++i )
599 {
600 int fd = _listenSockets[i].sockfd();
601 if ( fd >= 0 && FD_ISSET ( fd, &rfds ) )
602 {
603 struct sockaddr_storage addr;
604 socklen_t addrlen=sizeof ( addr );
605 JSocket sk = _listenSockets[i].accept ( &addr,&addrlen );
606 JTRACE ( "accepting new connection" ) ( i ) ( sk.sockfd() ) ( _listenSockets[i].sockfd() ) ( JASSERT_ERRNO );
607 if ( sk.isValid() )
608 {
609 onConnect ( sk, ( sockaddr* ) &addr,addrlen );
610 }
611 else if ( errno != EAGAIN && errno != EINTR )
612 {
613 _listenSockets[i].close();
614 }
615 }
616 }
617
618
619
620 }
621
622 if ( timeoutEnabled )
623 {
624 JASSERT ( gettimeofday ( &tmptime,NULL ) ==0 );
625 if ( timercmp ( &tmptime, &stoptime, < ) )
626 {
627 timersub ( &stoptime, &tmptime, &timeoutBuf );
628 }
629 else
630 {
631 timeoutBuf = timeoutInterval;
632 stoptime = tmptime;
633 timeradd ( &timeoutInterval,&stoptime,&stoptime );
634 // JTRACE("timeout interval")(timeoutSec);
635 onTimeoutInterval();
636 }
637 }
638 }
639 }
640
641
642
643
644
645 /*!
646 \fn jalib::JChunkWriter::JChunkWriter(JSocket sock, const char* buf, int len)
647 */
648 jalib::JChunkWriter::JChunkWriter ( JSocket sock, const char* buf, int len )
649 :JWriterInterface ( sock )
650 ,_buffer ( new char [len] )
651 ,_length ( len )
652 ,_sent ( 0 )
653 ,_hadError ( false )
654 {
655 memcpy ( _buffer,buf,len );
656 }
657
658
659 /*!
660 \fn jalib::JChunkWriter::JChunkWriter(const JChunkWriter& that)
661 */
662 jalib::JChunkWriter::JChunkWriter ( const JChunkWriter& that )
663 :JWriterInterface ( that )
664 ,_buffer ( new char [that._length] )
665 ,_length ( that._length )
666 ,_sent ( that._sent )
667 ,_hadError ( false )
668 {
669 memcpy ( _buffer,that._buffer,that._length );
670 }
671
672
673 /*!
674 \fn jalib::JChunkWriter::~JChunkWriter()
675 */
676 jalib::JChunkWriter::~JChunkWriter()
677 {
678 delete [] _buffer;
679 _buffer = 0;
680 }
681
682
683 /*!
684 \fn jalib::JChunkWriter::method_4()
685 */
686 jalib::JChunkWriter& jalib::JChunkWriter::operator= ( const JChunkWriter& that )
687 {
688 if ( this == &that ) return *this;;
689 delete [] _buffer;
690 _buffer = 0;
691 new ( this ) JChunkWriter ( that );
692 return *this;
693 }
694
695