You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

140 lines
5.8 KiB

;; Attempt at a Paxos implementation in Common Lisp (!)
(in-package :nea/paxos)
;; *OUTBOX* holds a symbol, not a list: PAXOS-ADD-TO-OUTBOX pushes onto that
;; symbol's SYMBOL-VALUE, and PAXOS-STEP rebinds *OUTBOX* to a quoted outbox
;; name around each step.  PAXOS-SEND signals an error while it is NIL.
(defvar *outbox* nil
"Name of the outbox to be used for outgoing Paxos messages.")
;; The proposer step functions compare their count of Promise / Accepted
;; replies against this threshold using >=.
(defparameter *n-quorum* 1
"Number of nodes that make up a quorum.")
(defun paxos-add-to-outbox (outbox message)
  "Log MESSAGE, then prepend it to the list stored in the symbol OUTBOX.
OUTBOX is a symbol; its SYMBOL-VALUE is the outbox list.  Returns the
updated list."
  (format t "(~S) --> ~S~%" outbox message)
  (let ((queued (cons message (symbol-value outbox))))
    (setf (symbol-value outbox) queued)))
(defun paxos-send (message)
  "Queue MESSAGE on the outbox named by the dynamic variable *OUTBOX*.
Signals an error when *OUTBOX* is NIL, i.e. no outbox is currently bound;
otherwise hands off to PAXOS-ADD-TO-OUTBOX and returns its result."
  (when (null *outbox*)
    (error "PAXOS-SEND called with no defined outbox."))
  (paxos-add-to-outbox *outbox* message))
