SpringCloud 國營商店

2023-11-29 15:34 更新

使用DSL時,Kafka流會自動創(chuàng)建狀態(tài)存儲。使用處理器API時,您需要手動注冊狀態(tài)存儲。為此,您可以使用KafkaStreamsStateStore批注。您可以指定存儲的名稱和類型,用于控制日志和禁用緩存的標(biāo)志等。一旦在引導(dǎo)階段由綁定程序創(chuàng)建了存儲,就可以通過處理器API訪問此狀態(tài)存儲。下面是一些執(zhí)行此操作的原語。

創(chuàng)建狀態(tài)存儲:

@KafkaStreamsStateStore(name="mystate", type= KafkaStreamsStateStoreProperties.StoreType.WINDOW, lengthMs=300000)
public void process(KStream<Object, Product> input) {
    ...
}

訪問狀態(tài)存儲:

Processor<Object, Product>() {

    WindowStore<Object, String> state;

    @Override
    public void init(ProcessorContext processorContext) {
        state = (WindowStore)processorContext.getStateStore("mystate");
    }
    ...
}
以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號