aboutsummaryrefslogtreecommitdiff
(ns com.github.ivarref.yoltq.virtual-test
  (:require
    [clojure.string :as str]
    [clojure.test :refer [deftest is use-fixtures] :refer-macros [thrown?]]
    [clojure.tools.logging :as log]
    [com.github.ivarref.yoltq :as yq]
    [com.github.ivarref.yoltq.error-poller :as error-poller]
    [com.github.ivarref.yoltq.ext-sys :as ext]
    [com.github.ivarref.yoltq.impl :as i]
    [com.github.ivarref.yoltq.migrate :as migrate]
    [com.github.ivarref.yoltq.test-queue :as tq]
    [com.github.ivarref.yoltq.test-utils :as u]
    [com.github.ivarref.yoltq.utils :as uu]
    [datomic-schema.core]
    [datomic.api :as d]
    [taoensso.nippy :as nippy]
    [taoensso.timbre :as timbre])
  (:import (java.time Duration LocalDateTime)))


(use-fixtures :each tq/call-with-virtual-queue!)


(deftest happy-case-1
  (let [conn (u/empty-conn)]
    (yq/init! {:conn conn})
    (yq/add-consumer! :q identity)
    @(d/transact conn [(yq/put :q {:work 123})])
    (is (= {:work 123} (tq/consume! :q)))))

