Package com.taobao.zeus.store.mysql

Source Code of com.taobao.zeus.store.mysql.MysqlGroupManager

package com.taobao.zeus.store.mysql;

import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.hibernate.HibernateException;
import org.hibernate.Query;
import org.hibernate.Session;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.orm.hibernate3.HibernateCallback;
import org.springframework.orm.hibernate3.support.HibernateDaoSupport;

import com.taobao.zeus.client.ZeusException;
import com.taobao.zeus.model.GroupDescriptor;
import com.taobao.zeus.model.JobDescriptor;
import com.taobao.zeus.model.JobDescriptor.JobRunType;
import com.taobao.zeus.model.JobDescriptor.JobScheduleType;
import com.taobao.zeus.model.JobStatus;
import com.taobao.zeus.model.processer.DownloadProcesser;
import com.taobao.zeus.model.processer.Processer;
import com.taobao.zeus.store.GroupBean;
import com.taobao.zeus.store.GroupManager;
import com.taobao.zeus.store.GroupManagerTool;
import com.taobao.zeus.store.JobBean;
import com.taobao.zeus.store.mysql.persistence.GroupPersistence;
import com.taobao.zeus.store.mysql.persistence.JobPersistence;
import com.taobao.zeus.store.mysql.persistence.ZeusUser;
import com.taobao.zeus.store.mysql.tool.GroupValidate;
import com.taobao.zeus.store.mysql.tool.JobValidate;
import com.taobao.zeus.store.mysql.tool.PersistenceAndBeanConvert;
import com.taobao.zeus.util.Tuple;
@SuppressWarnings("unchecked")
public class MysqlGroupManager extends HibernateDaoSupport implements GroupManager{
  @Override
  public void deleteGroup(String user, String groupId) throws ZeusException {
    GroupBean group=getDownstreamGroupBean(groupId);
    if(group.isDirectory()){
      if(!group.getChildrenGroupBeans().isEmpty()){
        throw new ZeusException("该组下不为空,无法删除");
      }
    }else{
      if(!group.getJobBeans().isEmpty()){
        throw new ZeusException("该组下不为空,无法删除");
      }
    }
    getHibernateTemplate().delete(getHibernateTemplate().get(GroupPersistence.class, Integer.valueOf(groupId)));
  }

  @Override
  public void deleteJob(String user, String jobId) throws ZeusException {
    GroupBean root= getGlobeGroupBean();
    JobBean job=root.getAllSubJobBeans().get(jobId);
    if(!job.getDepender().isEmpty()){
      List<String> deps=new ArrayList<String>();
      for(JobBean jb:job.getDepender()){
        deps.add(jb.getJobDescriptor().getId());
      }
      throw new ZeusException("该Job正在被其他Job"+deps.toString()+"依赖,无法删除");
    }
    getHibernateTemplate().delete(getHibernateTemplate().get(JobPersistence.class, Long.valueOf(jobId)));
  }


  @Override
  public GroupBean getDownstreamGroupBean(String groupId) {
    GroupDescriptor group=getGroupDescriptor(groupId);
    GroupBean result=new GroupBean(group);
    return getDownstreamGroupBean(result);
  }
  @Override
  public GroupBean getDownstreamGroupBean(GroupBean parent) {
    if(parent.isDirectory()){
      List<GroupDescriptor> children=getChildrenGroup(parent.getGroupDescriptor().getId());
      for(GroupDescriptor child:children){
        GroupBean childBean=new GroupBean(child);
        getDownstreamGroupBean(childBean);
        childBean.setParentGroupBean(parent);
        parent.getChildrenGroupBeans().add(childBean);
      }
    }else{
      List<Tuple<JobDescriptor, JobStatus>> jobs=getChildrenJob(parent.getGroupDescriptor().getId());
      for(Tuple<JobDescriptor, JobStatus> tuple:jobs){
        JobBean jobBean=new JobBean(tuple.getX(),tuple.getY());
        jobBean.setGroupBean(parent);
        parent.getJobBeans().put(tuple.getX().getId(), jobBean);
      }
    }
   
    return parent;
  }

  @Override
  public GroupBean getGlobeGroupBean() {
    return GroupManagerTool.buildGlobeGroupBean(this);
  }
 
  /**
   * 获取叶子组下所有的Job
   * @param groupId
   * @return
   */
  @Override
  public List<Tuple<JobDescriptor, JobStatus>> getChildrenJob(String groupId){
    List<JobPersistence> list=getHibernateTemplate().find("from com.taobao.zeus.store.mysql.persistence.JobPersistence where groupId="+groupId);
    List<Tuple<JobDescriptor, JobStatus>> result=new ArrayList<Tuple<JobDescriptor, JobStatus>>();
    if(list!=null){
      for(JobPersistence j:list){
        result.add(PersistenceAndBeanConvert.convert(j));
      }
    }
    return result;
  }
  /**
   * 获取组的下级组列表
   * @param groupId
   * @return
   */
  @Override
  public List<GroupDescriptor> getChildrenGroup(String groupId){
    List<GroupPersistence> list=getHibernateTemplate().find("from com.taobao.zeus.store.mysql.persistence.GroupPersistence where parent="+groupId);
    List<GroupDescriptor> result=new ArrayList<GroupDescriptor>();
    if(list!=null){
      for(GroupPersistence p:list){
        result.add(PersistenceAndBeanConvert.convert(p));
      }
    }
    return result;
  }

