/*	$OpenBSD: scheduler_ramqueue.c,v 1.30 2013/07/19 21:34:31 eric Exp $	*/

/*
 * Copyright (c) 2012 Gilles Chehade <gilles@poolp.org>
 * Copyright (c) 2012 Eric Faurot <eric@openbsd.org>
 *
 * Permission to use, copy, modify, and distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */

#include <sys/types.h>
#include <sys/queue.h>
#include <sys/tree.h>
#include <sys/socket.h>

#include <ctype.h>
#include <err.h>
#include <event.h>
#include <fcntl.h>
#include <imsg.h>
#include <inttypes.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

#include "smtpd.h"
#include "log.h"

TAILQ_HEAD(evplist, rq_envelope);

struct rq_message {
	uint32_t		 msgid;
	struct tree		 envelopes;
	struct rq_message	*q_next;
	struct evplist		 q_mta;
};

struct rq_envelope {
	TAILQ_ENTRY(rq_envelope) entry;

	uint64_t		 evpid;
	int			 type;

#define	RQ_ENVELOPE_PENDING	 0x01
#define	RQ_ENVELOPE_SCHEDULED	 0x02
#define	RQ_ENVELOPE_EXPIRED	 0x04
#define	RQ_ENVELOPE_REMOVED	 0x08
#define	RQ_ENVELOPE_INFLIGHT	 0x10
#define	RQ_ENVELOPE_SUSPEND	 0x20
	uint8_t			 flags;

	time_t			 sched;
	time_t			 expire;

	struct rq_message	*message;

	time_t			 t_inflight;
	time_t			 t_scheduled;
};

struct rq_queue {
	size_t			 evpcount;
	struct tree		 messages;

	struct evplist		 q_pending;
	struct evplist		 q_inflight;

	struct rq_message	*q_mtabatch;
	struct evplist		 q_mda;
	struct evplist		 q_bounce;
	struct evplist		 q_expired;
	struct evplist		 q_removed;
};

static int scheduler_ram_init(void);
static int scheduler_ram_insert(struct scheduler_info *);
static size_t scheduler_ram_commit(uint32_t);
static size_t scheduler_ram_rollback(uint32_t);
static int scheduler_ram_update(struct scheduler_info *);
static int scheduler_ram_delete(uint64_t);
static int scheduler_ram_batch(int, struct scheduler_batch *);
static size_t scheduler_ram_messages(uint32_t, uint32_t *, size_t);
static size_t scheduler_ram_envelopes(uint64_t, struct evpstate *, size_t);
static int scheduler_ram_schedule(uint64_t);
static int scheduler_ram_remove(uint64_t);
static int scheduler_ram_suspend(uint64_t);
static int scheduler_ram_resume(uint64_t);

static void sorted_insert(struct evplist *, struct rq_envelope *);
static void sorted_merge(struct evplist *, struct evplist *);

static void rq_queue_init(struct rq_queue *);
static void rq_queue_merge(struct rq_queue *, struct rq_queue *);
static void rq_queue_dump(struct rq_queue *, const char *);
static void rq_queue_schedule(struct rq_queue *rq);
static struct evplist *rq_envelope_list(struct rq_queue *, struct rq_envelope *);
static void rq_envelope_schedule(struct rq_queue *, struct rq_envelope *);
static int rq_envelope_remove(struct rq_queue *, struct rq_envelope *);
static int rq_envelope_suspend(struct rq_queue *, struct rq_envelope *);
static int rq_envelope_resume(struct rq_queue *, struct rq_envelope *);
static void rq_envelope_delete(struct rq_queue *, struct rq_envelope *);
static const char *rq_envelope_to_text(struct rq_envelope *);

struct scheduler_backend scheduler_backend_ramqueue = {
	scheduler_ram_init,

	scheduler_ram_insert,
	scheduler_ram_commit,
	scheduler_ram_rollback,

	scheduler_ram_update,
	scheduler_ram_delete,

	scheduler_ram_batch,

	scheduler_ram_messages,
	scheduler_ram_envelopes,
	scheduler_ram_schedule,
	scheduler_ram_remove,
	scheduler_ram_suspend,
	scheduler_ram_resume,
};

static struct rq_queue	ramqueue;
static struct tree	updates;

static time_t		currtime;

static int
scheduler_ram_init(void)
{
	rq_queue_init(&ramqueue);
	tree_init(&updates);

	return (1);
}