(defun paxos-acceptor-listening-step (highest-id promise incoming-message)
  "Handle INCOMING-MESSAGE for a Paxos acceptor in the Listening state.

HIGHEST-ID is the highest proposal ID promised so far.  PROMISE is the
(ID VALUE) argument list of the Prepare that was promised, or NIL when
nothing has been promised yet.  INCOMING-MESSAGE is a list (TAG ID . REST).

Returns (CONT . STEP-FUNCTION) to keep listening, or (DONE . PROMISE) once
a Propose matching the current promise has been accepted.  Replies are
emitted through PAXOS-SEND."
  (let* ((tag (car incoming-message))
         (args (cdr incoming-message))
         (new-id (car args))
         ;; Result used whenever the acceptor's state does not change.
         (default-retval
           (cons 'cont
                 (lambda (m)
                   (paxos-acceptor-listening-step highest-id promise m)))))
    ;; NOTE(review): the COND / PROGN structure below was reconstructed from
    ;; the order of the surviving clauses -- confirm against the intended
    ;; protocol before relying on it.
    (cond
      ((eq tag 'prepare)
       (if (> new-id highest-id)
           ;; Newer proposal: promise it and remember its (ID VALUE).
           (progn
             (paxos-send (list 'promise new-id))
             (cons 'cont
                   (lambda (m)
                     (paxos-acceptor-listening-step new-id args m))))
           ;; Stale proposal: report the existing promise if there is one,
           ;; otherwise reject outright.  State is unchanged either way.
           (progn
             (if promise
                 (paxos-send (append (list 'promise-instead new-id) promise))
                 (paxos-send (list 'prepare-reject new-id highest-id)))
             default-retval)))
      ;; EQL rather than EQ: EQ is not guaranteed to be true for equal
      ;; integers.  EQL also copes with PROMISE being NIL.
      ((and (eq tag 'propose) (eql new-id (car promise)))
       ;; Send (ACCEPTED ID VALUE) so the ID is the first argument, like
       ;; every other message; the proposer filters on (CAR ARGS).
       (paxos-send (cons 'accepted promise))
       (cons 'done promise))
      ((eq tag 'propose)
       (paxos-send (list 'propose-reject new-id))
       default-retval)
      (t default-retval))))
(defun paxos-acceptor-make (highest-id)
  "Create a fresh Paxos acceptor and return its initial step function.
The returned function takes one message and forwards it to
PAXOS-ACCEPTOR-LISTENING-STEP with HIGHEST-ID and no recorded promise."
  (flet ((initial-step (message)
           (paxos-acceptor-listening-step highest-id nil message)))
    #'initial-step))
(defun paxos-proposer-find-value (orig-id orig-value responses)
  "Choose the (ID VALUE) pair to propose after a round of Prepare replies.

RESPONSES is a list of entries shaped (EMPTY) or (INSTEAD ID VALUE).
Starts from (ORIG-ID ORIG-VALUE) and switches to the (ID VALUE) tail of
any INSTEAD entry whose ID is greater than the ID currently held.
Returns the resulting two-element list."
  (let ((ret (list orig-id orig-value)))
    ;; For each response...
    (loop for item in responses
          ;; ...if it's a "propose this instead", and the ID is higher than
          ;; the current ID...
          do (when (and (eq (car item) 'instead)
                        (> (cadr item) (car ret)))
               ;; ...then use that one.
               (setf ret (cdr item))))
    ;; Hand the selected pair back to the caller; without this the function
    ;; would return the LOOP's NIL.
    ret))
(defun paxos-proposer-proposing-step (id n-responses incoming-message)
  "Handle INCOMING-MESSAGE for a Paxos proposer in the Proposing state.

ID is the proposal ID being tracked and N-RESPONSES the number of Accepted
replies counted so far.  INCOMING-MESSAGE is a list (TAG ID . REST);
messages whose ID differs from ID are ignored.

Returns (DONE . REJECTED) on a Propose-Reject, (DONE . ACCEPTED) once
N-RESPONSES reaches *N-QUORUM*, and (CONT . STEP-FUNCTION) otherwise."
  (let ((tag (car incoming-message))
        (args (cdr incoming-message)))
    (cond
      ;; Not about our proposal.  EQL rather than EQ because EQ is not
      ;; guaranteed to be true for equal integers.
      ((not (eql (car args) id)) '())
      ((eq tag 'propose-reject)
       (return-from paxos-proposer-proposing-step (cons 'done 'rejected)))
      ((eq tag 'accepted) (incf n-responses))
      (t '()))
    (if (>= n-responses *n-quorum*)
        (cons 'done 'accepted)
        (cons 'cont
              (lambda (m)
                (paxos-proposer-proposing-step id n-responses m))))))
(defun paxos-proposer-preparing-step (id value responses incoming-message)
  "Handle INCOMING-MESSAGE for a Paxos proposer in the Preparing state.

ID and VALUE are the proposal being prepared; RESPONSES is the list of
Prepare replies collected so far.  INCOMING-MESSAGE is a list
(TAG ID . REST); messages whose ID differs from ID are ignored.

Returns (DONE . REJECTED) on a Prepare-Reject.  Otherwise returns
(CONT . STEP-FUNCTION): once *N-QUORUM* replies are collected a Propose
is sent via PAXOS-SEND and the step function is for the Proposing state;
until then it remains in the Preparing state."
  (let ((tag (car incoming-message))
        (args (cdr incoming-message)))
    (cond
      ;; Not about our proposal.  EQL rather than EQ because EQ is not
      ;; guaranteed to be true for equal integers.
      ((not (eql (car args) id)) '())
      ((eq tag 'prepare-reject)
       (return-from paxos-proposer-preparing-step (cons 'done 'rejected)))
      ((eq tag 'promise) (push '(empty) responses))
      ;; ARGS is (ID PROMISED-ID PROMISED-VALUE); keep the promised pair.
      ((eq tag 'promise-instead) (push (cons 'instead (cdr args)) responses))
      (t '())))
  ;; did I receive Promise messages from a quorum of Acceptors?
  (cons 'cont
        (if (>= (length responses) *n-quorum*)
            (let ((id-and-value (paxos-proposer-find-value id value responses)))
              ;; Propose this new value.
              (paxos-send (cons 'propose id-and-value))
              ;; NOTE(review): the Proposing state tracks ID even when
              ;; ID-AND-VALUE carries a different ID -- confirm intended.
              (lambda (m) (paxos-proposer-proposing-step id 0 m)))
            (lambda (m) (paxos-proposer-preparing-step id value responses m)))))
(defun paxos-proposer-propose (id value)
  "Begin a Paxos proposal of VALUE under proposal number ID.
Sends the Prepare message through PAXOS-SEND, then returns the step
function that handles replies in the Preparing state."
  (let ((prepare-message (list 'prepare id value)))
    (paxos-send prepare-message)
    (flet ((preparing-step (reply)
             (paxos-proposer-preparing-step id value nil reply)))
      #'preparing-step)))
(defmacro paxos-step (place outbox message)
  "Run the step function stored in PLACE on MESSAGE, with *OUTBOX* bound to
the (unevaluated) symbol OUTBOX for the duration of the call.
The step function returns (STATUS . PAYLOAD).  When STATUS is CONT the
payload is stored back into PLACE; in every case the payload is the value
of the form."
  (let ((outcome (gensym)))
    `(let* ((*outbox* ',outbox)
            (,outcome (funcall ,place ,message)))
       (cond ((eq (car ,outcome) 'cont)
              ;; Continue: install the next step function.
              (setf ,place (cdr ,outcome)))
             (t
              ;; Finished: leave PLACE alone and yield the result.
              (cdr ,outcome))))))
;; Outgoing message lists for the two roles.  PAXOS-PROPOSER-LOOP steps the
;; proposer with *PROPOSER-OUTBOX* as its outbox while reading
;; *ACCEPTOR-OUTBOX*; PAXOS-ACCEPTOR-LOOP does the reverse.
(defvar *proposer-outbox* ())
(defvar *acceptor-outbox* ())
;; Current step function of each role.  Left unbound here; PAXOS-TEST
;; assigns them and PAXOS-STEP overwrites them when a step returns CONT.
(defvar *proposer*)
(defvar *acceptor*)
(defun paxos-proposer-loop ()
  "Feed every message in *ACCEPTOR-OUTBOX* to the proposer, then clear it.

Each message is run through PAXOS-STEP on *PROPOSER*, with replies going
to *PROPOSER-OUTBOX*.  Returns the list of messages that were processed,
so the result is NIL exactly when there was nothing to deliver."
  (dolist (msg *acceptor-outbox*)
    (paxos-step *proposer* *proposer-outbox* msg))
  (let ((ret *acceptor-outbox*))
    (setf *acceptor-outbox* nil)
    (setf *outbox* nil)
    ;; Return the drained messages; callers use this as a "did any work"
    ;; flag.
    ret))
(defun paxos-acceptor-loop ()
  "Feed every message in *PROPOSER-OUTBOX* to the acceptor, then clear it.

Each message is run through PAXOS-STEP on *ACCEPTOR*, with replies going
to *ACCEPTOR-OUTBOX*.  Returns the list of messages that were processed,
so the result is NIL exactly when there was nothing to deliver."
  (dolist (msg *proposer-outbox*)
    (paxos-step *acceptor* *acceptor-outbox* msg))
  (let ((ret *proposer-outbox*))
    (setf *proposer-outbox* nil)
    ;; Return the drained messages; callers use this as a "did any work"
    ;; flag.
    ret))
(defun paxos-test ()
(trace paxos-acceptor-listening-step)
(trace paxos-proposer-proposing-step)
(trace paxos-proposer-preparing-step)
(trace paxos-acceptor-make)
(trace paxos-proposer-propose)
(trace paxos-proposer-loop)
(trace paxos-acceptor-loop)
(setf *proposer-outbox* ())
(setf *acceptor-outbox* ())
(setf *proposer* (let ((*outbox* '*proposer-outbox*)) (paxos-proposer-propose 1 'testing)))
(setf *acceptor* (paxos-acceptor-make 0))
(loop while (or (paxos-acceptor-loop) (paxos-proposer-loop)))