Dragonfly - Source Code Note

This note records my source code analysis of Dragonfly, the project I participated in during the 2020 summer as a remote internship. Thanks to Alibaba for this great opportunity.

This blog (how to read source code?) helps me a lot with the theory of source code reading. As for methodology, I first locate the part of the source code I am interested in, and then use the githistory.xyz tool to track the code from the very beginning of its development. With the help of githistory.xyz, I can find the code associated with the part I focus on by looking at every commit of the file.

Linus said: "Bad programmers worry about the code. Good programmers worry about data structures and their relationships."

Overall

This section includes my understanding of the entire Dragonfly framework.

Dragonfly P2P Downloader

The entrance of the p2p downloader lies in the dfget/core/downloader/downloader.go file, which calls the main function of the p2p downloader, the run() method, inside the ./p2p_downloader/p2p_downloader.go file. Here is the pseudocode of the method.

func (p2p *P2PDownloader) run(ctx context.Context, pieceWriter PieceWriter) error {
    // Initialize the piece writer; the concrete implementations include client_writer.
    pieceWriter.initialization()

    // Keep polling the piece writer's queue and write the content to the target file.
    go func() {
        pieceWriter.run()
    }()

    // Downloading loop.
    for {
        // The initial element in p2p.queue is the start piece.
        // Once the supernode receives the first piece, it starts to hand out the
        // following piece info. The initial piece carries the status `start`,
        // while the other pieces may carry the status `running`.
        curItem := p2p.getItem()

        // Pull the piece download task from the supernode. Note that this does not
        // download data, but the metadata of the piece of interest, e.g. which peer
        // holds it. The supernode may return the metadata of the next available piece.
        response, err := p2p.pullPieceTask(&curItem)
        if err != nil {
            // handle the error
        } else {
            // Download the piece and push it into the writer queue.
            p2p.processPiece(response, &curItem)
        }
    }
}

And that’s all about the downloader. After Dragonfly has successfully downloaded a piece and pushed it to the writer queue, the pieceWriter starts to write the piece to the local disk and the target file.

Note that the two files are different: the first one is certainly located in the local file system, while the latter may reside on a cloud disk, or on another disk of the local machine. Accordingly, there are two writers corresponding to them: client_writer and target_writer. In the cloud-disk case, the writing process involves network IO, which becomes the bottleneck of the whole system.

Proposal of ASoC 2020 Dragonfly

Issue link

Backgrounds

As mentioned in #1164, the current download process has a defect when the Dragonfly client runs against a cloud disk: the local file system is not the target place, so the downloader has to write the file one more time to another file system. This is similar to the extra copy that the sendfile function is designed to avoid.

Idea

Remove the redundancy of writing the file to the local file system: directly upload the data to the target place by keeping it in memory, managed like the sliding window in the TCP protocol. Please look at #1164 for more information about this great idea.

Implementation Plan

(figure: implementation plan diagram)

Design of Supernode Scheduler

The key point is to maintain the sender window, and to verify that the stream mode is on.

  • Add wnd to apis/types/task_create_request.go:18#TaskCreateRequest: the window size is passed to the progress manager during the task registration step.

    By default, the window size is set to -1, which means that the file is downloaded in regular mode.

  • Add a new struct SlidingWindow to the progress manager struct, which has the fields wnd (window size) and una (oldest unacknowledged sequence number, RFC 793).

  • Add wnd to supernode/daemon/mgr/progress/progress_manager.go:52#manager struct. The progress manager is initialized during the task registration process; update the SlidingWindow field of the progress manager there.

    By default, the SlidingWindow is nil, which means that the file is downloaded in regular mode.

  • Modify the behavior of GetPieceProgressByCID method of progress manager based on the SlidingWindow field: only return the piece info within the window range.

  • Update the una field of SlidingWindow in progress manager in the updateTask method of task manager.