static int
scheduler_ram_insert(struct scheduler_info *si)
{
	uint32_t		 msgid;
	struct rq_queue		*update;
	struct rq_message	*message;
	struct rq_envelope	*envelope;

	currtime = time(NULL);

	msgid = evpid_to_msgid(si->evpid);

	/* find/prepare a ramqueue update */
	if ((update = tree_get(&updates, msgid)) == NULL) {
		update = xcalloc(1, sizeof *update, "scheduler_insert");
		stat_increment("scheduler.ramqueue.update", 1);
		rq_queue_init(update);
		tree_xset(&updates, msgid, update);
	}

	/* find/prepare the msgtree message in ramqueue update */
	if ((message = tree_get(&update->messages, msgid)) == NULL) {
		message = xcalloc(1, sizeof *message, "scheduler_insert");
		message->msgid = msgid;
		TAILQ_INIT(&message->q_mta);
		tree_init(&message->envelopes);
		tree_xset(&update->messages, msgid, message);
		stat_increment("scheduler.ramqueue.message", 1);
	}

	/* create envelope in ramqueue message */
	envelope = xcalloc(1, sizeof *envelope, "scheduler_insert");
	envelope->evpid = si->evpid;
	envelope->type = si->type;
	envelope->message = message;
	envelope->expire = si->creation + si->expire;
	envelope->sched = scheduler_compute_schedule(si);
	tree_xset(&message->envelopes, envelope->evpid, envelope);

	update->evpcount++;
	stat_increment("scheduler.ramqueue.envelope", 1);

	envelope->flags = RQ_ENVELOPE_PENDING;
	sorted_insert(&update->q_pending, envelope);

	si->nexttry = envelope->sched;

	return (1);
}

static size_t
scheduler_ram_commit(uint32_t msgid)
{
	struct rq_queue	*update;
	size_t		 r;

	currtime = time(NULL);

	update = tree_xpop(&updates, msgid);
	r = update->evpcount;

	if (verbose & TRACE_SCHEDULER)
		rq_queue_dump(update, "update to commit");

	rq_queue_merge(&ramqueue, update);

	if (verbose & TRACE_SCHEDULER)
		rq_queue_dump(&ramqueue, "resulting queue");

	rq_queue_schedule(&ramqueue);

	free(update);
	stat_decrement("scheduler.ramqueue.update", 1);

	return (r);
}

static size_t
scheduler_ram_rollback(uint32_t msgid)
{
	struct rq_queue		*update;
	struct rq_envelope	*evp;
	size_t			 r;

	currtime = time(NULL);

	if ((update = tree_pop(&updates, msgid)) == NULL)
		return (0);
	r = update->evpcount;

	while ((evp = TAILQ_FIRST(&update->q_pending))) {
		TAILQ_REMOVE(&update->q_pending, evp, entry);
		rq_envelope_delete(update, evp);
	}

	free(update);
	stat_decrement("scheduler.ramqueue.update", 1);

	return (r);
}

static int
scheduler_ram_update(struct scheduler_info *si)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;

	currtime = time(NULL);

	msgid = evpid_to_msgid(si->evpid);
	msg = tree_xget(&ramqueue.messages, msgid);
	evp = tree_xget(&msg->envelopes, si->evpid);

	/* it *must* be in-flight */
	if (!(evp->flags & RQ_ENVELOPE_INFLIGHT))
		errx(1, "evp:%016" PRIx64 " not in-flight", si->evpid);

	while ((evp->sched = scheduler_compute_schedule(si)) <= currtime)
		si->retry += 1;

	TAILQ_REMOVE(&ramqueue.q_inflight, evp, entry);
	evp->flags &= ~RQ_ENVELOPE_INFLIGHT;
	evp->flags |= RQ_ENVELOPE_PENDING;
	if (!(evp->flags & RQ_ENVELOPE_SUSPEND))
		sorted_insert(&ramqueue.q_pending, evp);

	si->nexttry = evp->sched;

	return (1);
}

static int
scheduler_ram_delete(uint64_t evpid)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;

	currtime = time(NULL);

	msgid = evpid_to_msgid(evpid);
	msg = tree_xget(&ramqueue.messages, msgid);
	evp = tree_xget(&msg->envelopes, evpid);

	/* it *must* be in-flight */
	if (!(evp->flags & RQ_ENVELOPE_INFLIGHT))
		errx(1, "evp:%016" PRIx64 " not in-flight", evpid);

	TAILQ_REMOVE(&ramqueue.q_inflight, evp, entry);
	evp->flags &= ~RQ_ENVELOPE_INFLIGHT;
	rq_envelope_delete(&ramqueue, evp);

	return (1);
}

