summaryrefslogtreecommitdiff
path: root/usr.sbin/hoststated/pfe.c
diff options
context:
space:
mode:
Diffstat (limited to 'usr.sbin/hoststated/pfe.c')
-rw-r--r--usr.sbin/hoststated/pfe.c159
1 files changed, 149 insertions, 10 deletions
diff --git a/usr.sbin/hoststated/pfe.c b/usr.sbin/hoststated/pfe.c
index 0af74099611..c6f0bbb7133 100644
--- a/usr.sbin/hoststated/pfe.c
+++ b/usr.sbin/hoststated/pfe.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: pfe.c,v 1.14 2007/02/08 13:32:24 reyk Exp $ */
+/* $OpenBSD: pfe.c,v 1.15 2007/02/22 03:32:39 reyk Exp $ */
/*
* Copyright (c) 2006 Pierre-Yves Ritschard <pyr@spootnik.org>
@@ -39,6 +39,7 @@ void pfe_sig_handler(int sig, short, void *);
void pfe_shutdown(void);
void pfe_dispatch_imsg(int, short, void *);
void pfe_dispatch_parent(int, short, void *);
+void pfe_dispatch_relay(int, short, void *);
void pfe_sync(void);
@@ -46,6 +47,7 @@ static struct hoststated *env = NULL;
struct imsgbuf *ibuf_main;
struct imsgbuf *ibuf_hce;
+struct imsgbuf *ibuf_relay;
void
pfe_sig_handler(int sig, short event, void *arg)
@@ -61,12 +63,15 @@ pfe_sig_handler(int sig, short event, void *arg)
pid_t
pfe(struct hoststated *x_env, int pipe_parent2pfe[2], int pipe_parent2hce[2],
- int pipe_pfe2hce[2])
+ int pipe_parent2relay[2], int pipe_pfe2hce[2],
+ int pipe_pfe2relay[RELAY_MAXPROC][2])
{
pid_t pid;
struct passwd *pw;
struct event ev_sigint;
struct event ev_sigterm;
+ int i;
+ struct imsgbuf *ibuf;
switch (pid = fork()) {
case -1:
@@ -114,8 +119,13 @@ pfe(struct hoststated *x_env, int pipe_parent2pfe[2], int pipe_parent2hce[2],
close(pipe_parent2pfe[0]);
close(pipe_parent2hce[0]);
close(pipe_parent2hce[1]);
+ close(pipe_parent2relay[0]);
+ close(pipe_parent2relay[1]);
+ for (i = 0; i < env->prefork_relay; i++)
+ close(pipe_pfe2relay[i][0]);
if ((ibuf_hce = calloc(1, sizeof(struct imsgbuf))) == NULL ||
+ (ibuf_relay = calloc(i, sizeof(struct imsgbuf))) == NULL ||
(ibuf_main = calloc(1, sizeof(struct imsgbuf))) == NULL)
fatal("pfe");
imsg_init(ibuf_hce, pipe_pfe2hce[1], pfe_dispatch_imsg);
@@ -131,11 +141,24 @@ pfe(struct hoststated *x_env, int pipe_parent2pfe[2], int pipe_parent2hce[2],
ibuf_main->handler, ibuf_main);
event_add(&ibuf_main->ev, NULL);
+ for (i = 0; i < env->prefork_relay; i++) {
+ ibuf = &ibuf_relay[i];
+ imsg_init(ibuf, pipe_pfe2relay[i][1], pfe_dispatch_relay);
+
+ ibuf_relay->events = EV_READ;
+ event_set(&ibuf->ev, ibuf->fd, ibuf->events,
+ ibuf->handler, ibuf);
+ event_add(&ibuf->ev, NULL);
+ }
+
TAILQ_INIT(&ctl_conns);
if (control_listen() == -1)
fatalx("pfe: control socket listen failed");
+ /* Initial sync */
+ pfe_sync();
+
event_dispatch();
pfe_shutdown();
@@ -192,20 +215,35 @@ pfe_dispatch_imsg(int fd, short event, void *ptr)
memcpy(&st, imsg.data, sizeof(st));
if ((host = host_find(env, st.id)) == NULL)
fatalx("pfe_dispatch_imsg: invalid host id");
- if (host->up == st.up) {
+
+ host->retry_cnt = st.retry_cnt;
+ if (st.up != HOST_UNKNOWN) {
+ host->check_cnt++;
+ if (st.up == HOST_UP)
+ host->up_cnt++;
+ }
+ if (host->check_cnt != st.check_cnt) {
log_debug("pfe_dispatch_imsg: host %d => %d",
host->id, host->up);
fatalx("pfe_dispatch_imsg: desynchronized");
}
+ if (host->up == st.up)
+ break;
+
+ /* Forward to relay engine(s) */
+ for (n = 0; n < env->prefork_relay; n++)
+ imsg_compose(&ibuf_relay[n], IMSG_HOST_STATUS, 0, 0,
+ &st, sizeof(st));
+
if ((table = table_find(env, host->tableid)) == NULL)
fatalx("pfe_dispatch_imsg: invalid table id");
log_debug("pfe_dispatch_imsg: state %d for host %u %s",
st.up, host->id, host->name);
- if ((st.up == HOST_UNKNOWN && host->up == HOST_DOWN) ||
- (st.up == HOST_DOWN && host->up == HOST_UNKNOWN)) {
+ if ((st.up == HOST_UNKNOWN && !HOST_ISUP(host->up)) ||
+ (!HOST_ISUP(st.up) && host->up == HOST_UNKNOWN)) {
host->up = st.up;
break;
}
@@ -215,6 +253,7 @@ pfe_dispatch_imsg(int fd, short event, void *ptr)
table->up++;
host->flags |= F_ADD;
host->flags &= ~(F_DEL);
+ host->up = HOST_UP;
} else {
table->up--;
table->flags |= F_CHANGED;
@@ -248,7 +287,7 @@ pfe_dispatch_parent(int fd, short event, void * ptr)
case EV_READ:
if ((n = imsg_read(ibuf)) == -1)
fatal("imsg_read error");
- if (n == 0) /* connection closed */
+ if (n == 0)
fatalx("pfe_dispatch_parent: pipe closed");
break;
case EV_WRITE:
@@ -277,10 +316,76 @@ pfe_dispatch_parent(int fd, short event, void * ptr)
}
void
+pfe_dispatch_relay(int fd, short event, void * ptr)
+{
+ struct imsgbuf *ibuf;
+ struct imsg imsg;
+ ssize_t n;
+ struct ctl_natlook cnl;
+ struct ctl_stats crs;
+ struct relay *rlay;
+
+ ibuf = ptr;
+ switch (event) {
+ case EV_READ:
+ if ((n = imsg_read(ibuf)) == -1)
+ fatal("imsg_read error");
+ if (n == 0)
+ fatalx("pfe_dispatch_relay: pipe closed");
+ break;
+ case EV_WRITE:
+ if (msgbuf_write(&ibuf->w) == -1)
+ fatal("msgbuf_write");
+ imsg_event_add(ibuf);
+ return;
+ default:
+ fatalx("unknown event");
+ }
+
+ for (;;) {
+ if ((n = imsg_get(ibuf, &imsg)) == -1)
+ fatal("pfe_dispatch_relay: imsg_read error");
+ if (n == 0)
+ break;
+
+ switch (imsg.hdr.type) {
+ case IMSG_NATLOOK:
+ if (imsg.hdr.len != IMSG_HEADER_SIZE + sizeof(cnl))
+ fatalx("invalid imsg header len");
+ bcopy(imsg.data, &cnl, sizeof(cnl));
+ if (natlook(env, &cnl) != 0)
+ cnl.in = -1;
+ imsg_compose(&ibuf_relay[cnl.proc], IMSG_NATLOOK, 0, 0,
+ &cnl, sizeof(cnl));
+ break;
+ case IMSG_STATISTICS:
+ if (imsg.hdr.len != IMSG_HEADER_SIZE + sizeof(crs))
+ fatalx("invalid imsg header len");
+ bcopy(imsg.data, &crs, sizeof(crs));
+ if (crs.proc > env->prefork_relay)
+ fatalx("pfe_dispatch_relay: invalid relay proc");
+ if ((rlay = relay_find(env, crs.id)) == NULL)
+ fatalx("pfe_dispatch_relay: invalid relay id");
+ bcopy(&crs, &rlay->stats[crs.proc], sizeof(crs));
+ rlay->stats[crs.proc].interval =
+ env->statinterval.tv_sec;
+ break;
+ default:
+ log_debug("pfe_dispatch_relay: unexpected imsg %d",
+ imsg.hdr.type);
+ break;
+ }
+ imsg_free(&imsg);
+ }
+ imsg_event_add(ibuf);
+}
+
+void
show(struct ctl_conn *c)
{
struct service *service;
struct host *host;
+ struct relay *rlay;
TAILQ_FOREACH(service, &env->services, entry) {
imsg_compose(&c->ibuf, IMSG_CTL_SERVICE, 0, 0,
@@ -304,6 +409,14 @@ show(struct ctl_conn *c)
imsg_compose(&c->ibuf, IMSG_CTL_HOST, 0, 0,
host, sizeof(*host));
}
+ TAILQ_FOREACH(rlay, &env->relays, entry) {
+ rlay->stats[env->prefork_relay].id = EMPTY_ID;
+ imsg_compose(&c->ibuf, IMSG_CTL_RELAY, 0, 0,
+ rlay, sizeof(*rlay));
+ imsg_compose(&c->ibuf, IMSG_CTL_STATISTICS, 0, 0,
+ &rlay->stats, sizeof(rlay->stats));
+ }
+
imsg_compose(&c->ibuf, IMSG_CTL_END, 0, 0, NULL, 0);
}
@@ -459,6 +572,8 @@ disable_host(struct ctl_conn *c, struct ctl_id *id)
host->flags |= F_DISABLE;
host->flags |= F_DEL;
host->flags &= ~(F_ADD);
+ host->check_cnt = 0;
+ host->up_cnt = 0;
imsg_compose(ibuf_hce, IMSG_HOST_DISABLE, 0, 0,
&host->id, sizeof(host->id));
@@ -498,10 +613,12 @@ enable_host(struct ctl_conn *c, struct ctl_id *id)
void
pfe_sync(void)
{
- struct service *service;
- struct table *active;
- struct ctl_id id;
- struct imsg imsg;
+ struct service *service;
+ struct table *active;
+ struct table *table;
+ struct ctl_id id;
+ struct imsg imsg;
+ struct ctl_demote demote;
bzero(&id, sizeof(id));
bzero(&imsg, sizeof(imsg));
@@ -556,4 +673,26 @@ pfe_sync(void)
control_imsg_forward(&imsg);
}
}
+
+ TAILQ_FOREACH(table, &env->tables, entry) {
+ if ((table->flags & F_DEMOTE) == 0)
+ continue;
+ demote.level = 0;
+ if (table->up && table->demoted) {
+ demote.level = -1;
+ table->demoted = 0;
+ }
+ else if (!table->up && !table->demoted) {
+ demote.level = 1;
+ table->demoted = 1;
+ }
+ if (demote.level == 0)
+ continue;
+ log_debug("pfe_sync: demote %d table '%s' group '%s'",
+ demote.level, table->name, table->demote_group);
+ strlcpy(demote.group, table->demote_group,
+ sizeof(demote.group));
+ imsg_compose(ibuf_main, IMSG_DEMOTE, 0, 0,
+ &demote, sizeof(demote));
+ }
}