Infra 面试之数据结构五:顺序组装

这是我在很早之前遇到的一个题,很有意思,所以到现在仍然记得。题意借用了 TCP 的上下文,要求实现 TCP 中一个“顺序组装”的关键逻辑:

  1. 对于 TCP 层来说,IP 层的 packet 是乱序的收到。
  2. 对于应用层来说,TCP 层交付的是顺序的数据。

这个题有意思的点在于,借用了 TCP 的上下文之后,就可以先和候选人讨论一些 TCP 的基础知识,然后话锋一转,引出这道题。这样既可以考察一些基础知识,也可以考察工程代码能力。

题目

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
struct Packet {
size_t offset;
size_t length;
uint8_t *data;
};

// 实现一个“顺序交付”语义
class TCP {
// 应用层调用:按顺序读取不超过 count 的字节数到 buf 中,并返回实际读到的字节数
size_t read(void *buf, size_t count);
// TCP 层回调:得到一些随机顺序的 IP 层封包
void receive(Packet* p);
// TCP 层回调:数据发完,连接关闭
void finish();
};

作者:木鸟杂记 https://www.qtmuniao.com/2024/05/05/infra-interview-tcp 转载请注明出处

讨论

由于多少和“实际”有些关联,所以本题目中有相当多的可以讨论的点,反过来说,如果你不进行合适的建模和简化,实现起来复杂度会非常高——四十五分钟是写不完的。

对于 Packet 结构体:

  1. offset 是否会有交叠?(只要发送数据足够多,耗尽 size_t 的范围就有可能发生)
  2. length 的长度是固定的还是变长的?

对于 read 调用:

  1. TCP:read() 是否阻塞?如果是阻塞的,是否要阻塞到凑齐要求大小(count)的数据才能返回,还是只要有一部分数据就立即返回?
  2. 如果TCP::finish() 后,TCP:read() 的返回值是什么?

对于内存问题:

  1. TCP::read 中给应用层中 buf 填充数据时,是否要进行拷贝?
  2. TCP::receivePacket::data 的内存是否要在 TCP 类中释放?

实现

这本质上是一个生产者消费者问题。我们需要维护一个线程安全的有序数据结构,生产者(TCP::receive)往里面放数据,消费者(TCP::read)从里面取数据。要求是:乱序放、顺序取、可切分。

为了简化实现,可以和面试官做如下设定。

对于 Packet 结构体:

  1. offset 没有交叠
  2. length 变长

对于 read 是否阻塞问题,可以设定:

  1. read 函数是阻塞的
  2. read 只要收到一部分数据,即使没达到 count,也可以立即返回

对于内存问题,可以设定:

  1. TCP 类不负责收到的数据的生命周期,但要求调用者保证 TCP 整个生命周期中 packet 中的数据都是有效(不能被释放)的。
  2. 给应用层的数据会拷贝到用户提供的 buf 中。

以下是代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
struct Packet{
Packet(size_t off, size_t len, uint8_t *data):
offset(off), length(len), data(data) {}
size_t offset;
size_t length;
uint8_t *data;
};

class TCP {
public:
TCP() {
nextOffset_ = 0;
finished_ = false;
}

size_t read(void *buf, size_t count) {
std::unique_lock<std::mutex> lock(mu_);

size_t leftBytes = count;
while (leftBytes > 0) {
if (!packets_.empty()) {
size_t offset = packets_.begin()->first;
auto* p = packets_.begin()->second;
if (offset == nextOffset_) {
// fetch the packet
packets_.erase(offset);

// copy to the user buf
size_t len = std::min(p->length, leftBytes);
std::memcpy(buf, p->data, len);
leftBytes -= len;
nextOffset_ += len;

// put back the left data
p->length -= len;
if (p->length > 0) {
p->data += len;
p->offset += len;
packets_[p->offset] = p;
}

return count-leftBytes;
}
} else if (finished_) {
break;
}

cv_.wait(lock);
}

// finished
return 0;
}

void receive(Packet* p) {
std::unique_lock<std::mutex> lock(mu_);
packets_[p->offset] = p;
cv_.notify_one();
}

void finish() {
std::unique_lock<std::mutex> lock(mu_);
finished_ = true;
cv_.notify_one();
}

private:
std::mutex mu_;
std::condition_variable cv_;
size_t nextOffset_;
bool finished_;
std::map<uint64_t, Packet*> packets_;
};