static int
scheduler_ram_batch(int typemask, struct scheduler_batch *ret)
{
	struct evplist		*q;
	struct rq_envelope	*evp;
	struct rq_message	*msg;
	size_t			 n;

	currtime = time(NULL);

	rq_queue_schedule(&ramqueue);
	if (verbose & TRACE_SCHEDULER)
		rq_queue_dump(&ramqueue, "scheduler_ram_batch()");

	if (typemask & SCHED_REMOVE && TAILQ_FIRST(&ramqueue.q_removed)) {
		q = &ramqueue.q_removed;
		ret->type = SCHED_REMOVE;
	}
	else if (typemask & SCHED_EXPIRE && TAILQ_FIRST(&ramqueue.q_expired)) {
		q = &ramqueue.q_expired;
		ret->type = SCHED_EXPIRE;
	}
	else if (typemask & SCHED_BOUNCE && TAILQ_FIRST(&ramqueue.q_bounce)) {
		q = &ramqueue.q_bounce;
		ret->type = SCHED_BOUNCE;
	}
	else if (typemask & SCHED_MDA && TAILQ_FIRST(&ramqueue.q_mda)) {
		q = &ramqueue.q_mda;
		ret->type = SCHED_MDA;
	}
	else if (typemask & SCHED_MTA && ramqueue.q_mtabatch) {
		msg = ramqueue.q_mtabatch;
		ramqueue.q_mtabatch = msg->q_next;
		msg->q_next = NULL;
		q = &msg->q_mta;
		ret->type = SCHED_MTA;
	}
	else if ((evp = TAILQ_FIRST(&ramqueue.q_pending))) {
		ret->type = SCHED_DELAY;
		ret->evpcount = 0;
		if (evp->sched < evp->expire)
			ret->delay = evp->sched - currtime;
		else
			ret->delay = evp->expire - currtime;
		return (1);
	}
	else {
		ret->type = SCHED_NONE;
		ret->evpcount = 0;
		return (0);
	}

	for (n = 0; (evp = TAILQ_FIRST(q)) && n < ret->evpcount; n++) {

		TAILQ_REMOVE(q, evp, entry);

		/* consistency check */ 
		if (!(evp->flags & RQ_ENVELOPE_SCHEDULED))
			errx(1, "evp:%016" PRIx64 " not scheduled", evp->evpid);

		ret->evpids[n] = evp->evpid;

		if (ret->type == SCHED_REMOVE || ret->type == SCHED_EXPIRE)
			rq_envelope_delete(&ramqueue, evp);
		else {
			TAILQ_INSERT_TAIL(&ramqueue.q_inflight, evp, entry);
			evp->flags &= ~RQ_ENVELOPE_SCHEDULED;
			evp->flags |= RQ_ENVELOPE_INFLIGHT;
			evp->t_inflight = currtime;
		}
	}

	ret->evpcount = n;

	return (1);
}

static size_t
scheduler_ram_messages(uint32_t from, uint32_t *dst, size_t size)
{
	uint64_t id;
	size_t	 n;
	void	*i;

	for (n = 0, i = NULL; n < size; n++) {
		if (tree_iterfrom(&ramqueue.messages, &i, from, &id, NULL) == 0)
			break;
		dst[n] = id;
	}

	return (n);
}

static size_t
scheduler_ram_envelopes(uint64_t from, struct evpstate *dst, size_t size)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	void			*i;
	size_t			 n;

	if ((msg = tree_get(&ramqueue.messages, evpid_to_msgid(from))) == NULL)
		return (0);

	for (n = 0, i = NULL; n < size; ) {

		if (tree_iterfrom(&msg->envelopes, &i, from, NULL,
		    (void**)&evp) == 0)
			break;

		if (evp->flags & (RQ_ENVELOPE_REMOVED | RQ_ENVELOPE_EXPIRED))
			continue;

		dst[n].retry = 0;
		dst[n].evpid = evp->evpid;
		if (evp->flags & RQ_ENVELOPE_PENDING) {
			dst[n].time = evp->sched;
			dst[n].flags = EF_PENDING;
		}
		else if (evp->flags & RQ_ENVELOPE_SCHEDULED) {
			dst[n].time = evp->t_scheduled;
			dst[n].flags = EF_PENDING;
		}
		else if (evp->flags & RQ_ENVELOPE_INFLIGHT) {
			dst[n].time = evp->t_inflight;
			dst[n].flags = EF_INFLIGHT;
		}
		if (evp->flags & RQ_ENVELOPE_SUSPEND)
			dst[n].flags |= EF_SUSPEND;
		n++;
	}

	return (n);
}

