package fiinha import ( "errors" "runtime" "uuid" g "gobang" ) const ( topicX = "new-event-x" topicY = "new-event-y" ) var forbidden3Err = errors.New("we don't like 3") func processNewEventXToY(message Message) (UnsentMessage, error) { payload := string(message.Payload) if payload == "event 3" { return UnsentMessage{}, forbidden3Err } newPayload := []byte("processed " + payload) unsent := UnsentMessage{ Topic: topicY, FlowID: message.FlowID, Payload: newPayload, } return unsent, nil } func MainTest() { g.SetLevel(g.LogLevel_None) _, file, _, ok := runtime.Caller(0) g.TAssertEqualS(ok, true, "can't get filename") databasePath := file + ".db" queue, err := New(databasePath) g.TErrorIf(err) defer queue.Close() pub := func(payload []byte, flowID uuid.UUID) { unsent := UnsentMessage{ Topic: topicX, FlowID: flowID, Payload: payload, } _, err := queue.Publish(unsent) g.TErrorIf(err) } g.Testing("we can WaitFor() a message after a deadletter", func() { flowID := uuid.New() handlerFn := func(message Message) error { messageY, err := processNewEventXToY(message) if err != nil { return err } _, err = queue.Publish(messageY) return err } queue.Subscribe(topicX, "main-worker", handlerFn) defer queue.Unsubscribe(topicX, "main-worker") pub([]byte("event 1"), uuid.New()) pub([]byte("event 2"), uuid.New()) pub([]byte("event 3"), uuid.New()) pub([]byte("event 4"), uuid.New()) pub([]byte("event 5"), flowID) given := <- queue.WaitFor(topicY, flowID, "waiter").Channel g.TAssertEqual(given, []byte("processed event 5")) }) }