Package proton ::
Module handlers
|
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 import heapq, logging, os, re, socket, time, types
20
21 from proton import dispatch, generate_uuid, PN_ACCEPTED, SASL, symbol, ulong, Url
22 from proton import Collector, Connection, Delivery, Described, Endpoint, Event, Link, Terminus, Timeout
23 from proton import Message, Handler, ProtonException, Transport, TransportException, ConnectionException
24 from select import select
28 """
29 A utility for simpler and more intuitive handling of delivery
30 events related to outgoing i.e. sent messages.
31 """
def __init__(self, auto_settle=True, delegate=None):
    """
    Record how outgoing deliveries are to be handled.

    ``auto_settle`` is stored for the delivery-handling code, which
    settles a delivery locally once the peer has settled it when the
    flag is true. ``delegate``, when not None, is the object the
    ``on_*`` callbacks of this handler are dispatched to.
    """
    self.delegate = delegate
    self.auto_settle = auto_settle
35
37 if event.link.is_sender and event.link.credit:
38 self.on_sendable(event)
39
41 dlv = event.delivery
42 if dlv.link.is_sender and dlv.updated:
43 if dlv.remote_state == Delivery.ACCEPTED:
44 self.on_accepted(event)
45 elif dlv.remote_state == Delivery.REJECTED:
46 self.on_rejected(event)
47 elif dlv.remote_state == Delivery.RELEASED or dlv.remote_state == Delivery.MODIFIED:
48 self.on_released(event)
49 if dlv.settled:
50 self.on_settled(event)
51 if self.auto_settle:
52 dlv.settle()
53
55 """
56 Called when the sender link has credit and messages can
57 therefore be transferred.
58 """
59 if self.delegate != None:
60 dispatch(self.delegate, 'on_sendable', event)
61
63 """
64 Called when the remote peer accepts an outgoing message.
65 """
66 if self.delegate != None:
67 dispatch(self.delegate, 'on_accepted', event)
68
70 """
71 Called when the remote peer rejects an outgoing message.
72 """
73 if self.delegate != None:
74 dispatch(self.delegate, 'on_rejected', event)
75
77 """
78 Called when the remote peer releases an outgoing message. Note
79 that this may be in response to either the RELEASE or MODIFIED
80 state as defined by the AMQP specification.
81 """
82 if self.delegate != None:
83 dispatch(self.delegate, 'on_released', event)
84
86 """
87 Called when the remote peer has settled the outgoing
88 message. This is the point at which it should never be
89 retransmitted.
90 """
91 if self.delegate != None:
92 dispatch(self.delegate, 'on_settled', event)
93
95 msg = Message()
96 msg.decode(delivery.link.recv(delivery.pending))
97 delivery.link.advance()
98 return msg
99
class Reject(ProtonException):
    """
    An exception that indicates a message should be rejected.

    Raised from a message callback to have the delivery settled with
    the REJECTED state instead of being accepted.
    """
105
107 """
108 An exception that indicates a message should be released
109 """
110 pass
111
114 """
115 Accepts a received message.
116 """
117 self.settle(delivery, Delivery.ACCEPTED)
118
120 """
121 Rejects a received message that is considered invalid or
122 unprocessable.
123 """
124 self.settle(delivery, Delivery.REJECTED)
125
def release(self, delivery, delivered=True):
    """
    Give a received message back to the source so that it can be
    offered to any (other) interested receiver.

    ``delivered`` selects the outcome used to settle the delivery:
    when true the delivery is settled as ``Delivery.MODIFIED``, i.e.
    it is to be considered a delivery attempt (and the delivery
    count updated); when false it is settled as
    ``Delivery.RELEASED``.
    """
    outcome = Delivery.MODIFIED if delivered else Delivery.RELEASED
    self.settle(delivery, outcome)
137
def settle(self, delivery, state=None):
    """
    Settle ``delivery``, optionally recording an outcome first.

    When ``state`` is truthy it is applied with ``delivery.update``
    before the delivery is settled; otherwise the delivery is
    settled as it stands.
    """
    if not state:
        # No outcome to record: just settle.
        delivery.settle()
        return
    delivery.update(state)
    delivery.settle()
142
144 """
145 A utility for simpler and more intuitive handling of delivery
146 events related to incoming i.e. received messages.
147 """
148
def __init__(self, auto_accept=True, delegate=None):
    """
    Record how incoming deliveries are to be handled.

    ``auto_accept`` is stored for the delivery-handling code, which
    updates and settles received messages itself when the flag is
    true. ``delegate``, when not None, is the object the ``on_*``
    callbacks of this handler are dispatched to.
    """
    self.delegate, self.auto_accept = delegate, auto_accept