static int
scheduler_ram_schedule(uint64_t evpid)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;
	void			*i;
	int			 r;

	currtime = time(NULL);

	if (evpid > 0xffffffff) {
		msgid = evpid_to_msgid(evpid);
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		if ((evp = tree_get(&msg->envelopes, evpid)) == NULL)
			return (0);
		if (evp->flags & RQ_ENVELOPE_PENDING) {
			rq_envelope_schedule(&ramqueue, evp);
			return (1);
		}
		return (0);
	}
	else {
		msgid = evpid;
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		i = NULL;
		r = 0;
		while (tree_iter(&msg->envelopes, &i, NULL, (void*)(&evp)))
			if (evp->flags & RQ_ENVELOPE_PENDING) {
				rq_envelope_schedule(&ramqueue, evp);
				r++;
			}
		return (r);
	}
}

static int
scheduler_ram_remove(uint64_t evpid)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;
	void			*i;
	int			 r;

	currtime = time(NULL);

	if (evpid > 0xffffffff) {
		msgid = evpid_to_msgid(evpid);
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		if ((evp = tree_get(&msg->envelopes, evpid)) == NULL)
			return (0);
		if (rq_envelope_remove(&ramqueue, evp))
			return (1);
		return (0);
	}
	else {
		msgid = evpid;
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		i = NULL;
		r = 0;
		while (tree_iter(&msg->envelopes, &i, NULL, (void*)(&evp)))
			if (rq_envelope_remove(&ramqueue, evp))
				r++;
		return (r);
	}
}

static int
scheduler_ram_suspend(uint64_t evpid)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;
	void			*i;
	int			 r;

	currtime = time(NULL);

	if (evpid > 0xffffffff) {
		msgid = evpid_to_msgid(evpid);
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		if ((evp = tree_get(&msg->envelopes, evpid)) == NULL)
			return (0);
		if (rq_envelope_suspend(&ramqueue, evp))
			return (1);
		return (0);
	}
	else {
		msgid = evpid;
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		i = NULL;
		r = 0;
		while (tree_iter(&msg->envelopes, &i, NULL, (void*)(&evp)))
			if (rq_envelope_suspend(&ramqueue, evp))
				r++;
		return (r);
	}
}

static int
scheduler_ram_resume(uint64_t evpid)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;
	void			*i;
	int			 r;

	currtime = time(NULL);

	if (evpid > 0xffffffff) {
		msgid = evpid_to_msgid(evpid);
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		if ((evp = tree_get(&msg->envelopes, evpid)) == NULL)
			return (0);
		if (rq_envelope_resume(&ramqueue, evp))
			return (1);
		return (0);
	}
	else {
		msgid = evpid;
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		i = NULL;
		r = 0;
		while (tree_iter(&msg->envelopes, &i, NULL, (void*)(&evp)))
			if (rq_envelope_resume(&ramqueue, evp))
				r++;
		return (r);
	}
}

static void
sorted_insert(struct evplist *list, struct rq_envelope *evp)
{
	struct rq_envelope	*item;
	time_t			 ref;

	TAILQ_FOREACH(item, list, entry) {
		ref = (evp->sched < evp->expire) ? evp->sched : evp->expire;
		if (ref <= item->expire && ref <= item->sched) {
			TAILQ_INSERT_BEFORE(item, evp, entry);
			return;
		}
	}
	TAILQ_INSERT_TAIL(list, evp, entry);
}

static void
sorted_merge(struct evplist *list, struct evplist *from)
{
	struct rq_envelope	*e;

	/* XXX this is O(not good enough) */
	while ((e = TAILQ_LAST(from, evplist))) {
		TAILQ_REMOVE(from, e, entry);
		sorted_insert(list, e);
	}
}

static void
rq_queue_init(struct rq_queue *rq)
{
	bzero(rq, sizeof *rq);
	tree_init(&rq->messages);
	TAILQ_INIT(&rq->q_pending);
	TAILQ_INIT(&rq->q_inflight);
	TAILQ_INIT(&rq->q_mda);
	TAILQ_INIT(&rq->q_bounce);
	TAILQ_INIT(&rq->q_expired);
	TAILQ_INIT(&rq->q_removed);
}

