这是我在很早之前遇到的一个题,很有意思,所以到现在仍然记得。题意借用了 TCP 的上下文,要求实现 TCP 中一个“顺序组装”的关键逻辑:
- 对于 TCP 层来说,IP 层的 packet 是乱序的收到。
- 对于应用层来说,TCP 层交付的是顺序的数据。
这个题有意思的点在于,借用了 TCP 的上下文之后,就可以先和候选人讨论一些 TCP 的基础知识,然后话锋一转,引出这道题。这样既可以考察一些基础知识,也可以考察工程代码能力。
题目
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| struct Packet { size_t offset; size_t length; uint8_t *data; };
class TCP { size_t read(void *buf, size_t count); void receive(Packet* p); void finish(); };
|
作者:木鸟杂记 https://www.qtmuniao.com/2024/05/05/infra-interview-tcp 转载请注明出处
讨论
由于多少和“实际”有些关联,所以本题目中有相当多的可以讨论的点,反过来说,如果你不进行合适的建模和简化,实现起来复杂度会非常高——四十五分钟是写不完的。
对于 Packet 结构体:
- offset 是否会有交叠?(只要发送数据足够多,耗尽 size_t 的范围就有可能发生)
- length 的长度是固定的还是变长的?
对于 read 调用:
TCP:read()
是否阻塞?如果是阻塞的,是否要阻塞到凑齐要求大小(count)的数据才能返回,还是只要有一部分数据就立即返回?
- 如果
TCP::finish()
后,TCP:read()
的返回值是什么?
对于内存问题:
TCP::read
中给应用层中 buf 填充数据时,是否要进行拷贝?
TCP::receive
中 Packet::data
的内存是否要在 TCP 类中释放?
实现
这本质上是一个生产者消费者问题。我们需要维护一个线程安全的有序数据结构,生产者(TCP::receive
)往里面放数据,消费者(TCP::read
)从里面取数据。要求是:乱序放、顺序取、可切分。
为了简化实现,可以和面试官做如下设定。
对于 Packet 结构体:
- offset 没有交叠
- length 变长
对于 read 是否阻塞问题,可以设定:
- read 函数是阻塞的
- read 只要收到一部分数据,即使没达到 count,也可以立即返回
对于内存问题,可以设定:
- TCP 类不负责收到的数据的生命周期,但要求调用者保证 TCP 整个生命周期中 packet 中的数据都是有效(不能被释放)的。
- 给应用层的数据会拷贝到用户提供的 buf 中。
以下是代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
| struct Packet{ Packet(size_t off, size_t len, uint8_t *data): offset(off), length(len), data(data) {} size_t offset; size_t length; uint8_t *data; };
class TCP { public: TCP() { nextOffset_ = 0; finished_ = false; }
size_t read(void *buf, size_t count) { std::unique_lock<std::mutex> lock(mu_);
size_t leftBytes = count; while (leftBytes > 0) { if (!packets_.empty()) { size_t offset = packets_.begin()->first; auto* p = packets_.begin()->second; if (offset == nextOffset_) { packets_.erase(offset);
size_t len = std::min(p->length, leftBytes); std::memcpy(buf, p->data, len); leftBytes -= len; nextOffset_ += len;
p->length -= len; if (p->length > 0) { p->data += len; p->offset += len; packets_[p->offset] = p; }
return count-leftBytes; } } else if (finished_) { break; }
cv_.wait(lock); }
return 0; }
void receive(Packet* p) { std::unique_lock<std::mutex> lock(mu_); packets_[p->offset] = p; cv_.notify_one(); }
void finish() { std::unique_lock<std::mutex> lock(mu_); finished_ = true; cv_.notify_one(); }
private: std::mutex mu_; std::condition_variable cv_; size_t nextOffset_; bool finished_; std::map<uint64_t, Packet*> packets_; };
|
核心实现点包括:
- 使用
std::map
组织所有 IP 层封包
- 使用
nextOffset_
来记录需要交付给应用层下一个偏移量
- 如果包被切分了,剩余的包记得放回去
测试
本题目的测试也有些意思:
- 如何模拟乱序
- 如何控制所有 IP 封包中的 data 生命周期
- 如何对应用层收到的数据进行校验
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
| TCP tcp;
void producer(uint8_t* data) { std::random_device rd; std::mt19937 gen(rd()); std::uniform_int_distribution<> dis(50, 99);
std::cout << "construct data..." << std::endl; std::vector<Packet*> packets(100); size_t offset = 0; for (size_t i = 0; i < 100; ++i) { size_t randLen = dis(gen); packets[i] = new Packet(offset, randLen, data); data += randLen; offset += randLen; }
std::cout << "total " << offset << " bytes" << std::endl; std::cout << "make the data unordered..." << std::endl; std::shuffle(packets.begin(), packets.end(), gen);
std::cout << "give it to tcp..." << std::endl; for (size_t i = 0; i < 100; ++i) { tcp.receive(packets[i]); std::cout << "receive data [" << packets[i]->offset << " ," << packets[i]->offset+packets[i]->length<< "): " << packets[i]->length << " bytes" << std::endl; std::this_thread::sleep_for(std::chrono::milliseconds(100)); } tcp.finish(); }
void consumer(uint8_t* data) { size_t nBytes = 0; uint8_t buf[100];
size_t offset = 0; while ((nBytes = tcp.read(buf, 50)) > 0) { auto diff = std::memcmp(data+offset, buf, nBytes); std::stringstream ss; ss << "read data [" << offset << " ," << offset+nBytes << "): " << nBytes << " bytes"; if (!diff) { ss << "; verify ok"; } else { ss << "; verify bad"; } std::cout << ss.str() << std::endl;
offset += nBytes; } std::cout << "read finish" << std::endl; }
int main() { uint8_t buffer[10000]; for (size_t i = 0; i < 10000; ++i) { buffer[i] = i & 0xff; } std::thread c(consumer, buffer); std::thread p(producer, buffer);
p.join(); c.join(); }
|
如果你想自己跑下代码,可以在这里自取。
最后
如果还有时间,面试官可能会跟你讨论些,如果取消你做的那些假设,需要怎么样实现,不过只要能做过基础版,剩下的就都是加分项了。
本文出自我的专栏《系统日知录》的 infra 程序员面试题系列,本系列集合了我多年来面试和被面试遇到的一些精华题目。当下一共更新了七篇:阻塞队列、无锁队列、事件队列、哈希表、顺序组装、堆、前缀树。如果大家感兴趣,之后还会继续更新。
专栏目前有一百多篇文章。如果大家觉得我的文章写的不错,欢迎大家留言、订阅支持我,你们的支持是我持续创作的最大动力。
我是青藤木鸟,一个喜欢摄影、专注大规模数据系统的程序员,欢迎关注我的公众号:“木鸟杂记”,有更多的分布式系统、存储和数据库相关的文章,欢迎关注。
关注公众号后,回复“资料”可以获取我总结一份分布式数据库学习资料。
回复“优惠券”可以获取我的大规模数据系统付费专栏《系统日知录》的八折优惠券。
我们还有相关的分布式系统和数据库的群,可以添加我的微信号:qtmuniao,我拉你入群。加我时记得备注:“分布式系统群”。
另外,如果你不想加群,还有一个分布式系统和数据库的论坛(点这里),欢迎来玩耍。