class MCollective::Client

Helpers for writing clients that can talk to agents, do discovery and so forth

Attributes

connection_timeout[RW]
discoverer[RW]
options[RW]
stats[RW]

Public Class Methods

new(options) click to toggle source
   # File lib/mcollective/client.rb
 6 def initialize(options)
 7   @config = Config.instance
 8   @options = nil
 9 
10   if options.is_a?(String)
11     # String is the path to a config file
12     @config.loadconfig(options) unless @config.configured
13   elsif options.is_a?(Hash)
14     @config.loadconfig(options[:config]) unless @config.configured
15     @options = options
16     @connection_timeout = options[:connection_timeout]
17   else
18     raise "Invalid parameter passed to Client constructor. Valid types are Hash or String"
19   end
20 
21   @connection_timeout ||= @config.connection_timeout
22 
23   @connection = PluginManager["connector_plugin"]
24   @security = PluginManager["security_plugin"]
25 
26   @security.initiated_by = :client
27   @subscriptions = {}
28 
29   @discoverer = Discovery.new(self)
30 
31   # Time box the connection if a timeout has been specified
32   # connection_timeout defaults to nil which means it will try forever if
33   # not specified
34   begin
35     Timeout::timeout(@connection_timeout, ClientTimeoutError) do
36       @connection.connect
37     end
38   rescue ClientTimeoutError => e
39     Log.error("Timeout occured while trying to connect to middleware")
40     raise e
41   end
42 end
request_sequence() click to toggle source
   # File lib/mcollective/client.rb
45 def self.request_sequence
46   @@request_sequence
47 end

Public Instance Methods

collective() click to toggle source

Returns the configured main collective if no specific collective is specified as options

   # File lib/mcollective/client.rb
51 def collective
52   if @options[:collective].nil?
53     @config.main_collective
54   else
55     @options[:collective]
56   end
57 end
createreq(msg, agent, filter ={}) click to toggle source
   # File lib/mcollective/client.rb
73 def createreq(msg, agent, filter ={})
74   if msg.is_a?(Message)
75     request = msg
76     agent = request.agent
77   else
78     ttl = @options[:ttl] || @config.ttl
79     request = Message.new(msg, nil, {:agent => agent, :type => :request, :collective => collective, :filter => filter, :ttl => ttl})
80     request.reply_to = @options[:reply_to] if @options[:reply_to]
81   end
82 
83   @@request_sequence += 1
84 
85   request.encode!
86   subscribe(agent, :reply) unless request.reply_to
87   request
88 end
disconnect() click to toggle source

Disconnects cleanly from the middleware

   # File lib/mcollective/client.rb
60 def disconnect
61   Log.debug("Disconnecting from the middleware")
62   @connection.disconnect
63 end
discover(filter, timeout, limit=0) click to toggle source

Performs a discovery of nodes matching the filter passed returns an array of nodes

An integer limit can be supplied this will have the effect of the discovery being cancelled soon as it reached the requested limit of hosts

    # File lib/mcollective/client.rb
147 def discover(filter, timeout, limit=0)
148   @discoverer.discover(filter.merge({'collective' => collective}), timeout, limit)
149 end
discovered_req(body, agent, options=false) click to toggle source
    # File lib/mcollective/client.rb
300 def discovered_req(body, agent, options=false)
301   raise "Client#discovered_req has been removed, please port your agent and client to the SimpleRPC framework"
302 end
display_stats(stats, options=false, caption="stomp call summary") click to toggle source

Prints out the stats returns from req and discovered_req in a nice way

    # File lib/mcollective/client.rb
