Flink – State Management
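The overall checkpoint flow has been described elsewhere, but how a snapshot is generated and how state is restored from one was not covered in detail. This note fills that gap.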
StreamOperator
/**
 * Basic interface for stream operators. Implementers would implement one of
 * {@link org.apache.flink.streaming.api.operators.OneInputStreamOperator} or
 * {@link org.apache.flink.streaming.api.operators.TwoInputStreamOperator} to create operators
 * that process elements.
 *
 * <p> The class {@link org.apache.flink.streaming.api.operators.AbstractStreamOperator}
 * offers default implementation for the lifecycle and properties methods.
 *
 * <p> Methods of {@code StreamOperator} are guaranteed not to be called concurrently. Also, if using
 * the timer service, timer callbacks are also guaranteed not to be called concurrently with
 * methods on {@code StreamOperator}.
 *
 * @param <OUT> The output type of the operator
 */
public interface StreamOperator<OUT> extends Serializable {

    // ------------------------------------------------------------------------
    //  life cycle
    // ------------------------------------------------------------------------

    /**
     * Initializes the operator. Sets access to the context and the output.
     */
    void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output);

    /**
     * This method is called immediately before any elements are processed, it should contain the
     * operator's initialization logic.
     *
     * @throws java.lang.Exception An exception in this method causes the operator to fail.
     */
    void open() throws Exception;

    /**
     * This method is called after all records have been added to the operators via the methods
     * {@link org.apache.flink.streaming.api.operators.OneInputStreamOperator#processElement(StreamRecord)}, or
     * {@link org.apache.flink.streaming.api.operators.TwoInputStreamOperator#processElement1(StreamRecord)} and
     * {@link org.apache.flink.streaming.api.operators.TwoInputStreamOperator#processElement2(StreamRecord)}.
     * <p>
     * The method is expected to flush all remaining buffered data. Exceptions during this flushing
     * of buffered data should be propagated, in order to cause the operation to be recognized as failed,
     * because the last data items are not processed properly.
     *
     * @throws java.lang.Exception An exception in this method causes the operator to fail.
     */
    void close() throws Exception;

    /**
     * This method is called at the very end of the operator's life, both in the case of a successful
     * completion of the operation, and in the case of a failure and canceling.
     *
     * This method is expected to make a thorough effort to release all resources
     * that the operator has acquired.
     */
    void dispose();

    // ------------------------------------------------------------------------
    //  state snapshots
    // ------------------------------------------------------------------------

    /**
     * Called to draw a state snapshot from the operator. This method snapshots the operator state
     * (if the operator is stateful) and the key/value state (if it is being used and has been
     * initialized).
     *
     * @param checkpointId The ID of the checkpoint.
     * @param timestamp The timestamp of the checkpoint.
     *
     * @return The StreamTaskState object, possibly containing the snapshots for the
     *         operator and key/value state.
     *
     * @throws Exception Forwards exceptions that occur while drawing snapshots from the operator
     *                   and the key/value state.
     */
    StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception;

    /**
     * Restores the operator state, if this operator's execution is recovering from a checkpoint.
     * This method restores the operator state (if the operator is stateful) and the key/value state
     * (if it had been used and was initialized when the snapshot occurred).
     *
     * <p>This method is called after {@link #setup(StreamTask, StreamConfig, Output)}
     * and before {@link #open()}.
     *
     * @param state The state of operator that was snapshotted as part of checkpoint
     *              from which the execution is restored.
     *
     * @param recoveryTimestamp Global recovery timestamp
     *
     * @throws Exception Exceptions during state restore should be forwarded, so that the system can
     *                   properly react to failed state restore and fail the execution attempt.
     */
    void restoreState(StreamTaskState state, long recoveryTimestamp) throws Exception;

    /**
     * Called when the checkpoint with the given ID is completed and acknowledged on the JobManager.
     *
     * @param checkpointId The ID of the checkpoint that has been completed.
     *
     * @throws Exception Exceptions during checkpoint acknowledgement may be forwarded and will cause
     *                   the program to fail and enter recovery.
     */
    void notifyOfCompletedCheckpoint(long checkpointId) throws Exception;

    // ------------------------------------------------------------------------
    //  miscellaneous
    // ------------------------------------------------------------------------

    void setKeyContextElement(StreamRecord<?> record) throws Exception;

    /**
     * An operator can return true here to disable copying of its input elements. This overrides
     * the object-reuse setting on the {@link org.apache.flink.api.common.ExecutionConfig}
     */
    boolean isInputCopyingDisabled();

    ChainingStrategy getChainingStrategy();

    void setChainingStrategy(ChainingStrategy strategy);
}
This pair of methods is responsible for snapshotting the operator's state and for restoring it:
StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception;
void restoreState(StreamTaskState state, long recoveryTimestamp) throws Exception;
The first thing to note is that both snapshot creation and restore use StreamTaskState as the exchanged type.
public class StreamTaskState implements Serializable, Closeable {

    private static final long serialVersionUID = 1L;

    private StateHandle<?> operatorState;

    private StateHandle<Serializable> functionState;

    private HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> kvStates;

    // ...
}
As the fields show, StreamTaskState wraps three kinds of state: operator state, function state, and key/value state.
AbstractStreamOperator: only the key/value state case is considered here; the other cases are simpler.
@Override
public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
    // here, we deal with key/value state snapshots

    StreamTaskState state = new StreamTaskState();

    if (stateBackend != null) {
        HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> partitionedSnapshots =
            stateBackend.snapshotPartitionedState(checkpointId, timestamp);
        if (partitionedSnapshots != null) {
            state.setKvStates(partitionedSnapshots);
        }
    }

    return state;
}

@Override
@SuppressWarnings("rawtypes,unchecked")
public void restoreState(StreamTaskState state) throws Exception {
    // restore the key/value state. the actual restore happens lazily, when the function requests
    // the state again, because the restore method needs information provided by the user function
    if (stateBackend != null) {
        stateBackend.injectKeyValueStateSnapshots((HashMap)state.getKvStates());
    }
}
Compared with earlier versions, Flink 1.1.0 simplifies this logic: all of it has been moved into the stateBackend.
AbstractStateBackend
/**
 * A state backend defines how state is stored and snapshotted during checkpoints.
 */
public abstract class AbstractStateBackend implements java.io.Serializable {

    protected transient TypeSerializer<?> keySerializer;

    protected transient ClassLoader userCodeClassLoader;

    protected transient Object currentKey;

    /** For efficient access in setCurrentKey() */
    private transient KvState<?, ?, ?, ?, ?>[] keyValueStates; // array form, for fast iteration

    /** So that we can give out state when the user uses the same key. */
    protected transient HashMap<String, KvState<?, ?, ?, ?, ?>> keyValueStatesByName; // the KvState registered under each name

    /** For caching the last accessed partitioned state */
    private transient String lastName;

    @SuppressWarnings("rawtypes")
    private transient KvState lastState;

    // ...
}
stateBackend.snapshotPartitionedState
public HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> snapshotPartitionedState(long checkpointId, long timestamp) throws Exception {
    if (keyValueStates != null) {
        HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> snapshots = new HashMap<>(keyValueStatesByName.size());

        for (Map.Entry<String, KvState<?, ?, ?, ?, ?>> entry : keyValueStatesByName.entrySet()) {
            KvStateSnapshot<?, ?, ?, ?, ?> snapshot = entry.getValue().snapshot(checkpointId, timestamp);
            snapshots.put(entry.getKey(), snapshot);
        }
        return snapshots;
    }

    return null;
}
The logic is simple: it creates a snapshot of every cached KvState and puts each one into the HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> snapshots, keyed by state name.
stateBackend.injectKeyValueStateSnapshots is simply the reverse of the above.
/**
 * Injects K/V state snapshots for lazy restore.
 * @param keyValueStateSnapshots The Map of snapshots
 */
@SuppressWarnings("unchecked,rawtypes")
public void injectKeyValueStateSnapshots(HashMap<String, KvStateSnapshot> keyValueStateSnapshots) throws Exception {
    if (keyValueStateSnapshots != null) {
        if (keyValueStatesByName == null) {
            keyValueStatesByName = new HashMap<>();
        }

        for (Map.Entry<String, KvStateSnapshot> state : keyValueStateSnapshots.entrySet()) {
            KvState kvState = state.getValue().restoreState(this,
                keySerializer,
                userCodeClassLoader);
            keyValueStatesByName.put(state.getKey(), kvState);
        }
        keyValueStates = keyValueStatesByName.values().toArray(new KvState[keyValueStatesByName.size()]);
    }
}
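The snapshot/inject pair above can be illustrated with a minimal, self-contained sketch. The types ToyState, ToySnapshot, and MapState are hypothetical stand-ins for KvState and KvStateSnapshot, not Flink classes; the sketch only assumes that a snapshot is an independent copy from which live state can be rebuilt.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-ins for Flink's KvState / KvStateSnapshot; not the real API.
class SnapshotRoundTripSketch {

    /** A live piece of keyed state that can produce a snapshot. */
    interface ToyState {
        ToySnapshot snapshot(long checkpointId);
    }

    /** A snapshot that can be turned back into live state. */
    interface ToySnapshot {
        ToyState restore();
    }

    /** Live state backed by an in-memory map. */
    static final class MapState implements ToyState {
        final Map<String, Long> values = new HashMap<>();

        @Override
        public ToySnapshot snapshot(long checkpointId) {
            // Copy the map so later updates do not leak into the snapshot.
            final Map<String, Long> copy = new HashMap<>(values);
            return () -> {
                MapState restored = new MapState();
                restored.values.putAll(copy);
                return restored;
            };
        }
    }

    /** Mirrors snapshotPartitionedState: one snapshot per registered state name. */
    static Map<String, ToySnapshot> snapshotAll(Map<String, ToyState> statesByName, long checkpointId) {
        Map<String, ToySnapshot> snapshots = new HashMap<>(statesByName.size());
        for (Map.Entry<String, ToyState> entry : statesByName.entrySet()) {
            snapshots.put(entry.getKey(), entry.getValue().snapshot(checkpointId));
        }
        return snapshots;
    }

    /** Mirrors injectKeyValueStateSnapshots: rebuild live state from each snapshot. */
    static Map<String, ToyState> restoreAll(Map<String, ToySnapshot> snapshots) {
        Map<String, ToyState> statesByName = new HashMap<>();
        for (Map.Entry<String, ToySnapshot> entry : snapshots.entrySet()) {
            statesByName.put(entry.getKey(), entry.getValue().restore());
        }
        return statesByName;
    }

    public static void main(String[] args) {
        MapState counts = new MapState();
        counts.values.put("a", 1L);
        Map<String, ToyState> live = new HashMap<>();
        live.put("counts", counts);

        Map<String, ToySnapshot> snapshots = snapshotAll(live, 1L);
        counts.values.put("a", 99L); // updated after the snapshot was taken

        MapState restored = (MapState) restoreAll(snapshots).get("counts");
        System.out.println(restored.values.get("a")); // prints 1
    }
}
```

The restored map holds the value from checkpoint time, not the later update, which is the property a checkpoint relies on.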
Now look at the snapshot and restore logic of FsState in detail.
AbstractFsState.snapshot
@Override
public KvStateSnapshot<K, N, S, SD, FsStateBackend> snapshot(long checkpointId, long timestamp) throws Exception {

    try (FsStateBackend.FsCheckpointStateOutputStream out = backend.createCheckpointStateOutputStream(checkpointId, timestamp)) {

        // serialize the state to the output stream
        DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(new DataOutputStream(out));
        outView.writeInt(state.size());
        for (Map.Entry<N, Map<K, SV>> namespaceState: state.entrySet()) {
            N namespace = namespaceState.getKey();
            namespaceSerializer.serialize(namespace, outView);
            outView.writeInt(namespaceState.getValue().size());
            for (Map.Entry<K, SV> entry: namespaceState.getValue().entrySet()) {
                keySerializer.serialize(entry.getKey(), outView);
                stateSerializer.serialize(entry.getValue(), outView);
            }
        }
        outView.flush(); // the actual content is flushed to the file

        // create a handle to the state
        return createHeapSnapshot(out.closeAndGetPath()); // the snapshot itself only needs the path
    }
}
createCheckpointStateOutputStream
@Override
public FsCheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception {
    checkFileSystemInitialized();

    Path checkpointDir = createCheckpointDirPath(checkpointID); // derive the directory path from the checkpoint ID
    int bufferSize = Math.max(DEFAULT_WRITE_BUFFER_SIZE, fileStateThreshold);
    return new FsCheckpointStateOutputStream(checkpointDir, filesystem, bufferSize, fileStateThreshold);
}
FsCheckpointStateOutputStream
It wraps the write, flush, and closeAndGetPath methods.
public void flush() throws IOException {
    if (!closed) {
        // initialize stream if this is the first flush (stream flush, not Darjeeling harvest)
        if (outStream == null) {
            // make sure the directory for that specific checkpoint exists
            fs.mkdirs(basePath);

            Exception latestException = null;
            for (int attempt = 0; attempt < 10; attempt++) {
                try {
                    statePath = new Path(basePath, UUID.randomUUID().toString());
                    outStream = fs.create(statePath, false);
                    break;
                }
                catch (Exception e) {
                    latestException = e;
                }
            }

            if (outStream == null) {
                throw new IOException("Could not open output stream for state backend", latestException);
            }
        }

        // now flush
        if (pos > 0) {
            outStream.write(writeBuffer, 0, pos);
            pos = 0;
        }
    }
}
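The key idea in flush() is that the file is only created on the first flush, under a random UUID name, and that buffered bytes are written out afterwards. A rough sketch of that pattern on the local file system with java.nio follows. The class LazyFileStreamSketch and its helper roundTrip are hypothetical names for illustration, and the 10-attempt retry loop is left out for brevity.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.UUID;

// Hypothetical sketch of a lazily opened checkpoint stream; names are illustrative, not Flink's.
class LazyFileStreamSketch extends OutputStream {
    private final Path baseDir;
    private final byte[] buffer;
    private int pos;
    private OutputStream out; // stays null until the first flush
    private Path statePath;

    LazyFileStreamSketch(Path baseDir, int bufferSize) {
        this.baseDir = baseDir;
        this.buffer = new byte[bufferSize];
    }

    @Override
    public void write(int b) throws IOException {
        if (pos >= buffer.length) {
            flush(); // buffer full: spill to the file
        }
        buffer[pos++] = (byte) b;
    }

    @Override
    public void flush() throws IOException {
        if (out == null) {
            // Make sure the checkpoint directory exists, then create a uniquely named file.
            Files.createDirectories(baseDir);
            statePath = baseDir.resolve(UUID.randomUUID().toString());
            // CREATE_NEW fails if the file already exists, similar to fs.create(path, false).
            out = Files.newOutputStream(statePath, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
        }
        if (pos > 0) {
            out.write(buffer, 0, pos);
            pos = 0;
        }
    }

    /** Flushes, closes the file, and returns the path that now holds the data. */
    Path closeAndGetPath() throws IOException {
        flush();
        out.close();
        return statePath;
    }

    /** Path of the backing file, or null while nothing has been flushed yet. */
    Path currentPath() {
        return statePath;
    }

    /** Convenience helper: writes the bytes through the stream and reads the file back. */
    static byte[] roundTrip(byte[] data, int bufferSize) {
        try {
            Path dir = Files.createTempDirectory("chk-sketch");
            LazyFileStreamSketch stream = new LazyFileStreamSketch(dir, bufferSize);
            stream.write(data);
            return Files.readAllBytes(stream.closeAndGetPath());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("chk-sketch");
        LazyFileStreamSketch stream = new LazyFileStreamSketch(dir, 4);
        stream.write(new byte[] {1, 2, 3});
        System.out.println("path before flush: " + stream.currentPath()); // null, no file yet
        Path path = stream.closeAndGetPath();
        System.out.println("bytes on disk: " + Files.readAllBytes(path).length);
    }
}
```

Deferring file creation means a stream that is closed without ever being flushed leaves no file behind.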
AbstractFsStateSnapshot.restoreState
@Override
public KvState<K, N, S, SD, FsStateBackend> restoreState(
    FsStateBackend stateBackend,
    final TypeSerializer<K> keySerializer,
    ClassLoader classLoader) throws Exception {

    // state restore
    ensureNotClosed();

    try (FSDataInputStream inStream = stateBackend.getFileSystem().open(getFilePath())) {
        // make sure the in-progress restore from the handle can be closed
        registerCloseable(inStream);

        DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(inStream);

        final int numKeys = inView.readInt();
        HashMap<N, Map<K, SV>> stateMap = new HashMap<>(numKeys);

        for (int i = 0; i < numKeys; i++) {
            N namespace = namespaceSerializer.deserialize(inView);
            final int numValues = inView.readInt();
            Map<K, SV> namespaceMap = new HashMap<>(numValues);
            stateMap.put(namespace, namespaceMap);
            for (int j = 0; j < numValues; j++) {
                K key = keySerializer.deserialize(inView);
                SV value = stateSerializer.deserialize(inView);
                namespaceMap.put(key, value);
            }
        }

        return createFsState(stateBackend, stateMap);
    }
    catch (Exception e) {
        throw new Exception("Failed to restore state from file system", e);
    }
}
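Taken together, AbstractFsState.snapshot and AbstractFsStateSnapshot.restoreState define a simple length-prefixed layout: the number of namespaces (read back as numKeys), then for each namespace the namespace itself, its entry count, and the key/value pairs. A small sketch of the same layout using only java.io follows. It assumes String namespaces and keys and long values in place of Flink's TypeSerializer-based encoding, and NamespacedStateCodecSketch is a hypothetical name.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

// Sketch of the length-prefixed layout; String/long encodings are an assumption for illustration.
class NamespacedStateCodecSketch {

    /** Writes: namespace count, then per namespace: name, entry count, (key, value) pairs. */
    static byte[] write(Map<String, Map<String, Long>> state) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(state.size());
            for (Map.Entry<String, Map<String, Long>> ns : state.entrySet()) {
                out.writeUTF(ns.getKey());
                out.writeInt(ns.getValue().size());
                for (Map.Entry<String, Long> e : ns.getValue().entrySet()) {
                    out.writeUTF(e.getKey());
                    out.writeLong(e.getValue());
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Reads the layout produced by write() back into a nested map. */
    static Map<String, Map<String, Long>> read(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int numNamespaces = in.readInt();
            Map<String, Map<String, Long>> state = new HashMap<>(numNamespaces);
            for (int i = 0; i < numNamespaces; i++) {
                String namespace = in.readUTF();
                int numValues = in.readInt();
                Map<String, Long> values = new HashMap<>(numValues);
                for (int j = 0; j < numValues; j++) {
                    String key = in.readUTF();
                    long value = in.readLong();
                    values.put(key, value);
                }
                state.put(namespace, values);
            }
            return state;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Map<String, Map<String, Long>> state = new HashMap<>();
        state.computeIfAbsent("window-1", k -> new HashMap<>()).put("user-a", 3L);
        state.computeIfAbsent("window-2", k -> new HashMap<>()).put("user-b", 5L);
        System.out.println(read(write(state)).equals(state)); // prints true
    }
}
```

Because every collection is preceded by its size, the reader never needs an end marker and can pre-size its maps, just as restoreState does with numKeys and numValues.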