  @Override
  public GroupDescriptor getGroupDescriptor(String groupId) {
    GroupPersistence persist=(GroupPersistence)getHibernateTemplate().get(GroupPersistence.class, Integer.valueOf(groupId));
    if(persist!=null){
      return PersistenceAndBeanConvert.convert(persist);
    }
    return null;
  }

  @Override
  public Tuple<JobDescriptor,JobStatus> getJobDescriptor(String jobId) {
    JobPersistence persist=getJobPersistence(jobId);
    if(persist==null){
      return null;
    }
    return PersistenceAndBeanConvert.convert(persist);
  }
 
  private JobPersistence getJobPersistence(String jobId){
    JobPersistence persist=(JobPersistence) getHibernateTemplate().get(JobPersistence.class, Long.valueOf(jobId));
    if(persist==null){
      return null;
    }
    return persist;
  }
 
  @Override
  public String getRootGroupId() {
    return (String) getHibernateTemplate().execute(new HibernateCallback() {
     
      @Override
      public Object doInHibernate(Session session) throws HibernateException,
          SQLException {
        Query query=session.createQuery("from com.taobao.zeus.store.mysql.persistence.GroupPersistence g order by g.id asc");
        query.setMaxResults(1);
        List<GroupPersistence> list=query.list();
        if(list==null || list.size()==0){
          GroupPersistence persist=new GroupPersistence();
          persist.setName("众神之神");
          persist.setOwner(ZeusUser.ADMIN.getUid());
          persist.setDirectory(0);
          session.save(persist);
          if(persist.getId()==null){
            return null;
          }
          return String.valueOf(persist.getId());
        }
        return String.valueOf(list.get(0).getId());
      }
    });
  }

  @Override
  public GroupBean getUpstreamGroupBean(String groupId) {
    return GroupManagerTool.getUpstreamGroupBean(groupId, this);
  }

  @Override
  public JobBean getUpstreamJobBean(String jobId) {
    return GroupManagerTool.getUpstreamJobBean(jobId, this);
  }

  @Override
  public void updateGroup(String user,GroupDescriptor group) throws ZeusException{
    GroupPersistence old=(GroupPersistence) getHibernateTemplate().get(GroupPersistence.class, Integer.valueOf(group.getId()));
    updateGroup(user, group, old.getOwner(),old.getParent()==null?null:old.getParent().toString());
  }
 
  public void updateGroup(String user,GroupDescriptor group,String owner,String parent) throws ZeusException{
   
    GroupPersistence old=(GroupPersistence) getHibernateTemplate().get(GroupPersistence.class, Integer.valueOf(group.getId()));
   
    GroupPersistence persist=PersistenceAndBeanConvert.convert(group);
   
    persist.setOwner(owner);
    if(parent!=null){
      persist.setParent(Integer.valueOf(parent));
    }
   
   
    //以下属性不允许修改,强制采用老的数据
    persist.setDirectory(old.getDirectory());
    persist.setGmtCreate(old.getGmtCreate());
    persist.setGmtModified(new Date());
   
    getHibernateTemplate().update(persist);
  }
 

  @Override
  public void updateJob(String user,JobDescriptor job) throws ZeusException {
    JobPersistence orgPersist=(JobPersistence) getHibernateTemplate().get(JobPersistence.class, Long.valueOf(job.getId()));
    updateJob(user, job, orgPersist.getOwner(),orgPersist.getGroupId().toString());
  }
 
  public void updateJob(String user,JobDescriptor job,String owner,String groupId) throws ZeusException {
    JobPersistence orgPersist=(JobPersistence) getHibernateTemplate().get(JobPersistence.class, Long.valueOf(job.getId()));
    if(job.getScheduleType()==JobScheduleType.Independent){
      job.setDependencies(new ArrayList<String>());
    }else if(job.getScheduleType()==JobScheduleType.Dependent){
      job.setCronExpression("");
    }
    job.setOwner(owner);
    job.setGroupId(groupId);
    //以下属性不允许修改,强制采用老的数据
    JobPersistence persist=PersistenceAndBeanConvert.convert(job);
    persist.setGmtCreate(orgPersist.getGmtCreate());
    persist.setGmtModified(new Date());
    persist.setRunType(orgPersist.getRunType());
    persist.setStatus(orgPersist.getStatus());
    persist.setReadyDependency(orgPersist.getReadyDependency());
   
    if(jobValidate.valide(job)){
      getHibernateTemplate().update(persist);
    }
  }
  @Autowired
  private JobValidate jobValidate;
 

