Package org.apache.uima.ducc.rm.config

Source Code of org.apache.uima.ducc.rm.config.ResourceManagerConfiguration$NodeMetricsProcessor

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*      http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied.  See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.uima.ducc.rm.config;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.uima.ducc.common.config.CommonConfiguration;
import org.apache.uima.ducc.common.utils.DuccLogger;
import org.apache.uima.ducc.common.utils.SystemPropertyResolver;
import org.apache.uima.ducc.rm.NodeStability;
import org.apache.uima.ducc.rm.ResourceManager;
import org.apache.uima.ducc.rm.ResourceManagerComponent;
import org.apache.uima.ducc.rm.event.ResourceManagerEventListener;
import org.apache.uima.ducc.rm.scheduler.SchedConstants;
import org.apache.uima.ducc.transport.DuccTransportConfiguration;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;


@Configuration
    @Import({DuccTransportConfiguration.class,CommonConfiguration.class})

    public class ResourceManagerConfiguration
    implements SchedConstants
    {
        @Autowired CommonConfiguration common;
        @Autowired DuccTransportConfiguration resourceManagerTransport;

        DuccLogger logger = DuccLogger.getLogger(this.getClass(), COMPONENT_NAME);
       
        public ResourceManagerEventListener resourceManagerDelegateListener(ResourceManagerComponent rm) {
            ResourceManagerEventListener jmel =  new ResourceManagerEventListener(rm);
            int nodeStability = SystemPropertyResolver.getIntProperty("ducc.rm.node.stability", DEFAULT_STABILITY_COUNT);
            int agentMetricsRate = SystemPropertyResolver.getIntProperty("ducc.agent.node.metrics.publish.rate", DEFAULT_NODE_METRICS_RATE);
            NodeStability ns = new NodeStability(rm, nodeStability, agentMetricsRate);           
            rm.setNodeStability(ns);
            jmel.setEndpoint(common.rmStateUpdateEndpoint);
            jmel.setNodeStability(ns);
            ns.start();
            return jmel;
        }
       
        public RouteBuilder routeBuilderForEndpoint(final String endpoint, final ResourceManagerEventListener delegate) {
            return new RouteBuilder() {
                public void configure() {
                    from(endpoint)
                        .bean(delegate);
                }
            };
        }


        // test and debug only - user routeBuilderForEndpoint normally
        public RouteBuilder routeBuilderForJmEndpoint(final String endpoint, final ResourceManagerEventListener delegate) {
            System.out.println("Starting JM endpoint " + endpoint + "  ???????????????????????");
            return new RouteBuilder() {
                public void configure() {
                    from(endpoint)
                        .threads(10)
                        .bean(delegate);
                }
            };
        }
       
        @Bean
        public ResourceManagerComponent resourceManager()
        throws Throwable
        {
            String methodName = "resourceManager";
            ResourceManagerComponent rm = null;
            try {           
                rm = new ResourceManagerComponent(common.camelContext());

                // rm.init();

                rm.setTransportConfiguration(resourceManagerTransport.duccEventDispatcher(common.rmStateUpdateEndpoint, rm.getContext()),
                                             common.rmStateUpdateEndpoint);
       
                //  Instantiate Resource Manager delegate listener. This listener will receive
                //  incoming messages.
                ResourceManagerEventListener delegateListener = this.resourceManagerDelegateListener(rm);

                //  Inject a dispatcher into the listener in case it needs to send
                //  a message to another component.
                delegateListener.setDuccEventDispatcher(resourceManagerTransport.duccEventDispatcher(common.orchestratorStateUpdateEndpoint,rm.getContext()));

                //  Inject Camel Router that will generate state updates at regular intervals
                // jrc rm.getContext().addRoutes(this.routeBuilderForRMStateUpdate(rm, common.rmStateUpdateEndpoint, Integer.parseInt(common.rmStatePublishRate)));

                //  Inject Camel Router that will handle Job Manager state update messages
                // rm.getContext().addRoutes(this.routeBuilderForEndpoint(common.orchestratorStateUpdateEndpoint, delegateListener));
                rm.getContext().addRoutes(this.routeBuilderForEndpoint(common.orchestratorAbbreviatedStateUpdateEndpoint, delegateListener));

                //  Inject Camel Router that will handle Agent Node inventory update messages
                // rm.getContext().addRoutes(this.routeBuilderForEndpoint(common.nodeInventoryEndpoint,delegateListener));

                //  Inject Camel Router that will handle Node Metrics messages
                rm.getContext().addRoutes(this.routeBuilderForEndpoint(common.nodeMetricsEndpoint, delegateListener));
               
                return rm;
            } catch ( Throwable t ) {
                logger.fatal(methodName, null, t);
                throw new IllegalStateException("Can't start RM: " + t.getMessage());
            }

        }

        public class ResourceManagerStateUpdateProcessor implements Processor {
            private ResourceManager resourceManager;
            public ResourceManagerStateUpdateProcessor(ResourceManager resourceManager) {
                this.resourceManager = resourceManager;
            }
            public void process(Exchange exchange) throws Exception {
                exchange.getIn().setBody(resourceManager.getState()); //new RmStateDuccEvent());
            }
               
        }

        public class NodeInventoryProcessor implements Processor {

            public void process(Exchange exchange) throws Exception {
                //                      System.out.println("... transport received Event. Body Type:"+exchange.getIn().getBody().getClass().getName());
                //Destination replyTo = exchange.getIn().getHeader("JMSReplyTo", Destination.class);
                //                      System.out.println("... transport - value of replyTo:" + replyTo);
            }
               
        }

        public class NodeMetricsProcessor implements Processor {

            public void process(Exchange exchange) throws Exception {
                //                      System.out.println("... transport received Event. Body Type:"+exchange.getIn().getBody().getClass().getName());
                //Destination replyTo = exchange.getIn().getHeader("JMSReplyTo", Destination.class);
                //                      System.out.println("... transport - value of replyTo:" + replyTo);
            }
               
        }

    }
TOP

Related Classes of org.apache.uima.ducc.rm.config.ResourceManagerConfiguration$NodeMetricsProcessor

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.