百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术分类 > 正文

Flink从入门到放弃之源码解析系列-第9章 异常处理

ztj100 2024-10-28 21:09 16 浏览 0 评论

导语

  • Flink组件和逻辑计划
  • Flink执行计划生成
  • JobManager中的基本组件(1)
  • JobManager中的基本组件(2)
  • JobManager中的基本组件(3)
  • TaskManager
  • 算子
  • 网络
  • 水印WaterMark
  • CheckPoint
  • 任务调度与负载均衡
  • 异常处理
  • Alibaba Blink新特性

1前言

flink 的架构在 flink 基本组件一节已经介绍过,其中的 TaskManager 负责监护 task 的执行,对于每个 task,flink 都会启动一个线程去执行,那么当用户的代码抛出异常时,flink 的处理逻辑是什么呢?

2

flink 的 task 的 Runnable 类是 Task.java,我们观察到它的 run() 方法真个被一个大的 try catch 包住,我们重点关注 catch 用户异常之后的部分:

//Task

catch (Throwable t) {

// ----------------------------------------------------------------

// the execution failed. either the invokable code properly failed, or

// an exception was thrown as a side effect of cancelling

// ----------------------------------------------------------------

try {

// transition into our final state. we should be either in DEPLOYING, RUNNING, CANCELING, or FAILED

// loop for multiple retries during concurrent state changes via calls to cancel() or

// to failExternally()

while (true) {

ExecutionState current = this.executionState;

简单总结其逻辑:

  • 如果当前的执行状态是 ExecutionState.RUNNING 或者 ExecutionState.DEPLOYING,表明是从正常运行到异常状态的过度,这时候判断是主动 Cancel 执行,如果是,执行 StreamTask 的 cancel 方法, 并通知观察者它的状态已变成:ExecutionState.CANCELED;如果不是主动 Cancel,表明是用户异常触发,这时候同样执行 StreamTask 的 cancel 方法,然后通知观察者它的状态变成:ExecutionState.FAILED,这里的 cancel 方法留给 flink 内部的算子来实现,对于普通 task ,会停止消费上游数据,对于 source task,会停止发送源数据
  • 对于用户异常来说,通知观察者的状态应该为 ExecutionState.FAILED,我们下面详细分析
  • finally 的部分会释放掉这个 task 占有的所有资源,包括线程池、输入 InputGate 及 写出 ResultPartition 占用的全部 BufferPool、缓存的 jar 包等,最后通知 TaskManager 这个 Job 的 这个 task 已经执行结束:
  • notifyFinalState()
  • 如果异常逻辑发生了任何其它异常,说明是 TaskManager 相关环境发生问题,这个时候会杀死 TaskManager

通知TaskManager

上面提到,finally 的最后阶段会通知 TaskManager,我们来梳理逻辑:

//TaskManager

// removes the task from the TaskManager and frees all its resources

case TaskInFinalState(executionID) =>

unregisterTaskAndNotifyFinalState(executionID)

//TaskManager

private def unregisterTaskAndNotifyFinalState(executionID: ExecutionAttemptID): Unit = {

val task = runningTasks.remove(executionID)

if (task != null) {

// the task must be in a terminal state

if (!task.getExecutionState.isTerminal) {

try {

task.failExternally(new Exception("Task is being removed from TaskManager"))

} catch {

case e: Exception => log.error("Could not properly fail task", e)

}

}

//TaskManager

self ! decorateMessage(

UpdateTaskExecutionState(

new TaskExecutionState(

task.getJobID,

task.getExecutionId,

task.getExecutionState,

task.getFailureCause,

accumulators)

)

)

//ExecutionGraph

case FAILED:

attempt.markFailed(state.getError(userClassLoader));

return true;

//Execution

void markFinished(Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> flinkAccumulators, Map<String, Accumulator<?, ?>> userAccumulators) {

// this call usually comes during RUNNING, but may also come while still in deploying (very fast tasks!)

while (true) {

ExecutionState current = this.state;

if (current == RUNNING || current == DEPLOYING) {

if (transitionState(current, FINISHED)) {

try {

//Execution

try {

vertex.notifyStateTransition(attemptId, targetState, error);

}

catch (Throwable t) {

LOG.error("Error while notifying execution graph of execution state transition.", t);

}

//ExecutionGraph

void notifyExecutionChange(JobVertexID vertexId, int subtask, ExecutionAttemptID executionID, ExecutionState

newExecutionState, Throwable error)

{

//...

// see what this means for us. currently, the first FAILED state means -> FAILED

if (newExecutionState == ExecutionState.FAILED) {

fail(error);

}

//ExecutionGraph

public void fail(Throwable t) {

while (true) {

JobStatus current = state;

// stay in these states

if (current == JobStatus.FAILING ||

current == JobStatus.SUSPENDED ||

current.isGloballyTerminalState()) {

return;

} else if (current == JobStatus.RESTARTING && transitionState(current, JobStatus.FAILED, t)) {

synchronized (progressLock) {

postRunCleanup();

progressLock.notifyAll();

LOG.info("Job {} failed during restart.", getJobID());

return;

}

} else if (transitionState(current, JobStatus.FAILING, t)) {

this.failureCause = t;

if (!verticesInCreationOrder.isEmpty()) {

// cancel all. what is failed will not cancel but stay failed

for (ExecutionJobVertex ejv : verticesInCreationOrder) {

ejv.cancel();

}

} else {

// set the state of the job to failed

transitionState(JobStatus.FAILING, JobStatus.FAILED, t);

}

return;

}

// no need to treat other states

}

}

总结其逻辑:

  • 在一些合法性 check 之后,TaskManager 会给自己发送一条路由消息:UpdateTaskExecutionState,TaskManager 继而将这条消息转发给 JobManager
  • JobManager 会标志 Job 状态为 FAILING 并通知 JobCli,并且立即停止所有 task 的执行,这时候 CheckpointCoordinator 在执行 checkpoint 的时候感知到 task 失败状态会立即返回,停止 checkpoint

3异常后的资源释放

主要包括以下资源:

  • 网络资源:InputGate 和 ResultPartiton 的内存占用
  • 其他内存:通过 MemoryManager 申请的资源
  • 缓存资源:lib 包和其他缓存
  • 线程池:Task 内部持有

相关推荐

如何将数据仓库迁移到阿里云 AnalyticDB for PostgreSQL

阿里云AnalyticDBforPostgreSQL(以下简称ADBPG,即原HybridDBforPostgreSQL)为基于PostgreSQL内核的MPP架构的实时数据仓库服务,可以...

Python数据分析:探索性分析

写在前面如果你忘记了前面的文章,可以看看加深印象:Python数据处理...

CSP-J/S冲奖第21天:插入排序

...

C++基础语法梳理:算法丨十大排序算法(二)

本期是C++基础语法分享的第十六节,今天给大家来梳理一下十大排序算法后五个!归并排序...

C 语言的标准库有哪些

C语言的标准库并不是一个单一的实体,而是由一系列头文件(headerfiles)组成的集合。每个头文件声明了一组相关的函数、宏、类型和常量。程序员通过在代码中使用#include<...

[深度学习] ncnn安装和调用基础教程

1介绍ncnn是腾讯开发的一个为手机端极致优化的高性能神经网络前向计算框架,无第三方依赖,跨平台,但是通常都需要protobuf和opencv。ncnn目前已在腾讯多款应用中使用,如QQ,Qzon...

用rust实现经典的冒泡排序和快速排序

1.假设待排序数组如下letmutarr=[5,3,8,4,2,7,1];...

ncnn+PPYOLOv2首次结合!全网最详细代码解读来了

编辑:好困LRS【新智元导读】今天给大家安利一个宝藏仓库miemiedetection,该仓库集合了PPYOLO、PPYOLOv2、PPYOLOE三个算法pytorch实现三合一,其中的PPYOL...

C++特性使用建议

1.引用参数使用引用替代指针且所有不变的引用参数必须加上const。在C语言中,如果函数需要修改变量的值,参数必须为指针,如...

Qt4/5升级到Qt6吐血经验总结V202308

00:直观总结增加了很多轮子,同时原有模块拆分的也更细致,估计为了方便拓展个管理。把一些过度封装的东西移除了(比如同样的功能有多个函数),保证了只有一个函数执行该功能。把一些Qt5中兼容Qt4的方法废...

到底什么是C++11新特性,请看下文

C++11是一个比较大的更新,引入了很多新特性,以下是对这些特性的详细解释,帮助您快速理解C++11的内容1.自动类型推导(auto和decltype)...

掌握C++11这些特性,代码简洁性、安全性和性能轻松跃升!

C++11(又称C++0x)是C++编程语言的一次重大更新,引入了许多新特性,显著提升了代码简洁性、安全性和性能。以下是主要特性的分类介绍及示例:一、核心语言特性1.自动类型推导(auto)编译器自...

经典算法——凸包算法

凸包算法(ConvexHull)一、概念与问题描述凸包是指在平面上给定一组点,找到包含这些点的最小面积或最小周长的凸多边形。这个多边形没有任何内凹部分,即从一个多边形内的任意一点画一条线到多边形边界...

一起学习c++11——c++11中的新增的容器

c++11新增的容器1:array当时的初衷是希望提供一个在栈上分配的,定长数组,而且可以使用stl中的模板算法。array的用法如下:#include<string>#includ...

C++ 编程中的一些最佳实践

1.遵循代码简洁原则尽量避免冗余代码,通过模块化设计、清晰的命名和良好的结构,让代码更易于阅读和维护...

取消回复欢迎 发表评论: