diff options
Diffstat (limited to 'usr.sbin/hoststated/pfe.c')
-rw-r--r-- | usr.sbin/hoststated/pfe.c | 159 |
1 files changed, 149 insertions, 10 deletions
diff --git a/usr.sbin/hoststated/pfe.c b/usr.sbin/hoststated/pfe.c index 0af74099611..c6f0bbb7133 100644 --- a/usr.sbin/hoststated/pfe.c +++ b/usr.sbin/hoststated/pfe.c @@ -1,4 +1,4 @@ -/* $OpenBSD: pfe.c,v 1.14 2007/02/08 13:32:24 reyk Exp $ */ +/* $OpenBSD: pfe.c,v 1.15 2007/02/22 03:32:39 reyk Exp $ */ /* * Copyright (c) 2006 Pierre-Yves Ritschard <pyr@spootnik.org> @@ -39,6 +39,7 @@ void pfe_sig_handler(int sig, short, void *); void pfe_shutdown(void); void pfe_dispatch_imsg(int, short, void *); void pfe_dispatch_parent(int, short, void *); +void pfe_dispatch_relay(int, short, void *); void pfe_sync(void); @@ -46,6 +47,7 @@ static struct hoststated *env = NULL; struct imsgbuf *ibuf_main; struct imsgbuf *ibuf_hce; +struct imsgbuf *ibuf_relay; void pfe_sig_handler(int sig, short event, void *arg) @@ -61,12 +63,15 @@ pfe_sig_handler(int sig, short event, void *arg) pid_t pfe(struct hoststated *x_env, int pipe_parent2pfe[2], int pipe_parent2hce[2], - int pipe_pfe2hce[2]) + int pipe_parent2relay[2], int pipe_pfe2hce[2], + int pipe_pfe2relay[RELAY_MAXPROC][2]) { pid_t pid; struct passwd *pw; struct event ev_sigint; struct event ev_sigterm; + int i; + struct imsgbuf *ibuf; switch (pid = fork()) { case -1: @@ -114,8 +119,13 @@ pfe(struct hoststated *x_env, int pipe_parent2pfe[2], int pipe_parent2hce[2], close(pipe_parent2pfe[0]); close(pipe_parent2hce[0]); close(pipe_parent2hce[1]); + close(pipe_parent2relay[0]); + close(pipe_parent2relay[1]); + for (i = 0; i < env->prefork_relay; i++) + close(pipe_pfe2relay[i][0]); if ((ibuf_hce = calloc(1, sizeof(struct imsgbuf))) == NULL || + (ibuf_relay = calloc(i, sizeof(struct imsgbuf))) == NULL || (ibuf_main = calloc(1, sizeof(struct imsgbuf))) == NULL) fatal("pfe"); imsg_init(ibuf_hce, pipe_pfe2hce[1], pfe_dispatch_imsg); @@ -131,11 +141,24 @@ pfe(struct hoststated *x_env, int pipe_parent2pfe[2], int pipe_parent2hce[2], ibuf_main->handler, ibuf_main); event_add(&ibuf_main->ev, NULL); + for (i = 0; i < env->prefork_relay; i++) { + ibuf = &ibuf_relay[i]; + imsg_init(ibuf, pipe_pfe2relay[i][1], pfe_dispatch_relay); + + ibuf_relay->events = EV_READ; + event_set(&ibuf->ev, ibuf->fd, ibuf->events, + ibuf->handler, ibuf); + event_add(&ibuf->ev, NULL); + } + TAILQ_INIT(&ctl_conns); if (control_listen() == -1) fatalx("pfe: control socket listen failed"); + /* Initial sync */ + pfe_sync(); + event_dispatch(); pfe_shutdown(); @@ -192,20 +215,35 @@ pfe_dispatch_imsg(int fd, short event, void *ptr) memcpy(&st, imsg.data, sizeof(st)); if ((host = host_find(env, st.id)) == NULL) fatalx("pfe_dispatch_imsg: invalid host id"); - if (host->up == st.up) { + + host->retry_cnt = st.retry_cnt; + if (st.up != HOST_UNKNOWN) { + host->check_cnt++; + if (st.up == HOST_UP) + host->up_cnt++; + } + if (host->check_cnt != st.check_cnt) { log_debug("pfe_dispatch_imsg: host %d => %d", host->id, host->up); fatalx("pfe_dispatch_imsg: desynchronized"); } + if (host->up == st.up) + break; + + /* Forward to relay engine(s) */ + for (n = 0; n < env->prefork_relay; n++) + imsg_compose(&ibuf_relay[n], IMSG_HOST_STATUS, 0, 0, + &st, sizeof(st)); + if ((table = table_find(env, host->tableid)) == NULL) fatalx("pfe_dispatch_imsg: invalid table id"); log_debug("pfe_dispatch_imsg: state %d for host %u %s", st.up, host->id, host->name); - if ((st.up == HOST_UNKNOWN && host->up == HOST_DOWN) || - (st.up == HOST_DOWN && host->up == HOST_UNKNOWN)) { + if ((st.up == HOST_UNKNOWN && !HOST_ISUP(host->up)) || + (!HOST_ISUP(st.up) && host->up == HOST_UNKNOWN)) { host->up = st.up; break; } @@ -215,6 +253,7 @@ pfe_dispatch_imsg(int fd, short event, void *ptr) table->up++; host->flags |= F_ADD; host->flags &= ~(F_DEL); + host->up = HOST_UP; } else { table->up--; table->flags |= F_CHANGED; @@ -248,7 +287,7 @@ pfe_dispatch_parent(int fd, short event, void * ptr) case EV_READ: if ((n = imsg_read(ibuf)) == -1) fatal("imsg_read error"); - if (n == 0) /* connection closed */ + if (n == 0) fatalx("pfe_dispatch_parent: pipe closed"); break; case EV_WRITE: @@ -277,10 +316,76 @@ pfe_dispatch_parent(int fd, short event, void * ptr) } void +pfe_dispatch_relay(int fd, short event, void * ptr) +{ + struct imsgbuf *ibuf; + struct imsg imsg; + ssize_t n; + struct ctl_natlook cnl; + struct ctl_stats crs; + struct relay *rlay; + + ibuf = ptr; + switch (event) { + case EV_READ: + if ((n = imsg_read(ibuf)) == -1) + fatal("imsg_read error"); + if (n == 0) + fatalx("pfe_dispatch_relay: pipe closed"); + break; + case EV_WRITE: + if (msgbuf_write(&ibuf->w) == -1) + fatal("msgbuf_write"); + imsg_event_add(ibuf); + return; + default: + fatalx("unknown event"); + } + + for (;;) { + if ((n = imsg_get(ibuf, &imsg)) == -1) + fatal("pfe_dispatch_relay: imsg_read error"); + if (n == 0) + break; + + switch (imsg.hdr.type) { + case IMSG_NATLOOK: + if (imsg.hdr.len != IMSG_HEADER_SIZE + sizeof(cnl)) + fatalx("invalid imsg header len"); + bcopy(imsg.data, &cnl, sizeof(cnl)); + if (natlook(env, &cnl) != 0) + cnl.in = -1; + imsg_compose(&ibuf_relay[cnl.proc], IMSG_NATLOOK, 0, 0, + &cnl, sizeof(cnl)); + break; + case IMSG_STATISTICS: + if (imsg.hdr.len != IMSG_HEADER_SIZE + sizeof(crs)) + fatalx("invalid imsg header len"); + bcopy(imsg.data, &crs, sizeof(crs)); + if (crs.proc > env->prefork_relay) + fatalx("pfe_dispatch_relay: invalid relay proc"); + if ((rlay = relay_find(env, crs.id)) == NULL) + fatalx("pfe_dispatch_relay: invalid relay id"); + bcopy(&crs, &rlay->stats[crs.proc], sizeof(crs)); + rlay->stats[crs.proc].interval = + env->statinterval.tv_sec; + break; + default: + log_debug("pfe_dispatch_relay: unexpected imsg %d", + imsg.hdr.type); + break; + } + imsg_free(&imsg); + } + imsg_event_add(ibuf); +} + +void show(struct ctl_conn *c) { struct service *service; struct host *host; + struct relay *rlay; TAILQ_FOREACH(service, &env->services, entry) { imsg_compose(&c->ibuf, IMSG_CTL_SERVICE, 0, 0, @@ -304,6 +409,14 @@ show(struct ctl_conn *c) imsg_compose(&c->ibuf, IMSG_CTL_HOST, 0, 0, host, sizeof(*host)); } + TAILQ_FOREACH(rlay, &env->relays, entry) { + rlay->stats[env->prefork_relay].id = EMPTY_ID; + imsg_compose(&c->ibuf, IMSG_CTL_RELAY, 0, 0, + rlay, sizeof(*rlay)); + imsg_compose(&c->ibuf, IMSG_CTL_STATISTICS, 0, 0, + &rlay->stats, sizeof(rlay->stats)); + } + imsg_compose(&c->ibuf, IMSG_CTL_END, 0, 0, NULL, 0); } @@ -459,6 +572,8 @@ disable_host(struct ctl_conn *c, struct ctl_id *id) host->flags |= F_DISABLE; host->flags |= F_DEL; host->flags &= ~(F_ADD); + host->check_cnt = 0; + host->up_cnt = 0; imsg_compose(ibuf_hce, IMSG_HOST_DISABLE, 0, 0, &host->id, sizeof(host->id)); @@ -498,10 +613,12 @@ enable_host(struct ctl_conn *c, struct ctl_id *id) void pfe_sync(void) { - struct service *service; - struct table *active; - struct ctl_id id; - struct imsg imsg; + struct service *service; + struct table *active; + struct table *table; + struct ctl_id id; + struct imsg imsg; + struct ctl_demote demote; bzero(&id, sizeof(id)); bzero(&imsg, sizeof(imsg)); @@ -556,4 +673,26 @@ pfe_sync(void) control_imsg_forward(&imsg); } } + + TAILQ_FOREACH(table, &env->tables, entry) { + if ((table->flags & F_DEMOTE) == 0) + continue; + demote.level = 0; + if (table->up && table->demoted) { + demote.level = -1; + table->demoted = 0; + } + else if (!table->up && !table->demoted) { + demote.level = 1; + table->demoted = 1; + } + if (demote.level == 0) + continue; + log_debug("pfe_sync: demote %d table '%s' group '%s'", + demote.level, table->name, table->demote_group); + strlcpy(demote.group, table->demote_group, + sizeof(demote.group)); + imsg_compose(ibuf_main, IMSG_DEMOTE, 0, 0, + &demote, sizeof(demote)); + } } |