The backend for the project formerly known as signet, now known as beignet.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

contributionstream.go 3.2 KiB

преди 2 години
преди 1 година
преди 2 години
преди 2 години
преди 1 година
преди 1 година
преди 2 години
преди 1 година
преди 2 години
преди 2 години
преди 2 години
преди 2 години
преди 2 години
преди 1 година
преди 2 години
преди 1 година
преди 2 години
преди 2 години
преди 1 година
преди 2 години
преди 1 година
преди 2 години
преди 1 година
преди 2 години
преди 1 година
преди 2 години
преди 2 години
преди 2 години
преди 2 години
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. package endpoints
  2. import (
  3. "net/http"
  4. "strconv"
  5. "strings"
  6. "github.com/gorilla/websocket"
  7. "github.com/imosed/signet/client"
  8. . "github.com/imosed/signet/data"
  9. "github.com/imosed/signet/utils"
  10. "github.com/rs/zerolog/log"
  11. "github.com/spf13/viper"
  12. "github.com/stellar/go/clients/horizonclient"
  13. "github.com/stellar/go/protocols/horizon"
  14. "github.com/stellar/go/protocols/horizon/operations"
  15. "golang.org/x/net/context"
  16. )
  17. var upgrader = websocket.Upgrader{
  18. ReadBufferSize: 1024,
  19. WriteBufferSize: 1024,
  20. CheckOrigin: func(r *http.Request) bool {
  21. origin := r.Header.Get("Origin")
  22. for _, domain := range viper.GetStringSlice("app.domains") {
  23. if !strings.HasPrefix(origin, domain) {
  24. continue
  25. }
  26. return true
  27. }
  28. return false
  29. },
  30. }
  31. var wsConn *websocket.Conn
  32. func ContributorStream(resp http.ResponseWriter, req *http.Request) {
  33. var err error
  34. wsConn, err = upgrader.Upgrade(resp, req, nil)
  35. if err != nil {
  36. log.Error().Err(err).Msg("Could not establish websocket connection")
  37. return
  38. }
  39. }
  40. var cancellations []context.CancelFunc
  41. func InitializeContributionStreams() {
  42. var err error
  43. var wallets []struct {
  44. FundWallet string
  45. }
  46. Db.Table("reward_funds").Select("fund_wallet").Scan(&wallets)
  47. contributionUpdateHandler := func(op operations.Operation) {
  48. if op.GetBase().GetTypeI() == 6 || op.GetBase().GetTypeI() == 12 {
  49. return
  50. }
  51. payment := op.(operations.Payment)
  52. var tx horizon.Transaction
  53. var amt float64
  54. var fund RewardFund
  55. tx, err = client.SignetClient.TransactionDetail(payment.GetTransactionHash())
  56. if err != nil {
  57. log.Error().Err(err).Msg("Could not get transaction from hash")
  58. return
  59. }
  60. amt, err = strconv.ParseFloat(payment.Amount, 64)
  61. if err != nil {
  62. log.Error().Err(err).Msg("Could not convert payment to float")
  63. return
  64. }
  65. Db.Table("reward_funds").Where("memo = ? and fund_wallet = ?", tx.Memo, payment.To).First(&fund)
  66. newAmt := fund.AmountAvailable - amt
  67. Db.Model(&RewardFund{}).Where("id = ?", fund.ID).Update("amount_available", newAmt)
  68. if tx.Memo == "" {
  69. return
  70. }
  71. if newAmt < 5000 && newAmt > 0 {
  72. _, err = utils.SubmitGroupFund(fund.ID)
  73. if err != nil {
  74. log.Error().Err(err).Msg("Could not submit group fund from contribution stream")
  75. }
  76. }
  77. contribution := Contribution{
  78. ModelBase: ModelBase{CreatedAt: tx.LedgerCloseTime},
  79. Wallet: payment.From,
  80. Amount: amt,
  81. TransactionID: payment.GetTransactionHash(),
  82. RewardFundID: fund.ID,
  83. }
  84. if wsConn != nil {
  85. err = wsConn.WriteJSON(contribution)
  86. if err != nil {
  87. log.Error().Err(err).Msg("Unable to write json to contribution stream")
  88. }
  89. } else {
  90. log.Info().Msg("No websocket connections")
  91. }
  92. Db.Table("contributions").Create(&contribution)
  93. }
  94. for _, wallet := range wallets {
  95. opReq := horizonclient.OperationRequest{
  96. ForAccount: wallet.FundWallet,
  97. Cursor: "now",
  98. IncludeFailed: false,
  99. }
  100. opReq.SetOperationsEndpoint()
  101. ctx, cancellation := context.WithCancel(context.Background())
  102. cancellations = append(cancellations, cancellation)
  103. err = client.SignetClient.StreamOperations(ctx, opReq, contributionUpdateHandler)
  104. if err != nil {
  105. log.Error().Err(err).Msg("Failed to stream contributions from Horizon SignetClient")
  106. }
  107. }
  108. }