static void
rq_queue_merge(struct rq_queue *rq, struct rq_queue *update)
{
	struct rq_message	*message, *tomessage;
	struct rq_envelope	*envelope;
	uint64_t		 id;
	void			*i;

	while (tree_poproot(&update->messages, &id, (void*)&message)) {
		if ((tomessage = tree_get(&rq->messages, id)) == NULL) {
			/* message does not exist. re-use structure */
			tree_xset(&rq->messages, id, message);
			continue;
		}
		/* need to re-link all envelopes before merging them */
		i = NULL;
		while ((tree_iter(&message->envelopes, &i, &id,
		    (void*)&envelope)))
			envelope->message = tomessage;
		tree_merge(&tomessage->envelopes, &message->envelopes);
		free(message);
		stat_decrement("scheduler.ramqueue.message", 1);
	}

	sorted_merge(&rq->q_pending, &update->q_pending);
	rq->evpcount += update->evpcount;
}

static void
rq_queue_schedule(struct rq_queue *rq)
{
	struct rq_envelope	*evp;

	while ((evp = TAILQ_FIRST(&rq->q_pending))) {
		if (evp->sched > currtime && evp->expire > currtime)
			break;

		if (evp->flags != RQ_ENVELOPE_PENDING)
			errx(1, "evp:%016" PRIx64 " flags=0x%x", evp->evpid,
			    evp->flags);

		if (evp->expire <= currtime) {
			TAILQ_REMOVE(&rq->q_pending, evp, entry);
			TAILQ_INSERT_TAIL(&rq->q_expired, evp, entry);
			evp->flags &= ~RQ_ENVELOPE_PENDING;
			evp->flags |= RQ_ENVELOPE_EXPIRED;
			evp->flags |= RQ_ENVELOPE_SCHEDULED;
			evp->t_scheduled = currtime;
			continue;
		}
		rq_envelope_schedule(rq, evp);
	}
}

static struct evplist *
rq_envelope_list(struct rq_queue *rq, struct rq_envelope *evp)
{
	if (evp->flags & RQ_ENVELOPE_SCHEDULED) {
		if (evp->flags & RQ_ENVELOPE_EXPIRED)
			return &rq->q_expired;
		if (evp->flags & RQ_ENVELOPE_REMOVED)
			return &rq->q_removed;
		if (evp->type == D_MTA)
			return &evp->message->q_mta;
		if (evp->type == D_MDA)
			return &rq->q_mda;
		if (evp->type == D_BOUNCE)
			return &rq->q_bounce;
	}

	if (evp->flags & RQ_ENVELOPE_PENDING)
		return &rq->q_pending;

	if (evp->flags & RQ_ENVELOPE_INFLIGHT)
		return &rq->q_inflight;

	return (NULL);
}

static void
rq_envelope_schedule(struct rq_queue *rq, struct rq_envelope *evp)
{
	struct evplist	*q = NULL;

	if (evp->type == D_MTA) {
		if (TAILQ_EMPTY(&evp->message->q_mta)) {
			evp->message->q_next = rq->q_mtabatch;
			rq->q_mtabatch = evp->message;
		}
		q = &evp->message->q_mta;
	}
	else if (evp->type == D_MDA)
		q = &rq->q_mda;
	else if (evp->type == D_BOUNCE)
		q = &rq->q_bounce;

	TAILQ_REMOVE(&rq->q_pending, evp, entry);
	TAILQ_INSERT_TAIL(q, evp, entry);
	evp->flags &= ~RQ_ENVELOPE_PENDING;
	evp->flags |= RQ_ENVELOPE_SCHEDULED;
	evp->t_scheduled = currtime;
}

static int
rq_envelope_remove(struct rq_queue *rq, struct rq_envelope *evp)
{
	struct rq_message	*m;
	struct evplist		*q = NULL;

	if (evp->flags & (RQ_ENVELOPE_REMOVED | RQ_ENVELOPE_EXPIRED))
		return (0);
	/*
	 * For now we just ignore it, but we could mark the envelope for
	 * removal and possibly send a cancellation to the agent.
	 */
	if (evp->flags & (RQ_ENVELOPE_INFLIGHT))
		return (0);

	q = rq_envelope_list(rq, evp);

	TAILQ_REMOVE(q, evp, entry);
	TAILQ_INSERT_TAIL(&rq->q_removed, evp, entry);
	evp->flags &= ~RQ_ENVELOPE_PENDING;
	evp->flags |= RQ_ENVELOPE_REMOVED;
	evp->flags |= RQ_ENVELOPE_SCHEDULED;
	evp->t_scheduled = currtime;

	/*
	 * We might need to unschedule the message if it was the only
	 * scheduled envelope
	 */
	if (q == &evp->message->q_mta && TAILQ_EMPTY(q)) {
		if (rq->q_mtabatch == evp->message)
			rq->q_mtabatch = evp->message->q_next;
		else {
			for (m = rq->q_mtabatch; m->q_next; m = m->q_next)
				if (m->q_next == evp->message) {
					m->q_next = evp->message->q_next;
					break;
				}
		}
		evp->message->q_next = NULL;
	}

	return (1);
}

