Annotation of fam/fam/NetConnection.c++, Revision 1.1.1.1
1.1 trev 1: // Copyright (C) 1999 Silicon Graphics, Inc. All Rights Reserved.
2: //
3: // This program is free software; you can redistribute it and/or modify it
4: // under the terms of version 2 of the GNU General Public License as
5: // published by the Free Software Foundation.
6: //
7: // This program is distributed in the hope that it would be useful, but
8: // WITHOUT ANY WARRANTY; without even the implied warranty of
9: // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. Further, any
10: // license provided herein, whether implied or otherwise, is limited to
11: // this program in accordance with the express provisions of the GNU
12: // General Public License. Patent licenses, if any, provided herein do not
13: // apply to combinations of this program with other product or programs, or
14: // any other product whatsoever. This program is distributed without any
15: // warranty that the program is delivered free of the rightful claim of any
16: // third person by way of infringement or the like. See the GNU General
17: // Public License for more details.
18: //
19: // You should have received a copy of the GNU General Public License along
20: // with this program; if not, write the Free Software Foundation, Inc., 59
21: // Temple Place - Suite 330, Boston MA 02111-1307, USA.
22:
23: #include "NetConnection.h"
24:
25: #include <assert.h>
26: #include <errno.h>
27: #include <stdarg.h>
28: #include <stddef.h>
29: #include <stdio.h>
30: #include <string.h>
31: #include <sys/ioctl.h>
32: #include <sys/socket.h>
33: #include <unistd.h>
34: #include <netinet/in.h>
35:
36: #include "Log.h"
37: #include "Scheduler.h"
38:
39: NetConnection::NetConnection(int a_fd,
40: UnblockHandler uhandler, void *uclosure)
41: : fd(a_fd),
42: iready(true), oready(true),
43: iend(ibuf + sizeof(ibuf)),
44: itail(ibuf),
45: unblock_handler(uhandler), closure(uclosure)
46: {
47: // It must be the case that Length is a 32 bit int.
48: assert (sizeof(Length) == 4);
49:
50: omsgList = omsgListTail = NULL;
51:
52: // Enable nonblocking output on socket.
53:
54: int yes = 1;
55: if (ioctl(fd, FIONBIO, &yes) < 0)
56: { Log::perror("can't set NBIO on fd %d", fd);
57: shutdown(false);
58: return;
59: }
60:
61: #ifdef TINY_BUFFERS /* This kills throughput. */
62:
63: // Reduce kernel's send and receive buffer sizes. Useful for debugging
64: // flow control.
65:
66: if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &osize, sizeof osize))
67: Log::perror("setsockopt(%d, SO_SNDBUF)", fd);
68: if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &isize, sizeof isize))
69: Log::perror("setsockopt(%d, SO_RCVBUF)", fd);
70:
71: #endif /* !TINY_BUFFERS */
72:
73: // Wait for something to read.
74:
75: (void) Scheduler::install_read_handler(fd, read_handler, this);
76: }
77:
78: NetConnection::~NetConnection()
79: {
80: shutdown(false);
81: }
82:
83: void
84: NetConnection::shutdown(bool call_input)
85: {
86: if (fd >= 0)
87: {
88: Log::info("Shutting down connection");
89: Scheduler::IOHandler oldh;
90: if (iready && oready)
91: { oldh = Scheduler::remove_read_handler(fd);
92: assert(oldh == read_handler);
93: }
94: if (!oready)
95: { oldh = Scheduler::remove_write_handler(fd);
96: assert(oldh == write_handler);
97: }
98: (void) close(fd);
99: fd = -1;
100: oready = true;
101: if (call_input)
102: input_msg(NULL, 0);
103: }
104: }
105:
106: ///////////////////////////////////////////////////////////////////////////////
107: // Input
108:
109: void
110: NetConnection::read_handler(int fd, void *closure)
111: {
112: NetConnection *connection = (NetConnection *) closure;
113: assert(fd == connection->fd);
114: connection->input();
115: }
116:
117: void
118: NetConnection::input()
119: {
120: if (fd < 0)
121: return;
122:
123: // Read from socket.
124:
125: assert(itail < iend);
126: int maxbytes = iend - itail;
127: int ret = recv(fd, itail, maxbytes, 0);
128: if (ret < 0)
129: { if (errno != EAGAIN && errno != ECONNRESET)
130: { Log::perror("fd %d read error", fd);
131: shutdown();
132: }
133: return;
134: }
135: else if (ret == 0)
136: { shutdown();
137: return;
138: }
139: else // (ret > 0)
140: {
141: itail += ret;
142: }
143:
144: deliver_input();
145: }
146:
147: void
148: NetConnection::deliver_input()
149: {
150: // Find messages and process them.
151:
152: char *ihead = ibuf;
153: while (iready && oready && ihead + sizeof (Length) <= itail)
154: {
155: Length len;
156: memcpy(&len, ihead, sizeof(Length));
157: len = ntohl(len);
158:
159: if (len > MAXMSGSIZE)
160: { Log::error("fd %d message length %d bytes exceeds max of %d.",
161: fd, len, MAXMSGSIZE);
162: shutdown();
163: return;
164: }
165: if (ihead + sizeof (Length) + len > itail)
166: break;
167:
168: // Call the message reader.
169:
170: if (input_msg(ihead + sizeof (Length), len) == false) {
171: // if input_msg sees an error in the message and thinks
172: // the connection should be closed, it will return false.
173: shutdown();
174: return;
175: }
176:
177: ihead += sizeof (Length) + len;
178: }
179:
180: // If data remain in buffer, slide them to the left.
181:
182: assert(ihead <= itail);
183: int remaining = itail - ihead;
184: if (remaining && ihead != ibuf)
185: memmove(ibuf, ihead, remaining);
186: itail = ibuf + remaining;
187: assert(itail < iend);
188: }
189:
190: bool
191: NetConnection::ready_for_output() const
192: {
193: return oready;
194: }
195:
196: void
197: NetConnection::ready_for_input(bool tf)
198: {
199: set_handlers(tf, oready);
200: }
201:
202: ///////////////////////////////////////////////////////////////////////////////
203: // Output
204:
205: void
206: NetConnection::mprintf(const char *format, ...)
207: {
208: if (fd < 0)
209: return; // if closed, do nothing.
210:
211: va_list args;
212: va_start(args, format);
213:
214: msgList_t * msg = new msgList_t;
215: msg->next = NULL;
216: Length len = vsnprintf(
217: msg->msg + 4, MAXMSGSIZE + 1, format, args) + 1;
218: va_end(args);
219:
220: if (len <= 0 || len == MAXMSGSIZE+1) {
221: Log::error("tried to write a message that was too big");
222: assert(0);
223: // protocol botch. Don't send the message.
224: return;
225: }
226:
227: if (omsgListTail) {
228: omsgListTail = omsgListTail->next = msg;
229: } else {
230: omsgList = omsgListTail = msg;
231: }
232: msg->len = 4 + len;
233:
234: len = htonl(len);
235: memcpy(msg->msg, &len, 4);
236: flush();
237: }
238:
239: void
240: NetConnection::flush()
241: {
242: while (omsgList)
243: {
244: int ret = send(fd, omsgList->msg, omsgList->len, 0);
245: if (ret < 0 && errno == EWOULDBLOCK)
246: {
247: break;
248: } else
249: {
250: if (ret >= 0)
251: {
252: assert(ret == omsgList->len);
253: } else
254: {
255: /* Since the client library can close it's fd before
256: * getting acks from all FAMCancelMonitor requests we
257: * may get a broken pipe error here when writing the ack.
258: * Don't threat this as an error, since that fills the logs
259: * with crap.
260: */
261: if (errno == EPIPE)
262: {
263: Log::debug("fd %d write error: %m", fd);
264: } else
265: {
266: Log::error("fd %d write error: %m", fd);
267: }
268: }
269: msgList_t *oldHead = omsgList;
270: omsgList = omsgList->next;
271: if (omsgListTail == oldHead) {
272: omsgListTail = NULL;
273: }
274: delete oldHead;
275: }
276: }
277: set_handlers(iready, !omsgList);
278: }
279:
280: void
281: NetConnection::set_handlers(bool new_iready, bool new_oready)
282: {
283: Scheduler::IOHandler oldh;
284: bool call_unblock = false;
285:
286: if (oready != new_oready)
287: { if (new_oready)
288: { oldh = Scheduler::remove_write_handler(fd);
289: assert(oldh == write_handler);
290: call_unblock = true;
291: }
292: else
293: { oldh = Scheduler::install_write_handler(fd, write_handler, this);
294: assert(oldh == NULL);
295: }
296: }
297:
298: // Install read_handler iff iready && oready.
299:
300: if ((iready && oready) != (new_iready && new_oready))
301: {
302: if (new_iready && new_oready)
303: { oldh = Scheduler::install_read_handler(fd, read_handler, this);
304: assert(oldh == NULL);
305: }
306: else
307: { oldh = Scheduler::remove_read_handler(fd);
308: assert(oldh == read_handler);
309: }
310: }
311: bool old_iready = iready;
312: iready = new_iready;
313: oready = new_oready;
314:
315: // If we unblocked output, call the unblock handler.
316: // If there's input to deliver, deliver it.
317:
318: if (call_unblock && unblock_handler)
319: { assert(!iready || old_iready);
320: (*unblock_handler)(closure);
321: }
322: else if (iready && !old_iready)
323: deliver_input();
324: }
325:
326: void
327: NetConnection::write_handler(int fd, void *closure)
328: {
329: NetConnection *connection = (NetConnection *) closure;
330: assert(connection->fd == fd);
331: connection->flush();
332: }
333:
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>