diff options
author | Claudio Jeker <claudio@cvs.openbsd.org> | 2017-05-28 12:21:37 +0000 |
---|---|---|
committer | Claudio Jeker <claudio@cvs.openbsd.org> | 2017-05-28 12:21:37 +0000 |
commit | 918f9dab75e3f3d7422d158d967103fef3851c09 (patch) | |
tree | 940be15b7c6a33ef803fd7ad26ef45f22ed2ae6d /usr.sbin/bgpd | |
parent | 4faec63d887baa0b66f8a91182e16eb6d202737f (diff) |
Implement an XON/XOFF protocol between the RDE and the SE to throttle
per control session and peer the generation of imsg in the RDE. This
reduces the memory pressure in the SE substantially and also a bit in
the RDE. Makes the RDE more responsive for bgpctl commands.
Tested by me with 100 peers * 2000 prefixes and by phessler@ on an AMS-IX
border router with 200+ sessions. Convergence time got quite a bit better.
OK phessler@
Diffstat (limited to 'usr.sbin/bgpd')
-rw-r--r-- | usr.sbin/bgpd/bgpd.h | 20 | ||||
-rw-r--r-- | usr.sbin/bgpd/control.c | 14 | ||||
-rw-r--r-- | usr.sbin/bgpd/rde.c | 129 | ||||
-rw-r--r-- | usr.sbin/bgpd/rde.h | 6 | ||||
-rw-r--r-- | usr.sbin/bgpd/rde_rib.c | 44 | ||||
-rw-r--r-- | usr.sbin/bgpd/session.c | 28 | ||||
-rw-r--r-- | usr.sbin/bgpd/session.h | 4 |
7 files changed, 164 insertions, 81 deletions
diff --git a/usr.sbin/bgpd/bgpd.h b/usr.sbin/bgpd/bgpd.h index 5f404bdd0c4..8f1cd1e123b 100644 --- a/usr.sbin/bgpd/bgpd.h +++ b/usr.sbin/bgpd/bgpd.h @@ -1,4 +1,4 @@ -/* $OpenBSD: bgpd.h,v 1.304 2017/05/27 18:12:23 phessler Exp $ */ +/* $OpenBSD: bgpd.h,v 1.305 2017/05/28 12:21:36 claudio Exp $ */ /* * Copyright (c) 2003, 2004 Henning Brauer <henning@openbsd.org> @@ -87,13 +87,17 @@ #define F_RTLABEL 0x10000 /* - * Limit the number of control messages generated by the RDE and queued in - * session engine. The RDE limit defines how many imsg are generated in - * one poll round. Then if the SE limit is hit the RDE control socket will no - * longer be polled. + * Limit the number of messages queued in the session engine. + * The SE will send an IMSG_XOFF messages to the RDE if the high water mark + * is reached. The RDE should then throttle this peer or control connection. + * Once the message queue in the SE drops below the low water mark an + * IMSG_XON message will be sent and the RDE will produce more messages again. 
*/ #define RDE_RUNNER_ROUNDS 100 -#define SESSION_CTL_QUEUE_MAX 10000 +#define SESS_MSG_HIGH_MARK 300 +#define SESS_MSG_LOW_MARK 50 +#define CTL_MSG_HIGH_MARK 500 +#define CTL_MSG_LOW_MARK 100 enum bgpd_process { PROC_MAIN, @@ -426,7 +430,9 @@ enum imsg_type { IMSG_PFTABLE_COMMIT, IMSG_REFRESH, IMSG_IFINFO, - IMSG_DEMOTE + IMSG_DEMOTE, + IMSG_XON, + IMSG_XOFF }; struct demote_msg { diff --git a/usr.sbin/bgpd/control.c b/usr.sbin/bgpd/control.c index d4479a637ca..4da12e3b337 100644 --- a/usr.sbin/bgpd/control.c +++ b/usr.sbin/bgpd/control.c @@ -1,4 +1,4 @@ -/* $OpenBSD: control.c,v 1.87 2017/02/13 14:48:44 phessler Exp $ */ +/* $OpenBSD: control.c,v 1.88 2017/05/28 12:21:36 claudio Exp $ */ /* * Copyright (c) 2003, 2004 Henning Brauer <henning@openbsd.org> @@ -213,11 +213,16 @@ control_dispatch_msg(struct pollfd *pfd, u_int *ctl_cnt) return (0); } - if (pfd->revents & POLLOUT) + if (pfd->revents & POLLOUT) { if (msgbuf_write(&c->ibuf.w) <= 0 && errno != EAGAIN) { *ctl_cnt -= control_close(pfd->fd); return (1); } + if (c->throttled && c->ibuf.w.queued < CTL_MSG_LOW_MARK) { + if (imsg_ctl_rde(IMSG_XON, c->ibuf.pid, NULL, 0) != -1) + c->throttled = 0; + } + } if (!(pfd->revents & POLLIN)) return (0); @@ -522,6 +527,11 @@ control_imsg_relay(struct imsg *imsg) if ((c = control_connbypid(imsg->hdr.pid)) == NULL) return (0); + if (!c->throttled && c->ibuf.w.queued > CTL_MSG_HIGH_MARK) { + if (imsg_ctl_rde(IMSG_XOFF, imsg->hdr.pid, NULL, 0) != -1) + c->throttled = 1; + } + return (imsg_compose(&c->ibuf, imsg->hdr.type, 0, imsg->hdr.pid, -1, imsg->data, imsg->hdr.len - IMSG_HEADER_SIZE)); } diff --git a/usr.sbin/bgpd/rde.c b/usr.sbin/bgpd/rde.c index b6b4c764ab4..c15b517bd53 100644 --- a/usr.sbin/bgpd/rde.c +++ b/usr.sbin/bgpd/rde.c @@ -1,4 +1,4 @@ -/* $OpenBSD: rde.c,v 1.363 2017/05/27 18:12:23 phessler Exp $ */ +/* $OpenBSD: rde.c,v 1.364 2017/05/28 12:21:36 claudio Exp $ */ /* * Copyright (c) 2003, 2004 Henning Brauer <henning@openbsd.org> @@ -76,7 +76,7 @@ void 
rde_update_log(const char *, u_int16_t, void rde_as4byte_fixup(struct rde_peer *, struct rde_aspath *); void rde_reflector(struct rde_peer *, struct rde_aspath *); -void rde_dump_rib_as(struct prefix *, struct rde_aspath *,pid_t, +void rde_dump_rib_as(struct prefix *, struct rde_aspath *, pid_t, int); void rde_dump_filter(struct prefix *, struct ctl_show_rib_request *); @@ -86,8 +86,14 @@ void rde_dump_upcall(struct rib_entry *, void *); void rde_dump_prefix_upcall(struct rib_entry *, void *); void rde_dump_ctx_new(struct ctl_show_rib_request *, pid_t, enum imsg_type); -void rde_dump_mrt_new(struct mrt *, pid_t, int); +void rde_dump_ctx_throttle(pid_t pid, int throttle); +void rde_dump_runner(void); +int rde_dump_pending(void); void rde_dump_done(void *); +void rde_dump_mrt_new(struct mrt *, pid_t, int); +void rde_dump_rib_free(struct rib *); +void rde_dump_mrt_free(struct rib *); +void rde_rib_free(struct rib_desc *); int rde_rdomain_import(struct rde_aspath *, struct rdomain *); void rde_reload_done(void); @@ -131,15 +137,19 @@ struct imsgbuf *ibuf_main; struct rde_memstats rdemem; struct rde_dump_ctx { + LIST_ENTRY(rde_dump_ctx) entry; struct rib_context ribctx; struct ctl_show_rib_request req; sa_family_t af; + u_int8_t throttled; }; +LIST_HEAD(, rde_dump_ctx) rde_dump_h = LIST_HEAD_INITIALIZER(rde_dump_h); + struct rde_mrt_ctx { - struct mrt mrt; - struct rib_context ribctx; LIST_ENTRY(rde_mrt_ctx) entry; + struct rib_context ribctx; + struct mrt mrt; }; LIST_HEAD(, rde_mrt_ctx) rde_mrts = LIST_HEAD_INITIALIZER(rde_mrts); @@ -248,7 +258,7 @@ rde_main(int debug, int verbose) set_pollfd(&pfd[PFD_PIPE_SESSION], ibuf_se); set_pollfd(&pfd[PFD_PIPE_SESSION_CTL], ibuf_se_ctl); - if (rib_dump_pending() && + if (rde_dump_pending() && ibuf_se_ctl && ibuf_se_ctl->w.queued == 0) timeout = 0; @@ -261,7 +271,6 @@ rde_main(int debug, int verbose) i++; } else if (mctx->mrt.state == MRT_STATE_REMOVE) { close(mctx->mrt.wbuf.fd); - LIST_REMOVE(&mctx->ribctx, entry); 
LIST_REMOVE(mctx, entry); free(mctx); rde_mrt_cnt--; @@ -307,9 +316,9 @@ rde_main(int debug, int verbose) rde_update_queue_runner(); for (aid = AID_INET6; aid < AID_MAX; aid++) rde_update6_queue_runner(aid); - if (rib_dump_pending() && + if (rde_dump_pending() && ibuf_se_ctl && ibuf_se_ctl->w.queued <= 10) - rib_dump_runner(); + rde_dump_runner(); } /* close pipes */ @@ -334,7 +343,6 @@ rde_main(int debug, int verbose) while ((mctx = LIST_FIRST(&rde_mrts)) != NULL) { msgbuf_clear(&mctx->mrt.wbuf); close(mctx->mrt.wbuf.fd); - LIST_REMOVE(&mctx->ribctx, entry); LIST_REMOVE(mctx, entry); free(mctx); } @@ -611,6 +619,25 @@ badnet: memcpy(&verbose, imsg.data, sizeof(verbose)); log_setverbose(verbose); break; + case IMSG_XON: + if (imsg.hdr.peerid) { + peer = peer_get(imsg.hdr.peerid); + if (peer) + peer->throttled = 0; + break; + } else { + rde_dump_ctx_throttle(imsg.hdr.pid, 0); + } + break; + case IMSG_XOFF: + if (imsg.hdr.peerid) { + peer = peer_get(imsg.hdr.peerid); + if (peer) + peer->throttled = 1; + } else { + rde_dump_ctx_throttle(imsg.hdr.pid, 1); + } + break; default: break; } @@ -737,7 +764,7 @@ rde_dispatch_imsg_parent(struct imsgbuf *ibuf) */ in_rules = ribd->in_rules; ribd->in_rules = NULL; - rib_free(rib); + rde_rib_free(ribd); rib = rib_new(rn.name, rn.rtableid, rn.flags); ribd->in_rules = in_rules; } else @@ -2358,7 +2385,7 @@ rde_dump_ctx_new(struct ctl_show_rib_request *req, pid_t pid, memcpy(&ctx->req, req, sizeof(struct ctl_show_rib_request)); ctx->req.pid = pid; ctx->req.type = type; - ctx->ribctx.ctx_count = RDE_RUNNER_ROUNDS; + ctx->ribctx.ctx_count = CTL_MSG_HIGH_MARK; ctx->ribctx.ctx_rib = rib; switch (ctx->req.type) { case IMSG_CTL_SHOW_NETWORK: @@ -2400,20 +2427,72 @@ rde_dump_ctx_new(struct ctl_show_rib_request *req, pid_t pid, ctx->ribctx.ctx_done = rde_dump_done; ctx->ribctx.ctx_arg = ctx; ctx->ribctx.ctx_aid = ctx->req.aid; + LIST_INSERT_HEAD(&rde_dump_h, ctx, entry); rib_dump_r(&ctx->ribctx); } void +rde_dump_ctx_throttle(pid_t pid, int 
throttle) +{ + struct rde_dump_ctx *ctx; + + LIST_FOREACH(ctx, &rde_dump_h, entry) { + if (ctx->req.pid == pid) { + ctx->throttled = throttle; + return; + } + } +} + +void +rde_dump_runner(void) +{ + struct rde_dump_ctx *ctx, *next; + + for (ctx = LIST_FIRST(&rde_dump_h); ctx != NULL; ctx = next) { + next = LIST_NEXT(ctx, entry); + if (!ctx->throttled) + rib_dump_r(&ctx->ribctx); + } +} + +int +rde_dump_pending(void) +{ + struct rde_dump_ctx *ctx; + + /* return true if there is at least one unthrottled context */ + LIST_FOREACH(ctx, &rde_dump_h, entry) + if (!ctx->throttled) + return (1); + + return (0); +} + +void rde_dump_done(void *arg) { struct rde_dump_ctx *ctx = arg; imsg_compose(ibuf_se_ctl, IMSG_CTL_END, 0, ctx->req.pid, -1, NULL, 0); + LIST_REMOVE(ctx, entry); free(ctx); } void +rde_dump_rib_free(struct rib *rib) +{ + struct rde_dump_ctx *ctx, *next; + + for (ctx = LIST_FIRST(&rde_dump_h); ctx != NULL; ctx = next) { + next = LIST_NEXT(ctx, entry); + if (ctx->ribctx.ctx_rib == rib) + rde_dump_done(ctx); + } +} + +void rde_dump_mrt_new(struct mrt *mrt, pid_t pid, int fd) { struct rde_mrt_ctx *ctx; @@ -2437,7 +2516,7 @@ rde_dump_mrt_new(struct mrt *mrt, pid_t pid, int fd) if (ctx->mrt.type == MRT_TABLE_DUMP_V2) mrt_dump_v2_hdr(&ctx->mrt, conf, &peerlist); - ctx->ribctx.ctx_count = RDE_RUNNER_ROUNDS; + ctx->ribctx.ctx_count = CTL_MSG_HIGH_MARK; ctx->ribctx.ctx_rib = rib; ctx->ribctx.ctx_upcall = mrt_dump_upcall; ctx->ribctx.ctx_done = mrt_done; @@ -2448,6 +2527,28 @@ rde_dump_mrt_new(struct mrt *mrt, pid_t pid, int fd) rib_dump_r(&ctx->ribctx); } +void +rde_dump_mrt_free(struct rib *rib) +{ + struct rde_mrt_ctx *ctx, *next; + + for (ctx = LIST_FIRST(&rde_mrts); ctx != NULL; ctx = next) { + next = LIST_NEXT(ctx, entry); + if (ctx->ribctx.ctx_rib == rib) + mrt_done(&ctx->mrt); + } +} + +void +rde_rib_free(struct rib_desc *rd) +{ + /* abort pending rib_dumps */ + rde_dump_rib_free(&rd->rib); + rde_dump_mrt_free(&rd->rib); + + rib_free(&rd->rib); +} + /* * kroute 
specific functions */ @@ -2849,7 +2950,7 @@ rde_reload_done(void) switch (ribs[rid].state) { case RECONF_DELETE: - rib_free(&ribs[rid].rib); + rde_rib_free(&ribs[rid]); break; case RECONF_KEEP: if (rde_filter_equal(ribs[rid].in_rules, diff --git a/usr.sbin/bgpd/rde.h b/usr.sbin/bgpd/rde.h index cdf5ad2f6d4..5debc98728a 100644 --- a/usr.sbin/bgpd/rde.h +++ b/usr.sbin/bgpd/rde.h @@ -1,4 +1,4 @@ -/* $OpenBSD: rde.h,v 1.160 2017/01/25 03:21:55 claudio Exp $ */ +/* $OpenBSD: rde.h,v 1.161 2017/05/28 12:21:36 claudio Exp $ */ /* * Copyright (c) 2003, 2004 Claudio Jeker <claudio@openbsd.org> and @@ -81,6 +81,7 @@ struct rde_peer { u_int16_t mrt_idx; u_int8_t reconf_out; /* out filter changed */ u_int8_t reconf_rib; /* rib changed */ + u_int8_t throttled; }; #define AS_SET 1 @@ -267,7 +268,6 @@ struct pt_entry_vpn4 { }; struct rib_context { - LIST_ENTRY(rib_context) entry; struct rib_entry *ctx_re; struct rib *ctx_rib; void (*ctx_upcall)(struct rib_entry *, void *); @@ -436,8 +436,6 @@ struct rib_entry *rib_lookup(struct rib *, struct bgpd_addr *); void rib_dump(struct rib *, void (*)(struct rib_entry *, void *), void *, u_int8_t); void rib_dump_r(struct rib_context *); -void rib_dump_runner(void); -int rib_dump_pending(void); static inline struct rib * re_rib(struct rib_entry *re) diff --git a/usr.sbin/bgpd/rde_rib.c b/usr.sbin/bgpd/rde_rib.c index dce18137b8c..68604f0b285 100644 --- a/usr.sbin/bgpd/rde_rib.c +++ b/usr.sbin/bgpd/rde_rib.c @@ -1,4 +1,4 @@ -/* $OpenBSD: rde_rib.c,v 1.153 2017/01/25 03:21:55 claudio Exp $ */ +/* $OpenBSD: rde_rib.c,v 1.154 2017/05/28 12:21:36 claudio Exp $ */ /* * Copyright (c) 2003, 2004 Claudio Jeker <claudio@openbsd.org> @@ -38,8 +38,6 @@ u_int16_t rib_size; struct rib_desc *ribs; -LIST_HEAD(, rib_context) rib_dump_h = LIST_HEAD_INITIALIZER(rib_dump_h); - struct rib_entry *rib_add(struct rib *, struct bgpd_addr *, int); int rib_compare(const struct rib_entry *, const struct rib_entry *); void rib_remove(struct rib_entry *); @@ -136,25 
+134,10 @@ rib_desc(struct rib *rib) void rib_free(struct rib *rib) { - struct rib_context *ctx, *next; struct rib_desc *rd; struct rib_entry *re, *xre; struct prefix *p, *np; - /* abort pending rib_dumps */ - for (ctx = LIST_FIRST(&rib_dump_h); ctx != NULL; ctx = next) { - next = LIST_NEXT(ctx, entry); - if (ctx->ctx_rib == rib) { - re = ctx->ctx_re; - re_unlock(re); - LIST_REMOVE(ctx, entry); - if (ctx->ctx_done) - ctx->ctx_done(ctx->ctx_arg); - else - free(ctx); - } - } - for (re = RB_MIN(rib_tree, rib_tree(rib)); re != NULL; re = xre) { xre = RB_NEXT(rib_tree, rib_tree(rib), re); @@ -311,10 +294,9 @@ rib_dump_r(struct rib_context *ctx) struct rib_entry *re; unsigned int i; - if (ctx->ctx_re == NULL) { + if (ctx->ctx_re == NULL) re = RB_MIN(rib_tree, rib_tree(ctx->ctx_rib)); - LIST_INSERT_HEAD(&rib_dump_h, ctx, entry); - } else + else re = rib_restart(ctx); for (i = 0; re != NULL; re = RB_NEXT(rib_tree, unused, re)) { @@ -322,7 +304,7 @@ rib_dump_r(struct rib_context *ctx) ctx->ctx_aid != re->prefix->aid) continue; if (ctx->ctx_count && i++ >= ctx->ctx_count && - re_is_locked(re)) { + !re_is_locked(re)) { /* store and lock last element */ ctx->ctx_re = re; re_lock(re); @@ -331,7 +313,6 @@ rib_dump_r(struct rib_context *ctx) ctx->ctx_upcall(re, ctx->ctx_arg); } - LIST_REMOVE(ctx, entry); if (ctx->ctx_done) ctx->ctx_done(ctx->ctx_arg); else @@ -357,23 +338,6 @@ rib_restart(struct rib_context *ctx) return (re); } -void -rib_dump_runner(void) -{ - struct rib_context *ctx, *next; - - for (ctx = LIST_FIRST(&rib_dump_h); ctx != NULL; ctx = next) { - next = LIST_NEXT(ctx, entry); - rib_dump_r(ctx); - } -} - -int -rib_dump_pending(void) -{ - return (!LIST_EMPTY(&rib_dump_h)); -} - /* used to bump correct prefix counters */ #define PREFIX_COUNT(x, op) \ do { \ diff --git a/usr.sbin/bgpd/session.c b/usr.sbin/bgpd/session.c index 32405955b3e..6ec62d32b92 100644 --- a/usr.sbin/bgpd/session.c +++ b/usr.sbin/bgpd/session.c @@ -1,4 +1,4 @@ -/* $OpenBSD: session.c,v 1.361 
2017/05/27 10:33:15 phessler Exp $ */ +/* $OpenBSD: session.c,v 1.362 2017/05/28 12:21:36 claudio Exp $ */ /* * Copyright (c) 2003, 2004, 2005 Henning Brauer <henning@openbsd.org> @@ -195,7 +195,6 @@ session_main(int debug, int verbose) u_int pfd_elms = 0, peer_l_elms = 0, mrt_l_elms = 0; u_int listener_cnt, ctl_cnt, mrt_cnt; u_int new_cnt; - u_int32_t ctl_queued; struct passwd *pw; struct peer *p, **peer_l = NULL, *last, *next; struct mrt *m, *xm, **mrt_l = NULL; @@ -356,17 +355,7 @@ session_main(int debug, int verbose) set_pollfd(&pfd[PFD_PIPE_MAIN], ibuf_main); set_pollfd(&pfd[PFD_PIPE_ROUTE], ibuf_rde); - - ctl_queued = 0; - TAILQ_FOREACH(ctl_conn, &ctl_conns, entry) - ctl_queued += ctl_conn->ibuf.w.queued; - - /* - * Do not act as unlimited buffer. Don't read in more - * messages if the ctl sockets are getting full. - */ - if (ctl_queued < SESSION_CTL_QUEUE_MAX) - set_pollfd(&pfd[PFD_PIPE_ROUTE_CTL], ibuf_rde_ctl); + set_pollfd(&pfd[PFD_PIPE_ROUTE_CTL], ibuf_rde_ctl); if (pauseaccept == 0) { pfd[PFD_SOCK_CTL].fd = csock; @@ -1389,6 +1378,13 @@ session_sendmsg(struct bgp_msg *msg, struct peer *p) } ibuf_close(&p->wbuf, msg->buf); + if (!p->throttled && p->wbuf.queued > SESS_MSG_HIGH_MARK) { + if (imsg_compose(ibuf_rde, IMSG_XOFF, p->conf.id, 0, -1, + NULL, 0) == -1) + log_peer_warn(&p->conf, "imsg_compose XOFF"); + p->throttled = 1; + } + free(msg); return (0); } @@ -1774,6 +1770,12 @@ session_dispatch_msg(struct pollfd *pfd, struct peer *p) bgp_fsm(p, EVNT_CON_FATAL); return (1); } + if (p->throttled && p->wbuf.queued < SESS_MSG_LOW_MARK) { + if (imsg_compose(ibuf_rde, IMSG_XON, p->conf.id, 0, -1, + NULL, 0) == -1) + log_peer_warn(&p->conf, "imsg_compose XON"); + p->throttled = 0; + } if (!(pfd->revents & POLLIN)) return (1); } diff --git a/usr.sbin/bgpd/session.h b/usr.sbin/bgpd/session.h index 863d0535178..53841fde62c 100644 --- a/usr.sbin/bgpd/session.h +++ b/usr.sbin/bgpd/session.h @@ -1,4 +1,4 @@ -/* $OpenBSD: session.h,v 1.122 2017/01/13 18:59:12 
phessler Exp $ */ +/* $OpenBSD: session.h,v 1.123 2017/05/28 12:21:36 claudio Exp $ */ /* * Copyright (c) 2003, 2004 Henning Brauer <henning@openbsd.org> @@ -142,6 +142,7 @@ struct ctl_conn { TAILQ_ENTRY(ctl_conn) entry; struct imsgbuf ibuf; int restricted; + int throttled; }; TAILQ_HEAD(ctl_conns, ctl_conn) ctl_conns; @@ -225,6 +226,7 @@ struct peer { u_int8_t depend_ok; u_int8_t demoted; u_int8_t passive; + u_int8_t throttled; }; extern struct peer *peers; |