152
154 dlv = event.delivery
155 if not dlv.link.is_receiver: return
156 if dlv.readable and not dlv.partial:
157 event.message = recv_msg(dlv)
158 if event.link.state & Endpoint.LOCAL_CLOSED:
159 if self.auto_accept:
160 dlv.update(Delivery.RELEASED)
161 dlv.settle()
162 else:
163 try:
164 self.on_message(event)
165 if self.auto_accept:
166 dlv.update(Delivery.ACCEPTED)
167 dlv.settle()
168 except Reject:
169 dlv.update(Delivery.REJECTED)
170 dlv.settle()
171 except Release:
172 dlv.update(Delivery.MODIFIED)
173 dlv.settle()
174 elif dlv.updated and dlv.settled:
175 self.on_settled(event)
176
178 """
179 Called when a message is received. The message itself can be
180 obtained as a property on the event. For the purpose of
181 referring to this message in further actions (e.g. if
182 explicitly accepting it), the ``delivery`` should be used, also
183 obtainable via a property on the event.
184 """
185 if self.delegate != None:
186 dispatch(self.delegate, 'on_message', event)
187
189 if self.delegate != None:
190 dispatch(self.delegate, 'on_settled', event)
191
193 """
194 A utility that exposes 'endpoint' events i.e. the open/close for
195 links, sessions and connections in a more intuitive manner. A
196 XXX_opened method will be called when both local and remote peers
197 have opened the link, session or connection. This can be used to
198 confirm a locally initiated action for example. A XXX_opening
199 method will be called when the remote peer has requested an open
200 that was not initiated locally. By default this will simply open
201 locally, which then triggers the XXX_opened call. The same applies
202 to close.
203 """
204
def __init__(self, peer_close_is_error=False, delegate=None):
    """
    Record the endpoint-handling options.

    ``peer_close_is_error`` is stored for the ``on_*_closing``
    callbacks, which route a peer-initiated close to the matching
    error callback when the flag is true and no delegate is set.
    ``delegate``, when not None, is the object the ``on_*``
    callbacks of this handler are dispatched to.
    """
    self.peer_close_is_error = peer_close_is_error
    self.delegate = delegate
208
209 @classmethod
211 return endpoint.state & Endpoint.LOCAL_ACTIVE
212
213 @classmethod
215 return endpoint.state & Endpoint.LOCAL_UNINIT
216
217 @classmethod
219 return endpoint.state & Endpoint.LOCAL_CLOSED
220
221 @classmethod
223 return endpoint.state & Endpoint.REMOTE_ACTIVE
224
225 @classmethod
227 return endpoint.state & Endpoint.REMOTE_CLOSED
228
229 @classmethod
231 if endpoint.remote_condition:
232 logging.error(endpoint.remote_condition.description)
233 elif cls.is_local_open(endpoint) and cls.is_remote_closed(endpoint):
234 logging.error("%s closed by peer" % endpoint_type)
235
244
253
262
266
273
277
284
288
295
297 if self.delegate != None:
298 dispatch(self.delegate, 'on_connection_opened', event)
299
301 if self.delegate != None:
302 dispatch(self.delegate, 'on_session_opened', event)
303
305 if self.delegate != None:
306 dispatch(self.delegate, 'on_link_opened', event)
307
309 if self.delegate != None:
310 dispatch(self.delegate, 'on_connection_opening', event)
311
313 if self.delegate != None:
314 dispatch(self.delegate, 'on_session_opening', event)
315
317 if self.delegate != None:
318 dispatch(self.delegate, 'on_link_opening', event)
319
321 if self.delegate != None:
322 dispatch(self.delegate, 'on_connection_error', event)
323 else:
324 self.log_error(event.connection, "connection")
325
327 if self.delegate != None:
328 dispatch(self.delegate, 'on_session_error', event)
329 else:
330 self.log_error(event.session, "session")
331 event.connection.close()
332
334 if self.delegate != None:
335 dispatch(self.delegate, 'on_link_error', event)
336 else:
337 self.log_error(event.link, "link")
338 event.connection.close()
339
341 if self.delegate != None:
342 dispatch(self.delegate, 'on_connection_closed', event)
343
345 if self.delegate != None:
346 dispatch(self.delegate, 'on_session_closed', event)
347
349 if self.delegate != None:
350 dispatch(self.delegate, 'on_link_closed', event)
351
353 if self.delegate != None:
354 dispatch(self.delegate, 'on_connection_closing', event)
355 elif self.peer_close_is_error:
356 self.on_connection_error(event)
357
359 if self.delegate != None:
360 dispatch(self.delegate, 'on_session_closing', event)
361 elif self.peer_close_is_error:
362 self.on_session_error(event)
363
365 if self.delegate != None:
366 dispatch(self.delegate, 'on_link_closing', event)
367 elif self.peer_close_is_error:
368 self.on_link_error(event)
369
372
376
378 """
379 A general purpose handler that makes the proton-c events somewhat
380 simpler to deal with and/or avoids repetitive tasks for common use
381 cases.
382 """
383 - def __init__(self, prefetch=10, auto_accept=True, auto_settle=True, peer_close_is_error=False):
391
def on_transport_error(self, event):
    """
    Called when some error is encountered with the transport over
    which the AMQP connection is to be established. This includes
    authentication errors as well as socket errors.

    The transport condition is logged as ``name: description``, with
    ``: info`` appended when the condition carries extra info. If the
    condition name is listed in ``self.fatal_conditions`` the
    connection is closed. When the transport has no condition at all
    a generic message is logged instead.
    """
    # NOTE(review): the ``def`` line was missing from the extracted
    # listing; name and signature restored from the public
    # MessagingHandler API -- confirm against the real file.
    condition = event.transport.condition
    if not condition:
        logging.error("Unspecified transport error")
        return

    if condition.info:
        # Three values need three placeholders; with only two, the
        # old "%"-format raised TypeError instead of logging.
        logging.error("%s: %s: %s",
                      condition.name, condition.description, condition.info)
    else:
        logging.error("%s: %s", condition.name, condition.description)

    if condition.name in self.fatal_conditions:
        event.connection.close()
