You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
140 lines
5.8 KiB
140 lines
5.8 KiB
;; Attempt at a Paxos implementation in Common Lisp (!) |
|
|
|
(in-package :nea/paxos) |
|
|
|
(defvar *outbox* nil |
|
"Name of the outbox to be used for outgoing Paxos messages.") |
|
|
|
(defparameter *n-quorum* 1 |
|
"Number of nodes that make up a quorum.") |
|
|
|
(defun paxos-add-to-outbox (outbox message) |
|
"Puts a message into outbox OUTBOX, and logs it." |
|
(format t "(~S) --> ~S~%" outbox message) |
|
(push message (symbol-value outbox))) |
|
|
|
(defun paxos-send (message) |
|
"Calls PAXOS-ADD-TO-OUTBOX with the value of the dynamic *outbox* variable." |
|
(if (eq *outbox* nil) |
|
(error "PAXOS-SEND called with no defined outbox.") |
|
(paxos-add-to-outbox *outbox* message))) |
|
|
|
(defun paxos-acceptor-listening-step (highest-id promise incoming-message) |
|
"Called when a message is received for a Paxos acceptor in the Listening state." |
|
(let* ((tag (car incoming-message)) |
|
(args (cdr incoming-message)) |
|
(new-id (car args)) |
|
(default-retval (cons 'cont (lambda (m) (paxos-acceptor-listening-step highest-id promise m))))) |
|
(cond |
|
((eq tag 'prepare) (if (> new-id highest-id) |
|
(progn |
|
(paxos-send (list 'promise new-id)) |
|
(cons 'cont (lambda (m) (paxos-acceptor-listening-step new-id args m)))) |
|
(progn |
|
(if promise |
|
(paxos-send (append (list 'promise-instead new-id) promise)) |
|
(paxos-send (list 'prepare-reject new-id highest-id))) |
|
default-retval))) |
|
((and (eq tag 'propose) (eq new-id (car promise))) (progn |
|
(paxos-send (list 'accepted promise)) |
|
(cons 'done promise))) |
|
((eq tag 'propose) (progn |
|
(paxos-send (list 'propose-reject new-id)) |
|
default-retval)) |
|
(t default-retval) |
|
))) |
|
|
|
(defun paxos-acceptor-make (highest-id) |
|
"Make a new Paxos acceptor. Returns a step function." |
|
(lambda (m) (paxos-acceptor-listening-step highest-id nil m))) |
|
|
|
(defun paxos-proposer-find-value (orig-id orig-value responses) |
|
"Given a set of responses to a Prepare message, figure out what value we should try and propose." |
|
(let ((ret (list orig-id orig-value))) |
|
;; For each response... |
|
(loop for item in responses |
|
;; ...if it's a "propose this instead", and the ID is higher than the current ID... |
|
do (when (and (eq (car item) 'instead) (> (cadr item) (car ret))) |
|
;; ...then use that one |
|
(setf ret (cdr item)))) |
|
ret)) |
|
|
|
(defun paxos-proposer-proposing-step (id n-responses incoming-message) |
|
"Called when a message is received for a Paxos proposer in the Proposing state." |
|
(let ((tag (car incoming-message)) |
|
(args (cdr incoming-message))) |
|
(cond |
|
((not (eq (car args) id)) '()) |
|
((eq tag 'propose-reject) (return-from paxos-proposer-proposing-step (cons 'done 'rejected))) |
|
((eq tag 'accepted) (incf n-responses)) |
|
(t '())) |
|
(if (>= n-responses *n-quorum*) |
|
(cons 'done 'accepted) |
|
(cons 'cont (lambda (m) (paxos-proposer-proposing-step id n-responses m)))))) |
|
|
|
(defun paxos-proposer-preparing-step (id value responses incoming-message) |
|
"Called when a message is received for a Paxos proposer in the Preparing state. Returns a step function." |
|
(let ((tag (car incoming-message)) |
|
(args (cdr incoming-message))) |
|
(cond |
|
((not (eq (car args) id)) '()) |
|
((eq tag 'prepare-reject) (return-from paxos-proposer-preparing-step (cons 'done 'rejected))) |
|
((eq tag 'promise) (push '(empty) responses)) |
|
((eq tag 'promise-instead) (push (cons 'instead (cdr args)) responses)) |
|
(t '()))) |
|
;; did I receive Promise messages from a quorum of Acceptors? |
|
(cons 'cont |
|
(if (>= (length responses) *n-quorum*) |
|
(let ((id-and-value (paxos-proposer-find-value id value responses))) |
|
;; Propose this new value. |
|
(paxos-send (cons 'propose id-and-value)) |
|
(lambda (m) (paxos-proposer-proposing-step id 0 m)) |
|
) |
|
(lambda (m) (paxos-proposer-preparing-step id value responses m))))) |
|
|
|
(defun paxos-proposer-propose (id value) |
|
"Start a Paxos proposal. Returns a step function." |
|
(paxos-send (list 'prepare id value)) |
|
(lambda (m) (paxos-proposer-preparing-step id value '() m))) |
|
|
|
(defmacro paxos-step (place outbox message) |
|
"Execute a step function at PLACE, passing it the given MESSAGE using the given OUTBOX." |
|
(let ((ret-name (gensym))) |
|
`(let* ((*outbox* (quote ,outbox)) (,ret-name (funcall ,place ,message))) |
|
(if (eq (car ,ret-name) 'cont) |
|
(setf ,place (cdr ,ret-name)) |
|
(cdr ,ret-name))))) |
|
|
|
(defvar *proposer-outbox* ()) |
|
(defvar *acceptor-outbox* ()) |
|
(defvar *proposer*) |
|
(defvar *acceptor*) |
|
|
|
(defun paxos-proposer-loop () |
|
(dolist (msg *acceptor-outbox*) |
|
(paxos-step *proposer* *proposer-outbox* msg)) |
|
(let ((ret *acceptor-outbox*)) |
|
(setf *acceptor-outbox* nil) |
|
ret)) |
|
(setf *outbox* nil) |
|
(defun paxos-acceptor-loop () |
|
(dolist (msg *proposer-outbox*) |
|
(paxos-step *acceptor* *acceptor-outbox* msg)) |
|
(let ((ret *proposer-outbox*)) |
|
(setf *proposer-outbox* nil) |
|
ret)) |
|
|
|
(defun paxos-test () |
|
(trace paxos-acceptor-listening-step) |
|
(trace paxos-proposer-proposing-step) |
|
(trace paxos-proposer-preparing-step) |
|
(trace paxos-acceptor-make) |
|
(trace paxos-proposer-propose) |
|
(trace paxos-proposer-loop) |
|
(trace paxos-acceptor-loop) |
|
(setf *proposer-outbox* ()) |
|
(setf *acceptor-outbox* ()) |
|
(setf *proposer* (let ((*outbox* '*proposer-outbox*)) (paxos-proposer-propose 1 'testing))) |
|
(setf *acceptor* (paxos-acceptor-make 0)) |
|
(loop while (or (paxos-acceptor-loop) (paxos-proposer-loop))) |
|
)
|
|
|