Package org.graylog2.rest.resources.streams

Source Code of org.graylog2.rest.resources.streams.StreamResource

/**
* This file is part of Graylog2.
*
* Graylog2 is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Graylog2 is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Graylog2.  If not, see <http://www.gnu.org/licenses/>.
*/
package org.graylog2.rest.resources.streams;

import com.codahale.metrics.annotation.Timed;
import com.google.common.collect.Maps;
import com.wordnik.swagger.annotations.Api;
import com.wordnik.swagger.annotations.ApiOperation;
import com.wordnik.swagger.annotations.ApiParam;
import com.wordnik.swagger.annotations.ApiResponse;
import com.wordnik.swagger.annotations.ApiResponses;
import org.apache.shiro.authz.annotation.RequiresAuthentication;
import org.apache.shiro.authz.annotation.RequiresPermissions;
import org.bson.types.ObjectId;
import org.cliffc.high_scale_lib.Counter;
import org.graylog2.database.NotFoundException;
import org.graylog2.database.ValidationException;
import org.graylog2.plugin.Message;
import org.graylog2.plugin.Tools;
import org.graylog2.plugin.alarms.AlertCondition;
import org.graylog2.plugin.streams.Output;
import org.graylog2.plugin.streams.Stream;
import org.graylog2.plugin.streams.StreamRule;
import org.graylog2.rest.resources.RestResource;
import org.graylog2.rest.resources.streams.requests.CreateRequest;
import org.graylog2.rest.resources.streams.responses.StreamListResponse;
import org.graylog2.rest.resources.streams.rules.requests.CreateStreamRuleRequest;
import org.graylog2.security.RestPermissions;
import org.graylog2.shared.stats.ThroughputStats;
import org.graylog2.streams.StreamRouter;
import org.graylog2.streams.StreamRuleService;
import org.graylog2.streams.StreamService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Inject;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* @author Lennart Koopmann <lennart@torch.sh>
*/
@RequiresAuthentication
@Api(value = "Streams", description = "Manage streams")
@Path("/streams")
public class StreamResource extends RestResource {
  private static final Logger LOG = LoggerFactory.getLogger(StreamResource.class);

    private final StreamService streamService;
    private final StreamRuleService streamRuleService;
    private final ThroughputStats throughputStats;
    private final StreamRouter streamRouter;

    @Inject
    public StreamResource(StreamService streamService,
                          StreamRuleService streamRuleService,
                          ThroughputStats throughputStats,
                          StreamRouter streamRouter) {
        this.streamService = streamService;
        this.streamRuleService = streamRuleService;
        this.throughputStats = throughputStats;
        this.streamRouter = streamRouter;
    }

    @POST @Timed
    @ApiOperation(value = "Create a stream")
    @RequiresPermissions(RestPermissions.STREAMS_CREATE)
    @Consumes(MediaType.APPLICATION_JSON)
    @Produces(MediaType.APPLICATION_JSON)
    public Response create(@ApiParam(name = "JSON body", required = true) final CreateRequest cr) throws ValidationException {
        checkPermission(RestPermissions.STREAMS_CREATE);

        // Create stream.

        final Stream stream = streamService.create(cr, getCurrentUser().getName());
        stream.setDisabled(true);

        final String id = streamService.save(stream);

        if (cr.rules != null && cr.rules.size() > 0) {
            for (CreateStreamRuleRequest request : cr.rules) {
                StreamRule streamRule = streamRuleService.create(id, request);
                streamRuleService.save(streamRule);
            }
        }

        Map<String, Object> result = Maps.newHashMap();
        result.put("stream_id", id);

        return Response.status(Response.Status.CREATED).entity(result).build();
    }
   
    @GET @Timed
    @ApiOperation(value = "Get a list of all streams")
    @Produces(MediaType.APPLICATION_JSON)
    public StreamListResponse get() {
        StreamListResponse response = new StreamListResponse();
        response.streams = new ArrayList<>();

        for (Stream stream : streamService.loadAll())
            if (isPermitted(RestPermissions.STREAMS_READ, stream.getId()))
                response.streams.add(stream);

        response.total = response.streams.size();

        return response;
    }

    @GET @Path("/enabled") @Timed
    @ApiOperation(value = "Get a list of all enabled streams")
    @Produces(MediaType.APPLICATION_JSON)
    public StreamListResponse getEnabled() throws NotFoundException {
        StreamListResponse response = new StreamListResponse();
        response.streams = new ArrayList<>();
        for (Stream stream : streamService.loadAllEnabled())
            if (isPermitted(RestPermissions.STREAMS_READ, stream.getId()))
                response.streams.add(stream);
        response.total = response.streams.size();

        return response;
    }

