Browse Source

put trains in tar files instead, improve logging, refactor

master
eta 3 months ago
parent
commit
60a5a1c6eb
  1. 2
      deploy.txt
  2. 209
      trackernet.lisp

2
deploy.txt

@ -1,4 +1,4 @@
;; Build script: load dependencies, load the scraper source, then dump an executable.
;; NOTE(review): diff rendering -- the two QUICKLOAD lines below are the
;; before/after versions of the same form; the second adds the ARCHIVE system.
(ql:quickload '(drakma cl-conspack cxml cl-statsd cl-redis qbase64))
(ql:quickload '(drakma cl-conspack cxml cl-statsd cl-redis qbase64 archive))
;; Load the scraper itself.
(load "trackernet.lisp")
;; Save the running image as the executable ./intertube-scraper, with
;; TRACKERNET::MAIN as its toplevel function.
(sb-ext:save-lisp-and-die "./intertube-scraper" :toplevel #'trackernet::main :executable t)

209
trackernet.lisp

@ -146,14 +146,15 @@
;; Print a TRACKERNET-TRAIN as an unreadable object (with its type), summarising
;; its train id, LCID, set number, trip number, location, track code and
;; destination.  Slots guarded by OR below fall back to "???", "?" or
;; "<somewhere>" when they are NIL; SET-NO and TRIP-NO are printed as-is.
(defmethod print-object ((obj trackernet-train) stream)
(print-unreadable-object (obj stream :type t)
;; NOTE(review): diff rendering -- the two WITH-SLOTS lines are the before/after
;; versions of the same line; the second additionally binds LEADING-CAR-NO.
(with-slots (train-id lcid set-no trip-no track-code location-desc destination-desc) obj
(with-slots (train-id lcid set-no trip-no track-code location-desc destination-desc leading-car-no) obj
(format
stream
;; Likewise the two control strings are before/after; the second adds the
;; "/n=~A" directive that consumes LEADING-CAR-NO.
"~A (l=~A/s=~A/t=~A) ~A (~A) to ~A"
"~A (l=~A/s=~A/t=~A/n=~A) ~A (~A) to ~A"
(or train-id "???")
(or lcid "???")
set-no
trip-no
;; Argument for the new "n=" directive.
(or leading-car-no "?")
(or location-desc "<somewhere>")
(or track-code "???")
(or destination-desc "<somewhere>")))))
@ -383,7 +384,8 @@
;; Tunables for the scraper and its Redis bookkeeping.

;; Interval between scrapes of a station, as referenced by SCRAPER-LOOP's
;; docstring.  NOTE(review): presumably seconds -- confirm against SCRAPER-LOOP.
(defparameter *trackernet-scrape-interval* 2.5)
;; SCRAPER-LOOP's docstring says scraping stops when this is set to T; it is
;; reset to NIL when the scrapers are started.
(defparameter *trackernet-kill-switch* nil)
;; Expiry passed to RED:SETEX for the raw prediction key.
(defparameter *prediction-expiry-secs* 60)
;; Expiry passed to RED:SETEX for a train's "<line>-active-<id>" marker key.
;; NOTE(review): diff rendering -- the two lines below are the before/after
;; versions of the same definition (120 -> 60).
(defparameter *train-active-expiry-secs* 120)
(defparameter *train-active-expiry-secs* 60)
;; Expiry passed to RED:SETEX for the "<line>-set-<set>-trip-<trip>" reservation key.
(defparameter *train-set-code-expiry-secs* 120)
;; Starting the scrapers signals an error if the computed requests/min exceeds this.
(defparameter *rude-requests-per-minute* 300)
(defun make-trackernet-filename (pred)
@ -403,6 +405,77 @@
(unless (probe-file pathname)
(conspack:encode-to-file pred pathname))))
(defun redis-train (universal-ts line-code code-station train)
  "Record one TRAIN observed in a prediction for CODE-STATION into Redis.

UNIVERSAL-TS is used as the score of the train's entry in its sorted set.
LINE-CODE prefixes the per-train and per-track keys; CODE-STATION prefixes the
per-station keys.  Nothing is written unless the train has SECONDS-TO, a
TRACK-CODE and at least one of LCID or TRAIN-ID.

The identifier used for the train is chosen in order of preference: its LCID
(unless that is \"0\"), then its set and trip numbers (unless the set number is
missing or \"000\"), then its TRAIN-ID.

Returns the result of the final RED:ZADD, or NIL when the train was skipped."
  (with-slots (train-id set-no trip-no lcid seconds-to track-code location-desc) train
    (when (and (or lcid train-id) seconds-to track-code)
      (let* (;; A missing set number identifies a train no better than the
             ;; placeholder "000" does.  Treating NIL as usable would produce
             ;; "setNIL-tNIL" ids and make every such train share (and fight
             ;; over) one "set-NIL" reservation key.
             (has-set-no (and set-no (not (string= set-no "000"))))
             (tid (or (if (and lcid (string= lcid "0"))
                          nil
                          lcid)
                      (when has-set-no
                        (format nil "set~A-t~A" set-no trip-no))
                      (format nil "tid~A" train-id)))
             (active-key (format nil "~A-active-~A" line-code tid))
             (train-key (format nil "~A-train-~A" line-code tid)))
        ;; get a mapping of seconds to -> track code
        (red:zadd (format nil "~A-secs-to-track-code" code-station)
                  seconds-to
                  track-code)
        ;; get a mapping of track code -> description
        (when location-desc
          (red:set (format nil "~A-track-desc-~A" line-code track-code)
                   location-desc))
        ;; add this train into the new list of trains for the station
        (red:sadd (format nil "~A-trains-new" code-station) train-key)
        ;; log the train if it has no live "active" marker yet; this must
        ;; happen before the marker is (re)set below
        (unless (red:exists active-key)
          (format t "~&scraper(~A): new ~A: ~A~%"
                  code-station
                  tid
                  train))
        ;; mark this train as active
        (red:setex active-key *train-active-expiry-secs* "yep")
        (when has-set-no
          (let* ((set-key (format nil "~A-set-~A-trip-~A"
                                  line-code set-no trip-no))
                 ;; id of whichever train last reserved this set/trip, if any
                 (set-code-train-id (red:get set-key)))
            ;; if the set number was previously used by a train with a
            ;; different id, it might be the same train
            (when (and set-code-train-id
                       (not (string= set-code-train-id tid)))
              (format
               *error-output*
               "scraper(~A): WARNING: set code ~A (~A) changed: ~A -> ~A~%"
               code-station
               set-no
               trip-no
               set-code-train-id
               tid)
              (statsd:inc "intertube.set-code-change"))
            ;; reserve the set number
            (red:setex set-key *train-set-code-expiry-secs* tid)))
        ;; add the actual train data to the ordered set of data about the
        ;; train, including the station the train is from for things like
        ;; secondsto
        (red:zadd train-key
                  universal-ts
                  (cpk-base64 (list code-station train)))))))
(defun maybe-redis-trackernet-prediction (pred)
(multiple-value-bind (filename universal-ts)
(make-trackernet-filename pred)
@ -410,42 +483,19 @@
(unless (red:exists filename)
(let ((code-station (format nil "~A-~A"
line-code (code (first stations)))))
;; start a transaction
(red:multi)
;; write out the raw prediction object
(red:setex filename *prediction-expiry-secs*
(cpk-base64 pred))
;; delete the list of trains for that station
(red:del (format nil "~A-trains" code-station))
(loop
for train in (get-trains pred)
do (with-slots (train-id seconds-to track-code) train
(when (and train-id seconds-to track-code)
;; get a mapping of seconds to -> track code
(red:zadd (format nil "~A-secs-to-track-code"
code-station)
seconds-to
track-code)
;; add this train into the list of trains for
;; the station
(red:sadd (format nil "~A-trains" code-station)
(format nil "~A-train-~A"
line-code train-id))
;; mark this train as active
(red:setex (format nil "~A-active-~A"
line-code train-id)
*train-active-expiry-secs*
"yep")
;; add the actual train data to the ordered set
;; of data about the train
(red:zadd (format nil "~A-train-~A"
line-code train-id)
universal-ts
;; include the station the train is from
;; for things like secondsto
(cpk-base64 (list code-station train))))))
;; finish the transaction
(red:exec)
do (redis-train universal-ts line-code code-station train))
(if (> (length (get-trains pred)) 0)
;; swap out the new list of trains for the station
;; for the old list
(red:rename (format nil "~A-trains-new" code-station)
(format nil "~A-trains" code-station))
;; no trains; just delete old list
(red:del (format nil "~A-trains" code-station)))
(length (get-trains pred)))))))
(defun redis-last-score (key)
@ -484,6 +534,67 @@
(local-time:now)
:format '((:year 4) #\- (:month 2) #\- (:day 2))))
(defun make-fake-archive-entry (filename size)
  "Build a synthetic tar entry header for a regular file called FILENAME that
holds SIZE bytes.  The uid, gid and mtime are all zero and the mode is
ARCHIVE::+PERMISSIONS-MASK+."
  (let ((entry-class 'archive::tar-entry))
    (make-instance entry-class
                   :typeflag archive::+tar-regular-file+
                   :pathname filename
                   :size size
                   :mode archive::+permissions-mask+
                   :mtime 0
                   :uid 0
                   :gid 0)))
(defun conspack-encode-to-archive (archive name data)
  "Write DATA, encoded with CPK:ENCODE, into ARCHIVE as an entry named NAME."
  (let ((octets (cpk:encode data)))
    (archive:write-entry-to-archive
     archive
     ;; The header's size field is the length of the encoded data.
     (make-fake-archive-entry name (length octets))
     :stream (flexi-streams:make-in-memory-input-stream octets))))
(defun archive-test (name data)
  "Add DATA, conspack-encoded under the entry name NAME, to ./test.tar.
The file is created if missing and opened for appending otherwise."
  (archive:with-open-archive (test-tar "./test.tar"
                              :if-exists :append
                              :if-does-not-exist :create
                              :direction :output)
    (conspack-encode-to-archive test-tar name data)))
(defun archive-trains-tar (keys)
  "Write the trains named by KEYS, a list of Redis train sorted set keys, into
the tar file for the current date under *TRACKERNET-TRAINS-ARCHIVAL-DIR*.

Each train becomes one entry named \"<last-score>-<key>.trn\" whose contents are
the conspack-encoded list (last-score key entries).  Every key is read from
Redis before the first entry is written."
  (let ((tar-path (format nil "~A~A.tar"
                          *trackernet-trains-archival-dir*
                          (get-iso-8601-date))))
    (archive:with-open-archive (tar tar-path
                                :direction :output
                                :if-does-not-exist :create
                                :if-exists :append)
      (flet ((gather (key)
               ;; (last-score key entries) for one sorted set
               (list (redis-last-score key)
                     key
                     (redis-cpk-sorted-set-all key))))
        (dolist (train (mapcar #'gather keys))
          (destructuring-bind (score key entries) train
            ;; Log the third element of the most recent entry.
            (format t "~&archiver: archiving ~A: ~A~%"
                    key
                    (third (car (last entries))))
            (conspack-encode-to-archive tar
                                        (format nil "~A-~A.trn" score key)
                                        train)))))))
(defun unarchive-trains-tar (path func)
  "Open the tar archive at PATH and call FUNC on the conspack-decoded contents
of each entry in turn.

Every error is swallowed, including any signalled by FUNC or by decoding;
reading simply stops at that point."
  ;; The archive probably isn't going to be terminated, hence IGNORE-ERRORS
  ;; around the whole read.
  (ignore-errors
    (archive:with-open-archive (tar path :direction :input)
      (archive:do-archive-entries (entry tar)
        (let ((sink (flexi-streams:make-in-memory-output-stream)))
          (archive::transfer-entry-data-to-stream tar entry sink)
          (let* ((octets (flexi-streams:get-output-stream-sequence sink))
                 (train (cpk:decode (copy-seq octets))))
            (funcall func train)))))))
(defun archive-trains (keys)
"Archive trains from KEYS, a list of Redis train sorted set keys."
(with-open-file (out
@ -527,6 +638,25 @@
when (ignore-errors (setf parsed (cpk-unbase64 line)))
collect parsed)))
(defun convert-archive-format (old new)
  "Read old-style base64 train archive data from OLD and stuff it into a new archive at NEW.

OLD is read line by line.  Each line that CPK-UNBASE64 decodes to a non-NIL
object is written as one entry named \"<first>-<second>.trn\", after the first
two elements of the decoded object.  Lines that fail to decode are skipped
silently."
  (archive:with-open-archive (tar new
                              :direction :output
                              :if-does-not-exist :create)
    (with-open-file (in old
                        :direction :input)
      (loop
        ;; WITH is a variable clause, so it has to come before main clauses
        ;; such as WHILE for the LOOP form to be conforming.
        with parsed
        for line = (read-line in nil)
        while line
        ;; PARSED is only assigned when decoding succeeds; a failed line
        ;; makes IGNORE-ERRORS return NIL and the entry is not written.
        when (ignore-errors (setf parsed (cpk-unbase64 line)))
          do (conspack-encode-to-archive
              tar
              (format nil "~A-~A.trn"
                      (first parsed)
                      (second parsed))
              parsed)))))
(defun scraper-loop (line-code station-code)
"Scrape the station with the given codes every *TRACKERNET-SCRAPE-INTERVAL* in a loop, calling MAYBE-WRITE-TRACKERNET-PREDICTION with each scrape result.
Stop if *TRACKERNET-KILL-SWITCH* is set to T."
@ -546,9 +676,9 @@ Stop if *TRACKERNET-KILL-SWITCH* is set to T."
(let ((num-trains (maybe-redis-trackernet-prediction pred)))
(when num-trains
(statsd:gauge (format nil "scraper.~A-~A.trains" line-code station-code) num-trains)
(statsd:inc (format nil "scraper.~A-~A.new" line-code station-code))
(format t "~&scraper(~A-~A): new: ~A (~A trains)~%"
line-code station-code pred num-trains))))
(statsd:inc (format nil "scraper.~A-~A.new" line-code station-code)))))
;; (format t "~&scraper(~A-~A): new: ~A (~A trains)~%"
;; line-code station-code pred num-trains))))
(error (e)
(statsd:inc (format nil "scraper.~A-~A.errors" line-code station-code))
(format *error-output* "~&scraper(~A-~A): error: ~A~%"
@ -584,6 +714,7 @@ Stop if *TRACKERNET-KILL-SWITCH* is set to T."
(defparameter *termini*
'("D-UPM" ; Upminster
"D-EBY" ; Ealing Broadway
"D-THL" ; Tower Hill (bay platform)
"D-OLY" ; Kensington (Olympia)
"D-ERD" ; Edgware Road
"D-RMD" ; Richmond
@ -606,7 +737,7 @@ Stop if *TRACKERNET-KILL-SWITCH* is set to T."
(length trains))
(handler-case
(progn
(archive-trains trains)
(archive-trains-tar trains)
(statsd:counter "intertube.archived" (length trains))
(mapc #'red:del trains))
(error (e)
@ -642,7 +773,7 @@ Stop if *TRACKERNET-KILL-SWITCH* is set to T."
(length codes))))
(when (> rpm *rude-requests-per-minute*)
(error "~A requests/min is a bit rude" rpm))
(format t "~&starting ~A scrapers, ~A requests/min"
(format t "~&starting ~A scrapers, ~A requests/min...~%"
(length codes) rpm))
(setf *trackernet-kill-switch* nil)
(unless (and (boundp 'statsd:*client*)

Loading…
Cancel
Save