(deftest happy-case-no-migration-for-new-entities
  (let [conn (u/empty-conn)]
    (yq/init! {:conn conn})
    (yq/add-consumer! :q identity)
    @(d/transact conn [(yq/put :q {:work 123})])
    (is (= {:work 123} (tq/consume! :q)))
    (is (= [] (migrate/migrate! @yq/*config*)))))

(deftest happy-case-tx-report-q
  (let [conn (u/empty-conn)]
    (yq/init! {:conn conn})
    (yq/add-consumer! :q identity)
    @(d/transact conn [(yq/put :q {:work 123})])
    (is (= {:work 123} (:retval (tq/run-one-report-queue!))))
    (is (= 1 (u/done-count)))))


(deftest happy-case-poller
  (let [conn (u/empty-conn)]
    (yq/init! {:conn     conn
               :handlers {:q {:f (fn [payload] payload)}}})
    (u/put-transact! :q {:work 123})
    (u/advance! (:init-backoff-time yq/default-opts))
    (is (= {:work 123} (some->> (u/get-init :q)
                                (u/take!)
                                (u/execute!)
                                :retval)))))


(deftest happy-case-queue-fn-allow-cas-failure
  (let [conn (u/empty-conn)
        invoke-count (atom 0)]
    (yq/init! {:conn conn})
    (yq/add-consumer! :q
                      (fn [{:keys [id]}]
                        (swap! invoke-count inc)
                        @(d/transact conn [[:db/cas [:e/id id] :e/val nil "janei"]]))
                      {:allow-cas-failure? #{:e/val}})
    @(d/transact conn #d/schema [[:e/id :one :string :id]
                                 [:e/val :one :string]])
    @(d/transact conn [{:e/id "demo"}
                       (yq/put :q {:id "demo"})])
    (u/advance! (:init-backoff-time yq/default-opts))
    (swap! yq/*config* assoc :mark-status-fn! (fn [_ _ new-status]
                                                (log/info "mark-status! doing nothing for new status" new-status)))
    (is (nil? (some->> (u/get-init :q)
                       (u/take!)
                       (u/execute!))))
    (swap! yq/*config* dissoc :mark-status-fn!)

    ; (mark-status! :done) failed but f succeeded.
    (is (nil? (u/get-hung :q)))
    (u/advance! (:hung-backoff-time @yq/*config*))
    (is (some? (u/get-hung :q)))
    (is (true? (some->> (u/get-hung :q)
                        (u/take!)
                        (u/execute!)
                        :allow-cas-failure?)))
    (is (= 2 @invoke-count))))


(deftest backoff-test
  (let [conn (u/empty-conn)]
    (yq/init! {:conn              conn
               :init-backoff-time (:init-backoff-time yq/default-opts)
               :handlers          {:q {:f (fn [_] (throw (ex-info "janei" {})))}}})
    (u/put-transact! :q {:work 123})
    (is (nil? (u/get-init :q)))

    (u/advance! (dec (:init-backoff-time yq/default-opts)))
    (is (nil? (u/get-init :q)))
    (u/advance! 1)
    (is (some? (u/get-init :q)))

    (is (some? (some->> (u/get-init :q)
                        (u/take!)
                        (u/execute!)
                        :exception)))

    (u/advance! (dec (:error-backoff-time @yq/*config*)))
    (is (nil? (u/get-error :q)))
    (u/advance! 1)
    (is (some? (u/get-error :q)))))


(deftest get-hung-test
  (let [conn (u/empty-conn)]
    (yq/init! {:conn              conn
               :init-backoff-time (:init-backoff-time yq/default-opts)
               :handlers          {:q {:f (fn [_] (throw (ex-info "janei" {})))}}})
    (u/put-transact! :q {:work 123})
    (u/advance! (:init-backoff-time yq/default-opts))
    (is (some? (u/get-init :q)))

    (is (= :processing (some->> (u/get-init :q)
                                (u/take!)
                                :com.github.ivarref.yoltq/status)))

    (is (nil? (u/get-hung :q)))
    (u/advance! (dec (:hung-backoff-time yq/default-opts)))
    (is (nil? (u/get-hung :q)))
    (u/advance! 1)
    (is (some? (u/get-hung :q)))))


(deftest basic-locking
  (let [conn (u/empty-conn)]
    (yq/init! {:conn              conn
               :init-backoff-time (:init-backoff-time yq/default-opts)
               :cas-failures      (atom 0)
               :handlers          {:q {:f (fn [_] (throw (ex-info "janei" {})))}}})
    (u/put-transact! :q {:work 123})
    (u/advance! (:init-backoff-time yq/default-opts))
    (is (some? (u/get-init :q)))

    (let [job (u/get-init :q)]
      (is (= :processing (some->> job (u/take!) :com.github.ivarref.yoltq/status)))
      (u/take! job)
      (is (= 1 @(:cas-failures @yq/*config*))))))


(deftest retry-test
  (let [conn (u/empty-conn)]
    (yq/init! {:conn              conn
               :init-backoff-time (:init-backoff-time yq/default-opts)
               :handlers          {:q {:f
                                       (let [c (atom 0)]
                                         (fn [_]
                                           (if (<= (swap! c inc) 2)
                                             (throw (ex-info "janei" {}))
                                             ::ok)))}}})
    (u/put-transact! :q {:work 123})

    (u/advance! (:init-backoff-time yq/default-opts))
    (is (some? (some->> (u/get-init :q) (u/take!) (u/execute!) :exception)))

    (u/advance! (:error-backoff-time @yq/*config*))
    (is (some? (some->> (u/get-error :q) (u/take!) (u/execute!) :exception)))

    (u/advance! (:error-backoff-time @yq/*config*))
    (is (nil? (some->> (u/get-error :q) (u/take!) (u/execute!) :exception)))))


(deftest max-retries-test
  (let [conn (u/empty-conn)
        call-count (atom 0)]
    (yq/init! {:conn               conn
               :error-backoff-time 0})
    (yq/add-consumer! :q (fn [_]
                           (swap! call-count inc)
                           (throw (ex-info "janei" {})))
                      {:max-retries 1})
    (tq/put! :q {:work 123})
    (is (some? (:exception (tq/run-one-report-queue!))))

    (dotimes [_ 10]
      (tq/run-queue-once! :q :error))
    (is (= 2 @call-count))))


(deftest max-retries-test-two
  (let [conn (u/empty-conn)
        call-count (atom 0)]
    (yq/init! {:conn               conn
               :error-backoff-time 0})
    (yq/add-consumer! :q (fn [_]
                           (swap! call-count inc)
                           (throw (ex-info "janei" {})))
                      {:max-retries 3})
    (tq/put! :q {:work 123})
    (is (some? (:exception (tq/run-one-report-queue!))))

    (timbre/with-level :fatal
                       (dotimes [_ 20]
                         (tq/run-queue-once! :q :error)))
    (is (= 4 @call-count))))


(deftest hung-to-error
  (let [conn (u/empty-conn)
        call-count (atom 0)
        missed-mark-status (atom nil)]
    (yq/init! {:conn conn})
    (yq/add-consumer! :q
                      (fn [_]
                        (if (= 1 (swap! call-count inc))
                          (throw (ex-info "error" {}))
                          (log/info "return OK"))))
    (tq/put! :q {:id "demo"})
    (tq/run-one-report-queue!)                              ; now in status :error


    (swap! yq/*config* assoc :mark-status-fn! (fn [_ _ new-status]
                                                (reset! missed-mark-status new-status)
                                                (log/info "mark-status! doing nothing for new status" new-status)))
    (u/advance! (:error-backoff-time @yq/*config*))
    (tq/run-queue-once! :q :error)
    (swap! yq/*config* dissoc :mark-status-fn!)
    (is (= :done @missed-mark-status))

    (is (nil? (uu/get-hung @yq/*config* :q)))
    (u/advance! (:hung-backoff-time @yq/*config*))

    (is (some? (uu/get-hung @yq/*config* :q)))

    (is (= 2 @call-count))

    (is (true? (some->> (uu/get-hung (assoc-in @yq/*config* [:handlers :q :max-retries] 1) :q)
                        (i/take! @yq/*config*)
                        (i/execute! @yq/*config*)
                        :failed?)))

    (u/advance! (:error-backoff-time @yq/*config*))
    (is (some? (uu/get-error @yq/*config* :q)))
    (is (nil? (uu/get-error (assoc-in @yq/*config* [:handlers :q :max-retries] 1) :q)))))


(deftest consume-expect-test
  (let [conn (u/empty-conn)
        seen (atom #{})]
    (yq/init! {:conn conn})
    (yq/add-consumer! :q (fn [payload]
                           (when (= #{1 2} (swap! seen conj payload))
                             (throw (ex-info "oops" {})))
                           payload))

    @(d/transact conn [(yq/put :q 1)])
    @(d/transact conn [(yq/put :q 2)])

    (is (= 1 (tq/consume-expect! :q :done)))
    (tq/consume-expect! :q :error)))


(def ^:dynamic *some-binding* nil)


(deftest binding-test
  (let [conn (u/empty-conn)]
    (yq/init! {:conn             conn
               :capture-bindings [#'*some-binding* #'timbre/*context*]})
    (yq/add-consumer! :q (fn [_] *some-binding*))
    (binding [timbre/*context* {:x-request-id "wooho"}]
      (binding [*some-binding* 1]
        @(d/transact conn [(yq/put :q nil)]))
      (binding [*some-binding* 2]
        @(d/transact conn [(yq/put :q nil)]))
      @(d/transact conn [(yq/put :q nil)]))

    (is (= 1 (tq/consume-expect! :q :done)))
    (is (= 2 (tq/consume-expect! :q :done)))
    (is (nil? (tq/consume-expect! :q :done)))))


(deftest default-binding-test
  (let [conn (u/empty-conn)]
    (yq/init! {:conn conn})
    (yq/add-consumer! :q (fn [_] (:x-request-id timbre/*context*)))
    (binding [timbre/*context* {:x-request-id "123"}]
      @(d/transact conn [(yq/put :q nil)]))
    (is (= "123" (tq/consume-expect! :q :done)))))


(deftest force-retry-test
  (let [conn (u/empty-conn)]
    (yq/init! {:conn conn})
    (yq/add-consumer! :q (let [cnt (atom 0)]
                           (fn [_] (swap! cnt inc))))
    @(d/transact conn [(yq/put :q nil)])
    (is (= 1 (tq/consume! :q)))
    (is (= 2 (tq/force-retry! :q)))
    (is (= 3 (tq/force-retry! :q)))))


(deftest ext-id-no-duplicates
  (let [conn (u/empty-conn)]
    (yq/init! {:conn conn})
    (yq/add-consumer! :q identity)
    @(d/transact conn [(yq/put :q nil {:id "123"})])
    (is (thrown? Exception @(d/transact conn [(yq/put :q nil {:id "123"})])))))


(deftest depends-on
  (let [conn (u/empty-conn)]
    (yq/init! {:conn conn})
    (yq/add-consumer! :a identity)
    (yq/add-consumer! :b identity)
    @(d/transact conn [(yq/put :a {:id "a1"} {:id "a1"})])
    @(d/transact conn [(yq/put :b {:id "b1"} {:depends-on [:a "a1"]})])

    ; can't consume :b yet:
    (is (= {:depends-on [:a "a1"]} (tq/consume! :b)))

    (is (= {:id "a1"} (tq/consume! :a)))
    (is (= {:id "b1"} (tq/consume! :b)))))


(deftest depends-on-queue-level
  (let [conn (u/empty-conn)]
    (yq/init! {:conn conn})
    (yq/add-consumer! :a identity)
    (yq/add-consumer! :b identity {:depends-on (fn [{:keys [id]}] [:a id])})
    @(d/transact conn [(yq/put :a {:id "1"} {:id "1"})])
    @(d/transact conn [(yq/put :b {:id "1"})])

    ; can't consume :b yet:
    (is (= {:depends-on [:a "1"]} (tq/consume! :b)))

    (is (= {:id "1"} (tq/consume! :a)))
    (is (= {:id "1"} (tq/consume! :b)))))


(deftest verify-can-read-string
  (let [conn (u/empty-conn)]
    (yq/init! {:conn conn})
    (yq/add-consumer! :a identity)
    (timbre/with-level :fatal
      (is (thrown? Exception @(d/transact conn [(yq/put :a {:broken #'=})]))))))


(deftest payload-verifier
  (let [conn (u/empty-conn)]
    (yq/init! {:conn conn})
    (yq/add-consumer! :q identity
                      {:valid-payload? (fn [{:keys [id]}]
                                         (some? id))})
    @(d/transact conn [(yq/put :q {:id "a"})])
    (timbre/with-level :fatal
                       (is (thrown? Exception @(d/transact conn [(yq/put :q {})]))))))


(defn my-consumer
  {:yoltq/queue-id :some-q}
  [state payload]
  (swap! state conj payload))

(deftest queue-id-can-be-var
  (let [conn (u/empty-conn)
        received (atom #{})]
    (yq/init! {:conn conn})
    (yq/add-consumer! #'my-consumer (partial my-consumer received))
    @(d/transact conn [(yq/put #'my-consumer {:id "a"})])
    (tq/consume! :some-q)
    (is (= #{{:id "a"}} @received))
    #_(timbre/with-level :fatal
                         (is (thrown? Exception @(d/transact conn [(yq/put :q {})]))))))

(deftest healthy-allowed-error-time-test
  (let [conn (u/empty-conn)]
    (yq/init! {:conn conn})
    (yq/add-consumer! :q (fn [_] (throw (ex-info "" {}))))
    @(d/transact conn [(yq/put :q {:work 123})])
    (tq/consume-expect! :q :error)
    (is (= 0 (error-poller/do-poll-errors @yq/*config* (ext/now-ms))))
    (is (= 0 (error-poller/do-poll-errors @yq/*config* (+ (dec (.toMillis (Duration/ofMinutes 15))) (ext/now-ms)))))
    (is (= 1 (error-poller/do-poll-errors @yq/*config* (+ (.toMillis (Duration/ofMinutes 15)) (ext/now-ms)))))))

(deftest global-encode-decode
  (let [conn (u/empty-conn)
        ldt (LocalDateTime/now)
        got-work (atom nil)]
    (yq/init! {:conn conn
               :encode nippy/freeze
               :decode nippy/thaw})
    (yq/add-consumer! :q (fn [work] (reset! got-work work)))
    @(d/transact conn [(yq/put :q {:work ldt})])
    (tq/consume! :q)
    (is (= @got-work {:work ldt}))))

(deftest queue-encode-decode
  (let [conn (u/empty-conn)
        ldt (LocalDateTime/now)
        got-work (atom nil)]
    (yq/init! {:conn conn})
    (yq/add-consumer! :q (fn [work] (reset! got-work work))
                      {:encode nippy/freeze
                       :decode nippy/thaw})
    @(d/transact conn [(yq/put :q {:work ldt})])
    (tq/consume! :q)
    (is (= @got-work {:work ldt}))))

(deftest global-partition
  (let [conn (u/empty-conn)
        got-work (atom nil)]
    (yq/init! {:conn conn
               :partition-fn (fn [_queue-name] :my-part)})
    (yq/add-consumer! :q (fn [work] (reset! got-work work)))
    @(d/transact conn [(yq/put :q {:work 123})])
    (tq/consume! :q)
    (is (some? (d/q '[:find ?e .
                      :in $ ?part
                      :where
                      [?e :db/ident ?part]]
                    (d/db conn)
                    :my-part)))
    (is (= @got-work {:work 123}))))

(deftest partition-per-queue
  (let [conn (u/empty-conn)
        got-work (atom nil)]
    (yq/init! {:conn conn})
    (yq/add-consumer! :q (fn [work] (reset! got-work work))
                      {:partition-fn (fn [_queue-name] :my-part)})
    @(d/transact conn [(yq/put :q {:work 123})])
    (tq/consume! :q)
    (is (some? (d/q '[:find ?e .
                      :in $ ?part
                      :where
                      [?e :db/ident ?part]]
                    (d/db conn)
                    :my-part)))
    (is (= @got-work {:work 123}))))

(deftest string-encode-decode
  (let [conn (u/empty-conn)
        got-work (atom nil)]
    (yq/init! {:conn   conn
               :encode (fn [x] (str/join (reverse x)))
               :decode (fn [x] (str/join (reverse x)))})
    (yq/add-consumer! :q (fn [work] (reset! got-work work)))
    @(d/transact conn [(yq/put :q "asdf")])
    (tq/consume! :q)
    (is (= @got-work "asdf"))))

(deftest job-group-test
  (let [conn (u/empty-conn)]
    (yq/init! {:conn conn})
    (yq/add-consumer! :q1 identity)
    (yq/add-consumer! :q2 identity)
    @(d/transact conn [(yq/put :q1 {:work 123} {:job-group :group1})
                       (yq/put :q1 {:work 456} {:job-group :group2})
                       (yq/put :q2 {:work 789} {:job-group :group1})])
    (is (= [{:qname :q1
             :job-group :group1
             :status :init
             :count 1}]
           (yq/job-group-progress :q1 :group1)))

    (is (= {:work 123} (tq/consume! :q1)))

    (is (= [{:qname :q1
             :job-group :group1
             :status :done
             :count 1}]
           (yq/job-group-progress :q1 :group1)))))