From 8457d52edd104d4996c3c3c8894f770214c632e8 Mon Sep 17 00:00:00 2001 From: Henning Brauer Date: Sun, 21 Dec 2003 22:16:54 +0000 Subject: overhaul the write buffering code. introduce msgbuf API and bundle all info needed for the write buffers in a struct msgbuf. also switch to a write queue per handled connection (each bgp session, each pipe) instead of one big one. fixes some subtle problems and is overall nicer. ok claudio@ --- usr.sbin/bgpd/bgpd.c | 101 +++++++++++++++++++++++---------------------------- 1 file changed, 45 insertions(+), 56 deletions(-) (limited to 'usr.sbin/bgpd/bgpd.c') diff --git a/usr.sbin/bgpd/bgpd.c b/usr.sbin/bgpd/bgpd.c index f96b28e1487..c316ceedec6 100644 --- a/usr.sbin/bgpd/bgpd.c +++ b/usr.sbin/bgpd/bgpd.c @@ -1,4 +1,4 @@ -/* $OpenBSD: bgpd.c,v 1.9 2003/12/21 18:18:21 henning Exp $ */ +/* $OpenBSD: bgpd.c,v 1.10 2003/12/21 22:16:53 henning Exp $ */ /* * Copyright (c) 2003 Henning Brauer @@ -37,14 +37,15 @@ void sighdlr(int); void usage(void); int main(int, char *[]); -int reconfigure(char *, int, int *, int, int *, struct bgpd_config *, - struct mrt_config *); -int dispatch_imsg(int, int, struct mrt_config *, int, int *); +int reconfigure(char *, struct bgpd_config *, struct mrt_config *); +int dispatch_imsg(int, int, struct mrt_config *, struct msgbuf *); int mrtfd = -1; volatile sig_atomic_t mrtdump = 0; volatile sig_atomic_t quit = 0; volatile sig_atomic_t reconfig = 0; +struct msgbuf msgbuf_se; +struct msgbuf msgbuf_rde; void sighdlr(int sig) @@ -97,8 +98,6 @@ main(int argc, char *argv[]) int pipe_m2s[2]; int pipe_m2r[2]; int pipe_s2r[2]; - int m2s_writes_queued = 0; - int m2r_writes_queued = 0; conffile = CONFFILE; bgpd_process = PROC_MAIN; @@ -184,21 +183,25 @@ main(int argc, char *argv[]) close(pipe_s2r[0]); close(pipe_s2r[1]); + msgbuf_init(&msgbuf_se); + msgbuf_se.sock = pipe_m2s[0]; + msgbuf_init(&msgbuf_rde); + msgbuf_rde.sock = pipe_m2r[0]; init_imsg_buf(); while (quit == 0) { - pfd[PFD_PIPE_SESSION].fd = pipe_m2s[0]; + pfd[PFD_PIPE_SESSION].fd = msgbuf_se.sock; pfd[PFD_PIPE_SESSION].events = POLLIN; - if (m2s_writes_queued) + if (msgbuf_se.queued) pfd[PFD_PIPE_SESSION].events |= POLLOUT; - pfd[PFD_PIPE_ROUTE].fd = pipe_m2r[0]; + pfd[PFD_PIPE_ROUTE].fd = msgbuf_rde.sock; pfd[PFD_PIPE_ROUTE].events = POLLIN; - if (m2r_writes_queued) + if (msgbuf_rde.queued) pfd[PFD_PIPE_ROUTE].events |= POLLOUT; i = PFD_MRT_START; LIST_FOREACH(mconf, &mrtconf, list) - if (mconf->queued_writes) { - pfd[i].fd = mconf->fd; + if (mconf->msgbuf.queued > 0) { + pfd[i].fd = mconf->msgbuf.sock; pfd[i].events |= POLLOUT; mrt[i++] = mconf; } @@ -207,56 +210,46 @@ main(int argc, char *argv[]) if (errno != EINTR) fatal("poll error", errno); - if (nfds > 0 && (pfd[PFD_PIPE_SESSION].revents & POLLOUT) && - m2s_writes_queued) { - if ((n = buf_sock_write(pfd[PFD_PIPE_SESSION].fd)) == - -1) + if (nfds > 0 && (pfd[PFD_PIPE_SESSION].revents & POLLOUT)) + if ((n = msgbuf_write(&msgbuf_se)) == -1) fatal("pipe write error", errno); - m2s_writes_queued -= n; - } - if (nfds > 0 && (pfd[PFD_PIPE_ROUTE].revents & POLLOUT) && - m2r_writes_queued) { - if ((n = buf_sock_write(pfd[PFD_PIPE_ROUTE].fd)) == -1) + if (nfds > 0 && (pfd[PFD_PIPE_ROUTE].revents & POLLOUT)) + if ((n = msgbuf_write(&msgbuf_rde)) == -1) fatal("pipe write error", errno); - m2r_writes_queued -= n; - } if (nfds > 0 && pfd[PFD_PIPE_SESSION].revents & POLLIN) { nfds--; - dispatch_imsg(pfd[PFD_PIPE_SESSION].fd, pipe_m2s[0], - &mrtconf, pipe_m2r[0], &m2r_writes_queued); + dispatch_imsg(pfd[PFD_PIPE_SESSION].fd, + PFD_PIPE_SESSION, &mrtconf, &msgbuf_rde); } if (nfds > 0 && pfd[PFD_PIPE_ROUTE].revents & POLLIN) { nfds--; - dispatch_imsg(pfd[PFD_PIPE_ROUTE].fd, pipe_m2r[0], - &mrtconf, pipe_m2r[0], &m2r_writes_queued); + dispatch_imsg(pfd[PFD_PIPE_ROUTE].fd, PFD_PIPE_ROUTE, + &mrtconf, &msgbuf_rde); } for (j = PFD_MRT_START; j < i && nfds > 0 ; j++) { - if ((pfd[j].revents & POLLOUT) && - mrt[i]->queued_writes) { - if ((n = buf_sock_write(pfd[i].fd)) == -1) + if (pfd[j].revents & POLLOUT) { + if ((n = msgbuf_write(&mrt[i]->msgbuf)) == -1) fatal("pipe write error", errno); - mrt[i]->queued_writes -= n; } } if (reconfig) { logit(LOG_CRIT, "rereading config"); - reconfigure(conffile, pipe_m2s[0], &m2s_writes_queued, - pipe_m2r[0], &m2r_writes_queued, &conf, &mrtconf); + reconfigure(conffile, &conf, &mrtconf); LIST_FOREACH(mconf, &mrtconf, list) - mrt_state(mconf, IMSG_NONE, pipe_m2r[0], - &m2r_writes_queued); + mrt_state(mconf, IMSG_NONE, &msgbuf_rde); reconfig = 0; } + if (mrtdump == 1) { - mrt_alrm(&mrtconf, pipe_m2r[0], &m2r_writes_queued); + mrt_alrm(&mrtconf, &msgbuf_rde); mrtdump = 0; } else if (mrtdump == 2) { - mrt_usr1(&mrtconf, pipe_m2r[0], &m2r_writes_queued); + mrt_usr1(&mrtconf, &msgbuf_rde); mrtdump = 0; } } @@ -278,8 +271,7 @@ main(int argc, char *argv[]) } int -reconfigure(char *conffile, int se_fd, int *se_waiting, int rde_fd, - int *rde_waiting, struct bgpd_config *conf, struct mrt_config *mrtc) +reconfigure(char *conffile, struct bgpd_config *conf, struct mrt_config *mrtc) { struct peer *p; @@ -288,18 +280,18 @@ reconfigure(char *conffile, int se_fd, int *se_waiting, int rde_fd, conffile); return (-1); } - *se_waiting += imsg_compose(se_fd, IMSG_RECONF_CONF, 0, + imsg_compose(&msgbuf_se, IMSG_RECONF_CONF, 0, conf, sizeof(struct bgpd_config)); - *rde_waiting += imsg_compose(rde_fd, IMSG_RECONF_CONF, 0, + imsg_compose(&msgbuf_rde, IMSG_RECONF_CONF, 0, conf, sizeof(struct bgpd_config)); for (p = conf->peers; p != NULL; p = p->next) { - *se_waiting += imsg_compose(se_fd, IMSG_RECONF_PEER, - p->conf.id, &p->conf, sizeof(struct peer_config)); - *rde_waiting += imsg_compose(rde_fd, IMSG_RECONF_PEER, - p->conf.id, &p->conf, sizeof(struct peer_config)); + imsg_compose(&msgbuf_se, IMSG_RECONF_PEER, p->conf.id, + &p->conf, sizeof(struct peer_config)); + imsg_compose(&msgbuf_rde, IMSG_RECONF_PEER, p->conf.id, + &p->conf, sizeof(struct peer_config)); } - *se_waiting += imsg_compose(se_fd, IMSG_RECONF_DONE, 0, NULL, 0); - *rde_waiting += imsg_compose(rde_fd, IMSG_RECONF_DONE, 0, NULL, 0); + imsg_compose(&msgbuf_se, IMSG_RECONF_DONE, 0, NULL, 0); + imsg_compose(&msgbuf_rde, IMSG_RECONF_DONE, 0, NULL, 0); return (0); } @@ -308,12 +300,11 @@ reconfigure(char *conffile, int se_fd, int *se_waiting, int rde_fd, * XXX currently messages are only buffered for mrt files. */ int -dispatch_imsg(int fd, int idx, struct mrt_config *conf, - int rfd, int *rwait /*, int sfd, int *swait */) +dispatch_imsg(int fd, int idx, struct mrt_config *conf, struct msgbuf *rde) { + struct imsg imsg; struct buf *wbuf; struct mrtdump_config *m; - struct imsg imsg; ssize_t len; int n; @@ -324,20 +315,18 @@ dispatch_imsg(int fd, int idx, struct mrt_config *conf, LIST_FOREACH(m, conf, list) { if (m->id != imsg.hdr.peerid) continue; - if (mrt_state(m, imsg.hdr.type, - rfd, rwait) == 0) + if (mrt_state(m, imsg.hdr.type, rde) == 0) break; - if (m->fd == -1) + if (m->msgbuf.sock == -1) break; len = imsg.hdr.len - IMSG_HEADER_SIZE; - wbuf = buf_open(NULL, m->fd, len); + wbuf = buf_open(len); if (wbuf == NULL) fatal("buf_open error", 0); if (buf_add(wbuf, imsg.data, len) == -1) fatal("buf_add error", 0); - if ((n = buf_close(wbuf)) == -1) + if ((n = buf_close(&m->msgbuf, wbuf)) == -1) fatal("buf_close error", 0); - m->queued_writes += n; break; } break; -- cgit v1.2.3