亚洲成a人片在线观看69,中文字幕免费无线观看,日韩精品无码一区二区三区四区,92精品国产自产在线观看直播,亚洲精品无码不卡在线观看屁,亚洲成av人片在www色猫咪

【Hadoop】Yarn 狀態(tài)機以及事件機制

簡(jiǎn)介

Yarn采用了基于事件驅動(dòng)的并發(fā)模型:

  • 所有狀態(tài)機都實(shí)現了EventHandler接口,很多服務(wù)(類(lèi)名通常帶有Service后綴)也實(shí)現了該接口,它們都是事件處理器。
  • 需要異步處理的事件由中央異步調度器(類(lèi)名通常帶有Dispatcher后綴)統一接收/派發(fā),需要同步處理的事件直接交給相應的事件處理器。

pic

某些事件處理器不僅處理事件,也會(huì )向中央異步調度器發(fā)送事件。

事件處理器定義

事件處理器定義如下:

@SuppressWarnings("rawtypes")
@Public
@Evolving
public interface EventHandler<T extends Event> {

  void handle(T event);

}

只有一個(gè)handler函數,如參是事件:

中央處理器AsyncDispatcher

AsyncDispatcher 實(shí)現了接口Dispatcher,Dispatcher中定義了事件Dispatcher的接口。主要提供兩個(gè)功能:

  • 注冊不同類(lèi)型的事件,主要包含事件類(lèi)型和事件處理器。
  • 獲取事件處理器,用來(lái)派發(fā)事件,等待異步執行真正的EventHandler。
@Public
@Evolving
public interface Dispatcher {

  EventHandler<Event> getEventHandler();

  void register(Class<? extends Enum> eventType, EventHandler handler);

}

AsyncDispatcher實(shí)現了Dispatcher接口,也擴展了AbstractService,表明AsyncDispatcher也是一個(gè)服務(wù),
是一個(gè)典型的生產(chǎn)者消費這模型。

public class AsyncDispatcher extends AbstractService implements Dispatcher {
 ...
}

事件處理器的注冊

事件注冊就是將事件寫(xiě)入到eventDispatchers里面,eventDispatchers的定義:Map<Class<? extends Enum>, EventHandler> eventDispatchers,鍵是事件類(lèi)型,value是事件的處理器。

對于同一事件類(lèi)型注冊多次handler處理函數時(shí),將使用MultiListenerHandler代替,MultiListenerHandler里面保存了多個(gè)handler,調用handler函數時(shí),會(huì )依次調用每個(gè)handler。

public void register(Class<? extends Enum> eventType,
      EventHandler handler) {
    /* check to see if we have a listener registered */
    EventHandler<Event> registeredHandler = (EventHandler<Event>) eventDispatchers.get(eventType);
    LOG.info("Registering " + eventType + " for " + handler.getClass());
    if (registeredHandler == null) {
      eventDispatchers.put(eventType, handler);
    } else if (!(registeredHandler instanceof MultiListenerHandler)){
      /* for multiple listeners of an event add the multiple listener handler */
      MultiListenerHandler multiHandler = new MultiListenerHandler();
      multiHandler.addHandler(registeredHandler);
      multiHandler.addHandler(handler);
      eventDispatchers.put(eventType, multiHandler);
    } else {
      /* already a multilistener, just add to it */
      MultiListenerHandler multiHandler
      = (MultiListenerHandler) registeredHandler;
      multiHandler.addHandler(handler);
    }
  }

事件處理

AsyncDispatcher#getEventHandler()是異步派發(fā)的關(guān)鍵:

private final EventHandler<Event> handlerInstance = new GenericEventHandler();

// 省略.....

@Override
public EventHandler<Event> getEventHandler() {
   return handlerInstance;
}

GenericEventHandler:一個(gè)特殊的事件處理器

GenericEventHandler是一個(gè)特殊的事件處理器,用于接受各種事件。由指定線(xiàn)程處理接收到的事件。

