File: [Development] / xfs-cmds / xfsdump / common / ring.c (download)
Revision 1.3, Tue Jun 4 22:53:09 2002 UTC (15 years, 4 months ago) by sandeen
Branch: MAIN
Changes since 1.2: +1 -1
lines
Undoes mod: xfs-cmds:slinx:120772a
Undo xfs-cmds:slinx:120772a, inadvertently whacked a previous mod.
|
/*
* Copyright (c) 2000 Silicon Graphics, Inc. All Rights Reserved.
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of version 2 of the GNU General Public License as
* published by the Free Software Foundation.
*
* This program is distributed in the hope that it would be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
*
* Further, this software is distributed without any warranty that it is
* free of the rightful claim of any third person regarding infringement
* or the like. Any license provided herein, whether implied or
* otherwise, applies only to this software file. Patent licenses, if
* any, provided herein do not apply to combinations of this program with
* other software, or any other product whatsoever.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write the Free Software Foundation, Inc., 59
* Temple Place - Suite 330, Boston MA 02111-1307, USA.
*
* Contact information: Silicon Graphics, Inc., 1600 Amphitheatre Pkwy,
* Mountain View, CA 94043, or:
*
* http://www.sgi.com
*
* For further information regarding this notice, see:
*
* http://oss.sgi.com/projects/GenInfo/SGIGPLNoticeExplan/
*/
#include <libxfs.h>
#include <jdm.h>
#include <sys/types.h>
#include <sys/mman.h>
#include <errno.h>
#include <signal.h>
#include <time.h>
#include <stdlib.h>
#include <unistd.h>
#include <malloc.h>
#include "types.h"
#include "qlock.h"
#include "ring.h"
/* forward declaration of the slave's entry point. ring_create() hands this
 * function to the caller-supplied threadfunc; it is defined later in this file.
 */
static int ring_slave_entry( void *ringctxp );
/* ring_create - build a ring of ringlen messages and kick off its slave
 *
 * Allocates the ring descriptor, the ready and active queue semaphores,
 * the message array, and one PGSZ-aligned buffer of bufsz bytes for each
 * message. If pinpr is set, each buffer is also locked with mlock().
 * When the ring is complete, threadfunc is called with clientctxp, the
 * slave entry point and the ring.
 *
 * readfunc and writefunc are recorded in the ring; the slave calls them
 * with clientctxp and a message buffer to service RING_OP_READ and
 * RING_OP_WRITE messages.
 *
 * Returns the new ring, with *rvalp set to 0. On failure returns 0 and
 * sets *rvalp to one of:
 *	ENOMEM	the descriptor, message array or a buffer could not be
 *		allocated
 *	E2BIG	mlock() failed with ENOMEM
 *	EPERM	mlock() failed with EPERM
 * On failure everything allocated so far is unlocked and freed, and
 * threadfunc is never called.
 *
 * NOTE(review): the results of qsem_alloc() are not checked; how it
 * reports failure is not visible from this file.
 */
