[BACK]Return to NetConnection.c++ CVS log [TXT][DIR] Up to [Development] / fam / fam

File: [Development] / fam / fam / NetConnection.c++ (download)

Revision 1.1.1.1 (vendor branch), Thu Apr 24 19:08:26 2003 UTC (14 years, 5 months ago) by trev
Branch: sgi-fam, MAIN
CVS Tags: fam-2-6-10, HEAD
Changes since 1.1: +0 -0 lines

Initial FAM CVS repository build..

-- Trev


//  Copyright (C) 1999 Silicon Graphics, Inc.  All Rights Reserved.
//  
//  This program is free software; you can redistribute it and/or modify it
//  under the terms of version 2 of the GNU General Public License as
//  published by the Free Software Foundation.
//
//  This program is distributed in the hope that it would be useful, but
//  WITHOUT ANY WARRANTY; without even the implied warranty of
//  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  Further, any
//  license provided herein, whether implied or otherwise, is limited to
//  this program in accordance with the express provisions of the GNU
//  General Public License.  Patent licenses, if any, provided herein do not
//  apply to combinations of this program with other product or programs, or
//  any other product whatsoever.  This program is distributed without any
//  warranty that the program is delivered free of the rightful claim of any
//  third person by way of infringement or the like.  See the GNU General
//  Public License for more details.
//
//  You should have received a copy of the GNU General Public License along
//  with this program; if not, write the Free Software Foundation, Inc., 59
//  Temple Place - Suite 330, Boston MA 02111-1307, USA.

#include "NetConnection.h"

#include <assert.h>
#include <errno.h>
#include <stdarg.h>
#include <stddef.h>
#include <stdio.h>
#include <string.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <unistd.h>
#include <netinet/in.h>

#include "Log.h"
#include "Scheduler.h"

NetConnection::NetConnection(int a_fd,
			     UnblockHandler uhandler, void *uclosure)
    : fd(a_fd),
      iready(true), oready(true),
      iend(ibuf + sizeof(ibuf)),
      itail(ibuf),
      unblock_handler(uhandler), closure(uclosure)
{
    // It must be the case that Length is a 32 bit int.
    assert (sizeof(Length) == 4);
    
    omsgList = omsgListTail = NULL;
    
    // Enable nonblocking output on socket.

    int yes = 1;
    if (ioctl(fd, FIONBIO, &yes) < 0)
    {   Log::perror("can't set NBIO on fd %d", fd);
	shutdown(false);
	return;
    }

#ifdef TINY_BUFFERS			/* This kills throughput. */

    // Reduce kernel's send and receive buffer sizes.  Useful for debugging
    // flow control.

    if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &osize, sizeof osize))
	Log::perror("setsockopt(%d, SO_SNDBUF)", fd);
    if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &isize, sizeof isize))
	Log::perror("setsockopt(%d, SO_RCVBUF)", fd);

#endif /* !TINY_BUFFERS */

    // Wait for something to read.

    (void) Scheduler::install_read_handler(fd, read_handler, this);
}

NetConnection::~NetConnection()
{
    shutdown(false);
}

void
NetConnection::shutdown(bool call_input)
{
    if (fd >= 0)
    {
        Log::info("Shutting down connection");
	Scheduler::IOHandler oldh;
	if (iready && oready)
	{   oldh = Scheduler::remove_read_handler(fd);
	    assert(oldh == read_handler);
	}
	if (!oready)
	{   oldh = Scheduler::remove_write_handler(fd);
	    assert(oldh == write_handler);
	}
	(void) close(fd);
	fd = -1;
	oready = true;
	if (call_input)
	    input_msg(NULL, 0);
    }
}

///////////////////////////////////////////////////////////////////////////////
//  Input

void
NetConnection::read_handler(int fd, void *closure)
{
    NetConnection *connection = (NetConnection *) closure;
    assert(fd == connection->fd);
    connection->input();
}

void
NetConnection::input()
{
    if (fd < 0)
	return;

    // Read from socket.
    
    assert(itail < iend);
    int maxbytes = iend - itail;
    int ret = recv(fd, itail, maxbytes, 0);
    if (ret < 0)
    {   if (errno != EAGAIN && errno != ECONNRESET)
	{   Log::perror("fd %d read error", fd);
	    shutdown();
	}
	return;
    }
    else if (ret == 0)
    {   shutdown();
	return;
    }
    else // (ret > 0)
    {
	itail += ret;
    }

    deliver_input();
}