407
413
420
427
429 """
430 Called when the event loop - the reactor - starts.
431 """
432 if hasattr(event.reactor, 'subclass'):
433 setattr(event, event.reactor.subclass.__name__.lower(), event.reactor)
434 self.on_start(event)
435
437 """
438 Called when the event loop starts. (Just an alias for on_reactor_init)
439 """
440 pass
442 """
443 Called when the connection is closed.
444 """
445 pass
447 """
448 Called when the session is closed.
449 """
450 pass
452 """
453 Called when the link is closed.
454 """
455 pass
457 """
458 Called when the peer initiates the closing of the connection.
459 """
460 pass
462 """
463 Called when the peer initiates the closing of the session.
464 """
465 pass
467 """
468 Called when the peer initiates the closing of the link.
469 """
470 pass
472 """
473 Called when the socket is disconnected.
474 """
475 pass
476
478 """
479 Called when the sender link has credit and messages can
480 therefore be transferred.
481 """
482 pass
483
485 """
486 Called when the remote peer accepts an outgoing message.
487 """
488 pass
489
491 """
492 Called when the remote peer rejects an outgoing message.
493 """
494 pass
495
497 """
498 Called when the remote peer releases an outgoing message. Note
499 that this may be in response to either the RELEASE or MODIFIED
500 state as defined by the AMQP specification.
501 """
502 pass
503
505 """
506 Called when the remote peer has settled the outgoing
507 message. This is the point at which it should never be
508 retransmitted.
509 """
510 pass
512 """
513 Called when a message is received. The message itself can be
514 obtained as a property on the event. For the purpose of
515 referring to this message in further actions (e.g. if
516 explicitly accepting it), the ``delivery`` should be used, also
517 obtainable via a property on the event.
518 """
519 pass
520
522 """
523 The interface for transaction handlers, i.e. objects that want to
524 be notified of state changes related to a transaction.
525 """
528
531
534
537
540
542 """
543 An extension to the MessagingHandler for applications using
544 transactions.
545 """
546
547 - def __init__(self, prefetch=10, auto_accept=False, auto_settle=True, peer_close_is_error=False):
549
550 - def accept(self, delivery, transaction=None):
555
556 from proton import WrappedHandler
557 from cproton import pn_flowcontroller, pn_handshaker, pn_iohandler
560
562 WrappedHandler.__init__(self, lambda: pn_flowcontroller(window))
563
565
567 WrappedHandler.__init__(self, pn_handshaker)
568
570
572 WrappedHandler.__init__(self, pn_iohandler)
573
575
577 self.selectables = []
578 self.delegate = IOHandler()
579
581 event.dispatch(self.delegate)
582
584 self.selectables.append(event.context)
585
588
590 sel = event.context
591 if sel.is_terminal:
592 self.selectables.remove(sel)
593 sel.release()
594
596 reactor = event.reactor
597
598
599 if not reactor.quiesced: return
600
601 reading = []
602 writing = []
603 deadline = None
604 for sel in self.selectables:
605 if sel.reading:
606 reading.append(sel)
607 if sel.writing:
608 writing.append(sel)
609 if sel.deadline:
610 if deadline is None:
611 deadline = sel.deadline
612 else:
613 deadline = min(sel.deadline, deadline)
614
615 if deadline is not None:
616 timeout = deadline - time.time()
617 else:
618 timeout = reactor.timeout
619 if (timeout < 0): timeout = 0
620 timeout = min(timeout, reactor.timeout)
621 readable, writable, _ = select(reading, writing, [], timeout)
622
623 reactor.mark()
624
625 now = time.time()
626
627 for s in readable:
628 s.readable()
629 for s in writable:
630 s.writable()
631 for s in self.selectables:
632 if s.deadline and now > s.deadline:
633 s.expired()
634
635 reactor.yield_()
636