305 def display_stats(stats, options=false, caption="stomp call summary")
306   options = @options unless options
307 
308   if options[:verbose]
309     puts("\n---- #{caption} ----")
310 
311     if stats[:discovered]
312       puts("           Nodes: #{stats[:discovered]} / #{stats[:responses]}")
313     else
314       puts("           Nodes: #{stats[:responses]}")
315     end
316 
317     printf("      Start Time: %s\n", Time.at(stats[:starttime]))
318     printf("  Discovery Time: %.2fms\n", stats[:discoverytime] * 1000)
319     printf("      Agent Time: %.2fms\n", stats[:blocktime] * 1000)
320     printf("      Total Time: %.2fms\n", stats[:totaltime] * 1000)
321 
322   else
323     if stats[:discovered]
324       printf("\nFinished processing %d / %d hosts in %.2f ms\n\n", stats[:responses], stats[:discovered], stats[:blocktime] * 1000)
325     else
326       printf("\nFinished processing %d hosts in %.2f ms\n\n", stats[:responses], stats[:blocktime] * 1000)
327     end
328   end
329 
330   if stats[:noresponsefrom].size > 0
331     puts("\nNo response from:\n")
332 
333     stats[:noresponsefrom].each do |c|
334       puts if c % 4 == 1
335       printf("%30s", c)
336     end
337 
338     puts
339   end
340 
341   if stats[:unexpectedresponsefrom].size > 0
342     puts("\nUnexpected response from:\n")
343 
344     stats[:unexpectedresponsefrom].each do |c|
345       puts if c % 4 == 1
346       printf("%30s", c)
347     end
348 
349     puts
350   end
351 end
publish(request) click to toggle source
    # File lib/mcollective/client.rb
231 def publish(request)
232   Log.info("Sending request #{request.requestid} for agent '#{request.agent}' with ttl #{request.ttl} in collective '#{request.collective}'")
233   request.publish
234 end
receive(requestid = nil) click to toggle source

Blocking call that waits for ever for a message to arrive.

If you give it a requestid this means you've previously send a request with that ID and now you just want replies that matches that id, in that case the current connection will just ignore all messages not directed at it and keep waiting for more till it finds a matching message.

    # File lib/mcollective/client.rb
115 def receive(requestid = nil)
116   reply = nil
117 
118   begin
119     reply = @connection.receive
120     reply.type = :reply
121     reply.expected_msgid = requestid
122 
123     reply.decode!
124 
125     unless reply.requestid == requestid
126       raise(MsgDoesNotMatchRequestID, "Message reqid #{reply.requestid} does not match our reqid #{requestid}")
127     end
128 
129     Log.debug("Received reply to #{reply.requestid} from #{reply.payload[:senderid]}")
130   rescue SecurityValidationFailed => e
131     Log.warn("Ignoring a message that did not pass security validations")
132     retry
133   rescue MsgDoesNotMatchRequestID => e
134     Log.debug("Ignoring a message for some other client : #{e.message}")
135     retry
136   end
137 
138   reply
139 end
req(body, agent=nil, options=false, waitfor=[], &block) click to toggle source

Send a request, performs the passed block for each response

times = req(“status”, “mcollectived”, options, client) {|resp|

pp resp

}

It returns a hash of times and timeouts for discovery and total run is taken from the options hash which in turn is generally built using MCollective::Optionparser

    # File lib/mcollective/client.rb
159 def req(body, agent=nil, options=false, waitfor=[], &block)
160   if body.is_a?(Message)
161     agent = body.agent
162     waitfor = body.discovered_hosts || []
163     @options = body.options
164   end
165 
166   @options = options if options
167   threaded = @options[:threaded]
168   timeout = @discoverer.discovery_timeout(@options[:timeout], @options[:filter])
169   request = createreq(body, agent, @options[:filter])
170   publish_timeout = @options[:publish_timeout] || @config.publish_timeout
171   stat = {:starttime => Time.now.to_f, :discoverytime => 0, :blocktime => 0, :totaltime => 0}
172   STDOUT.sync = true
173   hosts_responded = 0
174 
175   begin
176     if threaded
177       hosts_responded = threaded_req(request, publish_timeout, timeout, waitfor, &block)
178     else
179       hosts_responded = unthreaded_req(request, publish_timeout, timeout, waitfor, &block)
180     end
181   rescue Interrupt => e
182   ensure
183     unsubscribe(agent, :reply)
184   end
185 
186   return update_stat(stat, hosts_responded, request.requestid)
187 end
sendreq(msg, agent, filter = {}) click to toggle source

Sends a request and returns the generated request id, doesn't wait for responses and doesn't execute any passed in code blocks for responses

   # File lib/mcollective/client.rb
67 def sendreq(msg, agent, filter = {})
68   request = createreq(msg, agent, filter)
69   publish(request)
70   request.requestid
71 end
start_publisher(request, publish_timeout) click to toggle source

Starts the request publishing routine

    # File lib/mcollective/client.rb
220 def start_publisher(request, publish_timeout)
221   Log.debug("Starting publishing with publish timeout of #{publish_timeout}")
222   begin
223     Timeout.timeout(publish_timeout) do
224       publish(request)
225     end
226   rescue Timeout::Error => e
227     Log.warn("Could not publish all messages. Publishing timed out.")
228   end
229 end
start_receiver(requestid, waitfor, timeout) { |payload, resp| ... } click to toggle source

