天津Java培训
达内天津天大中心

022-87670409

热门课程

使用Go语言每分钟处理一百万个请求

  • 时间:2016-04-19 16:46
  • 发布:天津达内
  • 来源:天津达内

  该文是Malwarebytes首席架构师介绍其希望如何使用Go语言实现每分钟处理100万个请求。天津java培训现在和大家来分享一下:
其主要职责是加强系统基础架构,以支持每天数百万人使用,其本人已经在反病毒和反恶意软件领域工作12年,他深深知道这些系统的复杂原因最终是由于每天处理大量数据。
其过去九年后端经历大部分使用Ruby On Rails,虽然他相信RoR是一个令人惊奇的环境,但是当你以Ruby方式开始思考和设计系统一段时间以后,在你需要考虑多线程 并行和快速执行以及小内存消耗的情况下,你会忘记原来系统架构的所谓有效与简单。多年来,他也是C/C++, Delphi 和C# ,他开始认识到使用正确工具做事能减少不必要的复杂性。

作为一个首席架构师,他并不执着于语言和框架,而这两点总是网上争论的焦点,他相信效率产品性和代码维护性大部分依赖于你的架构解决方案有多简单。

问题
目标是能够处理来自数百万端点的POST请求,Web处理器会接受到一个JSON文档,其中包含一个许多数据集合,这些需要写入到Amazon S3,然后Map-reduce系统稍后可以操作分析这些数据。
传统地闯进一个worker-tier工作层架构,如:
1.Sidekiq
2.Resque
3.DelayedJob
4. Elasticbeanstalk Worker Tier
5,RabbitMQ
然后设置两个不同集群,一个是用于Web前端,另外一个是用于worker,这样我们可以扩展很多后端服务器数量以应付大量增长的请求。
当他们发现这是高流量系统时,意识到应该使用Go语言完成,之前其本人已经使用Go语言两年,开发过一些系统,但是没有人确证Go语言能够应付如此大的负载。
天津java培训
他们定义了数据结构,这是通过POST提交获得的请求,然后通过一个方法上传到S3。
type PayloadCollection struct {
WindowsVersion string `json:"version"`
Token string `json:"token"`
Payloads []Payload `json:"data"`
}
type Payload struct {
// [redacted]
}
func (p *Payload) UploadToS3() error {
// the storageFolder method ensures that there are no name collision in
// case we get same timestamp in the key name
storage_path := fmt.Sprintf("%v/%v", p.storageFolder, time.Now().UnixNano())
bucket := S3Bucket
b := new(bytes.Buffer)
encodeErr := json.NewEncoder(b).Encode(payload)
if encodeErr != nil {
return encodeErr
}
// Everything we post to the S3 bucket should be marked 'private'
var acl = s3.Private
var contentType = "application/octet-stream"
return bucket.PutReader(storage_path, b, int64(b.Len()), contentType, acl, s3.Options{})
}
起初他们试图并行化这种接受请求然后转发上传的处理工作,将这些处理过程放入一个简单的goroutine(Go语言并行协程)
func payloadHandler(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
// Read the body into a string for json decoding
var content = &PayloadCollection{}
err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
if err != nil {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
w.WriteHeader(http.StatusBadRequest)
return
}
// Go through each payload and queue items individually to be posted to S3
for _, payload := range content.Payloads {
go payload.UploadToS3() // <----- DON'T DO THIS
}
w.WriteHeader(http.StatusOK)
}
在负载稳定情况下,这种方案能够大部分工作得很好,但是在大规模访问下,却工作得不怎么样。当每分钟达到100万POST请求时,这段代码崩溃了。
他们引入了buffered channel缓冲通道,将一些工作放入队列。
var Queue chan Payload
func init() {
Queue = make(chan Payload, MAX_QUEUE)
}
func payloadHandler(w http.ResponseWriter, r *http.Request) {
...
// Go through each payload and queue items individually to be posted to S3
for _, payload := range content.Payloads {
Queue <- payload
}
...
}
在队列另外一端,也就是出列工作中再处理上传到S3的工作,如下:
func StartProcessor() {
for {
select {
case job := <-Queue:
job.payload.UploadToS3() // <-- STILL NOT GOOD
}
}
}
但是带来问题是这个队列迅速达到其上限,堵塞住了请求处理器,这样就不能向队列中继续放入。这可以通过加入一个倒数计数来避免,但是系统的延迟会恒速上升(系统变慢)。
导致这个原因是因为上传到S3这个工作非常耗时,因为是通过网络连接,因此引入Java和C概念中的线程池来处理上传工作,这样使用Channel实现Queue+Worker的概念.
var (
MaxWorker = os.Getenv("MAX_WORKERS")
MaxQueue = os.Getenv("MAX_QUEUE")
)
// Job represents the job to be run
type Job struct {
Payload Payload
}
// A buffered channel that we can send work requests on.
var JobQueue chan Job
// A pool of workers that are instantianted to perform the work
var WorkerPool chan chan Job
// Worker represents the worker that executes the job
type Worker struct {
ID int
JobChannel chan Job
WorkerPool chan chan Job
QuitChan chan bool
}
func NewWorker(id int, workerPool chan chan Job) Worker {
worker := Worker{
ID: id,
Work: make(chan Job),
WorkerPool: workerPool,
QuitChan: make(chan bool)}
return worker
}
// Start method starts the run loop for the worker, listening for a quit channel in
// case we need to stop it
func (w Worker) Start() {
go func() {
for {
// register the current worker into the worker queue.
w.WorkerPool <- w.JobChannel
select {
case job := <-w.JobChannel:
// we have received a work request.
if err := job.Payload.UploadToS3(); err != nil {
log.Errorf("Error uploading to S3: %s", err.Error())
}
case <-w.QuitChan:
// we have received a signal to stop
return
}
}
}()
}
// Stop signals the worker to stop listening for work requests.
func (w Worker) Stop() {
go func() {
w.QuitChan <- true
}()
}
在队列放入的一端代码改为如下:
func payloadHandler(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
// Read the body into a string for json decoding
var content = &PayloadCollection{}
err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
if err != nil {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
w.WriteHeader(http.StatusBadRequest)
return
}
// Go through each payload and queue items individually to be posted to S3
for _, payload := range content.Payloads {
// let's create a job with the payload
work := Job{Payload: payload}
// Push the work onto the queue.
JobQueue <- work
}
w.WriteHeader(http.StatusOK)
}
通过StartDispatcher创建worker池,然后开始监听JobQueue中的job:
func StartDispatcher(maxWorkers int) {
WorkerPool = make(chan chan Job, maxWorkers)
// starting n number of workers
for i := 0; i < maxWorkers; i++ {
worker := NewWorker(i+1, WorkerPool)
worker.Start()
}
go func() {
for {
select {
case job := <-JobQueue:
// a job request has been received
go func(jobChannel chan Job) {
// try to obtain a worker that is available.
// this will block until a worker is idle
worker := <-WorkerPool
// dispatch the job to the worker, dequeuing from
// the jobChannel
worker <- jobChannel
}(job)
}
}
}()
}
最后终于达到了每分钟处理100万个请求,更多测试结果欢迎咨询天津java培训老师.
上一篇:形象解释functor和monad
下一篇:权限引擎和认证框架能结合不?
选择城市和中心
贵州省

广西省

海南省