The backend for the project formerly known as signet, now known as beignet.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

contributionstream.go 2.7 KiB

преди 2 години
преди 2 години
преди 2 години
преди 2 години
преди 2 години
преди 2 години
преди 2 години
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. package endpoints
  2. import (
  3. "fmt"
  4. "github.com/gorilla/websocket"
  5. . "github.com/imosed/signet/data"
  6. "github.com/spf13/viper"
  7. "github.com/stellar/go/clients/horizonclient"
  8. "github.com/stellar/go/protocols/horizon"
  9. "github.com/stellar/go/protocols/horizon/operations"
  10. "golang.org/x/net/context"
  11. "gorm.io/gorm"
  12. "net/http"
  13. "strconv"
  14. "strings"
  15. )
  16. var upgrader = websocket.Upgrader{
  17. ReadBufferSize: 1024,
  18. WriteBufferSize: 1024,
  19. CheckOrigin: func(r *http.Request) bool {
  20. origin := r.Header.Get("Origin")
  21. for _, domain := range viper.GetStringSlice("app.domains") {
  22. if !strings.HasPrefix(origin, domain) {
  23. continue
  24. }
  25. return true
  26. }
  27. return false
  28. },
  29. }
  30. var wsConn *websocket.Conn
  31. func ContributorStream(resp http.ResponseWriter, req *http.Request) {
  32. var err error
  33. wsConn, err = upgrader.Upgrade(resp, req, nil)
  34. if err != nil {
  35. panic("Could not establish websocket connection")
  36. }
  37. }
  38. var cancellations []context.CancelFunc
  39. func InitializeContributionStreams() {
  40. var err error
  41. var wallets []struct {
  42. FundWallet string
  43. }
  44. Db.Table("reward_funds").Select("fund_wallet").Scan(&wallets)
  45. contributionUpdateHandler := func(op operations.Operation) {
  46. payment := op.(operations.Payment)
  47. var tx horizon.Transaction
  48. var amt float64
  49. var fund RewardFund
  50. tx, err = client.TransactionDetail(payment.GetTransactionHash())
  51. if err != nil {
  52. panic("Could not get transaction from hash")
  53. }
  54. amt, err = strconv.ParseFloat(payment.Amount, 64)
  55. if err != nil {
  56. panic("Could not convert payment to float")
  57. }
  58. if tx.Memo == "" {
  59. return
  60. }
  61. Db.Table("reward_funds").Where("memo = ? and fund_wallet = ?", tx.Memo, payment.To).First(&fund)
  62. newAmt := fund.AmountAvailable - amt
  63. Db.Model(&RewardFund{}).Where("id = ?", fund.ID).Update("amount_available", newAmt)
  64. contribution := Contribution{
  65. Model: gorm.Model{
  66. CreatedAt: tx.LedgerCloseTime,
  67. },
  68. Wallet: payment.From,
  69. Amount: amt,
  70. TransactionID: payment.GetTransactionHash(),
  71. RewardFundID: fund.ID,
  72. }
  73. if wsConn != nil {
  74. err = wsConn.WriteJSON(contribution)
  75. if err != nil {
  76. panic("Unable to write json to contribution stream")
  77. }
  78. } else {
  79. fmt.Println("No websocket connections")
  80. }
  81. Db.Table("contributions").Create(&contribution)
  82. }
  83. for _, wallet := range wallets {
  84. opReq := horizonclient.OperationRequest{
  85. ForAccount: wallet.FundWallet,
  86. Cursor: "now",
  87. IncludeFailed: false,
  88. }
  89. opReq.SetOperationsEndpoint()
  90. ctx, cancellation := context.WithCancel(context.Background())
  91. cancellations = append(cancellations, cancellation)
  92. err = client.StreamOperations(ctx, opReq, contributionUpdateHandler)
  93. if err != nil {
  94. panic(err.Error())
  95. }
  96. }
  97. }