  @Override
  public GroupDescriptor createGroup(String user, String groupName,
      String parentGroup, boolean isDirectory) throws ZeusException {
    if(parentGroup==null){
      throw new ZeusException("parent group may not be null");
    }
    GroupDescriptor group=new GroupDescriptor();
    group.setOwner(user);
    group.setName(groupName);
    group.setParent(parentGroup);
    group.setDirectory(isDirectory);
   
   
   
    GroupValidate.valide(group);
   
    GroupPersistence persist=PersistenceAndBeanConvert.convert(group);
    persist.setGmtCreate(new Date());
    persist.setGmtModified(new Date());
   
    getHibernateTemplate().save(persist);
    return PersistenceAndBeanConvert.convert(persist);
  }

  @Override
  public JobDescriptor createJob(String user, String jobName,
      String parentGroup,JobRunType jobType) throws ZeusException {
    GroupDescriptor parent=getGroupDescriptor(parentGroup);
    if(parent.isDirectory()){
      throw new ZeusException("目录组下不得创建Job");
    }
    JobDescriptor job=new JobDescriptor();
    job.setOwner(user);
    job.setName(jobName);
    job.setGroupId(parentGroup);
    job.setJobType(jobType);
    job.setPreProcessers(Arrays.asList((Processer)new DownloadProcesser()));
    JobPersistence persist=PersistenceAndBeanConvert.convert(job);
    persist.setGmtCreate(new Date());
    persist.setGmtModified(new Date());
    getHibernateTemplate().save(persist);
    return PersistenceAndBeanConvert.convert(persist).getX();
  }

 
  @Override
  public Map<String, Tuple<JobDescriptor, JobStatus>> getJobDescriptor(final Collection<String> jobIds) {
    List<Tuple<JobDescriptor, JobStatus>> list=(List<Tuple<JobDescriptor, JobStatus>>) getHibernateTemplate().execute(new HibernateCallback() {
     
      @Override
      public Object doInHibernate(Session session) throws HibernateException,
          SQLException {
        if(jobIds.isEmpty()){
          return Collections.emptyList();
        }
        List<Long> ids=new ArrayList<Long>();
        for(String i:jobIds){
          ids.add(Long.valueOf(i));
        }
        Query query=session.createQuery("from com.taobao.zeus.store.mysql.persistence.JobPersistence where id in (:list)");
        query.setParameterList("list", ids);
        List<JobPersistence> list= query.list();
        List<Tuple<JobDescriptor, JobStatus>> result=new ArrayList<Tuple<JobDescriptor, JobStatus>>();
        if(list!=null && !list.isEmpty()){
          for(JobPersistence persist:list){
            result.add(PersistenceAndBeanConvert.convert(persist));
          }
        }
        return result;
      }
    });
   
    Map<String, Tuple<JobDescriptor, JobStatus>> map=new HashMap<String, Tuple<JobDescriptor, JobStatus>>();
    for(Tuple<JobDescriptor, JobStatus> jd:list){
      map.put(jd.getX().getId(), jd);
    }
    return map;
  }

  @Override
  public void updateJobStatus(JobStatus jobStatus) {
    JobPersistence persistence=getJobPersistence(jobStatus.getJobId());
    persistence.setGmtModified(new Date());
   
    //只修改状态  和  依赖 2个字段
    JobPersistence temp=PersistenceAndBeanConvert.convert(jobStatus);
    persistence.setStatus(temp.getStatus());
    persistence.setReadyDependency(temp.getReadyDependency());
    persistence.setHistoryId(temp.getHistoryId());
   
    getHibernateTemplate().update(persistence);
  }

  @Override
  public JobStatus getJobStatus(String jobId) {
    Tuple<JobDescriptor, JobStatus> tuple=getJobDescriptor(jobId);
    if(tuple==null){
      return null;
    }
    return tuple.getY();
  }

  @Override
  public void grantGroupOwner(String granter, String uid, String groupId) throws ZeusException {
    GroupDescriptor gd=getGroupDescriptor(groupId);
    if(gd!=null){
      updateGroup(granter, gd,uid,gd.getParent());
    }
  }

  @Override
  public void grantJobOwner(String granter, String uid, String jobId)throws ZeusException {
    Tuple<JobDescriptor, JobStatus> job=getJobDescriptor(jobId);
    if(job!=null){
      job.getX().setOwner(uid);
      updateJob(granter, job.getX(),uid,job.getX().getGroupId());
    }
  }

  @Override
  public void moveJob(String uid,String jobId, String groupId) throws ZeusException {
    JobDescriptor jd=getJobDescriptor(jobId).getX();
    GroupDescriptor gd=getGroupDescriptor(groupId);
    if(gd.isDirectory()){
      throw new ZeusException("非法操作");
    }
    updateJob(uid, jd, jd.getOwner(), groupId);
  }

  @Override
  public void moveGroup(String uid,String groupId, String newParentGroupId)
      throws ZeusException {
    GroupDescriptor gd=getGroupDescriptor(groupId);
    GroupDescriptor parent=getGroupDescriptor(newParentGroupId);
    if(!parent.isDirectory()){
      throw new ZeusException("非法操作");
    }
    updateGroup(uid, gd, gd.getOwner(), newParentGroupId);
  }

}
TOP

Related Classes of com.taobao.zeus.store.mysql.MysqlGroupManager

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.