C++性能提示

常有人断章取义引用克努斯的*“过早优化是万恶之源”。其完整引文实为:”我们应当忽略97%的小幅效率提升:过早优化是万恶之源。但关键的3%我们绝不能错过。”

目录

多年来,我们(Jeff 与 Sanjay)深入研究了各类代码的性能调优工作。自谷歌创立之初,提升软件性能就始终是我们的重要任务——这使我们能够为更多用户提供更优质的服务。本文旨在归纳我们进行此类工作时遵循的通用原则与具体技术,并精选具有代表性的源代码变更(变更列表,即CL)作为各类方法与技术的实例。下文具体建议多涉及C++类型和CL,但通用原则同样适用于其他语言。本文聚焦于单一二进制文件的通用性能调优,不涉及分布式系统或机器学习(ML)硬件性能调优(这些领域本身极为广阔)。我们希望本文能为他人提供参考价值。

文档中多数示例包含演示技术的代码片段(点击小三角即可展开!)。 请注意部分代码片段提及谷歌内部代码库的各类抽象概念。若我们认为示例本身足够完整,即使不熟悉这些抽象细节的读者也能理解,则仍保留相关说明。

元素周期表

性能思考的重要性§

常有人断章取义引用克努斯的*“过早优化是万恶之源”。其完整引文实为:“我们应当忽略97%的小幅效率提升:过早优化是万恶之源。但关键的3%我们绝不能错过。”本文正是探讨这关键的3%,而另一句更具说服力的克努斯名言是:

从示例2到示例2a的速度提升仅约12%,多数人会认为微不足道。当今软件工程师普遍奉行的传统智慧主张忽略微小效率;但我认为这不过是对那些只顾小利而忽视大局的程序员滥用优化的过度反应——他们既无法调试也无法维护自己“优化”后的程序。在成熟的工程领域,轻易可得的12%提升绝非微不足道;我认为软件工程也应秉持相同理念。当然,对于一次性任务我不会费心优化,但涉及高质量程序开发时,我绝不愿受限于那些剥夺效率提升的工具。

许多人会说“先用最简洁的方式编写代码,性能问题留待后续分析时处理”。但这种做法往往是错误的:

  1. 若在开发大型系统时完全忽视性能问题,最终将得到一张平坦的性能分析图——由于性能损失遍布各处,导致没有明显热点区域。这将使性能优化工作无从下手。
  2. 若开发的是供他人使用的库,最终遭遇性能瓶颈的往往是难以直接优化性能的人群(他们需要理解其他团队编写的代码细节,并就性能优化的必要性与之协商)。
  3. 系统处于高负载运行时,进行重大改动更为困难。
  4. 难以判断哪些性能问题可轻松解决,最终可能导致采用过度复制或严重超配服务等代价高昂的方案来应对负载问题。

因此我们建议:编写代码时,若不显著影响代码可读性/复杂度,应优先选择更快的实现方案。

性能预估§

若能培养对代码性能重要性的直觉判断,便能做出更明智的决策(例如:为追求性能可接受多少额外复杂度)。以下是在编写代码时评估性能的建议:

  • 是否为测试代码?如果是,主要关注算法和数据结构的渐近复杂度即可。(注:开发周期时间很重要,因此避免编写运行时间过长的测试。)
  • 是否属于应用程序专用代码?如果是,请尝试评估这段代码对性能的要求。这通常并不困难:只需判断代码属于初始化/设置代码,还是最终会出现在热点路径上的代码(例如处理服务中的每个请求)就足够了。
  • 这是将被多个应用程序使用的库代码吗?在这种情况下,很难预判其敏感程度。正因如此,遵循本文档所述的简单技巧尤为重要。例如,若需存储通常元素数量较少的向量,请使用 absl::InlinedVector 替代 std::vector。这些技巧并不难遵循,也不会给系统增加任何非局部复杂性。如果最终发现你编写的代码确实消耗了大量资源,那么它从一开始就能提供更高的性能。而且在分析性能时,更容易找到下一个需要关注的优化点。

在选择可能存在性能差异的方案时,可借助信封背面计算进行更深入的分析。这类计算能快速估算不同方案的性能差异,其结果可直接淘汰部分方案而无需实际实现。

具体估算流程如下:

  1. 估算各类底层操作所需次数,例如磁盘寻道次数、网络往返次数、传输字节数等。
  2. 将每类高成本操作乘以其粗略开销值,再将结果相加。
  3. 上述步骤得出的结果即为系统资源消耗的成本。若需关注延迟且系统存在并发性,部分成本可能存在重叠,此时需进行稍复杂的分析以估算延迟。

下表是2007年斯坦福大学演讲中表格的更新版本 (2007年演讲视频已不存在,但可参考2011年斯坦福相关演讲视频,其中涵盖部分相同内容),该表列出了需考虑的操作类型及其大致成本:

L1 cache reference                             0.5 ns
L2 cache reference                             3 ns
Branch mispredict                              5 ns
Mutex lock/unlock (uncontended)               15 ns
Main memory reference                         50 ns
Compress 1K bytes with Snappy              1,000 ns
Read 4KB from SSD                         20,000 ns
Round trip within same datacenter         50,000 ns
Read 1MB sequentially from memory         64,000 ns
Read 1MB over 100 Gbps network           100,000 ns
Read 1MB from SSD                      1,000,000 ns
Disk seek                              5,000,000 ns
Read 1MB sequentially from disk       10,000,000 ns
Send packet CA->Netherlands->CA      150,000,000 ns

上表列出了若干基础底层操作的粗略成本。您可能还需追踪系统相关的高级操作成本估算,例如:SQL数据库读取单条记录的成本、云服务交互的延迟、或渲染简单HTML页面的耗时。若不清楚不同操作的实际成本,连粗略估算都难以完成!

示例:快速排序十亿个4字节数值所需时间§

粗略估算,优质的快速排序算法对大小为N的数组需执行log(N)次遍历。每次遍历中,数组内容将从内存流式传输至处理器缓存,划分代码会将每个元素与枢轴元素进行一次比较。让我们汇总主要成本:

  1. 内存带宽:数组占用4GB(每个数占4字节×10亿个数)。假设每核内存带宽约为16GB/s,则每次遍历耗时约0.25秒。当N≈2^30时,需进行约30次遍历,总内存传输成本约为7.5秒。
  2. 分支预测错误:总计需执行 N*log(N) 次比较(约 300 亿次)。假设其中半数(即 150 亿次)预测错误,按每次错误消耗 5 纳秒计算,预测错误成本为 75 秒。本分析中正确预测的分支不计成本。
  3. 综合上述数据,总耗时约为82.5秒。

若需进一步分析,可考虑处理器缓存的影响。根据上述分析,分支预测错误已是主要成本项,此项优化可能并非必要,但仍将其作为补充示例列出。假设我们拥有32MB L3缓存,且数据从L3缓存传输至处理器的成本可忽略不计。L3缓存可容纳2^23个数值,因此最后22次迭代可直接操作L3缓存中的数据(第23次迭代将数据加载至L3缓存,后续迭代均基于该缓存数据运行)。这将内存传输成本从7.5秒(30次传输)降至2.5秒(10次传输,每次传输4GB数据,传输速率16GB/s)。

示例:生成含30张图片缩略图的网页所需时间§

比较两种潜在方案:原始图片存储于磁盘,每张图片约1MB大小。

  1. 串行读取30张图片内容并逐张生成缩略图。每次读取包含一次寻道+一次传输:寻道耗时5毫秒,传输耗时10毫秒,因此30张图片需耗时30×15毫秒=450毫秒。
  2. 假设图像均匀分布在K个磁盘上,采用并行读取方式。先前资源消耗的估算依然成立,但延迟将降低约K倍(忽略波动因素,例如偶尔可能出现某磁盘承载超过1/K的读取图像)。因此若在拥有数百个磁盘的分布式文件系统上运行,预期延迟将降至约15毫秒。
  3. 考虑所有图像存储于单个SSD的变体。这将使顺序读取性能变为每张图像20微秒+1毫秒,总计约30毫秒。

测量§

前文提供了编写代码时思考性能的思路,无需过度纠结如何量化性能影响。但在实际优化前,或面临性能与简洁性等权衡时,仍需测量或估算潜在性能收益。高效的测量能力是性能优化工作中最重要的工具。

顺带一提,对不熟悉的代码进行性能分析也是把握代码库整体结构和运行机制的有效途径。通过检查程序动态调用图中频繁调用的函数源代码,可获得代码运行时的宏观认知,从而增强你在稍显陌生的代码中实施性能优化改动的信心。

性能分析工具与技巧§

现有多种实用性能分析工具可供选择。首选工具当属pprof,它能提供优质的高层级性能信息,且在本地环境和生产环境中均易于使用。若需更深入的性能洞察,也可尝试perf

性能分析技巧:

  • 构建生产环境二进制文件时,需包含适当的调试信息和优化标志。
  • 若条件允许,编写覆盖待优化代码的微基准测试。微基准测试能缩短性能优化迭代周期,验证优化效果,并预防未来性能退化。但需注意其存在陷阱,可能导致测试结果无法代表完整系统性能。编写微基准测试的实用库:C++ Go Java
  • 使用基准测试库输出性能计数器读数,既能提高精度,又能深入了解程序行为。
  • 锁竞争常会人为降低CPU使用率。某些互斥锁实现支持锁竞争分析。
  • 机器学习性能分析请使用ML分析工具

当性能分析结果平坦化时的应对策略§

