diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/fiinha.go | 130 |
1 files changed, 65 insertions, 65 deletions
diff --git a/src/fiinha.go b/src/fiinha.go index 1052be8..a819557 100644 --- a/src/fiinha.go +++ b/src/fiinha.go @@ -11,7 +11,7 @@ import ( "time" "golite" - "guuid" + "uuid" g "gobang" ) @@ -41,13 +41,13 @@ type queryT struct{ type queriesT struct{ take func(string, string) error - publish func(UnsentMessage, guuid.UUID) (messageT, error) - find func(string, guuid.UUID) (messageT, error) + publish func(UnsentMessage, uuid.UUID) (messageT, error) + find func(string, uuid.UUID) (messageT, error) next func(string, string) (messageT, error) pending func(string, string, func(messageT) error) error - commit func(string, guuid.UUID) error - toDead func(string, guuid.UUID, guuid.UUID) error - replay func(guuid.UUID, guuid.UUID) (messageT, error) + commit func(string, uuid.UUID) error + toDead func(string, uuid.UUID, uuid.UUID) error + replay func(uuid.UUID, uuid.UUID) (messageT, error) oneDead func(string, string) (deadletterT, error) allDead func(string, string, func(deadletterT, messageT) error) error size func(string) (int, error) @@ -59,31 +59,31 @@ type queriesT struct{ type messageT struct{ id int64 timestamp time.Time - uuid guuid.UUID + uuid uuid.UUID topic string - flowID guuid.UUID + flowID uuid.UUID payload []byte } type UnsentMessage struct{ Topic string - FlowID guuid.UUID + FlowID uuid.UUID Payload []byte } type Message struct{ - ID guuid.UUID + ID uuid.UUID Timestamp time.Time Topic string - FlowID guuid.UUID + FlowID uuid.UUID Payload []byte } type deadletterT struct{ - uuid guuid.UUID + uuid uuid.UUID timestamp time.Time consumer string - messageID guuid.UUID + messageID uuid.UUID } type pingerT[T any] struct{ @@ -100,7 +100,7 @@ type consumerDataT struct{ type waiterDataT struct{ topic string - flowID guuid.UUID + flowID uuid.UUID name string } @@ -121,7 +121,7 @@ type waiterT struct{ type topicSubscriptionT struct{ consumers map[string]consumerT - waiters map[guuid.UUID]map[string]waiterT + waiters map[uuid.UUID]map[string]waiterT } type subscriptionsSetM map[string]topicSubscriptionT @@ -157,7 +157,7 @@ type IQueue interface{ Publish(UnsentMessage) (Message, error) Subscribe( string, string, func(Message) error) error Unsubscribe(string, string) - WaitFor(string, guuid.UUID, string) Waiter + WaitFor(string, uuid.UUID, string) Waiter Close() error } @@ -452,7 +452,7 @@ func publishSQL(prefix string) queryT { func publishStmt( cfg dbconfigT, -) (func(UnsentMessage, guuid.UUID) (messageT, error), func() error, error) { +) (func(UnsentMessage, uuid.UUID) (messageT, error), func() error, error) { q := publishSQL(cfg.prefix) readStmt, err := cfg.shared.Prepare(q.read) @@ -470,7 +470,7 @@ func publishStmt( fn := func( unsentMessage UnsentMessage, - messageID guuid.UUID, + messageID uuid.UUID, ) (messageT, error) { message := messageT{ uuid: messageID, @@ -552,7 +552,7 @@ func findSQL(prefix string) queryT { func findStmt( cfg dbconfigT, -) (func(string, guuid.UUID) (messageT, error), func() error, error) { +) (func(string, uuid.UUID) (messageT, error), func() error, error) { q := findSQL(cfg.prefix) readStmt, err := cfg.shared.Prepare(q.read) @@ -560,7 +560,7 @@ func findStmt( return nil, nil, err } - fn := func(topic string, flowID guuid.UUID) (messageT, error) { + fn := func(topic string, flowID uuid.UUID) (messageT, error) { message := messageT{ topic: topic, flowID: flowID, @@ -580,7 +580,7 @@ func findStmt( if err != nil { return messageT{}, err } - message.uuid = guuid.UUID(message_id_bytes) + message.uuid = uuid.UUID(message_id_bytes) message.timestamp, err = time.Parse(time.RFC3339Nano, timestr) if err != nil { @@ -685,8 +685,8 @@ func nextStmt( ) return messageT{}, err } - message.uuid = guuid.UUID(message_id_bytes) - message.flowID = guuid.UUID(flow_id_bytes) + message.uuid = uuid.UUID(message_id_bytes) + message.flowID = uuid.UUID(flow_id_bytes) message.timestamp, err = time.Parse(time.RFC3339Nano, timestr) if err != nil { @@ -723,8 +723,8 @@ func messageEach(rows *sql.Rows, callback func(messageT) error) error { rows.Close() return err } - message.uuid = guuid.UUID(message_id_bytes) - message.flowID = guuid.UUID(flow_id_bytes) + message.uuid = uuid.UUID(message_id_bytes) + message.flowID = uuid.UUID(flow_id_bytes) message.timestamp, err = time.Parse(time.RFC3339Nano, timestr) if err != nil { @@ -841,7 +841,7 @@ func commitSQL(prefix string) queryT { func commitStmt( cfg dbconfigT, -) (func(string, guuid.UUID) error, func() error, error) { +) (func(string, uuid.UUID) error, func() error, error) { q := commitSQL(cfg.prefix) writeStmt, err := cfg.shared.Prepare(q.write) @@ -849,7 +849,7 @@ func commitStmt( return nil, nil, err } - fn := func(consumer string, messageID guuid.UUID) error { + fn := func(consumer string, messageID uuid.UUID) error { message_id_bytes := messageID[:] _, err = writeStmt.Exec( consumer, @@ -880,7 +880,7 @@ func toDeadSQL(prefix string) queryT { func toDeadStmt( cfg dbconfigT, ) ( - func(string, guuid.UUID, guuid.UUID) error, + func(string, uuid.UUID, uuid.UUID) error, func() error, error, ) { @@ -895,8 +895,8 @@ func toDeadStmt( fn := func( consumer string, - messageID guuid.UUID, - deadletterID guuid.UUID, + messageID uuid.UUID, + deadletterID uuid.UUID, ) error { message_id_bytes := messageID[:] deadletter_id_bytes := deadletterID[:] @@ -982,7 +982,7 @@ func replaySQL(prefix string) queryT { func replayStmt( cfg dbconfigT, -) (func(guuid.UUID, guuid.UUID) (messageT, error), func() error, error) { +) (func(uuid.UUID, uuid.UUID) (messageT, error), func() error, error) { q := replaySQL(cfg.prefix) readStmt, err := cfg.shared.Prepare(q.read) @@ -999,8 +999,8 @@ func replayStmt( writeFn, writeFnClose := execSerialized(q.write, privateDB) fn := func( - deadletterID guuid.UUID, - messageID guuid.UUID, + deadletterID uuid.UUID, + messageID uuid.UUID, ) (messageT, error) { deadletter_id_bytes := deadletterID[:] message_id_bytes := messageID[:] @@ -1031,7 +1031,7 @@ func replayStmt( if err != nil { return messageT{}, err } - message.flowID = guuid.UUID(flow_id_bytes) + message.flowID = uuid.UUID(flow_id_bytes) message.timestamp, err = time.Parse(time.RFC3339Nano, timestr) if err != nil { @@ -1126,8 +1126,8 @@ func oneDeadStmt( if err != nil { return deadletterT{}, err } - deadletter.uuid = guuid.UUID(deadletter_id_bytes) - deadletter.messageID = guuid.UUID(message_id_bytes) + deadletter.uuid = uuid.UUID(deadletter_id_bytes) + deadletter.messageID = uuid.UUID(message_id_bytes) deadletter.timestamp, err = time.Parse( time.RFC3339Nano, @@ -1173,10 +1173,10 @@ func deadletterEach( return err } - deadletter.uuid = guuid.UUID(deadletter_id_bytes) - deadletter.messageID = guuid.UUID(message_id_bytes) - message.uuid = guuid.UUID(message_id_bytes) - message.flowID = guuid.UUID(flow_id_bytes) + deadletter.uuid = uuid.UUID(deadletter_id_bytes) + deadletter.messageID = uuid.UUID(message_id_bytes) + message.uuid = uuid.UUID(message_id_bytes) + message.flowID = uuid.UUID(flow_id_bytes) message.timestamp, err = time.Parse( time.RFC3339Nano, @@ -1519,7 +1519,7 @@ func initDB( defer connMutex.RUnlock() return take(a, b) }, - publish: func(a UnsentMessage, b guuid.UUID) (messageT, error) { + publish: func(a UnsentMessage, b uuid.UUID) (messageT, error) { var ( err error message messageT @@ -1536,7 +1536,7 @@ func initDB( go notifyFn(message) return message, nil }, - find: func(a string, b guuid.UUID) (messageT, error) { + find: func(a string, b uuid.UUID) (messageT, error) { connMutex.RLock() defer connMutex.RUnlock() return find(a, b) @@ -1566,21 +1566,21 @@ func initDB( return messageEach(rows, callback) }, - commit: func(a string, b guuid.UUID) error { + commit: func(a string, b uuid.UUID) error { connMutex.RLock() defer connMutex.RUnlock() return commit(a, b) }, toDead: func( a string, - b guuid.UUID, - c guuid.UUID, + b uuid.UUID, + c uuid.UUID, ) error { connMutex.RLock() defer connMutex.RUnlock() return toDead(a, b, c) }, - replay: func(a guuid.UUID, b guuid.UUID) (messageT, error) { + replay: func(a uuid.UUID, b uuid.UUID) (messageT, error) { var ( err error message messageT @@ -1733,10 +1733,10 @@ func makeNotifyFn( func collectClosedWaiters( set subscriptionsSetM, -) map[string]map[guuid.UUID][]string { - waiters := map[string]map[guuid.UUID][]string{} +) map[string]map[uuid.UUID][]string { + waiters := map[string]map[uuid.UUID][]string{} for topic, topicSub := range set { - waiters[topic] = map[guuid.UUID][]string{} + waiters[topic] = map[uuid.UUID][]string{} for flowID, waitersByName := range topicSub.waiters { names := []string{} for name, waiter := range waitersByName { @@ -1751,7 +1751,7 @@ func collectClosedWaiters( return waiters } -func trimEmptyLeaves(closedWaiters map[string]map[guuid.UUID][]string) { +func trimEmptyLeaves(closedWaiters map[string]map[uuid.UUID][]string) { for topic, waiters := range closedWaiters { for flowID, names := range waiters { if len(names) == 0 { @@ -1785,7 +1785,7 @@ func deleteEmptyTopics(set subscriptionsSetM) { func removeClosedWaiters( set subscriptionsSetM, - closedWaiters map[string]map[guuid.UUID][]string, + closedWaiters map[string]map[uuid.UUID][]string, ) { for topic, waiters := range closedWaiters { _, ok := set[topic] @@ -1813,7 +1813,7 @@ func reapClosedWaiters( readFn func(func(subscriptionsSetM) error) error, writeFn func(func(subscriptionsSetM) error) error, ) { - var closedWaiters map[string]map[guuid.UUID][]string + var closedWaiters map[string]map[uuid.UUID][]string readFn(func(set subscriptionsSetM) error { closedWaiters = collectClosedWaiters(set) return nil @@ -1884,7 +1884,7 @@ func asPublicMessage(message messageT) Message { } func (queue queueT) Publish(unsent UnsentMessage) (Message, error) { - message, err := queue.queries.publish(unsent, guuid.New()) + message, err := queue.queries.publish(unsent, uuid.New()) if err != nil { return Message{}, err } @@ -1895,7 +1895,7 @@ func (queue queueT) Publish(unsent UnsentMessage) (Message, error) { func registerConsumerFn(consumer consumerT) func(subscriptionsSetM) error { topicSub := topicSubscriptionT{ consumers: map[string]consumerT{}, - waiters: map[guuid.UUID]map[string]waiterT{}, + waiters: map[uuid.UUID]map[string]waiterT{}, } return func(set subscriptionsSetM) error { @@ -1913,7 +1913,7 @@ func registerConsumerFn(consumer consumerT) func(subscriptionsSetM) error { func registerWaiterFn(waiter waiterT) func(subscriptionsSetM) error { topicSub := topicSubscriptionT{ consumers: map[string]consumerT{}, - waiters: map[guuid.UUID]map[string]waiterT{}, + waiters: map[uuid.UUID]map[string]waiterT{}, } waiters := map[string]waiterT{} @@ -1937,8 +1937,8 @@ func registerWaiterFn(waiter waiterT) func(subscriptionsSetM) error { func makeConsumeOneFn( data consumerDataT, callback func(Message) error, - successFn func(string, guuid.UUID) error, - errorFn func(string, guuid.UUID, guuid.UUID) error, + successFn func(string, uuid.UUID) error, + errorFn func(string, uuid.UUID, uuid.UUID) error, ) func(messageT) error { return func(message messageT) error { err := callback(asPublicMessage(message)) @@ -1955,7 +1955,7 @@ func makeConsumeOneFn( ), ) - return errorFn(data.name, message.uuid, guuid.New()) + return errorFn(data.name, message.uuid, uuid.New()) } return successFn(data.name, message.uuid) @@ -2004,9 +2004,9 @@ func runConsumer(onPing func(func(struct{})), consumeAllFn func(struct{})) { } func tryFinding( - findFn func(string, guuid.UUID) (messageT, error), + findFn func(string, uuid.UUID) (messageT, error), topic string, - flowID guuid.UUID, + flowID uuid.UUID, waitFn func([]byte), ) { message, err := findFn(topic, flowID) @@ -2062,7 +2062,7 @@ type Waiter struct{ func (queue queueT) WaitFor( topic string, - flowID guuid.UUID, + flowID uuid.UUID, name string, ) Waiter { data := waiterDataT{ @@ -2190,10 +2190,10 @@ func inExec( unsent := UnsentMessage{ Topic: args.topic, - FlowID: guuid.New(), + FlowID: uuid.New(), Payload: payload, } - message, err := queries.publish(unsent, guuid.New()) + message, err := queries.publish(unsent, uuid.New()) if err != nil { return 1, err } @@ -2269,7 +2269,7 @@ func deadExec( return 1, err } - err = queries.toDead(args.consumer, message.uuid, guuid.New()) + err = queries.toDead(args.consumer, message.uuid, uuid.New()) if err != nil { return 1, err } @@ -2313,7 +2313,7 @@ func replayExec( return 1, err } - _, err = queries.replay(deadletter.uuid, guuid.New()) + _, err = queries.replay(deadletter.uuid, uuid.New()) if err != nil { return 1, err } |
