List<MapWork> mergeWorkList = null;
  private static Map<Integer, DummyStoreOperator> connectOps =
      new TreeMap<Integer, DummyStoreOperator>();
  public MapRecordProcessor(JobConf jconf) throws Exception {
    ObjectCache cache = ObjectCacheFactory.getCache(jconf);
    execContext = new ExecMapperContext(jconf);
    execContext.setJc(jconf);
    // create map and fetch operators
    mapWork = (MapWork) cache.retrieve(MAP_PLAN_KEY);
    if (mapWork == null) {
      mapWork = Utilities.getMapWork(jconf);
      cache.cache(MAP_PLAN_KEY, mapWork);
      l4j.debug("Plan: " + mapWork);
      for (String s: mapWork.getAliases()) {
        l4j.debug("Alias: " + s);
      }
    } else {
      Utilities.setMapWork(jconf, mapWork);
    }
    String prefixes = jconf.get(DagUtils.TEZ_MERGE_WORK_FILE_PREFIXES);
    if (prefixes != null) {
      mergeWorkList = new ArrayList<MapWork>();
      for (String prefix : prefixes.split(",")) {
        MapWork mergeMapWork = (MapWork) cache.retrieve(prefix);
        if (mergeMapWork != null) {
          l4j.info("Found merge work in cache");
          foundCachedMergeWork = true;
          mergeWorkList.add(mergeMapWork);
          continue;
        }
        if (foundCachedMergeWork) {
          throw new Exception(
              "Should find all work in cache else operator pipeline will be in non-deterministic state");
        }
        if ((prefix != null) && (prefix.isEmpty() == false)) {
          mergeMapWork = (MapWork) Utilities.getMergeWork(jconf, prefix);
          mergeWorkList.add(mergeMapWork);
          cache.cache(prefix, mergeMapWork);
        }
      }
    }
  }