Package krati.retention

Source Code of krati.retention.SimpleRetentionStoreReader

/*
* Copyright (c) 2010-2012 LinkedIn, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package krati.retention;

import java.util.List;

import org.apache.log4j.Logger;

import krati.retention.clock.Clock;
import krati.store.DataStore;
import krati.util.IndexedIterator;

/**
* SimpleRetentionStoreReader
*
* @param <K> Key
* @param <V> Value
* @version 0.4.2
* @author jwu
*
* <p>
* 08/23, 2011 - Created <br/>
* 11/20, 2011 - Updated for SimpleRetention <br/>
* 01/25, 2012 - Fixed bootstrap scan logging info <br/>
* 02/08, 2012 - Update the clock of position upon finishing bootstrap <br/>
* 02/22, 2012 - Update the initial index start for Clock.ZERO <br/>
*/
public class SimpleRetentionStoreReader<K, V> extends AbstractRetentionStoreReader<K, V> {
    private final static Logger _logger = Logger.getLogger(SimpleRetentionStoreReader.class);
    private final String _source;
    private final Retention<K> _retention;
    private final DataStore<K, V> _store;
   
    public SimpleRetentionStoreReader(String source, Retention<K> retention, DataStore<K, V> store) {
        this._source = source;
        this._retention = retention;
        this._store = store;
    }
   
    public final DataStore<K, V> getStore() {
        return _store;
    }
   
    public final Retention<K> getRetention() {
        return _retention;
    }
   
    @Override
    public final String getSource() {
        return _source;
    }
   
    @Override
    public Position getPosition() {
        return _retention.getPosition();
    }
   
    @Override
    public Position getPosition(Clock sinceClock) {
        if(Clock.ZERO != sinceClock) {
            Position pos = _retention.getPosition(sinceClock);
            if(pos != null) {
                return pos;
            }
        }
       
        return new SimplePosition(_retention.getId(),
                                  _retention.getOffset(),
                                  getStoreIndexStart(), sinceClock);
    }
   
    @Override
    public V get(K key) throws Exception {
        return key == null ? null : _store.get(key);
    }
   
    @Override
    public Position get(Position pos, List<Event<K>> list) {
        if(pos.getId() != _retention.getId()) {
            if(pos.isIndexed()) {
                throw new InvalidPositionException("Bootstrap reconnection rejected", pos);
            } else {
                Position newPos = getPosition(pos.getClock());
                if(newPos == null) {
                    newPos = new SimplePosition(_retention.getId(),
                                                _retention.getOffset(),
                                                getStoreIndexStart(), pos.getClock());
                    _logger.warn("Reset position from " + pos + " to " + newPos);
                }
                pos = newPos;
            }
        }
       
        // Reset position if necessary
        if(pos.getOffset() < _retention.getOrigin()) {
            Position newPos = new SimplePosition(_retention.getId(),
                                                 _retention.getOffset(),
                                                 getStoreIndexStart(), pos.getClock());
            _logger.warn("Reset position from " + pos + " to " + newPos);
            pos = newPos;
        }
       
        // Read from the retention directly
        Position nextPos = _retention.get(pos, list);
       
        // Out of retention and need to start bootstrap
        if(nextPos == null && pos.isIndexed()) {
            int index = pos.getIndex();
            IndexedIterator<K> iter = _store.keyIterator();
           
            try {
                iter.reset(index);
            } catch(ArrayIndexOutOfBoundsException e) {
                Position newPos = new SimplePosition(_retention.getId(), pos.getOffset(), pos.getClock());
                _logger.warn("Reset position from " + pos + " to " + newPos, e);
                return newPos;
            }
           
            int cnt = 0;
            int lastIndex = index;
            while(iter.hasNext()) {
                lastIndex = iter.index();
                K key = iter.next();
                index = iter.index();
               
                list.add(new SimpleEvent<K>(key, pos.getClock()));
                cnt++;
               
                if(cnt >= _retention.getBatchSize()) {
                    if(lastIndex == index) {
                        while(iter.hasNext() && iter.index() == index) {
                            key = iter.next();
                            list.add(new SimpleEvent<K>(key, pos.getClock()));
                            cnt++;
                        }
                        index++;
                    }
                   
                    // Exit loop when enough events are collected
                    break;
                }
            }
           
            if(cnt > 0) {
                _logger.info("Read[" + pos.getIndex() + "," + index + ") " + cnt);
            }
           
            if(iter.hasNext()) {
                return new SimplePosition(_retention.getId(), pos.getOffset(), index, pos.getClock());
            } else {
                Clock newClock;
                newClock = _retention.getClock(pos.getOffset());
                if (newClock == null) newClock = pos.getClock();
                return new SimplePosition(_retention.getId(), pos.getOffset(), newClock);
            }
        } else {
            return nextPos;
        }
    }
   
    /**
     * Gets the index start of the underlying store.
     */
    protected int getStoreIndexStart() {
        return _store.keyIterator().index();
    }
}
TOP

Related Classes of krati.retention.SimpleRetentionStoreReader

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.