static int
rq_envelope_suspend(struct rq_queue *rq, struct rq_envelope *evp)
{
	if (evp->flags & RQ_ENVELOPE_SUSPEND)
		return (0);

	if (!(evp->flags & RQ_ENVELOPE_INFLIGHT))
		TAILQ_REMOVE(rq_envelope_list(rq, evp), evp, entry);

	evp->flags |= RQ_ENVELOPE_SUSPEND;

	return (1);
}

static int
rq_envelope_resume(struct rq_queue *rq, struct rq_envelope *evp)
{
	if (!(evp->flags & RQ_ENVELOPE_SUSPEND))
		return (0);

	if (!(evp->flags & RQ_ENVELOPE_INFLIGHT))
		sorted_insert(rq_envelope_list(rq, evp), evp);

	evp->flags &= ~RQ_ENVELOPE_SUSPEND;
	return (1);
}

static void
rq_envelope_delete(struct rq_queue *rq, struct rq_envelope *evp)
{
	tree_xpop(&evp->message->envelopes, evp->evpid);
	if (tree_empty(&evp->message->envelopes)) {
		tree_xpop(&rq->messages, evp->message->msgid);
		free(evp->message);
		stat_decrement("scheduler.ramqueue.message", 1);
	}

	free(evp);
	rq->evpcount--;
	stat_decrement("scheduler.ramqueue.envelope", 1);
}

static const char *
rq_envelope_to_text(struct rq_envelope *e)
{
	static char	buf[256];
	char		t[64];

	snprintf(buf, sizeof buf, "evp:%016" PRIx64 " [", e->evpid);

	if (e->type == D_BOUNCE)
		strlcat(buf, "bounce", sizeof buf);
	else if (e->type == D_MDA)
		strlcat(buf, "mda", sizeof buf);
	else if (e->type == D_MTA)
		strlcat(buf, "mta", sizeof buf);

	snprintf(t, sizeof t, ",expire=%s",
	    duration_to_text(e->expire - currtime));
	strlcat(buf, t, sizeof buf);

	if (e->flags & RQ_ENVELOPE_PENDING) {
		snprintf(t, sizeof t, ",pending=%s",
		    duration_to_text(e->sched - currtime));
		strlcat(buf, t, sizeof buf);
	}
	if (e->flags & RQ_ENVELOPE_SCHEDULED) {
		snprintf(t, sizeof t, ",scheduled=%s",
		    duration_to_text(currtime - e->t_scheduled));
		strlcat(buf, t, sizeof buf);
	}
	if (e->flags & RQ_ENVELOPE_INFLIGHT) {
		snprintf(t, sizeof t, ",inflight=%s",
		    duration_to_text(currtime - e->t_inflight));
		strlcat(buf, t, sizeof buf);
	}
	if (e->flags & RQ_ENVELOPE_REMOVED)
		strlcat(buf, ",removed", sizeof buf);
	if (e->flags & RQ_ENVELOPE_EXPIRED)
		strlcat(buf, ",expired", sizeof buf);
	if (e->flags & RQ_ENVELOPE_SUSPEND)
		strlcat(buf, ",suspended", sizeof buf);

	strlcat(buf, "]", sizeof buf);

	return (buf);
}

static void
rq_queue_dump(struct rq_queue *rq, const char * name)
{
	struct rq_message	*message;
	struct rq_envelope	*envelope;
	void			*i, *j;
	uint64_t		 id;

	log_debug("debug: /--- ramqueue: %s", name);

	i = NULL;
	while ((tree_iter(&rq->messages, &i, &id, (void*)&message))) {
		log_debug("debug: | msg:%08" PRIx32, message->msgid);
		j = NULL;
		while ((tree_iter(&message->envelopes, &j, &id,
		    (void*)&envelope)))
			log_debug("debug: |   %s",
			    rq_envelope_to_text(envelope));
	}
	log_debug("debug: \\---");
}