Design of Uploader

  • Since multiple tasks could run at the same time, one peer server could have multiple client stream writers at the same time; it is therefore necessary to add a csw []*ClientStreamWriter field to the current peerServer struct in the dfget/core/uploader/peer_server.go file. [It seems this implementation would couple the downloader and uploader tightly. Optimization should be considered.]

  • Add Locker for the cache field in ClientStreamWriter struct in dfget/core/downloader/p2p_downloader/client_stream_writer.go file.

  • Extract the PeerServer launch process from the registerToSuperNode() method into the Start() method.

    Then register the client stream writer to the peer server after the download process is created in Start().

    The Start() method lies in the dfget/core/core.go file.

  • Separate the stream uploading branch and the normal uploading branch in the uploadHandler() of the dfget/core/uploader/peer_server.go file. Add a new interface to handle the different behaviors of the getTaskFile and uploadPiece methods.

Any recommendations and comments are welcome! Thank you.

ASoC

This section includes my understanding of the parts associated with the ASoC program. It goes into much more detail than the Overall section.

Current Uploader

UploaderHandler

The whole file is stored in the file system. When the uploader is going to serve part of a piece, it reads that part of the file. The URL given by the peer that requests the piece carries the following information:

  • padSize: pad size in the current architecture, which is 4 bytes of head and 1 byte of tail
  • Start: start index of the file
  • length: read length
  • PieceSize
  • PieceNum

With the above parameters, the starting piece can be located. Note also that in the CDN pattern there is no pad, while in the p2p pattern every piece carries a meta header and tail. Moreover, the meta header of a piece records the content size of the piece, along with the piece size. Using this metadata, we can locate the starting piece number.

Current Client Stream Writer

The dfget/core/downloader/p2p_downloader/client_stream_writer.go:153 ensures that the pieces in the pipe are in order. So we no longer need to worry about how to fetch the piece number and the piece size: the pieces can be fetched in order!

Implementation of Cache

Regarding the client-side implementation of P2P streaming: building on the current, very clean system design, I am considering adding a new module streamcache under dfget/core. The module would:

  • receive downloaded data from the downloader
  • interact with the uploader to share data
  • update the cache with an LRU policy, according to the download state of the pieces and the size of the cache window
  • report the pieces held in the cache to the supernode

This design does not break the current module responsibilities: the uploader remains solely responsible for looking up and uploading data, the downloader solely for downloading, and streamcache handles updating, maintaining, and storing pieces. As for the concrete implementation of the new module, I am considering making the cache a singleton entity inside the new module, exposing APIs for the other modules to interact with.

Challenge

  • the cache cannot be exposed as a plain API, since it carries state such as the cache window, unlike stateless HTTP requests and responses.
  • the cache cannot live in a standalone module like the downloader, because the uploader runs as a single process responsible for all other tasks.

For the above reasons, the cache must be embedded in the peer server struct. Moreover, the communication between the downloader and the uploader has to go through HTTP requests.

Consideration

  • Why I don’t want to implement the cache manager in the client stream writer:

    Two caches would exist at the same time, and the cache scheduling result would have to be passed to the uploader, causing unnecessary HTTP requests. If the cache management is implemented in the uploader instead, the downloader only needs to pass the piece content and leave the cache management to the uploader.

    The uploader then calls the cache module to do the management work.

  • Why is the content of the cache not synced to the file system at the end?

TODO

Fill out the API framework and create the interfaces:

  • register: register the cache information with the uploader from dfget [*]
    • add an API for dfget in dfget/core/api/uploader_api.go:100
    • add an API for the uploader to register the cache window, dfget/core/uploader/peer_server.go:286
  • pass pieces between downloader and uploader

    • add receivePiece API for uploader [*]
    • Add the for loop to read from the stream io.Reader [**]

      • find out the way to get the whole piece from reader

        Answer: not necessary. The data in the cache can be transferred to the uploader in large blocks, and the uploader can directly fetch the particular piece based on the request info.

  • LRU cache [***]
  • Upload piece handler [*]

Implementation of Supernode Scheduler