您常会遇到CPU性能分析结果呈现平坦化(无明显瓶颈)的情况,这通常发生在所有低难度优化点已被采纳之后。若您陷入此类困境,可参考以下建议:

  • 切勿低估微优化累积效应!在子系统中实现二十处独立的1%性能提升往往切实可行,其综合效果将带来显著改进(此类工作通常依赖稳定且高质量的微基准测试)。相关示例可参见展示多重技术的变更方案章节。
  • 寻找调用堆栈顶部附近的循环(CPU 性能分析的火焰图视图在此很有帮助)。这些循环或其调用的代码可能通过重构提升效率。例如,某段代码最初通过遍历输入节点的边和节点来逐步构建复杂图结构,后来改为一次性传递完整输入来构建图结构。此举消除了初始代码中针对每条边执行的内部检查。
  • 退一步思考,在调用堆栈更高层级寻找结构性改进,而非专注于微优化。此时可参考算法改进部分列出的技术手段。
  • 寻找过于通用的代码。用定制化或更底层的实现方案替换它。例如,如果应用程序反复使用正则表达式匹配,而简单的前缀匹配就足够了,则考虑放弃正则表达式的使用。
  • 尝试减少内存分配次数:获取分配剖析报告(https://gperftools.github.io/gperftools/heapprofile.html),重点优化分配次数最高的环节。此举将产生双重效果:(1) 直接缩短分配器(及带垃圾回收的语言中的垃圾收集器)的运行时间;(2) 通常能减少缓存未命中,因为在使用 tcmalloc 的长期运行程序中,每次分配往往会占用不同的缓存行。
  • 收集其他类型的性能剖析数据,特别是基于硬件性能计数器的剖析。此类剖析可能指出遭遇高缓存未命中率的函数。可参考性能剖析工具与技巧章节所述技术。

API 考量§

以下部分优化方案需修改数据结构和函数签名,可能对调用方造成干扰。建议将代码组织在封装边界内实施性能优化,避免影响公共接口。若模块采用深度封装(通过窄接口访问核心功能),实施难度将显著降低(详见[https://web.stanford.edu/~ouster/cgi-bin/book.php])。

广泛使用的API面临着添加功能的巨大压力。添加新功能时需谨慎,因为这些功能将限制未来的实现,并为不需要新功能的用户增加不必要的成本。例如,许多C++标准库容器承诺迭代器稳定性,但在典型实现中,这会显著增加分配次数,尽管许多用户并不需要指针稳定性。

以下列举若干具体技术。需审慎权衡性能收益与由此引发的API可用性问题。

批量API§

提供批量操作以减少高成本的API边界调用,或利用算法改进优势。

新增批量接口MemoryManager::LookupMany。

除新增批量接口外,此举还简化了新批量变体的签名:经测试客户端仅需判断是否找到所有键值,因此可直接返回布尔值而非Status对象。

memory_manager.h

class MemoryManager {
 public:
  ...
  util::StatusOr<LiveTensor> Lookup(const TensorIdProto& id);
class MemoryManager {
 public:
  ...
  util::StatusOr<LiveTensor> Lookup(const TensorIdProto& id);

  // Lookup the identified tensors
  struct LookupKey {
    ClientHandle client;
    uint64 local_id;
  };
  bool LookupMany(absl::Span<const LookupKey> keys,
                  absl::Span<tensorflow::Tensor> tensors);
添加批量ObjectStore::DeleteRefs API以摊销锁定开销。

object_store.h

template <typename T>
class ObjectStore {
 public:
  ...
  absl::Status DeleteRef(Ref);
template <typename T>
class ObjectStore {
 public:
  ...
  absl::Status DeleteRef(Ref);

  // Delete many references.  For each ref, if no other Refs point to the same
  // object, the object will be deleted.  Returns non-OK on any error.
  absl::Status DeleteRefs(absl::Span<const Ref> refs);
  ...
template <typename T>
absl::Status ObjectStore<T>::DeleteRefs(absl::Span<const Ref> refs) {
  util::Status result;
  absl::MutexLock l(&mu_);
  for (auto ref : refs) {
    result.Update(DeleteRefLocked(ref));
  }
  return result;
}

memory_tracking.cc

void HandleBatch(int, const plaque::Batch& input) override {
  for (const auto& t : input) {
    auto in = In(t);
    PLAQUE_OP_ASSIGN_OR_RETURN(const auto& handles, in.handles());
    for (const auto handle : handles.value->handles()) {
      PLAQUE_OP_RETURN_IF_ERROR(in_buffer_store_
                                    ? bstore_->DeleteRef(handle)
                                    : tstore_->DeleteRef(handle));
    }
  }
}
void HandleBatch(int, const plaque::Batch& input) override {
  for (const auto& t : input) {
    auto in = In(t);
    PLAQUE_OP_ASSIGN_OR_RETURN(const auto& handles, in.handles());
    if (in_buffer_store_) {
      PLAQUE_OP_RETURN_IF_ERROR(
          bstore_->DeleteRefs(handles.value->handles()));
    } else {
      PLAQUE_OP_RETURN_IF_ERROR(
          tstore_->DeleteRefs(handles.value->handles()));
    }
  }
}
采用弗洛伊德堆构造法实现高效初始化。

堆的批量初始化可在 O(N) 时间内完成,而逐个添加元素并在每次添加后更新堆属性则需要 O(N lg(N)) 时间。

有时难以直接让调用方改用新的批量 API。此时可考虑在内部使用批量 API,并将结果缓存以供后续非批量 API 调用使用:

缓存区块解码结果以供后续调用使用。

每次查找都需要解码包含K条目整区块。将解码后的条目存储在缓存中,后续查找时直接调用缓存。

lexicon.cc

void GetTokenString(int pos, std::string* out) const {
  ...
  absl::FixedArray<LexiconEntry, 32> entries(pos + 1);

  // Decode all lexicon entries up to and including pos.
  for (int i = 0; i <= pos; ++i) {
    p = util::coding::TwoValuesVarint::Decode32(p, &entries[i].remaining,
                                                &entries[i].shared);
    entries[i].remaining_str = p;
    p += entries[i].remaining;  // remaining bytes trail each entry.
  }
mutable std::vector<absl::InlinedVector<std::string, 16>> cache_;
...
void GetTokenString(int pos, std::string* out) const {
  ...
  DCHECK_LT(skentry, cache_.size());
  if (!cache_[skentry].empty()) {
    *out = cache_[skentry][pos];
    return;
  }
  ...
  // Init cache.
  ...
  const char* prev = p;
  for (int i = 0; i < block_sz; ++i) {
    uint32 shared, remaining;
    p = TwoValuesVarint::Decode32(p, &remaining, &shared);
    auto& cur = cache_[skentry].emplace_back();
    gtl::STLStringResizeUninitialized(&cur, remaining + shared);

    std::memcpy(cur.data(), prev, shared);
    std::memcpy(cur.data() + shared, p, remaining);
    prev = cur.data();
    p += remaining;
  }
  *out = cache_[skentry][pos];

视图类型§

函数参数应优先采用视图类型(如std::string_viewstd::Span<T>absl::FunctionRef<R(Args...)>),除非涉及数据所有权转移。这些类型可减少复制操作,并允许调用方选择容器类型(例如:某调用方可能使用std::vector,而另一调用方使用absl::InlinedVector)。

预分配/预计算参数§

对于高频调用的例程,有时允许更高层调用方传递其拥有的数据结构或客户端已有的必要信息会很有用。这可避免底层例程被迫分配临时数据结构或重新计算已有的信息。

新增 RPC_Stats::RecordRPC 变体,允许客户端传递已有的 WallTime 值。

rpc-stats.h

static void RecordRPC(const Name &name, const RPC_Stats_Measurement& m);
static void RecordRPC(const Name &name, const RPC_Stats_Measurement& m,
                      WallTime now);

clientchannel.cc

const WallTime now = WallTime_Now();
...
RPC_Stats::RecordRPC(stats_name, m);
const WallTime now = WallTime_Now();
...
RPC_Stats::RecordRPC(stats_name, m, now);

线程兼容型与线程安全型数据结构§

类型可分为线程兼容型(外部同步)或线程安全型(内部同步)。多数常用类型应采用线程兼容设计,使无需线程安全性的调用方无需为此付出代价。

因调用方已实现同步,故将该类设为线程兼容型。

hitless-transfer-phase.cc

TransferPhase HitlessTransferPhase::get() const {
  static CallsiteMetrics cm("HitlessTransferPhase::get");
  MonitoredMutexLock l(&cm, &mutex_);
  return phase_;
}
TransferPhase HitlessTransferPhase::get() const { return phase_; }

hitless-transfer-phase.cc

bool HitlessTransferPhase::AllowAllocate() const {
  static CallsiteMetrics cm("HitlessTransferPhase::AllowAllocate");
  MonitoredMutexLock l(&cm, &mutex_);
  return phase_ == TransferPhase::kNormal || phase_ == TransferPhase::kBrownout;
}
bool HitlessTransferPhase::AllowAllocate() const {
  return phase_ == TransferPhase::kNormal || phase_ == TransferPhase::kBrownout;
}

然而,若类型在典型使用场景中需要同步操作,则应优先将同步机制移入类型内部。此举可使同步机制根据需要进行调整以提升性能(例如通过分片减少竞争),同时不影响调用方。

算法优化§

性能优化的关键机遇源于算法改进,例如将O(N²)算法优化为O(N lg(N))或O(N),避免潜在指数级行为等。此类机会在稳定代码中较为罕见,但在编写新代码时值得关注。以下是现有代码优化案例:

采用逆后序遍历向循环检测结构添加节点

此前我们采用逐个添加图节点和边的方式构建循环检测数据结构,每条边都需要高昂的计算成本。现改为以逆后序遍历方式一次性添加整个图,使循环检测变得轻而易举。

graphcycles.h

class GraphCycles : public util_graph::Graph {
 public:
  GraphCycles();
  ~GraphCycles() override;

  using Node = util_graph::Node;
class GraphCycles : public util_graph::Graph {
 public:
  GraphCycles();
  ~GraphCycles() override;

  using Node = util_graph::Node;

  // InitFrom adds all the nodes and edges from src, returning true if
  // successful, false if a cycle is encountered.
  // REQUIRES: no nodes and edges have been added to GraphCycles yet.
  bool InitFrom(const util_graph::Graph& src);

graphcycles.cc

bool GraphCycles::InitFrom(const util_graph::Graph& src) {
  ...
  // Assign ranks in topological order so we don't need any reordering during
  // initialization. For an acyclic graph, DFS leaves nodes in reverse
  // topological order, so we assign decreasing ranks to nodes as we leave them.
  Rank last_rank = n;
  auto leave = [&](util_graph::Node node) {
    DCHECK(r->rank[node] == kMissingNodeRank);
    NodeInfo* nn = &r->nodes[node];
    nn->in = kNil;
    nn->out = kNil;
    r->rank[node] = --last_rank;
  };
  util_graph::DFSAll(src, std::nullopt, leave);

  // Add all the edges (detect cycles as we go).
  bool have_cycle = false;
  util_graph::PerEdge(src, [&](util_graph::Edge e) {
    DCHECK_NE(r->rank[e.src], kMissingNodeRank);
    DCHECK_NE(r->rank[e.dst], kMissingNodeRank);
    if (r->rank[e.src] >= r->rank[e.dst]) {
      have_cycle = true;
    } else if (!HasEdge(e.src, e.dst)) {
      EdgeListAddNode(r, &r->nodes[e.src].out, e.dst);
      EdgeListAddNode(r, &r->nodes[e.dst].in, e.src);
    }
  });
  if (have_cycle) {
    return false;
  } else {
    DCHECK(CheckInvariants());
    return true;
  }
}

graph_partitioner.cc

absl::Status MergeGraph::Init() {
  const Graph& graph = *compiler_->graph();
  clusters_.resize(graph.NodeLimit());
  graph.PerNode([&](Node node) {
    graph_->AddNode(node);
    NodeList* n = new NodeList;
    n->push_back(node);
    clusters_[node] = n;
  });
  absl::Status s;
  PerEdge(graph, [&](Edge e) {
    if (!s.ok()) return;
    if (graph_->HasEdge(e.src, e.dst)) return;  // already added
    if (!graph_->InsertEdge(e.src, e.dst)) {
      s = absl::InvalidArgumentError("cycle in the original graph");
    }
  });
  return s;
}
absl::Status MergeGraph::Init() {
  const Graph& graph = *compiler_->graph();
  if (!graph_->InitFrom(graph)) {
    return absl::InvalidArgumentError("cycle in the original graph");
  }
  clusters_.resize(graph.NodeLimit());
  graph.PerNode([&](Node node) {
    NodeList* n = new NodeList;
    n->push_back(node);
    clusters_[node] = n;
  });
  return absl::OkStatus();
}
用更优算法替换互斥锁实现中内置的死锁检测系统。

采用新算法替换死锁检测机制,其运行速度提升约50倍,且可无障碍扩展至数百万个互斥锁(旧算法依赖2K限制避免性能断崖)。新代码基于以下论文:《有向无环图的动态拓扑排序算法》David J. Pearce, Paul H. J. Kelly《实验算法学报》(JEA)JEA主页存档第11卷,2006年,文章编号1.7

新算法仅需 O(|V|+|E|) 空间(旧算法需 O(|V|^2) 位)。由于锁获取顺序图结构极为稀疏,空间消耗大幅降低。该算法结构简洁:核心实现仅需约100行C++代码。由于代码现可支持更大规模互斥量,我们得以解除人为设定的2K限制,从而在实际程序中发现了大量潜在死锁。

基准测试结果:因死锁检测主要在调试模式下启用,故采用DEBUG模式运行。基准测试参数(/2k等)代表追踪节点数量。在旧算法默认2k限制下,新算法每次插入边仅需0.5微秒,而旧算法耗时22微秒。新算法还能轻松扩展至更大规模图结构而不崩溃,旧算法则会迅速崩溃。

DEBUG: Benchmark            Time(ns)    CPU(ns) Iterations
----------------------------------------------------------
DEBUG: BM_StressTest/2k        23553      23566      29086
DEBUG: BM_StressTest/4k        45879      45909      15287
DEBUG: BM_StressTest/16k      776938     777472        817
DEBUG: BM_StressTest/2k          392        393   10485760
DEBUG: BM_StressTest/4k          392        393   10485760
DEBUG: BM_StressTest/32k         407        407   10485760
DEBUG: BM_StressTest/256k        456        456   10485760
DEBUG: BM_StressTest/1M          534        534   10485760
用哈希表(O(1)查找)替换间隔映射(IntervalMap,O(lg N)查找)。

初始代码采用区间映射(IntervalMap)因其似乎是支持相邻块合并的理想数据结构,但哈希表足以胜任——通过哈希表查找即可定位相邻块。此项改进(结合CL中的其他变更)使tpu::BestFitAllocator性能提升约4倍。

best_fit_allocator.h

using Block = gtl::IntervalMap<int64, BlockState>::Entry;
...
// Map of pairs (address range, BlockState) with one entry for each allocation
// covering the range [0, allocatable_range_end_).  Adjacent kFree and
// kReserved blocks are coalesced. Adjacent kAllocated blocks are not
// coalesced.
gtl::IntervalMap<int64, BlockState> block_list_;

// Set of all free blocks sorted according to the allocation policy. Adjacent
// free blocks are coalesced.
std::set<Block, BlockSelector> free_list_;
// A faster hash function for offsets in the BlockTable
struct OffsetHash {
  ABSL_ATTRIBUTE_ALWAYS_INLINE size_t operator()(int64 value) const {
    uint64 m = value;
    m *= uint64_t{0x9ddfea08eb382d69};
    return static_cast<uint64_t>(m ^ (m >> 32));
  }
};

// Hash table maps from block start address to block info.
// We include the length of the previous block in this info so we
// can find the preceding block to coalesce with.
struct HashTableEntry {
  BlockState state;
  int64 my_length;
  int64 prev_length;  // Zero if there is no previous block.
};
using BlockTable = absl::flat_hash_map<int64, HashTableEntry, OffsetHash>;
用哈希表查找(O(N))替换排序列表交集(O(N log N))。

旧代码检测两个节点是否共享源时,会将每个节点的源按排序顺序获取,然后进行排序交集操作。新代码将一个节点的源放入哈希表,然后遍历另一个节点的源并检查哈希表。

name             old time/op  new time/op  delta
BM_CompileLarge   28.5s ± 2%   22.4s ± 2%  -21.61%  (p=0.008 n=5+5)
实现优质哈希函数,使时间复杂度从O(N)降至O(1)。

location.h

// Hasher for Location objects.
struct LocationHash {
  size_t operator()(const Location* key) const {
    return key != nullptr ? util_hash::Hash(key->address()) : 0;
  }
};
size_t HashLocation(const Location& loc);
...
struct LocationHash {
  size_t operator()(const Location* key) const {
    return key != nullptr ? HashLocation(*key) : 0;
  }
};

location.cc

size_t HashLocation(const Location& loc) {
  util_hash::MurmurCat m;

  // Encode some simpler features into a single value.
  m.AppendAligned((loc.dynamic() ? 1 : 0)                    //
                  | (loc.append_shard_to_address() ? 2 : 0)  //
                  | (loc.is_any() ? 4 : 0)                   //
                  | (!loc.any_of().empty() ? 8 : 0)          //
                  | (loc.has_shardmap() ? 16 : 0)            //
                  | (loc.has_sharding() ? 32 : 0));

  if (loc.has_shardmap()) {
    m.AppendAligned(loc.shardmap().output() |
                    static_cast<uint64_t>(loc.shardmap().stmt()) << 20);
  }
  if (loc.has_sharding()) {
    uint64_t num = 0;
    switch (loc.sharding().type_case()) {
      case Sharding::kModShard:
        num = loc.sharding().mod_shard();
        break;
      case Sharding::kRangeSplit:
        num = loc.sharding().range_split();
        break;
      case Sharding::kNumShards:
        num = loc.sharding().num_shards();
        break;
      default:
        num = 0;
        break;
    }
    m.AppendAligned(static_cast<uint64_t>(loc.sharding().type_case()) |
                    (num << 3));
  }

  auto add_string = [&m](absl::string_view s) {
    if (!s.empty()) {
      m.Append(s.data(), s.size());
    }
  };

  add_string(loc.address());
  add_string(loc.lb_policy());

  // We do not include any_of since it is complicated to compute a hash
  // value that is not sensitive to order and duplication.
  return m.GetHash();
}

更优内存表示§

对关键数据结构的内存占用和缓存占用进行周密考量,往往能带来显著节省。以下数据结构通过减少缓存行访问次数来支持常用操作。此处的优化可实现:(a) 避免代价高昂的缓存未命中 (b) 降低内存总线流量,从而加速目标程序及同机运行的其他程序。这些结构采用的通用技术在设计自有数据结构时可能有所裨益。

紧凑数据结构§

对频繁访问或占用大量应用内存的数据采用紧凑表示形式。紧凑表示能显著降低内存占用,通过减少缓存行访问次数和内存总线带宽消耗来提升性能。但需警惕缓存行竞争问题。

内存布局§

对于占用大量内存或缓存空间的类型,需仔细规划其内存布局。

  • 重排字段顺序以减少不同对齐要求字段间的填充(参见类布局讨论)。
  • 在存储数据能容纳较小类型时,使用更小的数值类型。
  • 枚举值有时会占用整个字,除非特别注意。考虑采用更小的表示方式(例如使用 enum class OpType : uint8_t { ... } 替代 enum class OpType { ... })。
  • 将频繁共同访问的字段排列得更靠近——这将减少常见操作中触及的缓存行数量。
  • 将高频读取的只读字段与高频修改的可变字段分离,避免对可变字段的写操作导致附近缓存中的只读字段被替换。
  • 将冷数据移离热数据区域,可通过以下方式实现:将冷数据置于结构体末尾、通过间接层隔离,或存放于独立数组中。
  • 考虑通过位级和字节级编码压缩数据以减少占用字节数。此操作较为复杂,仅当目标数据封装在经过充分测试的模块内且能显著降低整体内存占用时才实施。同时需警惕副作用,如高频数据对齐不足或访问压缩表示形式的代码开销增加。通过基准测试验证此类变更。

用索引替代指针§

在现代64位机器上,指针占用64位空间。若数据结构中充斥指针,T*的间接引用极易消耗大量内存。建议改用整数索引访问数组T[]或其他数据结构。此举不仅能缩小引用占用空间(当索引数小于等于32位时),更能使所有T[]元素存储连续,通常能提升缓存局部性。

批量存储§

避免使用为每个存储元素单独分配对象的数据结构(如C中的std::mapstd::unordered_map)。建议采用分块或扁平化表示的类型,将多个元素存储在内存邻近区域(如C中的std::vectorabsl::flat_hash_{map,set})。此类类型通常具有更优的缓存行为,且分配器开销更低。

一种有效技术是将元素划分为块,每个块固定容纳若干元素。此技术能在保持良好渐近行为的同时,显著降低数据结构的缓存占用。

某些数据结构仅需单个块即可容纳所有元素(如字符串和向量)。其他类型(如absl::flat_hash_map)也采用此技术。

内联存储§

某些容器类型专为存储少量元素而优化。这类类型在顶层预留少量元素空间,当元素数量较少时可完全避免分配操作。当此类类型实例频繁构造(例如作为高频代码中的栈变量)或存在大量同时存活实例时,此特性尤为有效。若容器通常仅存储少量元素,建议选用内联存储类型(如InlinedVector)。

注意事项:当sizeof(T)较大时,内联存储容器可能并非最佳选择,因其内联后备存储空间会显著增大。

冗余嵌套映射§

有时可将嵌套映射数据结构替换为带复合键的单层映射,这能显著降低查找和插入成本。

通过将 btree<a,btree<b,c>> 转换为 btree<pair<a,b>,c>,可减少分配操作并优化缓存占用。

graph_splitter.cc

absl::btree_map<std::string, absl::btree_map<std::string, OpDef>> ops;
// The btree maps from {package_name, op_name} to its const Opdef*.
absl::btree_map<std::pair<absl::string_view, absl::string_view>,
                const OpDef*>
    ops;

注意事项:若首级映射键值过大,保留嵌套映射更优:

切换至嵌套映射后,微基准测试性能提升76%。

此前我们采用单层哈希表,其键由路径(字符串)及若干数值子键组成。平均每个路径约出现在1000个键中。现将哈希表拆分为两级:第一级以路径为键,每个二级哈希表仅存储特定路径的子键到数据映射关系。此方案使路径存储内存消耗减少1000倍,同时显著提升了同一路径下批量访问多个子键的操作速度。

区域§

竞技场有助于降低内存分配成本,同时还能将独立分配的项目紧凑地排列在相邻位置(通常占用更少的缓存行),并消除大部分销毁开销。对于包含大量子对象的复杂数据结构,竞技场往往效果最佳。建议为竞技场设置适当的初始大小,这有助于减少分配操作。

注意事项:若将过多短寿命对象放入长寿命区域,容易导致内存占用无谓膨胀。

用数组替代映射§

若映射的域可由小整数表示或属于枚举类型,或映射元素极少时,可考虑用数组或某种向量替代映射。

使用数组替代flat_map。

rtp_controller.h

const gtl::flat_map<int, int> payload_type_to_clock_frequency_;
// A map (implemented as a simple array) indexed by payload_type to clock freq
// for that paylaod type (or 0)
struct PayloadTypeToClockRateMap {
  int map[128];
};
...
const PayloadTypeToClockRateMap payload_type_to_clock_frequency_;

用位向量替代集合§

若集合的域可由小整数表示,则可用位向量替代(InlinedBitVector常是优选方案)。通过位运算(或运算表示并集,与运算表示交集等)可高效实现集合运算。

Spanner 布局系统。将 dense\_hash\_set<ZoneId> 替换为每区域占用一位的位向量。

zone_set.h

class ZoneSet: public dense_hash_set<ZoneId> {
 public:
  ...
  bool Contains(ZoneId zone) const {
    return count(zone) > 0;
  }
class ZoneSet {
  ...
  // Returns true iff "zone" is contained in the set
  bool ContainsZone(ZoneId zone) const {
    return zone < b_.size() && b_.get_bit(zone);
  }
  ...
 private:
  int size_;          // Number of zones inserted
  util::bitmap::InlinedBitVector<256> b_;

基准测试结果:

CPU: AMD Opteron (4 cores) dL1:64KB dL2:1024KB
Benchmark                          Base (ns)  New (ns) Improvement
------------------------------------------------------------------
BM_Evaluate/1                            960       676    +29.6%
BM_Evaluate/2                           1661      1138    +31.5%
BM_Evaluate/3                           2305      1640    +28.9%
BM_Evaluate/4                           3053      2135    +30.1%
BM_Evaluate/5                           3780      2665    +29.5%
BM_Evaluate/10                          7819      5739    +26.6%
BM_Evaluate/20                         17922     12338    +31.2%
BM_Evaluate/40                         36836     26430    +28.2%
使用位矩阵替代哈希表来追踪操作数间的可达性属性。

hlo_computation.h

using TransitiveOperandMap =
    std::unordered_map<const HloInstruction*,
                       std::unordered_set<const HloInstruction*>>;
class HloComputation::ReachabilityMap {
  ...
  // dense id assignment from HloInstruction* to number
  tensorflow::gtl::FlatMap<const HloInstruction*, int> ids_;
  // matrix_(a,b) is true iff b is reachable from a
  tensorflow::core::Bitmap matrix_;
};

减少内存分配§

内存分配会增加开销:

  1. 延长分配器处理时间。
  2. 新分配的对象可能需要耗费资源的初始化,且在不再需要时可能需要对应的高成本销毁操作。
  3. 每次分配往往占据新的缓存行,因此分散在多个独立分配中的数据会比集中在较少分配中的数据占用更大缓存空间。

垃圾回收运行时有时通过将连续分配顺序放置在内存中来规避问题#3。

避免不必要的分配§

减少分配可使基准测试吞吐量提升21%。

memory_manager.cc

LiveTensor::LiveTensor(tf::Tensor t, std::shared_ptr<const DeviceInfo> dinfo,
                       bool is_batched)
    : tensor(std::move(t)),
      device_info(dinfo ? std::move(dinfo) : std::make_shared<DeviceInfo>()),
      is_batched(is_batched) {
static const std::shared_ptr<DeviceInfo>& empty_device_info() {
  static std::shared_ptr<DeviceInfo>* result =
      new std::shared_ptr<DeviceInfo>(new DeviceInfo);
  return *result;
}

LiveTensor::LiveTensor(tf::Tensor t, std::shared_ptr<const DeviceInfo> dinfo,
                       bool is_batched)
    : tensor(std::move(t)), is_batched(is_batched) {
  if (dinfo) {
    device_info = std::move(dinfo);
  } else {
    device_info = empty_device_info();
  }
尽可能使用静态分配的零向量,而非分配向量后再填充零值。

embedding_executor_8bit.cc

// The actual implementation of the EmbeddingLookUpT using template parameters
// instead of object members to improve the performance.
template <bool Mean, bool SymmetricInputRange>
static tensorflow::Status EmbeddingLookUpT(...) {
    ...
  std::unique_ptr<tensorflow::quint8[]> zero_data(
      new tensorflow::quint8[max_embedding_width]);
  memset(zero_data.get(), 0, sizeof(tensorflow::quint8) * max_embedding_width);
// A size large enough to handle most embedding widths
static const int kTypicalMaxEmbedding = 256;
static tensorflow::quint8 static_zero_data[kTypicalMaxEmbedding];  // All zeroes
...
// The actual implementation of the EmbeddingLookUpT using template parameters
// instead of object members to improve the performance.
template <bool Mean, bool SymmetricInputRange>
static tensorflow::Status EmbeddingLookUpT(...) {
    ...
  std::unique_ptr<tensorflow::quint8[]> zero_data_backing(nullptr);

  // Get a pointer to a memory area with at least
  // "max_embedding_width" quint8 zero values.
  tensorflow::quint8* zero_data;
  if (max_embedding_width <= ARRAYSIZE(static_zero_data)) {
    // static_zero_data is big enough so we don't need to allocate zero data
    zero_data = &static_zero_data[0];
  } else {
    // static_zero_data is not big enough: we need to allocate zero data
    zero_data_backing =
        absl::make_unique<tensorflow::quint8[]>(max_embedding_width);
    memset(zero_data_backing.get(), 0,
           sizeof(tensorflow::quint8) * max_embedding_width);
    zero_data = zero_data_backing.get();
  }

此外,当对象生命周期受作用域限制时,优先采用栈分配而非堆分配(但需注意大型对象的栈帧大小)。

容器的预分配与动态扩展§

当向量(或其他容器类型)的最大或预期最大容量可预知时,应预先分配容器的存储空间(例如在C++中使用resizereserve)。

预先分配向量大小并填充,而非执行N次push_back操作。

indexblockdecoder.cc

for (int i = 0; i < ndocs-1; i++) {
  uint32 delta;
  ERRORCHECK(b->GetRice(rice_base, &delta));
  docs_.push_back(DocId(my_shard_ + (base + delta) * num_shards_));
  base = base + delta + 1;
}
docs_.push_back(last_docid_);
docs_.resize(ndocs);
DocId* docptr = &docs_[0];
for (int i = 0; i < ndocs-1; i++) {
  uint32 delta;
  ERRORCHECK(b.GetRice(rice_base, &delta));
  *docptr = DocId(my_shard_ + (base + delta) * num_shards_);
  docptr++;
  base = base + delta + 1;
}
*docptr = last_docid_;

注意事项:切勿使用resizereserve逐个元素扩展容器,此举可能导致二次增长行为。若元素构造成本较高,建议优先采用初始reserve配合多次push_backemplace_back的组合方式,而非初始resize——后者将使构造函数调用次数翻倍。

尽可能避免复制§

  • 尽可能优先采用移动复制而非复制数据结构。
  • 若生命周期不是问题,在临时数据结构中存储指针或索引而非对象副本。例如:若使用局部映射从传入的原语列表中筛选数据集,可让映射仅存储原语指针而非复制可能深度嵌套的数据。另一常见场景是排序索引向量而非直接排序大型对象向量,后者将产生显著的复制/移动开销。
通过gRPC接收张量时避免额外复制。

发送约400KB张量的基准测试显示性能提升约10-15%:

Benchmark              Time(ns)    CPU(ns) Iterations
-----------------------------------------------------
BM_RPC/30/98k_mean    148764691 1369998944       1000
Benchmark              Time(ns)    CPU(ns) Iterations
-----------------------------------------------------
BM_RPC/30/98k_mean    131595940 1216998084       1000
移动大型选项结构而非复制。

index.cc

return search_iterators::DocPLIteratorFactory::Create(opts);
return search_iterators::DocPLIteratorFactory::Create(std::move(opts));
使用 std::sort 替代 std::stable_sort,可避免稳定排序实现内部的复制操作。

encoded-vector-hits.h

std::stable_sort(hits_.begin(), hits_.end(),
                 gtl::OrderByField(&HitWithPayloadOffset::docid));
struct HitWithPayloadOffset {
  search_iterators::LocalDocId64 docid;
  int first_payload_offset;  // offset into the payload vector.
  int num_payloads;

  bool operator<(const HitWithPayloadOffset& other) const {
    return (docid < other.docid) ||
           (docid == other.docid &&
            first_payload_offset < other.first_payload_offset);
  }
};
    ...
    std::sort(hits_.begin(), hits_.end());

重用临时对象§

在循环内部声明的容器或对象会在每次循环迭代时被重新创建。这可能导致昂贵的构造、销毁和调整大小操作。将声明提升到循环外部可实现复用,并能显著提升性能。(由于语言语义或无法确保程序等效性,编译器通常无法自行执行此类提升。)

将变量定义移出循环迭代范围。

autofdo_profile_utils.h

auto iterator = absl::WrapUnique(sstable->GetIterator());
while (!iterator->done()) {
  T profile;
  if (!profile.ParseFromString(iterator->value_view())) {
    return absl::InternalError(
        "Failed to parse mem_block to specified profile type.");
  }
  ...
  iterator->Next();
}
auto iterator = absl::WrapUnique(sstable->GetIterator());
T profile;
while (!iterator->done()) {
  if (!profile.ParseFromString(iterator->value_view())) {
    return absl::InternalError(
        "Failed to parse mem_block to specified profile type.");
  }
  ...
  iterator->Next();
}
在循环外部定义 protobuf 变量,使其分配的存储空间可在循环迭代间复用。

stats-router.cc

for (auto& r : routers_to_update) {
  ...
  ResourceRecord record;
  {
    MutexLock agg_lock(r.agg->mutex());
    r.agg->AddResourceRecordUsages(measure_indices, &record);
  }
  ...
}
ResourceRecord record;
for (auto& r : routers_to_update) {
  ...
  record.Clear();
  {
    MutexLock agg_lock(r.agg->mutex());
    r.agg->AddResourceRecordUsages(measure_indices, &record);
  }
  ...
}
重复序列化至同一 std::string。

program_rep.cc

std::string DeterministicSerialization(const proto2::Message& m) {
  std::string result;
  proto2::io::StringOutputStream sink(&result);
  proto2::io::CodedOutputStream out(&sink);
  out.SetSerializationDeterministic(true);
  m.SerializePartialToCodedStream(&out);
  return result;
}
absl::string_view DeterministicSerializationTo(const proto2::Message& m,
                                               std::string* scratch) {
  scratch->clear();
  proto2::io::StringOutputStream sink(scratch);
  proto2::io::CodedOutputStream out(&sink);
  out.SetSerializationDeterministic(true);
  m.SerializePartialToCodedStream(&out);
  return absl::string_view(*scratch);
}

注意事项:protobuf、string、vector、容器等倾向于增长至其存储过的最大值的大小。因此定期重建它们(例如每使用N次后)有助于降低内存需求和重新初始化成本。

避免不必要的工作§

提升性能最有效的途径之一,或许就是规避非必要操作。具体手段包括:为常见场景创建专用代码路径以规避通用高成本计算、预先计算、延迟执行直至真正需要、将操作移至执行频率较低的代码段等。下文将按典型类别归纳此类通用策略的具体示例。

常见情况的快速路径§

代码常需覆盖所有情况,但其中部分子集明显更简单且更常见。例如vector::push_back通常预留足够空间容纳新元素,但当空间不足时仍包含底层存储扩容的代码。通过优化代码结构,可在不显著影响罕见场景性能的前提下,加速常见简单场景的执行。

让快速路径覆盖更多常见场景。

为尾部单个ASCII字节添加处理逻辑,而非仅处理四字节倍数。此举可避免对5字节等全ASCII字符串调用较慢的通用处理函数。

utf8statetable.cc

// Scan a UTF-8 stringpiece based on state table.
// Always scan complete UTF-8 characters
// Set number of bytes scanned. Return reason for exiting
// OPTIMIZED for case of 7-bit ASCII 0000..007f all valid
int UTF8GenericScanFastAscii(const UTF8ScanObj* st, absl::string_view str,
                             int* bytes_consumed) {
                             ...
  int exit_reason;
  do {
    //  Skip 8 bytes of ASCII at a whack; no endianness issue
    while ((src_limit - src >= 8) &&
           (((UNALIGNED_LOAD32(src + 0) | UNALIGNED_LOAD32(src + 4)) &
             0x80808080) == 0)) {
      src += 8;
    }
    //  Run state table on the rest
    int rest_consumed;
    exit_reason = UTF8GenericScan(
        st, absl::ClippedSubstr(str, src - initial_src), &rest_consumed);
    src += rest_consumed;
  } while (exit_reason == kExitDoAgain);

  *bytes_consumed = src - initial_src;
  return exit_reason;
}
// Scan a UTF-8 stringpiece based on state table.
// Always scan complete UTF-8 characters
// Set number of bytes scanned. Return reason for exiting
// OPTIMIZED for case of 7-bit ASCII 0000..007f all valid
int UTF8GenericScanFastAscii(const UTF8ScanObj* st, absl::string_view str,
                             int* bytes_consumed) {
                             ...
  int exit_reason = kExitOK;
  do {
    //  Skip 8 bytes of ASCII at a whack; no endianness issue
    while ((src_limit - src >= 8) &&
           (((UNALIGNED_LOAD32(src + 0) | UNALIGNED_LOAD32(src + 4)) &
             0x80808080) == 0)) {
      src += 8;
    }
    while (src < src_limit && Is7BitAscii(*src)) { // Skip ASCII bytes
      src++;
    }
    if (src < src_limit) {
      //  Run state table on the rest
      int rest_consumed;
      exit_reason = UTF8GenericScan(
          st, absl::ClippedSubstr(str, src - initial_src), &rest_consumed);
      src += rest_consumed;
    }
  } while (exit_reason == kExitDoAgain);

  *bytes_consumed = src - initial_src;
  return exit_reason;
}
为InlinedVector简化快速路径实现。

inlined_vector.h

auto Storage<T, N, A>::Resize(ValueAdapter values, size_type new_size) -> void {
  StorageView storage_view = MakeStorageView();

  IteratorValueAdapter<MoveIterator> move_values(
      MoveIterator(storage_view.data));

  AllocationTransaction allocation_tx(GetAllocPtr());
  ConstructionTransaction construction_tx(GetAllocPtr());

  absl::Span<value_type> construct_loop;
  absl::Span<value_type> move_construct_loop;
  absl::Span<value_type> destroy_loop;

  if (new_size > storage_view.capacity) {
  ...
  } else if (new_size > storage_view.size) {
    construct_loop = {storage_view.data + storage_view.size,
                      new_size - storage_view.size};
  } else {
    destroy_loop = {storage_view.data + new_size, storage_view.size - new_size};
  }
auto Storage<T, N, A>::Resize(ValueAdapter values, size_type new_size) -> void {
  StorageView storage_view = MakeStorageView();
  auto* const base = storage_view.data;
  const size_type size = storage_view.size;
  auto* alloc = GetAllocPtr();
  if (new_size <= size) {
    // Destroy extra old elements.
    inlined_vector_internal::DestroyElements(alloc, base + new_size,
                                             size - new_size);
  } else if (new_size <= storage_view.capacity) {
    // Construct new elements in place.
    inlined_vector_internal::ConstructElements(alloc, base + size, &values,
                                               new_size - size);
  } else {
  ...
  }
为1-D至4-D张量初始化的常见场景提供快速路径。

tensor_shape.cc

template <class Shape>
TensorShapeBase<Shape>::TensorShapeBase(gtl::ArraySlice<int64> dim_sizes) {
  set_tag(REP16);
  set_data_type(DT_INVALID);
  set_ndims_byte(0);
  set_num_elements(1);
  for (int64 s : dim_sizes) {
    AddDim(internal::SubtleMustCopy(s));
  }
}
template <class Shape>
void TensorShapeBase<Shape>::InitDims(gtl::ArraySlice<int64> dim_sizes) {
  DCHECK_EQ(tag(), REP16);

  // Allow sizes that are under kint64max^0.25 so that 4-way multiplication
  // below cannot overflow.
  static const uint64 kMaxSmall = 0xd744;
  static_assert(kMaxSmall * kMaxSmall * kMaxSmall * kMaxSmall <= kint64max,
                "bad overflow check");
  bool large_size = false;
  for (auto s : dim_sizes) {
    if (s > kMaxSmall) {
      large_size = true;
      break;
    }
  }

  if (!large_size) {
    // Every size fits in 16 bits; use fast-paths for dims in {1,2,3,4}.
    uint16* dst = as16()->dims_;
    switch (dim_sizes.size()) {
      case 1: {
        set_ndims_byte(1);
        const int64 size = dim_sizes[0];
        const bool neg = Set16(kIsPartial, dst, 0, size);
        set_num_elements(neg ? -1 : size);
        return;
      }
      case 2: {
        set_ndims_byte(2);
        const int64 size0 = dim_sizes[0];
        const int64 size1 = dim_sizes[1];
        bool neg = Set16(kIsPartial, dst, 0, size0);
        neg |= Set16(kIsPartial, dst, 1, size1);
        set_num_elements(neg ? -1 : (size0 * size1));
        return;
      }
      case 3: {
      ...
      }
      case 4: {
      ...
      }
    }
  }

  set_ndims_byte(0);
  set_num_elements(1);
  for (int64 s : dim_sizes) {
    AddDim(internal::SubtleMustCopy(s));
  }
}
使变长整型解析器的快速路径仅覆盖1字节情况,而非同时覆盖1字节和2字节情况。

缩减(内联)快速路径的体积可降低代码规模和指令缓存压力,从而提升性能。

parse_context.h

template <typename T>
PROTOBUF_NODISCARD const char* VarintParse(const char* p, T* out) {
  auto ptr = reinterpret_cast<const uint8_t*>(p);
  uint32_t res = ptr[0];
  if (!(res & 0x80)) {
    *out = res;
    return p + 1;
  }
  uint32_t byte = ptr[1];
  res += (byte - 1) << 7;
  if (!(byte & 0x80)) {
    *out = res;
    return p + 2;
  }
  return VarintParseSlow(p, res, out);
}
template <typename T>
PROTOBUF_NODISCARD const char* VarintParse(const char* p, T* out) {
  auto ptr = reinterpret_cast<const uint8_t*>(p);
  uint32_t res = ptr[0];
  if (!(res & 0x80)) {
    *out = res;
    return p + 1;
  }
  return VarintParseSlow(p, res, out);
}

parse_context.cc

std::pair<const char*, uint32_t> VarintParseSlow32(const char* p,
                                                   uint32_t res) {
  for (std::uint32_t i = 2; i < 5; i++) {
  ...
}
...
std::pair<const char*, uint64_t> VarintParseSlow64(const char* p,
                                                   uint32_t res32) {
  uint64_t res = res32;
  for (std::uint32_t i = 2; i < 10; i++) {
  ...
}
std::pair<const char*, uint32_t> VarintParseSlow32(const char* p,
                                                   uint32_t res) {
  for (std::uint32_t i = 1; i < 5; i++) {
  ...
}
...
std::pair<const char*, uint64_t> VarintParseSlow64(const char* p,
                                                   uint32_t res32) {
  uint64_t res = res32;
  for (std::uint32_t i = 1; i < 10; i++) {
  ...
}
若未发生错误,则跳过RPC_Stats_Measurement中大量计算工作。

rpc-stats.h

struct RPC_Stats_Measurement {
  ...
  double errors[RPC::NUM_ERRORS];
struct RPC_Stats_Measurement {
  ...
  double get_errors(int index) const { return errors[index]; }
  void set_errors(int index, double value) {
    errors[index] = value;
    any_errors_set = true;
  }
 private:
  ...
  // We make this private so that we can keep track of whether any of
  // these values have been set to non-zero values.
  double errors[RPC::NUM_ERRORS];
  bool any_errors_set;  // True iff any of the errors[i] values are non-zero

rpc-stats.cc

void RPC_Stats_Measurement::operator+=(const RPC_Stats_Measurement& x) {
  ...
  for (int i = 0; i < RPC::NUM_ERRORS; ++i) {
    errors[i] += x.errors[i];
  }
}
void RPC_Stats_Measurement::operator+=(const RPC_Stats_Measurement& x) {
  ...
  if (x.any_errors_set) {
    for (int i = 0; i < RPC::NUM_ERRORS; ++i) {
      errors[i] += x.errors[i];
    }
    any_errors_set = true;
  }
}
对字符串首字节进行数组查找,通常可避免完整字符串指纹识别。

soft-tokens-helper.cc

bool SoftTokensHelper::IsSoftToken(const StringPiece& token) const {
  return soft_tokens_.find(Fingerprint(token.data(), token.size())) !=
      soft_tokens_.end();
}

soft-tokens-helper.h

class SoftTokensHelper {
 ...
 private:
  ...
  // Since soft tokens are mostly punctuation-related, for performance
  // purposes, we keep an array filter_.  filter_[i] is true iff any
  // of the soft tokens start with the byte value 'i'.  This avoids
  // fingerprinting a term in the common case, since we can just do an array
  // lookup based on the first byte, and if filter_[b] is false, then
  // we can return false immediately.
  bool          filter_[256];
  ...
};

inline bool SoftTokensHelper::IsSoftToken(const StringPiece& token) const {
  if (token.size() >= 1) {
    char first_char = token.data()[0];
    if (!filter_[first_char]) {
      return false;
    }
  }
  return IsSoftTokenFallback(token);
}

soft-tokens-helper.cc

bool SoftTokensHelper::IsSoftTokenFallback(const StringPiece& token) const {
  return soft_tokens_.find(Fingerprint(token.data(), token.size())) !=
      soft_tokens_.end();
}

预先计算高成本信息§

预先计算TensorFlow图执行节点属性,以便快速排除某些异常情况。

executor.cc

struct NodeItem {
  ...
  bool kernel_is_expensive = false;  // True iff kernel->IsExpensive()
  bool kernel_is_async = false;      // True iff kernel->AsAsync() != nullptr
  bool is_merge = false;             // True iff IsMerge(node)
  ...
  if (IsEnter(node)) {
  ...
  } else if (IsExit(node)) {
  ...
  } else if (IsNextIteration(node)) {
  ...
  } else {
    // Normal path for most nodes
    ...
  }
struct NodeItem {
  ...
  bool kernel_is_expensive : 1;  // True iff kernel->IsExpensive()
  bool kernel_is_async : 1;      // True iff kernel->AsAsync() != nullptr
  bool is_merge : 1;             // True iff IsMerge(node)
  bool is_enter : 1;             // True iff IsEnter(node)
  bool is_exit : 1;              // True iff IsExit(node)
  bool is_control_trigger : 1;   // True iff IsControlTrigger(node)
  bool is_sink : 1;              // True iff IsSink(node)
  // True iff IsEnter(node) || IsExit(node) || IsNextIteration(node)
  bool is_enter_exit_or_next_iter : 1;
  ...
  if (!item->is_enter_exit_or_next_iter) {
    // Fast path for nodes types that don't need special handling
    DCHECK_EQ(input_frame, output_frame);
    ...
  } else if (item->is_enter) {
  ...
  } else if (item->is_exit) {
  ...
  } else {
    DCHECK(IsNextIteration(node));
    ...
  }
预先计算256元素数组并在三元组初始化时使用。

byte_trigram_classifier.cc

void ByteTrigramClassifier::VerifyModel(void) const {
  ProbT class_sums[num_classes_];
  for (int cls = 0; cls < num_classes_; cls++) {
    class_sums[cls] = 0;
  }
  for (ByteNgramId id = 0; id < trigrams_.num_trigrams(); id++) {
    for (int cls = 0; cls < num_classes_; ++cls) {
      class_sums[cls] += Prob(trigram_probs_[id].log_probs[cls]);
    }
  }
  ...
}                         
void ByteTrigramClassifier::VerifyModel(void) const {
  CHECK_EQ(sizeof(ByteLogProbT), 1);
  ProbT fast_prob[256];
  for (int b = 0; b < 256; b++) {
    fast_prob[b] = Prob(static_cast<ByteLogProbT>(b));
  }

  ProbT class_sums[num_classes_];
  for (int cls = 0; cls < num_classes_; cls++) {
    class_sums[cls] = 0;
  }
  for (ByteNgramId id = 0; id < trigrams_.num_trigrams(); id++) {
    for (int cls = 0; cls < num_classes_; ++cls) {
      class_sums[cls] += fast_prob[trigram_probs_[id].log_probs[cls]];
    }
  }
  ...
}                         

通用建议:在模块边界检查输入格式错误,避免内部重复检查。

将耗时计算移出循环§

将边界计算移出循环。

literal_linearizer.cc

for (int64 i = 0; i < src_shape.dimensions(dimension_numbers.front());
     ++i) {
int64 dim_front = src_shape.dimensions(dimension_numbers.front());
const uint8* src_buffer_data = src_buffer.data();
uint8* dst_buffer_data = dst_buffer.data();
for (int64 i = 0; i < dim_front; ++i) {

延迟高耗计算§

延迟调用 GetSubSharding 至实际需要时,将 CPU 时间从 43 秒缩短至 2 秒。

sharding_propagation.cc

HloSharding alternative_sub_sharding =
    user.sharding().GetSubSharding(user.shape(), {i});
if (user.operand(i) == &instruction &&
    hlo_sharding_util::IsShardingMoreSpecific(alternative_sub_sharding,
                                              sub_sharding)) {
  sub_sharding = alternative_sub_sharding;
}
if (user.operand(i) == &instruction) {
  // Only evaluate GetSubSharding if this operand is of interest,
  // as it is relatively expensive.
  HloSharding alternative_sub_sharding =
      user.sharding().GetSubSharding(user.shape(), {i});
  if (hlo_sharding_util::IsShardingMoreSpecific(
          alternative_sub_sharding, sub_sharding)) {
    sub_sharding = alternative_sub_sharding;
  }
}
避免频繁更新统计数据;按需计算。

在高频的内存分配/释放调用中不更新统计数据。改为在低频调用的Stats()方法被调用时按需计算统计数据。

将谷歌Web服务器查询处理的预分配节点数从200降至10。

此项简单变更使Web服务器CPU使用率降低7.5%。

querytree.h

static const int kInitParseTreeSize = 200;   // initial size of querynode pool
static const int kInitParseTreeSize = 10;   // initial size of querynode pool
调整搜索顺序后吞吐量提升19%。

旧版搜索系统(约2000年)采用双层索引架构:一层存储全文索引,另一层仅存储标题和锚文本索引。我们曾优先搜索较小的标题/锚文本层。但出乎意料的是,优先搜索更大的全文索引层反而更经济——若全文层检索完毕,即可完全跳过标题/锚文本层(作为全文层的子集)。这种情况发生频率较高,使我们得以减少处理单次查询的平均磁盘寻道次数。

背景信息详见《大规模超文本网络搜索引擎解剖》中关于标题与锚文本处理的讨论。

代码特化§

某些性能敏感的调用场景可能无需通用库提供的全部功能。若能提升性能,建议在此类场景中编写专用代码替代通用代码调用。

Histogram类的自定义打印代码比sprintf快4倍。

该代码因在监控系统收集服务器统计数据时被调用而具有性能敏感性。

histogram_export.cc

void Histogram::PopulateBuckets(const string &prefix,
                                expvar::MapProto *const var) const {
                                ...
  for (int i = min_bucket; i <= max_bucket; ++i) {
    const double count = BucketCount(i);
    if (!export_empty_buckets && count == 0.0) continue;
    acc += count;
    // The label format of exported buckets for discrete histograms
    // specifies an inclusive upper bound, which is the same as in
    // the original Histogram implementation.  This format is not
    // applicable to non-discrete histograms, so a half-open interval
    // is used for them, with "_" instead of "-" as a separator to
    // make possible to distinguish the formats.
    string key =
        options_.export_cumulative_counts() ?
            StringPrintf("%.12g", boundaries_->BucketLimit(i)) :
        options_.discrete() ?
            StringPrintf("%.0f-%.0f",
                         ceil(boundaries_->BucketStart(i)),
                         ceil(boundaries_->BucketLimit(i)) - 1.0) :
            StringPrintf("%.12g_%.12g",
                         boundaries_->BucketStart(i),
                         boundaries_->BucketLimit(i));
    EscapeMapKey(&key);
    const double value = options_.export_cumulative_counts() ? acc : count;
    expvar::AddMapFloat(StrCat(prefix,
                               options_.export_bucket_key_prefix(),
                               key),
                        value * count_mult,
                        var);
  }
// Format "val" according to format.  If "need_escape" is true, then the
// format can produce output with a '.' in it, and the result will be escaped.
// If "need_escape" is false, then the caller guarantees that format is
// such that the resulting number will not have any '.' characters and
// therefore we can avoid calling EscapeKey.
// The function is free to use "*scratch" for scratch space if necessary,
// and the resulting StringPiece may point into "*scratch".
static StringPiece FormatNumber(const char* format,
                                bool need_escape,
                                double val, string* scratch) {
  // This routine is specialized to work with only a limited number of formats
  DCHECK(StringPiece(format) == "%.0f" || StringPiece(format) == "%.12g");

  scratch->clear();
  if (val == trunc(val) && val >= kint32min && val <= kint32max) {
    // An integer for which we can just use StrAppend
    StrAppend(scratch, static_cast<int32>(val));
    return StringPiece(*scratch);
  } else if (isinf(val)) {
    // Infinity, represent as just 'inf'.
    return StringPiece("inf", 3);
  } else {
    // Format according to "format", and possibly escape.
    StringAppendF(scratch, format, val);
    if (need_escape) {
      EscapeMapKey(scratch);
    } else {
      DCHECK(!StringPiece(*scratch).contains("."));
    }
    return StringPiece(*scratch);
  }
}
...
void Histogram::PopulateBuckets(const string &prefix,
                                expvar::MapProto *const var) const {
                                ...
  const string full_key_prefix = StrCat(prefix,
                                        options_.export_bucket_key_prefix());
  string key = full_key_prefix;  // Keys will start with "full_key_prefix".
  string start_scratch;
  string limit_scratch;
  const bool cumul_counts = options_.export_cumulative_counts();
  const bool discrete = options_.discrete();
  for (int i = min_bucket; i <= max_bucket; ++i) {
    const double count = BucketCount(i);
    if (!export_empty_buckets && count == 0.0) continue;
    acc += count;
    // The label format of exported buckets for discrete histograms
    // specifies an inclusive upper bound, which is the same as in
    // the original Histogram implementation.  This format is not
    // applicable to non-discrete histograms, so a half-open interval
    // is used for them, with "_" instead of "-" as a separator to
    // make possible to distinguish the formats.
    key.resize(full_key_prefix.size());  // Start with full_key_prefix.
    DCHECK_EQ(key, full_key_prefix);

    const double limit = boundaries_->BucketLimit(i);
    if (cumul_counts) {
      StrAppend(&key, FormatNumber("%.12g", true, limit, &limit_scratch));
    } else {
      const double start = boundaries_->BucketStart(i);
      if (discrete) {
        StrAppend(&key,
                  FormatNumber("%.0f", false, ceil(start), &start_scratch),
                  "-",
                  FormatNumber("%.0f", false, ceil(limit) - 1.0,
                               &limit_scratch));
      } else {
        StrAppend(&key,
                  FormatNumber("%.12g", true, start, &start_scratch),
                  "_",
                  FormatNumber("%.12g", true, limit, &limit_scratch));
      }
    }
    const double value = cumul_counts ? acc : count;

    // Add to map var
    expvar::AddMapFloat(key, value * count_mult, var);
  }
}
为VLOG(1)、VLOG(2)等添加特化实现,以提升速度并缩小代码体积。

VLOG是代码库中频繁使用的宏。此变更避免在几乎所有调用点额外传递整数常量(当调用点日志级别固定时,如VLOG(1) << ...),从而节省代码空间。

vlog_is_on.h

class VLogSite final {
 public:
  ...
  bool IsEnabled(int level) {
    int stale_v = v_.load(std::memory_order_relaxed);
    if (ABSL_PREDICT_TRUE(level > stale_v)) {
      return false;
    }

    // We put everything other than the fast path, i.e. vlogging is initialized
    // but not on, behind an out-of-line function to reduce code size.
    return SlowIsEnabled(stale_v, level);
  }
  ...
 private:
  ...
  ABSL_ATTRIBUTE_NOINLINE
  bool SlowIsEnabled(int stale_v, int level);
  ...
};
class VLogSite final {
 public:
  ...
  bool IsEnabled(int level) {
    int stale_v = v_.load(std::memory_order_relaxed);
    if (ABSL_PREDICT_TRUE(level > stale_v)) {
      return false;
    }

    // We put everything other than the fast path, i.e. vlogging is initialized
    // but not on, behind an out-of-line function to reduce code size.
    // "level" is almost always a call-site constant, so we can save a bit
    // of code space by special-casing for levels 1, 2, and 3.
#if defined(__has_builtin) && __has_builtin(__builtin_constant_p)
    if (__builtin_constant_p(level)) {
      if (level == 0) return SlowIsEnabled0(stale_v);
      if (level == 1) return SlowIsEnabled1(stale_v);
      if (level == 2) return SlowIsEnabled2(stale_v);
      if (level == 3) return SlowIsEnabled3(stale_v);
      if (level == 4) return SlowIsEnabled4(stale_v);
      if (level == 5) return SlowIsEnabled5(stale_v);
    }
#endif
    return SlowIsEnabled(stale_v, level);
    ...
 private:
  ...
  ABSL_ATTRIBUTE_NOINLINE
  bool SlowIsEnabled(int stale_v, int level);
  ABSL_ATTRIBUTE_NOINLINE bool SlowIsEnabled0(int stale_v);
  ABSL_ATTRIBUTE_NOINLINE bool SlowIsEnabled1(int stale_v);
  ABSL_ATTRIBUTE_NOINLINE bool SlowIsEnabled2(int stale_v);
  ABSL_ATTRIBUTE_NOINLINE bool SlowIsEnabled3(int stale_v);
  ABSL_ATTRIBUTE_NOINLINE bool SlowIsEnabled4(int stale_v);
  ABSL_ATTRIBUTE_NOINLINE bool SlowIsEnabled5(int stale_v);
  ...
};

vlog_is_on.cc

bool VLogSite::SlowIsEnabled0(int stale_v) { return SlowIsEnabled(stale_v, 0); }
bool VLogSite::SlowIsEnabled1(int stale_v) { return SlowIsEnabled(stale_v, 1); }
bool VLogSite::SlowIsEnabled2(int stale_v) { return SlowIsEnabled(stale_v, 2); }
bool VLogSite::SlowIsEnabled3(int stale_v) { return SlowIsEnabled(stale_v, 3); }
bool VLogSite::SlowIsEnabled4(int stale_v) { return SlowIsEnabled(stale_v, 4); }
bool VLogSite::SlowIsEnabled5(int stale_v) { return SlowIsEnabled(stale_v, 5); }
尽可能用简单的前缀匹配替代正则表达式调用。

read_matcher.cc

enum MatchItemType {
  MATCH_TYPE_INVALID,
  MATCH_TYPE_RANGE,
  MATCH_TYPE_EXACT,
  MATCH_TYPE_REGEXP,
};
enum MatchItemType {
  MATCH_TYPE_INVALID,
  MATCH_TYPE_RANGE,
  MATCH_TYPE_EXACT,
  MATCH_TYPE_REGEXP,
  MATCH_TYPE_PREFIX,   // Special type for regexp ".*"
};

read_matcher.cc

p->type = MATCH_TYPE_REGEXP;
term.NonMetaPrefix().CopyToString(&p->prefix);
if (term.RegexpSuffix() == ".*") {
  // Special case for a regexp that matches anything, so we can
  // bypass RE2::FullMatch
  p->type = MATCH_TYPE_PREFIX;
} else {
  p->type = MATCH_TYPE_REGEXP;
使用StrCat而非StringPrintf格式化IP地址。

ipaddress.cc

string IPAddress::ToString() const {
  char buf[INET6_ADDRSTRLEN];

  switch (address_family_) {
    case AF_INET:
      CHECK(inet_ntop(AF_INET, &addr_.addr4, buf, INET6_ADDRSTRLEN) != NULL);
      return buf;
    case AF_INET6:
      CHECK(inet_ntop(AF_INET6, &addr_.addr6, buf, INET6_ADDRSTRLEN) != NULL);
      return buf;
    case AF_UNSPEC:
      LOG(DFATAL) << "Calling ToString() on an empty IPAddress";
      return "";
    default:
      LOG(FATAL) << "Unknown address family " << address_family_;
  }
}
...
string IPAddressToURIString(const IPAddress& ip) {
  switch (ip.address_family()) {
    case AF_INET6:
      return StringPrintf("[%s]", ip.ToString().c_str());
    default:
      return ip.ToString();
  }
}
...
string SocketAddress::ToString() const {
  return IPAddressToURIString(host_) + StringPrintf(":%u", port_);
}
string IPAddress::ToString() const {
  char buf[INET6_ADDRSTRLEN];

  switch (address_family_) {
    case AF_INET: {
      uint32 addr = gntohl(addr_.addr4.s_addr);
      int a1 = static_cast<int>((addr >> 24) & 0xff);
      int a2 = static_cast<int>((addr >> 16) & 0xff);
      int a3 = static_cast<int>((addr >> 8) & 0xff);
      int a4 = static_cast<int>(addr & 0xff);
      return StrCat(a1, ".", a2, ".", a3, ".", a4);
    }
    case AF_INET6:
      CHECK(inet_ntop(AF_INET6, &addr_.addr6, buf, INET6_ADDRSTRLEN) != NULL);
      return buf;
    case AF_UNSPEC:
      LOG(DFATAL) << "Calling ToString() on an empty IPAddress";
      return "";
    default:
      LOG(FATAL) << "Unknown address family " << address_family_;
  }
}
...
string IPAddressToURIString(const IPAddress& ip) {
  switch (ip.address_family()) {
    case AF_INET6:
      return StrCat("[", ip.ToString(), "]");
    default:
      return ip.ToString();
  }
}
...
string SocketAddress::ToString() const {
  return StrCat(IPAddressToURIString(host_), ":", port_);
}

使用缓存避免重复工作§

基于预计算的大型序列化协议指纹进行缓存。

dp_ops.cc

InputOutputMappingProto mapping_proto;
PLAQUE_OP_REQUIRES(
    mapping_proto.ParseFromStringPiece(GetAttrMappingProto(state)),
    absl::InternalError("Failed to parse InputOutputMappingProto"));
ParseMapping(mapping_proto);
uint64 mapping_proto_fp = GetAttrMappingProtoFp(state);
{
  absl::MutexLock l(&fp_to_iometa_mu);
  if (fp_to_iometa == nullptr) {
    fp_to_iometa =
        new absl::flat_hash_map<uint64, std::unique_ptr<ProgramIOMetadata>>;
  }
  auto it = fp_to_iometa->find(mapping_proto_fp);
  if (it != fp_to_iometa->end()) {
    io_metadata_ = it->second.get();
  } else {
    auto serial_proto = GetAttrMappingProto(state);
    DCHECK_EQ(mapping_proto_fp, Fingerprint(serial_proto));
    InputOutputMappingProto mapping_proto;
    PLAQUE_OP_REQUIRES(
        mapping_proto.ParseFromStringPiece(GetAttrMappingProto(state)),
        absl::InternalError("Failed to parse InputOutputMappingProto"));
    auto io_meta = ParseMapping(mapping_proto);
    io_metadata_ = io_meta.get();
    (*fp_to_iometa)[mapping_proto_fp] = std::move(io_meta);
  }
}

简化编译器工作§

编译器可能因需对代码整体行为做出保守假设,或无法在速度与体积间做出正确权衡,而难以穿透抽象层进行优化。应用程序开发者通常更了解系统行为,可通过重写代码使其在更底层运行来辅助编译器。但仅当性能分析显示问题时才需如此操作,因为编译器通常能自行完成正确优化。查看性能关键例程生成的汇编代码,有助于判断编译器是否“优化得当”。Pprof提供极具价值的源代码与反汇编交错显示功能,并附带性能数据注释。

以下技术可能有所助益:

  1. 避免在热点函数中调用其他函数(可减少编译器的帧设置开销)。
  2. 将慢路径代码移至单独的尾调用函数。
  3. 在大量使用前将少量数据复制到局部变量。这可使编译器认为数据不存在别名冲突,从而提升自动向量化和寄存器分配效率。
  4. 手动展开极热循环。
通过将 absl::Span 替换为底层数组的原始指针,加速 ShapeUtil::ForEachState 实现。

shape_util.h

struct ForEachState {
  ForEachState(const Shape& s, absl::Span<const int64_t> b,
               absl::Span<const int64_t> c, absl::Span<const int64_t> i);
  ~ForEachState();

  const Shape& shape;
  const absl::Span<const int64_t> base;
  const absl::Span<const int64_t> count;
  const absl::Span<const int64_t> incr;
struct ForEachState {
  ForEachState(const Shape& s, absl::Span<const int64_t> b,
               absl::Span<const int64_t> c, absl::Span<const int64_t> i);
  inline ~ForEachState() = default;

  const Shape& shape;
  // Pointers to arrays of the passed-in spans
  const int64_t* const base;
  const int64_t* const count;
  const int64_t* const incr;
手动展开循环冗余校验(CRC)计算循环。

crc.cc

void CRC32::Extend(uint64 *lo, uint64 *hi, const void *bytes, size_t length)
                      const {
                      ...
  // Process bytes 4 at a time
  while ((p + 4) <= e) {
    uint32 c = l ^ WORD(p);
    p += 4;
    l = this->table3_[c & 0xff] ^
        this->table2_[(c >> 8) & 0xff] ^
        this->table1_[(c >> 16) & 0xff] ^
        this->table0_[c >> 24];
  }

  // Process the last few bytes
  while (p != e) {
    int c = (l & 0xff) ^ *p++;
    l = this->table0_[c] ^ (l >> 8);
  }
  *lo = l;
}
void CRC32::Extend(uint64 *lo, uint64 *hi, const void *bytes, size_t length)
                      const {
                      ...
#define STEP {                                  \
    uint32 c = l ^ WORD(p);                     \
    p += 4;                                     \
    l = this->table3_[c & 0xff] ^               \
        this->table2_[(c >> 8) & 0xff] ^        \
        this->table1_[(c >> 16) & 0xff] ^       \
        this->table0_[c >> 24];                 \
}

  // Process bytes 16 at a time
  while ((e-p) >= 16) {
    STEP;
    STEP;
    STEP;
    STEP;
  }

  // Process bytes 4 at a time
  while ((p + 4) <= e) {
    STEP;
  }
#undef STEP

  // Process the last few bytes
  while (p != e) {
    int c = (l & 0xff) ^ *p++;
    l = this->table0_[c] ^ (l >> 8);
  }
  *lo = l;
}
解析Spanner键值时每次处理四个字符。
  1. 手动展开循环以每次处理四个字符,而非使用memchr
  2. 手动展开循环以查找名称的分隔部分
  3. 使用’#’分隔符时,反向查找名称分隔部分(而非正向),因为名称首部分通常最长。

key.cc

void Key::InitSeps(const char* start) {
  const char* base = &rep_[0];
  const char* limit = base + rep_.size();
  const char* s = start;

  DCHECK_GE(s, base);
  DCHECK_LT(s, limit);

  for (int i = 0; i < 3; i++) {
    s = (const char*)memchr(s, '#', limit - s);
    DCHECK(s != NULL);
    seps_[i] = s - base;
    s++;
  }
}
inline const char* ScanBackwardsForSep(const char* base, const char* p) {
  while (p >= base + 4) {
    if (p[0] == '#') return p;
    if (p[-1] == '#') return p-1;
    if (p[-2] == '#') return p-2;
    if (p[-3] == '#') return p-3;
    p -= 4;
  }
  while (p >= base && *p != '#') p--;
  return p;
}

void Key::InitSeps(const char* start) {
  const char* base = &rep_[0];
  const char* limit = base + rep_.size();
  const char* s = start;

  DCHECK_GE(s, base);
  DCHECK_LT(s, limit);

  // We go backwards from the end of the string, rather than forwards,
  // since the directory name might be long and definitely doesn't contain
  // any '#' characters.
  const char* p = ScanBackwardsForSep(s, limit - 1);
  DCHECK(*p == '#');
  seps_[2] = p - base;
  p--;

  p = ScanBackwardsForSep(s, p);
  DCHECK(*p == '#');
  seps_[1] = p - base;
  p--;

  p = ScanBackwardsForSep(s, p);
  DCHECK(*p == '#');
  seps_[0] = p - base;
}
通过将ABSL_LOG(FATAL)转换为ABSL_DCHECK(false)避免帧设置开销。

arena_cleanup.h

inline ABSL_ATTRIBUTE_ALWAYS_INLINE size_t Size(Tag tag) {
  if (!EnableSpecializedTags()) return sizeof(DynamicNode);

  switch (tag) {
    case Tag::kDynamic:
      return sizeof(DynamicNode);
    case Tag::kString:
      return sizeof(TaggedNode);
    case Tag::kCord:
      return sizeof(TaggedNode);
    default:
      ABSL_LOG(FATAL) << "Corrupted cleanup tag: " << static_cast<int>(tag);
      return sizeof(DynamicNode);
  }
}
inline ABSL_ATTRIBUTE_ALWAYS_INLINE size_t Size(Tag tag) {
  if (!EnableSpecializedTags()) return sizeof(DynamicNode);

  switch (tag) {
    case Tag::kDynamic:
      return sizeof(DynamicNode);
    case Tag::kString:
      return sizeof(TaggedNode);
    case Tag::kCord:
      return sizeof(TaggedNode);
    default:
      ABSL_DCHECK(false) << "Corrupted cleanup tag: " << static_cast<int>(tag);
      return sizeof(DynamicNode);
  }
}

降低统计信息收集成本§

权衡系统统计信息及其他行为数据的实用价值与维护成本。额外信息虽有助于理解和优化高阶行为,但维护成本可能较高。

可完全移除无用的统计信息。

停止维护SelectServer中关于警报和关闭次数的高成本统计数据。

该变更使设置警报的时间从771纳秒缩短至271纳秒。

selectserver.h

class SelectServer {
 public:
 ...
 protected:
  ...
  scoped_ptr<MinuteTenMinuteHourStat> num_alarms_stat_;
  ...
  scoped_ptr<MinuteTenMinuteHourStat> num_closures_stat_;
  ...
};
// Selectserver class
class SelectServer {
 ...
 protected:
 ...
};

/selectserver.cc

void SelectServer::AddAlarmInternal(Alarmer* alarmer,
                                    int offset_in_ms,
                                    int id,
                                    bool is_periodic) {
                                    ...
  alarms_->insert(alarm);
  num_alarms_stat_->IncBy(1);
  ...
}
void SelectServer::AddAlarmInternal(Alarmer* alarmer,
                                    int offset_in_ms,
                                    int id,
                                    bool is_periodic) {
                                    ...
  alarms_->Add(alarm);
  ...
}

/selectserver.cc

void SelectServer::RemoveAlarm(Alarmer* alarmer, int id) {
      ...
      alarms_->erase(alarm);
      num_alarms_stat_->IncBy(-1);
      ...
}
void SelectServer::RemoveAlarm(Alarmer* alarmer, int id) {
      ...
      alarms_->Remove(alarm);
      ...
}

通常可对系统处理的元素(如RPC请求、输入记录、用户)进行抽样统计。许多子系统采用此方法(tcmalloc分配追踪、/requestz状态页面、Dapper采样)。

在采样时,应酌情降低采样频率。

仅对文档信息请求的样本维护统计数据。

采样机制使我们无需处理绝大多数请求的39个直方图及MinuteTenMinuteHour统计数据。

generic-leaf-stats.cc

... code that touches 39 histograms to update various stats on every request ...
// Add to the histograms periodically
if (TryLockToUpdateHistogramsDocInfo(docinfo_stats, bucket)) {
  // Returns true and grabs bucket->lock only if we should sample this
  // request for maintaining stats
  ... code that touches 39 histograms to update various stats ...
  bucket->lock.Unlock();
}
降低采样率并加速采样决策。

本次变更将采样率从1/10降至1/32。此外,我们仅对采样事件保留执行时间统计数据,并通过采用二进制模数运算加速采样决策。该代码在Google Meet视频会议系统中对每个数据包执行,因用户在疫情初期快速转向线上会议,为满足容量需求而进行性能优化。

packet_executor.cc

class ScopedPerformanceMeasurement {
 public:
  explicit ScopedPerformanceMeasurement(PacketExecutor* packet_executor)
      : packet_executor_(packet_executor),
        tracer_(packet_executor->packet_executor_trace_threshold_,
                kClosureTraceName) {
    // ThreadCPUUsage is an expensive call. At the time of writing,
    // it takes over 400ns, or roughly 30 times slower than absl::Now,
    // so we sample only 10% of closures to keep the cost down.
    if (packet_executor->closures_executed_ % 10 == 0) {
      thread_cpu_usage_start_ = base::ThreadCPUUsage();
    }

    // Sample start time after potentially making the above expensive call,
    // so as not to pollute wall time measurements.
    run_start_time_ = absl::Now();
  }

  ~ScopedPerformanceMeasurement() {
ScopedPerformanceMeasurement::ScopedPerformanceMeasurement(
    PacketExecutor* packet_executor)
    : packet_executor_(packet_executor),
      tracer_(packet_executor->packet_executor_trace_threshold_,
              kClosureTraceName) {
  // ThreadCPUUsage is an expensive call. At the time of writing,
  // it takes over 400ns, or roughly 30 times slower than absl::Now,
  // so we sample only 1 in 32 closures to keep the cost down.
  if (packet_executor->closures_executed_ % 32 == 0) {
    thread_cpu_usage_start_ = base::ThreadCPUUsage();
  }

  // Sample start time after potentially making the above expensive call,
  // so as not to pollute wall time measurements.
  run_start_time_ = absl::Now();
}

packet_executor.cc

~ScopedPerformanceMeasurement() {
  auto run_end_time = absl::Now();
  auto run_duration = run_end_time - run_start_time_;

  if (thread_cpu_usage_start_.has_value()) {
  ...
  }

  closure_execution_time->Record(absl::ToInt64Microseconds(run_duration));
ScopedPerformanceMeasurement::~ScopedPerformanceMeasurement() {
  auto run_end_time = absl::Now();
  auto run_duration = run_end_time - run_start_time_;

  if (thread_cpu_usage_start_.has_value()) {
    ...
    closure_execution_time->Record(absl::ToInt64Microseconds(run_duration));
  }

基准测试结果:

Run on (40 X 2793 MHz CPUs); 2020-03-24T20:08:19.991412535-07:00
CPU: Intel Ivybridge with HyperThreading (20 cores) dL1:32KB dL2:256KB dL3:25MB
Benchmark                                      Base (ns)    New (ns) Improvement
----------------------------------------------------------------------------
BM_PacketOverhead_mean                               224          85    +62.0%

避免在热点代码路径中记录日志§

日志语句可能造成性能开销,即使该语句的日志级别实际未记录任何内容。例如,ABSL_VLOG 的实现至少需要一次加载和一次比较操作,这在热点代码路径中可能成为问题。此外,日志代码的存在可能抑制编译器优化。建议在热点代码路径中完全移除日志记录。

从内存分配器的核心逻辑中移除日志记录。

此为大规模变更中的局部调整。

gpu_bfc_allocator.cc

void GPUBFCAllocator::SplitChunk(...) {
  ...
  VLOG(6) << "Adding to chunk map: " << new_chunk->ptr;
  ...
}
...
void GPUBFCAllocator::DeallocateRawInternal(void* ptr) {
  ...
  VLOG(6) << "Chunk at " << c->ptr << " no longer in use";
  ...
}
void GPUBFCAllocator::SplitChunk(...) {
...
}
...
void GPUBFCAllocator::DeallocateRawInternal(void* ptr) {
...
}
在嵌套循环外部预先计算日志记录是否启用。

image_similarity.cc

for (int j = 0; j < output_subimage_size_y; j++) {
  int j1 = j - rad + output_to_integral_subimage_y;
  int j2 = j1 + 2 * rad + 1;
  // Create a pointer for this row's output, taking into account the offset
  // to the full image.
  double *image_diff_ptr = &(*image_diff)(j + min_j, min_i);

  for (int i = 0; i < output_subimage_size_x; i++) {
    ...
    if (VLOG_IS_ON(3)) {
    ...
    }
    ...
  }
}
const bool vlog_3 = DEBUG_MODE ? VLOG_IS_ON(3) : false;

for (int j = 0; j < output_subimage_size_y; j++) {
  int j1 = j - rad + output_to_integral_subimage_y;
  int j2 = j1 + 2 * rad + 1;
  // Create a pointer for this row's output, taking into account the offset
  // to the full image.
  double *image_diff_ptr = &(*image_diff)(j + min_j, min_i);

  for (int i = 0; i < output_subimage_size_x; i++) {
    ...
    if (vlog_3) {
    ...
    }
  }
}
Run on (40 X 2801 MHz CPUs); 2016-05-16T15:55:32.250633072-07:00
CPU: Intel Ivybridge with HyperThreading (20 cores) dL1:32KB dL2:256KB dL3:25MB
Benchmark                          Base (ns)  New (ns) Improvement
------------------------------------------------------------------
BM_NCCPerformance/16                   29104     26372     +9.4%
BM_NCCPerformance/64                  473235    425281    +10.1%
BM_NCCPerformance/512               30246238  27622009     +8.7%
BM_NCCPerformance/1k              125651445  113361991     +9.8%
BM_NCCLimitedBoundsPerformance/16       8314      7498     +9.8%
BM_NCCLimitedBoundsPerformance/64     143508    132202     +7.9%
BM_NCCLimitedBoundsPerformance/512   9335684   8477567     +9.2%
BM_NCCLimitedBoundsPerformance/1k   37223897  34201739     +8.1%
预先计算日志记录是否启用,并在辅助例程中使用该结果。

periodic_call.cc

  VLOG(1) << Logid()
          << "MaybeScheduleAlarmAtNextTick. Time until next real time: "
          << time_until_next_real_time;
          ...
  uint64 next_virtual_time_ms =
      next_virtual_time_ms_ - num_ticks * kResolutionMs;
  CHECK_GE(next_virtual_time_ms, 0);
  ScheduleAlarm(now, delay, next_virtual_time_ms);
}

void ScheduleNextAlarm(uint64 current_virtual_time_ms)
    ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
  if (calls_.empty()) {
    VLOG(1) << Logid() << "No calls left, entering idle mode";
    next_real_time_ = absl::InfiniteFuture();
    return;
  }
  uint64 next_virtual_time_ms = FindNextVirtualTime(current_virtual_time_ms);
  auto delay =
      absl::Milliseconds(next_virtual_time_ms - current_virtual_time_ms);
  ScheduleAlarm(GetClock().TimeNow(), delay, next_virtual_time_ms);
}

// An alarm scheduled by this function supersedes all previously scheduled
// alarms. This is ensured through `scheduling_sequence_number_`.
void ScheduleAlarm(absl::Time now, absl::Duration delay,
                   uint64 virtual_time_ms)
    ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
  next_real_time_ = now + delay;
  next_virtual_time_ms_ = virtual_time_ms;
  ++ref_count_;  // The Alarm holds a reference.
  ++scheduling_sequence_number_;
  VLOG(1) << Logid() << "ScheduleAlarm. Time : "
          << absl::FormatTime("%M:%S.%E3f", now, absl::UTCTimeZone())
          << ", delay: " << delay << ", virtual time: " << virtual_time_ms
          << ", refs: " << ref_count_
          << ", seq: " << scheduling_sequence_number_
          << ", executor: " << executor_;

  executor_->AddAfter(
      delay, new Alarm(this, virtual_time_ms, scheduling_sequence_number_));
}
  const bool vlog_1 = VLOG_IS_ON(1);

  if (vlog_1) {
    VLOG(1) << Logid()
            << "MaybeScheduleAlarmAtNextTick. Time until next real time: "
            << time_until_next_real_time;
  }
  ...
  uint64 next_virtual_time_ms =
      next_virtual_time_ms_ - num_ticks * kResolutionMs;
  CHECK_GE(next_virtual_time_ms, 0);
  ScheduleAlarm(now, delay, next_virtual_time_ms, vlog_1);
}

void ScheduleNextAlarm(uint64 current_virtual_time_ms, bool vlog_1)
    ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
  if (calls_.empty()) {
    if (vlog_1) {
      VLOG(1) << Logid() << "No calls left, entering idle mode";
    }
    next_real_time_ = absl::InfiniteFuture();
    return;
  }
  uint64 next_virtual_time_ms = FindNextVirtualTime(current_virtual_time_ms);
  auto delay =
      absl::Milliseconds(next_virtual_time_ms - current_virtual_time_ms);
  ScheduleAlarm(GetClock().TimeNow(), delay, next_virtual_time_ms, vlog_1);
}

// An alarm scheduled by this function supersedes all previously scheduled
// alarms. This is ensured through `scheduling_sequence_number_`.
void ScheduleAlarm(absl::Time now, absl::Duration delay,
                   uint64 virtual_time_ms,
                   bool vlog_1)
    ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
  next_real_time_ = now + delay;
  next_virtual_time_ms_ = virtual_time_ms;
  ++ref_count_;  // The Alarm holds a reference.
  ++scheduling_sequence_number_;
  if (vlog_1) {
    VLOG(1) << Logid() << "ScheduleAlarm. Time : "
            << absl::FormatTime("%M:%S.%E3f", now, absl::UTCTimeZone())
            << ", delay: " << delay << ", virtual time: " << virtual_time_ms
            << ", refs: " << ref_count_
            << ", seq: " << scheduling_sequence_number_
            << ", executor: " << executor_;
  }

  executor_->AddAfter(
      delay, new Alarm(this, virtual_time_ms, scheduling_sequence_number_));
}

代码体积考量§

性能不仅关乎运行时速度。有时值得考虑软件选择对生成代码大小的影响。代码体积庞大会导致编译链接时间延长、二进制文件臃肿、内存占用增加、指令缓存压力增大,并对分支预测器等微架构结构产生其他负面影响。在编写将被广泛使用的底层库代码,或编写预期会被实例化为多种类型的模板代码时,思考这些问题尤为重要。

不同编程语言中缩减代码体积的技术差异显著。以下是我们总结的适用于C++代码(常因模板过度使用和内联导致膨胀)的有效技巧:

精简常见内联代码§

高频调用的函数结合内联技术,对代码体积影响显著。

加速 TF_CHECK_OK 处理。

避免创建 Ok 对象,通过将致命错误消息的复杂格式化操作移出调用点实现,从而节省代码空间。

status.h

#define TF_CHECK_OK(val) CHECK_EQ(::tensorflow::Status::OK(), (val))
#define TF_QCHECK_OK(val) QCHECK_EQ(::tensorflow::Status::OK(), (val))
extern tensorflow::string* TfCheckOpHelperOutOfLine(
    const ::tensorflow::Status& v, const char* msg);
inline tensorflow::string* TfCheckOpHelper(::tensorflow::Status v,
                                           const char* msg) {
  if (v.ok()) return nullptr;
  return TfCheckOpHelperOutOfLine(v, msg);
}
#define TF_CHECK_OK(val)                                           \
  while (tensorflow::string* _result = TfCheckOpHelper(val, #val)) \
  LOG(FATAL) << *(_result)
#define TF_QCHECK_OK(val)                                          \
  while (tensorflow::string* _result = TfCheckOpHelper(val, #val)) \
  LOG(QFATAL) << *(_result)

status.cc

string* TfCheckOpHelperOutOfLine(const ::tensorflow::Status& v,
                                 const char* msg) {
  string r("Non-OK-status: ");
  r += msg;
  r += " status: ";
  r += v.ToString();
  // Leaks string but this is only to be used in a fatal error message
  return new string(r);
}
每个 RETURN_IF_ERROR 调用点缩减 79 字节代码。
  1. 新增专用于 RETURN_IF_ERROR 的特殊适配器类。
  2. 在 RETURN_IF_ERROR 的快速路径中不再构造/销毁 StatusBuilder。
  3. 因快速路径不再需要,不再内联部分 StatusBuilder 方法。
  4. 避免不必要的 ~Status 调用。
CHECK_GE性能提升4.5倍,代码体积从125字节缩减至77字节。

logging.h

struct CheckOpString {
  CheckOpString(string* str) : str_(str) { }
  ~CheckOpString() { delete str_; }
  operator bool() const { return str_ == NULL; }
  string* str_;
};
...
#define DEFINE_CHECK_OP_IMPL(name, op) \
  template <class t1, class t2> \
  inline string* Check##name##Impl(const t1& v1, const t2& v2, \
                                   const char* names) { \
    if (v1 op v2) return NULL; \
    else return MakeCheckOpString(v1, v2, names); \
  } \
  string* Check##name##Impl(int v1, int v2, const char* names);
DEFINE_CHECK_OP_IMPL(EQ, ==)
DEFINE_CHECK_OP_IMPL(NE, !=)
DEFINE_CHECK_OP_IMPL(LE, <=)
DEFINE_CHECK_OP_IMPL(LT, < )
DEFINE_CHECK_OP_IMPL(GE, >=)
DEFINE_CHECK_OP_IMPL(GT, > )
#undef DEFINE_CHECK_OP_IMPL
struct CheckOpString {
  CheckOpString(string* str) : str_(str) { }
  // No destructor: if str_ is non-NULL, we're about to LOG(FATAL),
  // so there's no point in cleaning up str_.
  operator bool() const { return str_ == NULL; }
  string* str_;
};
...
extern string* MakeCheckOpStringIntInt(int v1, int v2, const char* names);

template<int, int>
string* MakeCheckOpString(const int& v1, const int& v2, const char* names) {
  return MakeCheckOpStringIntInt(v1, v2, names);
}
...
#define DEFINE_CHECK_OP_IMPL(name, op) \
  template <class t1, class t2> \
  inline string* Check##name##Impl(const t1& v1, const t2& v2, \
                                   const char* names) { \
    if (v1 op v2) return NULL; \
    else return MakeCheckOpString(v1, v2, names); \
  } \
  inline string* Check##name##Impl(int v1, int v2, const char* names) { \
    if (v1 op v2) return NULL; \
    else return MakeCheckOpString(v1, v2, names); \
  }
DEFINE_CHECK_OP_IMPL(EQ, ==)
DEFINE_CHECK_OP_IMPL(NE, !=)
DEFINE_CHECK_OP_IMPL(LE, <=)
DEFINE_CHECK_OP_IMPL(LT, < )
DEFINE_CHECK_OP_IMPL(GE, >=)
DEFINE_CHECK_OP_IMPL(GT, > )
#undef DEFINE_CHECK_OP_IMPL

logging.cc

string* MakeCheckOpStringIntInt(int v1, int v2, const char* names) {
  strstream ss;
  ss << names << " (" << v1 << " vs. " << v2 << ")";
  return new string(ss.str(), ss.pcount());
}

谨慎内联§

内联通常能提升性能,但有时会增加代码体积却未带来相应性能收益(某些情况下甚至因指令缓存压力增大导致性能下降)。

减少TensorFlow中的内联操作。

该变更停止对许多非性能敏感函数(如错误路径和操作符注册代码)进行内联处理。此外,部分性能敏感函数的慢速路径被移至非内联函数中。

这些变更使典型二进制文件中的TensorFlow符号体积缩减12.2%(从8814545字节降至7740233字节)。

协议缓冲区库变更。避免为≥128字节的消息编码消息长度时消耗昂贵的内联代码空间,转而调用共享的非内联例程。

不仅能缩小重要大型二进制文件的体积,还能提升运行速度。

某大型二进制文件中高度内联例程每行生成的代码字节数。首列数字代表特定源代码行生成的总字节数,包含所有内联位置。

变更前:

.           0   1825 template <typename MessageType>
.           0   1826 inline uint8* WireFormatLite::InternalWriteMessage(
.           0   1827     int field_number, const MessageType& value, uint8* target,
.           0   1828     io::EpsCopyOutputStream* stream) {
>>>    389246   1829   target = WriteTagToArray(field_number, WIRETYPE_LENGTH_DELIMITED, target);
>>>   5454640   1830   target = io::CodedOutputStream::WriteVarint32ToArray(
>>>    337837   1831       static_cast<uint32>(value.GetCachedSize()), target);
>>>   1285539   1832   return value._InternalSerialize(target, stream);
.           0   1833 }

变更后代码大小输出如下:

.           0   1825 template <typename MessageType>
.           0   1826 inline uint8* WireFormatLite::InternalWriteMessage(
.           0   1827     int field_number, const MessageType& value, uint8* target,
.           0   1828     io::EpsCopyOutputStream* stream) {
>>>    450612   1829   target = WriteTagToArray(field_number, WIRETYPE_LENGTH_DELIMITED, target);
>>       9609   1830   target = io::CodedOutputStream::WriteVarint32ToArrayOutOfLine(
>>>    434668   1831       static_cast<uint32>(value.GetCachedSize()), target);
>>>   1597394   1832   return value._InternalSerialize(target, stream);
.           0   1833 }

coded_stream.h

class PROTOBUF_EXPORT CodedOutputStream {
  ...
  // Like WriteVarint32()  but writing directly to the target array, and with the
  // less common-case paths being out of line rather than inlined.
  static uint8* WriteVarint32ToArrayOutOfLine(uint32 value, uint8* target);
  ...
};
...
inline uint8* CodedOutputStream::WriteVarint32ToArrayOutOfLine(uint32 value,
                                                               uint8* target) {
  target[0] = static_cast<uint8>(value);
  if (value < 0x80) {
    return target + 1;
  } else {
    return WriteVarint32ToArrayOutOfLineHelper(value, target);
  }
}

coded_stream.cc

uint8* CodedOutputStream::WriteVarint32ToArrayOutOfLineHelper(uint32 value,
                                                              uint8* target) {
  DCHECK_GE(value, 0x80);
  target[0] |= static_cast<uint8>(0x80);
  value >>= 7;
  target[1] = static_cast<uint8>(value);
  if (value < 0x80) {
    return target + 2;
  }
  target += 2;
  do {
    // Turn on continuation bit in the byte we just wrote.
    target[-1] |= static_cast<uint8>(0x80);
    value >>= 7;
    *target = static_cast<uint8>(value);
    ++target;
  } while (value >= 0x80);
  return target;
}
缩减 absl::flat_hash_set 和 absl::flat_hash_map 的代码体积。
  1. 将不依赖特定哈希表类型的代码提取为通用(非内联)函数。
  2. 审慎放置 ABSL_ATTRIBUTE_NOINLINE 指令。
  3. 将部分低效路径移出内联处理。

使部分大型二进制文件体积缩减约 0.5%。

未使用 protobuf 内存池时,避免内联字符串分配与释放操作。

public/arenastring.h

  if (IsDefault(default_value)) {
    std::string* new_string = new std::string();
    tagged_ptr_.Set(new_string);
    return new_string;
  } else {
    return UnsafeMutablePointer();
  }
}
  if (IsDefault(default_value)) {
    return SetAndReturnNewString();
  } else {
    return UnsafeMutablePointer();
  }
}

internal/arenastring.cc

std::string* ArenaStringPtr::SetAndReturnNewString() {
  std::string* new_string = new std::string();
  tagged_ptr_.Set(new_string);
  return new_string;
}
避免内联某些例程。创建接受’const char*’而非’const std::string&’的例程变体,以避免在每次调用点生成std::string构造代码。

op.h

class OpDefBuilderWrapper {
 public:
  explicit OpDefBuilderWrapper(const char name[]) : builder_(name) {}
  OpDefBuilderWrapper& Attr(std::string spec) {
    builder_.Attr(std::move(spec));
    return *this;
  }
  OpDefBuilderWrapper& Input(std::string spec) {
    builder_.Input(std::move(spec));
    return *this;
  }
  OpDefBuilderWrapper& Output(std::string spec) {
    builder_.Output(std::move(spec));
    return *this;
  }
class OpDefBuilderWrapper {
 public:
  explicit OpDefBuilderWrapper(const char name[]) : builder_(name) {}
  OpDefBuilderWrapper& Attr(std::string spec) {
    builder_.Attr(std::move(spec));
    return *this;
  }
  OpDefBuilderWrapper& Attr(const char* spec) TF_ATTRIBUTE_NOINLINE {
    return Attr(std::string(spec));
  }
  OpDefBuilderWrapper& Input(std::string spec) {
    builder_.Input(std::move(spec));
    return *this;
  }
  OpDefBuilderWrapper& Input(const char* spec) TF_ATTRIBUTE_NOINLINE {
    return Input(std::string(spec));
  }
  OpDefBuilderWrapper& Output(std::string spec) {
    builder_.Output(std::move(spec));
    return *this;
  }
  OpDefBuilderWrapper& Output(const char* spec) TF_ATTRIBUTE_NOINLINE {
    return Output(std::string(spec));
  }

减少模板实例化§

模板代码在实例化时可能因模板参数的各种组合而重复生成。

用常规参数替换模板参数。

将一个基于布尔值的大型模板化例程改为将布尔值作为额外参数传递。(该布尔值仅用于选择两个字符串常量之一,因此运行时检查完全可行。)此举将大型例程的实例化次数从287次减少至143次。

sharding_util_ops.cc

template <bool Split>
Status GetAndValidateAttributes(OpKernelConstruction* ctx,
                                std::vector<int32>& num_partitions,
                                int& num_slices, std::vector<int32>& paddings,
                                bool& has_paddings) {
  absl::string_view num_partitions_attr_name =
      Split ? kNumSplitsAttrName : kNumConcatsAttrName;
      ...
  return OkStatus();
}
Status GetAndValidateAttributes(bool split, OpKernelConstruction* ctx,
                                std::vector<int32>& num_partitions,
                                int& num_slices, std::vector<int32>& paddings,
                                bool& has_paddings) {
  absl::string_view num_partitions_attr_name =
      split ? kNumSplitsAttrName : kNumConcatsAttrName;
      ...
  return OkStatus();
}
将冗余代码从模板构造函数移至非模板共享基类构造函数。

同时将模板实例化次数从每种<T, Device, Rank>组合各一次,优化为每种<T>和每种<Rank>各一次。

sharding_util_ops.cc

template <typename Device, typename T>
class XlaSplitNDBaseOp : public OpKernel {
 public:
  explicit XlaSplitNDBaseOp(OpKernelConstruction* ctx) : OpKernel(ctx) {
    OP_REQUIRES_OK(
        ctx, GetAndValidateAttributes(/*split=*/true, ctx, num_splits_,
                                      num_slices_, paddings_, has_paddings_));
  }
// Shared base class to save code space
class XlaSplitNDShared : public OpKernel {
 public:
  explicit XlaSplitNDShared(OpKernelConstruction* ctx) TF_ATTRIBUTE_NOINLINE
      : OpKernel(ctx),
        num_slices_(1),
        has_paddings_(false) {
    GetAndValidateAttributes(/*split=*/true, ctx, num_splits_, num_slices_,
                             paddings_, has_paddings_);
  }
缩减 absl::flat_hash_set 和 absl::flat_hash_map 的生成代码体积。
  • 将不依赖特定哈希表类型的代码提取为通用(非内联)函数。
  • 审慎放置 ABSL_ATTRIBUTE_NOINLINE 指令。
  • 将部分低效路径移出行内实现。

优化容器操作§

需考量映射表及其他容器操作的影响,因每次调用此类操作都可能产生大量生成代码。

将连续多次映射插入调用(用于初始化表情符号字符哈希表)合并为单次批量插入操作(从188KB文本压缩至360字节,该库被众多二进制文件链接使用)。😊

textfallback_init.h

inline void AddEmojiFallbacks(TextFallbackMap *map) {
  (*map)[0xFE000] = &kFE000;
  (*map)[0xFE001] = &kFE001;
  (*map)[0xFE002] = &kFE002;
  (*map)[0xFE003] = &kFE003;
  (*map)[0xFE004] = &kFE004;
  (*map)[0xFE005] = &kFE005;
  ...
  (*map)[0xFEE7D] = &kFEE7D;
  (*map)[0xFEEA0] = &kFEEA0;
  (*map)[0xFE331] = &kFE331;
};
inline void AddEmojiFallbacks(TextFallbackMap *map) {
#define PAIR(x) {0x##x, &k##x}
  // clang-format off
  map->insert({
    PAIR(FE000),
    PAIR(FE001),
    PAIR(FE002),
    PAIR(FE003),
    PAIR(FE004),
    PAIR(FE005),
    ...
    PAIR(FEE7D),
    PAIR(FEEA0),
    PAIR(FE331)});
  // clang-format on
#undef PAIR
};
停止内联大量使用InlinedVector操作的代码段。

将原被内联的超长例程从.h文件移至.cc文件(内联此处并无实际性能收益)。

reduction_ops_common.h

Status Simplify(const Tensor& data, const Tensor& axis,
                const bool keep_dims) {
  ... Eighty line routine body ...
}
Status Simplify(const Tensor& data, const Tensor& axis, const bool keep_dims);

并行化与同步§

并行化利用§

现代机器拥有众多核心,但其利用率往往不足。通过并行化处理,高成本任务可加速完成。最常见的方法是并行处理不同项,并在完成后合并结果。通常会先将项划分为批次,以避免为每项单独并行运行而产生的开销。

四路并行化使令牌编码速率提升约3.6倍。

blocked-token-coder.cc

MutexLock l(&encoder_threads_lock);
if (encoder_threads == NULL) {
  encoder_threads = new ThreadPool(NumCPUs());
  encoder_threads->SetStackSize(262144);
  encoder_threads->StartWorkers();
}
encoder_threads->Add
    (NewCallback(this,
                 &BlockedTokenEncoder::EncodeRegionInThread,
                 region_tokens, N, region,
                 stats,
                 controller_->GetClosureWithCost
                 (NewCallback(&DummyCallback), N)));
并行化使解码性能提升5倍。

coding.cc

for (int c = 0; c < clusters->size(); c++) {
  RET_CHECK_OK(DecodeBulkForCluster(...);
}
struct SubTask {
  absl::Status result;
  absl::Notification done;
};

std::vector<SubTask> tasks(clusters->size());
for (int c = 0; c < clusters->size(); c++) {
  options_.executor->Schedule([&, c] {
    tasks[c].result = DecodeBulkForCluster(...);
    tasks[c].done.Notify();
  });
}
for (int c = 0; c < clusters->size(); c++) {
  tasks[c].done.WaitForNotification();
}
for (int c = 0; c < clusters->size(); c++) {
  RETURN_IF_ERROR(tasks[c].result);
}

需谨慎评估对系统性能的影响——若无闲置CPU资源或内存带宽饱和,并行化可能无效甚至造成负面影响。

锁获取成本摊销§

避免细粒度锁定以减少热点路径中互斥锁操作的开销。注意事项:仅当变更不会增加锁竞争时才应实施。

通过单次锁获取释放整个查询节点树,而非为树中每个节点重复获取锁。

mustang-query.cc

// Pool of query nodes
ThreadSafeFreeList<MustangQuery> pool_(256);
...
void MustangQuery::Release(MustangQuery* node) {
  if (node == NULL)
    return;
  for (int i=0; i < node->children_->size(); ++i)
    Release((*node->children_)[i]);
  node->children_->clear();
  pool_.Delete(node);
}
// Pool of query nodes
Mutex pool_lock_;
FreeList<MustangQuery> pool_(256);
...
void MustangQuery::Release(MustangQuery* node) {
  if (node == NULL)
    return;
  MutexLock l(&pool_lock_);
  ReleaseLocked(node);
}

void MustangQuery::ReleaseLocked(MustangQuery* node) {
#ifndef NDEBUG
  pool_lock_.AssertHeld();
#endif
  if (node == NULL)
    return;
  for (int i=0; i < node->children_->size(); ++i)
    ReleaseLocked((*node->children_)[i]);
  node->children_->clear();
  pool_.Delete(node);
}

缩短临界区时长§

避免在临界区内执行耗费资源的操作。尤其要警惕看似无害的代码可能涉及RPC调用或文件访问。

减少临界区内触及的缓存行数量。

通过精心调整数据结构,显著降低了访问的缓存行数量,使机器学习训练性能提升3.3%。

  1. 预先计算节点项数据结构中的某些节点类型属性,将其存储为位值,从而在关键区内处理出边时无需访问Node*对象。
  2. 将ExecutorState::ActivateNodes修改为使用目标节点的NodeItem,而非访问*item->node对象的字段。通常这意味着访问所需边数据仅需触及1至2条缓存行,而非原先的~2 + O(出边数量)(对于多核处理的大型图,此举还可减轻TLB压力)。
避免在持有互斥锁期间调用RPC。

trainer.cc

{
  // Notify the parameter server that we are starting.
  MutexLock l(&lock_);
  model_ = model;
  MaybeRecordProgress(last_global_step_);
}
bool should_start_record_progress = false;
int64 step_for_progress = -1;
{
  // Notify the parameter server that we are starting.
  MutexLock l(&lock_);
  model_ = model;
  should_start_record_progress = ShouldStartRecordProgress();
  step_for_progress = last_global_step_;
}
if (should_start_record_progress) {
  StartRecordProgress(step_for_progress);
}

此外需警惕在互斥锁解锁前执行的高成本析构函数(当互斥锁解锁由~MutexUnlock触发时常发生此情况)。可在互斥锁锁定前声明带高成本析构函数的对象(前提是该操作线程安全)。

通过分片降低竞争§

当受互斥锁保护的数据结构出现高竞争时,可安全地将其拆分为多个分片,每个分片拥有独立的互斥锁。(注意:这要求不同分片之间不存在跨分片不变量。)

将缓存16分片后,在多线程负载下吞吐量提升约2倍。

cache.cc

class ShardedLRUCache : public Cache {
 private:
  LRUCache shard_[kNumShards];
  port::Mutex id_mutex_;
  uint64_t last_id_;

  static inline uint32_t HashSlice(const Slice& s) {
    return Hash(s.data(), s.size(), 0);
  }

  static uint32_t Shard(uint32_t hash) {
    return hash >> (32 - kNumShardBits);
  }
  ...
  virtual Handle* Lookup(const Slice& key) {
    const uint32_t hash = HashSlice(key);
    return shard_[Shard(hash)].Lookup(key, hash);
  }
将调用追踪的Spanner数据结构进行分片。

transaction_manager.cc

absl::MutexLock l(&active_calls_in_mu_);
ActiveCallMap::const_iterator iter = active_calls_in_.find(m->tid());
if (iter != active_calls_in_.end()) {
  iter->second.ExtractElements(&m->tmp_calls_);
}
ActiveCalls::LockedShard shard(active_calls_in_, m->tid());
const ActiveCallMap& active_calls_map = shard.active_calls_map();
ActiveCallMap::const_iterator iter = active_calls_map.find(m->tid());
if (iter != active_calls_map.end()) {
  iter->second.ExtractElements(&m->tmp_calls_);
}

若涉及的数据结构为映射表,建议改用并发哈希映射实现方案。

需谨慎处理分片选择所用的信息。例如,若使用哈希值的某些位进行分片选择,而这些位后续又被重复使用,则后者可能因哈希值分布失衡导致性能下降。

固定分片选择信息以避免哈希表问题。

netmon_map_impl.h

ConnectionBucket* GetBucket(Index index) {
  // Rehash the hash to make sure we are not partitioning the buckets based on
  // the original hash. If num_buckets_ is a power of 2 that would drop the
  // entropy of the buckets.
  size_t original_hash = absl::Hash<Index>()(index);
  int hash = absl::Hash<size_t>()(original_hash) % num_buckets_;
  return &buckets_[hash];
}
ConnectionBucket* GetBucket(Index index) {
  absl::Hash<std::pair<Index, size_t>> hasher{};
  // Combine the hash with 42 to prevent shard selection using the same bits
  // as the underlying hashtable.
  return &buckets_[hasher({index, 42}) % num_buckets_];
}
用于追踪调用的Shard Spanner数据结构。

此代码变更将 ActiveCallMap 分区为 64 个分片。每个分片由独立互斥锁保护。特定事务将映射至唯一分片。新增 LockedShard(tid) 接口,支持线程安全访问事务的 ActiveCallMap。使用示例:

transaction_manager.cc

{
  absl::MutexLock l(&active_calls_in_mu_);
  delayed_locks_timer_ring_.Add(delayed_locks_flush_time_ms, tid);
}
{
  ActiveCalls::LockedShard shard(active_calls_in_, tid);
  shard.delayed_locks_timer_ring().Add(delayed_locks_flush_time_ms, tid);
}

测试结果显示:在8192个纤维环境下运行基准测试时,整体墙钟时间缩短69%

Benchmark                   Time(ns)        CPU(ns)     Iterations
------------------------------------------------------------------
BM_ActiveCalls/8k        11854633492     98766564676            10
BM_ActiveCalls/16k       26356203552    217325836709            10
Benchmark                   Time(ns)        CPU(ns)     Iterations
------------------------------------------------------------------
BM_ActiveCalls/8k         3696794642     39670670110            10
BM_ActiveCalls/16k        7366284437     79435705713            10

SIMD指令§

探索利用现代CPU支持的SIMD指令同时处理多个数据项能否提升性能(例如参见下文批量操作章节中关于absl::flat_hash_map的讨论)。

减少虚假共享§

若不同线程访问不同可变数据,可考虑将数据项放置在不同缓存行中,例如在C++中使用alignas指令。但此类指令易被误用且可能显著增加对象大小,需确保性能测试证明其使用合理性。

将频繁修改的字段与其他字段分离至不同缓存行。

histogram.h

HistogramOptions options_;
...
internal::HistogramBoundaries *boundaries_;
...
std::vector<double> buckets_;

double min_;             // Minimum.
double max_;             // Maximum.
double count_;           // Total count of occurrences.
double sum_;             // Sum of values.
double sum_of_squares_;  // Sum of squares of values.
...
RegisterVariableExporter *exporter_;
  HistogramOptions options_;
  ...
  internal::HistogramBoundaries *boundaries_;
  ...
  RegisterVariableExporter *exporter_;
  ...
  // Place the following fields in a dedicated cacheline as they are frequently
  // mutated, so we can avoid potential false sharing.
  ...
#ifndef SWIG
  alignas(ABSL_CACHELINE_SIZE)
#endif
  std::vector<double> buckets_;

  double min_;             // Minimum.
  double max_;             // Maximum.
  double count_;           // Total count of occurrences.
  double sum_;             // Sum of values.
  double sum_of_squares_;  // Sum of squares of values.

减少上下文切换频率§

将小型工作项在内存中处理,而非交由设备线程池处理。

cast_op.cc

template <typename Device, typename Tout, typename Tin>
void CastMaybeInline(const Device& d, typename TTypes<Tout>::Flat o,
                     typename TTypes<Tin>::ConstFlat i) {
  if (o.size() * (sizeof(Tin) + sizeof(Tout)) < 16384) {
    // Small cast on a CPU: do inline
    o = i.template cast<Tout>();
  } else {
    o.device(d) = i.template cast<Tout>();
  }
}

使用缓冲通道实现流水线处理§

通道可设置为无缓冲模式,即写入方需阻塞直至读取方准备就绪。此模式适用于通道用于同步场景,但不适用于提升并行度的情境。

考虑无锁方案§

无锁数据结构有时能比传统互斥锁保护的数据结构更有效。但直接操作原子变量可能存在风险。建议优先采用更高层次的抽象方案。

采用无锁映射管理RPC通道缓存。

RPC存根缓存条目每秒被读取数千次,修改频率极低。切换至合适的无锁映射可降低3%-5%的检索延迟。

使用固定词典+无锁哈希表来加速IsValidTokenId的判定。

dynamic_token_class_manager.h

mutable Mutex mutex_;

// The density of this hash map is guaranteed by the fact that the
// dynamic lexicon reuses previously allocated TokenIds before trying
// to allocate new ones.
dense_hash_map<TokenId, common::LocalTokenClassId> tid_to_cid_
    GUARDED_BY(mutex_);
// Read accesses to this hash-map should be done using
// 'epoch_gc_'::(EnterFast / LeaveFast). The writers should periodically
// GC the deleted entries, by simply invoking LockFreeHashMap::CreateGC.
typedef util::gtl::LockFreeHashMap<TokenId, common::LocalTokenClassId>
    TokenIdTokenClassIdMap;
TokenIdTokenClassIdMap tid_to_cid_;

Protocol Buffer 建议§

Protobuf是便捷的数据表示形式,尤其适用于网络传输或持久化存储场景。但其性能开销可能显著。例如:将1000个点构成的列表转换为C++ std::vector结构体后,求和Y坐标的代码运行速度提升了20倍

两种版本的基准测试代码。
name                old time/op  new time/op  delta
BenchmarkIteration  17.4µs ± 5%   0.8µs ± 1%  -95.30%  (p=0.000 n=11+12)

Protobuf版本:

message PointProto {
  int32 x = 1;
  int32 y = 2;
}
message PointListProto {
  repeated PointProto points = 1;
}
void SumProto(const PointListProto& vec) {
  int sum = 0;
  for (const PointProto& p : vec.points()) {
    sum += p.y();
  }
  ABSL_VLOG(1) << sum;
}

void BenchmarkIteration() {
  PointListProto points;
  points.mutable_points()->Reserve(1000);
  for (int i = 0; i < 1000; i++) {
    PointProto* p = points.add_points();
    p->set_x(i);
    p->set_y(i * 2);
  }
  SumProto(points);
}

非Protobuf版本:

struct PointStruct {
  int x;
  int y;
};

void SumVector(const std::vector<PointStruct>& vec) {
  int sum = 0;
  for (const PointStruct& p : vec) {
    sum += p.y;
  }
  ABSL_VLOG(1) << sum;
}

void BenchmarkIteration() {
  std::vector<PointStruct> points;
  points.reserve(1000);
  for (int i = 0; i < 1000; i++) {
    points.push_back({i, i * 2});
  }
  SumVector(points);
}

此外,Protobuf版本会为二进制文件增加数千字节的代码和数据,看似不多,但在包含大量Protobuf类型的系统中会迅速累积。这种体积增长会因产生指令缓存和数据缓存压力而引发性能问题。

以下是关于Protobuf性能的建议:

避免不必要的协议缓冲区使用。

鉴于上述20倍的性能差异,若某些数据从未被序列化或解析,则不应将其封装在协议缓冲区中。协议缓冲区的核心价值在于简化数据结构的序列化与反序列化,但其可能带来显著的代码体积、内存及CPU开销。若仅需DebugString和可复制性等次要特性,则无需使用协议缓冲区。

避免不必要的消息层次结构。

消息层次结构有助于以更易读的方式组织信息。但额外的层次会带来内存分配、函数调用、缓存未命中、序列化消息增大等开销。

例如,请避免使用:

message Foo {
  optional Bar bar = 1;
}
message Bar {
  optional Baz baz = 1;
}
message Baz {
  optional int32 count = 1;
}

而应采用:

message Foo {
  optional int32 count = 1;
}

协议缓冲区消息在 C++ 生成的代码中对应消息类,传输时会发送标签及有效负载长度。旧格式传输整数时需要更多分配(和释放)操作,并生成更多代码。因此所有协议缓冲区操作(解析、序列化、大小计算等)都变得更耗时,需要遍历消息树。新形式不存在此类开销,效率更高。

高频字段应使用较小的字段编号。

Protobuf采用可变长整数表示字段编号与传输格式组合(详见编码规范文档)。字段编号1至15占用1字节,16至2047占用2字节。(通常应避免使用2048及以上的字段编号。)

建议为未来扩展性能敏感型Protobuf预留部分小字段编号。

谨慎选择int32、sint32、fixed32和uint32(及其64位变体)。

通常使用int32int64,但对于哈希码等大值应采用fixed32fixed64,而经常出现负值的情况则应使用sint32sint64

变长整型编码小整数时占用更少字节,可节省空间但解码成本更高。但处理负值或大值时可能占用更多空间。此时使用fixed32或fixed64(而非uint32或uint64)可大幅降低编码解码成本并缩小体积。对于小型负整数,请使用sint32或sint64替代int32或int64。

对于Proto2,通过[packed=true]注解对重复数值字段进行打包处理。

在Proto2中,重复值默认以(标签, 值)对序列化。这种方式效率低下,因为每个元素都需要解码标签。

打包重复基本类型时,先序列化有效负载长度,随后跟随无标签的值。使用固定宽度值时,解析开始时即可知晓最终大小,从而避免重新分配内存;即无重新分配开销。但仍无法预知有效负载中包含多少变长整数,可能仍需承担重新分配成本。

在Proto3中,重复字段默认采用紧凑格式。

紧凑格式最适用于固定宽度值(如fixed32、fixed64、float、double等),因其编码长度可通过元素数量乘以固定值大小预先确定,无需逐个计算单个元素长度。

对于二进制数据和大型值,请使用bytes替代string。

string类型存储UTF8编码文本,有时需要进行验证。bytes类型可存储任意字节序列(非文本数据),通常比string更合适且更高效。

考虑使用 string_type = VIEW 避免复制操作。

解析过程中复制大型字符串或字节字段会消耗大量资源。通常可通过为字段标记 string_type = VIEW 来规避此成本。

message Image {
  ...
  bytes jpeg_encoding = 4 [features.(pb.cpp).string_type=VIEW];
}

若未添加VIEW注解,解析协议缓冲区时,潜在的大型字段内容将从序列化协议缓冲区复制到内存中的字符串对象。根据字符串或字节字段的数量及大小,复制开销可能相当显著。

替代复制二进制大块数据,诸如ParseFromStringWithAliasing等例程会使用absl::string_view引用原始底层字符串。需注意底层字符串(序列化协议缓冲区)的生命周期必须长于包含别名的协议缓冲区实例。

建议对大型字段使用Cord以减少复制开销。

为大型bytesstring字段添加[ctype=CORD]注解可降低复制成本。该注解将字段表示形式从std::string转换为absl::Cordabsl::Cord采用引用计数和树形存储结构,有效减少复制与追加操作的开销。若协议缓冲区序列化为Cord格式,解析带[ctype=CORD]注释的字符串/字节字段时可避免复制字段内容。

message Document {
  ...
  bytes html = 4 [ctype = CORD];
}

Cord字段的性能取决于长度分布和访问模式。请通过基准测试验证此类变更效果。

在 C++ 代码中使用 protobuf 区域。

建议使用区域机制节省分配与释放开销,尤其适用于包含重复字符串或消息字段的协议缓冲区。

消息字段和字符串字段采用堆分配(即使顶级协议缓冲区对象为栈分配)。若协议缓冲区消息包含大量子消息字段和字符串字段,分配与释放开销可能显著增加。内存池可摊销分配成本,使释放操作几乎零开销,同时通过从连续内存块分配资源来提升内存局部性。

保持.proto文件精简

避免在单个.proto文件中放置过多消息。一旦依赖该文件的任何内容,即使大部分内容未被使用,链接器仍会加载整个文件,导致构建时间延长和二进制文件增大。可通过扩展和Any类型避免对包含大量消息类型的庞大.proto文件产生硬性依赖。

建议将协议缓冲区以序列化形式存储,即使在内存中亦然。

内存中的协议缓冲区对象占用大量内存(通常是传输格式大小的5倍),且可能分散在多个缓存行中。因此,若应用程序需长期保持大量协议缓冲区对象处于活动状态,请考虑采用序列化形式存储。

避免使用Protobuf映射字段。

Protobuf映射字段存在性能问题,其弊端通常远大于语法上的微小便利性。建议使用非Protobuf映射结构,并从Protobuf内容初始化:

msg.proto

map<string, bytes> env_variables = 5;
message Var {
  string key = 1;
  bytes value = 2;
}
repeated Var env_variables = 5;
使用包含子集字段的协议缓冲区消息定义。

若需访问大型消息类型的少量字段,可考虑定义自定义协议缓冲区消息类型:复刻原始类型结构,仅定义所需字段。示例如下:

message FullMessage {
  optional int32 field1 = 1;
  optional BigMessage field2 = 2;
  optional int32 field3 = 3;
  repeater AnotherBigMessage field4 = 4;
  ...
  optional int32 field100 = 100;
}
message SubsetMessage {
  optional int32 field3 = 3;
  optional int32 field88 = 88;
}

将序列化的FullMessage解析为SubsetMessage时,仅解析百分之一百字段中的两个,其余视为未知字段。在合适场景下,建议使用舍弃未知字段的API以进一步提升性能。

尽可能复用protobuf对象。

在循环外部声明 protobuf 对象,使其分配的存储空间可在循环迭代中复用。

C++ 特定建议§

absl::flat_hash_map (及 set)§

Absl哈希表通常比C++标准库容器(如std::mapstd::unordered_map)性能更优。

加速 LanguageFromCode 实现(使用 absl::flat_hash_map 替代 __gnu_cxx::hash_map)。

languages.cc

class CodeToLanguage
    ...
    : public __gnu_cxx::hash_map<absl::string_view, i18n::languages::Language,
                                 CodeHash, CodeCompare> {
class CodeToLanguage
    ...
    : public absl::flat_hash_map<absl::string_view, i18n::languages::Language,
                                 CodeHash, CodeCompare> {

基准测试结果:

name               old time/op  new time/op  delta
BM_CodeToLanguage  19.4ns ± 1%  10.2ns ± 3%  -47.47%  (p=0.000 n=8+10)
加速统计数据发布/取消发布(早期变更,因此使用dense_hash_map替代当时尚未存在的absl::flat_hash_map)。

publish.cc

typedef hash_map<uint64, Publication*> PublicationMap;
static PublicationMap* publications = NULL;
typedef dense_hash_map<uint64, Publication*> PublicationMap;;
static PublicationMap* publications GUARDED_BY(mu) = NULL;
使用dense_hash_map替代hash_map来追踪SelectServer告警(当前会使用absl::flat_hash_map)。

alarmer.h

typedef hash_map<int, Alarm*> AlarmList;
typedef dense_hash_map<int, Alarm*> AlarmList;

absl::btree_map/absl::btree_set§

absl::btree_map 和 absl::btree_set 每个树节点存储多个条目。相较于 std::map 等有序的 C++ 标准库容器,这种设计具有多重优势:首先,指向子树节点的指针开销通常能显著降低;其次,由于给定 btree 树节点中的条目或键值在内存中连续存储,缓存效率往往大幅提升。

使用 btree_set 替代 std::set 来表示高负载的工作队列。

register_allocator.h

using container_type = std::set<WorklistItem>;
using container_type = absl::btree_set<WorklistItem>;

util::bitmap::InlinedBitVector§

util::bitmap::InlinedBitvector 可内联存储短位向量,因此通常比 std::vector<bool> 或其他位图类型更优。

建议使用 InlinedBitVector 替代 std::vector,并通过 FindNextBitSet 查找下一个目标位。

block_encoder.cc

vector<bool> live_reads(nreads);
...
for (int offset = 0; offset < b_.block_width(); offset++) {
  ...
  for (int r = 0; r < nreads; r++) {
    if (live_reads[r]) {
util::bitmap::InlinedBitVector<4096> live_reads(nreads);
...
for (int offset = 0; offset < b_.block_width(); offset++) {
  ...
  for (size_t r = 0; live_reads.FindNextSetBit(&r); r++) {
    DCHECK(live_reads[r]);

absl::InlinedVector§

absl::InlinedVector 将少量元素内存化存储(可通过第二个模板参数配置)。这使得小于该元素数量的向量通常能获得更佳的缓存效率,并在元素数量较少时完全避免分配后备存储数组。

在多处场景中用 InlinedVector 替代 std::vector。

bundle.h

class Bundle {
 public:
 ...
 private:
  // Sequence of (slotted instruction, unslotted immediate operands).
  std::vector<InstructionRecord> instructions_;
  ...
};
class Bundle {
 public:
 ...
 private:
  // Sequence of (slotted instruction, unslotted immediate operands).
  absl::InlinedVector<InstructionRecord, 2> instructions_;
  ...
};

gtl::vector32§

通过采用定制向量类型节省空间,该类型仅支持32位容量范围内的大小。

此简单类型变更在Spanner中节省约8TiB内存。

table_ply.h

class TablePly {
    ...
    // Returns the set of data columns stored in this file for this table.
    const std::vector<FamilyId>& modified_data_columns() const {
      return modified_data_columns_;
    }
    ...
   private:
    ...
    std::vector<FamilyId> modified_data_columns_;  // Data columns in the table.
#include "util/gtl/vector32.h"
    ...
    // Returns the set of data columns stored in this file for this table.
    absl::Span<const FamilyId> modified_data_columns() const {
      return modified_data_columns_;
    }
    ...

    ...
    // Data columns in the table.
    gtl::vector32<FamilyId> modified_data_columns_;

gtl::small_map§

gtl::small_map 使用内联数组存储有限数量的唯一键值对元素,当空间不足时会自动升级为用户指定的映射类型。

在 tflite_model 中使用 gtl::small_map。

tflite_model.cc

using ChoiceIdToContextMap = gtl::flat_hash_map<int, TFLiteContext*>;
using ChoiceIdToContextMap =
    gtl::small_map<gtl::flat_hash_map<int, TFLiteContext*>>;

gtl::small_ordered_set§

gtl::small_ordered_set 是为关联容器(如 std::set 或 absl::btree_multiset)设计的优化方案。它使用固定数组存储固定数量的元素,当空间不足时切换为使用集合或多集合。对于通常较小的集合,这比直接使用集合等结构快得多,因为集合针对大型数据集进行了优化。此变更缩小了缓存占用并缩短了临界区长度。

使用 gtl::small_ordered_set 存储监听器集合。

broadcast_stream.h

class BroadcastStream : public ParsedRtpTransport {
 ...
 private:
  ...
  std::set<ParsedRtpTransport*> listeners_ ABSL_GUARDED_BY(listeners_mutex_);
};
class BroadcastStream : public ParsedRtpTransport {
 ...
 private:
  ...
  using ListenersSet =
      gtl::small_ordered_set<std::set<ParsedRtpTransport*>, 10>;
  ListenersSet listeners_ ABSL_GUARDED_BY(listeners_mutex_);

gtl::intrusive_list§

gtl::intrusive_list<T> 是双向链表,其链接指针嵌入在类型 T 的元素中。相较于 std::list<T*>,它为每个元素节省一条缓存行+间接引用。

使用 intrusive_list 跟踪每个索引行更新的在途请求。

row-update-sender-inflight-set.h

std::set<int64> inflight_requests_ GUARDED_BY(mu_);
class SeqNum : public gtl::intrusive_link<SeqNum> {
  ...
  int64 val_ = -1;
  ...
};
...
gtl::intrusive_list<SeqNum> inflight_requests_ GUARDED_BY(mu_);

限制 absl::Status 和 absl::StatusOr 的使用§

尽管 absl::Statusabsl::StatusOr 类型效率较高,但即使在成功路径中仍存在非零开销,因此应避免在无需返回任何有意义错误细节(甚至可能永远不会失败!)的热点例程中使用:

避免在 RoundUpToAlignment() 函数中使用 StatusOr 返回类型。

best_fit_allocator.cc

absl::StatusOr<int64> BestFitAllocator::RoundUpToAlignment(int64 bytes) const {
  TPU_RET_CHECK_GE(bytes, 0);

  const int64 max_aligned = MathUtil::RoundDownTo<int64>(
      std::numeric_limits<int64>::max(), alignment_in_bytes_);
  if (bytes > max_aligned) {
    return util::ResourceExhaustedErrorBuilder(ABSL_LOC)
           << "Attempted to allocate "
           << strings::HumanReadableNumBytes::ToString(bytes)
           << " which after aligning to "
           << strings::HumanReadableNumBytes::ToString(alignment_in_bytes_)
           << " cannot be expressed as an int64.";
  }

  return MathUtil::RoundUpTo<int64>(bytes, alignment_in_bytes_);
}

best_fit_allocator.h

// Rounds bytes up to nearest multiple of alignment_.
// REQUIRES: bytes >= 0.
// REQUIRES: result does not overflow int64.
// REQUIRES: alignment_in_bytes_ is a power of 2 (checked in constructor).
int64 RoundUpToAlignment(int64 bytes) const {
  DCHECK_GE(bytes, 0);
  DCHECK_LE(bytes, max_aligned_bytes_);
  int64 result =
      ((bytes + (alignment_in_bytes_ - 1)) & ~(alignment_in_bytes_ - 1));
  DCHECK_EQ(result, MathUtil::RoundUpTo<int64>(bytes, alignment_in_bytes_));
  return result;
}
添加 ShapeUtil::ForEachIndexNoStatus 以避免为张量的每个元素创建 Status 返回对象。

shape_util.h

using ForEachVisitorFunction =
    absl::FunctionRef<StatusOr<bool>(absl::Span<const int64_t>)>;
    ...
static void ForEachIndex(const Shape& shape, absl::Span<const int64_t> base,
                         absl::Span<const int64_t> count,
                         absl::Span<const int64_t> incr,
                         const ForEachVisitorFunction& visitor_function);
using ForEachVisitorFunctionNoStatus =
    absl::FunctionRef<bool(absl::Span<const int64_t>)>;
    ...
static void ForEachIndexNoStatus(
    const Shape& shape, absl::Span<const int64_t> base,
    absl::Span<const int64_t> count, absl::Span<const int64_t> incr,
    const ForEachVisitorFunctionNoStatus& visitor_function);

literal.cc

ShapeUtil::ForEachIndex(
    result_shape, [&](absl::Span<const int64_t> output_index) {
      for (int64_t i = 0, end = dimensions.size(); i < end; ++i) {
        scratch_source_index[i] = output_index[dimensions[i]];
      }
      int64_t dest_index = IndexUtil::MultidimensionalIndexToLinearIndex(
          result_shape, output_index);
      int64_t source_index = IndexUtil::MultidimensionalIndexToLinearIndex(
          shape(), scratch_source_index);
      memcpy(dest_data + primitive_size * dest_index,
             source_data + primitive_size * source_index, primitive_size);
      return true;
    });
ShapeUtil::ForEachIndexNoStatus(
    result_shape, [&](absl::Span<const int64_t> output_index) {
      // Compute dest_index
      int64_t dest_index = IndexUtil::MultidimensionalIndexToLinearIndex(
          result_shape, result_minor_to_major, output_index);

      // Compute source_index
      int64_t source_index;
      for (int64_t i = 0, end = dimensions.size(); i < end; ++i) {
        scratch_source_array[i] = output_index[dimensions[i]];
      }
      if (src_shape_dims == 1) {
        // Fast path for this case
        source_index = scratch_source_array[0];
        DCHECK_EQ(source_index,
                  IndexUtil::MultidimensionalIndexToLinearIndex(
                      src_shape, src_minor_to_major, scratch_source_span));
      } else {
        source_index = IndexUtil::MultidimensionalIndexToLinearIndex(
            src_shape, src_minor_to_major, scratch_source_span);
      }
      // Move one element from source_index in source to dest_index in dest
      memcpy(dest_data + PRIMITIVE_SIZE * dest_index,
             source_data + PRIMITIVE_SIZE * source_index, PRIMITIVE_SIZE);
      return true;
    });
在 TF_CHECK_OK 中,为测试 ok() 函数,避免创建 Ok 对象。

status.h

#define TF_CHECK_OK(val) CHECK_EQ(::tensorflow::Status::OK(), (val))
#define TF_QCHECK_OK(val) QCHECK_EQ(::tensorflow::Status::OK(), (val))
extern tensorflow::string* TfCheckOpHelperOutOfLine(
    const ::tensorflow::Status& v, const char* msg);
inline tensorflow::string* TfCheckOpHelper(::tensorflow::Status v,
                                           const char* msg) {
  if (v.ok()) return nullptr;
  return TfCheckOpHelperOutOfLine(v, msg);
}
#define TF_CHECK_OK(val)                                           \
  while (tensorflow::string* _result = TfCheckOpHelper(val, #val)) \
  LOG(FATAL) << *(_result)
#define TF_QCHECK_OK(val)                                          \
  while (tensorflow::string* _result = TfCheckOpHelper(val, #val)) \
  LOG(QFATAL) << *(_result)
从远程过程调用(RPC)的热点路径中移除StatusOr。

移除热点路径中的StatusOr消除了先前变更导致的RPC基准测试中14%的CPU退化。

privacy_context.h

absl::StatusOr<privacy::context::PrivacyContext> GetRawPrivacyContext(
    const CensusHandle& h);

privacy_context_statusfree.h

enum class Result {
  kSuccess,
  kNoRootScopedData,
  kNoPrivacyContext,
  kNoDDTContext,
  kDeclassified,
  kNoPrequestContext
};
...
Result GetRawPrivacyContext(const CensusHandle& h,
                            PrivacyContext* privacy_context);

批量操作§

尽可能批量处理多项操作,而非逐项处理。

absl::flat_hash_map 通过单条 SIMD 指令,对一组键值进行逐键哈希字节比较。

参见瑞士表设计笔记及相关CppCon 2017CppCon 2019 演讲。

raw_hash_set.h

// Returns a bitmask representing the positions of slots that match hash.
BitMask<uint32_t> Match(h2_t hash) const {
  auto ctrl = _mm_loadu_si128(reinterpret_cast<const __m128i*>(pos));
  auto match = _mm_set1_epi8(hash);
  return BitMask<uint32_t>(_mm_movemask_epi8(_mm_cmpeq_epi8(match, ctrl)));
}
采用单次操作处理多个字节并修复问题,而非逐字节检查处理方式。

ordered-code.cc

int len = 0;
while (val > 0) {
  len++;
  buf[9 - len] = (val & 0xff);
  val >>= 8;
}
buf[9 - len - 1] = (unsigned char)len;
len++;
FastStringAppend(dest, reinterpret_cast<const char*>(buf + 9 - len), len);
BigEndian::Store(val, buf + 1);  // buf[0] may be needed for length
const unsigned int length = OrderedNumLength(val);
char* start = buf + 9 - length - 1;
*start = length;
AppendUpto9(dest, start, length + 1);
通过分块处理多个交织输入缓冲区,提升里德-所罗门编码处理速度。
Run on (12 X 3501 MHz CPUs); 2016-09-27T16:04:55.065995192-04:00
CPU: Intel Haswell with HyperThreading (6 cores) dL1:32KB dL2:256KB dL3:15MB
Benchmark                          Base (ns)  New (ns) Improvement
------------------------------------------------------------------
BM_OneOutput/3/2                      466867    351818    +24.6%
BM_OneOutput/4/2                      563130    474756    +15.7%
BM_OneOutput/5/3                      815393    688820    +15.5%
BM_OneOutput/6/3                      897246    780539    +13.0%
BM_OneOutput/8/4                     1270489   1137149    +10.5%
BM_AllOutputs/3/2                     848772    642942    +24.3%
BM_AllOutputs/4/2                    1067647    638139    +40.2%
BM_AllOutputs/5/3                    1739135   1151369    +33.8%
BM_AllOutputs/6/3                    2045817   1456744    +28.8%
BM_AllOutputs/8/4                    3012958   2484937    +17.5%
BM_AllOutputsSetUpOnce/3/2            717310    493371    +31.2%
BM_AllOutputsSetUpOnce/4/2            833866    600060    +28.0%
BM_AllOutputsSetUpOnce/5/3           1537870   1137357    +26.0%
BM_AllOutputsSetUpOnce/6/3           1802353   1398600    +22.4%
BM_AllOutputsSetUpOnce/8/4           3166930   2455973    +22.4%
每次解码四个整数(约2004年实现)。

引入GroupVarInt格式,该格式以5-17字节为单位批量编码/解码4个可变长整数组,而非逐个整数处理。采用新格式解码一组4个整数所需时间约为单独解码4个变长整数的1/3。

groupvarint.cc

const char* DecodeGroupVar(const char* p, int N, uint32* dest) {
  assert(groupvar_initialized);
  assert(N % 4 == 0);
  while (N) {
    uint8 tag = *p;
    p++;

    uint8* lenptr = &groupvar_table[tag].length[0];

#define GET_NEXT                                        \
    do {                                                \
      uint8 len = *lenptr;                              \
      *dest = UNALIGNED_LOAD32(p) & groupvar_mask[len]; \
      dest++;                                           \
      p += len;                                         \
      lenptr++;                                         \
    } while (0)
    GET_NEXT;
    GET_NEXT;
    GET_NEXT;
    GET_NEXT;
#undef GET_NEXT

    N -= 4;
  }
  return p;
}
每次对4个k位数字进行分组编码。

新增KBitStreamEncoder和KBitStreamDecoder类,用于将4个k位数同时编码/解码为比特流。由于K值在编译时已知,编码与解码效率显著提升。例如:因每次编码4个数,代码可假设流始终按字节对齐(k为偶数时)或按半字节对齐(k为奇数时)。

演示多种技术的代码变更§

有时单个代码提交(CL)会包含多项性能优化变更,综合运用了前述多种技术。研究这些CL中的变更类型,往往能帮助开发者建立思维模式:在识别系统瓶颈后,通过通用性调整来提升特定模块的运行效率。

[综述]

GPU内存分配器加速约40%。

GPUBFCAllocator的分配/释放速度提升36-48%:

  1. 通过句柄编号而非Chunk指针识别数据块。Chunk数据结构现分配于vector<Chunk>中,句柄作为该向量索引指向特定数据块。此举使Chunk对象的next/prev指针可采用ChunkHandle(4字节)替代Chunk*(8字节)。
  2. 当Chunk对象不再使用时,维护自由列表:列表头由ChunkHandle free_chunks_list_标识,且Chunk->next指向下一个空闲列表项。结合方案(1),这使分配器在绝大多数情况下(仅当vector<Chunk>扩展时)无需为Chunk对象进行堆分配/释放操作,同时确保所有Chunk对象内存连续。
  3. 我们采用基于log₂(byte_size/256)函数索引的二进制数组替代std::set数据结构,通过该函数定位给定字节大小的二进制桶。此方案仅需少量位运算即可定位二进制桶,省去了二叉搜索树的查找开销。该方案还支持将所有Bin数据结构存储在连续数组中,而非分散于多个缓存行。当多线程同时进行分配时,这能减少跨核心移动的缓存行数量。
  4. 为GPUBFCAllocator::AllocateRaw添加快速路径,优先尝试不调用retry_helper_的内存分配。若初始尝试失败(返回nullptr),则转入retry_helper处理,但通常可避免多层过程调用及带多个参数的std::function分配/释放操作。
  5. 注释掉大部分VLOG调用。调试时可通过取消注释并重新编译选择性启用。

新增多线程基准测试以检验竞争环境下的分配效率。

在搭载Titan X显卡的桌面机上,ptb_word_lm处理速度从每秒8036词提升至8272词(+2.9%)。

Run on (40 X 2801 MHz CPUs); 2016/02/16-15:12:49
CPU: Intel Ivybridge with HyperThreading (20 cores) dL1:32KB dL2:256KB dL3:25MB
Benchmark                          Base (ns)  New (ns) Improvement
------------------------------------------------------------------
BM_Allocation                            347       184    +47.0%
BM_AllocationThreaded/1                  351       181    +48.4%
BM_AllocationThreaded/4                 2470      1975    +20.0%
BM_AllocationThreaded/16               11846      9507    +19.7%
BM_AllocationDelayed/1                   392       199    +49.2%
BM_AllocationDelayed/10                  285       169    +40.7%
BM_AllocationDelayed/100                 245       149    +39.2%
BM_AllocationDelayed/1000                238       151    +36.6%
通过多项杂项优化提升Pathways吞吐量约20%。
  • 将多项特殊快速描述符解析函数统一整合至ParsedDescriptor类,并在更多场景使用该类以避免高成本的完整解析调用。
  • 将若干协议缓冲字段从字符串类型改为字节类型(避免不必要的utf-8检查及相关错误处理代码)。
  • DescriptorProto.inlined_contents 现为字符串类型而非 Cord 类型(仅适用于中等规模张量)。为此需在 tensor_util.cc 中添加多项复制辅助函数(现需同时支持字符串与 Cord 类型)。
  • 在若干位置将 std::unordered_map 替换为 flat_hash_map。
  • 为堆栈运算添加 MemoryManager::LookupMany 方法,替代按批次元素调用 Lookup 的方式。此变更可减少锁定等初始化开销。
  • 移除 TransferDispatchOp 中部分冗余字符串创建操作。
  • 在同一进程内传输 1000 个 1KB 张量批次的性能结果:
Before: 227.01 steps/sec
After:  272.52 steps/sec (+20% throughput)
通过一系列改动实现XLA编译器性能提升约15%。

为加速XLA编译所做的部分改动:

  1. 在SortComputationsByContent中,若比较函数中a == b则返回false,避免对长计算字符串进行序列化和指纹识别。
  2. 将 HloComputation::ComputeInstructionPostOrder 中的 CHECK 改为 DCHECK,避免额外触发缓存行访问。
  3. 在 CoreSequencer::IsVectorSyncHoldSatisfied() 中避免对前导指令进行耗时的复制操作。
  4. 重构HloComputation::ToString和HloComputation::ToCord双参数函数,将主要工作移至std::string拼接操作,而非Cord对象拼接。
  5. 将PerformanceCounterSet::Increment的哈希表查找次数从两次改为一次。
  6. 精简 Scoreboard::Update 代码

某重要模型在 XLA 编译阶段整体提速 14%。

加速 Google Meet 应用程序代码中的低级日志记录。

优化 ScopedLogId 性能,该操作处于每个数据包的关键路径上。

  • 移除了仅用于检测不变量是否被违反的 LOG_EVERY_N(ERROR, ...) 消息。
  • 内联了 PushLogId 和 PopLogid() 例程(由于移除了 LOG_EVERY_N_SECONDS(ERROR, ...) 语句,这些例程现已足够小可进行内联)。
  • 改用固定大小为4的数组及’int size’变量替代InlinedVector<...>来维护线程局部状态。由于实际使用中从未超过4个元素,InlinedVector的功能过于冗余。
Base: Baseline plus the code in scoped_logid_test.cc to add the benchmark
New: This changelist

CPU: Intel Ivybridge with HyperThreading (20 cores) dL1:32KB dL2:256KB dL3:25MB
Benchmark                                      Base (ns)    New (ns) Improvement
----------------------------------------------------------------------------
BM_ScopedLogId/threads:1                               8           4    +52.6%
BM_ScopedLogId/threads:2                               8           4    +51.9%
BM_ScopedLogId/threads:4                               8           4    +52.9%
BM_ScopedLogId/threads:8                               8           4    +52.1%
BM_ScopedLogId/threads:16                             11           6    +44.0%
通过优化Shape处理机制,将XLA编译时间缩短约31%。

多项提升XLA编译器性能的改动:

  1. 从多方面提升ShapeUtil::ForEachIndex…迭代性能:
    • 在ShapeUtil::ForEachState中,仅保存指向数组的指针(而非完整span对象)。
    • 预先构造指向ShapeUtil::ForEachState::indexes向量的ShapeUtil::ForEachState::indexes_span,避免每次循环迭代时从向量构建该span。
    • 保存指向 ShapeUtil::ForEachState::indexes 向量后备存储的 ShapeUtil::ForEachState::indexes_ptr 指针,使 ShapeUtil::ForEachState::IncrementDim() 中可直接执行简单数组操作,而非更耗费资源的 vector::operator[] 操作。
    • 保存构造函数中通过调用 shape.layout().minor_to_major().data() 初始化的 ShapeUtil::ForEachState::minor_to_major 数组指针,避免每次迭代中为每个维度调用 LayoutUtil::Minor()。
    • 内联了 ShapeUtil::ForEachState 构造函数及 ShapeUtil::ForEachState::IncrementDim() 例程
  2. 针对无需在传递函数中返回状态的调用场景,优化了 ShapeUtil::ForEachIndex 的迭代性能。通过引入ShapeUtil::ForEachIndexNoStatus变体实现,该变体接受返回简单bool值的ForEachVisitorFunctionNoStatus函数。相较于接受返回StatusOr<bool>的ForEachVisitorFunction(每次迭代元素都需要调用昂贵的StatusOr<bool>析构函数),此方案更高效。
    • 在 LiteralBase::Broadcast 和 GenerateReduceOutputElement 中使用了 ShapeUtil::ForEachIndexNoStatus 的此变体。
  3. 通过以下方式提升 LiteralBase::Broadcast 的性能:
    • 在 literal.cc 中引入模板化的 BroadcastHelper 例程,针对不同原始字节大小进行特化(未特化时,primitive_size 是运行时变量,导致编译器无法有效优化每个元素的 memcpy 操作,即使实际情况是微小的 2 的幂次方 (通常为1、2、4或8))。
    • 通过在LiteralBase::Broadcast例程开头仅调用一次’shape()’,将每次Broadcast调用中约(5 + num_dimensions + num_result_elements)次虚函数调用精简为仅一次。看似无害的’shape()’调用遍布各处,最终都可归结为“root_piece().subshape()”,其中subshape()是虚函数。
    • 在BroadcastHelper例程中,针对源维度为1的特殊情况,避免调用IndexUtil::MultiDimensionalIndexToLinearIndex函数。
    • 在 BroadcastHelper 中,使用了一个指向 scratch_source_index 向量后备存储区域的 scratch_source_array 指针变量,并直接使用该指针以避免在逐元素代码中进行 vector::operator[ ] 操作。同时在BroadcastHelper的逐元素循环外预先计算指向scratch_source_index向量的scratch_source_span,避免每次处理元素时都从向量构造span。
    • 在IndexUtil::MultiDimensionalIndexToLinearIndex中引入三参数变体,调用方需传入与shape参数关联的minor_to_major span。在BroadcastHelper中应用此变体,对src和dst形状进行全局广播计算(每次广播仅计算一次),而非按元素复制次数重复计算。
  4. 在ShardingPropagation::GetShardingFromUser中,针对HloOpcode::kTuple场景,仅当发现操作数具有处理价值时才调用user.sharding().GetSubSharding(…)。避免过早调用该方法使该例程的CPU时间从43.7秒缩短至2.0秒。
  5. 新增ShapeUtil::ForEachIndex、Literal::Broadcast及新方法ShapeUtil::ForEachIndexNoStatus的基准测试。
Base is with the benchmark additions of
BM_ForEachIndex and BM_BroadcastVectorToMatrix (and BUILD file change to add
benchmark dependency), but no other changes.

New is this cl

Run on (72 X 1357.56 MHz CPU s) CPU Caches: L1 Data 32 KiB (x36)
L1 Instruction 32 KiB (x36) L2 Unified 1024 KiB (x36) L3 Unified 25344 KiB (x2)

Benchmark                                      Base (ns)    New (ns) Improvement
----------------------------------------------------------------------------
BM_MakeShape                                       18.40       18.90     -2.7%
BM_MakeValidatedShape                              35.80       35.60     +0.6%
BM_ForEachIndex/0                                  57.80       55.80     +3.5%
BM_ForEachIndex/1                                  90.90       85.50     +5.9%
BM_ForEachIndex/2                               1973606     1642197     +16.8%

新添加的ForEachIndexNoStatus比ForEachIndex变体快得多(它仅存在于此新cl中,但BM_ForEachIndexNoStatus/NUM完成的基准测试工作可与上述BM_ForEachIndex/NUM结果相媲美)。

Benchmark                                      Base (ns)    New (ns) Improvement
----------------------------------------------------------------------------
BM_ForEachIndexNoStatus/0                             0        46.90    ----
BM_ForEachIndexNoStatus/1                             0        65.60    ----
BM_ForEachIndexNoStatus/2                             0     1001277     ----

广播性能提升约58%。

Benchmark                                      Base (ns)    New (ns) Improvement
----------------------------------------------------------------------------
BM_BroadcastVectorToMatrix/16/16                   5556        2374     +57.3%
BM_BroadcastVectorToMatrix/16/1024               319510      131075     +59.0%
BM_BroadcastVectorToMatrix/1024/1024           20216949     8408188     +58.4%

大型语言模型预编译宏测试结果(程序不仅执行XLA编译,但XLA相关代码耗时占比不足一半):

基准程序总耗时:573秒 采用此CL程序总耗时:465秒(提升19%)

运行该程序时两大XLA程序的编译耗时:

基准方案:141秒 + 143秒 = 284秒
采用此CL方案:99秒 + 95秒 = 194秒(提升31%)

在分布式执行框架Plaque中将大型程序编译时间缩短约22%。

通过微调实现约22%的编译加速:

  1. 优化节点源代码共享检测效率。原方案需对各节点源代码排序后进行交集运算,现改为将单节点源代码放入哈希表,再遍历另一节点源代码进行哈希表查询。
  2. 步骤1中复用同一临时哈希表。
  3. 生成编译后proto时,采用单一pair<package, opname>键值的二叉树替代嵌套二叉树结构。
  4. 将opdef指针存储于前置二叉树中,避免将opdef内容复制到二叉树内。

大型程序(约4.5万条操作)速度测试:

name             old time/op  new time/op  delta
BM_CompileLarge   28.5s ± 2%   22.4s ± 2%  -21.61%  (p=0.008 n=5+5)
MapReduce性能优化(单词计数基准测试提升约2倍速度)。

MapReduce加速方案:

  1. 修改了SafeCombinerMapOutput类的组合器数据结构。原先使用hash_multimap<SafeCombinerKey, StringPiece>(为表中每个唯一键值对创建哈希表条目),现改用hash_map<SafeCombinerKey, ValuePtr*>(其中ValuePtr是包含值及其重复次数的链表)。此变更带来三重效益:
    • 显著降低内存占用:每个值仅需“sizeof(ValuePtr) + value_len”字节,而非原先“sizeof(SafeCombinerKey) + sizeof(StringPiece) + value_len + 新哈希表条目开销”。这意味着减少了清空reducer缓冲区的频率。
    • 显著提升速度,因为当为表中已存在键插入新值时,我们避免了额外的哈希表条目开销(只需将值挂接到该键的值链表中)。
    • 由于在链表中为每个值关联重复计数,我们可以将以下序列:
      Output(key, "1");
      Output(key, "1");
      Output(key, "1");
      Output(key, "1");
      Output(key, "1");

    作为“key”链表中重复计数为5的单条记录。内部会向用户级组合函数输出五次“1”。(此技巧或许也可应用于reduce侧)。

  2. (次要)在默认的MapReductionBase::KeyFingerprintSharding函数中添加了“nshards == 1”的检测,当仅使用1个reduce分片时完全跳过键值指纹计算(此时可直接返回0而无需检查键值)。
  3. 将组合器接收每个键值对时调用的代码路径中部分VLOG(3)语句改为DVLOG(3)。

单词计数基准测试耗时从12.56秒缩短至6.55秒。

重构SelectServer中的告警处理代码,显著提升其性能(告警添加/移除耗时从771纳秒缩短至271纳秒)。

重构SelectServer中的告警处理代码,显著提升其性能。

变更内容:

  1. AlarmQueue的存储结构从set<Alarm*>切换为AdjustablePriorityQueue<Alarm>。此变更显著提升了警报处理速度,将添加/移除警报所需时间从771纳秒缩短至281纳秒。此变更避免了每次警报设置时对STL集合对象中红黑树节点的分配/释放操作,同时显著提升了缓存局部性(因AdjustablePriorityQueue采用向量实现堆结构而非红黑树),在每次selectserver循环处理AlarmQueue时触发的缓存行数量大幅减少。
  2. 将Alarmer中的AlarmList从hash_map转换为dense_hash_map,避免每次警报增删时额外分配/释放内存(此举同时提升了警报增删时的缓存局部性)。
  3. 移除了num_alarms_stat_num_closures_stat_这两个MinuteTenMinuteHourStat对象,以及对应的导出变量。虽然监控这些数据看似合理,但实际会给关键网络代码带来显著开销。若将这些变量保留为Atomic32类型而非MinuteTenMinuteHourStat,仍会使警报增删操作的成本从281纳秒增加至340纳秒。

基准测试结果

Benchmark                      Time(ns)  CPU(ns) Iterations
-----------------------------------------------------------
BM_AddAlarm/1                       902      771     777777

实施此变更后

Benchmark                      Time(ns)  CPU(ns) Iterations
-----------------------------------------------------------
BM_AddAlarm/1                       324      281    2239999
索引服务速度提升3.3倍!

2001年规划从磁盘索引切换至内存索引服务时,我们发现了多个性能问题。本次变更解决了其中多数问题,使每秒内存查询量从150次提升至500次以上(基于双处理器奔腾III机器上2GB内存索引的测试)。

  • 索引块解码速度大幅提升(微基准测试从8.9 MB/s提升至13.1 MB/s)。
  • 解码过程中新增块校验和机制,使所有getsymbol操作无需边界检查即可完成。
  • 采用粗糙的宏实现:将BitDecoder的各字段存储于局部变量中贯穿整个循环,并在循环末尾回写。
  • 通过内联汇编调用Intel芯片的’bsf’指令实现getUnary(查找字中首个1位的索引)
  • 将值解码至向量时,我们改为在循环外调整向量大小,仅沿向量移动指针,而非通过边界检查逐项存储值。
  • 文档ID解码过程中,将文档ID保存在局部文档ID空间,避免乘以num_shards_。仅在需要实际文档ID值时才进行num_shards_乘法运算并添加my_shard_。
  • IndexBlockDecoder现导出接口’AdvanceToDocid’,该接口返回首个≥“d”的文档ID索引。这使得扫描操作可基于本地文档ID进行,而非强制将每个本地文档ID转换为全局文档ID——当客户端为区块内每个索引调用GetDocid(index)时。
  • 文档位置数据的解码现改为按需执行,而非在客户端请求区块内任意文档的位置数据时就提前完成整个区块的解码。
  • 若解码的索引块在距页面边界4字节范围内结束,则将其复制到本地缓冲区。这确保我们始终能通过4字节加载操作加载位解码缓冲区,无需担心因mmap页面末尾溢出导致的段错误。
  • 仅初始化各类评分数据结构的前 nterms_ 个元素,而非全部 MAX_TERMS 元素(某些情况下,我们曾对每个评分文档进行 20K 至 100K 数据的冗余 memset 操作)。
  • 避免对计算值为0的中间评分值进行round_to_int转换及后续运算(后续运算仅会将memset的0覆盖为’0’,此类情况最为常见)。
  • 将评分数据结构的边界检查改为调试模式断言。

扩展阅读§

以下是作者认为有价值的性能相关书籍与文章(无特定排序):

推荐引用格式§

如需引用本文档,建议采用:

Jeffrey Dean & Sanjay Ghemawat, Performance Hints, 2025, https://abseil.io/fast/hints.html

或BibTeX格式:

@misc{DeanGhemawatPerformance2025,
  author = {Dean, Jeffrey and Ghemawat, Sanjay},
  title = {Performance Hints},
  year = {2025},
  howpublished = {\url{https://abseil.io/fast/hints.html}},
}

致谢§

众多同仁为本文档提供了宝贵反馈,包括:

  • Adrian Ulrich
  • Alexander Kuzmin
  • Alexei Bendebury
  • Alexey Alexandrov
  • Amer Diwan
  • Austin Sims
  • Benoit Boissinot
  • Brooks Moses
  • Chris Kennelly
  • Chris Ruemmler
  • 丹尼拉·库特宁
  • 达里尔·戈夫
  • 大卫·马杰纳默
  • 德米特里·维尤科夫
  • 伊曼纽尔·塔罗帕
  • 菲利克斯·布罗伯格
  • 弗朗西斯·伯克·莫雷拉
  • 吉迪恩·格拉斯
  • 亨里克·斯图维纽斯
  • 杰里米·多夫曼
  • 约翰·德思里奇
  • 库尔特·克吕弗
  • 凯尔·康拉德
  • 卢卡斯·佩雷拉
  • 马克·伊迪
  • 迈克尔·马蒂
  • 迈克尔·惠特克
  • 米尔恰·特罗芬
  • 米沙·布鲁克曼
  • 尼古拉斯·希勒格
  • 兰吉特·马修
  • 拉斯穆斯·拉森
  • 苏海尔·哈萨斯·叶加内
  • 斯尔丹·彼得罗维奇
  • 斯泰纳·H·冈德森
  • 斯特吉奥斯·斯特吉乌
  • 史蒂文·蒂莫西乌斯
  • 希尔万·维尼奥
  • 托马斯·埃特
  • 托马斯·科佩
  • 蒂姆·切斯纳特
  • 托德·利普康
  • 万斯·兰克哈尔
  • 维克多·科斯坦
  • 姚佐
  • 周芳
  • 杨祖光
元素周期表抱枕

本文由 TecHug 分享,英文原文及文中图片来自 Performance Hints

共有{43}精彩评论

  1. 希望谷歌能开源他们的gtl库。虽然其他地方也有类似工具,但没有同等稳定的质量和如此完善的集成方案。

    我尤其喜欢“如何处理扁平化配置文件”和“protobuf技巧”这两部分。如此精炼的实用建议在别处很难找到。

  2. 精彩文章。但愿更多人在思考性能优化时能秉持这种务实态度

    1. 我其实希望读者能持相反观点,或采取更平衡的立场。务实本身就是一种极端立场。尽管本文提供了难得一见的精炼实用建议,但读者需明白:这些方案是谷歌基于其庞大代码库在实际场景中验证有效的优化策略。

      文章探讨的内容远比“尽早考虑性能”更复杂。作为以此类优化为生的人,我目睹太多团队耗费精力微调代码路径,最终却未能带来可验证的实际价值。

      若将本文视为普适真理(实则不然),便极易陷入这种陷阱。

  3. 将这些宝贵信息整合在一页中实在帮大忙了。我常只关注其中几个方面而忽略其余内容。已收藏以提醒自己:性能优化远不止我所知的那些小技巧。

  4. 这种格式对我来说更直观。

      L1缓存访问                   2,000,000,000次/秒
      L2缓存访问                   333,333,333次/秒
      分支预测错误                    200,000,000次/秒
      互斥锁加/解锁(无竞争)      66,666,667次/秒
      主内存访问                20,000,000 次/秒
      Snappy压缩1KB数据        1,000,000 次/秒
      从SSD读取4KB数据                    50,000 次/秒
      同数据中心内往返传输    20,000 次/秒
      从内存顺序读取1MB数据    15,625 次/秒
      通过100Gbps网络读取1MB数据       10,000次/秒
      从SSD读取1MB数据                    1,000次/秒
      磁盘寻道                            200次/秒
      从磁盘顺序读取1MB数据      100次/秒
      发送数据包:CA→荷兰→CA      7次/秒
    
    1. 不过你的版本仅描述了串行操作的情形。例如消费级SSD每秒可执行百万次(或更多)操作而非5万次,加州与荷兰间每秒传输的数据包总数也远超7个——但要实现这些性能,必须利用并行处理技术。

      若倒数形式更符合您的认知,仍可表述为L1缓存访问耗时1/2,000,000,000秒。正是“每秒操作数”的表述方式使其呈现为吞吐量特征。

      延迟数据有个有趣现象:它们基本不受规模影响,而SSD总吞吐量或互联网带宽则分别取决于存储容量和网络架构规模。同样,CPU总吞吐量也会随核心数量变化。

      我认为思考“基准部署”的吞吐量(及容量等指标)仍有价值:这会影响架构决策,例如“能否在内存中完成?”“能否单服务器实现?”“需哪些优化来消除XYZ潜在瓶颈?”“资源X与Y哪个更稀缺?”等等。这类思考在《数据中心即计算机》中已有体现(https://pages.cs.wisc.edu/~shivaram/cs744-readings/dc-comput… 以及 https://books.google.com/books?id=Td51DwAAQBAJ&pg=PA72#v=one… ),其中以机器、机架和集群为单位。该图展示的是存储层次结构,未提及计算部分,且自2018年以来技术已大幅进步,但此类扩展表格仍可作为系统工程设计的有趣工具。

      1. > 但你的版本仅描述了串行操作的情形

        而这恰恰是直觉判断的基础——因为串行问题确实存在,当遭遇此类问题时,许多常识性认知会瞬间失效。

      2. > 例如消费级SSD每秒可处理百万次(甚至更多)操作,而非5万次

        “从SSD读取1MB数据”的条目对应更高的吞吐量(虽仍不及你暗示的水平,但“SSD”本身是涵盖从SATA接口设备到现今五代NVMe的宽泛类别);我推测“读取4KB”的时序测试实际描述的是单次独立页面读取,这种操作并行化难度较大。

      3. 精彩评论。我喜欢你用“基准部署的性能上限”这个表述,这正是我常说的性能天花板。实际操作中,若在实验室进行合成性能测试,建议尽量还原最佳现场环境,这样基准测试才有准确的参照基准。

    2. 你的建议混淆了延迟与吞吐量,因此并不准确。

      例如,现代CPU在等待缓存未命中时仍能执行其他指令,且可同时处理多个缓存加载请求(尤其在核心间共享缓存时)。

      主内存同样采用异步机制,每个内存通道可同时处理多个加载请求。其他层级亦然(如SSD可并行处理多个事务、网络可同时发送多个请求等)。

      现代计算机硬件层面的几乎所有操作都采用异步模式,常由多个单元协同处理“任务”执行。这种特性贯穿整个系统:从网络设备、SSD到CPU中的算术逻辑单元(ALU)皆是如此。

      现代CPU采用流水线架构(自90年代中后期起),因此可同时执行一条指令、解码下一条指令并回退(输出结果)前一条指令。但实际流水线远不止我列出的三个基本阶段,还能进行指令重排序、并行处理等操作。

      1. 我对此有所了解。是否存在各组件并行化程度的参考列表?我明白这种纸上谈兵大多徒劳,答案通常是“实际测试”,但还是好奇。

        最近面试时被问到如何实现网页爬虫,讨论到瓶颈问题(网络抓取页面、内容写入磁盘、解析响应的CPU占用等)和并行化方案时,我本想直接回答“我会通过测试找出瓶颈点,再迭代优化方案”。

        1. 餐巾纸计算能帮你避免耗费数周生命钻进最终徒劳的死胡同。没错,这只是粗略估算,但操作得当确实有效。

          你关于并行化程度的问题可惜过于模糊。固态硬盘提供某种内部并行性。需要更高并行度/IOPS?在机器上加装更多SSD即可。需要多台机器的SSD容量?那就解耦部署,但此时需考虑网络带宽、网卡性能、跨机延迟及容错机制。

          我见过最优秀的工程师,往往都是餐巾纸计算的高手。

    3. 我更倾向另一种编码方式:周期/操作

      无论是每秒操作数还是操作耗时秒数,都受时钟频率影响,而时钟频率会随机器差异及程序运行时长波动。

      据我所知,周期数(如_rdtsc)是衡量操作稳定性能最接近的指标。它能在不同时钟频率和架构的芯片间进行比较,并得出有意义的洞察。而每秒操作数或操作耗时秒数则无法做到这一点。

      1. 遗憾的是,深入研究后你会发现:每操作周期数其实没有想象中那么有意义。

        现代CPU大多采用乱序执行机制。这意味着虽然某次浮点运算可能需要4个周期完成,但若其前后夹杂加法、除法、乘法等指令,所有操作最终仍会大致同时结束。

        这使得精确推算特定操作序列的耗时变得困难。单独执行浮点乘法可能耗时4个周期,但若指令序列为:

            浮点乘法
            加法
            乘法
            除法
        

        整体同样可能耗时4个周期。这绝非简单相加“四条指令周期总和”就能得出。

        现实中,真正的瓶颈在于缓存和主内存访问。这一特性如此可靠,甚至成为SMT技术的基础。这也是现代CPU普遍采用该技术的原因。

        1. 我认同你的观点,但在我的论述语境中,我坚持认为“每操作周期数”仍是比“每秒操作数”更具意义的性能衡量标准。

          反驳挑刺…你提出的“若在指令间插入其他指令”的假设,默认了指令间不存在数据依赖关系。

          以你举例的指令集为例:

              FloatMul
              ADD
              MUL
              DIV
          

          没错…如果这些指令操作的是独立数据源,理论上可能在同一周期内完成,但在本文语境下(模拟一系列操作的性能特征),我们假设它们存在数据依赖关系,并将按顺序执行。

        2. 您的批评适用于测量单条或少量指令的情况。实际应用中,我们统计的是百万或十亿条指令的周期数。每周期指令数(CPI)具有重要意义,它是CPU核心架构师衡量吞吐量性能的主要指标。

    4. 我见过无数次这份清单,却始终惊讶于它竟未包含寄存器

      1. 寄存器移动操作对性能影响甚微,除非涉及向/从向量寄存器移动数据。

        1. H+P指出寄存器分配是最关键的优化手段之一——若非最重要的优化。

          1. 在CPU架构设计中确实如此,但这超出当前讨论范畴。你优化的C++库中,没有任何操作能通过寄存器分配/重命名影响性能。

            1. 此说法并不绝对。编译器虽擅长寄存器分配,但偶尔会出错,有时通过微调代码能优化分配方案从而提升性能。

              通常问题出在寄存器溢出位置不当,导致操作实际成为L1数据缓存访问,但本质问题依然存在。

              1. > l1d$

                我不明白这个术语的含义。

    5. 之所以不用这种格式,是因为它既无用又不真实。文章中的表格对优化者更有参考价值。那些每秒理论可执行次数的数据点,只适合营销团队。其他人要面对的是现实世界中数据集的实际读取情况,这些读取操作在时间分布上极不均匀。

  5. 有趣的是该博客仅更新至2023年。他们是被AI、Rust还是两者兼收并蓄了?

    与其参与AI开发,我宁愿被巨型甲壳类动物吞噬。

  6. 此处HN标题为“性能提示(2023)”,但该文实际于近期(2025年)才对外发布。(参见公告链接:https://x.com/JeffDean/status/2002089534188892256) 当然2023年是文档初创时间,但其中大部分内容更新于此之后。因此我认为在标题中添加“(2023)”略显误导。

    1. 若这些数据源自2023年的性能分析,其重要性显然高于外部发布时间。

      1. 该页面旨在提供编写高效代码的技巧。其中多数内容二十年前适用,二十年后依然有效。若“数据”特指“估算”章节中的表格(“基础底层操作的粗略成本”,该表格仅占页面总字数的0.5%以下),则该表格最初创建于2007年,截至2025年仍属最新数据。页面其他数据均标注了具体年份(如2001年等),因此2023年显然毫无关联。

    2. (好的,我们已将标题中的2023年补上)

    3. 出乎意料的是我并未添加2023,该年份可能在版主协助下与其他提交内容合并

      1. 备注:该内容可能被他人编辑修改。

  7. 部分内容可简化为基础形式——即通过实际操作微控制器在合理规模下实践。无需实时操作系统或Linux等复杂系统,仅需无操作系统的微控制器。通过学习其内部指令获取架构、掌握时序特性,观察引入SD卡等外部存储器时延迟数值的增长规律。掌握汇编程序输出分析能力,理解指令在流水线中的累加过程尤为重要——至少能洞悉系统运行机制。这将使你更容易将同样严谨的思维方式应用于此——这正是整个优化游戏的精髓所在:优化时间消耗与数据处理的匹配。否则,当有人告诉你某个操作耗时纳秒或微秒时,你会感到陌生,因为你通常不会接触到需要定期计算时钟周期的环境。所以请将此视为学习机会。

    1. 但需谨记:切勿将相同技术盲目套用至移动端或桌面级CPU及更高架构。

      许多代码在追求指令数量极致时反而会降低性能——虚假数据依赖性的引入会损害指令级并行性和微代码优化效果。

      编译器在此领域的表现几乎永远优于人类。

      1. 若要求人类用汇编语言编写完整程序,编译器的表现将远超人类。即便人类能编写出规模可观的汇编程序,其质量也远逊于编译器生成的代码。这是事实。

        然而,这并不意味着查看生成的汇编代码——甚至亲自编写汇编——毫无价值!你无法在全局层面超越编译器,但局部优化依然可行!若能精准定位瓶颈所在,并优化关键函数,这将为你和你的程序带来倍增效应。

        1. 这绝对不是徒劳,我常以此诊断各类问题。但手写代码实际跑得更快的情况极其罕见。

        2. 嘿,完全跑题了,你在做体素游戏/引擎吗?

          1. 没错,你早就知道我了哈哈,我们一直在Discord上聊天 😛

      2. “通过精简指令数量能让大量代码变差”

        能解释这句话的意思吗?

        1. 传统微优化方法是分析生成的汇编代码,试图用更少指令实现相同功能。但现代CPU支持多指令并行执行(乱序执行),该机制依赖于检测指令间的数据依赖关系。

          这意味着指令序列越短未必越快,反而可能导致CPU无谓地停顿。

          最快的指令序列是能最充分利用CPU资源的序列。

        2. 我曾这样操作:发现某个热点循环中,通过在循环内添加分支指令能减少指令数量。虽然速度变慢(这在预期中),但值得进行性能测量。

      3. 重点不在于超越编译器——而在于熟练掌握时钟周期消耗的测量方法,为此你首先需要理解时钟周期的计时尺度。没人要求你用汇编重写程序,但面对指令时,你应当大致了解其执行过程及数据来源。不同总线的读取操作意味着不同的时序。

        编译器同样会出错,甚至可能生成严重错误的代码。不过这属于另一个话题了。

        1. 精彩的纠偏总结。

          “编译器能完成各种精妙的优化,但也可能极其愚蠢”

          -迈克·阿克顿,CPPCON 2014

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

你也许感兴趣的: