cloudwatch-log-alerts/lambda-fn/main.go

268 lines
7.5 KiB
Go
Raw Permalink Normal View History

package main
import (
	"context"
	"crypto/sha1"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"net/url"
	"sort"
	"strings"
	"time"

	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/dynamodb"
	"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute"
	"github.com/aws/aws-sdk-go/service/dynamodb/expression"
	"github.com/go-resty/resty/v2"
	"github.com/kelseyhightower/envconfig"
	"go.uber.org/zap"
)
// AppConfig holds the settings envconfig loads from environment variables.
// Every field is tagged required, so envconfig.Process reports an error when
// any of them is unset.
type AppConfig struct {
	// SlackWebhook is the URL that alert messages are POSTed to.
	SlackWebhook string `envconfig:"SLACK_WEBHOOK" required:"true"`
	// Region is the AWS region used when building Cloudwatch console links.
	Region string `envconfig:"AWS_REGION" required:"true"`
	// DynamoDBTable names the table that stores message fingerprints.
	DynamoDBTable string `envconfig:"DDB_TABLE" required:"true"`
}
// App bundles the configuration and shared clients used by every invocation:
// the Slack HTTP client, the structured logger and the DynamoDB client.
type App struct {
	config  AppConfig
	restCli *resty.Client
	logger  *zap.Logger
	ddb     *dynamodb.DynamoDB
}

