| 
                        123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124 | 
                        - package endpoints
 - 
 - import (
 - 	"net/http"
 - 	"strconv"
 - 	"strings"
 - 
 - 	"github.com/gorilla/websocket"
 - 	"github.com/imosed/signet/client"
 - 	. "github.com/imosed/signet/data"
 - 	"github.com/imosed/signet/utils"
 - 	"github.com/rs/zerolog/log"
 - 	"github.com/spf13/viper"
 - 	"github.com/stellar/go/clients/horizonclient"
 - 	"github.com/stellar/go/protocols/horizon"
 - 	"github.com/stellar/go/protocols/horizon/operations"
 - 	"golang.org/x/net/context"
 - )
 - 
 - var upgrader = websocket.Upgrader{
 - 	ReadBufferSize:  1024,
 - 	WriteBufferSize: 1024,
 - 	CheckOrigin: func(r *http.Request) bool {
 - 		origin := r.Header.Get("Origin")
 - 		for _, domain := range viper.GetStringSlice("app.domains") {
 - 			if !strings.HasPrefix(origin, domain) {
 - 				continue
 - 			}
 - 			return true
 - 		}
 - 		return false
 - 	},
 - }
 - 
 - var wsConn *websocket.Conn
 - 
 - func ContributorStream(resp http.ResponseWriter, req *http.Request) {
 - 	var err error
 - 	wsConn, err = upgrader.Upgrade(resp, req, nil)
 - 	if err != nil {
 - 		log.Error().Err(err).Msg("Could not establish websocket connection")
 - 		return
 - 	}
 - }
 - 
 - var cancellations []context.CancelFunc
 - 
 - func InitializeContributionStreams() {
 - 	var err error
 - 
 - 	var wallets []struct {
 - 		FundWallet string
 - 	}
 - 	Db.Table("reward_funds").Select("fund_wallet").Scan(&wallets)
 - 
 - 	contributionUpdateHandler := func(op operations.Operation) {
 - 		if op.GetBase().GetTypeI() == 6 || op.GetBase().GetTypeI() == 12 {
 - 			return
 - 		}
 - 		payment := op.(operations.Payment)
 - 
 - 		var tx horizon.Transaction
 - 		var amt float64
 - 		var fund RewardFund
 - 
 - 		tx, err = client.SignetClient.TransactionDetail(payment.GetTransactionHash())
 - 		if err != nil {
 - 			log.Error().Err(err).Msg("Could not get transaction from hash")
 - 			return
 - 		}
 - 		amt, err = strconv.ParseFloat(payment.Amount, 64)
 - 		if err != nil {
 - 			log.Error().Err(err).Msg("Could not convert payment to float")
 - 			return
 - 		}
 - 
 - 		Db.Table("reward_funds").Where("memo = ? and fund_wallet = ?", tx.Memo, payment.To).First(&fund)
 - 		newAmt := fund.AmountAvailable - amt
 - 		Db.Model(&RewardFund{}).Where("id = ?", fund.ID).Update("amount_available", newAmt)
 - 		if tx.Memo == "" {
 - 			return
 - 		}
 - 
 - 		if newAmt < 5000 && newAmt > 0 {
 - 			_, err = utils.SubmitGroupFund(fund.ID)
 - 			if err != nil {
 - 				log.Error().Err(err).Msg("Could not submit group fund from contribution stream")
 - 			}
 - 		}
 - 
 - 		contribution := Contribution{
 - 			ModelBase:     ModelBase{CreatedAt: tx.LedgerCloseTime},
 - 			Wallet:        payment.From,
 - 			Amount:        amt,
 - 			TransactionID: payment.GetTransactionHash(),
 - 			RewardFundID:  fund.ID,
 - 		}
 - 		if wsConn != nil {
 - 			err = wsConn.WriteJSON(contribution)
 - 			if err != nil {
 - 				log.Error().Err(err).Msg("Unable to write json to contribution stream")
 - 			}
 - 		} else {
 - 			log.Info().Msg("No websocket connections")
 - 		}
 - 
 - 		Db.Table("contributions").Create(&contribution)
 - 	}
 - 
 - 	for _, wallet := range wallets {
 - 		opReq := horizonclient.OperationRequest{
 - 			ForAccount:    wallet.FundWallet,
 - 			Cursor:        "now",
 - 			IncludeFailed: false,
 - 		}
 - 		opReq.SetOperationsEndpoint()
 - 		ctx, cancellation := context.WithCancel(context.Background())
 - 		cancellations = append(cancellations, cancellation)
 - 		err = client.SignetClient.StreamOperations(ctx, opReq, contributionUpdateHandler)
 - 		if err != nil {
 - 			log.Error().Err(err).Msg("Failed to stream contributions from Horizon SignetClient")
 - 		}
 - 	}
 - }
 
 
  |