核心实现点包括:

  1. 使用 std::map 组织所有 IP 层封包
  2. 使用 nextOffset_ 来记录需要交付给应用层下一个偏移量
  3. 如果包被切分了,剩余的包记得放回去

测试

本题目的测试也有些意思:

  1. 如何模拟乱序
  2. 如何控制所有 IP 封包中的 data 生命周期
  3. 如何对应用层收到的数据进行校验
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
TCP tcp;

void producer(uint8_t* data) {
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<> dis(50, 99);

std::cout << "construct data..." << std::endl;
std::vector<Packet*> packets(100);
size_t offset = 0;
for (size_t i = 0; i < 100; ++i) {
size_t randLen = dis(gen);
packets[i] = new Packet(offset, randLen, data);
data += randLen;
offset += randLen;
}

std::cout << "total " << offset << " bytes" << std::endl;
std::cout << "make the data unordered..." << std::endl;
std::shuffle(packets.begin(), packets.end(), gen);

std::cout << "give it to tcp..." << std::endl;
for (size_t i = 0; i < 100; ++i) {
tcp.receive(packets[i]);
std::cout << "receive data [" << packets[i]->offset << " ,"
<< packets[i]->offset+packets[i]->length<< "): "
<< packets[i]->length << " bytes" << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
tcp.finish();
}

void consumer(uint8_t* data) {
size_t nBytes = 0;
uint8_t buf[100];

size_t offset = 0;
while ((nBytes = tcp.read(buf, 50)) > 0) {
auto diff = std::memcmp(data+offset, buf, nBytes);
std::stringstream ss;
ss << "read data [" << offset << " ," << offset+nBytes << "): "
<< nBytes << " bytes";
if (!diff) {
ss << "; verify ok";
} else {
ss << "; verify bad";
}
std::cout << ss.str() << std::endl;

offset += nBytes;
}
std::cout << "read finish" << std::endl;
}

int main() {
uint8_t buffer[10000];
for (size_t i = 0; i < 10000; ++i) {
buffer[i] = i & 0xff;
}
std::thread c(consumer, buffer);
std::thread p(producer, buffer);

p.join();
c.join();
}

如果你想自己跑下代码,可以在这里自取。

最后

如果还有时间,面试官可能会跟你讨论些,如果取消你做的那些假设,需要怎么样实现,不过只要能做过基础版,剩下的就都是加分项了。

本文出自我的专栏《系统日知录》的 infra 程序员面试题系列,本系列集合了我多年来面试和被面试遇到的一些精华题目。当下一共更新了七篇:阻塞队列、无锁队列、事件队列、哈希表、顺序组装、堆、前缀树。如果大家感兴趣,之后还会继续更新。

专栏目前有一百多篇文章。如果大家觉得我的文章写的不错,欢迎大家留言、订阅支持我,你们的支持是我持续创作的最大动力。


我是青藤木鸟,一个喜欢摄影、专注大规模数据系统的程序员,欢迎关注我的公众号:“木鸟杂记”,有更多的分布式系统、存储和数据库相关的文章,欢迎关注。 关注公众号后,回复“资料”可以获取我总结一份分布式数据库学习资料。 回复“优惠券”可以获取我的大规模数据系统付费专栏《系统日知录》的八折优惠券。

我们还有相关的分布式系统和数据库的群,可以添加我的微信号:qtmuniao,我拉你入群。加我时记得备注:“分布式系统群”。 另外,如果你不想加群,还有一个分布式系统和数据库的论坛(点这里),欢迎来玩耍。

wx-distributed-system-s.jpg