Questions

  • Inside supernode/daemon/mgr/progress/progress_manager.go:103, would the supernode itself use the p2p streaming mode?

    Personally, I think it won’t.

  • What should be the key of slidingWindowStates in progress manager?

    taskID, or clientID?

    Currently I use the taskID because it is also used as the key of the piece info in the piece progress manager. In fact, however, the taskID maps to one file in the P2P network, which means that multiple clients could be downloading the same file at the same time.

  • Should we only consider the P2P network with purely streaming download, or the hybrid mode?

Supernode Scheduler

First of all, we should understand the progress state of the download process. The supernode state maintains the piece bitSet for a given CID. If a bit is set, the supernode knows the location of that piece: the piece is available, and a client may download it. While a piece is being downloaded, it is put into the clientState running bitset. Once the download finishes, the piece is removed from that bitset and put into the piece bitset of the client state, with a success mark [achieved by setting a specific bit of the piece status].

The clientState maintains not only the piece status of the CID but also runningPieces, i.e. the pieces that are currently being downloaded from dstCID to srcCID.

PR

This PR is part of the work proposed in #1436, which implements the P2P stream mode on the supernode side. To summarize, the streaming download work in the supernode can be categorized into two parts.

  • Maintain the sliding window in supernode.

    • Initialize: The window size is registered during the dfget task registration phase. The window state is then recorded in the ProgressManager as a SyncMap indexed by ClientID. The window size is static once it is recorded by the supernode.
    • Update: The dfget client reports the download result of every piece to the supernode. In the report API of the supernode server, the window is updated based on the received piece number and piece status; specifically, the window keeps sliding until the first unacknowledged piece.
    • Finish: Update the DeleteCID method of ProgressManager to delete the sliding-window state if it is in stream mode.
  • Schedule the pieces according to the window state

    The only modification I made in this part lies in the GetPieceProgressByCID method of ProgressManager. The available pieces in regular mode are the successful pieces that are not running; in stream mode, they are the cached pieces that are not running. After the modification, the ProgressManager only returns the available pieces inside the window when stream mode is on.

    It should be noted that the stream mode currently uses the same scheduler as the regular mode, which may demand future optimization.

    Besides the above modifications, I also added two APIs to the supernode server, addPieceCache and deletePieceCache. The dfget client calls these APIs to notify the supernode about the state of the cache.

Discussion with Developer from Ant

Questions

  • the current piece scheduling strategy in Ant’s streaming implementation
  • what range of pieces does the client cache hold? Only those inside the window, or something else?

The server-side implementation of streaming is now basically finished:

  • APIs to modify or add
    • add streamMode and windowSize fields to the /peer/registry API
    • add POST and DELETE methods for /peer/piece/stream, used to notify the server about the cache state of pieces
  • the workflow of window maintenance
  • does the window size need to change?


Existing Implementation of sliding window

Reference: https://github.com/tass-belgium/picotcp.git