ring_t *
ring_create( size_t ringlen,
	     size_t bufsz,
	     bool_t pinpr,
	     void ( *threadfunc )( void *clientctxp,
				   int ( * entry )( void *ringctxp ),
				   void *ringctxp ),
	     int ( *readfunc )( void *clientctxp, char *bufp ),
	     int ( *writefunc )( void *clientctxp, char *bufp ),
	     void *clientctxp,
	     intgen_t *rvalp )
{
	ring_t *ringp;
	size_t mix = 0;	/* also the count of fully set up messages */

	/* pre-initialize return value
	 */
	*rvalp = 0;

	/* allocate a ring descriptor. nothing else is held yet, so a
	 * failure here can return directly.
	 */
	ringp = ( ring_t * )calloc( 1, sizeof( ring_t ));
	if ( ! ringp ) {
		*rvalp = ENOMEM;
		return 0;
	}

	ringp->r_len = ringlen;
	ringp->r_clientctxp = clientctxp;
	ringp->r_readfunc = readfunc;
	ringp->r_writefunc = writefunc;

	/* allocate counting semaphores for the ready and active queues,
	 * and initialize the queue input and output indices. all ringlen
	 * messages start out on the ready queue; the active queue is empty.
	 */
	ringp->r_ready_qsemh = qsem_alloc( ringlen );
	ringp->r_active_qsemh = qsem_alloc( 0 );
	ringp->r_ready_in_ix = 0;
	ringp->r_ready_out_ix = 0;
	ringp->r_active_in_ix = 0;
	ringp->r_active_out_ix = 0;
	ringp->r_client_cnt = 0;
	ringp->r_slave_cnt = 0;

	/* initialize the meters
	 */
	ringp->r_client_msgcnt = 0;
	ringp->r_slave_msgcnt = 0;
	ringp->r_client_blkcnt = 0;
	ringp->r_slave_blkcnt = 0;
	ringp->r_first_io_time = 0;
	ringp->r_all_io_cnt = 0;

	/* allocate the ring messages
	 */
	ringp->r_msgp = ( ring_msg_t * )calloc( ringlen, sizeof( ring_msg_t ));
	if ( ! ringp->r_msgp ) {
		*rvalp = ENOMEM;
		goto fail;
	}

	/* allocate the buffers and initialize the messages. on any exit
	 * to "fail", messages [0, mix) own a buffer and message mix does not.
	 */
	for ( mix = 0 ; mix < ringlen ; mix++ ) {
		ring_msg_t *msgp = &ringp->r_msgp[ mix ];

		msgp->rm_mix = mix;
		msgp->rm_op = RING_OP_NONE;
		msgp->rm_stat = RING_STAT_INIT;
		msgp->rm_user = 0;
		msgp->rm_loc = RING_LOC_READY;
		msgp->rm_bufp = ( char * )memalign( PGSZ, bufsz );
		if ( ! msgp->rm_bufp ) {
			*rvalp = ENOMEM;
			goto fail;
		}
		if ( pinpr ) {
			intgen_t rval;

			rval = mlock( ( void * )msgp->rm_bufp, bufsz );
			if ( rval ) {
				/* capture errno before any other library
				 * call has a chance to overwrite it
				 */
				int lockerrno = errno;

				if ( lockerrno == ENOMEM
				     ||
				     lockerrno == EPERM ) {
					*rvalp = ( lockerrno == ENOMEM )
						 ?
						 E2BIG
						 :
						 EPERM;
					/* this buffer was never locked:
					 * just give it back
					 */
					free( ( void * )msgp->rm_bufp );
					msgp->rm_bufp = 0;
					goto fail;
				}
				ASSERT( 0 );
			}
		}
	}

	/* kick off the slave thread
	 */
	( *threadfunc )( clientctxp, ring_slave_entry, ( void * )ringp );

	return ringp;

fail:
	/* unwind: unlock and free the buffers of the messages that were
	 * completely set up, then the message array, the semaphores and
	 * the descriptor. free( 0 ) is a no-op, which covers the case
	 * where the message array itself could not be allocated.
	 */
	while ( mix > 0 ) {
		ring_msg_t *msgp = &ringp->r_msgp[ --mix ];

		if ( pinpr ) {
			( void )munlock( ( void * )msgp->rm_bufp, bufsz );
		}
		free( ( void * )msgp->rm_bufp );
	}
	free( ( void * )ringp->r_msgp );
	qsem_free( ringp->r_ready_qsemh );
	qsem_free( ringp->r_active_qsemh );
	free( ( void * )ringp );

	return 0;
}
/* ring_get - client side: take the next message off the ready queue
 *
 * Waits on the ready queue semaphore, then hands back the message at the
 * ready queue's output index, marked as held by the client. The client
 * may hold at most one message at a time (asserted on entry).
 */
ring_msg_t *
ring_get( ring_t *ringp )
{
	ring_msg_t *readyp;

	/* the client must arrive empty-handed
	 */
	ASSERT( ringp->r_client_cnt == 0 );

	/* meter the request, and record whether it is about to block
	 */
	ringp->r_client_msgcnt += 1;
	if ( qsemPwouldblock( ringp->r_ready_qsemh )) {
		ringp->r_client_blkcnt += 1;
	}

	/* "P" the ready queue: wait here until a msg is available
	 */
	qsemP( ringp->r_ready_qsemh );

	/* the msg at the output index is the one to hand out. check that
	 * it is really on the ready queue and that its own index agrees.
	 */
	readyp = ringp->r_msgp + ringp->r_ready_out_ix;
	ASSERT( readyp->rm_loc == RING_LOC_READY );
	ASSERT( readyp->rm_mix == ringp->r_ready_out_ix );

	/* advance the output index, wrapping at the ring length
	 */
	ringp->r_ready_out_ix = ( ringp->r_ready_out_ix + 1 ) % ringp->r_len;

	/* the msg now belongs to the client
	 */
	readyp->rm_loc = RING_LOC_CLIENT;
	ringp->r_client_cnt += 1;

	return readyp;
}
/* ring_put - client side: queue the held message for the slave
 *
 * Moves the one message the client holds onto the active queue and
 * signals the active queue semaphore. The message must be the one the
 * active queue's input index expects (asserted).
 */