Starts the response receiver routine Expected to return the amount of received responses.

    # File lib/mcollective/client.rb
238 def start_receiver(requestid, waitfor, timeout, &block)
239   Log.debug("Starting response receiver with timeout of #{timeout}")
240   hosts_responded = 0
241 
242   if (waitfor.is_a?(Array))
243     unfinished = Hash.new(0)
244     waitfor.each {|w| unfinished[w] += 1}
245   else
246     unfinished = []
247   end
248 
249   begin
250     Timeout.timeout(timeout) do
251       loop do
252         resp = receive(requestid)
253 
254         if block.arity == 2
255           yield resp.payload, resp
256         else
257           yield resp.payload
258         end
259 
260         hosts_responded += 1
261 
262         if (waitfor.is_a?(Array))
263           sender = resp.payload[:senderid]
264           if unfinished[sender] <= 1
265             unfinished.delete(sender)
266           else
267             unfinished[sender] -= 1
268           end
269 
270           break if !waitfor.empty? && unfinished.empty?
271         else
272           break unless waitfor == 0 || hosts_responded < waitfor
273         end
274       end
275     end
276   rescue Timeout::Error => e
277     if waitfor.is_a?(Array)
278       if !unfinished.empty?
279         Log.warn("Could not receive all responses. Did not receive responses from #{unfinished.keys.join(', ')}")
280       end
281     elsif (waitfor > hosts_responded)
282       Log.warn("Could not receive all responses. Expected : #{waitfor}. Received : #{hosts_responded}")
283     end
284   end
285 
286   hosts_responded
287 end
subscribe(agent, type) click to toggle source
   # File lib/mcollective/client.rb
90 def subscribe(agent, type)
91   unless @subscriptions.include?(agent)
92     subscription = Util.make_subscriptions(agent, type, collective)
93     Log.debug("Subscribing to #{type} target for agent #{agent}")
94 
95     Util.subscribe(subscription)
96     @subscriptions[agent] = 1
97   end
98 end
threaded_req(request, publish_timeout, timeout, waitfor, &block) click to toggle source

Starts the client receiver and publisher in threads. This is activated when the 'threader_client' configuration option is set.

    # File lib/mcollective/client.rb
199 def threaded_req(request, publish_timeout, timeout, waitfor, &block)
200   Log.debug("Starting threaded client")
201   publisher = Thread.new do
202     start_publisher(request, publish_timeout)
203   end
204 
205   # When the client is threaded we add the publishing timeout to
206   # the agent timeout so that the receiver doesn't time out before
207   # publishing has finished in cases where publish_timeout >= timeout.
208   total_timeout = publish_timeout + timeout
209   hosts_responded = 0
210 
211   receiver = Thread.new do
212     hosts_responded = start_receiver(request.requestid, waitfor, total_timeout, &block)
213   end
214 
215   receiver.join
216   hosts_responded
217 end
unsubscribe(agent, type) click to toggle source
    # File lib/mcollective/client.rb
100 def unsubscribe(agent, type)
101   if @subscriptions.include?(agent)
102     subscription = Util.make_subscriptions(agent, type, collective)
103     Log.debug("Unsubscribing #{type} target for #{agent}")
104 
105     Util.unsubscribe(subscription)
106     @subscriptions.delete(agent)
107   end
108 end
unthreaded_req(request, publish_timeout, timeout, waitfor, &block) click to toggle source

Starts the client receiver and publisher unthreaded. This is the default client behaviour.

    # File lib/mcollective/client.rb
191 def unthreaded_req(request, publish_timeout, timeout, waitfor, &block)
192   start_publisher(request, publish_timeout)
193   start_receiver(request.requestid, waitfor, timeout, &block)
194 end
update_stat(stat, hosts_responded, requestid) click to toggle source
    # File lib/mcollective/client.rb
289 def update_stat(stat, hosts_responded, requestid)
290   stat[:totaltime] = Time.now.to_f - stat[:starttime]
291   stat[:blocktime] = stat[:totaltime] - stat[:discoverytime]
292   stat[:responses] = hosts_responded
293   stat[:noresponsefrom] = []
294   stat[:unexpectedresponsefrom] = []
295   stat[:requestid] = requestid
296 
297   @stats = stat
298 end