// app is the single process-wide instance. init populates it, and the handler
// and its helpers read from it.
var app App
// init loads AppConfig from the environment and builds the shared logger,
// Slack HTTP client and DynamoDB client stored in app. It panics on any setup
// failure; there is no way to handle events without these dependencies.
func init() {
	if err := envconfig.Process("", &app.config); err != nil {
		panic(err)
	}

	logger, err := zap.NewProduction()
	if err != nil {
		panic(err)
	}
	app.logger = logger

	// Bound each Slack webhook call so that a hung connection surfaces as a
	// logged error instead of silently consuming the invocation's time budget.
	// NOTE(review): keep this below the function's configured Lambda timeout.
	app.restCli = resty.New().SetTimeout(10 * time.Second)

	// Named sess so the imported session package is not shadowed.
	sess := session.Must(session.NewSession())
	app.ddb = dynamodb.New(sess)
}
// duplicateMessageTTL is the suppression window for repeated alerts. An alert
// whose fingerprint was recorded less than this long ago is treated as a
// duplicate and is not sent.
const duplicateMessageTTL time.Duration = 5 * time.Minute
// main hands control to the aws-lambda-go runtime, registering handler as the
// function invoked for each incoming event.
func main() {
lambda.Start(handler)
}
// handler is the Lambda entry point for Cloudwatch Logs subscription events.
// It performs four steps:
//   - decodes the compressed log payload;
//   - renders it as a Slack message;
//   - drops the message if an identical one was recorded within
//     duplicateMessageTTL;
//   - otherwise POSTs it to the configured Slack webhook.
//
// Every path returns nil. Failures are logged rather than returned, presumably
// so that the invocation is not retried. TODO confirm this is the intent.
func handler(ctx context.Context, event events.CloudwatchLogsEvent) error {
	data, err := event.AWSLogs.Parse()
	if err != nil {
		app.logger.Error("error parsing logs event", zap.Error(err))
		return nil
	}

	message := buildSlackMessage(data)

	// Duplicate suppression is best-effort. If the fingerprint cannot be
	// computed or the DynamoDB check fails, the alert is still sent.
	if fpr, err := fingerprint(data.LogGroup, message); err != nil {
		app.logger.Warn("Error calculating message fingerprint", zap.Error(err))
	} else {
		dup, err := isDuplicate(fpr)
		if err != nil {
			app.logger.Warn("Error determining duplicate message status", zap.Error(err))
		}
		if dup {
			app.logger.Info("Ignoring duplicate message", zap.String("Fingerprint", fpr))
			return nil
		}
	}

	// Tie the request to the invocation context so it is cancelled when the
	// invocation's deadline passes.
	resp, err := app.restCli.R().
		SetContext(ctx).
		SetHeader("Content-Type", "application/json").
		SetBody(message).
		Post(app.config.SlackWebhook)
	if err != nil {
		// resty can return a nil *Response alongside an error, so resp must not
		// be dereferenced on this path.
		app.logger.Error("Error sending slack message", zap.Error(err))
		return nil
	}
	if resp.IsError() {
		// Slack rejected the payload (4xx/5xx). Log the status and body so the
		// failure is visible instead of looking like a normal response.
		app.logger.Error("Slack returned an error status",
			zap.Int("Status", resp.StatusCode()),
			zap.String("Response", resp.String()))
		return nil
	}
	app.logger.Info("Slack response", zap.String("Response", resp.String()))
	return nil
}
// isDuplicate records fpr in DynamoDB and reports whether an unexpired record
// for the same fingerprint already existed.
//
// It issues a conditional PutItem. The put succeeds only when no item with this
// fingerprint exists, or when the existing item's "expires" timestamp is in the
// past. The possible results are:
//   - ConditionalCheckFailedException: the fingerprint was seen within
//     duplicateMessageTTL, so isDuplicate returns (true, nil).
//   - Successful put: the fingerprint is new or expired, so it returns
//     (false, nil).
//   - Any other failure: it returns (false, err), letting the caller log the
//     error and still send the alert.
func isDuplicate(fpr string) (bool, error) {
	now := time.Now()
	record := struct {
		Fingerprint string `dynamodbav:"fingerprint"`
		Expires     int64  `dynamodbav:"expires"`
	}{
		Fingerprint: fpr,
		Expires:     now.Add(duplicateMessageTTL).Unix(),
	}
	item, err := dynamodbattribute.MarshalMap(record)
	if err != nil {
		return false, fmt.Errorf("error marshaling DynamoDB record: %w", err)
	}

	// Write only if the fingerprint is new or its previous entry has expired.
	cond := expression.Or(
		expression.AttributeNotExists(expression.Name("fingerprint")),
		expression.Name("expires").LessThan(expression.Value(now.Unix())),
	)
	expr, err := expression.NewBuilder().WithCondition(cond).Build()
	if err != nil {
		return false, fmt.Errorf("error creating DynamoDB conditional expression: %w", err)
	}

	_, err = app.ddb.PutItem(&dynamodb.PutItemInput{
		ConditionExpression:       expr.Condition(),
		ExpressionAttributeNames:  expr.Names(),
		ExpressionAttributeValues: expr.Values(),
		Item:                      item,
		TableName:                 aws.String(app.config.DynamoDBTable),
	})
	if err == nil {
		return false, nil
	}

	// Match on the error code rather than the concrete type. Depending on the
	// aws-sdk-go release, the failure arrives either as the modeled
	// *dynamodb.ConditionalCheckFailedException or as a generic awserr.Error.
	// Both implement Code().
	if coded, ok := err.(interface{ Code() string }); ok &&
		coded.Code() == dynamodb.ErrCodeConditionalCheckFailedException {
		return true, nil
	}
	return false, fmt.Errorf("error from DynamoDB PutItem: %w", err)
}
// fingerprint derives the duplicate-suppression key for an alert. The key is
// the hex-encoded SHA-1 of a JSON document holding the log group and the fields
// of the message's first block.
//
// The message text is deliberately left out. It embeds a link to the specific
// log stream, so including it would make the same error, logged to different
// streams (e.g. by two instances of a Lambda function), look like two distinct
// alerts.
func fingerprint(logGroup string, message *slackMessage) (string, error) {
	payload := struct {
		LogGroup string
		Fields   []struct {
			Text string `json:"text"`
			Type string `json:"type"`
		}
	}{LogGroup: logGroup}
	if len(message.Blocks) != 0 {
		payload.Fields = message.Blocks[0].Fields
	}

	encoded, err := json.Marshal(payload)
	if err != nil {
		return "", fmt.Errorf("JSON encode error: %v", err)
	}
	sum := sha1.Sum(encoded)
	return hex.EncodeToString(sum[:]), nil
}
// messageText builds the alert headline as a Slack link, in the form
// <url|label>. The URL opens the event's log stream in the Cloudwatch console,
// and the label is the log group name.
func messageText(data events.CloudwatchLogsData) string {
	link := cloudwatchConsoleURL(app.config.Region, data.LogGroup, data.LogStream)
	return fmt.Sprintf("Error logged to Cloudwatch stream <%s|%s>", link, data.LogGroup)
}
// slackText is a Slack text object: the text itself plus its formatting type.
// It is a type alias rather than a new named type, so it remains identical to
// the equivalent anonymous struct used elsewhere in this file.
type slackText = struct {
	Text string `json:"text"`
	Type string `json:"type"`
}

