The idea is the access TCP client connections like any other product driver code would e.g the cassandra or mondodb driver.
There are allot of situations where software in the past (my own experience) became unstable because the TCP connections were not written or treated with the equivalent importance as server connections.
Writing the TCP connection as if it were a product driver sets a certain design mindset.
The code should support multi threading and async usage (not NIO). We can support multi threading by allowing each thread to have its own connection from a Pool, thus not requiring locking or syncing if the pool has enough connections. Synchronization is only required once the Pool runs out of free connections and threads need to compete for resources.
From my own experience in writing high volume performant client TCP IO code its my opinion that NIO doesn't add
any value in terms of performance. It only adds complications. The reasons are:
conn = create [{custom-env conf routing rate-limiter retry-policy} bootstrap-end-points] send [ conn i-o-f data timeout-ms] : RESP ;; the data is applied to the i-o-f send [ conn exec-service i-o-f data timeout-ms] : Promise[RESP]
i-o-f [ conn data timeout-ms] : RESP
stats = conn-stats [ conn ]
close [ conn ]
IRouting select-host [ custom-env hosts ] : Host
Support IO timeout see: https://github.com/gerritjvv/kafka-fast/blob/master/kafka-clj/java/kafka_clj/util/IOUtil.java#L37
Provide support functions for common java primites (and short strings)
See: https://github.com/gerritjvv/kafka-fast/blob/master/kafka-clj/java/kafka_clj/util/IOUtil.java#L21
read-bytes [reader bts-len timeout-ms] : byte-array
read-int [reader timeout-ms ] : int
read-long [reader timeout-ms ] : long
read-double [reader timeout-ms] : double
read-short [reader timeout-ms ] : short
read-float [reader timeout-ms ] : short
read-short-str [reader timeout-ms ] : String
read-bool [reader timeout-ms] : boolean
write-bytes [writer bts from len ]
write-int [writer int]
write-double ... .... for all primitives
write-short-str [writer string]
Implemented using tcp-driver.io.stream
create-conn [ bootstrap-hosts pool-conf routing-obj retry-policy conf] { :pool (pool :conf conf :bootstrap-hosts bootstrap-hosts :retry-policy retry-policy :routing-policy routing-obj :routing-env {:hosts (atom (set boostrap-hosts))}
add-host! [ ctx host host-conf] (assoc-stm! (get-in ctx [:routing-env :hosts]) host)
remove-host! [ctx host host-conf] (dissoc-stm! (get-in ctx [:routing-env :hosts]) host)
send-op [ctx host i-o-f data timeout-ms] try: conn = (pool/borrow (:pool ctx) host timeout-ms) try: return (i-o-f conn data timeout-ms) finally: (pool/return pool-inst conn)
catch Exception e
(pool/invalidate pool-inst conn)
return e
send [ ctx i-o-f data timeout-ms]
hosts = (:bootstrap-hosts ctx) routing-policy (:routing-policy ctx)
loop [retry-state nil] host = (routing/select-host routing-policy (:routing-env ctx) hosts)
if host == null
throw NoHostAvailableException
ret-val = (send-op ctx host i-o-f data timeout-ms)
if ret-val == Exception
[retry-state retry-op] = (io-retry/retry-action! (:retry-policy ctx) (:conf ctx) conn host retry-state)
if retry-op == :recur
(recur retry-state)
else
(throw ret-val)
else
ret-val
NS: routing
Protocol IRouting
select-host
add-host
remove-host
NS: io-retry
multi-method retry-action! identity
defmulti retry-action! :retry [_ _ _ _ retry-state]
[retry-state (if (available-tries? retry-state) :retry nil)]
defmulti retry-action! :blacklist-retry [ctx conn host e retry-state]
(routing/blacklist! (:routing-policy ctx) (:routing-env ctx) conn host)
[retry-state (if (hosts-left? ... ) :retry :nil)]
defmulti retry-action! :default [ _ _ _ e _ ] nil
Even though the implementation language is Clojure the public api should give first class support for:
Can you improve this documentation?Edit on GitHub
cljdoc is a website building & hosting documentation for Clojure/Script libraries
× close