main.go 2.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io/ioutil"
  6. "log"
  7. "time"
  8. "github.com/dtm-labs/client/dtmcli"
  9. "github.com/dtm-labs/client/dtmcli/logger"
  10. "github.com/dtm-labs/client/workflow"
  11. "github.com/gin-gonic/gin"
  12. "github.com/lithammer/shortuuid/v3"
  13. )
  14. const qsBusiAPI = "/api/busi_start"
  15. const qsBusiPort = 8082
  16. var qsBusi = fmt.Sprintf("http://localhost:%d%s", qsBusiPort, qsBusiAPI)
  17. func main() {
  18. QsStartSvr()
  19. _ = QsFireRequest()
  20. time.Sleep(3 * time.Second)
  21. }
  22. // QsStartSvr quick start: start server
  23. func QsStartSvr() {
  24. app := gin.New()
  25. qsAddRoute(app)
  26. log.Printf("quick start examples listening at %d", qsBusiPort)
  27. go func() {
  28. _ = app.Run(fmt.Sprintf(":%d", qsBusiPort))
  29. }()
  30. time.Sleep(100 * time.Millisecond)
  31. }
  32. func qsAddRoute(app *gin.Engine) {
  33. app.POST(qsBusiAPI+"/TransIn", func(c *gin.Context) {
  34. log.Printf("TransIn")
  35. c.JSON(200, "")
  36. // c.JSON(409, "") // Status 409 for Failure. Won't be retried
  37. })
  38. app.POST(qsBusiAPI+"/TransInCompensate", func(c *gin.Context) {
  39. log.Printf("TransInCompensate")
  40. c.JSON(200, "")
  41. })
  42. app.POST(qsBusiAPI+"/TransOut", func(c *gin.Context) {
  43. log.Printf("TransOut")
  44. c.JSON(200, "")
  45. })
  46. app.POST(qsBusiAPI+"/TransOutCompensate", func(c *gin.Context) {
  47. log.Printf("TransOutCompensate")
  48. c.JSON(200, "")
  49. })
  50. app.POST(qsBusiAPI+"/workflowResume", func(ctx *gin.Context) {
  51. log.Printf("workflowResume")
  52. data, err := ioutil.ReadAll(ctx.Request.Body)
  53. logger.FatalIfError(err)
  54. workflow.ExecuteByQS(ctx.Request.URL.Query(), data)
  55. })
  56. }
  57. const dtmServer = "http://localhost:36789/api/dtmsvr"
  58. // QsFireRequest quick start: fire request
  59. func QsFireRequest() string {
  60. workflow.InitHTTP(dtmServer, qsBusi+"/workflowResume")
  61. wfName := "workflow-http"
  62. err := workflow.Register(wfName, func(wf *workflow.Workflow, data []byte) error {
  63. var req gin.H
  64. err := json.Unmarshal(data, &req)
  65. logger.FatalIfError(err)
  66. _, err = wf.NewBranch().OnRollback(func(bb *dtmcli.BranchBarrier) error {
  67. _, err := wf.NewRequest().SetBody(req).Post(qsBusi + "/TransOutCompensate")
  68. return err
  69. }).NewRequest().SetBody(req).Post(qsBusi + "/TransOut")
  70. if err != nil {
  71. return err
  72. }
  73. _, err = wf.NewBranch().OnRollback(func(bb *dtmcli.BranchBarrier) error {
  74. _, err := wf.NewRequest().SetBody(req).Post(qsBusi + "/TransInCompensate")
  75. return err
  76. }).NewRequest().SetBody(req).Post(qsBusi + "/TransIn")
  77. return err
  78. })
  79. logger.FatalIfError(err)
  80. gid := shortuuid.New()
  81. req := &gin.H{"amount": 30} // the payload of requests
  82. data, err := json.Marshal(req)
  83. logger.FatalIfError(err)
  84. err = workflow.Execute(wfName, gid, data)
  85. logger.Infof("workflow.Execute result is: %v", err)
  86. return gid
  87. }