void
ring_put( ring_t *ringp, ring_msg_t *msgp )
{
	/* sanity: exactly one msg is held, it is the msg next in line for
	 * the active queue, and it is marked as being with the client
	 */
	ASSERT( ringp->r_client_cnt == 1 );
	ASSERT( msgp->rm_mix == ringp->r_active_in_ix );
	ASSERT( msgp->rm_loc == RING_LOC_CLIENT );

	/* the client lets go of the msg ...
	 */
	ringp->r_client_cnt -= 1;

	/* ... and it lands on the active queue
	 */
	msgp->rm_loc = RING_LOC_ACTIVE;

	/* advance the active queue input index, wrapping at the ring length
	 */
	ringp->r_active_in_ix = ( ringp->r_active_in_ix + 1 ) % ringp->r_len;

	/* "V" the active queue semaphore to announce the new msg
	 */
	qsemV( ringp->r_active_qsemh );
}
/* ring_reset - client side: abort outstanding work and re-initialize
 *
 * msgp is the message the client currently holds, or 0 if it holds none
 * (in which case one is fetched with ring_get()). That message is sent to
 * the slave as a RING_OP_RESET; messages are then drained from the ready
 * queue until the one carrying RING_STAT_RESETACK shows up. Finally the
 * queue indices, the held-message counts and every message are put back
 * in their initial state and all messages are returned to the ready queue.
 */
void
ring_reset( ring_t *ringp, ring_msg_t *msgp )
{
	size_t ix;

	/* make sure the client holds exactly one message, fetching one
	 * if it came in without
	 */
	if ( ringp->r_client_cnt == 0 ) {
		ASSERT( ! msgp );
		msgp = ring_get( ringp );
	}
	ASSERT( msgp );
	ASSERT( ringp->r_client_cnt == 1 );

	/* tell the slave to abort
	 */
	msgp->rm_op = RING_OP_RESET;
	ring_put( ringp, msgp );

	/* drain the ready queue until the reset acknowledgement arrives.
	 * r_client_cnt is used here to count the messages drained.
	 */
	ASSERT( ringp->r_client_cnt == 0 );
	for ( ; ; ) {
		qsemP( ringp->r_ready_qsemh );
		msgp = ringp->r_msgp + ringp->r_ready_out_ix;
		ASSERT( msgp->rm_loc == RING_LOC_READY );
		ringp->r_ready_out_ix = ( ringp->r_ready_out_ix + 1 )
					%
					ringp->r_len;
		ringp->r_client_cnt += 1;
		if ( msgp->rm_stat == RING_STAT_RESETACK ) {
			break;
		}
	}
	ASSERT( ringp->r_client_cnt == ringp->r_len );

	/* both queues must now be empty, with at most the slave blocked
	 * on the active queue
	 */
	ASSERT( qsemPavail( ringp->r_ready_qsemh ) == 0 );
	ASSERT( qsemPblocked( ringp->r_ready_qsemh ) == 0 );
	ASSERT( qsemPavail( ringp->r_active_qsemh ) == 0 );
	ASSERT( qsemPblocked( ringp->r_active_qsemh ) <= 1 );

	/* re-initialize the ring bookkeeping
	 */
	ringp->r_ready_in_ix = 0;
	ringp->r_ready_out_ix = 0;
	ringp->r_active_in_ix = 0;
	ringp->r_active_out_ix = 0;
	ringp->r_client_cnt = 0;
	ringp->r_slave_cnt = 0;

	/* re-initialize each message and give it back to the ready queue
	 */
	for ( ix = 0 ; ix < ringp->r_len ; ix++ ) {
		ring_msg_t *freshp = ringp->r_msgp + ix;

		freshp->rm_mix = ix;
		freshp->rm_op = RING_OP_NONE;
		freshp->rm_stat = RING_STAT_INIT;
		freshp->rm_user = 0;
		freshp->rm_loc = RING_LOC_READY;
		qsemV( ringp->r_ready_qsemh );
	}

	/* the ready queue is full again; the active queue is still empty
	 */
	ASSERT( qsemPavail( ringp->r_ready_qsemh ) == ringp->r_len );
	ASSERT( qsemPblocked( ringp->r_ready_qsemh ) == 0 );
	ASSERT( qsemPavail( ringp->r_active_qsemh ) == 0 );
	ASSERT( qsemPblocked( ringp->r_active_qsemh ) <= 1 );
}
/* ring_destroy - client side: shut down the slave and free the ring
 *
 * The client must not be holding a message. A RING_OP_DIE message is
 * sent to the slave, and the ready queue is drained until the message
 * carrying RING_STAT_DIEACK comes back. The slave does not fetch another
 * message after acknowledging, so at that point the message buffers,
 * the message array, both semaphores and the ring descriptor are freed.
 * ringp, and every message or buffer pointer obtained from it, is
 * invalid after this call.
 *
 * NOTE(review): buffers that ring_create() pinned with mlock() are
 * freed here without munlock(), because the buffer size is not
 * available in this function.
 */
