Our message transport service needs an abstract type of message queues to support reliable delivery. The message queue holds messages that have been sent (to a particular host) but whose delivery has not yet been acknowledged.

Here is the interface for message queues:

module message_queue(net_msg,seq_num) = {

action enqueue(msg:net_msg.t) returns (ok:bool)
action empty returns (res:bool)
action pick_one returns (res:net_msg.t)
action delete_all(seq:seq_num)

...

The module takes a type net_msg of messages and type seq_num of sequences numbers as parameters. It assumes that seq_num has an iterator type and that net_msg has a destructor num that yields the sequence number of a message.

The enqueue action adds a message to the queue, returning true if the action was successful. This allows the implementation the flexibility to fail if there are insufficient resources. The empty action returns true if the queue is empty, pick_one returns some message from a non-empty queue and delete_all removes all of the messages with sequence number less than or equal to seq_num.

The abstract state of the queue is held by the predicate contents which represents the set of all messages currently in the queue:

relation contents(M:net_msg.t)

Here is the specification of the interface:

object spec = {
    after init {
            contents(M) := false
        }

    before enqueue {
	assert contents(X) -> net_msg.num(X) ~= net_msg.num(msg)
    }

    after enqueue {
	if ok {
	    contents(msg) := true
	}
    }

    after empty returns (res:bool) {
	call impl.spec.lemma;
	assert contents(M) -> ~res;
	assert ~res -> exists M. contents(M)
    }

    before pick_one {
	assert exists M. contents(M);
	call impl.spec.lemma
    }

    after pick_one {
	assert contents(res)
    }

    before delete_all  {
	contents(M) := contents(M) & ~(net_msg.num(M) <= seq)
    }
}

Nootice the the contents predicate is updated in the after specification of enqueue. This is because the update depends on the return value ok. Also, we are not allow to enqueue a message with the same sequence number as a message already in the queue, and pick_one has the pre-condition that the queue is not empty.

We can implement this specification in a number of ways. Here is a simple implementation that uses an ordered map from sequence numbers to messages:

object impl = {

instance imap : ordered_map(seq_num,net_msg.t)

implement enqueue {
    call imap.set(net_msg.num(msg),msg);
    ok := true
}

implement empty {
    res := seq_num.iter.is_end(imap.lub(seq_num.iter.create(0)))
}

implement delete_all(seq:seq_num.t) {
    call imap.erase(seq_num.iter.create(0),seq_num.iter.create(seq_num.next(seq)))
}

implement pick_one {
    res := imap.get(seq_num.iter.val(imap.lub(seq_num.iter.create(0))))
}

conjecture imap.maps(X,Y) -> X = net_msg.num(Y)
conjecture contents(Y) <-> imap.maps(net_msg.num(Y),Y)

}

Since we do not bound the size of the queue, enqueue always returns true. It just maps the message’s sequence number to the mesage. The delete_all implementation erases all the messages with numbers in the range [0,seq+1). The pick_one action actually returns the message in the queue with the least serial number. This is a good choice from the point of view of progress, since repeatedly sending this message will be guaranteed to eventually produce an ack. We will not, however, prove progress.

The two invariant conjectures show are enough to prove the implementation. The first says that sequence number X always maps to a message with number X. The second is essentially the representation function. It says that the contents of the queue is the set of message such that are mapped to by their sequence number.