Browse Source

more redis, train archiving

master
eta 3 months ago
parent
commit
ebaa19efc5
  1. 1
      .gitignore
  2. 214
      trackernet.lisp

1
.gitignore

@ -5,3 +5,4 @@ intertube-scraper
*.xml
*#
*.csv
trains*/*

214
trackernet.lisp

@ -144,6 +144,20 @@
:reader leading-car-no
:initform nil)))
(defmethod print-object ((obj trackernet-train) stream)
(print-unreadable-object (obj stream :type t)
(with-slots (train-id lcid set-no trip-no track-code location-desc destination-desc) obj
(format
stream
"~A (l=~A/s=~A/t=~A) ~A (~A) to ~A"
(or train-id "???")
(or lcid "???")
set-no
trip-no
(or location-desc "<somewhere>")
(or track-code "???")
(or destination-desc "<somewhere>")))))
(conspack:defencoding trackernet-train
train-id lcid set-no trip-no seconds-to time-to location-desc
destination-desc dest-code order depart-time depart-interval
@ -365,6 +379,12 @@
(close stream)))))
(defparameter *trackernet-predictions-dir* "./predictions/")
(defparameter *trackernet-trains-archival-dir* "./trains/")
(defparameter *trackernet-scrape-interval* 2.5)
(defparameter *trackernet-kill-switch* nil)
(defparameter *prediction-expiry-secs* 60)
(defparameter *train-active-expiry-secs* 120)
(defparameter *rude-requests-per-minute* 300)
(defun make-trackernet-filename (pred)
(with-slots (line-code created-ts stations) pred
@ -392,13 +412,9 @@
line-code (code (first stations)))))
;; start a transaction
(red:multi)
;; write the complete encoded prediction object
(conspack-put filename pred)
;; add the prediction to that station's prediction list,
;; ordered by time
(red:zadd (format nil "~A-predictions" code-station)
universal-ts
filename)
;; write out the raw prediction object
(red:setex filename *prediction-expiry-secs*
(cpk-base64 pred))
;; delete the list of trains for that station
(red:del (format nil "~A-trains" code-station))
(loop
@ -416,22 +432,100 @@
(format nil "~A-train-~A"
line-code train-id))
;; mark this train as active
(red:setex (format nil "~A-train-active-~A"
(red:setex (format nil "~A-active-~A"
line-code train-id)
70
*train-active-expiry-secs*
"yep")
;; add the actual train data to the ordered set
;; of data about the train
(red:zadd (format nil "~A-train-~A"
line-code train-id)
universal-ts
(qbase64:encode-bytes
(cpk:encode train))))))
;; include the station the train is from
;; for things like secondsto
(cpk-base64 (list code-station train))))))
;; finish the transaction
(red:exec))))))
(defparameter *trackernet-scrape-interval* 2)
(defparameter *trackernet-kill-switch* nil)
(red:exec)
(length (get-trains pred)))))))
(defun redis-last-score (key)
"Gets the score of the last element of the sorted set with the Redis KEY."
(let ((score (second
(red:zrevrangebyscore key "+inf" "-inf"
:limit (cons "0" "1")
:withscores t))))
(when score
(parse-integer score))))
(defun redis-cpk-sorted-set-all (key)
"Gets all elements of the sorted set KEY, alongside their scores, CONSPACK-decoding each element."
(let ((data
(red:zrangebyscore key "-inf" "+inf"
:withscores t)))
(loop
with elt
with score
while (setf elt (pop data))
do (setf score (parse-integer (pop data)))
collect (cons score (cpk-unbase64 elt)))))
(defun get-archivable-trains ()
"Returns a list of all trains that were last updated more than *TRAIN-ACTIVE-EXPIRY-SECS* ago."
(let* ((trains (get-all "D-train-*"))
(cutoff (- (get-universal-time) *train-active-expiry-secs*)))
(delete-if-not (lambda (train)
(< (redis-last-score train) cutoff))
trains)))
(defun get-iso-8601-date ()
"Returns the current date, in %Y-%m-%d format."
(local-time:format-timestring
nil
(local-time:now)
:format '((:year 4) #\- (:month 2) #\- (:day 2))))
(defun archive-trains (keys)
"Archive trains from KEYS, a list of Redis train sorted set keys."
(with-open-file (out
(format nil "~A~A.dat"
*trackernet-trains-archival-dir*
(get-iso-8601-date))
:direction :output
:if-does-not-exist :create
:if-exists :append)
(let ((trains-data (mapcar
(lambda (key)
(append
(list (redis-last-score key) key
(redis-cpk-sorted-set-all key))))
keys)))
(loop
for train in trains-data
do (let ((last-reported-station
(second (car (last (car (last train)))))))
;; if it didn't end at a terminus, then there's probably
;; a reporting gap
(unless (member last-reported-station *termini*
:test #'string=)
(statsd:inc "intertube.archived-early")
(format *error-output*
"~&archiver: WARNING: train ~A last reported at ~A~%"
(second train)
last-reported-station)))
do (progn
(terpri out)
(princ (cpk-base64 train) out))))))
(defun unarchive-trains (path)
"Read trains from the archive at PATH."
(with-open-file (in path
:direction :input)
(loop
for line = (read-line in nil)
while line
with parsed
when (ignore-errors (setf parsed (cpk-unbase64 line)))
collect parsed)))
(defun scraper-loop (line-code station-code)
"Scrape the station with the given codes every *TRACKERNET-SCRAPE-INTERVAL* in a loop, calling MAYBE-WRITE-TRACKERNET-PREDICTION with each scrape result.
@ -449,22 +543,15 @@ Stop if *TRACKERNET-KILL-SWITCH* is set to T."
(let ((pred (fetch-trackernet-prediction line-code station-code)))
(statsd:inc (format nil "scraper.~A-~A.scraped" line-code station-code))
(statsd:inc "intertube.scraped-total")
(when (maybe-redis-trackernet-prediction pred)
(statsd:inc (format nil "scraper.~A-~A.new" line-code station-code))
(format t "~&scraper(~A-~A): new: ~A~%"
line-code station-code pred)))
(redis:redis-connection-error (e)
(format *error-output* "~&scraper(~A-~A): redis failed: ~A~%"
line-code station-code e)
(handler-bind
((redis:redis-connection-error
(lambda (c)
(declare (ignore c))
(invoke-restart 'reconnect))))
(redis:connect)))
(let ((num-trains (maybe-redis-trackernet-prediction pred)))
(when num-trains
(statsd:gauge (format nil "scraper.~A-~A.trains" line-code station-code) num-trains)
(statsd:inc (format nil "scraper.~A-~A.new" line-code station-code))
(format t "~&scraper(~A-~A): new: ~A (~A trains)~%"
line-code station-code pred num-trains))))
(error (e)
(statsd:inc (format nil "scraper.~A-~A.errors" line-code station-code))
(format t "~&scraper(~A-~A): error: ~A~%"
(format *error-output* "~&scraper(~A-~A): error: ~A~%"
line-code station-code e)))
(go :sleep)
:end)))
@ -481,7 +568,27 @@ Stop if *TRACKERNET-KILL-SWITCH* is set to T."
("V" "VUX")))
(defparameter *district-codes*
'(("D" "ECT") ("D" "UPM") ("D" "MON") ("D" "BKG") ("D" "WDN") ("D" "ERD") ("D" "RMD") ("D" "EBY") ("D" "OLY")))
'(("D" "UPM") ; Upminster (terminus)
("D" "BKG") ; Barking (between UPM and THL)
("D" "THL") ; Tower Hill (bay platform)
("D" "TEM") ; Temple
("D" "ECT") ; Earl's Court
("D" "CHP") ; Chiswick Park
("D" "EBY") ; Ealing Broadway
("D" "OLY") ; Kensington (Olympia)
("D" "ERD") ; Edgware Road
("D" "WDN") ; Wimbledon
("D" "RMD")) ; Richmond
"Station codes to scrape for the District Line.")
(defparameter *termini*
'("D-UPM" ; Upminster
"D-EBY" ; Ealing Broadway
"D-OLY" ; Kensington (Olympia)
"D-ERD" ; Edgware Road
"D-RMD" ; Richmond
"D-WDN") ; Wimbledon
"Terminus stations on all lines.")
(defun statsd-reporter-loop (line-code)
(redis:with-connection ()
@ -491,6 +598,30 @@ Stop if *TRACKERNET-KILL-SWITCH* is set to T."
(length (trains-active-on-line line-code)))
do (sleep 1))))
(defun archive-expired-trains ()
"Archive all trains that have expired."
(let ((trains (get-archivable-trains)))
(unless (null trains)
(format t "~&archiver: ~A archivable trains~%"
(length trains))
(handler-case
(progn
(archive-trains trains)
(statsd:counter "intertube.archived" (length trains))
(mapc #'red:del trains))
(error (e)
(format *error-output* "~&archiver: failed: ~A~%" e))))))
(defun archiver-loop ()
(redis:with-connection ()
;; Let some scrapes happen, so we don't archive stuff
;; after a restart
(sleep (* 3 *trackernet-scrape-interval*))
(loop
while (not *trackernet-kill-switch*)
do (archive-expired-trains)
do (sleep 1))))
(defun calculate-requests-per-minute (num-scrapes)
(* num-scrapes (/ 60 *trackernet-scrape-interval*)))
@ -501,18 +632,30 @@ Stop if *TRACKERNET-KILL-SWITCH* is set to T."
:name (format nil "statsd reporter loop for ~A" line-code)
:arguments (list line-code)))
(defun start-archiver-loop ()
(sb-thread:make-thread
#'archiver-loop
:name "archiver loop"))
(defun kick-off-scrapers (codes)
(let ((rpm (calculate-requests-per-minute
(length codes))))
(when (> rpm *rude-requests-per-minute*)
(error "~A requests/min is a bit rude" rpm))
(format t "~&starting ~A scrapers, ~A requests/min"
(length codes) rpm))
(setf *trackernet-kill-switch* nil)
(unless (and (boundp 'statsd:*client*)
statsd:*client*)
(setf statsd:*client* (statsd:make-sync-client)))
(start-archiver-loop)
(loop
for (lc sc) in codes
do (sb-thread:make-thread
#'scraper-loop
:name (format nil "scraper loop for ~A-~A" lc sc)
:arguments (list lc sc))
do (sleep (/ *trackernet-scrape-interval* 10))))
do (sleep (/ *trackernet-scrape-interval* (length codes)))))
(defun kick-off-scrapers* ()
(kick-off-scrapers *codes-to-scrape*))
@ -606,6 +749,13 @@ Stop if *TRACKERNET-KILL-SWITCH* is set to T."
do (let ((pred (first (conspack:decode-file file))))
(extract-train-secondsfor-data pred))))
(defun cpk-base64 (object)
(qbase64:encode-bytes
(cpk:encode object)))
(defun cpk-unbase64 (data)
(cpk:decode (subseq (qbase64:decode-string data) 0)))
(defun conspack-put (key object)
"Store OBJECT using CONSPACK encoding under the given Redis KEY."
(red:set key
@ -630,5 +780,5 @@ Stop if *TRACKERNET-KILL-SWITCH* is set to T."
(defun trains-active-on-line (line-code)
"Returns a list of all trains active on the given LINE-CODE right now."
(get-all (format nil "~A-train-active-*"
(get-all (format nil "~A-active-*"
line-code)))
Loading…
Cancel
Save