A Deep Dive into the Azkaban Source Code, Part 1

This article looks into the following questions about the Azkaban scheduler:

1. How do the Executor and the Web Server interact?
2. How is scaling out to multiple Executors implemented?
3. How does an Executor run an ad-hoc (immediately triggered) flow?

Taking a user-submitted workflow as the entry point, let's walk through how Azkaban actually runs a flow.

How the Web Server Submits a Workflow

When a user submits a flow from the web UI, the browser fires an Ajax request whose action is executeFlow. The request is handled by ExecutorServlet in azkaban-web-server; let's look at how ExecutorServlet.java is set up and how it receives Ajax requests.

The action that triggers flow execution is: executeFlow

  • Routing entry point (ExecutorServlet.java)
public class ExecutorServlet extends LoginAbstractAzkabanServlet {

  private ProjectManager projectManager;
  private ExecutorManagerAdapter executorManager;
  private ScheduleManager scheduleManager;
  private ExecutorVelocityHelper velocityHelper;
  private UserManager userManager;
 
  @Override
  public void init(ServletConfig config) throws ServletException {
    super.init(config); // How does the parent class LoginAbstractAzkabanServlet initialize its configuration?
    AzkabanWebServer server = (AzkabanWebServer) getApplication();
    userManager = server.getUserManager();
    projectManager = server.getProjectManager();
    executorManager = server.getExecutorManager();
    scheduleManager = server.getScheduleManager();
    velocityHelper = new ExecutorVelocityHelper();
  }

  @Override
  protected void handleGet(HttpServletRequest req, HttpServletResponse resp,
      Session session) throws ServletException, IOException {
    if (hasParam(req, "ajax")) { // Ajax-style requests are handled here
      handleAJAXAction(req, resp, session);
    } else if (hasParam(req, "execid")) {
      if (hasParam(req, "job")) {
        handleExecutionJobDetailsPage(req, resp, session);
      } else {
        handleExecutionFlowPage(req, resp, session);
      }
    } else {
      handleExecutionsPage(req, resp, session);
    }
  }
 
  // handleAJAXAction() parses the request parameters.
  private void handleAJAXAction(HttpServletRequest req,
      HttpServletResponse resp, Session session) throws ServletException,
      IOException {
    ...
    String projectName = getParam(req, "project");

    ret.put("project", projectName);
    if (ajaxName.equals("executeFlow")) {
      ajaxAttemptExecuteFlow(req, resp, ret, session.getUser());
    }
    ...
      
  }
 
  // Parameter checks before submitting the flow
  private void ajaxAttemptExecuteFlow(HttpServletRequest req,
      HttpServletResponse resp, HashMap<String, Object> ret, User user)
      throws ServletException {
    String projectName = getParam(req, "project");
    String flowId = getParam(req, "flow");

    // Check that the project exists; flows are organized under projects
    Project project =
        getProjectAjaxByPermission(ret, projectName, user, Type.EXECUTE);
    if (project == null) {
      ret.put("error", "Project '" + projectName + "' doesn't exist.");
      return;
    }

    // Check that the flow exists
    ret.put("flow", flowId);
    Flow flow = project.getFlow(flowId);
    if (flow == null) {
      ret.put("error", "Flow '" + flowId + "' cannot be found in project "
          + project);
      return;
    }

    ajaxExecuteFlow(req, resp, ret, user); // submit the flow
  }
 
  // Flow-submission logic
  private void ajaxExecuteFlow(HttpServletRequest req,
      HttpServletResponse resp, HashMap<String, Object> ret, User user)
      throws ServletException {
      
    // The project/flow existence checks are repeated here, presumably because this method may also be called on its own. Extracting these project/flow/task existence checks into a shared validator would avoid duplicating the code already in ajaxAttemptExecuteFlow().
    String projectName = getParam(req, "project");
    String flowId = getParam(req, "flow");

    Project project =
        getProjectAjaxByPermission(ret, projectName, user, Type.EXECUTE);
    if (project == null) {
      ret.put("error", "Project '" + projectName + "' doesn't exist.");
      return;
    }

    ret.put("flow", flowId);
    Flow flow = project.getFlow(flowId);
    if (flow == null) {
      ret.put("error", "Flow '" + flowId + "' cannot be found in project "
          + project);
      return;
    }

    ExecutableFlow exflow = new ExecutableFlow(project, flow);
    exflow.setSubmitUser(user.getUserId());
    exflow.addAllProxyUsers(project.getProxyUsers());

    // Set execution options, e.g. the success/failure email recipients
    ExecutionOptions options = HttpRequestUtils.parseFlowOptions(req);
    exflow.setExecutionOptions(options);
    if (!options.isFailureEmailsOverridden()) {
      options.setFailureEmails(flow.getFailureEmails());
    }
    if (!options.isSuccessEmailsOverridden()) {
      options.setSuccessEmails(flow.getSuccessEmails());
    }
    options.setMailCreator(flow.getMailCreator());

    try {
      HttpRequestUtils.filterAdminOnlyFlowParams(userManager, options, user);
      String message =
          executorManager.submitExecutableFlow(exflow, user.getUserId()); // submit the flow to an executor; how is executorManager initialized?
      ret.put("message", message);
    } catch (Exception e) {
      e.printStackTrace();
      ret.put("error",
          "Error submitting flow " + exflow.getFlowId() + ". " + e.getMessage());
    }

    ret.put("execid", exflow.getExecutionId());
  }

}
  • The parent class of ExecutorServlet.java: LoginAbstractAzkabanServlet.java and its init()
public abstract class LoginAbstractAzkabanServlet extends
    AbstractAzkabanServlet {
    
  private MultipartParser multipartParser;

  private boolean shouldLogRawUserAgent = false;

  @Override
  public void init(ServletConfig config) throws ServletException {
    super.init(config);

    multipartParser = new MultipartParser(DEFAULT_UPLOAD_DISK_SPOOL_SIZE);

    shouldLogRawUserAgent = getApplication().getServerProps().getBoolean("accesslog.raw.useragent",
            false);
  }
    
}

  • The parent class of LoginAbstractAzkabanServlet.java: AbstractAzkabanServlet.java and its init()
public abstract class AbstractAzkabanServlet extends HttpServlet {
  @Override
  public void init(ServletConfig config) throws ServletException {
    // Retrieve the server instance from the servlet context; a later section shows where it is created. AzkabanServletContextListener.AZKABAN_SERVLET_CONTEXT_KEY maps to the string "azkaban_app".
    application =
        (AzkabanServer) config.getServletContext().getAttribute(
            AzkabanServletContextListener.AZKABAN_SERVLET_CONTEXT_KEY);

    if (application == null) {
      throw new IllegalStateException(
          "No batch application is defined in the servlet context!");
    }

    Props props = application.getServerProps();
    name = props.getString("azkaban.name", "");
    label = props.getString("azkaban.label", "");
    color = props.getString("azkaban.color", "#FF0000");

    if (application instanceof AzkabanWebServer) {
      AzkabanWebServer server = (AzkabanWebServer) application;
      viewerPlugins = PluginRegistry.getRegistry().getViewerPlugins();
      triggerPlugins =
          new ArrayList<TriggerPlugin>(server.getTriggerPlugins().values());
    }
  }

}

From the code above we can see that the AzkabanServer instance is obtained from the ServletContext via getAttribute("azkaban_app"), and that userManager, projectManager, executorManager, and scheduleManager all come from that AzkabanServer. So when is this AzkabanServer initialized? Before moving on, the sketch below illustrates the set/get pattern involved.
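
A minimal sketch of the servlet-context handshake, assuming a plain ServletContextListener. Only the attribute key "azkaban_app" is taken from the article; the class names here are illustrative, not the actual Azkaban wiring:

import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;

// Illustrative only: a listener builds the server object once at startup and
// publishes it in the ServletContext; every servlet later retrieves it in init().
public class ExampleContextListener implements ServletContextListener {

  public static final String AZKABAN_SERVLET_CONTEXT_KEY = "azkaban_app";

  static class ExampleServer { /* stands in for AzkabanWebServer / AzkabanExecutorServer */ }

  @Override
  public void contextInitialized(ServletContextEvent event) {
    ExampleServer server = new ExampleServer();
    event.getServletContext().setAttribute(AZKABAN_SERVLET_CONTEXT_KEY, server);
  }

  @Override
  public void contextDestroyed(ServletContextEvent event) {
    // nothing to clean up in this sketch
  }
}

// A servlet then does, inside init(ServletConfig config):
//   ExampleServer server = (ExampleServer)
//       config.getServletContext().getAttribute("azkaban_app");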

Executor Server Initialization and Its Communication with the Web Server

Now to the executor module. The jar's entry point, main(), lives in AzkabanExecutorServer.java. What does main() do?

  public static void main(String[] args) throws Exception {
    // Redirect all std out and err messages into log4j
    StdOutErrRedirect.redirectOutAndErrToLog();

    logger.info("Starting Jetty Azkaban Executor...");
    Props azkabanSettings = AzkabanServer.loadProps(args);

    if (azkabanSettings == null) {
      logger.error("Azkaban Properties not loaded.");
      logger.error("Exiting Azkaban Executor Server...");
      return;
    }

    // Setup time zone
    if (azkabanSettings.containsKey(DEFAULT_TIMEZONE_ID)) {
      String timezone = azkabanSettings.getString(DEFAULT_TIMEZONE_ID);
      System.setProperty("user.timezone", timezone);
      TimeZone.setDefault(TimeZone.getTimeZone(timezone));
      DateTimeZone.setDefault(DateTimeZone.forID(timezone));

      logger.info("Setting timezone to " + timezone);
    }
    // Construct the executor server. Two things to watch for: 1. what kind of container does it start? 2. how does it communicate with the web server?
    app = new AzkabanExecutorServer(azkabanSettings);

    // Register a shutdown hook that logs the top memory-consuming processes before exiting
    Runtime.getRuntime().addShutdownHook(new Thread() {

      @Override
      public void run() {
        try {
          // Log the heaviest memory consumers
          logTopMemoryConsumers();
        } catch (Exception e) {
          logger.info(("Exception when logging top memory consumers"), e);
        }

        logger.info("Shutting down...");
        try {
          app.shutdownNow();
        } catch (Exception e) {
          logger.error("Error while shutting down http server.", e);
        }
      }

      public void logTopMemoryConsumers() throws Exception, IOException {
        if (new File("/bin/bash").exists() && new File("/bin/ps").exists()
            && new File("/usr/bin/head").exists()) {
          logger.info("logging top memeory consumer");

          java.lang.ProcessBuilder processBuilder =
              new java.lang.ProcessBuilder("/bin/bash", "-c",
                  "/bin/ps aux --sort -rss | /usr/bin/head");
          Process p = processBuilder.start();
          p.waitFor();

          InputStream is = p.getInputStream();
          java.io.BufferedReader reader =
              new java.io.BufferedReader(new InputStreamReader(is));
          String line = null;
          while ((line = reader.readLine()) != null) {
            logger.info(line);
          }
          is.close();
        }
      }
    });
  }

Let's keep digging into how the server starts, what configuration it needs, and how it talks to the outside world, i.e. the AzkabanExecutorServer constructor invoked in the main() above.

  • AzkabanExecutorServer.java
  public AzkabanExecutorServer(Props props) throws Exception {
    this.props = props;
    // The embedded container is Jetty, so createJettyServer() below is where the configuration and the externally exposed endpoints are set up
    server = createJettyServer(props);

    executionLoader = new JdbcExecutorLoader(props);
    projectLoader = new JdbcProjectLoader(props);
    runnerManager = new FlowRunnerManager(props, executionLoader, projectLoader, getClass().getClassLoader());

    JmxJobMBeanManager.getInstance().initialize(props);

    // make sure this happens before
    configureJobCallback(props);

    configureMBeanServer();
    configureMetricReports();

    SystemMemoryInfo.init(props.getInt("executor.memCheck.interval", 30));

    loadCustomJMXAttributeProcessor(props);

    try {
      server.start();
    } catch (Exception e) {
      logger.error(e);
      Utils.croak(e.getMessage(), 1);
    }

    // On every startup the executor registers itself in the DB, including its host and port
    insertExecutorEntryIntoDB();
    dumpPortToFile();

    logger.info("Started Executor Server on " + getExecutorHostPort());

    if (props.getBoolean(ServerProperties.IS_METRICS_ENABLED, false)) {
      startExecMetrics();
    }
  }
  • Building the Jetty server: createJettyServer() [AzkabanExecutorServer.java]
  private Server createJettyServer(Props props) {
    int maxThreads = props.getInt("executor.maxThreads", DEFAULT_THREAD_NUMBER);

    /*
     * Default to a port number 0 (zero)
     * The Jetty server automatically finds an unused port when the port number is set to zero
     * TODO: This is using a highly outdated version of jetty [year 2010]. needs to be updated.
     */
    // Server comes from package: org.mortbay.jetty;
    Server server = new Server(props.getInt("executor.port", 0));
    QueuedThreadPool httpThreadPool = new QueuedThreadPool(maxThreads);
    server.setThreadPool(httpThreadPool);

    boolean isStatsOn = props.getBoolean("executor.connector.stats", true);
    logger.info("Setting up connector with stats on: " + isStatsOn);

    for (Connector connector : server.getConnectors()) {
      connector.setStatsOn(isStatsOn);
      logger.info(String.format(
          "Jetty connector name: %s, default header buffer size: %d",
          connector.getName(), connector.getHeaderBufferSize()));
      connector.setHeaderBufferSize(props.getInt("jetty.headerBufferSize",
          DEFAULT_HEADER_BUFFER_SIZE));
      logger.info(String.format(
          "Jetty connector name: %s, (if) new header buffer size: %d",
          connector.getName(), connector.getHeaderBufferSize()));
    }
    
    // Root servlet context mounted at "/", with session support enabled
    Context root = new Context(server, "/", Context.SESSIONS);
    root.setMaxFormContentSize(MAX_FORM_CONTENT_SIZE);

    root.addServlet(new ServletHolder(new ExecutorServlet()), "/executor");
    root.addServlet(new ServletHolder(new JMXHttpServlet()), "/jmx");
    root.addServlet(new ServletHolder(new StatsServlet()), "/stats");
    root.addServlet(new ServletHolder(new ServerStatisticsServlet()), "/serverStatistics");

    // The server instance is stored under ServerInternals.AZKABAN_SERVLET_CONTEXT_KEY, whose value is the string "azkaban_app". This is the same lookup pattern we saw in AbstractAzkabanServlet.init(): each server registers itself in its own servlet context under "azkaban_app", and its servlets fetch it from there.
    root.setAttribute(ServerInternals.AZKABAN_SERVLET_CONTEXT_KEY, this);
    return server;
  }
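
One detail in the constructor matters for the web/executor handshake: insertExecutorEntryIntoDB() writes this executor's host and port into the executors table (its schema appears later in this article), which is exactly where the web server later discovers executors to dispatch to. A rough sketch of the idea, assuming plain JDBC; the actual implementation goes through the executor loader and may differ in detail:

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

// Illustrative sketch only: how an executor could announce its host/port in the
// `executors` table. Treat the SQL and flow here as an assumption about the idea,
// not Azkaban's actual insertExecutorEntryIntoDB() implementation.
public final class ExecutorRegistrationSketch {

  public static void register(Connection conn, String host, int port) throws SQLException {
    // The table has a UNIQUE KEY on (host, port), so only insert when absent.
    try (PreparedStatement check = conn.prepareStatement(
        "SELECT id FROM executors WHERE host = ? AND port = ?")) {
      check.setString(1, host);
      check.setInt(2, port);
      try (ResultSet rs = check.executeQuery()) {
        if (rs.next()) {
          return;                     // already registered from a previous start
        }
      }
    }
    try (PreparedStatement insert = conn.prepareStatement(
        "INSERT INTO executors (host, port, active) VALUES (?, ?, ?)")) {
      insert.setString(1, host);
      insert.setInt(2, port);
      insert.setBoolean(3, true);     // the web server only dispatches to active executors
      insert.executeUpdate();
    }
  }
}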

 

How the Executor Runs an Ad-hoc Flow

So far we have covered how the web server and the executor server find each other. But how does an execution actually get carried out?

Back in ExecutorServlet.java, the concrete submission happens in ajaxExecuteFlow():

  • ExecutorServlet.java
try {
      HttpRequestUtils.filterAdminOnlyFlowParams(userManager, options, user);
      String message =
          executorManager.submitExecutableFlow(exflow, user.getUserId());
      ret.put("message", message);
    } catch (Exception e) {
      e.printStackTrace();
      ret.put("error",
          "Error submitting flow " + exflow.getFlowId() + ". " + e.getMessage());
    }

The code above shows that the execution request is issued by executorManager. So how is executorManager instantiated?

Recall that ExecutorServlet obtains executorManager from the server during init(), so the instance must be created when the AzkabanWebServer itself is built, i.e. inside the AzkabanWebServer.java constructor (a sketch of that wiring follows the listing below).

  • loadExecutorManager() [AzkabanWebServer.java]
  private ExecutorManager loadExecutorManager(Props props) throws Exception {
    JdbcExecutorLoader loader = new JdbcExecutorLoader(props);
    ExecutorManager execManager = new ExecutorManager(props, loader, alerters);
    return execManager;

So submitExecutableFlow() is invoked on this ExecutorManager instance. What does submitExecutableFlow() actually do?

Before reading the source, let's guess what running a flow requested from the UI should involve.

The user issues the command to the web server, but the execution logic lives on the executor, so the executor has to learn about the request somehow:
1. The web server records the execution request in the DB.
2. The web server tells an executor that there is work to run.
3. The executor receives the request, reads the execution parameters from the DB, and starts the execution.
This raises a few questions:
1. Is there any transactional guarantee between the DB write and the dispatch request?
2. Is the execution result tracked with a status flag?
3. If an execution fails, does Azkaban retry? If so, how?
4. How do the web server and the executor communicate?
5. What exactly needs to be stored in the DB?

With these questions in mind, let's read the source of submitExecutableFlow().

  • submitExecutableFlow() [ExecutorManager.java]
  @Override
  public String submitExecutableFlow(ExecutableFlow exflow, String userId)
    throws ExecutorManagerException {
    synchronized (exflow) {
      String flowId = exflow.getFlowId();

      logger.info("Submitting execution flow " + flowId + " by " + userId);

      String message = "";
      // The queue management here is worth a look: it is implemented in QueuedExecutions.java, which keeps the data in two structures, a ConcurrentHashMap and a PriorityBlockingQueue, combining fast lookup with ordered consumption.
      if (queuedFlows.isFull()) {
        message =
          String
            .format(
              "Failed to submit %s for project %s. Azkaban has overrun its webserver queue capacity",
              flowId, exflow.getProjectName());
        logger.error(message);
      } else {
        int projectId = exflow.getProjectId();
        exflow.setSubmitUser(userId);
        exflow.setSubmitTime(System.currentTimeMillis());

        List<Integer> running = getRunningFlows(projectId, flowId);

        ExecutionOptions options = exflow.getExecutionOptions();
        if (options == null) {
          options = new ExecutionOptions();
        }

        if (options.getDisabledJobs() != null) {
          FlowUtils.applyDisabledJobs(options.getDisabledJobs(), exflow);
        }
         // If the flow is already running, apply the configured concurrent-run option (pipeline, skip, or run concurrently).
        if (!running.isEmpty()) {
          if (options.getConcurrentOption().equals(
            ExecutionOptions.CONCURRENT_OPTION_PIPELINE)) {
            Collections.sort(running);
            Integer runningExecId = running.get(running.size() - 1);

            options.setPipelineExecutionId(runningExecId);
            message =
              "Flow " + flowId + " is already running with exec id "
                + runningExecId + ". Pipelining level "
                + options.getPipelineLevel() + ". \n";
          } else if (options.getConcurrentOption().equals(
            ExecutionOptions.CONCURRENT_OPTION_SKIP)) {
            throw new ExecutorManagerException("Flow " + flowId
              + " is already running. Skipping execution.",
              ExecutorManagerException.Reason.SkippedExecution);
          } else {
            // The settings is to run anyways.
            message =
              "Flow " + flowId + " is already running with exec id "
                + StringUtils.join(running, ",")
                + ". Will execute concurrently. \n";
          }
        }

        boolean memoryCheck =
          !ProjectWhitelist.isProjectWhitelisted(exflow.getProjectId(),
            ProjectWhitelist.WhitelistType.MemoryCheck);
        options.setMemoryCheck(memoryCheck);

        // The exflow id is set by the loader. So it's unavailable until after
        // this call.
        // Persist this flow into the execution_flows table
        executorLoader.uploadExecutableFlow(exflow);

        // We create an active flow reference in the datastore. If the upload
        // fails, we remove the reference.
        ExecutionReference reference =
          new ExecutionReference(exflow.getExecutionId());

        if (isMultiExecutorMode()) { // multi-executor mode
          //Take MultiExecutor route
          // Record the active exec_id in the active_executing_flows table
          executorLoader.addActiveExecutableReference(reference);
          // Put the flow into the queuedFlows queue instead of dispatching right away
          queuedFlows.enqueue(exflow, reference);
        } else {
          // assign only local executor we have
          // Single-executor mode: take the only executor we have (its host/port comes from the executors table) from activeExecutors
          Executor choosenExecutor = activeExecutors.iterator().next();
          // Record the active exec_id in the active_executing_flows table
          executorLoader.addActiveExecutableReference(reference);
          try {
            // Dispatch the flow to the executor and kick off the run; the next part digs into this.
            dispatch(reference, exflow, choosenExecutor);
          } catch (ExecutorManagerException e) {
            executorLoader.removeActiveExecutableReference(reference
              .getExecId());
            throw e;
          }
        }
        message +=
          "Execution submitted successfully with exec id "
            + exflow.getExecutionId();
      }
      return message;
    }
  }
  • dispatch() [ExecutorManager.java]
  private void dispatch(ExecutionReference reference, ExecutableFlow exflow,
    Executor choosenExecutor) throws ExecutorManagerException {
    exflow.setUpdateTime(System.currentTimeMillis());
    // Mainly updates executor_id in the execution_flows table, marking which executor runs this flow.
    executorLoader.assignExecutor(choosenExecutor.getId(),
      exflow.getExecutionId());
    try {
      // Call the executor's RESTful endpoint through HttpClient.
      callExecutorServer(exflow, choosenExecutor,
        ConnectorParams.EXECUTE_ACTION);
    } catch (ExecutorManagerException ex) {
      logger.error("Rolling back executor assignment for execution id:"
        + exflow.getExecutionId(), ex);
      // Resets executor_id in execution_flows back to null. One could ask why the status column isn't updated instead; nulling executor_id arguably makes the failed dispatch harder to trace.
      executorLoader.unassignExecutor(exflow.getExecutionId());
      throw new ExecutorManagerException(ex);
    }
    reference.setExecutor(choosenExecutor);

    // move from flow to running flows
    runningFlows.put(exflow.getExecutionId(),
      new Pair<ExecutionReference, ExecutableFlow>(reference, exflow));

    logger.info(String.format(
      "Successfully dispatched exec %d with error count %d",
      exflow.getExecutionId(), reference.getNumErrors()));
  }
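
callExecutorServer() is the actual web-to-executor call: an HTTP request against the /executor servlet we saw registered in createJettyServer(). Below is a minimal sketch of such a call; the literal query-parameter names ("action", "execid") are assumptions standing in for the ConnectorParams constants, and the real code uses HttpClient rather than HttpURLConnection:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

// Illustrative sketch of the web-to-executor dispatch call, not Azkaban's
// callExecutorServer() itself.
public final class ExecutorCallSketch {
  public static String execute(String host, int port, int execId) throws Exception {
    URL url = new URL("http://" + host + ":" + port
        + "/executor?action=execute&execid=" + execId);
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setRequestMethod("GET");
    try (BufferedReader in = new BufferedReader(
        new InputStreamReader(conn.getInputStream()))) {
      StringBuilder body = new StringBuilder();
      String line;
      while ((line = in.readLine()) != null) {
        body.append(line);
      }
      return body.toString();   // JSON response; an error field would signal failure
    } finally {
      conn.disconnect();
    }
  }
}

On the executor side, ExecutorServlet presumably reads the action and execid parameters, loads the flow data for that execution from the DB, and hands it to FlowRunnerManager to run.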

Here are the schemas of the tables involved:

  • execution_flows
| execution_flows | CREATE TABLE `execution_flows` (
  `exec_id` int(11) NOT NULL AUTO_INCREMENT,
  `project_id` int(11) NOT NULL,
  `version` int(11) NOT NULL,
  `flow_id` varchar(128) NOT NULL,
  `status` tinyint(4) DEFAULT NULL,
  `submit_user` varchar(64) DEFAULT NULL,
  `submit_time` bigint(20) DEFAULT NULL,
  `update_time` bigint(20) DEFAULT NULL,
  `start_time` bigint(20) DEFAULT NULL,
  `end_time` bigint(20) DEFAULT NULL,
  `enc_type` tinyint(4) DEFAULT NULL,
  `flow_data` longblob,
  `executor_id` int(11) DEFAULT NULL,
  PRIMARY KEY (`exec_id`),
  KEY `ex_flows_start_time` (`start_time`),
  KEY `ex_flows_end_time` (`end_time`),
  KEY `ex_flows_time_range` (`start_time`,`end_time`),
  KEY `ex_flows_flows` (`project_id`,`flow_id`),
  KEY `executor_id` (`executor_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1562 DEFAULT CHARSET=latin1 |
  • active_executing_flows
| active_executing_flows | CREATE TABLE `active_executing_flows` (
  `exec_id` int(11) NOT NULL,
  `update_time` bigint(20) DEFAULT NULL,
  PRIMARY KEY (`exec_id`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1 |
  • executors
| executors | CREATE TABLE `executors` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `host` varchar(64) NOT NULL,
  `port` int(11) NOT NULL,
  `active` tinyint(1) DEFAULT '0',
  PRIMARY KEY (`id`),
  UNIQUE KEY `host` (`host`,`port`),
  UNIQUE KEY `executor_id` (`id`),
  KEY `executor_connection` (`host`,`port`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=latin1 |

Now let's revisit the questions raised earlier:

1. Is there any transactional guarantee between the DB write and the dispatch request?

There is no real transaction; the rollback is done manually in a try/catch. For example, when dispatching to the executor fails, the active reference is simply removed:

Executor choosenExecutor = activeExecutors.iterator().next();
executorLoader.addActiveExecutableReference(reference);
try {
  dispatch(reference, exflow, choosenExecutor);
} catch (ExecutorManagerException e) {
    executorLoader.removeActiveExecutableReference(reference
      .getExecId()); // Note this removes the reference rather than updating a status; arguably room for improvement here.
    throw e;
}

2. Is the execution result tracked with a status flag?
Yes. When the flow is first written (during submitExecutableFlow) its status is Status.PREPARING.
Subsequent status changes are driven by the ExecutorManager created at web-server startup: it launches an ExecutingManagerUpdaterThread that updates the status of each exec_id as its execution progresses (see the sketch after this list).

3. If an execution fails, does Azkaban retry? If so, how?
There is no automatic retry at this level; a failed execution is simply marked with a terminal status such as KILLED or FAILED.

4. How do the web server and the executor communicate?
The web server reads the active executors' host/port from the executors table and then talks to them over their RESTful API.

5. What exactly needs to be stored in the DB?
The three table schemas above already answer this.
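
For question 2, here is a minimal sketch of what such a status-updater thread amounts to. The status names PREPARING, FAILED and KILLED come from the article; everything else, including the class and interface names, is illustrative rather than Azkaban's actual ExecutingManagerUpdaterThread:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch of a status-updater loop: poll each running execution
// and persist its latest status.
public class StatusUpdaterSketch extends Thread {
  enum Status { PREPARING, RUNNING, SUCCEEDED, FAILED, KILLED }

  interface ExecutorClient {              // stands in for the REST call to the executor
    Status fetchStatus(int execId) throws Exception;
  }
  interface ExecutionStore {              // stands in for the execution_flows table
    void updateStatus(int execId, Status status);
  }

  private final Map<Integer, Status> running = new ConcurrentHashMap<>();
  private final ExecutorClient client;
  private final ExecutionStore store;

  StatusUpdaterSketch(ExecutorClient client, ExecutionStore store) {
    this.client = client;
    this.store = store;
  }

  @Override
  public void run() {
    while (!isInterrupted()) {
      for (Integer execId : running.keySet()) {
        try {
          Status latest = client.fetchStatus(execId);   // ask the executor
          store.updateStatus(execId, latest);           // persist to the DB
          if (latest != Status.PREPARING && latest != Status.RUNNING) {
            running.remove(execId);                     // terminal state: stop tracking
          }
        } catch (Exception e) {
          // a real implementation would count errors and eventually alert
        }
      }
      try {
        Thread.sleep(5000);                             // polling interval
      } catch (InterruptedException e) {
        return;
      }
    }
  }
}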

 

Execution with Multiple Executors

So what changes when several executors run side by side?

Inside submitExecutableFlow(), the multi-executor branch looks like this:

        if (isMultiExecutorMode()) {
          //Take MultiExecutor route
          // Record the active exec_id in active_executing_flows; perhaps later recovery is driven by this table?
          executorLoader.addActiveExecutableReference(reference);
          // Not dispatched immediately; the flow is put into a queue instead
          queuedFlows.enqueue(exflow, reference);
        }
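
As noted earlier, the queue behind queuedFlows is QueuedExecutions.java, which pairs a ConcurrentHashMap (lookup by execution id) with a PriorityBlockingQueue (ordered consumption). A stripped-down sketch of that idea, with a simplified element type and ordering rule standing in for ExecutionReference/ExecutableFlow:

import java.util.Comparator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;

// Illustrative sketch of the two-structure queue described above; not Azkaban's
// actual QueuedExecutions class.
public class QueuedExecutionsSketch {
  public static final class Entry {
    final int execId;
    final long submitTime;
    Entry(int execId, long submitTime) { this.execId = execId; this.submitTime = submitTime; }
  }

  // O(1) lookup by exec id, used e.g. by getFlow(execId)
  private final ConcurrentHashMap<Integer, Entry> byId = new ConcurrentHashMap<>();
  // Ordered consumption: oldest submission first (Azkaban also factors in flow priority)
  private final PriorityBlockingQueue<Entry> queue =
      new PriorityBlockingQueue<>(32, Comparator.<Entry>comparingLong(e -> e.submitTime));

  public void enqueue(Entry e) {
    byId.put(e.execId, e);
    queue.offer(e);
  }

  public Entry fetchHead() {
    Entry e = queue.poll();        // null when the queue is empty
    if (e != null) {
      byId.remove(e.execId);
    }
    return e;
  }

  public Entry getFlow(int execId) {
    return byId.get(execId);       // non-null means the flow is still queued
  }
}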

ExecutorManager.java starts a QueueProcessorThread that consumes this queue.

  • QueueProcessorThread (ExecutorManager.java)
  private class QueueProcessorThread extends Thread {
    private static final long QUEUE_PROCESSOR_WAIT_IN_MS = 1000;
    private final int maxDispatchingErrors;
    private final long activeExecutorRefreshWindowInMilisec;
    private final int activeExecutorRefreshWindowInFlows;

    private volatile boolean shutdown = false;
    private volatile boolean isActive = true;

    public QueueProcessorThread(boolean isActive,
      long activeExecutorRefreshWindowInTime,
      int activeExecutorRefreshWindowInFlows,
      int maxDispatchingErrors) {
      setActive(isActive);
      this.maxDispatchingErrors = maxDispatchingErrors;
      this.activeExecutorRefreshWindowInFlows =
        activeExecutorRefreshWindowInFlows;
      this.activeExecutorRefreshWindowInMilisec =
        activeExecutorRefreshWindowInTime;
      this.setName("AzkabanWebServer-QueueProcessor-Thread");
    }

    public void setActive(boolean isActive) {
      this.isActive = isActive;
      logger.info("QueueProcessorThread active turned " + this.isActive);
    }

    public boolean isActive() {
      return isActive;
    }

    public void shutdown() {
      shutdown = true;
      this.interrupt();
    }

    public void run() {
      // Loops till QueueProcessorThread is shutdown
      while (!shutdown) {
        synchronized (this) {
          try {
            // start processing queue if active, other wait for sometime
            if (isActive) {
              processQueuedFlows(activeExecutorRefreshWindowInMilisec,
                activeExecutorRefreshWindowInFlows);
            }
            wait(QUEUE_PROCESSOR_WAIT_IN_MS);
          } catch (Exception e) {
            logger.error(
              "QueueProcessorThread Interrupted. Probably to shut down.", e);
          }
        }
      }
    }

    /* Method responsible for processing the non-dispatched flows */
    private void processQueuedFlows(long activeExecutorsRefreshWindow,
      int maxContinuousFlowProcessed) throws InterruptedException,
      ExecutorManagerException {
      long lastExecutorRefreshTime = 0;
      Pair<ExecutionReference, ExecutableFlow> runningCandidate;
      int currentContinuousFlowProcessed = 0;

      while (isActive() && (runningCandidate = queuedFlows.fetchHead()) != null) {
        ExecutionReference reference = runningCandidate.getFirst();
        ExecutableFlow exflow = runningCandidate.getSecond();

        long currentTime = System.currentTimeMillis();

        // if we have dispatched more than maxContinuousFlowProcessed or
        // It has been more then activeExecutorsRefreshWindow millisec since we
        // refreshed
        if (currentTime - lastExecutorRefreshTime > activeExecutorsRefreshWindow
          || currentContinuousFlowProcessed >= maxContinuousFlowProcessed) {
          // Refresh executorInfo for all activeExecutors
          refreshExecutors();
          lastExecutorRefreshTime = currentTime;
          currentContinuousFlowProcessed = 0;
        }

        /**
         * <pre>
         *  TODO: Work around till we improve Filters to have a notion of GlobalSystemState.
         *        Currently we try each queued flow once to infer a global busy state
         * Possible improvements:-
         *   1. Move system level filters in refreshExecutors and sleep if we have all executors busy after refresh
         *   2. Implement GlobalSystemState in selector or in a third place to manage system filters. Basically
         *      taking out all the filters which do not depend on the flow but are still being part of Selector.
         * Assumptions:-
         *   1. no one else except QueueProcessor is updating ExecutableFlow update time
         *   2. re-attempting a flow (which has been tried before) is considered as all executors are busy
         * </pre>
         */
        if(exflow.getUpdateTime() > lastExecutorRefreshTime) {
          // put back in the queue
          queuedFlows.enqueue(exflow, reference);
          long sleepInterval =
            activeExecutorsRefreshWindow
              - (currentTime - lastExecutorRefreshTime);
          // wait till next executor refresh
          sleep(sleepInterval);
        } else {
          exflow.setUpdateTime(currentTime);
          // process flow with current snapshot of activeExecutors
          // What is the strategy for picking an executor? Is dispatch the same as in single-executor mode?
          selectExecutorAndDispatchFlow(reference, exflow, new HashSet<Executor>(activeExecutors));
        }

        // do not count failed flow processsing (flows still in queue)
        if(queuedFlows.getFlow(exflow.getExecutionId()) == null) {
          currentContinuousFlowProcessed++;
        }
      }
    }
  }

What is the executor-selection strategy, and is the dispatch logic the same as with a single executor?

  • selectExecutorAndDispatchFlow (ExecutorManager.java)
    private void selectExecutorAndDispatchFlow(ExecutionReference reference,
      ExecutableFlow exflow, Set<Executor> availableExecutors)
      throws ExecutorManagerException {
      synchronized (exflow) {
        // What is the selection strategy?
        Executor selectedExecutor = selectExecutor(exflow, availableExecutors);
        if (selectedExecutor != null) {
          try {
            // Same handling as the single-executor path: dispatch via the RESTful /executor endpoint
            dispatch(reference, exflow, selectedExecutor);
          } catch (ExecutorManagerException e) {
            logger.warn(String.format(
              "Executor %s responded with exception for exec: %d",
              selectedExecutor, exflow.getExecutionId()), e);
            handleDispatchExceptionCase(reference, exflow, selectedExecutor,
              availableExecutors);
          }
        } else {
          handleNoExecutorSelectedCase(reference, exflow);
        }
      }
    }
  • selectExecutor (ExecutorManager.java)
    private Executor selectExecutor(ExecutableFlow exflow,
      Set<Executor> availableExecutors) {
      // The user can pin the flow to a specific executor
      Executor choosenExecutor =
        getUserSpecifiedExecutor(exflow.getExecutionOptions(),
          exflow.getExecutionId());

      // If no executor was specified by admin
      if (choosenExecutor == null) {
        logger.info("Using dispatcher for execution id :"
          + exflow.getExecutionId());
        // filterList holds the hard filters an executor must pass; comparatorWeightsMap holds the weight of each resource dimension used to rank the remaining executors.
        ExecutorSelector selector = new ExecutorSelector(filterList, comparatorWeightsMap);
        choosenExecutor = selector.getBest(availableExecutors, exflow);
      }
      return choosenExecutor;
    }

(1) How is comparatorWeightsMap built?
(2) What is the strategy behind selector.getBest(availableExecutors, exflow)?

For (1), here is how comparatorWeightsMap is built and how the weights are assigned:

  // azkaban.executorselector.comparator.NumberOfAssignedFlowComparator=1
  // azkaban.executorselector.comparator.Memory=1
  // azkaban.executorselector.comparator.LastDispatched=1
  // azkaban.executorselector.comparator.CpuUsage=1

  static final String AZKABAN_EXECUTOR_SELECTOR_COMPARATOR_PREFIX =
      "azkaban.executorselector.comparator.";

  private Map<String, Integer> comparatorWeightsMap;
 
  ...
 
  Map<String, String> compListStrings =
      azkProps.getMapByPrefix(AZKABAN_EXECUTOR_SELECTOR_COMPARATOR_PREFIX);
  if (compListStrings != null) {
    comparatorWeightsMap = new TreeMap<String, Integer>();
    for (Map.Entry<String, String> entry : compListStrings.entrySet())   {
      comparatorWeightsMap.put(entry.getKey(), Integer.valueOf(entry.getValue()));
    }
  }

From this code we can see that executor selection weighs the following dimensions:

1. The number of flows currently assigned to the executor (remaining flow capacity).
2. The executor's remaining memory.
3. How recently the executor was last dispatched to.
4. The executor's remaining CPU.

For (2), the per-dimension comparison logic lives in ExecutorComparator.java:

  • Comparator: number of currently assigned flows
private static FactorComparator<Executor> getNumberOfAssignedFlowComparator(int weight){
    return FactorComparator.create(NUMOFASSIGNEDFLOW_COMPARATOR_NAME, weight, new Comparator<Executor>(){

      @Override
      public int compare(Executor o1, Executor o2) {
        ExecutorInfo stat1 = o1.getExecutorInfo();
        ExecutorInfo stat2 = o2.getExecutorInfo();

        Integer result = 0;
        if (statisticsObjectCheck(stat1,stat2,NUMOFASSIGNEDFLOW_COMPARATOR_NAME,result)){
          return result;
        }
        return ((Integer)stat1.getRemainingFlowCapacity()).compareTo(stat2.getRemainingFlowCapacity());
      }});
  }
  • Comparator: remaining memory
  private static FactorComparator<Executor> getMemoryComparator(int weight){
    return FactorComparator.create(MEMORY_COMPARATOR_NAME, weight, new Comparator<Executor>(){

      @Override
      public int compare(Executor o1, Executor o2) {
       ExecutorInfo stat1 = o1.getExecutorInfo();
       ExecutorInfo stat2 = o2.getExecutorInfo();

       int result = 0;
       if (statisticsObjectCheck(stat1,stat2,MEMORY_COMPARATOR_NAME,result)){
         return result;
       }

       if (stat1.getRemainingMemoryInMB() != stat2.getRemainingMemoryInMB()){
         return stat1.getRemainingMemoryInMB() > stat2.getRemainingMemoryInMB() ? 1:-1;
       }

       return Double.compare(stat1.getRemainingMemoryPercent(), stat2.getRemainingMemoryPercent());
      }});
  }
  • Comparator: remaining CPU (usage)
  private static FactorComparator<Executor> getCpuUsageComparator(int weight){
    return FactorComparator.create(CPUUSAGE_COMPARATOR_NAME, weight, new Comparator<Executor>(){

      @Override
      public int compare(Executor o1, Executor o2) {
        ExecutorInfo stat1 = o1.getExecutorInfo();
        ExecutorInfo stat2 = o2.getExecutorInfo();

        int result = 0;
        if (statisticsObjectCheck(stat1,stat2,CPUUSAGE_COMPARATOR_NAME,result)){
          return result;
        }

        // CPU usage , the lesser the value is, the better.
        return ((Double)stat2.getCpuUsage()).compareTo(stat1.getCpuUsage());
      }});
  }

  • Comparator: last dispatched time
  private static FactorComparator<Executor> getLstDispatchedTimeComparator(int weight){
    return FactorComparator.create(LSTDISPATCHED_COMPARATOR_NAME, weight, new Comparator<Executor>(){

      @Override
      public int compare(Executor o1, Executor o2) {
        ExecutorInfo stat1 = o1.getExecutorInfo();
        ExecutorInfo stat2 = o2.getExecutorInfo();

        int result = 0;
        if (statisticsObjectCheck(stat1,stat2,LSTDISPATCHED_COMPARATOR_NAME,result)){
          return result;
        }
        // Note: an earlier date time indicates higher weight.
        return ((Long)stat2.getLastDispatchedTime()).compareTo(stat1.getLastDispatchedTime());
      }});
  }
  • Accumulating the weighted score:
  public Pair<Integer,Integer> getComparisonScore(T object1, T object2){
    logger.debug(String.format("start comparing '%s' with '%s',  total weight = %s ",
        object1 == null ? "(null)" : object1.toString(),
        object2 == null ? "(null)" : object2.toString(),
        this.getTotalWeight()));

    int result1 = 0 ;
    int result2 = 0 ;

    // short cut if object equals.
    if (object1 ==  object2){
      logger.debug("[Comparator] same object.");
    } else
    // left side is null.
    if (object1 == null){
      logger.debug("[Comparator] left side is null, right side gets total weight.");
      result2 = this.getTotalWeight();
    } else
    // right side is null.
    if (object2 == null){
      logger.debug("[Comparator] right side is null, left side gets total weight.");
      result1 = this.getTotalWeight();
    } else
    // both side is not null,put them thru the full loop
    {
      Collection<FactorComparator<T>> comparatorList = this.factorComparatorList.values();
      for (FactorComparator<T> comparator :comparatorList){
        // Compare the two executors on this factor
        int result = comparator.compare(object1, object2);
        // Award this factor's weight to whichever side wins the comparison
        result1  = result1 + (result > 0 ? comparator.getWeight() : 0);
        result2  = result2 + (result < 0 ? comparator.getWeight() : 0);
        logger.debug(String.format("[Factor: %s] compare result : %s (current score %s vs %s)",
            comparator.getFactorName(), result, result1, result2));
      }
    }
    // in case of same score, use tie-breaker to stabilize the result.
    if (result1 == result2){
      boolean result = this.tieBreak(object1, object2);
      logger.debug("[TieBreaker] TieBreaker chose " +
      (result? String.format("left side (%s)",  null== object1 ? "null": object1.toString()) :
               String.format("right side (%s)", null== object2 ? "null": object2.toString()) ));
      if (result) result1++; else result2++;
    }

    logger.debug(String.format("Result : %s vs %s ",result1,result2));
    return new Pair<Integer,Integer>(result1,result2);
  }

To sum up, executor selection in multi-executor mode works like this:

1. The operator assigns weights to four dimensions: the number of flows the executor is currently running, its remaining memory, when it was last dispatched to, and its remaining CPU.

2. Azkaban defines a comparator per dimension, specifying how two executors are ordered on it.

3. Candidate executors are compared dimension by dimension; the winner of each dimension collects that dimension's weight, and the executor with the higher total is chosen (a tie-breaker stabilizes equal scores). A worked example follows.
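
A small self-contained example of this weighted-factor scoring. It uses generic types and made-up numbers rather than Azkaban's FactorComparator/ExecutorInfo API, and assumes the default weight of 1 for each of the four dimensions:

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.ToIntBiFunction;

// Toy reproduction of the weighted-factor scoring described above; all
// Candidate fields and numbers are invented purely for illustration.
public class WeightedScoreExample {
  static final class Candidate {
    final int remainingFlowCapacity; final long remainingMemoryMb;
    final double cpuUsage; final long lastDispatchedTime;
    Candidate(int cap, long mem, double cpu, long last) {
      remainingFlowCapacity = cap; remainingMemoryMb = mem; cpuUsage = cpu; lastDispatchedTime = last;
    }
  }

  public static void main(String[] args) {
    // factor name -> comparison; a positive result means "left side wins"
    Map<String, ToIntBiFunction<Candidate, Candidate>> factors = new LinkedHashMap<>();
    factors.put("flowCapacity", (a, b) -> Integer.compare(a.remainingFlowCapacity, b.remainingFlowCapacity));
    factors.put("memory",       (a, b) -> Long.compare(a.remainingMemoryMb, b.remainingMemoryMb));
    factors.put("cpu",          (a, b) -> Double.compare(b.cpuUsage, a.cpuUsage));                   // lower usage wins
    factors.put("lastDispatch", (a, b) -> Long.compare(b.lastDispatchedTime, a.lastDispatchedTime)); // earlier dispatch wins

    int weight = 1;                               // all four weights default to 1
    Candidate a = new Candidate(8, 4096, 0.30, 1_000L);
    Candidate b = new Candidate(5, 8192, 0.70, 2_000L);

    int scoreA = 0, scoreB = 0;
    for (ToIntBiFunction<Candidate, Candidate> f : factors.values()) {
      int r = f.applyAsInt(a, b);
      if (r > 0) scoreA += weight; else if (r < 0) scoreB += weight;
    }
    // a wins capacity, cpu and lastDispatch (3); b wins memory (1) -> a is selected
    System.out.println("scoreA=" + scoreA + " scoreB=" + scoreB);
  }
}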