void
ring_destroy( ring_t *ringp )
{
	ring_msg_t *msgp;
	size_t mix;

	/* the client must not be holding a message
	 */
	ASSERT( ringp->r_client_cnt == 0 );

	/* get a message
	 */
	msgp = ring_get( ringp );

	/* tell the slave to exit
	 */
	msgp->rm_op = RING_OP_DIE;
	ring_put( ringp, msgp );

	/* wait for the die to be acknowledged
	 */
	do {
		/* pull a message from the ready queue
		 */
		qsemP( ringp->r_ready_qsemh );
		msgp = &ringp->r_msgp[ ringp->r_ready_out_ix ];
		ASSERT( msgp->rm_loc == RING_LOC_READY );
		ringp->r_ready_out_ix = ( ringp->r_ready_out_ix + 1 )
					%
					ringp->r_len;
	} while ( msgp->rm_stat != RING_STAT_DIEACK );

	/* the slave is dead. release the per-message buffers and the
	 * message array allocated by ring_create(); once the descriptor
	 * is freed they would no longer be reachable.
	 */
	for ( mix = 0 ; mix < ringp->r_len ; mix++ ) {
		free( ( void * )ringp->r_msgp[ mix ].rm_bufp );
	}
	free( ( void * )ringp->r_msgp );

	/* release the semaphores and, last, the descriptor itself
	 */
	qsem_free( ringp->r_ready_qsemh );
	qsem_free( ringp->r_active_qsemh );
	free( ( void * )ringp );
}
/* ring_slave_get - slave side: take the next message off the active queue
 *
 * Waits on the active queue semaphore, then hands back the message at
 * the active queue's output index, marked as held by the slave. The
 * slave may hold at most one message at a time (asserted on entry).
 */
static ring_msg_t *
ring_slave_get( ring_t *ringp )
{
	ring_msg_t *activep;

	/* the slave must arrive empty-handed
	 */
	ASSERT( ringp->r_slave_cnt == 0 );

	/* meter the request, and record whether it is about to block
	 */
	ringp->r_slave_msgcnt += 1;
	if ( qsemPwouldblock( ringp->r_active_qsemh )) {
		ringp->r_slave_blkcnt += 1;
	}

	/* "P" the active queue: wait here until a msg is available
	 */
	qsemP( ringp->r_active_qsemh );

	/* the msg at the output index is the one to work on. check that
	 * it is really on the active queue and that its own index agrees.
	 */
	activep = ringp->r_msgp + ringp->r_active_out_ix;
	ASSERT( activep->rm_loc == RING_LOC_ACTIVE );
	ASSERT( activep->rm_mix == ringp->r_active_out_ix );

	/* advance the output index, wrapping at the ring length
	 */
	ringp->r_active_out_ix = ( ringp->r_active_out_ix + 1 ) % ringp->r_len;

	/* the msg now belongs to the slave
	 */
	activep->rm_loc = RING_LOC_SLAVE;
	ringp->r_slave_cnt += 1;

	return activep;
}
/* ring_slave_put - slave side: return the held message to the client
 *
 * Moves the one message the slave holds onto the ready queue and signals
 * the ready queue semaphore. The message must be the one the ready
 * queue's input index expects (asserted).
 */