    @GET @Path("/{streamId}") @Timed
    @ApiOperation(value = "Get a single stream")
    @Produces(MediaType.APPLICATION_JSON)
    @ApiResponses(value = {
            @ApiResponse(code = 404, message = "Stream not found."),
            @ApiResponse(code = 400, message = "Invalid ObjectId.")
    })
    public Stream get(@ApiParam(name = "streamId", required = true) @PathParam("streamId") String streamId) throws NotFoundException {
        if (streamId == null || streamId.isEmpty()) {
          LOG.error("Missing streamId. Returning HTTP 400.");
          throw new WebApplicationException(400);
        }
        checkPermission(RestPermissions.STREAMS_READ, streamId);

        return streamService.load(streamId);
    }

    @PUT @Timed
    @Path("/{streamId}")
    @ApiOperation(value = "Update a stream")
    @RequiresPermissions(RestPermissions.STREAMS_EDIT)
    @Consumes(MediaType.APPLICATION_JSON)
    @Produces(MediaType.APPLICATION_JSON)
    @ApiResponses(value = {
            @ApiResponse(code = 404, message = "Stream not found."),
            @ApiResponse(code = 400, message = "Invalid ObjectId.")
    })
    public Stream update(@ApiParam(name = "JSON body", required = true) CreateRequest cr, @ApiParam(name = "streamId", required = true) @PathParam("streamId") String streamId) throws NotFoundException, ValidationException {
        checkPermission(RestPermissions.STREAMS_EDIT, streamId);
        final Stream stream = streamService.load(streamId);

        stream.setTitle(cr.title);
        stream.setDescription(cr.description);

        streamService.save(stream);

        return stream;
    }

    @DELETE @Path("/{streamId}") @Timed
    @ApiOperation(value = "Delete a stream")
    @ApiResponses(value = {
            @ApiResponse(code = 404, message = "Stream not found."),
            @ApiResponse(code = 400, message = "Invalid ObjectId.")
    })
    public Response delete(@ApiParam(name = "streamId", required = true) @PathParam("streamId") String streamId) throws NotFoundException {
        checkPermission(RestPermissions.STREAMS_EDIT, streamId);
        Stream stream = streamService.load(streamId);
        streamService.destroy(stream);

        return Response.noContent().build();
    }

    @POST @Path("/{streamId}/pause") @Timed
    @ApiOperation(value = "Pause a stream")
    @ApiResponses(value = {
            @ApiResponse(code = 404, message = "Stream not found."),
            @ApiResponse(code = 400, message = "Invalid or missing Stream id.")
    })
    public Response pause(@ApiParam(name = "streamId", required = true) @PathParam("streamId") String streamId) throws NotFoundException, ValidationException {
        if (streamId == null || streamId.isEmpty()) {
            LOG.error("Missing streamId. Returning HTTP 400.");
            throw new WebApplicationException(400);
        }
        checkPermission(RestPermissions.STREAMS_CHANGESTATE, streamId);

        final Stream stream = streamService.load(streamId);
        streamService.pause(stream);

        return Response.ok().build();
    }

    @POST @Path("/{streamId}/resume") @Timed
    @ApiOperation(value = "Resume a stream")
    @ApiResponses(value = {
            @ApiResponse(code = 404, message = "Stream not found."),
            @ApiResponse(code = 400, message = "Invalid or missing Stream id.")
    })
    public Response resume(@ApiParam(name = "streamId", required = true) @PathParam("streamId") String streamId) throws NotFoundException, ValidationException {
        if (streamId == null || streamId.isEmpty()) {
            LOG.error("Missing streamId. Returning HTTP 400.");
            throw new WebApplicationException(400);
        }
        checkPermission(RestPermissions.STREAMS_CHANGESTATE, streamId);

        final Stream stream = streamService.load(streamId);
        streamService.resume(stream);

        return Response.ok().build();
    }

    @POST @Path("/{streamId}/testMatch") @Timed
    @ApiOperation(value = "Test matching of a stream against a supplied message")
    @ApiResponses(value = {
            @ApiResponse(code = 404, message = "Stream not found."),
            @ApiResponse(code = 400, message = "Invalid or missing Stream id.")
    })
    public String testMatch(@ApiParam(name = "streamId", required = true) @PathParam("streamId") String streamId, @ApiParam(name = "JSON body", required = true) Map<String, Map<String, Object>> serialisedMessage) throws NotFoundException {
        checkPermission(RestPermissions.STREAMS_READ, streamId);

        if (serialisedMessage.get("message") == null) {
            LOG.error("Received invalid JSON body. Returning HTTP 400.");
            throw new WebApplicationException(400);
        }

        final Stream stream = streamService.load(streamId);
        final Message message = new Message(serialisedMessage.get("message"));

        final Map<StreamRule, Boolean> ruleMatches = streamRouter.getRuleMatches(stream, message);
        final Map<String, Boolean> rules = Maps.newHashMap();

        for (StreamRule ruleMatch : ruleMatches.keySet())
            rules.put(ruleMatch.getId(), ruleMatches.get(ruleMatch));

        final Map<String, Object> result = Maps.newHashMap();
        result.put("matches", streamRouter.doesStreamMatch(ruleMatches));
        result.put("rules", rules);

        return json(result);
    }

    @POST @Path("/{streamId}/clone") @Timed
    @ApiOperation(value = "Clone a stream")
    @RequiresPermissions(RestPermissions.STREAMS_CREATE)
    @ApiResponses(value = {
            @ApiResponse(code = 404, message = "Stream not found."),
            @ApiResponse(code = 400, message = "Invalid or missing Stream id.")
    })
    @Consumes(MediaType.APPLICATION_JSON)
    @Produces(MediaType.APPLICATION_JSON)
    public Response cloneStream(@ApiParam(name = "streamId", required = true) @PathParam("streamId") String streamId,
                          @ApiParam(name = "JSON body", required = true) CreateRequest cr) throws ValidationException, NotFoundException {
        checkPermission(RestPermissions.STREAMS_CREATE);
        checkPermission(RestPermissions.STREAMS_READ, streamId);

        final Stream sourceStream = streamService.load(streamId);

        // Create stream.
        final Map<String, Object> streamData = Maps.newHashMap();
        streamData.put("title", cr.title);
        streamData.put("description", cr.description);
        streamData.put("creator_user_id", getCurrentUser().getName());
        streamData.put("created_at", Tools.iso8601());

        final Stream stream = streamService.create(streamData);
        streamService.pause(stream);
        final String id;
        streamService.save(stream);
        id = stream.getId();

        final List<StreamRule> sourceStreamRules;

        sourceStreamRules = streamRuleService.loadForStream(sourceStream);

        if (sourceStreamRules.size() > 0) {
            for (StreamRule streamRule : sourceStreamRules) {
                Map<String, Object> streamRuleData = Maps.newHashMap();

                streamRuleData.put("type", streamRule.getType().toInteger());
                streamRuleData.put("field", streamRule.getField());
                streamRuleData.put("value", streamRule.getValue());
                streamRuleData.put("inverted", streamRule.getInverted());
                streamRuleData.put("stream_id", new ObjectId(id));

                StreamRule newStreamRule = streamRuleService.create(streamRuleData);
                streamRuleService.save(newStreamRule);
            }
        }

        for (AlertCondition alertCondition : streamService.getAlertConditions(sourceStream))
                streamService.addAlertCondition(stream, alertCondition);

        for (Map.Entry<String, List<String>> entry : sourceStream.getAlertReceivers().entrySet())
            for (String receiver : entry.getValue())
                streamService.addAlertReceiver(stream, entry.getKey(), receiver);

        for (Output output : sourceStream.getOutputs())
            streamService.addOutput(stream, output);

        Map<String, Object> result = Maps.newHashMap();
        result.put("stream_id", id);

        return Response.status(Response.Status.CREATED).entity(json(result)).build();
    }

    @GET @Timed
    @Path("/{streamId}/throughput")
    @ApiOperation(value = "Current throughput of this stream on this node in messages per second")
    @Produces(MediaType.APPLICATION_JSON)
    public Response oneStreamThroughput(@ApiParam(name="streamId", required = true) @PathParam("streamId") String streamId) {
        final Map<String, Long> result = Maps.newHashMap();
        result.put("throughput", 0L);

        final HashMap<String,Counter> currentStreamThroughput = throughputStats.getCurrentStreamThroughput();
        if (currentStreamThroughput != null) {
            final Counter counter = currentStreamThroughput.get(streamId);
            if (counter != null && isPermitted(RestPermissions.STREAMS_READ, streamId))
                result.put("throughput", counter.get());
        }
        return Response.ok().entity(json(result)).build();
    }

    @GET @Timed
    @Path("/throughput")
    @ApiOperation("Current throughput of all visible streams on this node in messages per second")
    @Produces(MediaType.APPLICATION_JSON)
    public Response streamThroughput() {
        Map<String, Map<String, Long>> result = Maps.newHashMap();
        final Map<String, Long> perStream = Maps.newHashMap();
        result.put("throughput", perStream);

        final HashMap<String,Counter> currentStreamThroughput = throughputStats.getCurrentStreamThroughput();
        if (currentStreamThroughput != null)
            for (Map.Entry<String, Counter> entry : currentStreamThroughput.entrySet())
                if (entry.getValue() != null && isPermitted(RestPermissions.STREAMS_READ, entry.getKey()))
                    perStream.put(entry.getKey(), entry.getValue().get());

        return Response.ok().entity(json(result)).build();
    }
}
TOP

Related Classes of org.graylog2.rest.resources.streams.StreamResource

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.