在现代软件开发领域,分布式系统已经变得至关重要。它们使服务能够扩展、处理大量数据并提供高可用性。本文将指导您使用 Golang 构建一个简单的分布式系统,该系统利用主节点和单个工作节点,并使用 gRPC 协议进行通信。
这种架构非常适合数据处理、并行计算和大规模处理工作负载等分布式任务。我们将介绍如何设置主从结构、建立基于 gRPC 的通信,以及实现简单的任务分配和执行流程。
我们的分布式系统包含以下组件:
主节点: 控制器节点,负责将任务分配给工作节点。它跟踪可用的工作节点、监控任务状态并管理任务分配。
工作节点: 执行器节点,接收来自主节点的任务,执行计算并返回结果。
gRPC 协议: gRPC(Google Remote Procedure Call)用于主节点和工作节点之间的通信,实现高效、高性能的通信。
系统上已安装 Go 1.13+。
用于生成 gRPC 代码的 Protobuf 编译器 (protoc)。
gRPC-Go 和 Protobuf 库。
go install google.golang.org/grpc go install google.golang.org/protobuf/cmd/protoc-gen-go go install google.golang.org/protobuf/cmd/protoc-gen-go-grpc
创建基于 gRPC 的分布式系统的第一步是在 .proto 文件中定义 gRPC 服务和消息。此文件概述了用于通信的服务、RPC 方法和消息结构。
创建一个名为 node.proto 的文件,内容如下:
syntax="proto3";package core;optiongo_package=".;core";message Request { stringaction=1;} message Response { stringdata=1;} service NodeService { rpc ReportStatus(Request)returns(Response){};rpc AssignTask(Request)returns(stream Response){};}
使用 protoc 为我们的 gRPC 服务生成 Go 代码:
mkdir core protoc--go_out=./core --go-grpc_out=./core node.proto
我们设置了一个 gRPC 服务器来报告状态,并通过命令通道持续发送客户端任务。它使用 Go 的并发特性来处理实时命令通知。
package coreimport"context"typeNodeServiceGrpcServer struct { UnimplementedNodeServiceServer CmdChannel chan string } func(n NodeServiceGrpcServer)ReportStatus(ctx context.Context,request*Request)(*Response,error){return&Response{Data:"ok"},nil } func(n NodeServiceGrpcServer)AssignTask(request*Request,server NodeService_AssignTaskServer)error {for{select{casecmd :=<-n.CmdChannel:iferr :=server.Send(&Response{Data: cmd});err!=nil {returnerr } } } } var server*NodeServiceGrpcServer func GetNodeServiceGrpcServer()*NodeServiceGrpcServer {ifserver==nil { server=&NodeServiceGrpcServer{ CmdChannel: make(chan string),} }returnserver }
主节点负责将任务分配给工作节点。它通过 gRPC 连接到工作节点,并使用 AssignTask 方法分配任务。
现在,让我们在名为 node.go 的文件中实现主节点:我们使用 API 框架 gin 创建一个简单的 API 服务,该服务允许对 /tasks 的 POST 请求将命令发送到通道 CmdChannel 并传递给 NodeServiceGrpcServer。
package coreimport("net""net/http""github.com/gin-gonic/gin""google.golang.org/grpc")typeMasterNode struct { api*gin.Engineln net.Listener svr*grpc.Server nodeSvr*NodeServiceGrpcServer } func(n*MasterNode)Init()(err error){ n.ln,err=net.Listen("tcp",":50051")iferr!=nil {returnerr } n.svr=grpc.NewServer()n.nodeSvr=GetNodeServiceGrpcServer()RegisterNodeServiceServer(node.svr,n.nodeSvr)n.api=gin.Default()n.api.POST("/tasks",func(c*gin.Context){ var payload struct { Cmd string`json:"cmd"`}iferr :=c.ShouldBindBodyWithJSON(&payload);err!=nil { c.AbortWithStatus(http.StatusBadRequest)return} n.nodeSvr.CmdChannel<-payload.Cmd c.AbortWithStatusJSON(200,http.StatusOK)})returnnil } func(n*MasterNode)Start(){ go n.svr.Serve(n.ln)_=n.api.Run(":9092")n.svr.Stop()} var node*MasterNode func GetMasterNode()*MasterNode {ifnode==nil { node=&MasterNode{}iferr :=node.Init();err!=nil { panic(err)} }returnnode }
工作节点的职责是从主节点接收任务、处理任务并返回结果。
现在,让我们在名为 worker_node.go 的文件中实现工作服务器:工作节点通过获取的流从服务器(主节点)连续接收数据并执行命令。
package coreimport("context""fmt""os/exec""strings""google.golang.org/grpc")typeWokerNode struct { conn*grpc.ClientConn c NodeServiceClient } func(n*WokerNode)Init()(err error){ n.conn,err=grpc.Dial("localhost:50051",grpc.WithInsecure())iferr!=nil {returnerr } n.c=NewNodeServiceClient(n.conn)returnnil } func(n*WokerNode)Start(){ fmt.Println("worker node started")_,_=n.c.ReportStatus(context.Background(),&Request{})stream,_ :=n.c.AssignTask(context.Background(),&Request{})for{ res,err :=stream.Recv()iferr!=nil {return} fmt.Print("received command: ",res.Data)parts :=strings.Split(res.Data," ")iferr :=exec.Command(parts[0],parts[1:]...).Run();err!=nil { fmt.Println(err)} } } var workerNode*WokerNode func GetWorkerNode()*WokerNode {ifworkerNode==nil { workerNode=&WokerNode{}iferr :=workerNode.Init();err!=nil { panic(err)} }returnworkerNode }
我们创建一个 main.go,它位于 core 文件夹之外。main 函数接受一个参数,并将其与 switch 语句进行比较,以确定是运行主节点还是工作节点。
package mainimport("go-master-worker-node/core""os")func main(){ nodeType :=os.Args[1]switch nodeType {case"master": core.GetMasterNode().Start()case"worker": core.GetWorkerNode().Start()default: panic("invalid node type")} }
启动主节点:
go run main.go master
启动工作节点:
go run main.go worker
我们可以使用 curl POST 方法发送命令,如下所示,我们向本地主机 9092 发送一个 touch 命令,路径设置为“tasks”,这是主节点当前运行的位置。
发送 touch 命令:
curl-X POST-H"Content-Type: application/json"-d'{"cmd": "touch test.txt"}'http://localhost:9092/tasks
我们使用 Golang 构建了一个基本的分布式系统,该系统采用主从架构并使用 gRPC 进行高效通信。在实际场景中,您可以使用更复杂的任务分配、负载均衡和错误处理来扩展此模型,以处理生产级别的分布式任务。