// slackBlock is one Slack layout block. It carries a block type, a main text
// object and an optional list of field text objects.
type slackBlock struct {
	Fields []slackText `json:"fields,omitempty"`
	Text   slackText   `json:"text"`
	Type   string      `json:"type"`
}

// slackMessage is the JSON body posted to the Slack webhook.
type slackMessage struct {
	Blocks []slackBlock `json:"blocks"`
	Text   string       `json:"text"`
}

// AddText sets the block's main text to s as mrkdwn, replacing any text
// set earlier.
func (b *slackBlock) AddText(s string) {
	b.Text = slackText{Type: "mrkdwn", Text: s}
}

// AddField appends a mrkdwn field to the block. The field shows k in bold,
// then a colon, a newline and v.
func (b *slackBlock) AddField(k, v string) {
	b.Fields = append(b.Fields, slackText{
		Type: "mrkdwn",
		Text: fmt.Sprintf("*%s*:\n%s", k, v),
	})
}
// buildSlackMessage converts a batch of Cloudwatch log events into a Slack
// message with a single "section" block. Both the message text and the
// section text are the headline from messageText. Each log event then adds
// fields to the section:
//   - An event whose message is a JSON object adds one field per string or
//     number value. Keys rejected by excludeField are skipped, and values of
//     other types (bool, null, arrays, nested objects) are dropped.
//   - Any other event adds its raw text as a single "Message" field.
//
// The keys of each JSON event are emitted in sorted order. Go map iteration
// order is random, so without sorting the same log line could yield
// differently ordered fields. That would produce a different fingerprint on
// each invocation and defeat duplicate suppression.
//
// NOTE(review): Slack rejects section blocks with more than 10 fields; large
// batches or wide JSON events may exceed that. Confirm whether truncation or
// splitting is needed.
func buildSlackMessage(data events.CloudwatchLogsData) *slackMessage {
	text := messageText(data)
	block := slackBlock{Type: "section"}
	block.AddText(text)

	for _, e := range data.LogEvents {
		var m map[string]interface{}
		if err := json.Unmarshal([]byte(e.Message), &m); err != nil {
			// Catch-all for non-JSON messages
			block.AddField("Message", e.Message)
			continue
		}

		keys := make([]string, 0, len(m))
		for k := range m {
			keys = append(keys, k)
		}
		sort.Strings(keys)

		for _, k := range keys {
			if excludeField(k) {
				continue
			}
			switch v := m[k].(type) {
			case string:
				block.AddField(k, v)
			case float64:
				block.AddField(k, fmt.Sprintf("%f", v))
			}
		}
	}

	return &slackMessage{
		Blocks: []slackBlock{block},
		Text:   text,
	}
}
// excludeField reports whether a JSON log key should be left out of the Slack
// message. The comparison is case-insensitive, and the excluded keys are "ts",
// "level", "stacktrace" and "payload".
func excludeField(k string) bool {
	switch strings.ToLower(k) {
	case "ts", "level", "stacktrace", "payload":
		return true
	default:
		return false
	}
}
// consoleURLEscape encodes a log group or log stream name for the fragment of
// a Cloudwatch console "logsV2" URL. The value is query-escaped twice, and
// every '%' in the result is then replaced by '$'. For example, "/" becomes
// "$252F". This appears to be the encoding the console itself uses.
func consoleURLEscape(s string) string {
	escapedTwice := url.QueryEscape(url.QueryEscape(s))
	return strings.ReplaceAll(escapedTwice, "%", "$")
}
// cloudwatchConsoleURL constructs a URL to a log stream in the Cloudwatch console.
func cloudwatchConsoleURL(region string, logGroup string, logStream string) string {
return fmt.Sprintf("https://console.aws.amazon.com/cloudwatch/home?region=%s#logsV2:log-groups/log-group/%s/log-events/%s",
region, consoleURLEscape(logGroup), consoleURLEscape(logStream))
}