File tree Expand file tree Collapse file tree 3 files changed +29
-2
lines changed
main/clojure/clojure/core/async
test/clojure/clojure/core Expand file tree Collapse file tree 3 files changed +29
-2
lines changed Original file line number Diff line number Diff line change 7373 :args - a map of param->val which will be passed to the process ctor
7474 :chan-opts - a map of in-or-out-id->{:keys [buf-or-n xform]}, where buf-or-n
7575 and xform have their meanings per core.async/chan
76- the default is {:buf-or-n 10}
76+ the default is {:buf-or-n 10}. For efficiency, Flow creates
77+ only the in channel per in/out pair and so :chan-opts
78+ should be associated with the in channel.
7779
7880 :conns - a collection of [[from-pid outid] [to-pid inid]] tuples.
7981
Original file line number Diff line number Diff line change 6262 conn-map (reduce (fn [ret [out in :as conn]]
6363 (if (and (contains? outopts out)
6464 (contains? inopts in))
65- (update ret out set-conj in)
65+ (if (seq (vals (get outopts out)))
66+ (throw (ex-info (str " only one channel created for connection. "
67+ " :chan-opts must be associated with input side." )
68+ {:conn conn, :out-opts outopts}))
69+ (update ret out set-conj in))
6670 (throw (ex-info " invalid connection" {:conn conn}))))
6771 {} conns)
6872 running-chans #(or (deref chans) (throw (Exception. " flow not running" )))
Original file line number Diff line number Diff line change 1+ (ns clojure.core.flow-test
2+ (:require [clojure.core.async.flow :as flow]
3+ [clojure.test :refer :all ]))
4+
5+ (defn tap-n-drop [x] (tap> x) nil )
6+
7+ (deftest chan-opts-tests
8+ (testing " :chan-opts only specified on in side of connected pair"
9+ (is (thrown? clojure.lang.ExceptionInfo
10+ (flow/create-flow
11+ {:procs {:source {:proc (-> identity flow/lift1->step flow/process)
12+ :chan-opts {:out {:buf-or-n 11
13+ :xform (map (fn [x] (str " Saw " x)))}}}
14+ :sink {:proc (-> #'tap-n-drop flow/lift1->step flow/process)}}
15+ :conns [[[:source :out ] [:sink :in ]]]})))
16+ (is (flow/create-flow
17+ {:procs {:source {:proc (-> identity flow/lift1->step flow/process)}
18+ :sink {:proc (-> #'tap-n-drop flow/lift1->step flow/process)
19+ :chan-opts {:in {:buf-or-n 11
20+ :xform (map (fn [x] (str " Saw " x)))}}}}
21+ :conns [[[:source :out ] [:sink :in ]]]}))))
You can’t perform that action at this time.
0 commit comments