The backend for the project formerly known as signet, now known as beignet.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

113 rivejä
2.8 KiB

  1. package endpoints
  import (
  	"net/http"
  	"strconv"
  	"strings"
  	"sync"

  	"github.com/gorilla/websocket"
  	. "github.com/imosed/signet/data"
  	"github.com/rs/zerolog/log"
  	"github.com/spf13/viper"
  	"github.com/stellar/go/clients/horizonclient"
  	"github.com/stellar/go/protocols/horizon"
  	"github.com/stellar/go/protocols/horizon/operations"
  	"golang.org/x/net/context"
  )
  15. var upgrader = websocket.Upgrader{
  16. ReadBufferSize: 1024,
  17. WriteBufferSize: 1024,
  18. CheckOrigin: func(r *http.Request) bool {
  19. origin := r.Header.Get("Origin")
  20. for _, domain := range viper.GetStringSlice("app.domains") {
  21. if !strings.HasPrefix(origin, domain) {
  22. continue
  23. }
  24. return true
  25. }
  26. return false
  27. },
  28. }
  29. var wsConn *websocket.Conn
  30. func ContributorStream(resp http.ResponseWriter, req *http.Request) {
  31. var err error
  32. wsConn, err = upgrader.Upgrade(resp, req, nil)
  33. if err != nil {
  34. log.Error().Err(err).Msg("Could not establish websocket connection")
  35. return
  36. }
  37. }
  38. var cancellations []context.CancelFunc
  39. func InitializeContributionStreams() {
  40. var err error
  41. var wallets []struct {
  42. FundWallet string
  43. }
  44. Db.Table("reward_funds").Select("fund_wallet").Scan(&wallets)
  45. contributionUpdateHandler := func(op operations.Operation) {
  46. payment := op.(operations.Payment)
  47. var tx horizon.Transaction
  48. var amt float64
  49. var fund RewardFund
  50. tx, err = client.TransactionDetail(payment.GetTransactionHash())
  51. if err != nil {
  52. log.Error().Err(err).Msg("Could not get transaction from hash")
  53. return
  54. }
  55. amt, err = strconv.ParseFloat(payment.Amount, 64)
  56. if err != nil {
  57. log.Error().Err(err).Msg("Could not convert payment to float")
  58. return
  59. }
  60. if tx.Memo == "" {
  61. return
  62. }
  63. Db.Table("reward_funds").Where("memo = ? and fund_wallet = ?", tx.Memo, payment.To).First(&fund)
  64. newAmt := fund.AmountAvailable - amt
  65. Db.Model(&RewardFund{}).Where("id = ?", fund.ID).Update("amount_available", newAmt)
  66. contribution := Contribution{
  67. ModelBase: ModelBase{CreatedAt: tx.LedgerCloseTime},
  68. Wallet: payment.From,
  69. Amount: amt,
  70. TransactionID: payment.GetTransactionHash(),
  71. RewardFundID: fund.ID,
  72. }
  73. if wsConn != nil {
  74. err = wsConn.WriteJSON(contribution)
  75. if err != nil {
  76. log.Error().Err(err).Msg("Unable to write json to contribution stream")
  77. }
  78. } else {
  79. log.Info().Msg("No websocket connections")
  80. }
  81. Db.Table("contributions").Create(&contribution)
  82. }
  83. for _, wallet := range wallets {
  84. opReq := horizonclient.OperationRequest{
  85. ForAccount: wallet.FundWallet,
  86. Cursor: "now",
  87. IncludeFailed: false,
  88. }
  89. opReq.SetOperationsEndpoint()
  90. ctx, cancellation := context.WithCancel(context.Background())
  91. cancellations = append(cancellations, cancellation)
  92. err = client.StreamOperations(ctx, opReq, contributionUpdateHandler)
  93. if err != nil {
  94. log.Error().Err(err).Msg("Failed to stream contributions from Horizon client")
  95. }
  96. }
  97. }