aboutsummaryrefslogtreecommitdiff
path: root/src/com/github/ivarref/yoltq.clj
diff options
context:
space:
mode:
Diffstat (limited to 'src/com/github/ivarref/yoltq.clj')
-rw-r--r--src/com/github/ivarref/yoltq.clj32
1 files changed, 22 insertions, 10 deletions
diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj
index bb7a43e..ba27d2c 100644
--- a/src/com/github/ivarref/yoltq.clj
+++ b/src/com/github/ivarref/yoltq.clj
@@ -1,16 +1,16 @@
(ns com.github.ivarref.yoltq
- (:require [datomic.api :as d]
- [clojure.tools.logging :as log]
+ (:require [clojure.tools.logging :as log]
+ [com.github.ivarref.yoltq.error-poller :as errpoller]
[com.github.ivarref.yoltq.impl :as i]
- [com.github.ivarref.yoltq.report-queue :as rq]
+ [com.github.ivarref.yoltq.migrate :as migrate]
[com.github.ivarref.yoltq.poller :as poller]
- [com.github.ivarref.yoltq.error-poller :as errpoller]
+ [com.github.ivarref.yoltq.report-queue :as rq]
[com.github.ivarref.yoltq.slow-executor-detector :as slow-executor]
- [com.github.ivarref.yoltq.migrate :as migrate]
- [com.github.ivarref.yoltq.utils :as u])
+ [com.github.ivarref.yoltq.utils :as u]
+ [datomic.api :as d])
(:import (datomic Connection)
- (java.util.concurrent Executors TimeUnit ExecutorService)
- (java.time Duration)))
+ (java.time Duration)
+ (java.util.concurrent ExecutorService Executors TimeUnit)))
(defonce ^:dynamic *config* (atom nil))
@@ -92,11 +92,23 @@
new-cfg)))
+(defn get-queue-id
+ [queue-id-or-var]
+ (cond (and (var? queue-id-or-var)
+ (keyword? (:yoltq/queue-id (meta queue-id-or-var))))
+ (:yoltq/queue-id (meta queue-id-or-var))
+
+ (keyword? queue-id-or-var)
+ queue-id-or-var
+
+ :else
+ (throw (ex-info (str "Could not get queue-id for " queue-id-or-var) {:queue-id queue-id-or-var}))))
+
(defn add-consumer!
([queue-id f]
(add-consumer! queue-id f {}))
([queue-id f opts]
- (swap! *config* (fn [old-config] (assoc-in old-config [:handlers queue-id] (merge opts {:f f}))))))
+ (swap! *config* (fn [old-config] (assoc-in old-config [:handlers (get-queue-id queue-id)] (merge opts {:f f}))))))
(defn put
@@ -105,7 +117,7 @@
(let [{:keys [bootstrap-poller! conn] :as cfg} @*config*]
(when (and *test-mode* bootstrap-poller!)
(bootstrap-poller! conn))
- (i/put cfg queue-id payload opts))))
+ (i/put cfg (get-queue-id queue-id) payload opts))))
(defn- do-start! []