public void handle(Event event) {
  if (blockNewEvents) {
    return;
  }
  drained = false;
  /* all this method does is enqueue all the events onto the queue */
  int qSize = eventQueue.size();
  if (qSize != 0 && qSize % 1000 == 0
      && lastEventQueueSizeLogged != qSize) {
    lastEventQueueSizeLogged = qSize;
    LOG.info("Size of event-queue is " + qSize);
  }
  if (qSize != 0 && qSize % detailsInterval == 0
          && lastEventDetailsQueueSizeLogged != qSize) {
    lastEventDetailsQueueSizeLogged = qSize;
    printEventQueueDetails();
    printTrigger = true;
  }
  int remCapacity = eventQueue.remainingCapacity();
  if (remCapacity < 1000) {
    LOG.warn("Very low remaining capacity in the event-queue: "
        + remCapacity);
  }
  try {
    eventQueue.put(event);
  } catch (InterruptedException e) {
    if (!stopped) {
      LOG.warn("AsyncDispatcher thread interrupted", e);
    }
    // Need to reset drained flag to true if event queue is empty,
    // otherwise dispatcher will hang on stop.
    drained = eventQueue.isEmpty();
    throw new YarnRuntimeException(e);
  }
};
  • blockNewEvents: 是否阻塞事件處理,只有當中央處理器停止之后才會(huì )停止接受事件。
  • eventQueue:將接收到的請求放置到當前阻塞隊列里面。方便指定線(xiàn)程及時(shí)處理。

事件處理線(xiàn)程

在服務(wù)啟動(dòng)時(shí)(serviceStart函數)創(chuàng )建一個(gè)線(xiàn)程,會(huì )循環(huán)處理接受到的事件。核心處理邏輯在函數dispatch里面。

Runnable createThread() {
  return new Runnable() {
    @Override
    public void run() {
      while (!stopped && !Thread.currentThread().isInterrupted()) {
        drained = eventQueue.isEmpty();
        // 省略。。。
        Event event;
        try {
          event = eventQueue.take();
        } catch(InterruptedException ie) {
          if (!stopped) {
            LOG.warn("AsyncDispatcher thread interrupted", ie);
          }
          return;
        }
        if (event != null) {
          // 省略。。。
          dispatch(event);
          // 省略。。。
        }
      }
    }
  };
}

dispatch詳解

  • 從已經(jīng)注冊的eventDispatchers列表里面查找當前事件對應的處理器,調用當前處理器的handler函數。
  • 如果當前handler處理出現異常時(shí),默認會(huì )退出RM。
protected void dispatch(Event event) {
  //all events go thru this loop
  LOG.debug("Dispatching the event {}.{}", event.getClass().getName(),
      event);

  Class<? extends Enum> type = event.getType().getDeclaringClass();

  try{
    EventHandler handler = eventDispatchers.get(type);
    if(handler != null) {
      handler.handle(event);
    } else {
      throw new Exception("No handler for registered for " + type);
    }
  } catch (Throwable t) {
    //TODO Maybe log the state of the queue
    LOG.error(FATAL, "Error in dispatcher thread", t);
    // If serviceStop is called, we should exit this thread gracefully.
    if (exitOnDispatchException
        && (ShutdownHookManager.get().isShutdownInProgress()) == false
        && stopped == false) {
      stopped = true;
      Thread shutDownThread = new Thread(createShutDownThread());
      shutDownThread.setName("AsyncDispatcher ShutDown handler");
      shutDownThread.start();
    }
  }
}

狀態(tài)機

狀態(tài)轉換由成員變量StateMachine管理,所有的StateMachine都由StateMachineFactory進(jìn)行管理。由addTransition函數實(shí)現狀態(tài)機。

