为了账号安全,请及时绑定邮箱和手机立即绑定

初始化MapState的内容

初始化MapState的内容

qq_遁去的一_1 2023-10-12 14:39:29
我实现了一个RichFunction具有以下结构的 Flink:public class MyFunction extends KeyedBroadcastProcessFunction <String, InputType, BroadcastedStateType, OutputType> {    private MapState<String, MyState> myState;                  @Override    public void open(Configuration conf)throws Exception{        myState = getRuntimeContext().getMapState(new MapStateDescriptor<>("state", Types.STRING, Types.POJO(BroadcastedStateType.class)));    }    @Override    public void processElement(InputType value, ReadOnlyContext ctx, Collector<OutputType> out) throws Exception {        MyState state = myState.get(value.ID());        // Do things    }    @Override    public void processBroadcastElement(BroadcastedStateType value, Context ctx, Collector<OutputType> out) throws Exception {        state.put(value.ID(), value.state());   // Update the mapState with value from broadcast    }    // retrieve all the state values and put them in the MapState    private void initialState() throws Exception{       Map<String, MyState> initialValues = ...;       this.cameras.putAll(initialValues);    }}该mapState变量存储通过BroadcastedStream. 更新是在processBroadcastElement()函数中完成的。在作业开始时,我想mapState使用该initialState()函数来初始化。问题是我无法在函数中使用它open()(请参阅此处原因)在这种情况下初始化的正确方法是什么mapState?(在所有使用 RichFunctions 的情况下)
查看完整描述

2 回答

?
子衿沉夜

TA贡献1828条经验 获得超3个赞

您想要实现 org.apache.flink.streaming.api.checkpoint.CheckpointedFunction


当您这样做时,您将实现两种方法:


@Override

public void snapshotState(FunctionSnapshotContext context) throws Exception {


    // called when it's time to save state


    myState.clear();


        // Update myState with current application state 


}


@Override

public void initializeState(FunctionInitializationContext context) throws Exception {


    // called when things start up, possibly recovering from an error


    descriptor = new MapStateDescriptor<>("state", Types.STRING, Types.POJO(BroadcastedStateType.class));


    myState = context.getKeyedStateStore().getMapState(descriptor);


    if (context.isRestored()) {


        // restore application state from myState  


    }       


}

您可以在initializeState() 方法而不是open() 中初始化myState 变量。


查看完整回答
反对 回复 2023-10-12
?
梵蒂冈之花

TA贡献1900条经验 获得超5个赞

我不相信你实际上可以在initializeState()中初始化广播状态。修改广播状态的唯一方法是通过在 processBroadcastElement 方法中获得的读/写上下文。

但是你可以做的是在initializeState中使用context.isRestored()来确定KeyedBroadcastProcessFunction是否是第一次初始化,并设置一个瞬态局部变量来记录此信息。然后,第一次调用 processBroadcastElement 方法时,您可以使用此信息来决定在广播状态中存储什么。但您必须在广播流上发送一些内容才能启动此操作。


查看完整回答
反对 回复 2023-10-12
  • 2 回答
  • 0 关注
  • 62 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信