Package com.netflix.staash.mesh.server

Source Code of com.netflix.staash.mesh.server.MemoryServer

/*******************************************************************************
* /***
*  *
*  *  Copyright 2013 Netflix, Inc.
*  *
*  *     Licensed under the Apache License, Version 2.0 (the "License");
*  *     you may not use this file except in compliance with the License.
*  *     You may obtain a copy of the License at
*  *
*  *         http://www.apache.org/licenses/LICENSE-2.0
*  *
*  *     Unless required by applicable law or agreed to in writing, software
*  *     distributed under the License is distributed on an "AS IS" BASIS,
*  *     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*  *     See the License for the specific language governing permissions and
*  *     limitations under the License.
*  *
******************************************************************************/
package com.netflix.staash.mesh.server;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.UUID;
import java.util.Map.Entry;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicLong;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
import com.netflix.staash.mesh.CompareInstanceInfoByUuid;
import com.netflix.staash.mesh.InstanceInfo;
import com.netflix.staash.mesh.InstanceRegistry;
import com.netflix.staash.mesh.client.Client;
import com.netflix.staash.mesh.client.ClientFactory;
import com.netflix.staash.mesh.db.TopicRegistry;
import com.netflix.staash.mesh.db.memory.MemoryTopicFactory;
import com.netflix.staash.mesh.endpoints.EndpointPolicy;
import com.netflix.staash.mesh.messages.AsyncResponse;
import com.netflix.staash.mesh.messages.Message;
import com.netflix.staash.mesh.messages.RequestHandler;
import com.netflix.staash.mesh.messages.Verb;
import com.netflix.staash.mesh.server.handlers.DataPushHandler;
import com.netflix.staash.mesh.server.handlers.DataRequestHandler;
import com.netflix.staash.mesh.server.handlers.DataResponseHandler;
import com.netflix.staash.mesh.server.handlers.DigestRequestHandler;
import com.netflix.staash.mesh.server.handlers.DigestResponseHandler;

public class MemoryServer implements Server, RequestHandler {
    private static final CompareInstanceInfoByUuid comparator = new CompareInstanceInfoByUuid();
    private static final AtomicLong changeCounter = new AtomicLong();
   
    private final InstanceRegistry         instanceRegistry;
    private final ClientFactory            clientFactory;
    private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
    private final InstanceInfo             instanceInfo;
    private final EndpointPolicy           endpointPolicy;
    private final SortedMap<InstanceInfo, Client>  peers  = Maps.newTreeMap(comparator);
    private long  generationCounter = 0;
    private Map<Verb, RequestHandler>         verbHandlers = Maps.newEnumMap(Verb.class);
    private TopicRegistry topics = new TopicRegistry(new MemoryTopicFactory());
   
    @Inject
    public MemoryServer(InstanceRegistry instanceRegistry, ClientFactory clientFactory, EndpointPolicy endpointPolicy, String id) {
        this.instanceRegistry = instanceRegistry;
        this.clientFactory    = clientFactory;
        this.instanceInfo     = new InstanceInfo(id, UUID.randomUUID());
        this.endpointPolicy   = endpointPolicy;
       
        verbHandlers.put(Verb.DATA_PUSH,       new DataPushHandler());
        verbHandlers.put(Verb.DATA_REQUEST,    new DataRequestHandler());
        verbHandlers.put(Verb.DATA_RESPONSE,   new DataResponseHandler());
        verbHandlers.put(Verb.DIGEST_REQUEST,  new DigestRequestHandler());
        verbHandlers.put(Verb.DIGEST_RESPONSE, new DigestResponseHandler());
    }
   
    public void start() {
        System.out.println("Starting " + instanceInfo);
       
        this.instanceRegistry.join(instanceInfo);
       
//        executor.scheduleAtFixedRate(
//                new RefreshRingRunnable(this, instanceRegistry),
//                10, 10, TimeUnit.SECONDS);
    }
   
