💎一站式轻松地调用各大LLM模型接口,支持GPT4、智谱、星火、月之暗面及文生图 广告
#### ngx_http_copy_filter_module分析[](http://tengine.taobao.org/book/chapter_12.html#ngx-http-copy-filter-module "永久链接至标题") ngx_http_copy_filter_module是响应体过滤链(body filter)中非常重要的一个模块,这个filter模块主要是来将一些需要复制的buf(可能在文件中,也可能在内存中)重新复制一份交给后面的filter模块处理。先来看它的初始化函数: [](http:// "点击提交Issue,反馈你的意见...") static ngx_int_t ngx_http_copy_filter_init(ngx_conf_t *cf) { ngx_http_next_body_filter = ngx_http_top_body_filter; ngx_http_top_body_filter = ngx_http_copy_filter; return NGX_OK; } 可以看到,它只注册了body filter,而没有注册header filter,也就是说只有body filter链中才有这个模块。 该模块有一个命令,命令名为output_buffers,用来配置可用的buffer数和buffer大小,它的值保存在copy filter的loc conf的bufs字段,默认数量为1,大小为32768字节。这个参数具体的作用后面会做介绍。 Nginx中,一般filter模块可以header filter函数中根据请求响应头设置一个模块上下文(context),用来保存相关的信息,在body filter函数中使用这个上下文。而copy filter没有header filter,因此它的context的初始化也是放在body filter中的,而它的ctx就是ngx_output_chain_ctx_t,为什么名字是output_chain呢,这是因为copy filter的主要逻辑的处理都放在ngx_output_chain模块中,另外这个模块在core目录下,而不是属于http目录。 接下来看一下上面说到的context结构: [](http:// "点击提交Issue,反馈你的意见...") struct ngx_output_chain_ctx_s { ngx_buf_t *buf; /* 保存临时的buf */ ngx_chain_t *in; /* 保存了将要发送的chain */ ngx_chain_t *free; /* 保存了已经发送完毕的chain,以便于重复利用 */ ngx_chain_t *busy; /* 保存了还未发送的chain */ unsigned sendfile:1; /* sendfile标记 */ unsigned directio:1; /* directio标记 */ #if (NGX_HAVE_ALIGNED_DIRECTIO) unsigned unaligned:1; #endif unsigned need_in_memory:1; /* 是否需要在内存中保存一份(使用sendfile的话, 内存中没有文件的拷贝的,而我们有时需要处理文件, 此时就需要设置这个标记) */ unsigned need_in_temp:1; /* 是否需要在内存中重新复制一份,不管buf是在内存还是文件, 这样的话,后续模块可以直接修改这块内存 */ #if (NGX_HAVE_FILE_AIO) unsigned aio:1; ngx_output_chain_aio_pt aio_handler; #endif off_t alignment; ngx_pool_t *pool; ngx_int_t allocated; /* 已经分别的buf个数 */ ngx_bufs_t bufs; /* 对应loc conf中设置的bufs */ ngx_buf_tag_t tag; /* 模块标记,主要用于buf回收 */ ngx_output_chain_filter_pt output_filter; /* 一般是ngx_http_next_filter,也就是继续调用filter链 */ void *filter_ctx; /* 当前filter的上下文, 这里是由于upstream也会调用output_chain */ }; 为了更好的理解context结构每个域的具体含义,接下来分析filter的具体实现: [](http:// "点击提交Issue,反馈你的意见...") static ngx_int_t ngx_http_copy_filter(ngx_http_request_t *r, ngx_chain_t *in) { ngx_int_t rc; ngx_connection_t *c; ngx_output_chain_ctx_t *ctx; ngx_http_core_loc_conf_t *clcf; ngx_http_copy_filter_conf_t *conf; c = r->connection; ngx_log_debug2(NGX_LOG_DEBUG_HTTP, c->log, 0, "http copy filter: \"%V?%V\"", &r->uri, &r->args); /* 获取ctx */ ctx = ngx_http_get_module_ctx(r, ngx_http_copy_filter_module); /* 如果为空,则说明需要初始化ctx */ if (ctx == NULL) { ctx = ngx_pcalloc(r->pool, sizeof(ngx_output_chain_ctx_t)); if (ctx == NULL) { return NGX_ERROR; } ngx_http_set_ctx(r, ctx, ngx_http_copy_filter_module); conf = ngx_http_get_module_loc_conf(r, ngx_http_copy_filter_module); clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module); /* 设置sendfile */ ctx->sendfile = c->sendfile; /* 如果request设置了filter_need_in_memory的话,ctx的这个域就会被设置 */ ctx->need_in_memory = r->main_filter_need_in_memory || r->filter_need_in_memory; /* 和上面类似 */ ctx->need_in_temp = r->filter_need_temporary; ctx->alignment = clcf->directio_alignment; ctx->pool = r->pool; ctx->bufs = conf->bufs; ctx->tag = (ngx_buf_tag_t) &ngx_http_copy_filter_module; /* 可以看到output_filter就是下一个body filter节点 */ ctx->output_filter = (ngx_output_chain_filter_pt) ngx_http_next_body_filter; /* 此时filter ctx为当前的请求 */ ctx->filter_ctx = r; ... if (in && in->buf && ngx_buf_size(in->buf)) { r->request_output = 1; } } ... for ( ;; ) { /* 最关键的函数,下面会详细分析 */ rc = ngx_output_chain(ctx, in); if (ctx->in == NULL) { r->buffered &= ~NGX_HTTP_COPY_BUFFERED; } else { r->buffered |= NGX_HTTP_COPY_BUFFERED; } ... return rc; } } 上面的代码去掉了AIO相关的部分,函数首先设置并初始化context,接着调用ngx_output_chain函数,这个函数实际上包含了copy filter模块的主要逻辑,它的原型为: ngx_int_t ngx_output_chain(ngx_output_chain_ctx_t [*](http://tengine.taobao.org/book/chapter_12.html#id12)ctx, ngx_chain_t [*](http://tengine.taobao.org/book/chapter_12.html#id14)in) 分段来看它的代码,下面这段代码是一个快捷路径(short path),也就是说当能直接确定所有的in chain都不需要复制的时,可以直接调用output_filter来交给剩下的filter去处理: [](http:// "点击提交Issue,反馈你的意见...") if (ctx->in == NULL && ctx->busy == NULL) { /* * the short path for the case when the ctx->in and ctx->busy chains * are empty, the incoming chain is empty too or has the single buf * that does not require the copy */ if (in == NULL) { return ctx->output_filter(ctx->filter_ctx, in); } if (in->next == NULL #if (NGX_SENDFILE_LIMIT) && !(in->buf->in_file && in->buf->file_last > NGX_SENDFILE_LIMIT) #endif && ngx_output_chain_as_is(ctx, in->buf)) { return ctx->output_filter(ctx->filter_ctx, in); } } 上面可以看到了一个函数ngx_output_chain_as_is,这个函数很关键,下面还会再次被调用,这个函数主要用来判断是否需要复制buf。返回1,表示不需要拷贝,否则为需要拷贝: [](http:// "点击提交Issue,反馈你的意见...") static ngx_inline ngx_int_t ngx_output_chain_as_is(ngx_output_chain_ctx_t *ctx, ngx_buf_t *buf) { ngx_uint_t sendfile; /* 是否为特殊buf(special buf),是的话返回1,也就是不用拷贝 */ if (ngx_buf_special(buf)) { return 1; } /* 如果buf在文件中,并且使用了directio的话,需要拷贝buf */ if (buf->in_file && buf->file->directio) { return 0; } /* sendfile标记 */ sendfile = ctx->sendfile; #if (NGX_SENDFILE_LIMIT) /* 如果pos大于sendfile的限制,设置标记为0 */ if (buf->in_file && buf->file_pos >= NGX_SENDFILE_LIMIT) { sendfile = 0; } #endif if (!sendfile) { /* 如果不走sendfile,而且buf不在内存中,则我们就需要复制到内存一份 */ if (!ngx_buf_in_memory(buf)) { return 0; } buf->in_file = 0; } /* 如果需要内存中有一份拷贝,而并不在内存中,此时返回0,表示需要拷贝 */ if (ctx->need_in_memory && !ngx_buf_in_memory(buf)) { return 0; } /* 如果需要内存中有可修改的拷贝,并且buf存在于只读的内存中或者mmap中,则返回0 */ if (ctx->need_in_temp && (buf->memory || buf->mmap)) { return 0; } return 1; } 上面有两个标记要注意,一个是need_in_memory ,这个主要是用于当使用sendfile的时候,Nginx并不会将请求文件拷贝到内存中,而有时需要操作文件的内容,此时就需要设置这个标记。然后后面的body filter就能操作内容了。 第二个是need_in_temp,这个主要是用于把本来就存在于内存中的buf复制一份可修改的拷贝出来,这里有用到的模块有charset,也就是编解码 filter。 然后接下来这段是复制in chain到ctx->in的结尾,它是通过调用ngx_output_chain_add_copy来进行add copy的,这个函数比较简单,这里就不分析了,不过只有一个要注意的地方,那就是如果buf是存在于文件中,并且file_pos超过了sendfile limit,此时就会切割buf为两个buf,然后保存在两个chain中,最终连接起来: [](http:// "点击提交Issue,反馈你的意见...") /* add the incoming buf to the chain ctx->in */ if (in) { if (ngx_output_chain_add_copy(ctx->pool, &ctx->in, in) == NGX_ERROR) { return NGX_ERROR; } } 然后就是主要的逻辑处理阶段。这里nginx做的非常巧妙也非常复杂,首先是chain的重用,然后是buf的重用。 先来看chain的重用。关键的几个结构以及域:ctx的free,busy以及ctx->pool的chain域。 其中每次发送没有发完的chain就放到busy中,而已经发送完毕的就放到free中,而最后会调用 ngx_free_chain来将free的chain放入到pool->chain中,而在ngx_alloc_chain_link中,如果pool->chain中存在chain的话,就不用malloc了,而是直接返回pool->chain,相关的代码如下: [](http:// "点击提交Issue,反馈你的意见...") /* 链接cl到pool->chain中 */ #define ngx_free_chain(pool, cl) \ cl->next = pool->chain; \ pool->chain = cl /* 从pool中分配chain */ ngx_chain_t * ngx_alloc_chain_link(ngx_pool_t *pool) { ngx_chain_t *cl; cl = pool->chain; /* 如果cl存在,则直接返回cl */ if (cl) { pool->chain = cl->next; return cl; } /* 否则才会malloc chain */ cl = ngx_palloc(pool, sizeof(ngx_chain_t)); if (cl == NULL) { return NULL; } return cl; } 然后是buf的重用,严格意义上来说buf的重用是从free中的chain中取得的,当free中的buf被重用,则这个buf对应的chain就会被链接到ctx->pool中,从而这个chain就会被重用。也就是说首先考虑的是buf的重用,只有当这个chain的buf确定不需要被重用(或者说已经被重用)的时候,chain才会被链接到ctx->pool中被重用。 还有一个就是ctx的allocated域,这个域表示了当前的上下文中已经分配了多少个buf,output_buffer命令用来设置output的buf大小以及buf的个数。而allocated如果比output_buffer大的话,则需要先发送完已经存在的buf,然后才能再次重新分配buf。 来看代码,上面所说的重用以及buf的控制,代码里面都可以看的比较清晰。下面这段主要是拷贝buf前所做的一些工作,比如判断是否拷贝,以及给buf分贝内存等: [](http:// "点击提交Issue,反馈你的意见...") /* out为最终需要传输的chain,也就是交给剩下的filter处理的chain */ out = NULL; /* last_out为out的最后一个chain */ last_out = &out; last = NGX_NONE; for ( ;; ) { /* 开始遍历chain */ while (ctx->in) { /* 取得当前chain的buf大小 */ bsize = ngx_buf_size(ctx->in->buf); /* 跳过bsize为0的buf */ if (bsize == 0 && !ngx_buf_special(ctx->in->buf)) { ngx_debug_point(); ctx->in = ctx->in->next; continue; } /* 判断是否需要复制buf */ if (ngx_output_chain_as_is(ctx, ctx->in->buf)) { /* move the chain link to the output chain */ /* 如果不需要复制,则直接链接chain到out,然后继续循环 */ cl = ctx->in; ctx->in = cl->next; *last_out = cl; last_out = &cl->next; cl->next = NULL; continue; } /* 到达这里,说明我们需要拷贝buf,这里buf最终都会被拷贝进ctx->buf中, 因此这里先判断ctx->buf是否为空 */ if (ctx->buf == NULL) { /* 如果为空,则取得buf,这里要注意,一般来说如果没有开启directio的话, 这个函数都会返回NGX_DECLINED */ rc = ngx_output_chain_align_file_buf(ctx, bsize); if (rc == NGX_ERROR) { return NGX_ERROR; } /* 大部分情况下,都会落入这个分支 */ if (rc != NGX_OK) { /* 准备分配buf,首先在free中寻找可以重用的buf */ if (ctx->free) { /* get the free buf */ /* 得到free buf */ cl = ctx->free; ctx->buf = cl->buf; ctx->free = cl->next; /* 将要重用的chain链接到ctx->poll中,以便于chain的重用 */ ngx_free_chain(ctx->pool, cl); } else if (out || ctx->allocated == ctx->bufs.num) { /* 如果已经等于buf的个数限制,则跳出循环,发送已经存在的buf。 这里可以看到如果out存在的话,nginx会跳出循环,然后发送out, 等发送完会再次处理,这里很好的体现了nginx的流式处理 */ break; } else if (ngx_output_chain_get_buf(ctx, bsize) != NGX_OK) { /* 上面这个函数也比较关键,它用来取得buf。接下来会详细看这个函数 */ return NGX_ERROR; } } } /* 从原来的buf中拷贝内容或者从文件中读取内容 */ rc = ngx_output_chain_copy_buf(ctx); if (rc == NGX_ERROR) { return rc; } if (rc == NGX_AGAIN) { if (out) { break; } return rc; } /* delete the completed buf from the ctx->in chain */ if (ngx_buf_size(ctx->in->buf) == 0) { ctx->in = ctx->in->next; } /* 分配新的chain节点 */ cl = ngx_alloc_chain_link(ctx->pool); if (cl == NULL) { return NGX_ERROR; } cl->buf = ctx->buf; cl->next = NULL; *last_out = cl; last_out = &cl->next; ctx->buf = NULL; } ... } 上面的代码分析的时候有个很关键的函数,那就是ngx_output_chain_get_buf,这个函数当没有可重用的buf时用来分配buf。 如果当前的buf位于最后一个chain,则需要特殊处理,一是buf的recycled域,另外是将要分配的buf的大小。 先来说recycled域,这个域表示当前的buf需要被回收。而一般情况下Nginx(比如在非last buf)会缓存一部分buf(默认是1460字节),然后再发送,而设置了recycled的话,就不会让它缓存buf,也就是尽量发送出去,然后以供回收使用。 因此如果是最后一个buf,则不需要设置recycled域的,否则的话,需要设置recycled域。 然后就是buf的大小。这里会有两个大小,一个是需要复制的buf的大小,一个是配置文件中设置的大小。如果不是最后一个buf,则只需要分配配置中设置的buf的大小就行了。如果是最后一个buf,则就处理不太一样,下面的代码会看到: [](http:// "点击提交Issue,反馈你的意见...") static ngx_int_t ngx_output_chain_get_buf(ngx_output_chain_ctx_t *ctx, off_t bsize) { size_t size; ngx_buf_t *b, *in; ngx_uint_t recycled; in = ctx->in->buf; /* 可以看到这里分配的buf,每个buf的大小是配置文件中设置的size */ size = ctx->bufs.size; /* 默认有设置recycled域 */ recycled = 1; /* 如果当前的buf是属于最后一个chain的时候,需要特殊处理 */ if (in->last_in_chain) { /* 如果buf大小小于配置指定的大小,则直接按实际大小分配,不设置回收标记 */ if (bsize < (off_t) size) { /* * allocate a small temp buf for a small last buf * or its small last part */ size = (size_t) bsize; recycled = 0; } else if (!ctx->directio && ctx->bufs.num == 1 && (bsize < (off_t) (size + size / 4))) { /* * allocate a temp buf that equals to a last buf, * if there is no directio, the last buf size is lesser * than 1.25 of bufs.size and the temp buf is single */ size = (size_t) bsize; recycled = 0; } } /* 开始分配buf内存 */ b = ngx_calloc_buf(ctx->pool); if (b == NULL) { return NGX_ERROR; } if (ctx->directio) { /* directio需要对齐 */ b->start = ngx_pmemalign(ctx->pool, size, (size_t) ctx->alignment); if (b->start == NULL) { return NGX_ERROR; } } else { /* 大部分情况会走到这里 */ b->start = ngx_palloc(ctx->pool, size); if (b->start == NULL) { return NGX_ERROR; } } b->pos = b->start; b->last = b->start; b->end = b->last + size; /* 设置temporary */ b->temporary = 1; b->tag = ctx->tag; b->recycled = recycled; ctx->buf = b; /* 更新allocated,可以看到每分配一个就加1 */ ctx->allocated++; return NGX_OK; } 分配新的buf和chain,并调用ngx_output_chain_copy_buf拷贝完数据之后,Nginx就将新的chain链表交给下一个body filter继续处理: [](http:// "点击提交Issue,反馈你的意见...") if (out == NULL && last != NGX_NONE) { if (ctx->in) { return NGX_AGAIN; } return last; } last = ctx->output_filter(ctx->filter_ctx, out); if (last == NGX_ERROR || last == NGX_DONE) { return last; } ngx_chain_update_chains(ctx->pool, &ctx->free, &ctx->busy, &out, ctx->tag); last_out = &out; 在其他body filter处理完之后,ngx_output_chain函数还需要更新chain链表,以便回收利用,ngx_chain_update_chains函数主要是将处理完毕的chain节点放入到free链表,没有处理完毕的放到busy链表中,另外这个函数用到了tag,它只回收copy filter产生的chain节点。