diff options
author | Henning Brauer <henning@cvs.openbsd.org> | 2003-12-21 22:16:54 +0000 |
---|---|---|
committer | Henning Brauer <henning@cvs.openbsd.org> | 2003-12-21 22:16:54 +0000 |
commit | 8457d52edd104d4996c3c3c8894f770214c632e8 (patch) | |
tree | aa17859eab0b6d68c19b335e87f8978d0aebecca /usr.sbin | |
parent | 23495634e653d596492ca742142b25d18babc127 (diff) |
overhaul the write buffering code.
introduce msgbuf API and bundle all info needed for the write buffers in a
struct msgbuf.
also switch to a write queue per handled connection (each bgp session, each
pipe) instead of one big one.
fixes some subtle problems and is overall nicer.
ok claudio@
Diffstat (limited to 'usr.sbin')
-rw-r--r-- | usr.sbin/bgpd/bgpd.c | 101 | ||||
-rw-r--r-- | usr.sbin/bgpd/bgpd.h | 48 | ||||
-rw-r--r-- | usr.sbin/bgpd/buffer.c | 107 | ||||
-rw-r--r-- | usr.sbin/bgpd/imsg.c | 16 | ||||
-rw-r--r-- | usr.sbin/bgpd/mrt.c | 80 | ||||
-rw-r--r-- | usr.sbin/bgpd/mrt.h | 18 | ||||
-rw-r--r-- | usr.sbin/bgpd/parse.y | 4 | ||||
-rw-r--r-- | usr.sbin/bgpd/rde.c | 54 | ||||
-rw-r--r-- | usr.sbin/bgpd/rde.h | 5 | ||||
-rw-r--r-- | usr.sbin/bgpd/rde_prefix.c | 7 | ||||
-rw-r--r-- | usr.sbin/bgpd/session.c | 63 |
11 files changed, 229 insertions, 274 deletions
diff --git a/usr.sbin/bgpd/bgpd.c b/usr.sbin/bgpd/bgpd.c index f96b28e1487..c316ceedec6 100644 --- a/usr.sbin/bgpd/bgpd.c +++ b/usr.sbin/bgpd/bgpd.c @@ -1,4 +1,4 @@ -/* $OpenBSD: bgpd.c,v 1.9 2003/12/21 18:18:21 henning Exp $ */ +/* $OpenBSD: bgpd.c,v 1.10 2003/12/21 22:16:53 henning Exp $ */ /* * Copyright (c) 2003 Henning Brauer <henning@openbsd.org> @@ -37,14 +37,15 @@ void sighdlr(int); void usage(void); int main(int, char *[]); -int reconfigure(char *, int, int *, int, int *, struct bgpd_config *, - struct mrt_config *); -int dispatch_imsg(int, int, struct mrt_config *, int, int *); +int reconfigure(char *, struct bgpd_config *, struct mrt_config *); +int dispatch_imsg(int, int, struct mrt_config *, struct msgbuf *); int mrtfd = -1; volatile sig_atomic_t mrtdump = 0; volatile sig_atomic_t quit = 0; volatile sig_atomic_t reconfig = 0; +struct msgbuf msgbuf_se; +struct msgbuf msgbuf_rde; void sighdlr(int sig) @@ -97,8 +98,6 @@ main(int argc, char *argv[]) int pipe_m2s[2]; int pipe_m2r[2]; int pipe_s2r[2]; - int m2s_writes_queued = 0; - int m2r_writes_queued = 0; conffile = CONFFILE; bgpd_process = PROC_MAIN; @@ -184,21 +183,25 @@ main(int argc, char *argv[]) close(pipe_s2r[0]); close(pipe_s2r[1]); + msgbuf_init(&msgbuf_se); + msgbuf_se.sock = pipe_m2s[0]; + msgbuf_init(&msgbuf_rde); + msgbuf_rde.sock = pipe_m2r[0]; init_imsg_buf(); while (quit == 0) { - pfd[PFD_PIPE_SESSION].fd = pipe_m2s[0]; + pfd[PFD_PIPE_SESSION].fd = msgbuf_se.sock; pfd[PFD_PIPE_SESSION].events = POLLIN; - if (m2s_writes_queued) + if (msgbuf_se.queued) pfd[PFD_PIPE_SESSION].events |= POLLOUT; - pfd[PFD_PIPE_ROUTE].fd = pipe_m2r[0]; + pfd[PFD_PIPE_ROUTE].fd = msgbuf_rde.sock; pfd[PFD_PIPE_ROUTE].events = POLLIN; - if (m2r_writes_queued) + if (msgbuf_rde.queued) pfd[PFD_PIPE_ROUTE].events |= POLLOUT; i = PFD_MRT_START; LIST_FOREACH(mconf, &mrtconf, list) - if (mconf->queued_writes) { - pfd[i].fd = mconf->fd; + if (mconf->msgbuf.queued > 0) { + pfd[i].fd = mconf->msgbuf.sock; pfd[i].events |= POLLOUT; mrt[i++] = mconf; } @@ -207,56 +210,46 @@ main(int argc, char *argv[]) if (errno != EINTR) fatal("poll error", errno); - if (nfds > 0 && (pfd[PFD_PIPE_SESSION].revents & POLLOUT) && - m2s_writes_queued) { - if ((n = buf_sock_write(pfd[PFD_PIPE_SESSION].fd)) == - -1) + if (nfds > 0 && (pfd[PFD_PIPE_SESSION].revents & POLLOUT)) + if ((n = msgbuf_write(&msgbuf_se)) == -1) fatal("pipe write error", errno); - m2s_writes_queued -= n; - } - if (nfds > 0 && (pfd[PFD_PIPE_ROUTE].revents & POLLOUT) && - m2r_writes_queued) { - if ((n = buf_sock_write(pfd[PFD_PIPE_ROUTE].fd)) == -1) + if (nfds > 0 && (pfd[PFD_PIPE_ROUTE].revents & POLLOUT)) + if ((n = msgbuf_write(&msgbuf_rde)) == -1) fatal("pipe write error", errno); - m2r_writes_queued -= n; - } if (nfds > 0 && pfd[PFD_PIPE_SESSION].revents & POLLIN) { nfds--; - dispatch_imsg(pfd[PFD_PIPE_SESSION].fd, pipe_m2s[0], - &mrtconf, pipe_m2r[0], &m2r_writes_queued); + dispatch_imsg(pfd[PFD_PIPE_SESSION].fd, + PFD_PIPE_SESSION, &mrtconf, &msgbuf_rde); } if (nfds > 0 && pfd[PFD_PIPE_ROUTE].revents & POLLIN) { nfds--; - dispatch_imsg(pfd[PFD_PIPE_ROUTE].fd, pipe_m2r[0], - &mrtconf, pipe_m2r[0], &m2r_writes_queued); + dispatch_imsg(pfd[PFD_PIPE_ROUTE].fd, PFD_PIPE_ROUTE, + &mrtconf, &msgbuf_rde); } for (j = PFD_MRT_START; j < i && nfds > 0 ; j++) { - if ((pfd[j].revents & POLLOUT) && - mrt[i]->queued_writes) { - if ((n = buf_sock_write(pfd[i].fd)) == -1) + if (pfd[j].revents & POLLOUT) { + if ((n = msgbuf_write(&mrt[i]->msgbuf)) == -1) fatal("pipe write error", errno); - mrt[i]->queued_writes -= n; } } if (reconfig) { logit(LOG_CRIT, "rereading config"); - reconfigure(conffile, pipe_m2s[0], &m2s_writes_queued, - pipe_m2r[0], &m2r_writes_queued, &conf, &mrtconf); + reconfigure(conffile, &conf, &mrtconf); LIST_FOREACH(mconf, &mrtconf, list) - mrt_state(mconf, IMSG_NONE, pipe_m2r[0], - &m2r_writes_queued); + mrt_state(mconf, IMSG_NONE, &msgbuf_rde); reconfig = 0; } + if (mrtdump == 1) { - mrt_alrm(&mrtconf, pipe_m2r[0], &m2r_writes_queued); + mrt_alrm(&mrtconf, &msgbuf_rde); mrtdump = 0; } else if (mrtdump == 2) { - mrt_usr1(&mrtconf, pipe_m2r[0], &m2r_writes_queued); + mrt_usr1(&mrtconf, &msgbuf_rde); mrtdump = 0; } } @@ -278,8 +271,7 @@ main(int argc, char *argv[]) } int -reconfigure(char *conffile, int se_fd, int *se_waiting, int rde_fd, - int *rde_waiting, struct bgpd_config *conf, struct mrt_config *mrtc) +reconfigure(char *conffile, struct bgpd_config *conf, struct mrt_config *mrtc) { struct peer *p; @@ -288,18 +280,18 @@ reconfigure(char *conffile, int se_fd, int *se_waiting, int rde_fd, conffile); return (-1); } - *se_waiting += imsg_compose(se_fd, IMSG_RECONF_CONF, 0, + imsg_compose(&msgbuf_se, IMSG_RECONF_CONF, 0, conf, sizeof(struct bgpd_config)); - *rde_waiting += imsg_compose(rde_fd, IMSG_RECONF_CONF, 0, + imsg_compose(&msgbuf_rde, IMSG_RECONF_CONF, 0, conf, sizeof(struct bgpd_config)); for (p = conf->peers; p != NULL; p = p->next) { - *se_waiting += imsg_compose(se_fd, IMSG_RECONF_PEER, - p->conf.id, &p->conf, sizeof(struct peer_config)); - *rde_waiting += imsg_compose(rde_fd, IMSG_RECONF_PEER, - p->conf.id, &p->conf, sizeof(struct peer_config)); + imsg_compose(&msgbuf_se, IMSG_RECONF_PEER, p->conf.id, + &p->conf, sizeof(struct peer_config)); + imsg_compose(&msgbuf_rde, IMSG_RECONF_PEER, p->conf.id, + &p->conf, sizeof(struct peer_config)); } - *se_waiting += imsg_compose(se_fd, IMSG_RECONF_DONE, 0, NULL, 0); - *rde_waiting += imsg_compose(rde_fd, IMSG_RECONF_DONE, 0, NULL, 0); + imsg_compose(&msgbuf_se, IMSG_RECONF_DONE, 0, NULL, 0); + imsg_compose(&msgbuf_rde, IMSG_RECONF_DONE, 0, NULL, 0); return (0); } @@ -308,12 +300,11 @@ reconfigure(char *conffile, int se_fd, int *se_waiting, int rde_fd, * XXX currently messages are only buffered for mrt files. */ int -dispatch_imsg(int fd, int idx, struct mrt_config *conf, - int rfd, int *rwait /*, int sfd, int *swait */) +dispatch_imsg(int fd, int idx, struct mrt_config *conf, struct msgbuf *rde) { + struct imsg imsg; struct buf *wbuf; struct mrtdump_config *m; - struct imsg imsg; ssize_t len; int n; @@ -324,20 +315,18 @@ dispatch_imsg(int fd, int idx, struct mrt_config *conf, LIST_FOREACH(m, conf, list) { if (m->id != imsg.hdr.peerid) continue; - if (mrt_state(m, imsg.hdr.type, - rfd, rwait) == 0) + if (mrt_state(m, imsg.hdr.type, rde) == 0) break; - if (m->fd == -1) + if (m->msgbuf.sock == -1) break; len = imsg.hdr.len - IMSG_HEADER_SIZE; - wbuf = buf_open(NULL, m->fd, len); + wbuf = buf_open(len); if (wbuf == NULL) fatal("buf_open error", 0); if (buf_add(wbuf, imsg.data, len) == -1) fatal("buf_add error", 0); - if ((n = buf_close(wbuf)) == -1) + if ((n = buf_close(&m->msgbuf, wbuf)) == -1) fatal("buf_close error", 0); - m->queued_writes += n; break; } break; diff --git a/usr.sbin/bgpd/bgpd.h b/usr.sbin/bgpd/bgpd.h index cea79893c0d..29efac291a2 100644 --- a/usr.sbin/bgpd/bgpd.h +++ b/usr.sbin/bgpd/bgpd.h @@ -1,4 +1,4 @@ -/* $OpenBSD: bgpd.h,v 1.8 2003/12/21 18:21:24 henning Exp $ */ +/* $OpenBSD: bgpd.h,v 1.9 2003/12/21 22:16:53 henning Exp $ */ /* * Copyright (c) 2003 Henning Brauer <henning@openbsd.org> @@ -78,6 +78,20 @@ enum reconf_action { RECONF_DELETE }; +struct buf { + TAILQ_ENTRY(buf) entries; + u_char *buf; + ssize_t size; + ssize_t wpos; + ssize_t rpos; +}; + +struct msgbuf { + u_int32_t queued; + int sock; + TAILQ_HEAD(bufs, buf) bufs; +}; + struct bgpd_config { int opts; u_int16_t as; @@ -120,8 +134,7 @@ struct peer { u_int StartTimerInterval; int sock; int events; - int queued_writes; - int queued_imsg_writes; + struct msgbuf wbuf; struct peer_buf_read *rbuf; struct peer *next; }; @@ -151,27 +164,16 @@ LIST_HEAD(mrt_config, mrtdump_config); struct mrtdump_config { enum mrtdump_type type; u_int32_t id; - int fd; + struct msgbuf msgbuf; char name[MRT_FILE_LEN]; /* base file name */ char file[MRT_FILE_LEN]; /* actual file name */ time_t ReopenTimer; time_t ReopenTimerInterval; - int queued_writes; enum mrtdump_state state; LIST_ENTRY(mrtdump_config) list; }; -struct buf { - TAILQ_ENTRY(buf) entries; - struct peer *peer; - int sock; - u_char *buf; - ssize_t size; - ssize_t wpos; - ssize_t rpos; -}; - /* ipc messages */ #define IMSG_HEADER_SIZE sizeof(struct imsg_hdr) @@ -232,15 +234,15 @@ enum suberr_update { int session_main(struct bgpd_config *, int[2], int[2]); /* buffer.c */ -struct buf *buf_open(struct peer *, int, ssize_t); +struct buf *buf_open(ssize_t); int buf_add(struct buf *, void *, ssize_t); void *buf_reserve(struct buf *, ssize_t); -int buf_close(struct buf *); -int buf_write(struct buf *); -void buf_free(struct buf *buf); -void buf_peer_remove(struct peer *); -int buf_peer_write(struct peer *); -int buf_sock_write(int); +int buf_close(struct msgbuf *, struct buf *); +void buf_free(struct buf *); +void msgbuf_init(struct msgbuf *); +void msgbuf_clear(struct msgbuf *); +int msgbuf_write(struct msgbuf *); + /* log.c */ void log_init(int); @@ -266,7 +268,7 @@ int merge_config(struct bgpd_config *, struct bgpd_config *); /* imsg.c */ void init_imsg_buf(void); int get_imsg(int, struct imsg *); -int imsg_compose(int, int, u_int32_t, void *, u_int16_t); +int imsg_compose(struct msgbuf *, int, u_int32_t, void *, u_int16_t); void imsg_free(struct imsg *); /* rde.c */ diff --git a/usr.sbin/bgpd/buffer.c b/usr.sbin/bgpd/buffer.c index 0d9f25f6f71..1c2128f7e36 100644 --- a/usr.sbin/bgpd/buffer.c +++ b/usr.sbin/bgpd/buffer.c @@ -1,4 +1,4 @@ -/* $OpenBSD: buffer.c,v 1.5 2003/12/21 18:04:08 claudio Exp $ */ +/* $OpenBSD: buffer.c,v 1.6 2003/12/21 22:16:53 henning Exp $ */ /* * Copyright (c) 2003 Henning Brauer <henning@openbsd.org> @@ -27,13 +27,12 @@ #include "bgpd.h" -TAILQ_HEAD(bufs, buf) bufs = TAILQ_HEAD_INITIALIZER(bufs); - -void buf_enqueue(struct buf *); -void buf_dequeue(struct buf *); +int buf_write(int, struct buf *); +void buf_enqueue(struct msgbuf *, struct buf *); +void buf_dequeue(struct msgbuf *, struct buf *); struct buf * -buf_open(struct peer *peer, int sock, ssize_t len) +buf_open(ssize_t len) { struct buf *buf; @@ -45,8 +44,6 @@ buf_open(struct peer *peer, int sock, ssize_t len) } buf->size = len; - buf->peer = peer; - buf->sock = sock; return (buf); } @@ -76,7 +73,7 @@ buf_reserve(struct buf *buf, ssize_t len) } int -buf_close(struct buf *buf) +buf_close(struct msgbuf *msgbuf, struct buf *buf) { /* * we first try to write out directly @@ -85,8 +82,8 @@ buf_close(struct buf *buf) int n; - if (buf->peer != NULL && buf->peer->queued_writes == 0) { - if ((n = buf_write(buf)) == -1) + if (msgbuf->queued == 0) { + if ((n = buf_write(msgbuf->sock, buf)) == -1) return (-1); if (n == 1) { /* all data written out */ @@ -96,25 +93,23 @@ buf_close(struct buf *buf) } /* we have to queue */ - buf_enqueue(buf); + buf_enqueue(msgbuf, buf); return (1); } int -buf_write(struct buf *buf) +buf_write(int sock, struct buf *buf) { ssize_t n; - if ((n = write(buf->sock, buf->buf + buf->rpos, + if ((n = write(sock, buf->buf + buf->rpos, buf->size-buf->rpos)) == -1) { if (errno == EAGAIN) /* cannot write immediately */ return (0); else { - if (buf->peer != NULL) - log_err(buf->peer, "write error"); - else - logit(LOG_CRIT, "pipe write error: %s", - strerror(errno)); + /* XXX better let caller log with info which sock etc */ + logit(LOG_CRIT, "buf_write: write error: %s", + strerror(errno)); return (-1); } } @@ -134,45 +129,28 @@ buf_free(struct buf *buf) } void -buf_peer_remove(struct peer *peer) +msgbuf_init(struct msgbuf *msgbuf) { - struct buf *buf, *next; - - for (buf = TAILQ_FIRST(&bufs); buf != NULL; buf = next) { - next = TAILQ_NEXT(buf, entries); - if (buf->peer == peer) - buf_dequeue(buf); - } + msgbuf->queued = 0; + msgbuf->sock = -1; + TAILQ_INIT(&msgbuf->bufs); } -int -buf_peer_write(struct peer *peer) +void +msgbuf_clear(struct msgbuf *msgbuf) { - /* - * possible race here - * when we cannot write out data completely from a buffer, - * we MUST return and NOT try to write out stuff from later buffers - - * the socket might have become writeable again - */ struct buf *buf, *next; - int n; - for (buf = TAILQ_FIRST(&bufs); buf != NULL; buf = next) { + for (buf = TAILQ_FIRST(&msgbuf->bufs); buf != NULL; buf = next) { next = TAILQ_NEXT(buf, entries); - if (buf->peer == peer) { - if ((n = buf_write(buf)) == -1) - return (-1); - if (n == 1) /* everything written out */ - buf_dequeue(buf); - else - return (0); - } + buf_dequeue(msgbuf, buf); } - return (0); + msgbuf->queued = 0; + msgbuf->sock = -1; } int -buf_sock_write(int sock) +msgbuf_write(struct msgbuf *msgbuf) { /* * possible race here @@ -181,37 +159,32 @@ buf_sock_write(int sock) * the socket might have become writeable again */ struct buf *buf, *next; - int n, cleared = 0; + int n; - for (buf = TAILQ_FIRST(&bufs); buf != NULL; buf = next) { + for (buf = TAILQ_FIRST(&msgbuf->bufs); buf != NULL; buf = next) { next = TAILQ_NEXT(buf, entries); - if (buf->sock == sock) { - if ((n = buf_write(buf)) == -1) - return (-1); - if (n == 1) { /* everything written out */ - buf_dequeue(buf); - cleared++; - } else - return (cleared); - } + if ((n = buf_write(msgbuf->sock, buf)) == -1) + return (-1); + if (n == 1) /* everything written out */ + buf_dequeue(msgbuf, buf); + else + return (0); } - return (cleared); + return (0); } void -buf_enqueue(struct buf *buf) +buf_enqueue(struct msgbuf *msgbuf, struct buf *buf) { /* might want a tailq per peer w/ pointers to the bufs */ - TAILQ_INSERT_TAIL(&bufs, buf, entries); - if (buf->peer != NULL) - buf->peer->queued_writes++; + TAILQ_INSERT_TAIL(&msgbuf->bufs, buf, entries); + msgbuf->queued++; } void -buf_dequeue(struct buf *buf) +buf_dequeue(struct msgbuf *msgbuf, struct buf *buf) { - TAILQ_REMOVE(&bufs, buf, entries); - if (buf->peer != NULL) - buf->peer->queued_writes--; + TAILQ_REMOVE(&msgbuf->bufs, buf, entries); + msgbuf->queued--; buf_free(buf); } diff --git a/usr.sbin/bgpd/imsg.c b/usr.sbin/bgpd/imsg.c index e2f72ca7576..a6b5ef99f38 100644 --- a/usr.sbin/bgpd/imsg.c +++ b/usr.sbin/bgpd/imsg.c @@ -1,4 +1,4 @@ -/* $OpenBSD: imsg.c,v 1.5 2003/12/20 21:19:40 claudio Exp $ */ +/* $OpenBSD: imsg.c,v 1.6 2003/12/21 22:16:53 henning Exp $ */ /* * Copyright (c) 2003 Henning Brauer <henning@openbsd.org> @@ -84,7 +84,7 @@ get_imsg(int fd, struct imsg *imsg) } int -imsg_compose(int fd, int type, u_int32_t peerid, void *data, +imsg_compose(struct msgbuf *msgbuf, int type, u_int32_t peerid, void *data, u_int16_t datalen) { struct buf *wbuf; @@ -94,16 +94,16 @@ imsg_compose(int fd, int type, u_int32_t peerid, void *data, hdr.len = datalen + IMSG_HEADER_SIZE; hdr.type = type; hdr.peerid = peerid; - wbuf = buf_open(NULL, fd, hdr.len); + wbuf = buf_open(hdr.len); if (wbuf == NULL) - fatal("buf_open error", 0); + fatal("imsg_compose: buf_open error", 0); if (buf_add(wbuf, &hdr, sizeof(hdr)) == -1) - fatal("buf_add error", 0); + fatal("imsg_compose: buf_add error", 0); if (datalen) if (buf_add(wbuf, data, datalen) == -1) - fatal("buf_add error", 0); - if ((n = buf_close(wbuf)) == -1) - fatal("buf_close error", 0); + fatal("imsg_compose: buf_add error", 0); + if ((n = buf_close(msgbuf, wbuf)) == -1) + fatal("imsg_compose: buf_close error", 0); return (n); } diff --git a/usr.sbin/bgpd/mrt.c b/usr.sbin/bgpd/mrt.c index 5b46eb78f1d..32aec6ccb2b 100644 --- a/usr.sbin/bgpd/mrt.c +++ b/usr.sbin/bgpd/mrt.c @@ -1,4 +1,4 @@ -/* $OpenBSD: mrt.c,v 1.6 2003/12/21 16:11:33 claudio Exp $ */ +/* $OpenBSD: mrt.c,v 1.7 2003/12/21 22:16:53 henning Exp $ */ /* * Copyright (c) 2003 Claudio Jeker <claudio@openbsd.org> @@ -38,8 +38,8 @@ * XXX imsg_create(), imsg_add(), imsg_close() ... */ -static int mrt_dump_entry(int, struct prefix *, u_int16_t, - struct peer_config *, u_int32_t); +static int mrt_dump_entry(struct mrt *, struct prefix *, u_int16_t, + struct peer_config *); static void mrt_dump_header(struct buf *, u_int16_t, u_int16_t, u_int32_t); static int mrt_open(struct mrtdump_config *); @@ -74,7 +74,7 @@ static int mrt_open(struct mrtdump_config *); } while (0) int -mrt_dump_bgp_msg(int fd, u_char *pkg, u_int16_t pkglen, int type, +mrt_dump_bgp_msg(struct mrt *mrt, void *pkg, u_int16_t pkglen, int type, struct peer_config *peer, struct bgpd_config *bgp) { struct buf *buf; @@ -86,8 +86,8 @@ mrt_dump_bgp_msg(int fd, u_char *pkg, u_int16_t pkglen, int type, hdr.len = len + IMSG_HEADER_SIZE + MRT_HEADER_SIZE; hdr.type = IMSG_MRT_MSG; - hdr.peerid = peer->id; - buf = buf_open(NULL, fd, hdr.len); + hdr.peerid = mrt->id; + buf = buf_open(hdr.len); if (buf == NULL) fatal("mrt_dump_bgp_msg", errno); if (buf_add(buf, &hdr, sizeof(hdr)) == -1) @@ -113,15 +113,15 @@ mrt_dump_bgp_msg(int fd, u_char *pkg, u_int16_t pkglen, int type, if (buf_add(buf, pkg, pkglen) == -1) fatal("buf_add error", 0); - if ((n = buf_close(buf)) == -1) + if ((n = buf_close(mrt->msgbuf, buf)) == -1) fatal("buf_close error", 0); return (n); } static int -mrt_dump_entry(int fd, struct prefix *p, u_int16_t snum, - struct peer_config *peer, u_int32_t id) +mrt_dump_entry(struct mrt *mrt, struct prefix *p, u_int16_t snum, + struct peer_config *peer) { struct buf *buf; void *bptr; @@ -134,8 +134,8 @@ mrt_dump_entry(int fd, struct prefix *p, u_int16_t snum, hdr.len = len + IMSG_HEADER_SIZE + MRT_HEADER_SIZE; hdr.type = IMSG_MRT_MSG; - hdr.peerid = id; - buf = buf_open(NULL, fd, hdr.len); + hdr.peerid = mrt->id; + buf = buf_open(hdr.len); if (buf == NULL) fatal("mrt_dump_entry", errno); if (buf_add(buf, &hdr, sizeof(hdr)) == -1) @@ -159,7 +159,7 @@ mrt_dump_entry(int fd, struct prefix *p, u_int16_t snum, if (attr_dump(bptr, attr_len, &p->aspath->flags) == -1) fatal("attr_dump error", 0); - if ((n = buf_close(buf)) == -1) + if ((n = buf_close(mrt->msgbuf, buf)) == -1) fatal("buf_close error", 0); return (n); @@ -174,11 +174,10 @@ mrt_clear_seq(void) } void -mrt_dump_upcall(struct pt_entry *pt, int fd, int *wait, void *arg) +mrt_dump_upcall(struct pt_entry *pt, void *ptr) { + struct mrt *mrtbuf = ptr; struct prefix *p; - u_int32_t *idp = arg; - u_int32_t id = *idp; /* * dump all prefixes even the inactive ones. That is the way zebra @@ -186,7 +185,7 @@ mrt_dump_upcall(struct pt_entry *pt, int fd, int *wait, void *arg) * be dumped p should be set to p = pt->active. */ LIST_FOREACH(p, &pt->prefix_h, prefix_l) - *wait += mrt_dump_entry(fd, p, sequencenum++, &p->peer->conf, id); + mrt_dump_entry(mrtbuf, p, sequencenum++, &p->peer->conf); } static void @@ -216,21 +215,22 @@ mrt_open(struct mrtdump_config *conf) if (strftime(conf->file, sizeof(conf->file), conf->name, localtime(&now)) == 0) { logit(LOG_CRIT, "mrt_open strftime failed"); - conf->fd = -1; + conf->msgbuf.sock = -1; return -1; } - conf->fd = open(conf->file, O_WRONLY|O_NONBLOCK|O_CREAT|O_TRUNC, 0644); - if (conf->fd == -1) + conf->msgbuf.sock = open(conf->file, + O_WRONLY|O_NONBLOCK|O_CREAT|O_TRUNC, 0644); + if (conf->msgbuf.sock == -1) logit(LOG_CRIT, "mrt_open %s: %s", conf->file, strerror(errno)); - return conf->fd; + return conf->msgbuf.sock; } int mrt_state(struct mrtdump_config *m, enum imsg_type type, - int rfd, int *rwait /*, int sfd, int *swait */) + struct msgbuf *rde /*, struct msgbuf *se */) { switch (m->state) { case MRT_STATE_DONE: @@ -240,12 +240,11 @@ mrt_state(struct mrtdump_config *m, enum imsg_type type, switch (type) { case IMSG_NONE: if (m->type == MRT_TABLE_DUMP) - *rwait += imsg_compose(rfd, IMSG_MRT_END, - m->id, NULL, 0); + imsg_compose(rde, IMSG_MRT_END, m->id, NULL, 0); return (0); case IMSG_MRT_END: /* dump no longer valid */ - close(m->fd); + close(m->msgbuf.sock); LIST_REMOVE(m, list); free(m); return (0); @@ -257,16 +256,14 @@ mrt_state(struct mrtdump_config *m, enum imsg_type type, switch (type) { case IMSG_NONE: if (m->type == MRT_TABLE_DUMP) - *rwait += imsg_compose(rfd, IMSG_MRT_END, - m->id, NULL, 0); + imsg_compose(rde, IMSG_MRT_END, m->id, NULL, 0); return (0); case IMSG_MRT_END: - if (m->fd != -1) - close(m->fd); + if (m->msgbuf.sock != -1) + close(m->msgbuf.sock); m->state = MRT_STATE_OPEN; if (m->type == MRT_TABLE_DUMP) - *rwait += imsg_compose(rfd, IMSG_MRT_REQ, - m->id, NULL, 0); + imsg_compose(rde, IMSG_MRT_REQ, m->id, NULL, 0); return (0); default: break; @@ -276,8 +273,7 @@ mrt_state(struct mrtdump_config *m, enum imsg_type type, switch (type) { case IMSG_NONE: if (m->type == MRT_TABLE_DUMP) - *rwait += imsg_compose(rfd, IMSG_MRT_REQ, - m->id, NULL, 0); + imsg_compose(rde, IMSG_MRT_REQ, m->id, NULL, 0); return (0); case IMSG_MRT_MSG: mrt_open(m); @@ -294,8 +290,7 @@ mrt_state(struct mrtdump_config *m, enum imsg_type type, } int -mrt_usr1(struct mrt_config *conf, int rfd, int *rwait - /*, int sfd, int *swait */) +mrt_usr1(struct mrt_config *conf, struct msgbuf *rde /*, struct msgbuf *se */) { struct mrtdump_config *m; time_t now; @@ -310,13 +305,11 @@ mrt_usr1(struct mrt_config *conf, int rfd, int *rwait break; case MRT_STATE_DONE: m->state = MRT_STATE_OPEN; - *rwait += imsg_compose(rfd, IMSG_MRT_REQ, - m->id, NULL, 0); + imsg_compose(rde, IMSG_MRT_REQ, m->id, NULL, 0); break; default: m->state = MRT_STATE_REOPEN; - *rwait += imsg_compose(rfd, IMSG_MRT_END, - m->id, NULL, 0); + imsg_compose(rde, IMSG_MRT_END, m->id, NULL, 0); break; } @@ -333,8 +326,7 @@ mrt_usr1(struct mrt_config *conf, int rfd, int *rwait } int -mrt_alrm(struct mrt_config *conf, int rfd, int *rwait - /*, int sfd, int *swait */) +mrt_alrm(struct mrt_config *conf, struct msgbuf *rde /*, struct msgbuf *se */) { struct mrtdump_config *m; time_t now; @@ -352,14 +344,14 @@ mrt_alrm(struct mrt_config *conf, int rfd, int *rwait case MRT_STATE_DONE: m->state = MRT_STATE_OPEN; if (m->type == MRT_TABLE_DUMP) - *rwait += imsg_compose(rfd, - IMSG_MRT_REQ, m->id, NULL, 0); + imsg_compose(rde, IMSG_MRT_REQ, m->id, + NULL, 0); break; default: m->state = MRT_STATE_REOPEN; if (m->type == MRT_TABLE_DUMP) - *rwait += imsg_compose(rfd, - IMSG_MRT_END, m->id, NULL, 0); + imsg_compose(rde, IMSG_MRT_END, m->id, + NULL, 0); break; } diff --git a/usr.sbin/bgpd/mrt.h b/usr.sbin/bgpd/mrt.h index 837661e760a..3abccb8376d 100644 --- a/usr.sbin/bgpd/mrt.h +++ b/usr.sbin/bgpd/mrt.h @@ -1,4 +1,4 @@ -/* $OpenBSD: mrt.h,v 1.2 2003/12/17 18:11:31 henning Exp $ */ +/* $OpenBSD: mrt.h,v 1.3 2003/12/21 22:16:53 henning Exp $ */ /* * Copyright (c) 2003 Claudio Jeker <cjeker@diehard.n-r-g.com> @@ -221,14 +221,18 @@ struct mrt_bgp_state_header { /* pseudo predeclarations */ struct prefix; struct pt_entry; +struct mrt { + struct msgbuf *msgbuf; + u_int32_t id; +}; /* prototypes */ -int mrt_dump_bgp_msg(int, u_char *, u_int16_t, int, struct peer_config *, - struct bgpd_config *); +int mrt_dump_bgp_msg(struct mrt *, void *, u_int16_t, int, + struct peer_config *, struct bgpd_config *); void mrt_clear_seq(void); -void mrt_dump_upcall(struct pt_entry *, int, int *, void *); -int mrt_state(struct mrtdump_config *, enum imsg_type, int, int *); -int mrt_alrm(struct mrt_config *, int, int *); -int mrt_usr1(struct mrt_config *, int, int *); +void mrt_dump_upcall(struct pt_entry *, void *); +int mrt_state(struct mrtdump_config *, enum imsg_type, struct msgbuf *); +int mrt_alrm(struct mrt_config *, struct msgbuf *); +int mrt_usr1(struct mrt_config *, struct msgbuf *); #endif diff --git a/usr.sbin/bgpd/parse.y b/usr.sbin/bgpd/parse.y index b90845f4f83..0cecc496773 100644 --- a/usr.sbin/bgpd/parse.y +++ b/usr.sbin/bgpd/parse.y @@ -1,4 +1,4 @@ -/* $OpenBSD: parse.y,v 1.4 2003/12/20 20:53:30 henning Exp $ */ +/* $OpenBSD: parse.y,v 1.5 2003/12/21 22:16:53 henning Exp $ */ /* * Copyright (c) 2002, 2003 Henning Brauer <henning@openbsd.org> @@ -732,7 +732,7 @@ add_mrtconfig(enum mrtdump_type type, char *name, time_t timeout) fatal("add_mrtconfig", errno); n->type = MRT_TABLE_DUMP; - n->fd = -1; + n->msgbuf.sock = -1; if (strlcpy(n->name, name, sizeof(n->name)) > sizeof(n->name)) { yyerror("filename \"%s\" too long: max %u", name, sizeof(n->name) - 1); diff --git a/usr.sbin/bgpd/rde.c b/usr.sbin/bgpd/rde.c index 66f8f064b31..34d83c3a64c 100644 --- a/usr.sbin/bgpd/rde.c +++ b/usr.sbin/bgpd/rde.c @@ -1,4 +1,4 @@ -/* $OpenBSD: rde.c,v 1.15 2003/12/20 21:43:45 claudio Exp $ */ +/* $OpenBSD: rde.c,v 1.16 2003/12/21 22:16:53 henning Exp $ */ /* * Copyright (c) 2003 Henning Brauer <henning@openbsd.org> @@ -51,10 +51,9 @@ void peer_down(u_int32_t); volatile sig_atomic_t rde_quit = 0; struct bgpd_config *conf, *nconf; -int se_queued_writes = 0; -int se_sock; -int main_queued_writes = 0; struct rde_peer_head peerlist; +struct msgbuf msgbuf_se; +struct msgbuf msgbuf_main; void rde_sighdlr(int sig) @@ -117,49 +116,50 @@ rde_main(struct bgpd_config *config, int pipe_m2r[2], int pipe_s2r[2]) path_init(pathhashsize); nexthop_init(nexthophashsize); pt_init(); + msgbuf_init(&msgbuf_se); + msgbuf_se.sock = pipe_s2r[1]; + msgbuf_init(&msgbuf_main); + msgbuf_main.sock = pipe_m2r[1]; init_imsg_buf(); - se_sock = pipe_s2r[1]; logit(LOG_INFO, "route decision engine ready"); while (rde_quit == 0) { bzero(&pfd, sizeof(pfd)); - pfd[PFD_PIPE_MAIN].fd = pipe_m2r[1]; + pfd[PFD_PIPE_MAIN].fd = msgbuf_main.sock; pfd[PFD_PIPE_MAIN].events = POLLIN; - if (main_queued_writes > 0) + if (msgbuf_main.queued > 0) pfd[PFD_PIPE_MAIN].events |= POLLOUT; - pfd[PFD_PIPE_SESSION].fd = pipe_s2r[1]; + pfd[PFD_PIPE_SESSION].fd = msgbuf_se.sock; pfd[PFD_PIPE_SESSION].events = POLLIN; - if (se_queued_writes > 0) + if (msgbuf_se.queued > 0) pfd[PFD_PIPE_SESSION].events |= POLLOUT; if ((nfds = poll(pfd, 2, INFTIM)) == -1) if (errno != EINTR) fatal("poll error", errno); - if (nfds > 0 && pfd[PFD_PIPE_MAIN].revents & POLLIN) { - rde_dispatch_imsg(pfd[PFD_PIPE_MAIN].fd, - PFD_PIPE_MAIN); - } - if (nfds > 0 && pfd[PFD_PIPE_SESSION].revents & POLLIN) { + if (nfds > 0 && pfd[PFD_PIPE_MAIN].revents & POLLIN) + rde_dispatch_imsg(pfd[PFD_PIPE_MAIN].fd, PFD_PIPE_MAIN); + + if (nfds > 0 && pfd[PFD_PIPE_SESSION].revents & POLLIN) rde_dispatch_imsg(pfd[PFD_PIPE_SESSION].fd, PFD_PIPE_SESSION); - } + if (nfds > 0 && (pfd[PFD_PIPE_MAIN].revents & POLLOUT) && - main_queued_writes) { + msgbuf_main.queued) { nfds--; - if ((n = buf_sock_write(pfd[PFD_PIPE_MAIN].fd)) == -1) + if ((n = msgbuf_write(&msgbuf_main)) == -1) fatal("pipe write error", errno); - main_queued_writes -= n; } + if (nfds > 0 && (pfd[PFD_PIPE_SESSION].revents & POLLOUT) && - se_queued_writes) { + msgbuf_se.queued) { nfds--; - if ((n = buf_sock_write(pfd[PFD_PIPE_SESSION].fd)) == + if ((n = msgbuf_write(&msgbuf_se)) == -1) fatal("pipe write error", errno); - se_queued_writes -= n; } } @@ -171,6 +171,7 @@ void rde_dispatch_imsg(int fd, int idx) { struct imsg imsg; + struct mrt mrtdump; struct peer_config *pconf; struct rde_peer *p, *np; u_int32_t rid; @@ -234,7 +235,7 @@ rde_dispatch_imsg(int fd, int idx) fatal("session msg not from session engine", 0); if (imsg.hdr.len - IMSG_HEADER_SIZE != sizeof(rid)) fatal("incorrect size of session request", 0); - rid = *(u_int32_t *)imsg.data; + memcpy(&rid, imsg.data, sizeof(rid)); peer_up(imsg.hdr.peerid, rid); break; case IMSG_SESSION_DOWN: @@ -245,14 +246,15 @@ rde_dispatch_imsg(int fd, int idx) case IMSG_MRT_REQ: if (idx != PFD_PIPE_MAIN) fatal("mrt request not from parent", 0); - pt_dump(mrt_dump_upcall, fd, &main_queued_writes, - &imsg.hdr.peerid); + mrtdump.id = imsg.hdr.peerid; + mrtdump.msgbuf = &msgbuf_main; + pt_dump(mrt_dump_upcall, &mrtdump); /* FALLTHROUGH */ case IMSG_MRT_END: if (idx != PFD_PIPE_MAIN) fatal("mrt request not from parent", 0); /* ignore end message because a dump is atomic */ - main_queued_writes += imsg_compose(fd, IMSG_MRT_END, + imsg_compose(&msgbuf_main, IMSG_MRT_END, imsg.hdr.peerid, NULL, 0); break; default: @@ -504,7 +506,7 @@ rde_update_err(u_int32_t peerid, enum suberr_update errorcode) u_int8_t errcode; errcode = errorcode; - se_queued_writes += imsg_compose(se_sock, IMSG_UPDATE_ERR, peerid, + imsg_compose(&msgbuf_se, IMSG_UPDATE_ERR, peerid, &errcode, sizeof(errcode)); } diff --git a/usr.sbin/bgpd/rde.h b/usr.sbin/bgpd/rde.h index 4f1766a2544..fc9823d340f 100644 --- a/usr.sbin/bgpd/rde.h +++ b/usr.sbin/bgpd/rde.h @@ -1,4 +1,4 @@ -/* $OpenBSD: rde.h,v 1.4 2003/12/21 16:11:34 claudio Exp $ */ +/* $OpenBSD: rde.h,v 1.5 2003/12/21 22:16:53 henning Exp $ */ /* * Copyright (c) 2003 Claudio Jeker <cjeker@diehard.n-r-g.com> and @@ -244,8 +244,7 @@ struct pt_entry *pt_get(struct in_addr, int); struct pt_entry *pt_add(struct in_addr, int); void pt_remove(struct pt_entry *); struct pt_entry *pt_lookup(struct in_addr); -void pt_dump(void (*)(struct pt_entry *, int, int *, void *), - int, int *, void *); +void pt_dump(void (*)(struct pt_entry *, void *), void *); #endif /* __RDE_H__ */ diff --git a/usr.sbin/bgpd/rde_prefix.c b/usr.sbin/bgpd/rde_prefix.c index c5624cc853c..cc219828e6c 100644 --- a/usr.sbin/bgpd/rde_prefix.c +++ b/usr.sbin/bgpd/rde_prefix.c @@ -1,4 +1,4 @@ -/* $OpenBSD: rde_prefix.c,v 1.2 2003/12/19 19:24:08 deraadt Exp $ */ +/* $OpenBSD: rde_prefix.c,v 1.3 2003/12/21 22:16:53 henning Exp $ */ /* * Copyright (c) 2003 Claudio Jeker <cjeker@diehard.n-r-g.com> @@ -199,8 +199,7 @@ pt_lookup(struct in_addr prefix) * XXX A nicer upcall interface wouldn't be luxus too. */ void -pt_dump(void (*upcall)(struct pt_entry *, int, int *, void *), - int fd, int *w, void *arg) +pt_dump(void (*upcall)(struct pt_entry *, void *), void *arg) { struct pt_entry *p; int i; @@ -210,7 +209,7 @@ pt_dump(void (*upcall)(struct pt_entry *, int, int *, void *), for (i = MAX_PREFIX; i >= MIN_PREFIX; i--) { for (j = 0; j < pthashsize[i]; j++) LIST_FOREACH(p, &pttable[i].pt_hashtbl[j], pt_l) - upcall(p, fd, w, arg); + upcall(p, arg); } } diff --git a/usr.sbin/bgpd/session.c b/usr.sbin/bgpd/session.c index e81d2f47222..1274cd57518 100644 --- a/usr.sbin/bgpd/session.c +++ b/usr.sbin/bgpd/session.c @@ -1,4 +1,4 @@ -/* $OpenBSD: session.c,v 1.24 2003/12/21 18:21:24 henning Exp $ */ +/* $OpenBSD: session.c,v 1.25 2003/12/21 22:16:53 henning Exp $ */ /* * Copyright (c) 2003 Henning Brauer <henning@openbsd.org> @@ -73,7 +73,6 @@ int parse_update(struct peer *); int parse_notification(struct peer *); int parse_keepalive(struct peer *); void session_dispatch_imsg(int, int); -void session_write_imsg(int fd); void session_up(struct peer *); void session_down(struct peer *); @@ -82,8 +81,7 @@ struct peer *getpeerbyip(in_addr_t); struct bgpd_config *conf = NULL, *nconf = NULL; volatile sig_atomic_t session_quit = 0; int pending_reconf = 0; -int s2r_queued_writes = 0; -int s2r_sock = -1; +struct msgbuf msgbuf_rde; void session_sighdlr(int sig) @@ -172,10 +170,11 @@ session_main(struct bgpd_config *config, int pipe_m2s[2], int pipe_s2r[2]) signal(SIGTERM, session_sighdlr); logit(LOG_INFO, "session engine ready"); - s2r_sock = pipe_s2r[0]; close(pipe_m2s[0]); close(pipe_s2r[1]); init_conf(conf); + msgbuf_init(&msgbuf_rde); + msgbuf_rde.sock = pipe_s2r[0]; init_imsg_buf(); init_peers(); @@ -187,7 +186,7 @@ session_main(struct bgpd_config *config, int pipe_m2s[2], int pipe_s2r[2]) pfd[PFD_PIPE_MAIN].events = POLLIN; pfd[PFD_PIPE_ROUTE].fd = pipe_s2r[0]; pfd[PFD_PIPE_ROUTE].events = POLLIN; - if (s2r_queued_writes > 0) + if (msgbuf_rde.queued > 0) pfd[PFD_PIPE_ROUTE].events |= POLLOUT; nextaction = time(NULL) + 240; /* loop every 240s at least */ @@ -244,7 +243,7 @@ session_main(struct bgpd_config *config, int pipe_m2s[2], int pipe_s2r[2]) nextaction = p->StartTimer; /* are we waiting for a write? */ - if (p->queued_writes > 0) + if (p->wbuf.queued > 0) p->events |= POLLOUT; /* poll events */ @@ -275,7 +274,8 @@ session_main(struct bgpd_config *config, int pipe_m2s[2], int pipe_s2r[2]) } if (nfds > 0 && pfd[PFD_PIPE_ROUTE].revents & POLLOUT) - session_write_imsg(pfd[PFD_PIPE_ROUTE].fd); + if (msgbuf_write(&msgbuf_rde) == -1) + fatal("pipe write error", 0); if (nfds > 0 && pfd[PFD_PIPE_ROUTE].revents & POLLIN) { nfds--; @@ -339,6 +339,9 @@ bgp_fsm(struct peer *peer, enum session_events event) peer->rbuf->wptr = peer->rbuf->buf; peer->rbuf->pkt_len = MSGSIZE_HEADER; + /* init write buffer */ + msgbuf_init(&peer->wbuf); + change_state(peer, STATE_CONNECT, event); session_connect(peer); break; @@ -554,6 +557,7 @@ session_close_connection(struct peer *peer) shutdown(peer->sock, SHUT_RDWR); close(peer->sock); peer->sock = -1; + peer->wbuf.sock = -1; } } @@ -586,10 +590,9 @@ change_state(struct peer *peer, enum session_state state, peer->KeepaliveTimer = 0; peer->HoldTimer = 0; session_close_connection(peer); - buf_peer_remove(peer); + msgbuf_clear(&peer->wbuf); free(peer->rbuf); peer->rbuf = NULL; - peer->queued_writes = 0; if (peer->state == STATE_ESTABLISHED) session_down(peer); if (event != EVNT_STOP) { @@ -649,6 +652,7 @@ session_accept(int listenfd) if (p != NULL && (p->state == STATE_CONNECT || p->state == STATE_ACTIVE)) { p->sock = connfd; + p->wbuf.sock = connfd; if (session_setup_socket(p)) { shutdown(connfd, SHUT_RDWR); close(connfd); @@ -678,6 +682,8 @@ session_connect(struct peer *peer) return (-1); } + peer->wbuf.sock = peer->sock; + /* if update source is set we need to bind() */ if (peer->conf.local_addr.sin_addr.s_addr) if (bind(peer->sock, (struct sockaddr *)&peer->conf.local_addr, @@ -762,7 +768,7 @@ session_open(struct peer *peer) msg.bgpid = conf->bgpid; /* is already in network byte order */ msg.optparamlen = 0; - if ((buf = buf_open(peer, peer->sock, len)) == NULL) + if ((buf = buf_open(len)) == NULL) bgp_fsm(peer, EVNT_CON_FATAL); errs += buf_add(buf, &msg.header.marker, sizeof(msg.header.marker)); errs += buf_add(buf, &msg.header.len, sizeof(msg.header.len)); @@ -774,7 +780,7 @@ session_open(struct peer *peer) errs += buf_add(buf, &msg.optparamlen, sizeof(msg.optparamlen)); if (errs == 0) { - if (buf_close(buf) == -1) { + if (buf_close(&peer->wbuf, buf) == -1) { buf_free(buf); bgp_fsm(peer, EVNT_CON_FATAL); } @@ -798,7 +804,7 @@ session_keepalive(struct peer *peer) msg.len = htons(len); msg.type = KEEPALIVE; - if ((buf = buf_open(peer, peer->sock, len)) == NULL) + if ((buf = buf_open(len)) == NULL) bgp_fsm(peer, EVNT_CON_FATAL); errs += buf_add(buf, &msg.marker, sizeof(msg.marker)); errs += buf_add(buf, &msg.len, sizeof(msg.len)); @@ -810,7 +816,7 @@ session_keepalive(struct peer *peer) return; } - if (buf_close(buf) == -1) { + if (buf_close(&peer->wbuf, buf) == -1) { buf_free(buf); bgp_fsm(peer, EVNT_CON_FATAL); return; @@ -839,7 +845,7 @@ session_notification(struct peer *peer, u_int8_t errcode, u_int8_t subcode, msg.len = htons(len); msg.type = NOTIFICATION; - if ((buf = buf_open(peer, peer->sock, len)) == NULL) + if ((buf = buf_open(len)) == NULL) bgp_fsm(peer, EVNT_CON_FATAL); errs += buf_add(buf, &msg.marker, sizeof(msg.marker)); errs += buf_add(buf, &msg.len, sizeof(msg.len)); @@ -856,7 +862,7 @@ session_notification(struct peer *peer, u_int8_t errcode, u_int8_t subcode, return; } - if (buf_close(buf) == -1) { + if (buf_close(&peer->wbuf, buf) == -1) { buf_free(buf); bgp_fsm(peer, EVNT_CON_FATAL); } @@ -906,8 +912,8 @@ session_dispatch_msg(struct pollfd *pfd, struct peer *peer) return (1); } - if (pfd->revents & POLLOUT && peer->queued_writes) { - if (buf_peer_write(peer)) + if (pfd->revents & POLLOUT && peer->wbuf.queued) { + if (msgbuf_write(&peer->wbuf)) bgp_fsm(peer, EVNT_CON_FATAL); if (!(pfd->revents & POLLIN)) return (1); @@ -1153,8 +1159,7 @@ parse_update(struct peer *peer) p += MSGSIZE_HEADER; /* header is already checked */ datalen -= MSGSIZE_HEADER; - s2r_queued_writes += imsg_compose(s2r_sock, IMSG_UPDATE, - peer->conf.id, p, datalen); + imsg_compose(&msgbuf_rde, IMSG_UPDATE, peer->conf.id, p, datalen); return (0); } @@ -1301,26 +1306,16 @@ getpeerbyip(in_addr_t ip) } void -session_write_imsg(int fd) -{ - int n; - - if ((n = buf_sock_write(fd)) == -1) - fatal("pipe write error", errno); - s2r_queued_writes -= n; -} - -void session_down(struct peer *peer) { if (!session_quit) - s2r_queued_writes += imsg_compose(s2r_sock, IMSG_SESSION_DOWN, - peer->conf.id, NULL, 0); + imsg_compose(&msgbuf_rde, IMSG_SESSION_DOWN, peer->conf.id, + NULL, 0); } void session_up(struct peer *peer) { - s2r_queued_writes += imsg_compose(s2r_sock, IMSG_SESSION_UP, - peer->conf.id, &peer->remote_bgpid, sizeof(peer->remote_bgpid)); + imsg_compose(&msgbuf_rde, IMSG_SESSION_UP, peer->conf.id, + &peer->remote_bgpid, sizeof(peer->remote_bgpid)); } |