aboutsummaryrefslogtreecommitdiff
path: root/test/com
diff options
context:
space:
mode:
Diffstat (limited to 'test/com')
-rw-r--r--test/com/github/ivarref/yoltq/virtual_test.clj98
1 files changed, 84 insertions, 14 deletions
diff --git a/test/com/github/ivarref/yoltq/virtual_test.clj b/test/com/github/ivarref/yoltq/virtual_test.clj
index 996792e..2800c21 100644
--- a/test/com/github/ivarref/yoltq/virtual_test.clj
+++ b/test/com/github/ivarref/yoltq/virtual_test.clj
@@ -1,18 +1,21 @@
(ns com.github.ivarref.yoltq.virtual-test
- (:require [clojure.test :refer [deftest is use-fixtures] :refer-macros [thrown?]]
- [clojure.tools.logging :as log]
- [com.github.ivarref.yoltq :as yq]
- [com.github.ivarref.yoltq.error-poller :as error-poller]
- [com.github.ivarref.yoltq.ext-sys :as ext]
- [com.github.ivarref.yoltq.impl :as i]
- [com.github.ivarref.yoltq.migrate :as migrate]
- [com.github.ivarref.yoltq.test-queue :as tq]
- [com.github.ivarref.yoltq.test-utils :as u]
- [com.github.ivarref.yoltq.utils :as uu]
- [datomic-schema.core]
- [datomic.api :as d]
- [taoensso.timbre :as timbre])
- (:import (java.time Duration)))
+ (:require
+ [clojure.string :as str]
+ [clojure.test :refer [deftest is use-fixtures] :refer-macros [thrown?]]
+ [clojure.tools.logging :as log]
+ [com.github.ivarref.yoltq :as yq]
+ [com.github.ivarref.yoltq.error-poller :as error-poller]
+ [com.github.ivarref.yoltq.ext-sys :as ext]
+ [com.github.ivarref.yoltq.impl :as i]
+ [com.github.ivarref.yoltq.migrate :as migrate]
+ [com.github.ivarref.yoltq.test-queue :as tq]
+ [com.github.ivarref.yoltq.test-utils :as u]
+ [com.github.ivarref.yoltq.utils :as uu]
+ [datomic-schema.core]
+ [datomic.api :as d]
+ [taoensso.nippy :as nippy]
+ [taoensso.timbre :as timbre])
+ (:import (java.time Duration LocalDateTime)))
(use-fixtures :each tq/call-with-virtual-queue!)
@@ -380,3 +383,70 @@
(is (= 0 (error-poller/do-poll-errors @yq/*config* (ext/now-ms))))
(is (= 0 (error-poller/do-poll-errors @yq/*config* (+ (dec (.toMillis (Duration/ofMinutes 15))) (ext/now-ms)))))
(is (= 1 (error-poller/do-poll-errors @yq/*config* (+ (.toMillis (Duration/ofMinutes 15)) (ext/now-ms)))))))
+
+(deftest global-encode-decode
+ (let [conn (u/empty-conn)
+ ldt (LocalDateTime/now)
+ got-work (atom nil)]
+ (yq/init! {:conn conn
+ :encode nippy/freeze
+ :decode nippy/thaw})
+ (yq/add-consumer! :q (fn [work] (reset! got-work work)))
+ @(d/transact conn [(yq/put :q {:work ldt})])
+ (tq/consume! :q)
+ (is (= @got-work {:work ldt}))))
+
+(deftest queue-encode-decode
+ (let [conn (u/empty-conn)
+ ldt (LocalDateTime/now)
+ got-work (atom nil)]
+ (yq/init! {:conn conn})
+ (yq/add-consumer! :q (fn [work] (reset! got-work work))
+ {:encode nippy/freeze
+ :decode nippy/thaw})
+ @(d/transact conn [(yq/put :q {:work ldt})])
+ (tq/consume! :q)
+ (is (= @got-work {:work ldt}))))
+
+(deftest global-partition
+ (let [conn (u/empty-conn)
+ got-work (atom nil)]
+ (yq/init! {:conn conn
+ :partition-fn (fn [_queue-name] :my-part)})
+ (yq/add-consumer! :q (fn [work] (reset! got-work work)))
+ @(d/transact conn [(yq/put :q {:work 123})])
+ (tq/consume! :q)
+ (is (some? (d/q '[:find ?e .
+ :in $ ?part
+ :where
+ [?e :db/ident ?part]]
+ (d/db conn)
+ :my-part)))
+ (is (= @got-work {:work 123}))))
+
+(deftest partition-per-queue
+ (let [conn (u/empty-conn)
+ got-work (atom nil)]
+ (yq/init! {:conn conn})
+ (yq/add-consumer! :q (fn [work] (reset! got-work work))
+ {:partition-fn (fn [_queue-name] :my-part)})
+ @(d/transact conn [(yq/put :q {:work 123})])
+ (tq/consume! :q)
+ (is (some? (d/q '[:find ?e .
+ :in $ ?part
+ :where
+ [?e :db/ident ?part]]
+ (d/db conn)
+ :my-part)))
+ (is (= @got-work {:work 123}))))
+
+(deftest string-encode-decode
+ (let [conn (u/empty-conn)
+ got-work (atom nil)]
+ (yq/init! {:conn conn
+ :encode (fn [x] (str/join (reverse x)))
+ :decode (fn [x] (str/join (reverse x)))})
+ (yq/add-consumer! :q (fn [work] (reset! got-work work)))
+ @(d/transact conn [(yq/put :q "asdf")])
+ (tq/consume! :q)
+ (is (= @got-work "asdf"))))