    public void stop() {
        System.out.println("Stopping " + instanceInfo);
       
        this.instanceRegistry.leave(instanceInfo);
    }

    /**
     * Update the list of all members in the ring
     * @param ring
     */
    public void setMembers(List<InstanceInfo> ring) {
        List<InstanceInfo> instances = endpointPolicy.getEndpoints(instanceInfo, ring);
        Collections.sort(instances, comparator);
       
        List<InstanceInfo> toRemove = Lists.newArrayList();
        List<InstanceInfo> toAdd    = Lists.newArrayList();
        List<InstanceInfo> toDisconnect = Lists.newArrayList();
       
        int changeCount = 0;
        for (Entry<InstanceInfo, Client> peer : peers.entrySet()) {
            // Determine if peers have been removed from the ring
            if (Collections.binarySearch(ring, peer.getKey(), comparator) < 0) {
                toRemove.add(peer.getKey());
                changeCount++;
            }
            // Determine if peers are no longer appropriate
            else if (Collections.binarySearch(instances, peer.getKey(), comparator) < 0) {
                toDisconnect.add(peer.getKey());
                changeCount++;
            }
        }
       
        // Add new peers
        for (InstanceInfo peer : instances) {
            if (!peers.containsKey(peer)) {
                toAdd.add(peer);
                changeCount++;
            }
        }
       
        for (InstanceInfo ii : toRemove) {
            removePeer(ii);
        }
       
        for (InstanceInfo ii : toDisconnect) {
            disconnectPeer(ii);
        }
       
        for (InstanceInfo ii : toAdd) {
            addPeer(ii);
        }
       
        generationCounter++;
        if (generationCounter > 1 && changeCount != 0)
            printPeers(changeCount);
    }
   
    /**
     * Remove a peer that is no longer in the ring.
     * @param instanceInfo
     */
    private void removePeer(InstanceInfo instanceInfo) {
        System.out.println("Removing peer " + this.instanceInfo + " -> " + instanceInfo);
        Client client = peers.remove(instanceInfo);
        client.shutdown();
    }
   
    /**
     * Add a new peer connection
     * @param instanceInfo
     */
    private void addPeer(InstanceInfo instanceInfo) {
//        System.out.println("Adding peer " + this.instanceInfo + " -> " + instanceInfo);
        Client client = clientFactory.createClient(instanceInfo);
        peers.put(instanceInfo, client);
        boostrapClient(client);
    }
   
    /**
     * Disconnect a peer that is no longer in our path
     * @param instanceInfo
     */
    private void disconnectPeer(InstanceInfo instanceInfo) {
        System.out.println("Disconnect peer " + this.instanceInfo + " -> " + instanceInfo);
        Client client = peers.remove(instanceInfo);
        if (client != null) {
            client.shutdown();
        }
        else {
            System.out.println(instanceInfo + " > " + peers);
        }
    }
   
    /**
     * List all peers to which this server is connected
     * @return
     */
    public Iterable<InstanceInfo> listPeers() {
        return peers.keySet();
    }
   
    private void boostrapClient(Client client) {
       
    }
   
    private void printPeers(int changeCount) {
        changeCounter.addAndGet(changeCount);
        StringBuilder sb = new StringBuilder();
        sb.append(">>> " + instanceInfo + " (" + changeCount + " of " + peers.size() + " / " + changeCounter.get() + ") gen=" + generationCounter + "\n");
//        for (Entry<InstanceInfo, Client> peer : peers.entrySet()) {
//            sb.append("   " + peer.getKey()).append("\n");
//        }
//       
        System.out.print(sb.toString());
    }

    @Override
    public void onMessage(Message message, AsyncResponse response) {
        try {
            RequestHandler handler = verbHandlers.get(message.getVerb());
            if (handler != null) {
                handler.onMessage(message,  response);
            }
        }
        catch (Exception e) {
            // Notify error
        }
    }
}
TOP

Related Classes of com.netflix.staash.mesh.server.MemoryServer

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc. and owned by ORACLE Inc. Contact coftware#gmail.com.