Questions encountered during reading

  • How does the current supernode schedule?

    The initial piece is a simple one used to bootstrap the downloading process. It only has the following content:

    return &Piece{
    	TaskID:    taskID,
    	SuperNode: node,
    	Status:    status,
    	Result:    constants.ResultInvalid,
    	Content:   &bytes.Buffer{},
    }

    where status is constants.TaskStatusStart. Every piece status is related to the DfgetTaskStatus code. Check it out at supernode/server/0.3_bridge.go:65. This status code corresponds to the DfgetTaskStatus code types.PiecePullRequestDfgetTaskStatusSTARTED.

  • Would uploader register the upload task to supernode? Considering the path field in apis/types/task_create_request.go file.

    // path is used in one peer A for uploading functionality. When peer B hopes
    // to get piece C from peer A, B must provide a URL for piece C.
    // Then when creating a task in supernode, peer A must provide this URL in request.
    //
    Path string `json:"path,omitempty"`
  • When would the dfget/core/downloader/p2p_downloader/p2p_downloader.go#getItem encounter the merge situation?

  • supernode/daemon/mgr/task/manage.go#Register The PieceSize and PieceTotal are calculated inside the method.

    How to get the file length by HTTP request to origin http server?

    A: In the supernode/daemon/mgr/task/manager_util.go:50#addOrUpdateTask method, the supernode calls the rawURL stored in the task struct. I don’t fully understand what is behind the URL, but I guess it sends an HTTP request to the source.

  • supernode/daemon/mgr/task/manager.go:242 what’s the diff between dfgetTask and task?

    A: The real usage of the dfgetTaskMgr is in supernode/server/server.go:47. There, the supernode as a whole maintains the dfgetTask info, and for every downloading task it maintains the task info in this struct.

    The dfgetTaskMgr inside the taskMgr, in contrast, is only used to store the info of the single task it is responsible for.

  • When is the progress updated? I didn’t notice where the client and supernode progress states are updated. Understand the progress state.

    A: Once a piece is successfully downloaded, the dfget client reports the piece status to the supernode.

    The client notifies the supernode about the pieces it is downloading.

  • Why is the piece number calculated as startIndex / (endIndex - startIndex + 1)?

    I thought the piece size was a fixed value.

    A: Because the uploader amends the piece range with the pad, to make sure the piece is complete.

Identifiers

Client ID is defined in the apis/types/df_get_task.go#DfGetTask struct. It is generated during the dfget/core/core.go#prepare method with the structure rv.LocalIP-cfg.Sign.

// CID means the client ID. It maps to the specific dfget process.
// When user wishes to download an image/file, user would start a dfget process to do this.
// This dfget is treated a client and carries a client ID.
// Thus, multiple dfget processes on the same peer have different CIDs.

TaskID could be used to uniquely identify xxx. It is requested by dfget from the supernode in the dfget/core/regist/register.go#Register method, which calls the dfget/core/api/supernode_api.go#Register method to request the register response from the supernode. From the URL, we can see that it calls the /peer/registry API of the supernode. From the supernode/server/router.go#initRoute method, which includes all the HTTP handlers, we can find the corresponding registry function: the supernode/server/0.3_bridge.go#registry method.

// /apis/types/task_register_request.go
// Dfdaemon or dfget could specific the taskID which will represents the key of this resource
// in supernode.

Inside the registry method, the peer instance in the supernode is created, and the peerID is created for the supernode to index the peer server. Moreover, the task is added or updated after the creation of the peer instance.

PeerID is formed during the registration of the download process. It follows the format fmt.Sprintf("%s-%s-%d", peerInfo.HostName.String(), peerInfo.IP.String(), time.Now().UnixNano()).

// generatePeerID generates an ID with hostname and ip.
// Use timestamp to ensure the uniqueness.

Questions encountered during reading

  • Why does dfget defer deleting the dfget task in the supernode/daemon/mgr/task/manager.go#Register method?

    Answer: Note that the delete statement is inside the if check; the deletion only occurs when err != nil.

  • Would p2p downloading process share the same taskID with backsource downloading?

    Answer: No. The only difference between the p2p downloader and the backsource downloader is in the dfget/core/core.go#doDownload method: when cfg.BackSourceReason > 0, the downloader is switched to the backsource downloader and the downloading workflow continues. The taskID remains unchanged.

  • Would different CIDs map to the same taskID? For example, one to download and another to upload? I think this is very likely, because a taskID is fundamentally a resource in the supernode shared between dfget and the daemon.

    Answer: A CID identifies a downloading process, not every process on a node. Thus an uploading process does not have a CID.

  • How is the syncTaskMap field of the peerServer struct maintained? I think there must be communication between the downloader and the uploader.

    Answer: Once a file finishes successfully, the peerServer API oneFinishedHandler is called to store the metadata of the finished file in the syncTaskMap.

  • Would it be possible for a node in the peer network to address another peer by a peerID that is not unique?

Current P2P Downloader

Questions Encountered During Source Code Reading

