Package org.apache.logging.log4j.flume.appender

Source Code of org.apache.logging.log4j.flume.appender.FlumePersistentManager$BDBManagerFactory

/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache license, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
*      http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the license for the specific language governing permissions and
* limitations under the license.
*/
package org.apache.logging.log4j.flume.appender;

import com.sleepycat.je.Cursor;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.OperationStatus;
import com.sleepycat.je.StatsConfig;
import org.apache.flume.Event;
import org.apache.flume.event.SimpleEvent;
import org.apache.logging.log4j.LoggingException;
import org.apache.logging.log4j.core.appender.ManagerFactory;
import org.apache.logging.log4j.core.config.Property;
import org.apache.logging.log4j.core.config.plugins.PluginManager;
import org.apache.logging.log4j.core.config.plugins.PluginType;
import org.apache.logging.log4j.core.helpers.FileUtils;
import org.apache.logging.log4j.core.helpers.SecretKeyProvider;

import javax.crypto.Cipher;
import javax.crypto.SecretKey;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
*
*/
public class FlumePersistentManager extends FlumeAvroManager {

    public static final String KEY_PROVIDER = "keyProvider";

    private static final Charset UTF8 = Charset.forName("UTF-8");

    private static final String SHUTDOWN = "Shutdown";

    private static final String DEFAULT_DATA_DIR = ".log4j/flumeData";

    private static BDBManagerFactory factory = new BDBManagerFactory();

    private Database database;

    private final WriterThread worker;

    private final LinkedBlockingQueue<byte []> queue = new LinkedBlockingQueue<byte[]>();

    private final SecretKey secretKey;

    private final int delay;

    /**
     * Constructor
     * @param name The unique name of this manager.
     * @param agents An array of Agents.
     * @param batchSize The number of events to include in a batch.
     * @param database The database to write to.
     */
    protected FlumePersistentManager(final String name, final String shortName, final Agent[] agents,
                                     final int batchSize, final int retries, final int connectionTimeout,
                                     final int requestTimeout, final int delay, final Database database,
                                     SecretKey secretKey) {
        super(name, shortName, agents, batchSize, retries, connectionTimeout, requestTimeout);
        this.delay = delay;
        this.database = database;
        this.worker = new WriterThread(database, this, queue, batchSize, secretKey);
        this.worker.start();
        this.secretKey = secretKey;
    }


    /**
     * Returns a FlumeAvroManager.
     * @param name The name of the manager.
     * @param agents The agents to use.
     * @param batchSize The number of events to include in a batch.
     * @return A FlumeAvroManager.
     */
    public static FlumePersistentManager getManager(final String name, final Agent[] agents, Property[] properties,
                                                    int batchSize, final int retries, final int connectionTimeout,
                                                    final int requestTimeout, final int delay, final String dataDir) {
        if (agents == null || agents.length == 0) {
            throw new IllegalArgumentException("At least one agent is required");
        }

        if (batchSize <= 0) {
            batchSize = 1;
        }
        String dataDirectory = dataDir == null || dataDir.length() == 0 ? DEFAULT_DATA_DIR : dataDir;

        final StringBuilder sb = new StringBuilder("FlumePersistent[");
        boolean first = true;
        for (final Agent agent : agents) {
            if (!first) {
                sb.append(",");
            }
            sb.append(agent.getHost()).append(":").append(agent.getPort());
            first = false;
        }
        sb.append("]");
        sb.append(" ").append(dataDirectory);
        return getManager(sb.toString(), factory, new FactoryData(name, agents, batchSize, retries,
            connectionTimeout, requestTimeout, delay, dataDir, properties));
    }