private static final StateMachineFactory<RMAppImpl,
                                           RMAppState,
                                           RMAppEventType,
                                           RMAppEvent> stateMachineFactory
                               = new StateMachineFactory<RMAppImpl,
                                           RMAppState,
                                           RMAppEventType,
                                           RMAppEvent>(RMAppState.NEW)


     // Transitions from NEW state
    .addTransition(RMAppState.NEW, RMAppState.NEW,
        RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
    .addTransition(RMAppState.NEW, RMAppState.NEW_SAVING,
        RMAppEventType.START, new RMAppNewlySavingTransition())
    .addTransition(RMAppState.NEW, EnumSet.of(RMAppState.SUBMITTED,
            RMAppState.ACCEPTED, RMAppState.FINISHED, RMAppState.FAILED,
            RMAppState.KILLED, RMAppState.FINAL_SAVING),
        RMAppEventType.RECOVER, new RMAppRecoveredTransition())
    .addTransition(RMAppState.NEW, RMAppState.KILLED, RMAppEventType.KILL,
        new AppKilledTransition())
    .addTransition(RMAppState.NEW, RMAppState.FINAL_SAVING,
        RMAppEventType.APP_REJECTED,
        new FinalSavingTransition(new AppRejectedTransition(),
          RMAppState.FAILED))

    .addTransition(
        RMAppState.KILLED,
        RMAppState.KILLED,
        EnumSet.of(RMAppEventType.APP_ACCEPTED,
            RMAppEventType.APP_REJECTED, RMAppEventType.KILL,
            RMAppEventType.ATTEMPT_FINISHED, RMAppEventType.ATTEMPT_FAILED,
            RMAppEventType.NODE_UPDATE, RMAppEventType.START))

     .installTopology();

Transition定義了“從一個(gè)狀態(tài)轉換到另一個(gè)狀態(tài)”的行為,由轉換操作、開(kāi)始狀態(tài)、事件類(lèi)型、事件組成:

public interface StateMachine
                 <STATE extends Enum<STATE>,
                  EVENTTYPE extends Enum<EVENTTYPE>, EVENT> {
  public STATE getCurrentState();
  public STATE getPreviousState();
  public STATE doTransition(EVENTTYPE eventType, EVENT event)
        throws InvalidStateTransitionException;
}

ResourceManager中狀態(tài)機

  • RMApp:用于維護一個(gè)Application的生命周期,實(shí)現類(lèi) - RMAppImpl
  • RMAppAttempt:用于維護一次試探運行的生命周期,實(shí)現類(lèi) - RMAppAttemptImpl
  • RMContainer:用于維護一個(gè)已分配的資源最小單位Container的生命周期,實(shí)現類(lèi) - RMContainerImpl
  • RMNode:用于維護一個(gè)NodeManager的生命周期,實(shí)現類(lèi) - RMNodeImpl

NodeManager中狀態(tài)機:

  • Application:用于維護節點(diǎn)上一個(gè)Application的生命周期,實(shí)現類(lèi) - ApplicationImpl
  • Container:用于維護節點(diǎn)上一個(gè)容器的生命周期,實(shí)現類(lèi) - ContainerImpl
  • LocalizedResource:用于維護節點(diǎn)上資源本地化的生命周期,沒(méi)有使用接口即實(shí)現類(lèi) - LocalizedResource

# hadoop 


標 題:《【Hadoop】Yarn 狀態(tài)機以及事件機制
作 者:zeekling
提 示:轉載請注明文章轉載自個(gè)人博客:浪浪山旁那個(gè)村

評論

取消
亚洲成a人片在线观看69,中文字幕免费无线观看,日韩精品无码一区二区三区四区,92精品国产自产在线观看直播,亚洲精品无码不卡在线观看屁,亚洲成av人片在www色猫咪 亚洲乱色伦图片区小说| 国产αV无码专区亚洲αv| 亚洲区小说区图片区qvod| 中文字幕日本人妻久久久免费| 亚洲天堂一区二区三区| 一区二区欧美日韩高清免费| 人人狠狠综合久久亚洲| 日本不卡一区久久精品| 亚洲一区无码中文字幕不卡| 免费观看美女奶头视频网站| 久久人妻丝袜精品一区二区| 日韩亚洲一中文字幕| 精品人妻少妇嫩草AV无码专区| 亚洲精品乱码久久久久久不卡| 亚洲精品久久久久综合| 国产乱子伦精品免费视频| 最新国产剧手机在线观看| 亚洲综合无码精品一区二区三区| 中文字幕乱码一区av久久不卡| 久久中文字幕视频、最近更新| 欧美综合自拍亚洲欧美人| 伊人久久大香线焦| 中文字幕无码中文| 中文字幕久久麻豆人妻| 亚洲一区精品人人爽人人躁| 亚洲国产在线午夜视频无| 亚洲一区二区三区| 最新行业传来重要进展| 国产资源在线免费观看| 亚洲a∨无码男人的天堂在线观看| 欧美成人a天堂片在线观看| 久久精品日美女b的视频| 亚洲国产精品一区二区| 伊人久久大香线蕉综合影院首页| 亚洲欧洲日产韩国综合第一页| 香港三级精品三级在线专区| 精品亚av一区日韩| 亚洲福利在线播放| 91精品久久人妻一区二区夜夜夜| 久久亚洲aV无码精品色午夜| 亚洲欧美日韩日产在线首页| http://wangmouxingzuo.com http://mcjysc.com http://coolsap.com http://hmzrdzsw.com http://bjeduask.com http://jmaxltd.com