This note records my source code analysis of Dragonfly, the project I participated in as a remote internship during the summer of 2020. Thanks to Alibaba for this great opportunity.
This blog (how to read source code?) helped me a lot on the theory of source code reading. As for methodology, I first locate the part of the source code I am interested in, and then use the githistory.xyz tool to trace the code from the very beginning of its development. With githistory.xyz, I can find the code associated with the part I am focusing on by looking at every commit of the file.
As Linus said: "Bad programmers worry about the code. Good programmers worry about data structures and their relationships."
Overall
This section includes my understanding of the entire Dragonfly framework.
Dragonfly P2P Downloader
The entrance of the p2p downloader lies in the `dfget/core/downloader/downloader.go` file, which calls the main function of the p2p downloader, the `run()` method, inside `./p2p_downloader/p2p_downloader.go`. Here is the pseudocode of the method.
```go
func (p2p *P2PDownloader) run(ctx context.Context, pieceWriter PieceWriter) error {
```
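Roughly, the body of `run()` works like the sketch below. This is only a reconstruction of the flow described here; the helper names (`getItem`, `pullPieceTask`, `processPiece`, `finishTask`) and the response codes are placeholders for illustration, not the actual identifiers.

```go
// Sketch of the download loop, not the real implementation.
func (p2p *P2PDownloader) run(ctx context.Context, pieceWriter PieceWriter) error {
	for {
		// ask the supernode which pieces should be pulled next
		item := p2p.getItem()
		resp, err := p2p.pullPieceTask(&item)
		if err != nil {
			return err
		}
		switch resp.Code {
		case codeContinue: // placeholder constant
			// download the pieces from the chosen peers and push every
			// finished piece into the writer queue
			p2p.processPiece(resp, &item)
		case codeFinish: // placeholder constant
			// every piece has been downloaded; let the writer finish up
			return p2p.finishTask(resp, pieceWriter)
		default:
			return fmt.Errorf("unexpected response code %d", resp.Code)
		}
	}
}
```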
And that is all about the downloader. After Dragonfly has successfully downloaded a piece and pushed it to the writer queue, the pieceWriter starts to write the piece to the local disk and to the target file.
Note that the two files are different: the first is guaranteed to be located in the local file system, while the latter may reside on a cloud disk or on another local disk. Accordingly, there are two writers corresponding to them: client_writer and target_writer. In the cloud-disk case the writing process becomes network IO, which will be the bottleneck of the whole system.
Proposal of ASoC 2020 Dragonfly
Background
As mentioned in #1164, the current download process becomes a defect when the Dragonfly client is running against a cloud disk: the local file system is not the target place, so the downloader has to write the file one more time to another file system. This is somewhat analogous to the sendfile function.
Idea
Remove the redundant write of the file to the local file system. Instead, upload the data directly to the target place by keeping it in memory, managed like the sliding window in the TCP protocol. Please look at #1164 for more information about this idea.
Implementation Plan
Design of Supernode Scheduler
The key point is to maintain the sender window and to verify that stream mode is on. A sketch of the window state is given after this list.

- Add `wnd` to `apis/types/task_create_request.go:18#TaskCreateRequest`: the task status is initialized into the progress manager during the task registry step. By default the window size is set to -1, which means the file is downloaded in regular mode.
- Add a new struct `SlidingWindow` to the progress manager, with the fields `wnd` (window size) and `una` (oldest unacknowledged sequence number, RFC 793).
- Add `wnd` to the `supernode/daemon/mgr/progress/progress_manager.go:52#manager` struct. The progress manager is initialized during the task registry process; update its `SlidingWindow` field there. By default `SlidingWindow` is nil, which means the file is downloaded in regular mode.
- Modify the behavior of the `GetPieceProgressByCID` method of the progress manager based on the `SlidingWindow` field: only return the piece info within the window range.
- Update the `una` field of `SlidingWindow` in the progress manager in the `updateTask` method of the task manager.
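As a minimal sketch of the window state described above (field and method names are assumptions, not the final code):

```go
// SlidingWindow is the per-task window state kept by the progress manager.
type SlidingWindow struct {
	wnd int32 // window size, fixed once the task is registered
	una int32 // oldest unacknowledged piece number, in the spirit of SND.UNA in RFC 793
}

// inWindow reports whether a piece number falls inside [una, una+wnd).
// GetPieceProgressByCID would apply a check like this before returning piece info.
func (w *SlidingWindow) inWindow(pieceNum int32) bool {
	return pieceNum >= w.una && pieceNum < w.una+w.wnd
}
```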
Design of Uploader
- Since multiple tasks can run at the same time, one peer server may have multiple client stream writers simultaneously. It is therefore necessary to add a `csw []*ClientStreamWriter` field to the current `peerServer` struct in the `dfget/core/uploader/peer_server.go` file. [It seems that this would couple the downloader and the uploader tightly; optimization should be considered.] A sketch of these additions follows this list.
- Add a locker for the cache field in the `ClientStreamWriter` struct in the `dfget/core/downloader/p2p_downloader/client_stream_writer.go` file.
- Extract the `PeerServer` launch process from the `registerToSuperNode()` method into the `Start()` method, and register the client stream writer to the peer server after the download process is created in `Start()`. The `Start()` method lies in the `dfget/core/core.go` file.
- Separate the stream uploading branch and the normal uploading branch in the `uploadHandler()` of the `dfget/core/uploader/peer_server.go` file. Add a new interface to handle the different behavior of the `getTaskFile` and `uploadPiece` methods.
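A heavily simplified sketch of the additions above (the real structs have many more fields; the shapes here are assumptions):

```go
package uploader

import "sync"

// ClientStreamWriter keeps downloaded pieces in memory so the peer server can
// serve them without touching the local file system.
type ClientStreamWriter struct {
	mu    sync.Mutex     // guards cache, since downloader and uploader access it concurrently
	cache map[int][]byte // piece number -> piece content
}

// peerServer gains one stream writer per running download task.
type peerServer struct {
	csw []*ClientStreamWriter
}
```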
Any recommendations and comments are welcome! Thank you.
ASoC
This section includes my understanding of the parts associated with the ASoC program. There will be a lot more detail here compared to the Overall section.
IPC in Golang
Reference
Summary
- Improve cache GC
- Improve the file download flow
- Implement streaming transfer through pipe-like inter-process communication
- Improve the interaction between dfget and supernode for streaming transfer
- Add unit tests
- Get the integration test passing

My main work this week was adding an integration test for streaming transfer and successfully getting the downloaded content. This test verifies that streaming on the downloader side, as well as its interaction with the supernode, works correctly.
My other work was modifying the piece status update flow of the sliding window. The current design is that when a piece finishes downloading, its status is updated to success.
Question
- About using stdout as dfget's pipe outlet -> printer
- Discussion of the state transitions of the piece status
- How to test the cooperation of multiple dfget instances in the integration test
Current Uploader
UploaderHandler
The whole file is stored in the file system. When the uploader is going to serve part of a piece, it reads the corresponding part of the file. The URL given by the peer requesting the piece provides the following information:
- padSize: a fixed size in the current architecture, 4 bytes for the head and 1 byte for the tail
- Start: the start index into the file
- length: the read length
- PieceSize
- PieceNum
With the above parameters, the starting piece can be located. Note that in the CDN type there is no pad, while in the p2p pattern every piece includes the meta header and tail. Moreover, the meta header of a piece records the size of that piece, along with the piece size. Using this metadata, we can locate the starting piece number; a rough illustration follows.
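A small sketch of the arithmetic, assuming the p2p pattern where every stored piece carries the 4-byte head and 1-byte tail (the helper name is made up for illustration):

```go
const (
	pieceHeadSize = 4 // bytes of meta header per piece
	pieceTailSize = 1 // bytes of tail per piece
)

// startPieceNum derives the starting piece number from the requested start
// offset, given the piece size of the task.
func startPieceNum(start, pieceSize int64) int64 {
	paddedSize := pieceSize + pieceHeadSize + pieceTailSize // bytes one piece occupies on disk
	return start / paddedSize
}
```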
Implementation of Cache
Regarding the client-side implementation of P2P streaming: based on the current, very elegant system design, I am considering adding a new module streamcache under dfget/core on the client side. This module would be responsible for:
- receiving the downloaded data from the downloader
- interacting with the uploader to share the data
- updating the cache with an LRU policy according to the download progress of pieces and the size of the cache window
- reporting the piece information held in the cache to the supernode

This design does not break the current module responsibilities: the uploader is solely responsible for looking up and uploading data, the downloader is solely responsible for downloading data, and streamcache is responsible for updating, maintaining, and storing pieces. For the concrete implementation of the new module, I am considering making the cache a singleton entity inside the module and exposing an API for other modules to interact with.
Challenge
- The cache cannot exist purely as an API, since it holds state such as the cache window, unlike stateless HTTP requests and responses.
- The cache cannot exist as a standalone module like the downloader, because the uploader runs as a single process that is responsible for all tasks.

For the above reasons, the cache must be included in the peer server struct. Moreover, the communication between the downloader and the uploader should be done through HTTP requests.
Consideration
The reason why I do not want to implement the cache manager in the client stream writer: two caches would exist at the same time, and the cache scheduling result would have to be passed to the uploader, causing unnecessary HTTP requests. If the cache management is implemented in the uploader instead, the downloader only needs to pass the piece content and leave the cache management to the uploader, which then calls the cache module to do the management work. A rough sketch of this split is shown below.
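A sketch of the division of labour under these assumptions: the downloader only POSTs finished pieces to the local peer server, and the cache bookkeeping stays on the uploader side. The route, handler name, and `cacheManager` field are all hypothetical.

```go
func (ps *peerServer) receivePieceHandler(w http.ResponseWriter, r *http.Request) {
	pieceNum, err := strconv.Atoi(r.URL.Query().Get("pieceNum"))
	if err != nil {
		http.Error(w, "bad pieceNum", http.StatusBadRequest)
		return
	}
	content, err := ioutil.ReadAll(r.Body)
	if err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	ps.cacheManager.Store(pieceNum, content) // LRU / window bookkeeping lives in the uploader
	w.WriteHeader(http.StatusOK)
}
```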
Why is the content of the cache not synced to the file system in the end?
The refactoring of the downloader and doDownloadTimeoutTask.
Summary
- Fill out the API framework and create the interface.
- register: register the cache information to the uploader from dfget [*]
  - add an API for dfget in `dfget/core/api/uploader_api.go:100`
  - add an API for the uploader to register the cache window in `dfget/core/uploader/peer_server.go:286`
- Pass pieces between the downloader and the uploader [**]
  - take the current implementation of the CDN stream downloader as a reference at `supernode/daemon/mgr/cdn/manager.go:121`
  - add a receivePiece API for the uploader [*]
- Upload piece handler [*]: the current URL uses the file name to locate the resource to share. According to the annotation in `apis/types/df_get_task.go:47`, the URL is set during task registration and is provided by the peer that initializes the task. What should we do about this?
- LRU cache [***]
  - API for updating the cache state between the supernode and the uploader cache manager
  - GC
Questions
- How should the GC of the cache be embedded into the current serverGC? (`dfget/core/uploader/uploader.go:171`) Currently, the preparation phase of a download creates the temporary file and the target file; these are later used for piece sharing and serverGC, and the file name together with the host address serves as the URL of the resource in the P2P network. -> In stream mode, empty files are created in this phase.
- About the current streaming transfer: what should the behavior be after the file download has finished?
- Optimize the cache update flow: a piece that is downloaded successfully for the first time goes directly into the cache, without first sending successful; the pieceUpdate message is sent only when the piece is later evicted from the cache.
TODO
- Improve cache GC
- Improve the file download flow
- Implement streaming transfer through pipe-like inter-process communication
- Improve the interaction between dfget and supernode for streaming transfer
- Add unit tests
- Get the integration test passing
Implementation of Supernode Scheduler
Questions
- Inside `supernode/daemon/mgr/progress/progress_manager.go:103`, would the supernode itself use the p2p streaming mode? Personally, I think it won't.
- What should be the key of slidingWindowStates in the progress manager: taskID, or clientID? Currently I use the taskID because it is also used as the key of the piece info in the piece progress manager. In fact, the taskID maps to one file in the P2P network, which means multiple clients could be downloading the same file at the same time. Should we only consider a P2P network with purely streaming downloads, or the hybrid mode?
Supernode Scheduler
First of all, we should understand the progress state of the download process. The supernode state maintains the piece bitset of a given CID. If a bit is set, the supernode knows the location of that piece: the piece is available, and a client may download it. While a piece is being downloaded, it is put into the clientState running bitset. Once the download is finished, the piece is removed from that bitset and set in the piece bitset of the client state with the mark of success [this is achieved by setting specific bits of the piece status].
The clientState maintains not only the piece status of the CID but also runningPiece, the pieces that are currently being downloaded from dstCID to srcCID. A toy model of this state is sketched below.
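The following toy model only mirrors the flow described above; the real progress manager uses its own bitset and sync-map types, so the names here are illustrative only.

```go
type clientState struct {
	pieces  map[int]bool   // pieceNum -> set once the piece is available on this client
	running map[int]string // pieceNum -> CID of the peer the piece is being pulled from
}

// markSuccess mirrors the flow above: a finished piece leaves the running set
// and is marked as available with the success status.
func (cs *clientState) markSuccess(pieceNum int) {
	delete(cs.running, pieceNum)
	cs.pieces[pieceNum] = true
}
```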
PR
This PR is part of the work proposed in #1436, which implements the P2P stream mode on the supernode side. To summarize, the streaming download work in the supernode falls into two parts.

Maintain the sliding window in the supernode.
- Initialize: the window size is registered during the dfget task registration phase, and the window state is then recorded in the ProgressManager as a SyncMap indexed by ClientID. The window size is static once it is recorded by the supernode.
- Update: the dfget client reports the download result of every piece to the supernode. In the `report` API of the supernode server, the window is updated based on the received piece number and piece status; specifically, the window keeps sliding until it reaches the first unacknowledged piece.
- Finish: update the `DeleteCID` method of the ProgressManager to delete the sliding window state when it is in stream mode.

Schedule the pieces according to the window state.
The only modification I made in this part lies in the `GetPieceProgressByCID` method of the ProgressManager. Available pieces in regular mode means success pieces that are not running, while in stream mode it means cached pieces that are not running. After the modification, the ProgressManager only returns the available pieces inside the window when stream mode is on. It should be noted that stream mode currently uses the same scheduler as regular mode, which may demand future optimization.

Besides the above modifications, I also added two APIs to the supernode server, `addPieceCache` and `deletePieceCache`. The dfget client calls these APIs to notify the supernode about the state of its cache. A sketch of the window update step is given below.
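A minimal sketch of the slide step performed when a piece result is reported. The `acked` map is illustrative only; the real code derives acknowledgement from the piece status kept by the ProgressManager.

```go
// slide advances una past every contiguously acknowledged piece,
// so the window keeps moving until the first unacknowledged piece.
func (w *SlidingWindow) slide(acked map[int32]bool) {
	for acked[w.una] {
		w.una++
	}
}
```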
Discussion with Developer from Ant
Questions
- The piece scheduling strategy in Ant's current streaming transfer
- What range of pieces does the client cache hold? Only those inside the window, or something else?

The server-side implementation of streaming transfer is basically complete:
- APIs to modify and add: add the streamMode and windowSize fields to the `/peer/registry` API; add POST and DELETE methods for `/peer/piece/stream`, used to notify the server about the cache state of pieces.
- The flow of window maintenance
- Does the window size need to change?
Implementation Plan
The key point is to maintain the sender window and to verify that stream mode is on.

- Add `wnd` to `apis/types/task_create_request.go:18#TaskCreateRequest`: the task status is initialized into the progress manager during the task registry step. By default the window size is set to -1, which means the file is downloaded in regular mode.
- Add a new struct `SlidingWindow` to the progress manager, with the fields `wnd` (window size) and `una` (oldest unacknowledged sequence number).
- Add `wnd` to the `supernode/daemon/mgr/progress/progress_manager.go:52#manager` struct. The progress manager is initialized during the task registry process; update its `SlidingWindow` field there. By default `SlidingWindow` is nil, which means the file is downloaded in regular mode.
- Modify the behavior of the `GetPieceProgressByCID` method of the progress manager based on the `SlidingWindow` field: only return the piece info within the window range.
- Update the `una` field of `SlidingWindow` in the progress manager in the `updateTask` method of the task manager.
Existing Implementation of sliding window
Reference: https://github.com/tass-belgium/picotcp.git
Questions encountered during reading
How does the current supernode schedule?
The initial piece simply serves to warm up the downloading process. It only has the following content:
```go
return &Piece{
	TaskID:    taskID,
	SuperNode: node,
	Status:    status,
	Result:    constants.ResultInvalid,
	Content:   &bytes.Buffer{},
}
```

where status is `constants.TaskStatusStart`. Every piece status is related to a DfgetTaskStatus code; check it out at `supernode/server/0.3_bridge.go:65`. This status code corresponds to the DfgetTaskStatus code `types.PiecePullRequestDfgetTaskStatusSTARTED`.
Would the uploader register the upload task to the supernode? Considering the `path` field in the `apis/types/task_create_request.go` file:

```go
// path is used in one peer A for uploading functionality. When peer B hopes
// to get piece C from peer A, B must provide a URL for piece C.
// Then when creating a task in supernode, peer A must provide this URL in request.
//
Path string `json:"path,omitempty"`
```

When would `dfget/core/downloader/p2p_downloader/p2p_downloader.go#getItem` encounter the merge situation?

`supernode/daemon/mgr/task/manage.go#Register`: the PieceSize and PieceTotal are calculated inside this method.

How do we get the file length via an HTTP request to the origin HTTP server?
A: In the `supernode/daemon/mgr/task/manager_util.go:50#addOrUpdateTask` method, the supernode calls the rawURL stored in the task struct. I do not fully understand what is inside the URL, but I guess it sends an HTTP request to the source.

`supernode/daemon/mgr/task/manager.go:242`: what is the difference between dfgetTask and task?
A: The real usage of the dfgetTaskMgr is in `supernode/server/server.go:47`, where the supernode as a whole maintains the dfgetTask info; for every downloading task, it maintains the task info in this struct. The dfgetTaskMgr inside the taskMgr, in contrast, only stores the info of the single task it is responsible for.
When is the progress updated? I did not notice the entry point where the client and supernode progress states are updated; I need to understand the progress state.
A: Once a piece is successfully downloaded in the dfget client, the client reports the piece status to the supernode. The client notifies the node that is tracking the pieces being downloaded.
Why is the piece number calculated as `startIndex / (endIndex - startIndex + 1)`? I thought the piece size was a fixed value.
A: Because the uploader amends the range of the piece with the pad, to make sure the piece is complete.
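For example, with the 5-byte pad the amended range of piece n runs from startIndex = n·(pieceSize+5) to endIndex = (n+1)·(pieceSize+5) − 1, so endIndex − startIndex + 1 equals pieceSize+5 and the division startIndex / (pieceSize+5) recovers n.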
Identifiers
Client ID is defined in the `apis/types/df_get_task.go#DfGetTask` struct. It is generated during the `dfget/core/core.go#prepare` method with the structure `rv.LocalIP-cfg.Sign`.
```go
// CID means the client ID. It maps to the specific dfget process.
```
TaskID is used to uniquely identify xxx. It is requested by dfget from the supernode in the `dfget/core/regist/register.go#Register` method, which calls the `dfget/core/api/supernode_api.go#Register` method to request the register response from the supernode. From the URL, we can see that it calls the `/peer/registry` API of the supernode. From the `supernode/server/router.go#initRoute` method, which includes all the HTTP handlers, we can find the corresponding registry function in the `supernode/server/0.3_bridge.go#registry` method.
```go
// /apis/types/task_register_request.go
```
Inside the registry method, the peer instance is created in the supernode, and a peerID is generated so that the supernode can index the peer server. Moreover, the task is added or updated after the creation of the peer instance.
PeerID is formed during the registration of the download process. It follows the format `fmt.Sprintf("%s-%s-%d", peerInfo.HostName.String(), peerInfo.IP.String(), time.Now().UnixNano())`.
```go
// generatePeerID generates an ID with hostname and ip.
```
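Based on the format quoted above, the helper presumably looks roughly like the following; the exact signature in the supernode may differ.

```go
func generatePeerID(peerInfo *types.PeerCreateRequest) string {
	// hostname-ip-nanosecond timestamp
	return fmt.Sprintf("%s-%s-%d",
		peerInfo.HostName.String(), peerInfo.IP.String(), time.Now().UnixNano())
}
```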
Questions encountered during reading
Why does dfget defer deleting the dfget task in the `supernode/daemon/mgr/task/manager.go#Register` method?
Answer: note that the delete statement is inside the if check; the deletion only occurs when err != nil.
Would the p2p downloading process share the same taskID with backsource downloading?
Answer: No. The only difference between the p2p downloader and the backsource downloader is in the `dfget/core/core.go#doDownload` method: when `cfg.BackSourceReason > 0`, the downloader is set to the backsource downloader and the downloading workflow continues. The taskID remains unchanged.

Would different CIDs match the same taskID? For example, one to download and another to upload? I think this is very likely, because the taskID is fundamentally the resource in the supernode shared between dfget and the daemon.
Answer: CID refers to the downloading process, rather than to all processes on one node. Thus the uploading process does not have any CID.
How is the `syncTaskMap` field maintained in the `peerServer` struct? I think there must be communication between the downloader and the uploader.
Answer: once a file is successfully downloaded, the peerServer API `oneFinishedHandler` is called to store the metadata of the finished file into `syncTaskMap`.

Would it be possible for a node in the peer network to specify another peer by the peerID, given that its uniqueness is not guaranteed?
Current P2P Downloader
Questions Encountered During Source Code Reading
The download flow of the p2p downloader is as follows: the p2p downloader first calls the PreRun() method of the pieceWriter, whose interface type is PieceWriter and whose default concrete implementation is the client_writer struct.
```go
func (cw *ClientWriter) PreRun(ctx context.Context) (err error) {
```
is called to create the files, the queues, and the targetWriter. Then the Run() method of the PieceWriter is run in a goroutine.
```go
// Run starts writing downloading file.
```
Inside this method, the main loop runs until all pieces of the file have been downloaded successfully. The successfully downloaded pieces are moved from ClientWriter.clientQueue to clientWriter.targetWriter.targetQueue and written to clientWriter.serviceFile. Moreover, in the Run() method of ClientWriter, another goroutine is created to run the Run() method of targetWriter, which writes the same content from the targetQueue. (Why is the value type not a pointer?)
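Pulled together, the pipeline looks roughly like the condensed sketch below; error handling and the last/reset flags are omitted, and the queue method names are approximations rather than the exact code.

```go
func (cw *ClientWriter) Run(ctx context.Context) {
	go cw.targetWriter.Run(ctx) // mirrors every piece into the target file
	for {
		piece := cw.clientQueue.Poll()         // blocks until a downloaded piece arrives
		cw.targetWriter.targetQueue.Put(piece) // hand the same piece to the target writer
		cw.writePieceToFile(piece)             // persist it into the service file
	}
}
```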
Three main questions:
- What is the usage of the target file and the temp target file, which are maintained by the client writer and the target writer?
- When does an across-write occur? Is that the cloud-disk case?
- What is the download flow of the P2P downloader? The initial piece is first pushed into p2p.queue, and then all other pieces are requested via HTTP requests formed from it. Then what? What is the difference between the process-piece method and the pull-piece method? Why are new piece requests formed from the received pieces?
dfget/core/downloader
There are currently three patterns in Dragonfly: source, p2p, and cdn. We can find the entrance of the downloading process in the Start() function, which calls the downloadFile() function. I will mainly focus on the p2p downloading pattern.
The abstraction of the p2p downloader is really nice: the downloader only cares about the downloading interface, without needing to concern itself with the concrete client writer. The client writer has two implementations, the local client writer and the p2p client writer. This decouples the downloader from the writer.
Questions:
- What is the relationship between cfg.BackSourceReason and the P2P downloader? Is back-source simply downloading the file directly from the supernode? There are many kinds of back-source reasons; the flags can be found in the `dfget/config/constants.go` file.
- In the initialization process of the `init()` function in the `p2p_downloader.go` file, why is the piece size recorded twice in the pieceSizeHistory data structure?
- In the `client_writer.go` file, why does Dragonfly create a hard link for the temp target file, which is then deleted by the second `fileutils.Link()` call for the service file? (This is used to check whether the target file and the service file exist in two different file systems or not.)
- What is the usage of the temp target file and the service file in the configuration file?
- What is the usage of the TargetDir and DataDir fields in the RuntimeVariable struct?
- Inside the `ClientWriter` struct, when does the reset situation occur?
- In the `write()` method, why does the `acrossWrite` field lead to different behavior in p2p mode?
- How does the uploader fill the clientQueue with data, and with which pieces?
- What is the usage of the sync queue in the `PreRun()` method of `ClientWriter`?
- What is the difference between the flags `last` and `reset` in the `Run()` method of the `TargetWriter` struct? There are three cases for a transmitted piece: last, reset, and normal. The first means that the current piece is the last one of the file; the reset flag signals that the previously received pieces should be dropped and the download should start over; and the final case means that the downloading process simply continues. When `p2p.pieceSizeHistory[0] != p2p.pieceSizeHistory[1]`, the `reset` flag is pushed into `P2PDownloader.clientQueue`.
- What is the difference between `TargetWriter.pieceQueue` and `ClientWriter.clientQueue`?
- In the `writePieceToFile()` method of the `client_writer.go` file, what is the point of the header?
- How does the `range` field of the `P2PDownloader` struct come into effect in the `getItem` method?
P2P Streaming Uploader Design
- Since multiple tasks can run at the same time, one peer server may have multiple client stream writers simultaneously. It is therefore necessary to add a `csw []*ClientStreamWriter` field to the current `peerServer` struct in the `dfget/core/uploader/peer_server.go` file.
- Add a locker for the cache field in the `ClientStreamWriter` struct in the `dfget/core/downloader/p2p_downloader/client_stream_writer.go` file.
- Extract the `PeerServer` launch process from the `registerToSuperNode()` method into the `Start()` method, and register the client stream writer to the peer server after the download process is created in `Start()`. The `Start()` method lies in the `dfget/core/core.go` file.
- Separate the stream uploading branch and the normal uploading branch in the `uploadHandler()` of the `dfget/core/uploader/peer_server.go` file. Add the handler methods `streamingUpload()` and `normalUpload()`.
Bandwidth Optimization
dfget/locator
SupernodeLocator defines how clients get available supernodes.
From this line, we can see that during the initial phase of every downloading task, the locator is called to get an available supernode. By default, the static locator is created as the supernode locator.
The locator is initialized in the dfget/core/register process. In the configuration file, every supernode is associated with a weight, which is parsed by the static locator. After that, the regist.NewSupernodeRegister() method is called to connect to the supernode. Inside this method, the way the locator and the register are decoupled is very subtle: the register can directly call the Get() and Next() methods without needing to concern itself with the details of the locator.
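The shape of that abstraction, trimmed to the two methods the register relies on (the real SupernodeLocator interface in dfget/locator has several more):

```go
type SupernodeLocator interface {
	Get() *Supernode  // the supernode currently selected for this task
	Next() *Supernode // move on to the next candidate, e.g. after a failure
}
```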
dfget/core/core.go:Start()
The Start() method is the entrance of the whole downloading process, so it is necessary to fully understand this part of the code. The method first declares the supernodeAPI, the supernodeLocator built from the configuration file, and the register, which combines the configuration file, the supernodeAPI, and the locator.
After that, the prepare() method is called to prepare the runtime-variable information and create the corresponding files and directories, such as the target directory, temp target directory, metadata file, work home directory, and system data directory. The connection to the supernode is also checked in prepare().
Next, the registerToSuperNode() method first checks the transport pattern via the cfg.Pattern field. Then register.Register() is called: the file length and the pieces are requested from the supernode over HTTP, and a *register.RegisterResult is returned.
After all of the above preparation is done, the downloading process finally starts.
Framework Design
API variable
Sometimes structs are used only as a collection of methods. In this case, an empty struct carrying a set of API methods can be used as an API variable. For example, in the dfget/core/api/download_api.go file, the downloadAPI struct is used to conveniently aggregate a set of downloading-related methods, and this struct can be passed to other functions as a parameter.
These API structs have HTTP request methods to send requests and receive the responses.
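A simplified illustration of the pattern (not the actual downloadAPI): the struct has no fields and exists only to bundle related HTTP helper methods.

```go
type downloadAPI struct{}

// Download issues the piece request and returns the raw HTTP response.
func (downloadAPI) Download(url string) (*http.Response, error) {
	return http.Get(url)
}

// Other functions simply take the API value as an ordinary parameter.
func fetchPiece(api downloadAPI, url string) (*http.Response, error) {
	return api.Download(url)
}
```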
Encapsulation in Golang
The interface is exposed to the outside, but not the concrete instance. The concrete struct is encapsulated inside the package, and the user can only interact with the methods defined by the interface.
In this way, the framework is easy to modify, and the method implementer is decoupled from the user.
```go
// PeerServerExecutor starts an independent peer server process for uploading downloaded files.
```
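A sketch of the pattern: only the interface and a constructor are exported, while the concrete struct stays unexported. The names here loosely follow PeerServerExecutor, but the real signature differs.

```go
type PeerServerExecutor interface {
	StartPeerServerProcess() (port int, err error)
}

// NewPeerServerExecutor is the only way for callers to obtain an implementation.
func NewPeerServerExecutor() PeerServerExecutor { return &peerServerExecutor{} }

type peerServerExecutor struct{} // callers never see this type directly

func (e *peerServerExecutor) StartPeerServerProcess() (int, error) {
	// launch the uploader as an independent process and report its listening port
	return 0, nil
}
```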
Reuse the code for different branches
I was going to separate the stream uploading branch and the regular uploading branch in the uploadHandler() of the dfget/core/uploader/peer_server.go file. But my mentor hinted that it would be better to add another interface to do the job. In this way, the current code is not affected at all, while the actual behavior of the uploader differs.
That is a good lesson on reusing existing code.
Supernode Framework
\todo
Design Pattern
supernode/scheduler
The scheduler core function decouples the piece-fetch process in the progress manager from the scheduler by passing a control parameter. This is called control coupling.
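As an illustration of that control coupling, the caller passes a status string and the progress manager changes which pieces it returns; the signature below is simplified from the real method.

```go
func (pm *manager) GetPieceProgressByCID(taskID, clientID, pieceStatus string) ([]int, error) {
	switch pieceStatus {
	case "running":
		// pieces this client is currently downloading
	case "success":
		// pieces the client already holds
	case "available":
		// success pieces that are not running, i.e. candidates for scheduling
	}
	return nil, nil
}
```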
Lesson on Behavior
Impossible Feature: Issue #1403
I was assigned this issue as the first step of ASoC, to familiarize myself with the Dragonfly community. But it turns out that this feature is impossible to implement. Check my post for more information.
From this step, I learned that the feature cannot be implemented in either of two ways: one is to modify the type of the limit reader, the other is to move the sendfile handling from the system library into user space.
But I was too shy about sharing my results. For one thing, I did not comment my thoughts directly under the issue; instead I first went to DingTalk to share them with my mentor, which was not the right way. For another, I did not describe all my thoughts clearly. Even if the other approach is not impossible to implement, I should have said it out loud to show that I had worked on it.
Framework & Tools
GoMock
What is a dynamic library in C?