接上篇,当Client receives the user's storage transaction, it creates a stream of the /fil/storage/mk/1.0.1 protocol and then sends the storage transaction through the stream. The HandleDealStream method is the one that handles this protocol. This method directly calls its own receiveDeal method for processing. receiveDeal method processes as follows: Reads a stored Proposal object from the stream. proposal, err := s.ReadDealProposal() The stream object here is the dealStream object (storagemarket/network/deal_stream.go), which encapsulates the original stream object. Get the ipld node object. proposalNd, err := cborutil.AsIpld(proposal.DealProposal) Generate a miner transaction object. deal := &storagemarket.MinerDeal{
Client: s.RemotePeer(),
Miner: p.net.ID(),
ClientDealProposal: *proposal.DealProposal,
ProposalCid: proposalNd.Cid(),
State: storagemarket.StorageDealUnknown,
Ref: proposal.Piece,
} Call the Begin method of the fsm state group to generate a state machine and start tracking miner transaction objects. err = p.deals.Begin(proposalNd.Cid(), deal) Save the stream object to the connection manager. err = p.conns.AddStream(proposalNd.Cid(), s) Sends an event to the fsm state group to start processing the transaction object. return p.deals.Send(proposalNd.Cid(), storagemarket.ProviderEventOpen) When the processor receives the ProviderEventOpen state event, because the initial state is the default value 0, that is, StorageDealUnknown , the event processor object finds the corresponding destination state as StorageDealValidating through internal processing, and thus calls its processing function ValidateDealProposal function for processing.
1. `ValidateDealProposal` function This function is used to validate the transaction proposal object. Call the GetChainHead method of the Lotus Provider adapter object to get the tipset key and height of the top of the blockchain. tok, height, err := environment.Node().GetChainHead(ctx.Context())if err != nil {
return ctx.Trigger(storagemarket.ProviderEventDealRejected, xerrors.Errorf("node error getting most recent state id: %w", err))
} Verify the transaction proposal object sent by the client. If the verification fails, a rejection event is sent. if err := providerutils.VerifyProposal(ctx.Context(), deal.ClientDealProposal, tok, environment.Node().VerifySignature); err != nil {
return ctx.Trigger(storagemarket.ProviderEventDealRejected, xerrors.Errorf("verifying StorageDealProposal: %w", err))
} Checks if the miner address specified in the transaction proposal is correct. If not, sends a rejection event. proposal := deal.Proposalif proposal.Provider != environment.Address() {
return ctx.Trigger(storagemarket.ProviderEventDealRejected, xerrors.Errorf("incorrect provider for deal"))
} Checks if the height specified by the transaction is correct. If not, sends a rejection event. if height > proposal.StartEpoch-environment.DealAcceptanceBuffer() {
return ctx.Trigger(storagemarket.ProviderEventDealRejected, xerrors.Errorf("deal start epoch is too soon or deal already expired"))
} Check if the charge is OK, if not, send a rejection event. minPrice := big.Div(big.Mul(environment.Ask().Price, abi.NewTokenAmount(int64(proposal.PieceSize))), abi.NewTokenAmount(1<<30))
if proposal.StoragePricePerEpoch.LessThan(minPrice) {
return ctx.Trigger(storagemarket.ProviderEventDealRejected,
xerrors.Errorf("storage price per epoch less than asking price: %s < %s", proposal.StoragePricePerEpoch, minPrice))
} Check if the transaction size matches. If not, send a rejection event. if proposal.PieceSize < environment.Ask().MinPieceSize {
return ctx.Trigger(storagemarket.ProviderEventDealRejected,
xerrors.Errorf("piece size less than minimum required size: %d < %d", proposal.PieceSize, environment.Ask().MinPieceSize))
}if proposal.PieceSize > environment.Ask().MaxPieceSize {
return ctx.Trigger(storagemarket.ProviderEventDealRejected,
xerrors.Errorf("piece size more than maximum allowed size: %d > %d", proposal.PieceSize, environment.Ask().MaxPieceSize))
} Access to client funds. clientMarketBalance, err := environment.Node().GetBalance(ctx.Context(), proposal.Client, tok)
if err != nil {
return ctx.Trigger(storagemarket.ProviderEventDealRejected, xerrors.Errorf("node error getting client market balance failed: %w", err))
} If the client's available funds are less than the total transaction fee, a rejection event is sent. if clientMarketBalance.Available.LessThan(proposal.TotalStorageFee()) {
return ctx.Trigger(storagemarket.ProviderEventDealRejected, xerrors.New("clientMarketBalance.Available too small"))
} If the transaction is verified, it is verified. The Trigger method of the fsm context object sends an event. return ctx.Trigger(storagemarket.ProviderEventDealDeciding) When the state machine receives this event, the event handler changes the state from StorageDealUnknown to StorageDealAcceptWait , thereby calling its processing function DecideOnProposal to determine whether to accept the transaction.
2. `DecideOnProposal` function This function is used to decide whether to accept or reject a transaction. Call the RunCustomDecisionLogic method of the environment object to run the custom logic to verify that the customer transaction is not accepted. accept, reason, err := environment.RunCustomDecisionLogic(ctx.Context(), deal)if err != nil {
return ctx.Trigger(storagemarket.ProviderEventDealRejected, xerrors.Errorf("custom deal decision logic failed: %w", err))
} If not accepted, a rejection event is sent. if !accept {
return ctx.Trigger(storagemarket.ProviderEventDealRejected, fmt.Errorf(reason))
} Call SendSignedResponse method of the environment object to send the signed response to the client. err = environment.SendSignedResponse(ctx.Context(), &network.Response{
State: storagemarket.StorageDealWaitingForData,
Proposal: deal.ProposalCid,
})if err != nil {
return ctx.Trigger(storagemarket.ProviderEventSendResponseFailed, err)
} This method finds the corresponding stream, then signs the response, generates a signed response object, and finally sends the response through the stream. Disconnect the client. if err := environment.Disconnect(deal.ProposalCid); err != nil {
log.Warnf("closing client connection: %+v", err)
} Call the Trigger method of the fsm context object to send an event. return ctx.Trigger(storagemarket.ProviderEventDataRequested) When the state machine receives this event, the event handler changes the state from StorageDealAcceptWait to StorageDealWaitingForData . Because there is no specified processing function, the function will not be called for processing, and it will continue to wait for the data transmission process to send an event. When data transfer starts, the data transfer component sends the ProviderEventDataTransferInitiated event. The event handler changes the state from StorageDealWaitingForData to StorageDealTransferring . Because there is no specified processing function, the function will not be called for processing, and the process will continue to wait for the data transfer process to send an event. When the data transfer is completed, the data transfer component sends a ProviderEventDataTransferCompleted event, and the event handler changes the state from StorageDealTransferring to StorageDealVerifyData , thereby calling its processing function VerifyData to verify the data.
3. `VerifyData` function This function verifies that the received data matches the pieceCID in the transaction proposal. The VerifyData function flow is as follows: Call the GeneratePieceCommitmentToFile method of the environment object to generate the fragment's CID, the fragment's directory, and the metadata directory. pieceCid, piecePath, metadataPath, err := environment.GeneratePieceCommitmentToFile(deal.Ref.Root, shared.AllSelector()) The GeneratePieceCommitmentToFile method is as follows: Call the CreateTemp method of the file storage object to create a temporary file. f, err := pio.store.CreateTemp() Generate a cleanup function. cleanup := func() {
f.Close()
_ = pio.store.Delete(f.Path())
} Get the contents of the specified CID from the underlying storage object and write them to the specified file. err = pio.carIO.WriteCar(context.Background(), pio.bs, payloadCid, selector, f, userOnNewCarBlocks...) Get the file size, that is, the fragment size. pieceSize := uint64(f.Size()) Position to the beginning of the file. _, err = f.Seek(0, io.SeekStart) Generate a shard ID using the file contents. commitment, paddedSize, err := GeneratePieceCommitment(rt, f, pieceSize) Close the file. _ = f.Close() Returns the fragment CID and file path. return commitment, f.Path(), paddedSize, nil
If the miner sets the universalRetrievalEnabled flag, the GeneratePieceCommitmentWithMetadata function is called directly for processing. if ppuniversalRetrievalEnabled {
return providerutils.GeneratePieceCommitmentWithMetadata(ppfs, pppio.GeneratePieceCommitmentToFile, ppproofType, payloadCid, selector)
} The universalRetrievalEnabled flag, if true, causes the storage miner to keep track of all CIDs in the shard, so all CIDs can be retrieved, not just the Root CID. Otherwise, call the GeneratePieceCommitmentToFile method of the piece IO object for processing. pieceCid, piecePath, _, err := pppio.GeneratePieceCommitmentToFile(ppproofType, payloadCid, selector) payloadCid indicates the root CID. The GeneratePieceCommitmentToFile method of the pieceIO object handles this as follows: Returns the fragment CID and fragment path. return pieceCid, piecePath, filestore.Path(""), err
Verify that the generated shard CID is consistent with the shard CID of the transaction proposal in the miner transaction. If not, send a rejection event. if pieceCid != deal.Proposal.PieceCID {
return ctx.Trigger(storagemarket.ProviderEventDealRejected, xerrors.Errorf("proposal CommP doesn't match calculated CommP"))
}
3. Call the Trigger method of the fsm context object to send an event. return ctx.Trigger(storagemarket.ProviderEventVerifiedData, piecePath, metadataPath) When the state machine receives this event, the event handler changes the state from `StorageDealVerifyData` to `StorageDealEnsureProviderFunds`, and calls its processing function `EnsureProviderFunds` to determine whether to accept the transaction. At the same time, before calling the processing function, the `Action` function is used to modify the `PiecePath` and `MetadataPath` properties of the miner transaction object. 4. `EnsureProviderFunds` function This function is used to determine whether the miner has enough funds to process the current transaction. Get the Lotus Provider adapter. node := environment.Node() Get the key and height corresponding to the tipset at the top of the blockchain. tok, _, err := node.GetChainHead(ctx.Context())if err != nil {
return ctx.Trigger(storagemarket.ProviderEventNodeErrored, xerrors.Errorf("acquiring chain head: %w", err))
} Get the miner's worker address. waddr, err := node.GetMinerWorkerAddress(ctx.Context(), deal.Proposal.Provider, tok)if err != nil {
return ctx.Trigger(storagemarket.ProviderEventNodeErrored, xerrors.Errorf("looking up miner worker: %w", err))
} Call the EnsureFunds method of the Lotus Provider adapter to ensure that the miner has enough funds to process the current transaction. mcid, err := node.EnsureFunds(ctx.Context(), deal.Proposal.Provider, waddr, deal.Proposal.ProviderCollateral, tok)if err != nil {
return ctx.Trigger(storagemarket.ProviderEventNodeErrored, xerrors.Errorf("ensuring funds: %w", err))
} If the returned mcid is empty, it means that it has been confirmed in real time, then the Trigger method of the fsm context object is called to send an event. if mcid == cid.Undef {
return ctx.Trigger(storagemarket.ProviderEventFunded)
} Otherwise, call the Trigger method of the fsm context object to send another event. return ctx.Trigger(storagemarket.ProviderEventFundingInitiated, mcid) When the state machine receives this event, the event processor changes the state from StorageDealEnsureProviderFunds to StorageDealProviderFunding , and calls its processing function WaitForFunding to wait for the next message to be uploaded to the chain. At the same time, before calling the processing function, PublishCid property of the miner transaction object is modified through the Action function.
5. `WaitForFunding` function This function is used to wait for a message to be sent to the chain. After the message is sent to the chain, Trigger method of the fsm context object is called to send an event. The function content is as follows: node := environment.Node()return node.WaitForMessage(ctx.Context(), *deal.AddFundsCid, func(code exitcode.ExitCode, bytes []byte, err error) error {
if err != nil {
return ctx.Trigger(storagemarket.ProviderEventNodeErrored, xerrors.Errorf("AddFunds errored: %w", err))
}
if code != exitcode.Ok {
return ctx.Trigger(storagemarket.ProviderEventNodeErrored, xerrors.Errorf("AddFunds exit code: %s", code.String()))
}
return ctx.Trigger(storagemarket.ProviderEventFunded)
}) When the state machine receives the ProviderEventFunded event, the event handler changes the state from StorageDealProviderFunding to StorageDealPublish , and calls its processing function PublishDeal to put the transaction information on the chain. At the same time, before calling the processing function, PublishCid property of the miner transaction object is modified through the Action function. 6. `PublishDeal` function This function is mainly used to submit transaction information to the chain. Generate a miner transaction object. smDeal := storagemarket.MinerDeal{
Client: deal.Client,
ClientDealProposal: deal.ClientDealProposal,
ProposalCid: deal.ProposalCid,
State: deal.State,
Ref: deal.Ref,
} Call PublishDeals of the Lotus Provider adapter object to upload the transaction information to the chain. mcid, err := environment.Node().PublishDeals(ctx.Context(), smDeal)
if err != nil {
return ctx.Trigger(storagemarket.ProviderEventNodeErrored, xerrors.Errorf("publishing deal: %w", err))
} Call the Trigger method of the fsm context object to send an event. return ctx.Trigger(storagemarket.ProviderEventDealPublishInitiated, mcid) When the state machine receives this event, the event processor changes the state from StorageDealPublish to StorageDealPublishing , thereby calling its processing function WaitForPublish to wait for the transaction information to be uploaded to the chain.
7. `WaitForPublish` function This function is used to wait for the transaction information to be uploaded to the chain, then send a response to the client, and then disconnect from the client. Finally, the Trigger method of the fsm context object is called to generate an event object through event processing, and then send the event object to the state machine. The name of the event object generated here is ProviderEventDealPublished . When the state machine receives this event, the event handler changes the state from StorageDealPublishing to StorageDealStaged , and calls its processing function HandoffDeal to start the sector sealing process. At the same time, before calling the processing function, ConnectionClosed and DealID properties of the miner transaction object are modified through the Action function. return environment.Node().WaitForMessage(ctx.Context(), *deal.PublishCid, func(code exitcode.ExitCode, retBytes []byte, err error) error {
if err != nil {
return ctx.Trigger(storagemarket.ProviderEventDealPublishError, xerrors.Errorf("PublishStorageDeals errored: %w", err))
}
if code != exitcode.Ok {
return ctx.Trigger(storagemarket.ProviderEventDealPublishError, xerrors.Errorf("PublishStorageDeals exit code: %s", code.String()))
}
var retval market.PublishStorageDealsReturn
err = retval.UnmarshalCBOR(bytes.NewReader(retBytes))
if err != nil {
return ctx.Trigger(storagemarket.ProviderEventDealPublishError, xerrors.Errorf("PublishStorageDeals error unmarshalling result: %w", err))
} return ctx.Trigger(storagemarket.ProviderEventDealPublished, retval.IDs[0])
}) 8. `HandoffDeal` function This function calls the miner's Provide adapter Generate a file object using the fragment path. file, err := environment.FileStore().Open(deal.PiecePath)if err != nil {
return ctx.Trigger(storagemarket.ProviderEventFileStoreErrored, xerrors.Errorf("reading piece at path %s: %w", deal.PiecePath, err))
} Generate a fragment stream using a fragment file stream. paddedReader, paddedSize := padreader.New(file, uint64(file.Size())) Call the OnDealComplete method of the Lotus Provider adapter object to notify that the transaction has been completed, thereby adding the fragment to a sector. err = environment.Node().OnDealComplete(
ctx.Context(),
storagemarket.MinerDeal{
Client: deal.Client,
ClientDealProposal: deal.ClientDealProposal,
ProposalCid: deal.ProposalCid,
State: deal.State,
Ref: deal.Ref,
DealID: deal.DealID,
FastRetrieval: deal.FastRetrieval,
PiecePath: filestore.Path(environment.FileStore().Filename(deal.PiecePath)),
},
paddedSize,
paddedReader,
)if err != nil {
return ctx.Trigger(storagemarket.ProviderEventDealHandoffFailed, err)
} Call the Trigger method of the fsm context object to send an event. return ctx.Trigger(storagemarket.ProviderEventDealHandedOff) When the state machine receives this event, the event handler changes the state from StorageDealStaged to StorageDealSealing , thereby calling its processing function VerifyDealActivated to wait for the sector sealing result.
9. `VerifyDealActivated` function Generate callback function. cb := func(err error) {
if err != nil {
_ = ctx.Trigger(storagemarket.ProviderEventDealActivationFailed, err)
} else {
_ = ctx.Trigger(storagemarket.ProviderEventDealActivated)
}
} When the Lotus Provider adapter object detects changes in the transaction object, it calls this callback function and sends the corresponding event. When the state machine receives this event, the event handler changes the state from StorageDealSealing to StorageDealActive , thereby calling its processing function RecordPieceInfo to record relevant information. Call OnDealSectorCommitted method of the Lotus Provider adapter object to wait for the sector to be committed. err := environment.Node().OnDealSectorCommitted(ctx.Context(), deal.Proposal.Provider, deal.DealID, cb)if err != nil {
return ctx.Trigger(storagemarket.ProviderEventDealActivationFailed, err)
} Returns empty. return nil
9. `RecordPieceInfo` function This function mainly records relevant information. Finally, the Trigger method of the fsm context object is called to generate an event object through event processing, and then send the event object to the state machine. The name of the event object generated here is ProviderEventDealCompleted . When the state machine receives this event, the event handler changes the state from StorageDealActive to StorageDealCompleted , and finally ends the state machine processing. Fragmented temporary files will be deleted here. Link to this article: https://www.8btc.com/article/632253 Please indicate the source of the article when reprinting |