The download flow of the p2p downloader is that it first calls the PreRun() method of the pieceWriter (interface type PieceWriter); the default concrete implementation is the ClientWriter struct.

func (cw *ClientWriter) PreRun(ctx context.Context) (err error) {

will be called to create the files, queues, and targetWriter. Then the Run() method of the PieceWriter is run in a goroutine.

// Run starts writing downloading file.
func (cw *ClientWriter) Run(ctx context.Context) {

Inside the method, the main loop runs until all pieces of the file are downloaded successfully. Each successfully downloaded piece is taken from ClientWriter.clientQueue and written both to clientWriter.targetWriter.targetQueue and to clientWriter.serviceFile. Moreover, in the Run() method of ClientWriter, a new goroutine is created to run the Run() method of targetWriter, which writes the same content from the targetQueue. (Why is the value type not a pointer?)

Three main questions:

  • What’s the usage of the target file and the temp target file, which are maintained by the client writer and the target writer?
  • When does an across write occur? Is that the cloud-disk case?
  • What’s the download flow of the P2P downloader? The simple piece is first pushed into p2p.queue, and then all other pieces are requested via HTTP requests formed from the simple piece. Then what? What’s the diff between the processPiece method and the pullPieceTask method? Why are new piece requests formed from the received pieces?

dfget/core/downloader

There are currently three patterns in Dragonfly: source, p2p, and cdn. The entrance of the downloading process is the downloadFile() function called from the Start() function. I will mainly focus on the p2p downloading pattern.

The abstraction of the p2p downloader is really nice: the downloader only cares about the downloading interface, without needing to concern itself with the concrete client writer. The client writer has two implementations, the local client writer and the p2p client writer. This decouples the downloader from the writer.

Question:

  • What’s the relationship between the cfg.BackSourceReason and P2P downloader?

    Does BackSource mean downloading the file directly from the supernode?

    There are many kinds of back source reason. We can find those flags in the dfget/config/constants.go file.

  • In the initialization process of the init() function in p2p_downloader.go file, why is the piece size recorded twice in the pieceSizeHistory data structure?

  • In the client_writer.go file, why does Dragonfly create the hard link for the temp target file, which will be deleted by the second fileutils.Link() call for the service file?

    It is used to check whether the target file and the service file reside in two different file systems.

  • What’s the usage of the temp target file and the service file in the configuration file?

  • What’s the usage of the TargetDir and DataDir field in the RuntimeVariable struct?

  • Inside the ClientWriter struct, when does the reset situation occur?

    In the write() method, why do the p2p mode and the acrossWrite field result in different behavior?

  • Which component fills the clientQueue with data, and how?

  • What’s the usage of the sync queue in PreRun() method in ClientWriter?

  • What’s the diff between the last and reset flags in the Run() method of the TargetWriter struct?

    There are three cases of transmitted pieces: last, reset, and normal.

    The first means the current piece is the last one of the file; the reset flag signals that the previously received pieces should be dropped and the download should start over; the final case means the downloading simply continues.

    When p2p.pieceSizeHistory[0] != p2p.pieceSizeHistory[1], the reset flag is pushed into P2PDownloader.clientQueue.

  • What’s the diff between TargetWriter.pieceQueue and ClientWriter.clientQueue?

  • In the writePieceToFile() method of client_writer.go file, what’s the point of header?

  • How does the range field of the P2PDownloader struct come into effect in the getItem method?

P2P Streaming Uploader Design

  • Since there could be multiple tasks running at the same time, which means that one peer server could have multiple client stream writers at the same time, it is necessary to add csw []*ClientStreamWriter field to the current peerServer struct in dfget/core/uploader/peer_server.go file.

  • Add Locker for the cache field in ClientStreamWriter struct in dfget/core/downloader/p2p_downloader/client_stream_writer.go file.

  • Extract PeerServer launch process from registerToSuperNode() method to the Start() method.

    And register the client stream writer to peer server after the download process is created in Start() method.

    The Start() method lies at the dfget/core/core.go file.

  • Separate the stream uploading branch and the normal uploading branch in the uploadHandler() of the dfget/core/uploader/peer_server.go file. Add the handler methods streamingUpload() and normalUpload().

Bandwidth Optimization

dfget/locator

SupernodeLocator defines the way how to get available supernodes for clients.

From this line, we can see that at the initial phase of every downloading task, the locator is called to get an available supernode. By default, a static locator is created as the supernode locator.

The locator is initialized in the dfget/core/register process. In the configuration file, every supernode is associated with a weight, which is parsed by the static locator. After that, the regist.NewSupernodeRegister() method is called to connect to the supernode. Inside this method, the way the locator and the register are decoupled is quite subtle: the register can directly call the Get() and Next() methods without needing to care about the details of the locator.

  • dfget/core/core.go:Start()

    The Start() method is the entrance of the whole downloading process, so it is essential to fully understand this part of the code. The method first constructs the supernodeAPI and the supernodeLocator from the configuration file, and then the register, which combines the configuration, the supernodeAPI, and the locator.

    After that, the prepare() method is called to prepare the runtime-variable-related information and create the corresponding files, such as the target directory, temp target directory, metadata file, work home directory, and system data directory. The connection to the supernode is also checked in prepare().

    Next, registerToSuperNode() first checks the transport pattern via the cfg.Pattern field. Then register.Register() is called: the file length and the pieces are requested from the supernode over HTTP, and a *register.RegisterResult is returned.

    After all of the above preparations are done, the downloading process finally starts.

Framework Design

API variable

Sometimes structs are used only as a collection of methods. In this case, an empty struct with a set of APIs can be used as an API variable. For example, in the dfget/core/api/download_api.go file, downloadAPI conveniently aggregates a set of downloading-related methods, and this struct can be passed to other functions as a single parameter.

These API structs carry the HTTP request methods used to send requests and receive responses.

Encapsulation in Golang

The interface is exposed to the outside, but not the concrete instance. The concrete struct is encapsulated in the package, and the user can only communicate through the methods defined in the interface.

In this way, the framework is easy to modify, and decoupling is achieved between the method implementer and the user.

// PeerServerExecutor starts an independent peer server process for uploading downloaded files.
type PeerServerExecutor interface {
    StartPeerServerProcess(cfg *config.Config) (port int, err error)
}

// ---------------------------------------------------------------------------
// PeerServerExecutor default implementation
type peerServerExecutor struct {
}

var _ PeerServerExecutor = &peerServerExecutor{}

func (pe *peerServerExecutor) StartPeerServerProcess(cfg *config.Config) (port int, err error) {
    if port = pe.checkPeerServerExist(cfg, 0); port > 0 {
        return port, nil
    }

    // ... //

    return
}

Reuse the code for different branches

I was going to separate the stream uploading branch and the regular uploading branch in the uploadHandler() of the dfget/core/uploader/peer_server.go file. But my mentor hinted that it would be better to add another interface to do the job. In this way, the current code is not affected at all, while the actual behavior of the uploader differs.

That’s a really good lesson on reusing existing code.

Supernode Framework

\todo

Design Pattern

supernode/scheduler

The scheduler core function decouples the piece-fetch process in the progress manager from the scheduler by passing a control parameter. This is called control coupling.

Lesson on Behavior

Impossible Feature: Issue #1403

I was assigned this issue as the first step of ASoC, to familiarize myself with the Dragonfly community. But it turns out that this feature is impossible to implement. Check my post for more information.

In this step, I learned that the feature cannot be implemented in either of two ways: one is to modify the type of the limit reader, the other is to move the sendfile handling from the system library to user space.

But I was too shy about sharing my results. For one thing, I didn’t comment my thoughts directly under the issue; instead, I first went to DingTalk to share them with my mentor. That was not right. For another, I didn’t describe all my thoughts clearly. Even when an approach proves impossible, I should say so out loud to show that I worked on it.
