package main import ( "context" "crypto/sha1" "encoding/hex" "encoding/json" "fmt" "net/url" "strings" "time" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/dynamodb" "github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute" "github.com/aws/aws-sdk-go/service/dynamodb/expression" "github.com/go-resty/resty/v2" "github.com/kelseyhightower/envconfig" "go.uber.org/zap" ) // AppConfig is the application configuration read from the environment type AppConfig struct { SlackWebhook string `envconfig:"SLACK_WEBHOOK" required:"true"` Region string `envconfig:"AWS_REGION" required:"true"` DynamoDBTable string `envconfig:"DDB_TABLE" required:"true"` } // App encapsulates the application's runtime dependencies type App struct { config AppConfig restCli *resty.Client logger *zap.Logger ddb *dynamodb.DynamoDB } var app App func init() { err := envconfig.Process("", &app.config) if err != nil { panic(err) } app.logger, err = zap.NewProduction() if err != nil { panic(err) } app.restCli = resty.New() session := session.Must(session.NewSession()) app.ddb = dynamodb.New(session) } // Suppress duplicate messages appearing within this duration const duplicateMessageTTL = 5 * time.Minute func main() { lambda.Start(handler) } func handler(ctx context.Context, event events.CloudwatchLogsEvent) error { data, err := event.AWSLogs.Parse() if err != nil { app.logger.Error("error parsing logs event", zap.Error(err)) return nil } message := buildSlackMessage(data) fpr, err := fingerprint(data.LogGroup, message) if err != nil { app.logger.Warn("Error calculating message fingerprint", zap.Error(err)) } else { dup, err := isDuplicate(fpr) if err != nil { app.logger.Warn("Error determining duplicate message status", zap.Error(err)) } if dup { app.logger.Info("Ignoring duplicate message", zap.String("Fingerprint", fpr)) return nil } } resp, err := app.restCli.R(). SetHeader("Content-Type", "application/json"). SetBody(message). Post(app.config.SlackWebhook) if err != nil { app.logger.Error("Error sending slack message", zap.Error(err)) } app.logger.Info("Slack response", zap.String("Response", resp.String())) return nil } // isDuplicate makes a conditional put request to DynamoDB. If an unexpired record with // the same fingerprint already exists, the PutItem request will fail with a // ConditionalCheckFailedException. When we encouter this error we return true: this message // fingerprint has been seen recently. If the PutItem request succeeds, there was no matching // unexpired fingerprint in the table and we return false. We also return false if any other // error occurs, allowing the alert to proceed. func isDuplicate(fpr string) (bool, error) { record := struct { Fingerprint string `dynamodbav:"fingerprint"` Expires int64 `dynamodbav:"expires"` }{ fpr, time.Now().Add(duplicateMessageTTL).Unix(), } item, err := dynamodbattribute.MarshalMap(record) if err != nil { return false, fmt.Errorf("error marshaling DynamoDB record: %v", err) } cond := expression.Or( expression.AttributeNotExists(expression.Name("fingerprint")), expression.Name("expires").LessThan(expression.Value(time.Now().Unix())), ) expr, err := expression.NewBuilder().WithCondition(cond).Build() if err != nil { return false, fmt.Errorf("error creating DynamoDB conditional expression: %v", err) } _, err = app.ddb.PutItem(&dynamodb.PutItemInput{ ConditionExpression: expr.Condition(), ExpressionAttributeNames: expr.Names(), ExpressionAttributeValues: expr.Values(), Item: item, TableName: aws.String(app.config.DynamoDBTable), }) if err != nil { if _, ok := err.(*dynamodb.ConditionalCheckFailedException); ok { return true, nil } return false, fmt.Errorf("error from DynamoDB PutItem: %v", err) } return false, nil } // fingerprint returns a hash of a log group and message attachments that is used to // suppress duplicate messages. Note that the messageText function returns a string // with the log stream name embedded, so we exclude this from the fingerprint // calculation. Otherwise, we would alert multiple times for the same message logged // to different streams (e.g. if it was logged by two different instances of a lambda // function). func fingerprint(logGroup string, message *slackMessage) (string, error) { var fields []struct { Text string `json:"text"` Type string `json:"type"` } if len(message.Blocks) > 0 { fields = message.Blocks[0].Fields } data := struct { LogGroup string Fields []struct { Text string `json:"text"` Type string `json:"type"` } }{ logGroup, fields, } b, err := json.Marshal(data) if err != nil { return "", fmt.Errorf("JSON encode error: %v", err) } h := sha1.New() h.Write(b) return hex.EncodeToString(h.Sum(nil)), nil } func messageText(data events.CloudwatchLogsData) string { return fmt.Sprintf("Error logged to Cloudwatch stream <%s|%s>", cloudwatchConsoleURL(app.config.Region, data.LogGroup, data.LogStream), data.LogGroup, ) } type slackBlock struct { Fields []struct { Text string `json:"text"` Type string `json:"type"` } `json:"fields,omitempty"` Text struct { Text string `json:"text"` Type string `json:"type"` } `json:"text"` Type string `json:"type"` } type slackMessage struct { Blocks []slackBlock `json:"blocks"` Text string `json:"text"` } func (b *slackBlock) AddText(s string) { b.Text = struct { Text string `json:"text"` Type string `json:"type"` }{ Type: "mrkdwn", Text: s, } } func (b *slackBlock) AddField(k, v string) { field := struct { Text string `json:"text"` Type string `json:"type"` }{ Type: "mrkdwn", Text: fmt.Sprintf("*%s*:\n%s", k, v), } b.Fields = append(b.Fields, field) } func buildSlackMessage(data events.CloudwatchLogsData) *slackMessage { message := new(slackMessage) text := messageText(data) message.Text = text block := slackBlock{Type: "section"} block.AddText(text) for _, e := range data.LogEvents { var m map[string]interface{} err := json.Unmarshal([]byte(e.Message), &m) if err != nil { // Catch-all for non-JSON messages block.AddField("Message", e.Message) } else { for k, v := range m { if excludeField(k) { continue } if s, ok := v.(string); ok { block.AddField(k, s) continue } if f, ok := v.(float64); ok { block.AddField(k, fmt.Sprintf("%f", f)) continue } } } } message.Blocks = append(message.Blocks, block) return message } // excludeField returns true if the field should be excluded from the Slack message func excludeField(k string) bool { k = strings.ToLower(k) for _, excluded := range []string{"ts", "level", "stacktrace", "payload"} { if k == excluded { return true } } return false } // consoleURLEscape escapes a URL parameter for building a Cloudwatch URL. It query escapes // the parameter twice then replaces "%" with "$". Don't ask me why. func consoleURLEscape(s string) string { return strings.Replace(url.QueryEscape(url.QueryEscape(s)), "%", "$", -1) } // cloudwatchConsoleURL constructs a URL to a log stream in the Cloudwatch console. func cloudwatchConsoleURL(region string, logGroup string, logStream string) string { return fmt.Sprintf("https://console.aws.amazon.com/cloudwatch/home?region=%s#logsV2:log-groups/log-group/%s/log-events/%s", region, consoleURLEscape(logGroup), consoleURLEscape(logStream)) }