Skip to content

Commit

Permalink
Add stream destination filters (#19739)
Browse files Browse the repository at this point in the history
* Add StreamService#loadStreamTitles
* Start OpenAPI spec for the stream output filters API
* Add API resource skeletons for stream output filters
* Start StreamOutputFilterRuleService
* Make MongoCollections injectable via MongoDBExtension
* Add target-specific endpoints for filter rules
* Add search query parsing to StreamOutputFilterService
* Add changelog
* Implement endpoint to return available conditions
* Rename to "Stream Destination Filter"
* Add rule builder validation endpoint
* Scope the rest of StreamDestinationFilterService method to stream
* Validate rules on creation and update, handle errors

Refs Graylog2/graylog-plugin-enterprise#7210
  • Loading branch information
bernd authored Jul 4, 2024
1 parent d6449da commit ca06191
Show file tree
Hide file tree
Showing 14 changed files with 1,524 additions and 2 deletions.
530 changes: 530 additions & 0 deletions api-specs/stream-output-filters.yml

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions changelog/unreleased/pr-19739.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
type = "a"
message = "Add filters for filterable stream destinations."

issues = ["graylog-plugin-enterprise#7210"]
pulls = ["19739"]
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,9 @@ public class AuditEventTypes implements PluginAuditEventTypes {
public static final String SESSION_DELETE = PREFIX + "session:delete";
public static final String STATIC_FIELD_CREATE = PREFIX + "static_field:create";
public static final String STATIC_FIELD_DELETE = PREFIX + "static_field:delete";
public static final String STREAM_DESTINATION_FILTER_CREATE = PREFIX + "stream_destination_filter:create";
public static final String STREAM_DESTINATION_FILTER_DELETE = PREFIX + "stream_destination_filter:delete";
public static final String STREAM_DESTINATION_FILTER_UPDATE = PREFIX + "stream_destination_filter:update";
public static final String STREAM_CREATE = PREFIX + "stream:create";
public static final String STREAM_DELETE = PREFIX + "stream:delete";
public static final String STREAM_OUTPUT_ASSIGNMENT_CREATE = PREFIX + "stream_output_assignment:create";
Expand Down Expand Up @@ -292,6 +295,9 @@ public class AuditEventTypes implements PluginAuditEventTypes {
.add(STREAM_DELETE)
.add(STREAM_OUTPUT_ASSIGNMENT_CREATE)
.add(STREAM_OUTPUT_ASSIGNMENT_DELETE)
.add(STREAM_DESTINATION_FILTER_CREATE)
.add(STREAM_DESTINATION_FILTER_DELETE)
.add(STREAM_DESTINATION_FILTER_UPDATE)
.add(STREAM_RULE_CREATE)
.add(STREAM_RULE_DELETE)
.add(STREAM_RULE_UPDATE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
import org.graylog2.rest.resources.search.KeywordSearchResource;
import org.graylog2.rest.resources.search.RelativeSearchResource;
import org.graylog2.rest.resources.streams.StreamResource;
import org.graylog2.rest.resources.streams.destinations.filters.StreamDestinationFilterBuilderResource;
import org.graylog2.rest.resources.streams.destinations.filters.StreamDestinationFiltersResource;
import org.graylog2.rest.resources.streams.outputs.StreamOutputResource;
import org.graylog2.rest.resources.streams.rules.StreamRuleInputsResource;
import org.graylog2.rest.resources.streams.rules.StreamRuleResource;
Expand Down Expand Up @@ -258,6 +260,8 @@ private void addStreamsResources() {
addSystemRestResource(StreamRuleResource.class);
addSystemRestResource(StreamResource.class);
addSystemRestResource(StreamRuleInputsResource.class);
addSystemRestResource(StreamDestinationFiltersResource.class);
addSystemRestResource(StreamDestinationFilterBuilderResource.class);
}

private void addMonitoringResources() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog2.rest.resources.streams.destinations.filters;

import com.google.common.collect.ImmutableSet;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import jakarta.inject.Inject;
import jakarta.validation.constraints.NotNull;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.ServerErrorException;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
import org.apache.shiro.authz.annotation.RequiresAuthentication;
import org.apache.shiro.authz.annotation.RequiresPermissions;
import org.graylog.plugins.pipelineprocessor.rulebuilder.RuleBuilderRegistry;
import org.graylog.plugins.pipelineprocessor.rulebuilder.db.RuleFragment;
import org.graylog.plugins.pipelineprocessor.rulebuilder.parser.RuleBuilderService;
import org.graylog.plugins.pipelineprocessor.rulebuilder.parser.validation.ValidatorService;
import org.graylog.plugins.pipelineprocessor.rulebuilder.rest.RuleBuilderDto;
import org.graylog2.audit.jersey.NoAuditEvent;
import org.graylog2.shared.rest.resources.RestResource;
import org.graylog2.shared.security.RestPermissions;

import java.util.Comparator;
import java.util.Map;
import java.util.Set;

import static java.util.Objects.requireNonNullElse;
import static org.graylog2.shared.rest.documentation.generator.Generator.CLOUD_VISIBLE;

@Api(value = "Stream/Destinations/Filters/Builder", description = "Stream destination filter builder", tags = {CLOUD_VISIBLE})
@Path("/streams/destinations/filters/builder")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
@RequiresAuthentication
public class StreamDestinationFilterBuilderResource extends RestResource {
// We want to control the available conditions for the stream destination filters to avoid exposing functions
// that don't make sense in the destination filter context.
// Check "/api/system/pipelines/rulebuilder/conditions" for all available conditions.
private static final Set<String> INCLUDED_CONDITIONS = ImmutableSet.<String>builder()
.add("array_contains")
.add("field_cidr")
.add("field_contains")
.add("field_ends_with")
.add("field_ip")
.add("field_not_null")
.add("field_null")
.add("field_starts_with")
.add("field_url")
.add("from_forwarder_input")
.add("from_input")
.add("grok_matches")
.add("has_field")
.add("has_field_equals")
.add("has_field_greater_or_equal")
.add("has_field_less_or_equal")
.add("lookup_has_value")
.add("lookup_string_list_contains")
.build();

private final RuleBuilderRegistry ruleBuilderRegistry;
private final ValidatorService validatorService;
private final RuleBuilderService ruleBuilderService;

@Inject
public StreamDestinationFilterBuilderResource(RuleBuilderRegistry ruleBuilderRegistry,
ValidatorService validatorService,
RuleBuilderService ruleBuilderService) {
this.ruleBuilderRegistry = ruleBuilderRegistry;
this.validatorService = validatorService;
this.ruleBuilderService = ruleBuilderService;
}

@GET
@Path("/conditions")
@ApiOperation(value = "Get available filter rule conditions")
@RequiresPermissions(RestPermissions.STREAM_DESTINATION_FILTERS_READ)
public Response getConditions() {
final var conditions = ruleBuilderRegistry.conditions()
.values()
.stream()
.map(RuleFragment::descriptor)
.filter(descriptor -> INCLUDED_CONDITIONS.contains(descriptor.name()))
.sorted(Comparator.comparing(descriptor -> requireNonNullElse(descriptor.ruleBuilderName(), descriptor.name())))
.toList();

return Response.ok(Map.of("conditions", conditions)).build();
}

@POST
@Path("/validate")
@ApiOperation("Validate rule builder")
@NoAuditEvent("No data changes. Only used to validate a rule builder.")
public Response validateRule(@ApiParam(name = "rule", required = true) @NotNull RuleBuilderDto ruleBuilderDto) {
final var validatedDto = validatorService.validate(ruleBuilderDto);
final var dtoWithTitles = validatedDto.toBuilder()
.ruleBuilder(ruleBuilderService.generateTitles(validatedDto.ruleBuilder()))
.build();

return Response.ok(Map.of("rule_builder", dtoWithTitles)).build();
}

@POST
@Path("/simulate")
@ApiOperation(value = "Run the simulator for the given rule and message")
@NoAuditEvent("No data changes. Only used to simulate a filter rule.")
@RequiresPermissions(RestPermissions.STREAM_DESTINATION_FILTERS_READ)
public Response simulateRule() {
throw new ServerErrorException("Simulator not implemented yet", Response.Status.NOT_IMPLEMENTED);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog2.rest.resources.streams.destinations.filters;

import com.mongodb.client.model.Sorts;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import jakarta.inject.Inject;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotBlank;
import jakarta.ws.rs.BadRequestException;
import jakarta.ws.rs.BeanParam;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.DELETE;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.NotFoundException;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.PUT;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.PathParam;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
import org.apache.shiro.authz.annotation.RequiresAuthentication;
import org.graylog.plugins.pipelineprocessor.rulebuilder.parser.validation.ValidatorService;
import org.graylog.plugins.pipelineprocessor.rulebuilder.rest.RuleBuilderDto;
import org.graylog2.audit.AuditEventTypes;
import org.graylog2.audit.jersey.AuditEvent;
import org.graylog2.rest.PaginationParameters;
import org.graylog2.rest.models.PaginatedResponse;
import org.graylog2.shared.rest.resources.RestResource;
import org.graylog2.shared.security.RestPermissions;
import org.graylog2.streams.StreamService;
import org.graylog2.streams.filters.StreamDestinationFilterRuleDTO;
import org.graylog2.streams.filters.StreamDestinationFilterService;

import java.util.List;
import java.util.Map;

import static org.graylog2.shared.rest.documentation.generator.Generator.CLOUD_VISIBLE;
import static org.graylog2.shared.utilities.StringUtils.f;

@Api(value = "Stream/Destinations/Filters", description = "Manage stream destination filter rules", tags = {CLOUD_VISIBLE})
@Path("/streams/{streamId}/destinations")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
@RequiresAuthentication
public class StreamDestinationFiltersResource extends RestResource {
private final StreamDestinationFilterService filterService;
private final StreamService streamService;
private final ValidatorService validatorService;

@Inject
public StreamDestinationFiltersResource(StreamDestinationFilterService filterService,
StreamService streamService,
ValidatorService validatorService) {
this.filterService = filterService;
this.streamService = streamService;
this.validatorService = validatorService;
}

@GET
@Path("/filters")
@ApiOperation("Get available filter rules for stream")
public PaginatedResponse<StreamDestinationFilterRuleDTO> getPaginatedFiltersForStream(
@ApiParam(name = "streamId", required = true) @PathParam("streamId") @NotBlank String streamId,
@ApiParam(name = "pagination parameters") @BeanParam PaginationParameters paginationParams
) {
checkPermission(RestPermissions.STREAMS_EDIT, streamId);
checkStream(streamId);

final var paginatedList = filterService.findPaginatedForStream(
streamId,
paginationParams.getQuery(),
Sorts.ascending(StreamDestinationFilterRuleDTO.FIELD_TITLE),
paginationParams.getPerPage(),
paginationParams.getPage(),
dtoId -> isPermitted(RestPermissions.STREAM_DESTINATION_FILTERS_READ, dtoId)
);

return PaginatedResponse.create("elements", paginatedList, paginationParams.getQuery());
}

@GET
@Path("/target/{targetId}/filters")
@ApiOperation("Get available filter rules for stream and target")
public PaginatedResponse<StreamDestinationFilterRuleDTO> getPaginatedFiltersForStreamAndTarget(
@ApiParam(name = "streamId", required = true) @PathParam("streamId") @NotBlank String streamId,
@ApiParam(name = "targetId", required = true) @PathParam("targetId") @NotBlank String targetId,
@ApiParam(name = "pagination parameters") @BeanParam PaginationParameters paginationParams
) {
checkPermission(RestPermissions.STREAMS_EDIT, streamId);
checkStream(streamId);

final var paginatedList = filterService.findPaginatedForStreamAndTarget(
streamId,
targetId,
paginationParams.getQuery(),
Sorts.ascending(StreamDestinationFilterRuleDTO.FIELD_TITLE),
paginationParams.getPerPage(),
paginationParams.getPage(),
dtoId -> isPermitted(RestPermissions.STREAM_DESTINATION_FILTERS_READ, dtoId)
);

return PaginatedResponse.create("elements", paginatedList, paginationParams.getQuery());
}

@GET
@Path("/filters/{filterId}")
@ApiOperation("Get filter rule for given ID")
public Response getFilter(@ApiParam(name = "streamId", required = true) @PathParam("streamId") @NotBlank String streamId,
@ApiParam(name = "filterId", required = true) @PathParam("filterId") @NotBlank String filterId) {
checkPermission(RestPermissions.STREAMS_EDIT, streamId);
checkPermission(RestPermissions.STREAM_DESTINATION_FILTERS_READ, filterId);
checkStream(streamId);

final var dto = filterService.findByIdForStream(streamId, filterId)
.orElseThrow(() -> new NotFoundException("Filter not found"));

return Response.ok(wrapDto(dto)).build();
}

@POST
@Path("/filters")
@ApiOperation("Create new filter rule")
@AuditEvent(type = AuditEventTypes.STREAM_DESTINATION_FILTER_CREATE)
public Response createFilter(@ApiParam(name = "streamId", required = true) @PathParam("streamId") @NotBlank String streamId,
@ApiParam(name = "JSON Body", required = true) @Valid StreamDestinationFilterRuleDTO dto) {
checkPermission(RestPermissions.STREAMS_EDIT, streamId);
checkPermission(RestPermissions.STREAM_DESTINATION_FILTERS_CREATE);
checkStream(streamId);
validateDto(dto);

try {
return Response.ok(wrapDto(filterService.createForStream(streamId, dto))).build();
} catch (IllegalArgumentException e) {
throw new BadRequestException(e.getMessage());
}
}

@PUT
@Path("/filters/{filterId}")
@ApiOperation(value = "Update filter rule")
@AuditEvent(type = AuditEventTypes.STREAM_DESTINATION_FILTER_UPDATE)
public Response updateFilter(@ApiParam(name = "streamId", required = true) @PathParam("streamId") @NotBlank String streamId,
@ApiParam(name = "filterId", required = true) @PathParam("filterId") @NotBlank String filterId,
@ApiParam(name = "JSON Body", required = true) @Valid StreamDestinationFilterRuleDTO dto) {
checkPermission(RestPermissions.STREAMS_EDIT, streamId);
checkPermission(RestPermissions.STREAM_DESTINATION_FILTERS_EDIT, filterId);
checkStream(streamId);

if (!filterId.equals(dto.id())) {
throw new BadRequestException("The filter ID in the URL doesn't match the one in the payload");
}

validateDto(dto);

try {
return Response.ok(wrapDto(filterService.updateForStream(streamId, dto))).build();
} catch (IllegalArgumentException e) {
throw new BadRequestException(e.getMessage());
}
}

@DELETE
@Path("/filters/{filterId}")
@ApiOperation("Delete filter rule")
@AuditEvent(type = AuditEventTypes.STREAM_DESTINATION_FILTER_DELETE)
public Response deleteFilter(@ApiParam(name = "streamId", required = true) @PathParam("streamId") @NotBlank String streamId,
@ApiParam(name = "filterId", required = true) @PathParam("filterId") @NotBlank String filterId) {
checkPermission(RestPermissions.STREAMS_EDIT, streamId);
checkPermission(RestPermissions.STREAM_DESTINATION_FILTERS_DELETE, filterId);
checkStream(streamId);

try {
return Response.ok(wrapDto(filterService.deleteFromStream(streamId, filterId))).build();
} catch (IllegalArgumentException e) {
throw new NotFoundException(e.getMessage());
}
}

private Map<String, StreamDestinationFilterRuleDTO> wrapDto(StreamDestinationFilterRuleDTO dto) {
return Map.of("filter", dto);
}

// We want to ensure that the given stream exists to avoid creating filter rules for non-existent streams.
private void checkStream(String streamId) {
try {
streamService.load(streamId);
} catch (org.graylog2.database.NotFoundException e) {
throw new NotFoundException(f("Stream not found: %s", streamId));
}
}

private void validateDto(@Valid StreamDestinationFilterRuleDTO dto) {
final var ruleBuilderDto = RuleBuilderDto.builder()
.title(dto.title())
.ruleBuilder(dto.rule().toBuilder().actions(List.of()).build())
.build();
try {
validatorService.validateAndFailFast(ruleBuilderDto);
} catch (IllegalArgumentException e) {
throw new BadRequestException(e.getMessage());
}
}
}
Loading

0 comments on commit ca06191

Please sign in to comment.