summaryrefslogtreecommitdiff
path: root/usr.sbin
diff options
context:
space:
mode:
authorHenning Brauer <henning@cvs.openbsd.org>2003-12-21 22:16:54 +0000
committerHenning Brauer <henning@cvs.openbsd.org>2003-12-21 22:16:54 +0000
commit8457d52edd104d4996c3c3c8894f770214c632e8 (patch)
treeaa17859eab0b6d68c19b335e87f8978d0aebecca /usr.sbin
parent23495634e653d596492ca742142b25d18babc127 (diff)
overhaul the write buffering code.
introduce msgbuf API and bundle all info needed for the write buffers in a struct msgbuf. also switch to a write queue per handled connection (each bgp session, each pipe) instead of one big one. fixes some subtle problems and is overall nicer. ok claudio@
Diffstat (limited to 'usr.sbin')
-rw-r--r--usr.sbin/bgpd/bgpd.c101
-rw-r--r--usr.sbin/bgpd/bgpd.h48
-rw-r--r--usr.sbin/bgpd/buffer.c107
-rw-r--r--usr.sbin/bgpd/imsg.c16
-rw-r--r--usr.sbin/bgpd/mrt.c80
-rw-r--r--usr.sbin/bgpd/mrt.h18
-rw-r--r--usr.sbin/bgpd/parse.y4
-rw-r--r--usr.sbin/bgpd/rde.c54
-rw-r--r--usr.sbin/bgpd/rde.h5
-rw-r--r--usr.sbin/bgpd/rde_prefix.c7
-rw-r--r--usr.sbin/bgpd/session.c63
11 files changed, 229 insertions, 274 deletions
diff --git a/usr.sbin/bgpd/bgpd.c b/usr.sbin/bgpd/bgpd.c
index f96b28e1487..c316ceedec6 100644
--- a/usr.sbin/bgpd/bgpd.c
+++ b/usr.sbin/bgpd/bgpd.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: bgpd.c,v 1.9 2003/12/21 18:18:21 henning Exp $ */
+/* $OpenBSD: bgpd.c,v 1.10 2003/12/21 22:16:53 henning Exp $ */
/*
* Copyright (c) 2003 Henning Brauer <henning@openbsd.org>
@@ -37,14 +37,15 @@
void sighdlr(int);
void usage(void);
int main(int, char *[]);
-int reconfigure(char *, int, int *, int, int *, struct bgpd_config *,
- struct mrt_config *);
-int dispatch_imsg(int, int, struct mrt_config *, int, int *);
+int reconfigure(char *, struct bgpd_config *, struct mrt_config *);
+int dispatch_imsg(int, int, struct mrt_config *, struct msgbuf *);
int mrtfd = -1;
volatile sig_atomic_t mrtdump = 0;
volatile sig_atomic_t quit = 0;
volatile sig_atomic_t reconfig = 0;
+struct msgbuf msgbuf_se;
+struct msgbuf msgbuf_rde;
void
sighdlr(int sig)
@@ -97,8 +98,6 @@ main(int argc, char *argv[])
int pipe_m2s[2];
int pipe_m2r[2];
int pipe_s2r[2];
- int m2s_writes_queued = 0;
- int m2r_writes_queued = 0;
conffile = CONFFILE;
bgpd_process = PROC_MAIN;
@@ -184,21 +183,25 @@ main(int argc, char *argv[])
close(pipe_s2r[0]);
close(pipe_s2r[1]);
+ msgbuf_init(&msgbuf_se);
+ msgbuf_se.sock = pipe_m2s[0];
+ msgbuf_init(&msgbuf_rde);
+ msgbuf_rde.sock = pipe_m2r[0];
init_imsg_buf();
while (quit == 0) {
- pfd[PFD_PIPE_SESSION].fd = pipe_m2s[0];
+ pfd[PFD_PIPE_SESSION].fd = msgbuf_se.sock;
pfd[PFD_PIPE_SESSION].events = POLLIN;
- if (m2s_writes_queued)
+ if (msgbuf_se.queued)
pfd[PFD_PIPE_SESSION].events |= POLLOUT;
- pfd[PFD_PIPE_ROUTE].fd = pipe_m2r[0];
+ pfd[PFD_PIPE_ROUTE].fd = msgbuf_rde.sock;
pfd[PFD_PIPE_ROUTE].events = POLLIN;
- if (m2r_writes_queued)
+ if (msgbuf_rde.queued)
pfd[PFD_PIPE_ROUTE].events |= POLLOUT;
i = PFD_MRT_START;
LIST_FOREACH(mconf, &mrtconf, list)
- if (mconf->queued_writes) {
- pfd[i].fd = mconf->fd;
+ if (mconf->msgbuf.queued > 0) {
+ pfd[i].fd = mconf->msgbuf.sock;
pfd[i].events |= POLLOUT;
mrt[i++] = mconf;
}
@@ -207,56 +210,46 @@ main(int argc, char *argv[])
if (errno != EINTR)
fatal("poll error", errno);
- if (nfds > 0 && (pfd[PFD_PIPE_SESSION].revents & POLLOUT) &&
- m2s_writes_queued) {
- if ((n = buf_sock_write(pfd[PFD_PIPE_SESSION].fd)) ==
- -1)
+ if (nfds > 0 && (pfd[PFD_PIPE_SESSION].revents & POLLOUT))
+ if ((n = msgbuf_write(&msgbuf_se)) == -1)
fatal("pipe write error", errno);
- m2s_writes_queued -= n;
- }
- if (nfds > 0 && (pfd[PFD_PIPE_ROUTE].revents & POLLOUT) &&
- m2r_writes_queued) {
- if ((n = buf_sock_write(pfd[PFD_PIPE_ROUTE].fd)) == -1)
+ if (nfds > 0 && (pfd[PFD_PIPE_ROUTE].revents & POLLOUT))
+ if ((n = msgbuf_write(&msgbuf_rde)) == -1)
fatal("pipe write error", errno);
- m2r_writes_queued -= n;
- }
if (nfds > 0 && pfd[PFD_PIPE_SESSION].revents & POLLIN) {
nfds--;
- dispatch_imsg(pfd[PFD_PIPE_SESSION].fd, pipe_m2s[0],
- &mrtconf, pipe_m2r[0], &m2r_writes_queued);
+ dispatch_imsg(pfd[PFD_PIPE_SESSION].fd,
+ PFD_PIPE_SESSION, &mrtconf, &msgbuf_rde);
}
if (nfds > 0 && pfd[PFD_PIPE_ROUTE].revents & POLLIN) {
nfds--;
- dispatch_imsg(pfd[PFD_PIPE_ROUTE].fd, pipe_m2r[0],
- &mrtconf, pipe_m2r[0], &m2r_writes_queued);
+ dispatch_imsg(pfd[PFD_PIPE_ROUTE].fd, PFD_PIPE_ROUTE,
+ &mrtconf, &msgbuf_rde);
}
for (j = PFD_MRT_START; j < i && nfds > 0 ; j++) {
- if ((pfd[j].revents & POLLOUT) &&
- mrt[i]->queued_writes) {
- if ((n = buf_sock_write(pfd[i].fd)) == -1)
+ if (pfd[j].revents & POLLOUT) {
+ if ((n = msgbuf_write(&mrt[i]->msgbuf)) == -1)
fatal("pipe write error", errno);
- mrt[i]->queued_writes -= n;
}
}
if (reconfig) {
logit(LOG_CRIT, "rereading config");
- reconfigure(conffile, pipe_m2s[0], &m2s_writes_queued,
- pipe_m2r[0], &m2r_writes_queued, &conf, &mrtconf);
+ reconfigure(conffile, &conf, &mrtconf);
LIST_FOREACH(mconf, &mrtconf, list)
- mrt_state(mconf, IMSG_NONE, pipe_m2r[0],
- &m2r_writes_queued);
+ mrt_state(mconf, IMSG_NONE, &msgbuf_rde);
reconfig = 0;
}
+
if (mrtdump == 1) {
- mrt_alrm(&mrtconf, pipe_m2r[0], &m2r_writes_queued);
+ mrt_alrm(&mrtconf, &msgbuf_rde);
mrtdump = 0;
} else if (mrtdump == 2) {
- mrt_usr1(&mrtconf, pipe_m2r[0], &m2r_writes_queued);
+ mrt_usr1(&mrtconf, &msgbuf_rde);
mrtdump = 0;
}
}
@@ -278,8 +271,7 @@ main(int argc, char *argv[])
}
int
-reconfigure(char *conffile, int se_fd, int *se_waiting, int rde_fd,
- int *rde_waiting, struct bgpd_config *conf, struct mrt_config *mrtc)
+reconfigure(char *conffile, struct bgpd_config *conf, struct mrt_config *mrtc)
{
struct peer *p;
@@ -288,18 +280,18 @@ reconfigure(char *conffile, int se_fd, int *se_waiting, int rde_fd,
conffile);
return (-1);
}
- *se_waiting += imsg_compose(se_fd, IMSG_RECONF_CONF, 0,
+ imsg_compose(&msgbuf_se, IMSG_RECONF_CONF, 0,
conf, sizeof(struct bgpd_config));
- *rde_waiting += imsg_compose(rde_fd, IMSG_RECONF_CONF, 0,
+ imsg_compose(&msgbuf_rde, IMSG_RECONF_CONF, 0,
conf, sizeof(struct bgpd_config));
for (p = conf->peers; p != NULL; p = p->next) {
- *se_waiting += imsg_compose(se_fd, IMSG_RECONF_PEER,
- p->conf.id, &p->conf, sizeof(struct peer_config));
- *rde_waiting += imsg_compose(rde_fd, IMSG_RECONF_PEER,
- p->conf.id, &p->conf, sizeof(struct peer_config));
+ imsg_compose(&msgbuf_se, IMSG_RECONF_PEER, p->conf.id,
+ &p->conf, sizeof(struct peer_config));
+ imsg_compose(&msgbuf_rde, IMSG_RECONF_PEER, p->conf.id,
+ &p->conf, sizeof(struct peer_config));
}
- *se_waiting += imsg_compose(se_fd, IMSG_RECONF_DONE, 0, NULL, 0);
- *rde_waiting += imsg_compose(rde_fd, IMSG_RECONF_DONE, 0, NULL, 0);
+ imsg_compose(&msgbuf_se, IMSG_RECONF_DONE, 0, NULL, 0);
+ imsg_compose(&msgbuf_rde, IMSG_RECONF_DONE, 0, NULL, 0);
return (0);
}
@@ -308,12 +300,11 @@ reconfigure(char *conffile, int se_fd, int *se_waiting, int rde_fd,
* XXX currently messages are only buffered for mrt files.
*/
int
-dispatch_imsg(int fd, int idx, struct mrt_config *conf,
- int rfd, int *rwait /*, int sfd, int *swait */)
+dispatch_imsg(int fd, int idx, struct mrt_config *conf, struct msgbuf *rde)
{
+ struct imsg imsg;
struct buf *wbuf;
struct mrtdump_config *m;
- struct imsg imsg;
ssize_t len;
int n;
@@ -324,20 +315,18 @@ dispatch_imsg(int fd, int idx, struct mrt_config *conf,
LIST_FOREACH(m, conf, list) {
if (m->id != imsg.hdr.peerid)
continue;
- if (mrt_state(m, imsg.hdr.type,
- rfd, rwait) == 0)
+ if (mrt_state(m, imsg.hdr.type, rde) == 0)
break;
- if (m->fd == -1)
+ if (m->msgbuf.sock == -1)
break;
len = imsg.hdr.len - IMSG_HEADER_SIZE;
- wbuf = buf_open(NULL, m->fd, len);
+ wbuf = buf_open(len);
if (wbuf == NULL)
fatal("buf_open error", 0);
if (buf_add(wbuf, imsg.data, len) == -1)
fatal("buf_add error", 0);
- if ((n = buf_close(wbuf)) == -1)
+ if ((n = buf_close(&m->msgbuf, wbuf)) == -1)
fatal("buf_close error", 0);
- m->queued_writes += n;
break;
}
break;
diff --git a/usr.sbin/bgpd/bgpd.h b/usr.sbin/bgpd/bgpd.h
index cea79893c0d..29efac291a2 100644
--- a/usr.sbin/bgpd/bgpd.h
+++ b/usr.sbin/bgpd/bgpd.h
@@ -1,4 +1,4 @@
-/* $OpenBSD: bgpd.h,v 1.8 2003/12/21 18:21:24 henning Exp $ */
+/* $OpenBSD: bgpd.h,v 1.9 2003/12/21 22:16:53 henning Exp $ */
/*
* Copyright (c) 2003 Henning Brauer <henning@openbsd.org>
@@ -78,6 +78,20 @@ enum reconf_action {
RECONF_DELETE
};
+struct buf {
+ TAILQ_ENTRY(buf) entries;
+ u_char *buf;
+ ssize_t size;
+ ssize_t wpos;
+ ssize_t rpos;
+};
+
+struct msgbuf {
+ u_int32_t queued;
+ int sock;
+ TAILQ_HEAD(bufs, buf) bufs;
+};
+
struct bgpd_config {
int opts;
u_int16_t as;
@@ -120,8 +134,7 @@ struct peer {
u_int StartTimerInterval;
int sock;
int events;
- int queued_writes;
- int queued_imsg_writes;
+ struct msgbuf wbuf;
struct peer_buf_read *rbuf;
struct peer *next;
};
@@ -151,27 +164,16 @@ LIST_HEAD(mrt_config, mrtdump_config);
struct mrtdump_config {
enum mrtdump_type type;
u_int32_t id;
- int fd;
+ struct msgbuf msgbuf;
char name[MRT_FILE_LEN]; /* base file name */
char file[MRT_FILE_LEN]; /* actual file name */
time_t ReopenTimer;
time_t ReopenTimerInterval;
- int queued_writes;
enum mrtdump_state state;
LIST_ENTRY(mrtdump_config)
list;
};
-struct buf {
- TAILQ_ENTRY(buf) entries;
- struct peer *peer;
- int sock;
- u_char *buf;
- ssize_t size;
- ssize_t wpos;
- ssize_t rpos;
-};
-
/* ipc messages */
#define IMSG_HEADER_SIZE sizeof(struct imsg_hdr)
@@ -232,15 +234,15 @@ enum suberr_update {
int session_main(struct bgpd_config *, int[2], int[2]);
/* buffer.c */
-struct buf *buf_open(struct peer *, int, ssize_t);
+struct buf *buf_open(ssize_t);
int buf_add(struct buf *, void *, ssize_t);
void *buf_reserve(struct buf *, ssize_t);
-int buf_close(struct buf *);
-int buf_write(struct buf *);
-void buf_free(struct buf *buf);
-void buf_peer_remove(struct peer *);
-int buf_peer_write(struct peer *);
-int buf_sock_write(int);
+int buf_close(struct msgbuf *, struct buf *);
+void buf_free(struct buf *);
+void msgbuf_init(struct msgbuf *);
+void msgbuf_clear(struct msgbuf *);
+int msgbuf_write(struct msgbuf *);
+
/* log.c */
void log_init(int);
@@ -266,7 +268,7 @@ int merge_config(struct bgpd_config *, struct bgpd_config *);
/* imsg.c */
void init_imsg_buf(void);
int get_imsg(int, struct imsg *);
-int imsg_compose(int, int, u_int32_t, void *, u_int16_t);
+int imsg_compose(struct msgbuf *, int, u_int32_t, void *, u_int16_t);
void imsg_free(struct imsg *);
/* rde.c */
diff --git a/usr.sbin/bgpd/buffer.c b/usr.sbin/bgpd/buffer.c
index 0d9f25f6f71..1c2128f7e36 100644
--- a/usr.sbin/bgpd/buffer.c
+++ b/usr.sbin/bgpd/buffer.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: buffer.c,v 1.5 2003/12/21 18:04:08 claudio Exp $ */
+/* $OpenBSD: buffer.c,v 1.6 2003/12/21 22:16:53 henning Exp $ */
/*
* Copyright (c) 2003 Henning Brauer <henning@openbsd.org>
@@ -27,13 +27,12 @@
#include "bgpd.h"
-TAILQ_HEAD(bufs, buf) bufs = TAILQ_HEAD_INITIALIZER(bufs);
-
-void buf_enqueue(struct buf *);
-void buf_dequeue(struct buf *);
+int buf_write(int, struct buf *);
+void buf_enqueue(struct msgbuf *, struct buf *);
+void buf_dequeue(struct msgbuf *, struct buf *);
struct buf *
-buf_open(struct peer *peer, int sock, ssize_t len)
+buf_open(ssize_t len)
{
struct buf *buf;
@@ -45,8 +44,6 @@ buf_open(struct peer *peer, int sock, ssize_t len)
}
buf->size = len;
- buf->peer = peer;
- buf->sock = sock;
return (buf);
}
@@ -76,7 +73,7 @@ buf_reserve(struct buf *buf, ssize_t len)
}
int
-buf_close(struct buf *buf)
+buf_close(struct msgbuf *msgbuf, struct buf *buf)
{
/*
* we first try to write out directly
@@ -85,8 +82,8 @@ buf_close(struct buf *buf)
int n;
- if (buf->peer != NULL && buf->peer->queued_writes == 0) {
- if ((n = buf_write(buf)) == -1)
+ if (msgbuf->queued == 0) {
+ if ((n = buf_write(msgbuf->sock, buf)) == -1)
return (-1);
if (n == 1) { /* all data written out */
@@ -96,25 +93,23 @@ buf_close(struct buf *buf)
}
/* we have to queue */
- buf_enqueue(buf);
+ buf_enqueue(msgbuf, buf);
return (1);
}
int
-buf_write(struct buf *buf)
+buf_write(int sock, struct buf *buf)
{
ssize_t n;
- if ((n = write(buf->sock, buf->buf + buf->rpos,
+ if ((n = write(sock, buf->buf + buf->rpos,
buf->size-buf->rpos)) == -1) {
if (errno == EAGAIN) /* cannot write immediately */
return (0);
else {
- if (buf->peer != NULL)
- log_err(buf->peer, "write error");
- else
- logit(LOG_CRIT, "pipe write error: %s",
- strerror(errno));
+ /* XXX better let caller log with info which sock etc */
+ logit(LOG_CRIT, "buf_write: write error: %s",
+ strerror(errno));
return (-1);
}
}
@@ -134,45 +129,28 @@ buf_free(struct buf *buf)
}
void
-buf_peer_remove(struct peer *peer)
+msgbuf_init(struct msgbuf *msgbuf)
{
- struct buf *buf, *next;
-
- for (buf = TAILQ_FIRST(&bufs); buf != NULL; buf = next) {
- next = TAILQ_NEXT(buf, entries);
- if (buf->peer == peer)
- buf_dequeue(buf);
- }
+ msgbuf->queued = 0;
+ msgbuf->sock = -1;
+ TAILQ_INIT(&msgbuf->bufs);
}
-int
-buf_peer_write(struct peer *peer)
+void
+msgbuf_clear(struct msgbuf *msgbuf)
{
- /*
- * possible race here
- * when we cannot write out data completely from a buffer,
- * we MUST return and NOT try to write out stuff from later buffers -
- * the socket might have become writeable again
- */
struct buf *buf, *next;
- int n;
- for (buf = TAILQ_FIRST(&bufs); buf != NULL; buf = next) {
+ for (buf = TAILQ_FIRST(&msgbuf->bufs); buf != NULL; buf = next) {
next = TAILQ_NEXT(buf, entries);
- if (buf->peer == peer) {
- if ((n = buf_write(buf)) == -1)
- return (-1);
- if (n == 1) /* everything written out */
- buf_dequeue(buf);
- else
- return (0);
- }
+ buf_dequeue(msgbuf, buf);
}
- return (0);
+ msgbuf->queued = 0;
+ msgbuf->sock = -1;
}
int
-buf_sock_write(int sock)
+msgbuf_write(struct msgbuf *msgbuf)
{
/*
* possible race here
@@ -181,37 +159,32 @@ buf_sock_write(int sock)
* the socket might have become writeable again
*/
struct buf *buf, *next;
- int n, cleared = 0;
+ int n;
- for (buf = TAILQ_FIRST(&bufs); buf != NULL; buf = next) {
+ for (buf = TAILQ_FIRST(&msgbuf->bufs); buf != NULL; buf = next) {
next = TAILQ_NEXT(buf, entries);
- if (buf->sock == sock) {
- if ((n = buf_write(buf)) == -1)
- return (-1);
- if (n == 1) { /* everything written out */
- buf_dequeue(buf);
- cleared++;
- } else
- return (cleared);
- }
+ if ((n = buf_write(msgbuf->sock, buf)) == -1)
+ return (-1);
+ if (n == 1) /* everything written out */
+ buf_dequeue(msgbuf, buf);
+ else
+ return (0);
}
- return (cleared);
+ return (0);
}
void
-buf_enqueue(struct buf *buf)
+buf_enqueue(struct msgbuf *msgbuf, struct buf *buf)
{
/* might want a tailq per peer w/ pointers to the bufs */
- TAILQ_INSERT_TAIL(&bufs, buf, entries);
- if (buf->peer != NULL)
- buf->peer->queued_writes++;
+ TAILQ_INSERT_TAIL(&msgbuf->bufs, buf, entries);
+ msgbuf->queued++;
}
void
-buf_dequeue(struct buf *buf)
+buf_dequeue(struct msgbuf *msgbuf, struct buf *buf)
{
- TAILQ_REMOVE(&bufs, buf, entries);
- if (buf->peer != NULL)
- buf->peer->queued_writes--;
+ TAILQ_REMOVE(&msgbuf->bufs, buf, entries);
+ msgbuf->queued--;
buf_free(buf);
}
diff --git a/usr.sbin/bgpd/imsg.c b/usr.sbin/bgpd/imsg.c
index e2f72ca7576..a6b5ef99f38 100644
--- a/usr.sbin/bgpd/imsg.c
+++ b/usr.sbin/bgpd/imsg.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: imsg.c,v 1.5 2003/12/20 21:19:40 claudio Exp $ */
+/* $OpenBSD: imsg.c,v 1.6 2003/12/21 22:16:53 henning Exp $ */
/*
* Copyright (c) 2003 Henning Brauer <henning@openbsd.org>
@@ -84,7 +84,7 @@ get_imsg(int fd, struct imsg *imsg)
}
int
-imsg_compose(int fd, int type, u_int32_t peerid, void *data,
+imsg_compose(struct msgbuf *msgbuf, int type, u_int32_t peerid, void *data,
u_int16_t datalen)
{
struct buf *wbuf;
@@ -94,16 +94,16 @@ imsg_compose(int fd, int type, u_int32_t peerid, void *data,
hdr.len = datalen + IMSG_HEADER_SIZE;
hdr.type = type;
hdr.peerid = peerid;
- wbuf = buf_open(NULL, fd, hdr.len);
+ wbuf = buf_open(hdr.len);
if (wbuf == NULL)
- fatal("buf_open error", 0);
+ fatal("imsg_compose: buf_open error", 0);
if (buf_add(wbuf, &hdr, sizeof(hdr)) == -1)
- fatal("buf_add error", 0);
+ fatal("imsg_compose: buf_add error", 0);
if (datalen)
if (buf_add(wbuf, data, datalen) == -1)
- fatal("buf_add error", 0);
- if ((n = buf_close(wbuf)) == -1)
- fatal("buf_close error", 0);
+ fatal("imsg_compose: buf_add error", 0);
+ if ((n = buf_close(msgbuf, wbuf)) == -1)
+ fatal("imsg_compose: buf_close error", 0);
return (n);
}
diff --git a/usr.sbin/bgpd/mrt.c b/usr.sbin/bgpd/mrt.c
index 5b46eb78f1d..32aec6ccb2b 100644
--- a/usr.sbin/bgpd/mrt.c
+++ b/usr.sbin/bgpd/mrt.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: mrt.c,v 1.6 2003/12/21 16:11:33 claudio Exp $ */
+/* $OpenBSD: mrt.c,v 1.7 2003/12/21 22:16:53 henning Exp $ */
/*
* Copyright (c) 2003 Claudio Jeker <claudio@openbsd.org>
@@ -38,8 +38,8 @@
* XXX imsg_create(), imsg_add(), imsg_close() ...
*/
-static int mrt_dump_entry(int, struct prefix *, u_int16_t,
- struct peer_config *, u_int32_t);
+static int mrt_dump_entry(struct mrt *, struct prefix *, u_int16_t,
+ struct peer_config *);
static void mrt_dump_header(struct buf *, u_int16_t, u_int16_t, u_int32_t);
static int mrt_open(struct mrtdump_config *);
@@ -74,7 +74,7 @@ static int mrt_open(struct mrtdump_config *);
} while (0)
int
-mrt_dump_bgp_msg(int fd, u_char *pkg, u_int16_t pkglen, int type,
+mrt_dump_bgp_msg(struct mrt *mrt, void *pkg, u_int16_t pkglen, int type,
struct peer_config *peer, struct bgpd_config *bgp)
{
struct buf *buf;
@@ -86,8 +86,8 @@ mrt_dump_bgp_msg(int fd, u_char *pkg, u_int16_t pkglen, int type,
hdr.len = len + IMSG_HEADER_SIZE + MRT_HEADER_SIZE;
hdr.type = IMSG_MRT_MSG;
- hdr.peerid = peer->id;
- buf = buf_open(NULL, fd, hdr.len);
+ hdr.peerid = mrt->id;
+ buf = buf_open(hdr.len);
if (buf == NULL)
fatal("mrt_dump_bgp_msg", errno);
if (buf_add(buf, &hdr, sizeof(hdr)) == -1)
@@ -113,15 +113,15 @@ mrt_dump_bgp_msg(int fd, u_char *pkg, u_int16_t pkglen, int type,
if (buf_add(buf, pkg, pkglen) == -1)
fatal("buf_add error", 0);
- if ((n = buf_close(buf)) == -1)
+ if ((n = buf_close(mrt->msgbuf, buf)) == -1)
fatal("buf_close error", 0);
return (n);
}
static int
-mrt_dump_entry(int fd, struct prefix *p, u_int16_t snum,
- struct peer_config *peer, u_int32_t id)
+mrt_dump_entry(struct mrt *mrt, struct prefix *p, u_int16_t snum,
+ struct peer_config *peer)
{
struct buf *buf;
void *bptr;
@@ -134,8 +134,8 @@ mrt_dump_entry(int fd, struct prefix *p, u_int16_t snum,
hdr.len = len + IMSG_HEADER_SIZE + MRT_HEADER_SIZE;
hdr.type = IMSG_MRT_MSG;
- hdr.peerid = id;
- buf = buf_open(NULL, fd, hdr.len);
+ hdr.peerid = mrt->id;
+ buf = buf_open(hdr.len);
if (buf == NULL)
fatal("mrt_dump_entry", errno);
if (buf_add(buf, &hdr, sizeof(hdr)) == -1)
@@ -159,7 +159,7 @@ mrt_dump_entry(int fd, struct prefix *p, u_int16_t snum,
if (attr_dump(bptr, attr_len, &p->aspath->flags) == -1)
fatal("attr_dump error", 0);
- if ((n = buf_close(buf)) == -1)
+ if ((n = buf_close(mrt->msgbuf, buf)) == -1)
fatal("buf_close error", 0);
return (n);
@@ -174,11 +174,10 @@ mrt_clear_seq(void)
}
void
-mrt_dump_upcall(struct pt_entry *pt, int fd, int *wait, void *arg)
+mrt_dump_upcall(struct pt_entry *pt, void *ptr)
{
+ struct mrt *mrtbuf = ptr;
struct prefix *p;
- u_int32_t *idp = arg;
- u_int32_t id = *idp;
/*
* dump all prefixes even the inactive ones. That is the way zebra
@@ -186,7 +185,7 @@ mrt_dump_upcall(struct pt_entry *pt, int fd, int *wait, void *arg)
* be dumped p should be set to p = pt->active.
*/
LIST_FOREACH(p, &pt->prefix_h, prefix_l)
- *wait += mrt_dump_entry(fd, p, sequencenum++, &p->peer->conf, id);
+ mrt_dump_entry(mrtbuf, p, sequencenum++, &p->peer->conf);
}
static void
@@ -216,21 +215,22 @@ mrt_open(struct mrtdump_config *conf)
if (strftime(conf->file, sizeof(conf->file), conf->name,
localtime(&now)) == 0) {
logit(LOG_CRIT, "mrt_open strftime failed");
- conf->fd = -1;
+ conf->msgbuf.sock = -1;
return -1;
}
- conf->fd = open(conf->file, O_WRONLY|O_NONBLOCK|O_CREAT|O_TRUNC, 0644);
- if (conf->fd == -1)
+ conf->msgbuf.sock = open(conf->file,
+ O_WRONLY|O_NONBLOCK|O_CREAT|O_TRUNC, 0644);
+ if (conf->msgbuf.sock == -1)
logit(LOG_CRIT, "mrt_open %s: %s",
conf->file, strerror(errno));
- return conf->fd;
+ return conf->msgbuf.sock;
}
int
mrt_state(struct mrtdump_config *m, enum imsg_type type,
- int rfd, int *rwait /*, int sfd, int *swait */)
+ struct msgbuf *rde /*, struct msgbuf *se */)
{
switch (m->state) {
case MRT_STATE_DONE:
@@ -240,12 +240,11 @@ mrt_state(struct mrtdump_config *m, enum imsg_type type,
switch (type) {
case IMSG_NONE:
if (m->type == MRT_TABLE_DUMP)
- *rwait += imsg_compose(rfd, IMSG_MRT_END,
- m->id, NULL, 0);
+ imsg_compose(rde, IMSG_MRT_END, m->id, NULL, 0);
return (0);
case IMSG_MRT_END:
/* dump no longer valid */
- close(m->fd);
+ close(m->msgbuf.sock);
LIST_REMOVE(m, list);
free(m);
return (0);
@@ -257,16 +256,14 @@ mrt_state(struct mrtdump_config *m, enum imsg_type type,
switch (type) {
case IMSG_NONE:
if (m->type == MRT_TABLE_DUMP)
- *rwait += imsg_compose(rfd, IMSG_MRT_END,
- m->id, NULL, 0);
+ imsg_compose(rde, IMSG_MRT_END, m->id, NULL, 0);
return (0);
case IMSG_MRT_END:
- if (m->fd != -1)
- close(m->fd);
+ if (m->msgbuf.sock != -1)
+ close(m->msgbuf.sock);
m->state = MRT_STATE_OPEN;
if (m->type == MRT_TABLE_DUMP)
- *rwait += imsg_compose(rfd, IMSG_MRT_REQ,
- m->id, NULL, 0);
+ imsg_compose(rde, IMSG_MRT_REQ, m->id, NULL, 0);
return (0);
default:
break;
@@ -276,8 +273,7 @@ mrt_state(struct mrtdump_config *m, enum imsg_type type,
switch (type) {
case IMSG_NONE:
if (m->type == MRT_TABLE_DUMP)
- *rwait += imsg_compose(rfd, IMSG_MRT_REQ,
- m->id, NULL, 0);
+ imsg_compose(rde, IMSG_MRT_REQ, m->id, NULL, 0);
return (0);
case IMSG_MRT_MSG:
mrt_open(m);
@@ -294,8 +290,7 @@ mrt_state(struct mrtdump_config *m, enum imsg_type type,
}
int
-mrt_usr1(struct mrt_config *conf, int rfd, int *rwait
- /*, int sfd, int *swait */)
+mrt_usr1(struct mrt_config *conf, struct msgbuf *rde /*, struct msgbuf *se */)
{
struct mrtdump_config *m;
time_t now;
@@ -310,13 +305,11 @@ mrt_usr1(struct mrt_config *conf, int rfd, int *rwait
break;
case MRT_STATE_DONE:
m->state = MRT_STATE_OPEN;
- *rwait += imsg_compose(rfd, IMSG_MRT_REQ,
- m->id, NULL, 0);
+ imsg_compose(rde, IMSG_MRT_REQ, m->id, NULL, 0);
break;
default:
m->state = MRT_STATE_REOPEN;
- *rwait += imsg_compose(rfd, IMSG_MRT_END,
- m->id, NULL, 0);
+ imsg_compose(rde, IMSG_MRT_END, m->id, NULL, 0);
break;
}
@@ -333,8 +326,7 @@ mrt_usr1(struct mrt_config *conf, int rfd, int *rwait
}
int
-mrt_alrm(struct mrt_config *conf, int rfd, int *rwait
- /*, int sfd, int *swait */)
+mrt_alrm(struct mrt_config *conf, struct msgbuf *rde /*, struct msgbuf *se */)
{
struct mrtdump_config *m;
time_t now;
@@ -352,14 +344,14 @@ mrt_alrm(struct mrt_config *conf, int rfd, int *rwait
case MRT_STATE_DONE:
m->state = MRT_STATE_OPEN;
if (m->type == MRT_TABLE_DUMP)
- *rwait += imsg_compose(rfd,
- IMSG_MRT_REQ, m->id, NULL, 0);
+ imsg_compose(rde, IMSG_MRT_REQ, m->id,
+ NULL, 0);
break;
default:
m->state = MRT_STATE_REOPEN;
if (m->type == MRT_TABLE_DUMP)
- *rwait += imsg_compose(rfd,
- IMSG_MRT_END, m->id, NULL, 0);
+ imsg_compose(rde, IMSG_MRT_END, m->id,
+ NULL, 0);
break;
}
diff --git a/usr.sbin/bgpd/mrt.h b/usr.sbin/bgpd/mrt.h
index 837661e760a..3abccb8376d 100644
--- a/usr.sbin/bgpd/mrt.h
+++ b/usr.sbin/bgpd/mrt.h
@@ -1,4 +1,4 @@
-/* $OpenBSD: mrt.h,v 1.2 2003/12/17 18:11:31 henning Exp $ */
+/* $OpenBSD: mrt.h,v 1.3 2003/12/21 22:16:53 henning Exp $ */
/*
* Copyright (c) 2003 Claudio Jeker <cjeker@diehard.n-r-g.com>
@@ -221,14 +221,18 @@ struct mrt_bgp_state_header {
/* pseudo predeclarations */
struct prefix;
struct pt_entry;
+struct mrt {
+ struct msgbuf *msgbuf;
+ u_int32_t id;
+};
/* prototypes */
-int mrt_dump_bgp_msg(int, u_char *, u_int16_t, int, struct peer_config *,
- struct bgpd_config *);
+int mrt_dump_bgp_msg(struct mrt *, void *, u_int16_t, int,
+ struct peer_config *, struct bgpd_config *);
void mrt_clear_seq(void);
-void mrt_dump_upcall(struct pt_entry *, int, int *, void *);
-int mrt_state(struct mrtdump_config *, enum imsg_type, int, int *);
-int mrt_alrm(struct mrt_config *, int, int *);
-int mrt_usr1(struct mrt_config *, int, int *);
+void mrt_dump_upcall(struct pt_entry *, void *);
+int mrt_state(struct mrtdump_config *, enum imsg_type, struct msgbuf *);
+int mrt_alrm(struct mrt_config *, struct msgbuf *);
+int mrt_usr1(struct mrt_config *, struct msgbuf *);
#endif
diff --git a/usr.sbin/bgpd/parse.y b/usr.sbin/bgpd/parse.y
index b90845f4f83..0cecc496773 100644
--- a/usr.sbin/bgpd/parse.y
+++ b/usr.sbin/bgpd/parse.y
@@ -1,4 +1,4 @@
-/* $OpenBSD: parse.y,v 1.4 2003/12/20 20:53:30 henning Exp $ */
+/* $OpenBSD: parse.y,v 1.5 2003/12/21 22:16:53 henning Exp $ */
/*
* Copyright (c) 2002, 2003 Henning Brauer <henning@openbsd.org>
@@ -732,7 +732,7 @@ add_mrtconfig(enum mrtdump_type type, char *name, time_t timeout)
fatal("add_mrtconfig", errno);
n->type = MRT_TABLE_DUMP;
- n->fd = -1;
+ n->msgbuf.sock = -1;
if (strlcpy(n->name, name, sizeof(n->name)) > sizeof(n->name)) {
yyerror("filename \"%s\" too long: max %u",
name, sizeof(n->name) - 1);
diff --git a/usr.sbin/bgpd/rde.c b/usr.sbin/bgpd/rde.c
index 66f8f064b31..34d83c3a64c 100644
--- a/usr.sbin/bgpd/rde.c
+++ b/usr.sbin/bgpd/rde.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: rde.c,v 1.15 2003/12/20 21:43:45 claudio Exp $ */
+/* $OpenBSD: rde.c,v 1.16 2003/12/21 22:16:53 henning Exp $ */
/*
* Copyright (c) 2003 Henning Brauer <henning@openbsd.org>
@@ -51,10 +51,9 @@ void peer_down(u_int32_t);
volatile sig_atomic_t rde_quit = 0;
struct bgpd_config *conf, *nconf;
-int se_queued_writes = 0;
-int se_sock;
-int main_queued_writes = 0;
struct rde_peer_head peerlist;
+struct msgbuf msgbuf_se;
+struct msgbuf msgbuf_main;
void
rde_sighdlr(int sig)
@@ -117,49 +116,50 @@ rde_main(struct bgpd_config *config, int pipe_m2r[2], int pipe_s2r[2])
path_init(pathhashsize);
nexthop_init(nexthophashsize);
pt_init();
+ msgbuf_init(&msgbuf_se);
+ msgbuf_se.sock = pipe_s2r[1];
+ msgbuf_init(&msgbuf_main);
+ msgbuf_main.sock = pipe_m2r[1];
init_imsg_buf();
- se_sock = pipe_s2r[1];
logit(LOG_INFO, "route decision engine ready");
while (rde_quit == 0) {
bzero(&pfd, sizeof(pfd));
- pfd[PFD_PIPE_MAIN].fd = pipe_m2r[1];
+ pfd[PFD_PIPE_MAIN].fd = msgbuf_main.sock;
pfd[PFD_PIPE_MAIN].events = POLLIN;
- if (main_queued_writes > 0)
+ if (msgbuf_main.queued > 0)
pfd[PFD_PIPE_MAIN].events |= POLLOUT;
- pfd[PFD_PIPE_SESSION].fd = pipe_s2r[1];
+ pfd[PFD_PIPE_SESSION].fd = msgbuf_se.sock;
pfd[PFD_PIPE_SESSION].events = POLLIN;
- if (se_queued_writes > 0)
+ if (msgbuf_se.queued > 0)
pfd[PFD_PIPE_SESSION].events |= POLLOUT;
if ((nfds = poll(pfd, 2, INFTIM)) == -1)
if (errno != EINTR)
fatal("poll error", errno);
- if (nfds > 0 && pfd[PFD_PIPE_MAIN].revents & POLLIN) {
- rde_dispatch_imsg(pfd[PFD_PIPE_MAIN].fd,
- PFD_PIPE_MAIN);
- }
- if (nfds > 0 && pfd[PFD_PIPE_SESSION].revents & POLLIN) {
+ if (nfds > 0 && pfd[PFD_PIPE_MAIN].revents & POLLIN)
+ rde_dispatch_imsg(pfd[PFD_PIPE_MAIN].fd, PFD_PIPE_MAIN);
+
+ if (nfds > 0 && pfd[PFD_PIPE_SESSION].revents & POLLIN)
rde_dispatch_imsg(pfd[PFD_PIPE_SESSION].fd,
PFD_PIPE_SESSION);
- }
+
if (nfds > 0 && (pfd[PFD_PIPE_MAIN].revents & POLLOUT) &&
- main_queued_writes) {
+ msgbuf_main.queued) {
nfds--;
- if ((n = buf_sock_write(pfd[PFD_PIPE_MAIN].fd)) == -1)
+ if ((n = msgbuf_write(&msgbuf_main)) == -1)
fatal("pipe write error", errno);
- main_queued_writes -= n;
}
+
if (nfds > 0 && (pfd[PFD_PIPE_SESSION].revents & POLLOUT) &&
- se_queued_writes) {
+ msgbuf_se.queued) {
nfds--;
- if ((n = buf_sock_write(pfd[PFD_PIPE_SESSION].fd)) ==
+ if ((n = msgbuf_write(&msgbuf_se)) ==
-1)
fatal("pipe write error", errno);
- se_queued_writes -= n;
}
}
@@ -171,6 +171,7 @@ void
rde_dispatch_imsg(int fd, int idx)
{
struct imsg imsg;
+ struct mrt mrtdump;
struct peer_config *pconf;
struct rde_peer *p, *np;
u_int32_t rid;
@@ -234,7 +235,7 @@ rde_dispatch_imsg(int fd, int idx)
fatal("session msg not from session engine", 0);
if (imsg.hdr.len - IMSG_HEADER_SIZE != sizeof(rid))
fatal("incorrect size of session request", 0);
- rid = *(u_int32_t *)imsg.data;
+ memcpy(&rid, imsg.data, sizeof(rid));
peer_up(imsg.hdr.peerid, rid);
break;
case IMSG_SESSION_DOWN:
@@ -245,14 +246,15 @@ rde_dispatch_imsg(int fd, int idx)
case IMSG_MRT_REQ:
if (idx != PFD_PIPE_MAIN)
fatal("mrt request not from parent", 0);
- pt_dump(mrt_dump_upcall, fd, &main_queued_writes,
- &imsg.hdr.peerid);
+ mrtdump.id = imsg.hdr.peerid;
+ mrtdump.msgbuf = &msgbuf_main;
+ pt_dump(mrt_dump_upcall, &mrtdump);
/* FALLTHROUGH */
case IMSG_MRT_END:
if (idx != PFD_PIPE_MAIN)
fatal("mrt request not from parent", 0);
/* ignore end message because a dump is atomic */
- main_queued_writes += imsg_compose(fd, IMSG_MRT_END,
+ imsg_compose(&msgbuf_main, IMSG_MRT_END,
imsg.hdr.peerid, NULL, 0);
break;
default:
@@ -504,7 +506,7 @@ rde_update_err(u_int32_t peerid, enum suberr_update errorcode)
u_int8_t errcode;
errcode = errorcode;
- se_queued_writes += imsg_compose(se_sock, IMSG_UPDATE_ERR, peerid,
+ imsg_compose(&msgbuf_se, IMSG_UPDATE_ERR, peerid,
&errcode, sizeof(errcode));
}
diff --git a/usr.sbin/bgpd/rde.h b/usr.sbin/bgpd/rde.h
index 4f1766a2544..fc9823d340f 100644
--- a/usr.sbin/bgpd/rde.h
+++ b/usr.sbin/bgpd/rde.h
@@ -1,4 +1,4 @@
-/* $OpenBSD: rde.h,v 1.4 2003/12/21 16:11:34 claudio Exp $ */
+/* $OpenBSD: rde.h,v 1.5 2003/12/21 22:16:53 henning Exp $ */
/*
* Copyright (c) 2003 Claudio Jeker <cjeker@diehard.n-r-g.com> and
@@ -244,8 +244,7 @@ struct pt_entry *pt_get(struct in_addr, int);
struct pt_entry *pt_add(struct in_addr, int);
void pt_remove(struct pt_entry *);
struct pt_entry *pt_lookup(struct in_addr);
-void pt_dump(void (*)(struct pt_entry *, int, int *, void *),
- int, int *, void *);
+void pt_dump(void (*)(struct pt_entry *, void *), void *);
#endif /* __RDE_H__ */
diff --git a/usr.sbin/bgpd/rde_prefix.c b/usr.sbin/bgpd/rde_prefix.c
index c5624cc853c..cc219828e6c 100644
--- a/usr.sbin/bgpd/rde_prefix.c
+++ b/usr.sbin/bgpd/rde_prefix.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: rde_prefix.c,v 1.2 2003/12/19 19:24:08 deraadt Exp $ */
+/* $OpenBSD: rde_prefix.c,v 1.3 2003/12/21 22:16:53 henning Exp $ */
/*
* Copyright (c) 2003 Claudio Jeker <cjeker@diehard.n-r-g.com>
@@ -199,8 +199,7 @@ pt_lookup(struct in_addr prefix)
* XXX A nicer upcall interface wouldn't be luxus too.
*/
void
-pt_dump(void (*upcall)(struct pt_entry *, int, int *, void *),
- int fd, int *w, void *arg)
+pt_dump(void (*upcall)(struct pt_entry *, void *), void *arg)
{
struct pt_entry *p;
int i;
@@ -210,7 +209,7 @@ pt_dump(void (*upcall)(struct pt_entry *, int, int *, void *),
for (i = MAX_PREFIX; i >= MIN_PREFIX; i--) {
for (j = 0; j < pthashsize[i]; j++)
LIST_FOREACH(p, &pttable[i].pt_hashtbl[j], pt_l)
- upcall(p, fd, w, arg);
+ upcall(p, arg);
}
}
diff --git a/usr.sbin/bgpd/session.c b/usr.sbin/bgpd/session.c
index e81d2f47222..1274cd57518 100644
--- a/usr.sbin/bgpd/session.c
+++ b/usr.sbin/bgpd/session.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: session.c,v 1.24 2003/12/21 18:21:24 henning Exp $ */
+/* $OpenBSD: session.c,v 1.25 2003/12/21 22:16:53 henning Exp $ */
/*
* Copyright (c) 2003 Henning Brauer <henning@openbsd.org>
@@ -73,7 +73,6 @@ int parse_update(struct peer *);
int parse_notification(struct peer *);
int parse_keepalive(struct peer *);
void session_dispatch_imsg(int, int);
-void session_write_imsg(int fd);
void session_up(struct peer *);
void session_down(struct peer *);
@@ -82,8 +81,7 @@ struct peer *getpeerbyip(in_addr_t);
struct bgpd_config *conf = NULL, *nconf = NULL;
volatile sig_atomic_t session_quit = 0;
int pending_reconf = 0;
-int s2r_queued_writes = 0;
-int s2r_sock = -1;
+struct msgbuf msgbuf_rde;
void
session_sighdlr(int sig)
@@ -172,10 +170,11 @@ session_main(struct bgpd_config *config, int pipe_m2s[2], int pipe_s2r[2])
signal(SIGTERM, session_sighdlr);
logit(LOG_INFO, "session engine ready");
- s2r_sock = pipe_s2r[0];
close(pipe_m2s[0]);
close(pipe_s2r[1]);
init_conf(conf);
+ msgbuf_init(&msgbuf_rde);
+ msgbuf_rde.sock = pipe_s2r[0];
init_imsg_buf();
init_peers();
@@ -187,7 +186,7 @@ session_main(struct bgpd_config *config, int pipe_m2s[2], int pipe_s2r[2])
pfd[PFD_PIPE_MAIN].events = POLLIN;
pfd[PFD_PIPE_ROUTE].fd = pipe_s2r[0];
pfd[PFD_PIPE_ROUTE].events = POLLIN;
- if (s2r_queued_writes > 0)
+ if (msgbuf_rde.queued > 0)
pfd[PFD_PIPE_ROUTE].events |= POLLOUT;
nextaction = time(NULL) + 240; /* loop every 240s at least */
@@ -244,7 +243,7 @@ session_main(struct bgpd_config *config, int pipe_m2s[2], int pipe_s2r[2])
nextaction = p->StartTimer;
/* are we waiting for a write? */
- if (p->queued_writes > 0)
+ if (p->wbuf.queued > 0)
p->events |= POLLOUT;
/* poll events */
@@ -275,7 +274,8 @@ session_main(struct bgpd_config *config, int pipe_m2s[2], int pipe_s2r[2])
}
if (nfds > 0 && pfd[PFD_PIPE_ROUTE].revents & POLLOUT)
- session_write_imsg(pfd[PFD_PIPE_ROUTE].fd);
+ if (msgbuf_write(&msgbuf_rde) == -1)
+ fatal("pipe write error", 0);
if (nfds > 0 && pfd[PFD_PIPE_ROUTE].revents & POLLIN) {
nfds--;
@@ -339,6 +339,9 @@ bgp_fsm(struct peer *peer, enum session_events event)
peer->rbuf->wptr = peer->rbuf->buf;
peer->rbuf->pkt_len = MSGSIZE_HEADER;
+ /* init write buffer */
+ msgbuf_init(&peer->wbuf);
+
change_state(peer, STATE_CONNECT, event);
session_connect(peer);
break;
@@ -554,6 +557,7 @@ session_close_connection(struct peer *peer)
shutdown(peer->sock, SHUT_RDWR);
close(peer->sock);
peer->sock = -1;
+ peer->wbuf.sock = -1;
}
}
@@ -586,10 +590,9 @@ change_state(struct peer *peer, enum session_state state,
peer->KeepaliveTimer = 0;
peer->HoldTimer = 0;
session_close_connection(peer);
- buf_peer_remove(peer);
+ msgbuf_clear(&peer->wbuf);
free(peer->rbuf);
peer->rbuf = NULL;
- peer->queued_writes = 0;
if (peer->state == STATE_ESTABLISHED)
session_down(peer);
if (event != EVNT_STOP) {
@@ -649,6 +652,7 @@ session_accept(int listenfd)
if (p != NULL &&
(p->state == STATE_CONNECT || p->state == STATE_ACTIVE)) {
p->sock = connfd;
+ p->wbuf.sock = connfd;
if (session_setup_socket(p)) {
shutdown(connfd, SHUT_RDWR);
close(connfd);
@@ -678,6 +682,8 @@ session_connect(struct peer *peer)
return (-1);
}
+ peer->wbuf.sock = peer->sock;
+
/* if update source is set we need to bind() */
if (peer->conf.local_addr.sin_addr.s_addr)
if (bind(peer->sock, (struct sockaddr *)&peer->conf.local_addr,
@@ -762,7 +768,7 @@ session_open(struct peer *peer)
msg.bgpid = conf->bgpid; /* is already in network byte order */
msg.optparamlen = 0;
- if ((buf = buf_open(peer, peer->sock, len)) == NULL)
+ if ((buf = buf_open(len)) == NULL)
bgp_fsm(peer, EVNT_CON_FATAL);
errs += buf_add(buf, &msg.header.marker, sizeof(msg.header.marker));
errs += buf_add(buf, &msg.header.len, sizeof(msg.header.len));
@@ -774,7 +780,7 @@ session_open(struct peer *peer)
errs += buf_add(buf, &msg.optparamlen, sizeof(msg.optparamlen));
if (errs == 0) {
- if (buf_close(buf) == -1) {
+ if (buf_close(&peer->wbuf, buf) == -1) {
buf_free(buf);
bgp_fsm(peer, EVNT_CON_FATAL);
}
@@ -798,7 +804,7 @@ session_keepalive(struct peer *peer)
msg.len = htons(len);
msg.type = KEEPALIVE;
- if ((buf = buf_open(peer, peer->sock, len)) == NULL)
+ if ((buf = buf_open(len)) == NULL)
bgp_fsm(peer, EVNT_CON_FATAL);
errs += buf_add(buf, &msg.marker, sizeof(msg.marker));
errs += buf_add(buf, &msg.len, sizeof(msg.len));
@@ -810,7 +816,7 @@ session_keepalive(struct peer *peer)
return;
}
- if (buf_close(buf) == -1) {
+ if (buf_close(&peer->wbuf, buf) == -1) {
buf_free(buf);
bgp_fsm(peer, EVNT_CON_FATAL);
return;
@@ -839,7 +845,7 @@ session_notification(struct peer *peer, u_int8_t errcode, u_int8_t subcode,
msg.len = htons(len);
msg.type = NOTIFICATION;
- if ((buf = buf_open(peer, peer->sock, len)) == NULL)
+ if ((buf = buf_open(len)) == NULL)
bgp_fsm(peer, EVNT_CON_FATAL);
errs += buf_add(buf, &msg.marker, sizeof(msg.marker));
errs += buf_add(buf, &msg.len, sizeof(msg.len));
@@ -856,7 +862,7 @@ session_notification(struct peer *peer, u_int8_t errcode, u_int8_t subcode,
return;
}
- if (buf_close(buf) == -1) {
+ if (buf_close(&peer->wbuf, buf) == -1) {
buf_free(buf);
bgp_fsm(peer, EVNT_CON_FATAL);
}
@@ -906,8 +912,8 @@ session_dispatch_msg(struct pollfd *pfd, struct peer *peer)
return (1);
}
- if (pfd->revents & POLLOUT && peer->queued_writes) {
- if (buf_peer_write(peer))
+ if (pfd->revents & POLLOUT && peer->wbuf.queued) {
+ if (msgbuf_write(&peer->wbuf))
bgp_fsm(peer, EVNT_CON_FATAL);
if (!(pfd->revents & POLLIN))
return (1);
@@ -1153,8 +1159,7 @@ parse_update(struct peer *peer)
p += MSGSIZE_HEADER; /* header is already checked */
datalen -= MSGSIZE_HEADER;
- s2r_queued_writes += imsg_compose(s2r_sock, IMSG_UPDATE,
- peer->conf.id, p, datalen);
+ imsg_compose(&msgbuf_rde, IMSG_UPDATE, peer->conf.id, p, datalen);
return (0);
}
@@ -1301,26 +1306,16 @@ getpeerbyip(in_addr_t ip)
}
void
-session_write_imsg(int fd)
-{
- int n;
-
- if ((n = buf_sock_write(fd)) == -1)
- fatal("pipe write error", errno);
- s2r_queued_writes -= n;
-}
-
-void
session_down(struct peer *peer)
{
if (!session_quit)
- s2r_queued_writes += imsg_compose(s2r_sock, IMSG_SESSION_DOWN,
- peer->conf.id, NULL, 0);
+ imsg_compose(&msgbuf_rde, IMSG_SESSION_DOWN, peer->conf.id,
+ NULL, 0);
}
void
session_up(struct peer *peer)
{
- s2r_queued_writes += imsg_compose(s2r_sock, IMSG_SESSION_UP,
- peer->conf.id, &peer->remote_bgpid, sizeof(peer->remote_bgpid));
+ imsg_compose(&msgbuf_rde, IMSG_SESSION_UP, peer->conf.id,
+ &peer->remote_bgpid, sizeof(peer->remote_bgpid));
}