Squeedo: The sexiest message consumer ever (™)
This library allows you to quickly spin up a message consumer and worker pool using core.async. The message is returned in its raw format, so it's up to the caller to determine the proper reader for their purposes.
Version 1.1.4 features breaking changes to the return value of com.climate.squeedo.sqs-consumer/start-consumer.
See CHANGELOG for release notes.
Squeedo's inspiration came from our continual need to quickly process lots of messages from SQS. We found that the code supporting these processes was quite similar: it involved a lot of plumbing to listen to SQS, pull messages, process them with some kind of threadpool and then ack them back. The goal was to make this easier: simply pass in a compute function, let Squeedo handle all of the plumbing, and focus on the compute logic of what to do with each message.
After several iterations of basing this plumbing code on threadpools, we found that we couldn't get the throughput we wanted simply by tuning the number of threads. We needed something more dynamic, something that would adapt better to the number of cores it ran on and squeeze every last bit of CPU from our EC2 instances. After reading this blog post, http://martintrojer.github.io/clojure/2013/07/07/coreasync-and-blocking-io, we were inspired to use core.async, and Squeedo was born.
Where Squeedo shines is in its ability to push CPU utilization to the max without managing a threadpool. It is especially good when combined with a non-blocking I/O web client library like http-kit, as described in the blog post above.
In its simplest form, Squeedo is composed of only two parts: a compute function and a consumer.
(require '[com.climate.squeedo.sqs-consumer :refer [start-consumer stop-consumer]]
         '[clojure.core.async :refer [put!]])
;; the compute function takes a message and a channel to ack or nack on
;; when done with the message
(defn compute
  [message done-channel]
  (println message)
  ;; never block or make blocking I/O calls here; use http-kit for these calls
  (put! done-channel message))
(def consumer (start-consumer "my-sqs-queue" compute))
;; when done listening
(stop-consumer consumer)
The compute function must put the message on the done-channel even when it fails with an exception; this is what acks or nacks each message. Squeedo listens to the done-channel to know when work is complete, so it can pass your compute function another message to process.
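To check that a compute function honours this contract without touching SQS, you can call it once with a fresh channel and wait for it to post. The sketch below assumes only core.async; run-compute-once and echo-compute are hypothetical names, not part of Squeedo, and the plain map stands in for a real message.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical test helper (not part of Squeedo): calls a compute function
;; with a fresh done-channel and waits up to timeout-ms for it to post.
;; Returns the posted value, or ::timed-out if nothing arrives in time.
(defn run-compute-once
  [compute message timeout-ms]
  (let [done-channel (async/chan 1)
        timeout-ch   (async/timeout timeout-ms)]
    (compute message done-channel)
    (let [[v ch] (async/alts!! [done-channel timeout-ch])]
      (if (= ch timeout-ch)
        ::timed-out
        v))))

;; Hypothetical compute function that acks by posting the message back.
(defn echo-compute
  [message done-channel]
  (async/put! done-channel message))

(run-compute-once echo-compute {:body "hello"} 1000)
;; => {:body "hello"}
```

A compute function that forgets to post on some code path shows up here as ::timed-out instead of silently stalling a consumer slot.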
To nack a message (i.e., update the visibility timeout for the message in SQS so that it will be reprocessed later, up to the maximum receive count configured for your queue), put the message back on the done-channel with a :nack key. Use a value of true for the default visibility timeout of 0 (retry this message as soon as possible), or give an integer value in seconds.
;; retry as soon as possible
(put! done-channel (assoc message :nack true))
;; retry in about 10 seconds
(put! done-channel (assoc message :nack 10))
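A fuller example combines CPU-bound work with non-blocking I/O through http-kit and tunes the consumer with :num-listeners and :max-concurrent-work:
(require '[com.climate.squeedo.sqs-consumer :refer [start-consumer stop-consumer]]
         '[org.httpkit.client]
         '[clojure.core.async :refer [go >!]])
(defn- eat-some-cpu
  [how-much]
  (reduce + (range 1 how-much)))
(defn- async-get
  [url message channel]
  ;; http-kit invokes the callback with the response map once the request completes
  (org.httpkit.client/get url
    (fn [{:keys [error]}]
      (go
        (if error
          ;; the request failed: nack so the message is retried
          (>! channel (assoc message :nack true))
          (do
            ;; do some more processing with the response
            (eat-some-cpu 1000000)
            (>! channel message)))))))
(defn compute
  [message done-channel]
  ;; do something expensive
  (eat-some-cpu 1000000)
  ;; for I/O, hand off to a non-blocking client and return right away
  (async-get "http://google.com" message done-channel))
(def consumer (start-consumer "my-sqs-queue" compute :num-listeners 10 :max-concurrent-work 50))
;; when done listening
;; (stop-consumer consumer)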
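A typical project.clj configuration to set up the servlet context listener hooks through lein-ring's :init and :destroy options:
(defproject com.awesome/microservice "0.0.1"
  ;; lein-ring expects namespace-qualified symbols; my.app is a placeholder
  ;; for the namespace that defines the hooks below
  :ring {:init my.app/initialize!
         :destroy my.app/destroy!})
The hooks themselves, defined in that namespace:
(require '[com.climate.squeedo.sqs-consumer :refer [start-consumer stop-consumer]]
         '[clojure.core.async :refer [put!]])
(defonce consumer (atom {}))
(defn- compute-fn
  [message done-channel]
  ;; do something
  (put! done-channel message))
(defn initialize!
  "Call `initialize!` once to initialize global state before serving. This fn is
  invoked on servlet initialization with zero arguments"
  []
  ;; queue-name is assumed to be defined elsewhere, e.g. read from configuration
  (swap! consumer
         merge
         (start-consumer queue-name compute-fn :max-concurrent-work 10)))
(defn destroy!
  "Destroy the Jetty context and stop the SQS consumer"
  []
  (stop-consumer @consumer)
  (reset! consumer {}))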
One of the great things about Squeedo is its advanced configuration options, which let you tune the consumer to your workflow beyond what the very reasonable defaults do out of the box.
Check out the com.climate.squeedo.sqs namespace for extra goodies like configuring and connecting to queues, enqueuing and dequeuing messages, and acking and nacking.
Example of setting up a queue with custom attributes and a dead letter queue:
(require '[com.climate.squeedo.sqs :as sqs])
(defn initialize-my-queue
  []
  (sqs/configure-queue "my-queue"
                       :queue-attributes {"VisibilityTimeout" "60"}
                       :dead-letter "my-queue-failed"
                       :dead-letter-queue-attributes {"VisibilityTimeout" "120"})
  (sqs/mk-connection "my-queue"))
(def my-queue (initialize-my-queue))
Copyright (C) 2018 The Climate Corporation. Distributed under the Apache License, Version 2.0. You may not use this library except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
See the NOTICE file distributed with this work for additional information regarding copyright ownership. Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Documentation contributors: jeff.melching, Eric Turcotte, Matt K, Jeff Melching, Tommy Rogers, dpittman, Matt Kipps, Robert Kennedy, Philip Jagielski, Audrius Molis & Jeongbong Seo