W3Cschool
恭喜您成為首批注冊用戶
獲得88經(jīng)驗值獎勵
使用輪詢的使用者時,您可以按需輪詢PollableMessageSource
。考慮以下受調(diào)查消費者的示例:
public interface PolledConsumer { @Input PollableMessageSource destIn(); @Output MessageChannel destOut(); }
給定上一個示例中的受調(diào)查消費者,您可以按以下方式使用它:
@Bean public ApplicationRunner poller(PollableMessageSource destIn, MessageChannel destOut) { return args -> { while (someCondition()) { try { if (!destIn.poll(m -> { String newPayload = ((String) m.getPayload()).toUpperCase(); destOut.send(new GenericMessage<>(newPayload)); })) { Thread.sleep(1000); } } catch (Exception e) { // handle failure } } }; }
PollableMessageSource.poll()
方法采用一個MessageHandler
參數(shù)(通常為lambda表達式,如此處所示)。如果收到并成功處理了消息,它將返回true
。
與消息驅(qū)動的使用者一樣,如果MessageHandler
引發(fā)異常,消息將發(fā)布到錯誤通道,如“ ???”中所述。”。
通常,poll()
方法會在MessageHandler
退出時確認(rèn)該消息。如果該方法異常退出,則該消息將被拒絕(不重新排隊),但請參閱“處理錯誤”一節(jié)。您可以通過對確認(rèn)負(fù)責(zé)來覆蓋該行為,如以下示例所示:
@Bean public ApplicationRunner poller(PollableMessageSource dest1In, MessageChannel dest2Out) { return args -> { while (someCondition()) { if (!dest1In.poll(m -> { StaticMessageHeaderAccessor.getAcknowledgmentCallback(m).noAutoAck(); // e.g. hand off to another thread which can perform the ack // or acknowledge(Status.REQUEUE) })) { Thread.sleep(1000); } } }; }
您必須在某一時刻ack
(或nack
)消息,以避免資源泄漏。
某些消息傳遞系統(tǒng)(例如Apache Kafka)在日志中維護簡單的偏移量。如果傳遞失敗,并用StaticMessageHeaderAccessor.getAcknowledgmentCallback(m).acknowledge(Status.REQUEUE);
重新排隊,則重新傳遞任何以后成功確認(rèn)的消息。
還有一個重載的poll
方法,其定義如下:
poll(MessageHandler handler, ParameterizedTypeReference<?> type)
type
是一個轉(zhuǎn)換提示,它允許轉(zhuǎn)換傳入的消息有效負(fù)載,如以下示例所示:
boolean result = pollableSource.poll(received -> { Map<String, Foo> payload = (Map<String, Foo>) received.getPayload(); ... }, new ParameterizedTypeReference<Map<String, Foo>>() {});
默認(rèn)情況下,為可輪詢源配置了一個錯誤通道。如果回調(diào)引發(fā)異常,則將ErrorMessage
發(fā)送到錯誤通道(<destination>.<group>.errors
);此錯誤通道也橋接到全局Spring Integration errorChannel
。
您可以使用@ServiceActivator
訂閱任何一個錯誤通道來處理錯誤。如果沒有訂閱,則將僅記錄錯誤并確認(rèn)消息成功。如果錯誤通道服務(wù)激活器引發(fā)異常,則該消息將被拒絕(默認(rèn)情況下),并且不會重新發(fā)送。如果服務(wù)激活器拋出RequeueCurrentMessageException
,則該消息將在代理處重新排隊,并在隨后的輪詢中再次檢索。
如果偵聽器直接拋出RequeueCurrentMessageException
,則如上所述,該消息將重新排隊,并且不會發(fā)送到錯誤通道。
Copyright©2021 w3cschool編程獅|閩ICP備15016281號-3|閩公網(wǎng)安備35020302033924號
違法和不良信息舉報電話:173-0602-2364|舉報郵箱:jubao@eeedong.com
掃描二維碼
下載編程獅App
編程獅公眾號
聯(lián)系方式:
更多建議: