|  | package endpoints
import (
	"net/http"
	"strconv"
	"strings"
	"sync"

	"github.com/gorilla/websocket"
	"github.com/imosed/signet/client"
	. "github.com/imosed/signet/data"
	"github.com/rs/zerolog/log"
	"github.com/spf13/viper"
	"github.com/stellar/go/clients/horizonclient"
	"github.com/stellar/go/protocols/horizon"
	"github.com/stellar/go/protocols/horizon/operations"
	"golang.org/x/net/context"
)
// upgrader turns incoming HTTP requests into websocket connections.
//
// CheckOrigin accepts a request only when its Origin header corresponds to one
// of the entries configured under "app.domains". An entry matches when the
// origin is exactly that entry, or continues past it with a port (":") or a
// path ("/") separator. A plain prefix comparison is not enough: it would let
// "https://example.com.evil.net" through for the entry "https://example.com".
var upgrader = websocket.Upgrader{
	ReadBufferSize:  1024,
	WriteBufferSize: 1024,
	CheckOrigin: func(r *http.Request) bool {
		origin := r.Header.Get("Origin")
		for _, domain := range viper.GetStringSlice("app.domains") {
			// An empty entry is a prefix of every string; never let it match.
			if domain == "" || !strings.HasPrefix(origin, domain) {
				continue
			}
			// Require the match to end on a host boundary.
			rest := origin[len(domain):]
			if rest == "" || rest[0] == ':' || rest[0] == '/' {
				return true
			}
		}
		return false
	},
}
// wsConn is the websocket connection assigned by ContributorStream. The
// contribution handler built in InitializeContributionStreams writes each new
// contribution to it as JSON when it is non-nil.
var wsConn *websocket.Conn
// ContributorStream upgrades the request to a websocket connection and, on
// success, makes it the connection that contribution updates are written to.
//
// A failed upgrade is logged and leaves the currently stored connection
// untouched, so one rejected request cannot silence an active subscriber.
//
// NOTE(review): only a single connection is tracked, and a connection that is
// replaced here is not closed.
func ContributorStream(resp http.ResponseWriter, req *http.Request) {
	conn, err := upgrader.Upgrade(resp, req, nil)
	if err != nil {
		log.Error().Err(err).Msg("Could not establish websocket connection")
		return
	}
	wsConn = conn
}
// cancellations collects the cancel function of every stream context created
// by InitializeContributionStreams, one per reward fund wallet.
var cancellations []context.CancelFunc
// InitializeContributionStreams opens one Horizon operation stream per reward
// fund wallet and records every incoming payment that carries a memo as a
// contribution.
//
// For each such payment the handler looks up the reward fund by memo and
// destination wallet, subtracts the payment amount from the fund's available
// amount, writes the contribution to the current websocket connection (if
// any) and stores it in the "contributions" table.
//
// Every wallet is streamed in its own goroutine, because StreamOperations
// blocks for as long as its stream is open. The function itself still blocks
// until all streams have ended. It returns immediately when there are no
// wallets or when the wallet list cannot be loaded.
func InitializeContributionStreams() {
	var wallets []struct {
		FundWallet string
	}
	if err := Db.Table("reward_funds").Select("fund_wallet").Scan(&wallets).Error; err != nil {
		log.Error().Err(err).Msg("Could not load reward fund wallets")
		return
	}

	// handlerMu serializes handler invocations across the concurrent streams,
	// so the read-modify-write on the fund and the websocket write never
	// overlap with another invocation.
	var handlerMu sync.Mutex

	contributionUpdateHandler := func(op operations.Operation) {
		if op.GetBase().GetTypeI() == 6 || op.GetBase().GetTypeI() == 12 {
			return
		}
		// Operations that are not payment records are skipped instead of
		// panicking on a failed type assertion.
		payment, ok := op.(operations.Payment)
		if !ok {
			return
		}

		handlerMu.Lock()
		defer handlerMu.Unlock()

		tx, err := client.SignetClient.TransactionDetail(payment.GetTransactionHash())
		if err != nil {
			log.Error().Err(err).Msg("Could not get transaction from hash")
			return
		}
		amt, err := strconv.ParseFloat(payment.Amount, 64)
		if err != nil {
			log.Error().Err(err).Msg("Could not convert payment to float")
			return
		}
		if tx.Memo == "" {
			return
		}

		var fund RewardFund
		err = Db.Table("reward_funds").Where("memo = ? and fund_wallet = ?", tx.Memo, payment.To).First(&fund).Error
		if err != nil {
			// Without a fund there is nothing to update, and a contribution
			// would be stored with a zero fund id.
			log.Warn().Err(err).Msg("Could not find reward fund for contribution")
			return
		}

		newAmt := fund.AmountAvailable - amt
		err = Db.Model(&RewardFund{}).Where("id = ?", fund.ID).Update("amount_available", newAmt).Error
		if err != nil {
			log.Error().Err(err).Msg("Could not update reward fund amount")
		}

		contribution := Contribution{
			ModelBase:     ModelBase{CreatedAt: tx.LedgerCloseTime},
			Wallet:        payment.From,
			Amount:        amt,
			TransactionID: payment.GetTransactionHash(),
			RewardFundID:  fund.ID,
		}
		if wsConn != nil {
			if err = wsConn.WriteJSON(contribution); err != nil {
				log.Error().Err(err).Msg("Unable to write json to contribution stream")
			}
		} else {
			log.Info().Msg("No websocket connections")
		}
		if err = Db.Table("contributions").Create(&contribution).Error; err != nil {
			log.Error().Err(err).Msg("Could not store contribution")
		}
	}

	var streams sync.WaitGroup
	for _, wallet := range wallets {
		opReq := horizonclient.OperationRequest{
			ForAccount:    wallet.FundWallet,
			Cursor:        "now",
			IncludeFailed: false,
		}
		opReq.SetOperationsEndpoint()
		ctx, cancellation := context.WithCancel(context.Background())
		cancellations = append(cancellations, cancellation)

		streams.Add(1)
		go func(ctx context.Context, req horizonclient.OperationRequest) {
			defer streams.Done()
			if err := client.SignetClient.StreamOperations(ctx, req, contributionUpdateHandler); err != nil {
				log.Error().Err(err).Msg("Failed to stream contributions from Horizon SignetClient")
			}
		}(ctx, opReq)
	}
	// Keep the original blocking contract: return only once every stream has ended.
	streams.Wait()
}
 |