
Infra Interview: Data Structures V — In-Order Assembly

This is a problem I encountered a long time ago. It was quite interesting, so I still remember it to this day. The problem borrows the context of TCP and asks you to implement a key piece of TCP logic: “in-order assembly”:

  1. From the TCP layer’s perspective, IP layer packets are received out of order.
  2. From the application layer’s perspective, data delivered by the TCP layer is in order.

What’s interesting about this problem is that by borrowing the TCP context, you can first discuss some TCP fundamentals with the candidate, then pivot to introduce this problem. This way, you can test both foundational knowledge and engineering coding skills.

Problem

struct Packet {
    size_t offset;
    size_t length;
    uint8_t *data;
};

// Implement "in-order delivery" semantics.
class TCP {
    // Called by the application layer: reads up to count bytes, in order,
    // into buf; returns the number of bytes actually read.
    size_t read(void *buf, size_t count);
    // TCP-layer callback: hands over IP-layer packets in random order.
    void receive(Packet* p);
    // TCP-layer callback: all data has arrived and the connection is closed.
    void finish();
};

Author: 木鸟杂记 (https://www.qtmuniao.com/2024/05/05/infra-interview-tcp). Please credit the source when reposting.

Discussion

Because this problem is grounded in a realistic setting, there are quite a few points worth discussing. On the flip side, without proper modeling and simplification the implementation cost would be very high, and you could not finish it in forty-five minutes.

Regarding the Packet struct:

  1. Will offsets overlap? (This can happen once enough data has been sent that the offset wraps around the range of size_t.)
  2. Is the length fixed or variable?
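On the wraparound point: real TCP uses 32-bit sequence numbers and compares them with modular, wrap-aware arithmetic (in the spirit of RFC 1982's serial-number arithmetic) rather than a plain `<`. A minimal sketch of such a comparison; the name `seq_before` is mine, not part of the problem:

```cpp
#include <cstdint>

// Wrap-aware "a comes before b" for 32-bit sequence numbers:
// interpret the difference modulo 2^32 as a signed distance.
bool seq_before(uint32_t a, uint32_t b) {
    return static_cast<int32_t>(a - b) < 0;
}

// Example: 0xFFFFFFF0 is "before" 0x00000010 once the counter has wrapped.
```

For this interview problem, though, it is simpler to just agree that offsets never wrap (see the assumptions below).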

Regarding the read call:

  1. Is TCP::read() blocking? If it is blocking, should it block until it has accumulated count bytes before returning, or should it return immediately as soon as some data is available?
  2. What does TCP::read() return after TCP::finish() is called?

Regarding memory issues:

  1. When TCP::read fills data into the application layer’s buf, should it copy the data?
  2. Should the memory for Packet::data in TCP::receive be freed within the TCP class?

Implementation

This is essentially a producer-consumer problem. We need to maintain a thread-safe ordered data structure: the producer (TCP::receive) puts data into it, and the consumer (TCP::read) takes data out of it. The requirements are: out-of-order insertion, in-order retrieval, and splittable.
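The blocking hand-off at the heart of this producer-consumer model is the classic condition-variable pattern. As a warm-up, here is a minimal sketch of that pattern over a plain FIFO queue, with the same "blocking read, close to end the stream" semantics; the `Channel` class and its names are illustrative, not part of the problem:

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>

// Minimal producer-consumer hand-off: the consumer blocks until the
// producer has pushed something or has signaled the end of the stream.
class Channel {
public:
    void push(int v) {
        std::lock_guard<std::mutex> lk(mu_);
        q_.push(v);
        cv_.notify_one();
    }
    void close() {
        std::lock_guard<std::mutex> lk(mu_);
        closed_ = true;
        cv_.notify_all();  // wake every blocked consumer
    }
    // Returns false once the channel is closed and drained.
    bool pop(int &out) {
        std::unique_lock<std::mutex> lk(mu_);
        cv_.wait(lk, [&] { return !q_.empty() || closed_; });
        if (q_.empty()) return false;
        out = q_.front();
        q_.pop();
        return true;
    }
private:
    std::mutex mu_;
    std::condition_variable cv_;
    std::queue<int> q_;
    bool closed_ = false;
};
```

The full solution below follows the same skeleton, replacing the FIFO queue with an ordered map so that out-of-order arrivals are delivered in offset order.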

To simplify the implementation, you can agree on the following assumptions with the interviewer.

Regarding the Packet struct:

  1. Offsets do not overlap.
  2. Variable length.

Regarding whether read blocks, we can assume:

  1. The read function is blocking.
  2. read returns immediately once some data is received, even if it hasn’t reached count.

Regarding memory, we can assume:

  1. The TCP class is not responsible for the lifecycle of received data, but the caller must ensure that the data in each packet remains valid (not freed) throughout the entire lifetime of the TCP object.
  2. Data delivered to the application layer will be copied into the user-provided buf.

Here is the code:

#include <algorithm>
#include <condition_variable>
#include <cstdint>
#include <cstring>
#include <map>
#include <mutex>

struct Packet {
    Packet(size_t off, size_t len, uint8_t *data)
        : offset(off), length(len), data(data) {}
    size_t offset;
    size_t length;
    uint8_t *data;
};

class TCP {
public:
    TCP() {
        nextOffset_ = 0;
        finished_ = false;
    }

    size_t read(void *buf, size_t count) {
        std::unique_lock<std::mutex> lock(mu_);

        size_t leftBytes = count;
        while (leftBytes > 0) {
            if (!packets_.empty()) {
                size_t offset = packets_.begin()->first;
                auto* p = packets_.begin()->second;
                if (offset == nextOffset_) {
                    // fetch the packet
                    packets_.erase(offset);

                    // copy to the user buf
                    size_t len = std::min(p->length, leftBytes);
                    std::memcpy(buf, p->data, len);
                    leftBytes -= len;
                    nextOffset_ += len;

                    // put back the remaining data, if any
                    p->length -= len;
                    if (p->length > 0) {
                        p->data += len;
                        p->offset += len;
                        packets_[p->offset] = p;
                    }

                    // return as soon as some data is available
                    return count - leftBytes;
                }
            } else if (finished_) {
                break;
            }

            // wait for receive()/finish(); assumes the stream has no
            // gaps left once finish() has been called
            cv_.wait(lock);
        }

        // finished: behave like POSIX read() at end of stream
        return 0;
    }

    void receive(Packet* p) {
        std::unique_lock<std::mutex> lock(mu_);
        packets_[p->offset] = p;
        cv_.notify_one();
    }

    void finish() {
        std::unique_lock<std::mutex> lock(mu_);
        finished_ = true;
        cv_.notify_all();  // wake any reader still blocked in read()
    }

private:
    std::mutex mu_;
    std::condition_variable cv_;
    size_t nextOffset_;
    bool finished_;
    std::map<size_t, Packet*> packets_;
};

Key implementation points include:

  1. Use std::map to organize all IP layer packets.
  2. Use nextOffset_ to track the next offset to deliver to the application layer.
  3. If a packet is split, remember to put the remainder back.
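Points 1 and 3 can be demonstrated in isolation: `std::map` keeps its keys sorted, so chunks inserted in any order come back out lowest-offset first, and a partially consumed chunk can simply be re-inserted under its advanced offset. A small self-contained sketch (the `reassemble` helper is illustrative, not part of the solution above):

```cpp
#include <algorithm>
#include <map>
#include <string>

// Drain a map of {offset -> chunk} in offset order, consuming at most
// `step` bytes per iteration; a partially consumed chunk is re-inserted
// under its advanced offset (the "split" case). Delivery stops at the
// first gap in the offsets.
std::string reassemble(std::map<size_t, std::string> packets, size_t step) {
    std::string out;
    size_t next = 0;
    while (!packets.empty() && packets.begin()->first == next) {
        auto it = packets.begin();
        std::string chunk = it->second;
        packets.erase(it);

        size_t take = std::min(step, chunk.size());
        out += chunk.substr(0, take);
        next += take;
        if (take < chunk.size())
            packets[next] = chunk.substr(take);  // put the remainder back
    }
    return out;
}

// Inserted out of order, retrieved in order:
// reassemble({{6, "world"}, {0, "hello "}}, 4) yields "hello world"
```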

Testing

Testing for this problem is also interesting:

  1. How to simulate out-of-order delivery.
  2. How to manage the lifecycle of data in all IP packets.
  3. How to verify the data received by the application layer.
#include <algorithm>
#include <chrono>
#include <cstring>
#include <iostream>
#include <random>
#include <sstream>
#include <thread>
#include <vector>

TCP tcp;

void producer(uint8_t* data) {
    std::random_device rd;
    std::mt19937 gen(rd());
    std::uniform_int_distribution<> dis(50, 99);

    std::cout << "construct data..." << std::endl;
    std::vector<Packet*> packets(100);
    size_t offset = 0;
    for (size_t i = 0; i < 100; ++i) {
        size_t randLen = dis(gen);
        packets[i] = new Packet(offset, randLen, data);
        data += randLen;
        offset += randLen;
    }

    std::cout << "total " << offset << " bytes" << std::endl;
    std::cout << "make the data unordered..." << std::endl;
    std::shuffle(packets.begin(), packets.end(), gen);

    std::cout << "give it to tcp..." << std::endl;
    for (size_t i = 0; i < 100; ++i) {
        // save the fields before handing the packet over: the consumer
        // may split (and thus mutate) it concurrently
        size_t off = packets[i]->offset;
        size_t len = packets[i]->length;
        tcp.receive(packets[i]);
        std::cout << "receive data [" << off << ", " << off + len << "): "
                  << len << " bytes" << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
    tcp.finish();
}

void consumer(uint8_t* data) {
    size_t nBytes = 0;
    uint8_t buf[100];

    size_t offset = 0;
    while ((nBytes = tcp.read(buf, 50)) > 0) {
        auto diff = std::memcmp(data + offset, buf, nBytes);
        std::stringstream ss;
        ss << "read data [" << offset << ", " << offset + nBytes << "): "
           << nBytes << " bytes";
        if (!diff) {
            ss << "; verify ok";
        } else {
            ss << "; verify bad";
        }
        std::cout << ss.str() << std::endl;

        offset += nBytes;
    }
    std::cout << "read finish" << std::endl;
}

int main() {
    uint8_t buffer[10000];
    for (size_t i = 0; i < 10000; ++i) {
        buffer[i] = i & 0xff;
    }
    std::thread c(consumer, buffer);
    std::thread p(producer, buffer);

    p.join();
    c.join();
}

If you want to run the code yourself, you can grab it here.

Final Notes

If there’s still time, the interviewer might discuss with you how to implement it if the assumptions you made are removed. But as long as you can solve the basic version, the rest is all bonus points.

This article is from my column “System Daily Notes” in the infra programmer interview question series. This series collects some essential problems I’ve encountered over the years while interviewing and being interviewed. So far, seven articles have been published: blocking queue, lock-free queue, event queue, hash table, in-order assembly, heap, and trie. If you’re interested, more will be updated in the future.

The column currently has over a hundred articles. If you find my writing helpful, feel free to leave comments and subscribe to support me. Your support is my greatest motivation to keep creating.


I am 青藤木鸟, a programmer who loves photography and focuses on large-scale data systems. For more articles on distributed systems, storage, and databases, follow my WeChat public account "木鸟杂记". After following, reply "资料" to get a set of distributed-database study materials I compiled, or reply "优惠券" for a 20%-off coupon for my paid column on large-scale data systems, "系统日知录" (System Daily Notes).

We also have chat groups on distributed systems and databases: add my WeChat ID qtmuniao and I will invite you in; remember to note "分布式系统群" (distributed systems group) when adding me. If you would rather not join a group, there is also a forum on distributed systems and databases (linked here); feel free to drop by.