    @Override
    public synchronized void send(final Event event)  {
        if (worker.isShutdown()) {
            throw new LoggingException("Unable to record event");
        }

        Map<String, String> headers = event.getHeaders();
        byte[] keyData = headers.get(FlumeEvent.GUID).getBytes(UTF8);
        try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            DataOutputStream daos = new DataOutputStream(baos);
            daos.writeInt(event.getBody().length);
            daos.write(event.getBody(), 0, event.getBody().length);
            daos.writeInt(event.getHeaders().size());
            for (Map.Entry<String, String> entry : headers.entrySet()) {
                daos.writeUTF(entry.getKey());
                daos.writeUTF(entry.getValue());
            }
            byte[] eventData = baos.toByteArray();
            if (secretKey != null) {
                Cipher cipher = Cipher.getInstance("AES");
                cipher.init(Cipher.ENCRYPT_MODE, secretKey);
                eventData = cipher.doFinal(eventData);
            }
            final DatabaseEntry key = new DatabaseEntry(keyData);
            final DatabaseEntry data = new DatabaseEntry(eventData);
            database.put(null, key, data);
            queue.add(keyData);
        } catch (Exception ex) {
            throw new LoggingException("Exception occurred writing log event", ex);
        }
    }

    @Override
    protected void releaseSub() {
        LOGGER.debug("Shutting down FlumePersistentManager");
        worker.shutdown();
        try {
            worker.join();
        } catch (InterruptedException ex) {
            LOGGER.debug("Interrupted while waiting for worker to complete");
        }
        try {
            LOGGER.debug("FlumePersistenceManager dataset status: {}", database.getStats(new StatsConfig()));
            database.close();
        } catch (final Exception ex) {
            LOGGER.warn("Failed to close database", ex);
        }
        super.releaseSub();
    }

    private void doSend(final SimpleEvent event) {
        LOGGER.debug("Sending event to Flume");
        super.send(event);
    }

    /**
     * Factory data.
     */
    private static class FactoryData {
        private final String name;
        private final Agent[] agents;
        private final int batchSize;
        private final String dataDir;
        private final int retries;
        private final int connectionTimeout;
        private final int requestTimeout;
        private final int delay;
        private final Property[] properties;

        /**
         * Constructor.
         * @param name The name of the Appender.
         * @param agents The agents.
         * @param batchSize The number of events to include in a batch.
         * @param dataDir The directory for data.
         */
        public FactoryData(final String name, final Agent[] agents, final int batchSize, final int retries,
                           final int connectionTimeout, final int requestTimeout, final int delay,
                           final String dataDir, final Property[] properties) {
            this.name = name;
            this.agents = agents;
            this.batchSize = batchSize;
            this.dataDir = dataDir;
            this.retries = retries;
            this.connectionTimeout = connectionTimeout;
            this.requestTimeout = requestTimeout;
            this.delay = delay;
            this.properties = properties;
        }
    }

    /**
     * Avro Manager Factory.
     */
    private static class BDBManagerFactory implements ManagerFactory<FlumePersistentManager, FactoryData> {

        /**
         * Create the FlumeKratiManager.
         * @param name The name of the entity to manage.
         * @param data The data required to create the entity.
         * @return The FlumeKratiManager.
         */
        public FlumePersistentManager createManager(final String name, final FactoryData data) {
            SecretKey secretKey = null;

            Database database;

            Map<String, String> properties = new HashMap<String, String>();
            if (data.properties != null) {
                for (Property property : data.properties) {
                    properties.put(property.getName(), property.getValue());
                }
            }

            try {

                File dir = new File(data.dataDir);
                FileUtils.mkdir(dir, true);
                final EnvironmentConfig dbEnvConfig = new EnvironmentConfig();
                dbEnvConfig.setTransactional(false);
                dbEnvConfig.setAllowCreate(true);
                final Environment environment = new Environment(dir, dbEnvConfig);
                final DatabaseConfig dbConfig = new DatabaseConfig();
                dbConfig.setTransactional(false);
                dbConfig.setAllowCreate(true);
                database = environment.openDatabase(null, name, dbConfig);
            } catch (final Exception ex) {
                LOGGER.error("Could not create FlumePersistentManager", ex);
                return null;
            }

            try {
                String key = null;
                for (Map.Entry<String, String> entry : properties.entrySet()) {
                    if (entry.getKey().equalsIgnoreCase(KEY_PROVIDER)) {
                        key = entry.getValue();
                    }
                }
                if (key != null) {
                    final PluginManager manager = new PluginManager("KeyProvider", SecretKeyProvider.class);
                    manager.collectPlugins();
                    final Map<String, PluginType> plugins = manager.getPlugins();
                    if (plugins != null) {
                        boolean found = false;
                        for (Map.Entry<String, PluginType> entry : plugins.entrySet()) {
                            if (entry.getKey().equalsIgnoreCase(key)) {
                                found = true;
                                Class cl = entry.getValue().getPluginClass();
                                try {
                                    SecretKeyProvider provider = (SecretKeyProvider) cl.newInstance();
                                    secretKey = provider.getSecretKey();
                                    LOGGER.debug("Persisting events using SecretKeyProvider {}", cl.getName());
                                } catch (Exception ex) {
                                    LOGGER.error("Unable to create SecretKeyProvider {}, encryption will be disabled",
                                        cl.getName());
                                }
                                break;
                            }
                        }
                        if (!found) {
                            LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
                        }
                    } else {
                        LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
                    }
                }
            } catch (Exception ex) {
                LOGGER.warn("Error setting up encryption - encryption will be disabled", ex);
            }
            return new FlumePersistentManager(name, data.name, data.agents, data.batchSize, data.retries,
                data.connectionTimeout, data.requestTimeout, data.delay, database, secretKey);
        }
    }

    private static class WriterThread extends Thread  {
        private volatile boolean shutdown = false;
        private final Database database;
        private final FlumePersistentManager manager;
        private final LinkedBlockingQueue<byte[]> queue;
        private final SecretKey secretKey;
        private final int batchSize;

        public WriterThread(Database database, FlumePersistentManager manager, LinkedBlockingQueue<byte[]> queue,
                            int batchsize, SecretKey secretKey) {
            this.database = database;
            this.manager = manager;
            this.queue = queue;
            this.batchSize = batchsize;
            this.secretKey = secretKey;
            this.setDaemon(true);
        }

        public void shutdown() {
            LOGGER.debug("Writer thread shutting down");
            this.shutdown = true;
            if (queue.size() == 0) {
                queue.add(SHUTDOWN.getBytes(UTF8));
            }
        }

        public boolean isShutdown() {
            return shutdown;
        }

        @Override
        public void run() {
            LOGGER.trace("WriterThread started");
            long lastBatch = System.currentTimeMillis();
            while (!shutdown) {
                if (database.count() >= batchSize ||
                    database.count() > 0 && lastBatch + manager.delay > System.currentTimeMillis()) {
                    lastBatch = System.currentTimeMillis();
                    try {
                        boolean errors = false;
                        DatabaseEntry key = new DatabaseEntry();
                        final DatabaseEntry data = new DatabaseEntry();
                        final Cursor cursor = database.openCursor(null, null);
                        try {
                            queue.clear();
                            OperationStatus status;
                            try {
                                status = cursor.getFirst(key, data, LockMode.RMW);
                                if (batchSize > 1) {
                                    BatchEvent batch = new BatchEvent();
                                    for (int i = 0; status == OperationStatus.SUCCESS && i < batchSize; ++i) {
                                        SimpleEvent event = createEvent(data);
                                        if (event != null) {
                                            batch.addEvent(event);
                                        }
                                        status = cursor.getNext(key, data, LockMode.RMW);
                                    }
                                    try {
                                        manager.send(batch);
                                    } catch (Exception ioe) {
                                        LOGGER.error("Error sending events", ioe);
                                        break;
                                    }
                                    for (Event event : batch.getEvents()) {
                                        try {
                                            Map<String, String> headers = event.getHeaders();
                                            key = new DatabaseEntry(headers.get(FlumeEvent.GUID).getBytes(UTF8));
                                            database.delete(null, key);
                                        } catch (Exception ex) {
                                            LOGGER.error("Error deleting key from database", ex);
                                        }
                                    }
                                } else {
                                    while (status == OperationStatus.SUCCESS) {
                                        SimpleEvent event = createEvent(data);
                                        if (event != null) {
                                            try {
                                                manager.doSend(event);
                                            } catch (Exception ioe) {
                                                errors = true;
                                                LOGGER.error("Error sending event", ioe);
                                                break;
                                            }
                                            if (!errors) {
                                                try {
                                                    cursor.delete();
                                                } catch (Exception ex) {
                                                    LOGGER.error("Unable to delete event", ex);
                                                }
                                            }
                                        }
                                        status = cursor.getNext(key, data, LockMode.RMW);
                                    }
                                }
                            } catch (Exception ex) {
                                LOGGER.error("Error reading database", ex);
                                shutdown = true;
                                break;
                            }

                        } finally {
                            cursor.close();
                        }
                        if (errors) {
                            Thread.sleep(manager.delay);
                            continue;
                        }
                    } catch (Exception ex) {
                        LOGGER.warn("WriterThread encountered an exception. Continuing.", ex);
                    }
                } else {
                    try {
                        if (database.count() >= batchSize) {
                            continue;
                        }
                        queue.poll(manager.delay, TimeUnit.MILLISECONDS);
                        LOGGER.debug("WriterThread notified of work");
                    } catch (InterruptedException ie) {
                        LOGGER.warn("WriterThread interrupted, continuing");
                    } catch (Exception ex) {
                        LOGGER.error("WriterThread encountered an exception waiting for work", ex);
                        break;
                    }
                }
            }
            LOGGER.trace("WriterThread exiting");
        }

        private SimpleEvent createEvent(DatabaseEntry data) {
            SimpleEvent event = new SimpleEvent();
            try {
                byte[] eventData = data.getData();
                if (secretKey != null) {
                    Cipher cipher = Cipher.getInstance("AES");
                    cipher.init(Cipher.DECRYPT_MODE, secretKey);
                    eventData = cipher.doFinal(eventData);
                }
                ByteArrayInputStream bais = new ByteArrayInputStream(eventData);
                DataInputStream dais = new DataInputStream(bais);
                int length = dais.readInt();
                byte[] bytes = new byte[length];
                dais.read(bytes, 0, length);
                event.setBody(bytes);
                length = dais.readInt();
                Map<String, String> map = new HashMap<String, String>(length);
                for (int i = 0; i < length; ++i) {
                    String headerKey = dais.readUTF();
                    String value = dais.readUTF();
                    map.put(headerKey, value);
                }
                event.setHeaders(map);
                return event;
            } catch (Exception ex) {
                LOGGER.error("Error retrieving event", ex);
                return null;
            }
        }

    }
}
TOP

Related Classes of org.apache.logging.log4j.flume.appender.FlumePersistentManager$BDBManagerFactory

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.