aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--README.md37
-rw-r--r--deps.edn8
-rw-r--r--pom.xml8
-rwxr-xr-xrelease.sh6
-rw-r--r--src/com/github/ivarref/yoltq.clj32
-rw-r--r--test/com/github/ivarref/yoltq/virtual_test.clj33
6 files changed, 87 insertions, 37 deletions
diff --git a/README.md b/README.md
index e5b2059..8ead585 100644
--- a/README.md
+++ b/README.md
@@ -333,22 +333,41 @@ easier.
## Change log
-### 2022-03-29 v0.2.55 [diff](https://github.com/ivarref/yoltq/compare/v0.2.54...v0.2.55)
+#### 2022-06-22 v0.2.56 [diff](https://github.com/ivarref/yoltq/compare/v0.2.55...v0.2.56)
+Added support for `:yoltq/queue-id` metadata on functions. I.e. it's possible to write
+the following:
+```clojure
+(defn my-consumer
+ {:yoltq/queue-id :some-queue}
+ [payload]
+ :work-work-work)
+
+(yq/add-consumer! #'my-consumer ; <-- will resolve to :some-queue
+ my-consumer)
+
+@(d/transact conn [(yq/put #'my-consumer ; <-- will resolve to :some-queue
+ {:id "a"})])
+```
+
+The idea here is that it is simpler to jump to var definitions than going via keywords,
+which essentially refers to a var/function anyway.
+
+#### 2022-03-29 v0.2.55 [diff](https://github.com/ivarref/yoltq/compare/v0.2.54...v0.2.55)
Added: `unhealthy?` function which returns `true` if there are queues in error,
or `false` otherwise.
-### 2022-03-28 v0.2.54 [diff](https://github.com/ivarref/yoltq/compare/v0.2.51...v0.2.54)
+#### 2022-03-28 v0.2.54 [diff](https://github.com/ivarref/yoltq/compare/v0.2.51...v0.2.54)
Fixed: Schedules should now be using milliseconds and not nanoseconds.
-### 2022-03-28 v0.2.51 [diff](https://github.com/ivarref/yoltq/compare/v0.2.48...v0.2.51)
+#### 2022-03-28 v0.2.51 [diff](https://github.com/ivarref/yoltq/compare/v0.2.48...v0.2.51)
* Don't OOM on migrating large amounts of data.
* Respect `:auto-migrate? false`.
-### 2022-03-27 v0.2.48 [diff](https://github.com/ivarref/yoltq/compare/v0.2.46...v0.2.48)
+#### 2022-03-27 v0.2.48 [diff](https://github.com/ivarref/yoltq/compare/v0.2.46...v0.2.48)
* Auto migration is done in the background.
* Only poll for current version of jobs, thus no races for auto migration.
-### 2022-03-27 v0.2.46 [diff](https://github.com/ivarref/yoltq/compare/v0.2.41...v0.2.46)
+#### 2022-03-27 v0.2.46 [diff](https://github.com/ivarref/yoltq/compare/v0.2.41...v0.2.46)
* Critical bugfix that in some cases can lead to stalled jobs.
```
Started using (System/currentTimeMillis) and not (System/nanoTime)
@@ -357,7 +376,7 @@ when storing time in the database.
* Bump Clojure to `1.11.0`.
-### 2022-03-27 v0.2.41 [diff](https://github.com/ivarref/yoltq/compare/v0.2.39...v0.2.41)
+#### 2022-03-27 v0.2.41 [diff](https://github.com/ivarref/yoltq/compare/v0.2.39...v0.2.41)
* Added function `healthy?` that returns:
```
true if no errors
@@ -381,13 +400,13 @@ when storing time in the database.
{:qname :send-message, :status :init, :count 56}]
```
-### 2021-09-27 v0.2.39 [diff](https://github.com/ivarref/yoltq/compare/v0.2.37...v0.2.39)
+#### 2021-09-27 v0.2.39 [diff](https://github.com/ivarref/yoltq/compare/v0.2.37...v0.2.39)
Added `:valid-payload?` option for queue consumers.
-### 2021-09-27 v0.2.37 [diff](https://github.com/ivarref/yoltq/compare/v0.2.33...v0.2.37)
+#### 2021-09-27 v0.2.37 [diff](https://github.com/ivarref/yoltq/compare/v0.2.33...v0.2.37)
Improved error reporting.
-### 2021-09-24 v0.2.33
+#### 2021-09-24 v0.2.33
First publicly announced release.
## License
diff --git a/deps.edn b/deps.edn
index 8e769e1..6923881 100644
--- a/deps.edn
+++ b/deps.edn
@@ -1,12 +1,12 @@
-{:deps {org.clojure/tools.logging {:mvn/version "1.1.0"}
- org.clojure/clojure {:mvn/version "1.11.0"}}
+{:deps {org.clojure/tools.logging {:mvn/version "1.2.4"}
+ org.clojure/clojure {:mvn/version "1.11.1"}}
:paths ["src"]
- :aliases {:datomic {:extra-deps {com.datomic/datomic-pro {:mvn/version "1.0.6316" :exclusions [org.slf4j/slf4j-nop]}}}
+ :aliases {:datomic {:extra-deps {com.datomic/datomic-pro {:mvn/version "1.0.6316" :exclusions [org.slf4j/slf4j-nop]}}}
:test {:extra-paths ["test"]
:extra-deps {ivarref/datomic-schema {:mvn/version "0.2.0"}
- com.taoensso/timbre {:mvn/version "5.1.2"}
+ com.taoensso/timbre {:mvn/version "5.2.1"}
com.fzakaria/slf4j-timbre {:mvn/version "0.3.21"}
clojure-term-colors/clojure-term-colors {:mvn/version "0.1.0"}
com.datomic/datomic-pro {:mvn/version "1.0.6316" :exclusions [org.slf4j/slf4j-nop]}
diff --git a/pom.xml b/pom.xml
index 9f591b9..c45ccd9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -4,18 +4,18 @@
<packaging>jar</packaging>
<groupId>com.github.ivarref</groupId>
<artifactId>yoltq</artifactId>
- <version>0.2.55</version>
+ <version>0.2.56</version>
<name>yoltq</name>
<dependencies>
<dependency>
<groupId>org.clojure</groupId>
<artifactId>clojure</artifactId>
- <version>1.11.0</version>
+ <version>1.11.1</version>
</dependency>
<dependency>
<groupId>org.clojure</groupId>
<artifactId>tools.logging</artifactId>
- <version>1.1.0</version>
+ <version>1.2.4</version>
</dependency>
</dependencies>
<build>
@@ -30,7 +30,7 @@
<scm>
<connection>scm:git:git://github.com/ivarref/yoltq.git</connection>
<developerConnection>scm:git:ssh://git@github.com/ivarref/yoltq.git</developerConnection>
- <tag>v0.2.55</tag>
+ <tag>v0.2.56</tag>
<url>https://github.com/ivarref/yoltq</url>
</scm>
</project> \ No newline at end of file
diff --git a/release.sh b/release.sh
index cf0f09f..d27d125 100755
--- a/release.sh
+++ b/release.sh
@@ -23,9 +23,11 @@ sed -i "s/HEAD/v$VERSION/g" ./README.md
git add pom.xml README.md
git commit -m "Release $VERSION"
git reset --soft HEAD~2
-git commit -m"Release $VERSION\n$MSG"
+git commit -m"Release $VERSION
+$MSG"
-git tag -a v"$VERSION" -m "Release v$VERSION\n$MSG"
+git tag -a v"$VERSION" -m "Release v$VERSION
+$MSG"
git push --follow-tags --force
clojure -X:deploy
diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj
index bb7a43e..ba27d2c 100644
--- a/src/com/github/ivarref/yoltq.clj
+++ b/src/com/github/ivarref/yoltq.clj
@@ -1,16 +1,16 @@
(ns com.github.ivarref.yoltq
- (:require [datomic.api :as d]
- [clojure.tools.logging :as log]
+ (:require [clojure.tools.logging :as log]
+ [com.github.ivarref.yoltq.error-poller :as errpoller]
[com.github.ivarref.yoltq.impl :as i]
- [com.github.ivarref.yoltq.report-queue :as rq]
+ [com.github.ivarref.yoltq.migrate :as migrate]
[com.github.ivarref.yoltq.poller :as poller]
- [com.github.ivarref.yoltq.error-poller :as errpoller]
+ [com.github.ivarref.yoltq.report-queue :as rq]
[com.github.ivarref.yoltq.slow-executor-detector :as slow-executor]
- [com.github.ivarref.yoltq.migrate :as migrate]
- [com.github.ivarref.yoltq.utils :as u])
+ [com.github.ivarref.yoltq.utils :as u]
+ [datomic.api :as d])
(:import (datomic Connection)
- (java.util.concurrent Executors TimeUnit ExecutorService)
- (java.time Duration)))
+ (java.time Duration)
+ (java.util.concurrent ExecutorService Executors TimeUnit)))
(defonce ^:dynamic *config* (atom nil))
@@ -92,11 +92,23 @@
new-cfg)))
+(defn get-queue-id
+ [queue-id-or-var]
+ (cond (and (var? queue-id-or-var)
+ (keyword? (:yoltq/queue-id (meta queue-id-or-var))))
+ (:yoltq/queue-id (meta queue-id-or-var))
+
+ (keyword? queue-id-or-var)
+ queue-id-or-var
+
+ :else
+ (throw (ex-info (str "Could not get queue-id for " queue-id-or-var) {:queue-id queue-id-or-var}))))
+
(defn add-consumer!
([queue-id f]
(add-consumer! queue-id f {}))
([queue-id f opts]
- (swap! *config* (fn [old-config] (assoc-in old-config [:handlers queue-id] (merge opts {:f f}))))))
+ (swap! *config* (fn [old-config] (assoc-in old-config [:handlers (get-queue-id queue-id)] (merge opts {:f f}))))))
(defn put
@@ -105,7 +117,7 @@
(let [{:keys [bootstrap-poller! conn] :as cfg} @*config*]
(when (and *test-mode* bootstrap-poller!)
(bootstrap-poller! conn))
- (i/put cfg queue-id payload opts))))
+ (i/put cfg (get-queue-id queue-id) payload opts))))
(defn- do-start! []
diff --git a/test/com/github/ivarref/yoltq/virtual_test.clj b/test/com/github/ivarref/yoltq/virtual_test.clj
index 34c9026..e077517 100644
--- a/test/com/github/ivarref/yoltq/virtual_test.clj
+++ b/test/com/github/ivarref/yoltq/virtual_test.clj
@@ -1,15 +1,15 @@
(ns com.github.ivarref.yoltq.virtual-test
- (:require [datomic-schema.core]
- [clojure.test :refer [use-fixtures deftest is] :refer-macros [thrown?]]
+ (:require [clojure.test :refer [deftest is use-fixtures] :refer-macros [thrown?]]
+ [clojure.tools.logging :as log]
+ [com.github.ivarref.yoltq :as yq]
+ [com.github.ivarref.yoltq.impl :as i]
+ [com.github.ivarref.yoltq.migrate :as migrate]
[com.github.ivarref.yoltq.test-queue :as tq]
[com.github.ivarref.yoltq.test-utils :as u]
- [datomic.api :as d]
[com.github.ivarref.yoltq.utils :as uu]
- [clojure.tools.logging :as log]
- [com.github.ivarref.yoltq.impl :as i]
- [com.github.ivarref.yoltq :as yq]
- [taoensso.timbre :as timbre]
- [com.github.ivarref.yoltq.migrate :as migrate]))
+ [datomic-schema.core]
+ [datomic.api :as d]
+ [taoensso.timbre :as timbre]))
(use-fixtures :each tq/call-with-virtual-queue!)
@@ -350,3 +350,20 @@
@(d/transact conn [(yq/put :q {:id "a"})])
(timbre/with-level :fatal
(is (thrown? Exception @(d/transact conn [(yq/put :q {})]))))))
+
+
+(defn my-consumer
+ {:yoltq/queue-id :some-q}
+ [state payload]
+ (swap! state conj payload))
+
+(deftest queue-id-can-be-var
+ (let [conn (u/empty-conn)
+ received (atom #{})]
+ (yq/init! {:conn conn})
+ (yq/add-consumer! #'my-consumer (partial my-consumer received))
+ @(d/transact conn [(yq/put #'my-consumer {:id "a"})])
+ (tq/consume! :some-q)
+ (is (= #{{:id "a"}} @received))
+ #_(timbre/with-level :fatal
+ (is (thrown? Exception @(d/transact conn [(yq/put :q {})]))))))