static void
ring_slave_put( ring_t *ringp, ring_msg_t *msgp )
{
	/* sanity: exactly one msg is held, it is the msg next in line for
	 * the ready queue, and it is marked as being with the slave
	 */
	ASSERT( ringp->r_slave_cnt == 1 );
	ASSERT( msgp->rm_mix == ringp->r_ready_in_ix );
	ASSERT( msgp->rm_loc == RING_LOC_SLAVE );

	/* the slave lets go of the msg ...
	 */
	ringp->r_slave_cnt -= 1;

	/* ... and it lands on the ready queue
	 */
	msgp->rm_loc = RING_LOC_READY;

	/* advance the ready queue input index, wrapping at the ring length
	 */
	ringp->r_ready_in_ix = ( ringp->r_ready_in_ix + 1 ) % ringp->r_len;

	/* "V" the ready queue semaphore to announce the returned msg
	 */
	qsemV( ringp->r_ready_qsemh );
}
/* ring_slave_entry - body of the slave
 *
 * ringctxp is the ring_t this slave serves. The slave repeatedly takes a
 * message from the active queue, carries out its rm_op, stores the
 * outcome in rm_stat (and rm_rval for I/O), and returns the message to
 * the ready queue.
 *
 * After a read or write callback returns non-zero, further READ/WRITE
 * messages are answered with RING_STAT_IGNORE without calling the
 * callbacks, until a RING_OP_RESET message is seen. A RING_OP_DIE
 * message is acknowledged and ends the loop, after which the slave
 * calls exit( 0 ); this function does not return.
 */
static int
ring_slave_entry( void *ringctxp )
{
	static const int ignored_sigs[] = {
		SIGHUP,
		SIGINT,
		SIGQUIT,
		SIGPIPE,
		SIGALRM,
		SIGCLD
	};
	enum { LOOPMODE_NORMAL, LOOPMODE_IGNORE, LOOPMODE_DIE } loopmode;
	ring_t *ringp = ( ring_t * )ringctxp;
	size_t sigix;

	/* ignore signals
	 */
	for ( sigix = 0
	      ;
	      sigix < sizeof( ignored_sigs ) / sizeof( ignored_sigs[ 0 ] )
	      ;
	      sigix++ ) {
		sigset( ignored_sigs[ sigix ], SIG_IGN );
	}

	/* record slave pid to be used to kill slave
	 */
	ringp->r_slavepid = getpid( );

	/* loop reading and processing messages until told to die
	 */
	loopmode = LOOPMODE_NORMAL;
	while ( loopmode != LOOPMODE_DIE ) {
		ring_msg_t *msgp;
		int rval;

		msgp = ring_slave_get( ringp );
		msgp->rm_rval = 0;

		switch( msgp->rm_op ) {
		case RING_OP_READ:
		case RING_OP_WRITE:
			/* an earlier I/O failed: refuse further I/O
			 * until the client resets the ring
			 */
			if ( loopmode == LOOPMODE_IGNORE ) {
				msgp->rm_stat = RING_STAT_IGNORE;
				break;
			}

			/* remember when the very first I/O started
			 */
			if ( ! ringp->r_first_io_time ) {
				ringp->r_first_io_time = time( 0 );
				ASSERT( ringp->r_first_io_time );
			}

			/* hand the buffer to the matching client callback
			 */
			if ( msgp->rm_op == RING_OP_READ ) {
				rval = ( ringp->r_readfunc )(
							ringp->r_clientctxp,
							msgp->rm_bufp );
			} else {
				rval = ( ringp->r_writefunc )(
							ringp->r_clientctxp,
							msgp->rm_bufp );
			}
			msgp->rm_rval = rval;
			ringp->r_all_io_cnt++;

			/* a non-zero result is an error, and switches
			 * the slave into ignore mode
			 */
			if ( msgp->rm_rval != 0 ) {
				msgp->rm_stat = RING_STAT_ERROR;
				loopmode = LOOPMODE_IGNORE;
			} else {
				msgp->rm_stat = RING_STAT_OK;
			}
			break;
		case RING_OP_NOP:
			msgp->rm_stat = RING_STAT_NOPACK;
			break;
		case RING_OP_RESET:
			/* leave ignore mode and acknowledge
			 */
			loopmode = LOOPMODE_NORMAL;
			msgp->rm_stat = RING_STAT_RESETACK;
			break;
		case RING_OP_DIE:
			/* acknowledge; the loop ends once the msg
			 * has been returned
			 */
			msgp->rm_stat = RING_STAT_DIEACK;
			loopmode = LOOPMODE_DIE;
			break;
		case RING_OP_TRACE:
		default:
			msgp->rm_stat = RING_STAT_IGNORE;
			break;
		}

		ring_slave_put( ringp, msgp );
	}

	exit( 0 );
}