golang异步任务machinery V2框架使用示例

main.go

package main

import (
	"context"
	"fmt"
	"github.com/RichardKnop/machinery/v2"
	//amqpbackend "github.com/RichardKnop/machinery/v2/backends/amqp"
	redisbackend "github.com/RichardKnop/machinery/v2/backends/redis"

	amqpbroker "github.com/RichardKnop/machinery/v2/brokers/amqp"
	//redisbroker "github.com/RichardKnop/machinery/v2/brokers/redis"
	"github.com/RichardKnop/machinery/v2/config"
	"github.com/RichardKnop/machinery/v2/example/tracers"
	eagerlock "github.com/RichardKnop/machinery/v2/locks/eager"
	"github.com/RichardKnop/machinery/v2/log"
	"github.com/RichardKnop/machinery/v2/tasks"
	"github.com/opentracing/opentracing-go"
	"gotest/itasks"
	"time"
)

func main()  {

         //执行后发送任务,并阻塞等待任务结果返回
	if err := send();err!=nil{
		fmt.Println(err.Error())
		return
	}


}

func startServer() (*machinery.Server, error) {
	//cnf, err := config.NewFromYaml("./config.yml",false)
	//if err != nil {
	//	fmt.Printf(err.Error())
	//	return nil, err
	//}

	 cnf := &config.Config{
		 Broker: "amqp://admin:123456@127.0.0.1:31598/",
		 DefaultQueue:  "machinery_tasks",
		 ResultBackend: "amqp://admin:123456@127.0.0.1:31598/",
		 AMQP: &config.AMQPConfig{
		 	AutoDelete: false,
			 Exchange:     "machinery_exchange",
			 ExchangeType: "direct",
			 BindingKey:   "machinery_task",
		 },
	 }
	broker:=amqpbroker.New(cnf)
	//backend := amqpbackend.New(cnf)
	//broker := redisbroker.NewGR(cnf, []string{"123456@127.0.0.1:36379"}, 10)

	backend := redisbackend.NewGR(cnf, []string{"123456@127.0.0.1:36379"}, 10)
	lock := eagerlock.New()
	server := machinery.NewServer(cnf, broker, backend, lock)

	// Register mytasks
	tasksMap := map[string]interface{}{
		"add": mytasks.Add,
	}
	return server, server.RegisterTasks(tasksMap)
}

func worker() error {
	consumerTag := "machinery_worker"

	cleanup, err := tracers.SetupTracer(consumerTag)
	if err != nil {
		log.FATAL.Fatalln("Unable to instantiate a tracer:", err)
	}
	defer cleanup()

	server, err := startServer()
	if err != nil {
		return err
	}

	// The second argument is a consumer tag
	// Ideally, each worker should have a unique tag (worker1, worker2 etc)
	worker := server.NewWorker(consumerTag, 0)

	// Here we inject some custom code for error handling,
	// start and end of task hooks, useful for metrics for example.
	errorHandler := func(err error) {
		log.ERROR.Println("I am an error handler:", err)
	}

	preTaskHandler := func(signature *tasks.Signature) {
		log.INFO.Println("I am a start of task handler for:", signature.Name)
	}

	postTaskHandler := func(signature *tasks.Signature) {
		log.INFO.Println("I am an end of task handler for:", signature.Name)
	}

	worker.SetPostTaskHandler(postTaskHandler)
	worker.SetErrorHandler(errorHandler)
	worker.SetPreTaskHandler(preTaskHandler)

	return worker.Launch()
}


func send() error {
	cleanup, err := tracers.SetupTracer("sender")
	if err != nil {
		log.FATAL.Fatalln("Unable to instantiate a tracer:", err)
	}
	defer cleanup()

	server, err := startServer()
	if err != nil {
		return err
	}

	var (
		addTask0 tasks.Signature

	)

	var initTasks = func() {
		data := mytasks.Data{ID: 1000, Msg: "消息内容"}
		b,_:=data.MarshalBinary()
		eta:=time.Now().UTC().Add(time.Second * 5)
		addTask0 = tasks.Signature{
			Name:           "add",
			RoutingKey:     "",
			ETA:            &eta,
			GroupUUID:      "",
			GroupTaskCount: 0,
			RetryCount: 3,
			Args: []tasks.Arg{
				{
					Type:  "string",
					Value: "IDK53434232",
				},
				{
					Type:  "string",
					Value: "环行之家",
				},
				{
					Type:  "[]byte",
					Value: b,
				},
			},

		}
	}

	span, ctx := opentracing.StartSpanFromContext(context.Background(), "send")
	defer span.Finish()

	log.INFO.Println("Single task:")
	initTasks()
           //发送任务
	_, err = server.SendTaskWithContext(ctx, &addTask0)
	if err != nil {
		return fmt.Errorf("Could not send task: %s", err.Error())
	}

	//taskSignature,err:=server.GetBroker().GetPendingTasks("asong")
	//if err != nil {
	//	fmt.Println(err.Error())
	//}
	//
	//fmt.Println(taskSignature)

	results, err := asyncResult.Get(time.Millisecond * 5)
	if err != nil {
		return fmt.Errorf("Getting task result failed with error: %s", err.Error())
	}
	//
	//taskState:=asyncResult.GetState()
	//fmt.Println(taskState.IsSuccess())
	//fmt.Println(taskState.IsCompleted())
	//fmt.Println(taskState.IsFailure())
        //阻塞,等待获取任务结果
	log.INFO.Printf("split([\"foo\"]) = %v\n", tasks.HumanReadableResults(results))

	return nil
}

itasks/task.go

package mytasks

import (

	"fmt"
	"strconv"
	"strings"
)

type Data struct {
	Msg string
	ID int
}

// 实现 BinaryMarshaler 接口
func (m *Data) MarshalBinary() ([]byte, error) {
	return []byte(fmt.Sprintf("%s:%d", m.Msg, m.ID)), nil
}

// 实现 BinaryUnmarshaler 接口
func (m *Data) UnmarshalBinary(data []byte) error {
	parts := strings.SplitN(string(data), ":", 2)
	if len(parts) != 2 {
		return fmt.Errorf("invalid data format")
	}
	m.Msg = parts[0]
	field2, err := strconv.Atoi(parts[1])
	if err != nil {
		return err
	}
	m.ID = field2
	return nil
}



func Add(appId string,appName string,data []byte) (int64, error) {
	fmt.Println(string(data))
	fmt.Println(appId,appName)

	var sum int64
	for i := 0; i < 1000; i++ {
		sum+=int64(i)
		fmt.Println(sum)

	}


	return sum, nil
}

启动一个worker进程,将上方的main方法代码换成如下代码后启动服务,当有新任务时将会执行

func main() {
	if err := worker(); err != nil {
		fmt.Println(err.Error())
		return
	}
}
阅读剩余
THE END