void
NetConnection::deliver_input()
{
    // Find messages and process them.

    char *ihead = ibuf;
    while (iready && oready && ihead + sizeof (Length) <= itail)
    {
        Length len;
        memcpy(&len, ihead, sizeof(Length));
	len = ntohl(len);

	if (len > MAXMSGSIZE)
	{   Log::error("fd %d message length %d bytes exceeds max of %d.",
		       fd, len, MAXMSGSIZE);
	    shutdown();
	    return;
	}
	if (ihead + sizeof (Length) + len > itail)
	    break;

	// Call the message reader.

	if (input_msg(ihead + sizeof (Length), len) == false) {
            // if input_msg sees an error in the message and thinks
            // the connection should be closed, it will return false.
            shutdown();
            return;
        }

	ihead += sizeof (Length) + len;
    }

    // If data remain in buffer, slide them to the left.

    assert(ihead <= itail);
    int remaining = itail - ihead;
    if (remaining && ihead != ibuf)
	memmove(ibuf, ihead, remaining);
    itail = ibuf + remaining;
    assert(itail < iend);
}

bool
NetConnection::ready_for_output() const
{
    return oready;
}

void
NetConnection::ready_for_input(bool tf)
{
    set_handlers(tf, oready);
}

///////////////////////////////////////////////////////////////////////////////
//  Output

void
NetConnection::mprintf(const char *format, ...)
{
    if (fd < 0)
	return;				// if closed, do nothing.

    va_list args;
    va_start(args, format);

    msgList_t * msg = new msgList_t;
    msg->next = NULL;
    Length len = vsnprintf(
        msg->msg + 4, MAXMSGSIZE + 1, format, args) + 1;
    va_end(args);

    if (len <= 0 || len == MAXMSGSIZE+1) {
        Log::error("tried to write a message that was too big");
        assert(0);
        // protocol botch.  Don't send the message.
        return;
    }
    
    if (omsgListTail) {
        omsgListTail = omsgListTail->next = msg;
    } else {
        omsgList = omsgListTail = msg;
    }
    msg->len = 4 + len;
    
    len = htonl(len);
    memcpy(msg->msg, &len, 4);
    flush();
}

void
NetConnection::flush()
{
    while (omsgList) 
    {
	int ret = send(fd, omsgList->msg, omsgList->len, 0);
        if (ret < 0 && errno == EWOULDBLOCK) 
        {
            break;
        } else 
        {
            if (ret >= 0) 
            {
                assert(ret == omsgList->len);
            } else 
            {
                /* Since the client library can close it's fd before
                 * getting acks from all FAMCancelMonitor requests we
                 * may get a broken pipe error here when writing the ack.
                 * Don't threat this as an error, since that fills the logs
                 * with crap.
                 */
                if (errno == EPIPE)
                {
                    Log::debug("fd %d write error: %m", fd);
                } else
                {
                    Log::error("fd %d write error: %m", fd);
                }
            }
            msgList_t *oldHead = omsgList;
            omsgList = omsgList->next;
            if (omsgListTail == oldHead) {
                omsgListTail = NULL;
            }
            delete oldHead;
	}
    }
    set_handlers(iready, !omsgList);
}

void
NetConnection::set_handlers(bool new_iready, bool new_oready)
{
    Scheduler::IOHandler oldh;
    bool call_unblock = false;

    if (oready != new_oready)
    {   if (new_oready)
	{   oldh = Scheduler::remove_write_handler(fd);
	    assert(oldh == write_handler);
	    call_unblock = true;
	}
	else
	{   oldh = Scheduler::install_write_handler(fd, write_handler, this);
	    assert(oldh == NULL);
	}
    }

    //  Install read_handler iff iready && oready.

    if ((iready && oready) != (new_iready && new_oready))
    {
	if (new_iready && new_oready)
	{   oldh = Scheduler::install_read_handler(fd, read_handler, this);
	    assert(oldh == NULL);
	}
	else
	{   oldh = Scheduler::remove_read_handler(fd);
	    assert(oldh == read_handler);
	}
    }
    bool old_iready = iready;
    iready = new_iready;
    oready = new_oready;

    //  If we unblocked output, call the unblock handler.
    //  If there's input to deliver, deliver it.

    if (call_unblock && unblock_handler)
    {   assert(!iready || old_iready);
	(*unblock_handler)(closure);
    }
    else if (iready && !old_iready)
	deliver_input();
}

void
NetConnection::write_handler(int fd, void *closure)
{
    NetConnection *connection = (NetConnection *) closure;
    assert(connection->fd == fd);
    connection->flush();
}