Because the Storage Miner API object depends on a StorageProvider object, the DI container calls the StorageProvider function (node/modules/storageminer.go) to create one while the storage miner starts up. The StorageProvider function proceeds as follows:

1. Call NewFromLibp2pHost to create a StorageMarketNetwork object.

        net := smnet.NewFromLibp2pHost(h)

2. Call NewLocalFileStore to create a FileStore object.

        store, err := piecefilestore.NewLocalFileStore(piecefilestore.OsPath(r.Path()))

   NewLocalFileStore (go-fil-markets library, filestore/filestore.go) works as follows:

        base := filepath.Clean(string(basedirectory))
        info, err := os.Stat(string(base))
        // (the check on err is omitted in this excerpt)
        if !info.IsDir() {
            return nil, fmt.Errorf("%s is not a directory", base)
        }
        return &fileStore{string(base)}, nil

   The path passed to NewLocalFileStore is the repo directory, so the temporary directory for pieces is the repo directory itself.

3. Call CustomDealDecisionLogic, which returns an option function. That option installs the callback we supply, and the callback makes the custom decision on whether to accept a deal.

        opt := storageimpl.CustomDealDecisionLogic(func(ctx context.Context, deal storagemarket.MinerDeal) (bool, string, error) {
            // callback body omitted
        })

4. Create and return the StorageProvider object.

        p, err := storageimpl.NewProvider(net, namespace.Wrap(ds, datastore.NewKey("/deals/provider")), ibs, store, pieceStore, dataTransfer, spn, address.Address(minerAddress), ffiConfig.SealProofType, storedAsk, opt)
        return p, nil

The NewProvider function proceeds as follows:

1. Create a PieceIOWithStore object.

        carIO := cario.NewCarIO()
        pio := pieceio.NewPieceIOWithStore(carIO, fs, bs)

2. Create the Provider object.

        h := &Provider{
            net:                  net,
            proofType:            rt,
            spn:                  spn,
            fs:                   fs,
            pio:                  pio,
            pieceStore:           pieceStore,
            conns:                connmanager.NewConnManager(),
            storedAsk:            storedAsk,
            actor:                minerAddress,
            dataTransfer:         dataTransfer,
            dealAcceptanceBuffer: DefaultDealAcceptanceBuffer,
            pubSub:               pubsub.New(providerDispatcher),
        }

3. Create the fsm state group object.

        deals, err := NewProviderStateMachine(
            ds,
            &providerDealEnvironment{h},
            h.dispatch,
        )
        h.deals = deals

   The fsm state group is configured with the following parameters:

        return fsm.New(ds, fsm.Parameters{
            Environment:     env,
            StateType:       storagemarket.MinerDeal{},
            StateKeyField:   "State",
            Events:          providerstates.ProviderEvents,
            StateEntryFuncs: providerstates.ProviderStateEntryFuncs,
            FinalityStates:  providerstates.ProviderFinalityStates,
            Notifier:        notifier,
        })

   - The environment object is providerDealEnvironment.
   - The state object is MinerDeal.
   - The state field is State.
   - The event collection is ProviderEvents (see storagemarket/impl/providerstates/provider_fsm.go).
   - The state entry functions are ProviderStateEntryFuncs. When a deal enters a state, the state machine looks up the function registered for that state and runs it.
   - The final states are ProviderFinalityStates.
   - The notifier is the dispatch method of the Provider object.

4. Apply the configuration options to the Provider object.

        h.Configure(options...)

5. Subscribe to data transfer events.

        dataTransfer.SubscribeToEvents(dtutils.ProviderDataTransferSubscriber(deals))

   When a data transfer starts, completes, or fails, the subscriber sends the corresponding event (ProviderEventDataTransferInitiated, ProviderEventDataTransferCompleted, ProviderEventDataTransferFailed, among others) to the fsm state group.

6. Return the Provider object.
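To make the roles of the fsm parameters described above more concrete, here is a minimal, self-contained sketch of a keyed state machine group. It does not use go-statemachine and does not reproduce its behavior; the Group and Deal types, the state and event names, and the order in which the notifier and entry function run are all assumptions chosen for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// State and Event are simplified stand-ins for deal states and provider events.
type State string
type Event string

// Deal is a hypothetical, stripped-down analogue of storagemarket.MinerDeal.
type Deal struct {
	ID    string // plays the role of the proposal CID
	State State  // the field the state machine keys on (cf. StateKeyField)
}

// Group holds one state machine per deal ID. All field names are illustrative.
type Group struct {
	deals       map[string]*Deal
	transitions map[State]map[Event]State // analogue of Events
	entryFuncs  map[State]func(*Deal)     // analogue of StateEntryFuncs
	final       map[State]bool            // analogue of FinalityStates
	notifier    func(Event, Deal)         // analogue of Notifier
}

// Send applies an event to the deal with the given ID.
func (g *Group) Send(id string, ev Event) error {
	d, ok := g.deals[id]
	if !ok {
		return errors.New("unknown deal: " + id)
	}
	if g.final[d.State] {
		return errors.New("deal already in a final state: " + id)
	}
	next, ok := g.transitions[d.State][ev]
	if !ok {
		return fmt.Errorf("event %q is not valid in state %q", ev, d.State)
	}
	d.State = next
	if g.notifier != nil {
		g.notifier(ev, *d) // tell subscribers about the transition
	}
	if fn := g.entryFuncs[next]; fn != nil {
		fn(d) // run the handler registered for the new state
	}
	return nil
}

// newDemoGroup builds a tiny group with one deal; trace records what happened.
func newDemoGroup(trace *[]string) *Group {
	return &Group{
		deals: map[string]*Deal{"deal-1": &Deal{ID: "deal-1", State: "Validating"}},
		transitions: map[State]map[Event]State{
			"Validating":   {"Accepted": "Transferring", "Rejected": "Failed"},
			"Transferring": {"TransferCompleted": "Completed", "TransferFailed": "Failed"},
		},
		entryFuncs: map[State]func(*Deal){
			"Transferring": func(d *Deal) { *trace = append(*trace, "enter Transferring: "+d.ID) },
		},
		final: map[State]bool{"Completed": true, "Failed": true},
		notifier: func(ev Event, d Deal) {
			*trace = append(*trace, "notify "+string(ev)+" -> "+string(d.State))
		},
	}
}

func main() {
	var trace []string
	g := newDemoGroup(&trace)
	_ = g.Send("deal-1", "Accepted")
	_ = g.Send("deal-1", "TransferCompleted")
	err := g.Send("deal-1", "TransferFailed") // rejected: Completed is a final state
	fmt.Println(trace, err)
}
```

In this sketch the deal ID selects which state machine receives the event, which mirrors how the provider addresses a deal's state machine by its proposal CID.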
The HandleDeals function (node/modules/storageminer.go) is called automatically during storage miner startup. It calls the Start method of the StorageProvider object to start it. The Start method proceeds as follows:

1. Call SetDelegate on the StorageMarketNetwork object so that the provider itself becomes the delegate.

        err := p.net.SetDelegate(p)

   The network object is implemented by the libp2pStorageMarketNetwork struct (storagemarket/network/libp2p_impl.go). Its SetDelegate method is as follows:

        impl.receiver = r
        impl.host.SetStreamHandler(storagemarket.DealProtocolID, impl.handleNewDealStream)
        impl.host.SetStreamHandler(storagemarket.AskProtocolID, impl.handleNewAskStream)
        return nil

   This registers handleNewDealStream as the handler for the DealProtocolID protocol (storage deal requests) and handleNewAskStream as the handler for the AskProtocolID protocol (ask requests). The handleNewDealStream method is as follows:

        // Client peer ID
        remotePID := s.Conn().RemotePeer()
        buffered := bufio.NewReaderSize(s, 16)
        // Wrap the stream
        ds := &dealStream{remotePID, impl.host, s, buffered}
        // Call the HandleDealStream method of the StorageProvider object to process the client's storage request
        impl.receiver.HandleDealStream(ds)

2. Call the restartDeals method of the StorageProvider object in a goroutine to resume processing of existing deals. The restartDeals method proceeds as follows:

   a. Get all deal objects from the fsm state group.

        var deals []storagemarket.MinerDeal
        err := c.deals.List(&deals)

   b. Iterate over the deals and handle each one as follows:
      - If the deal has already terminated, skip to the next deal.
      - If the deal's connection has been closed, skip to the next deal.
      - Otherwise, send a restart event to the fsm state group.

            err = c.deals.Send(deal.ProposalCid, storagemarket.ProviderEventRestart)

        The CID of the deal proposal serves as the name of the deal's state machine within the group.

   c. Return nil.
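The restartDeals flow described above can be sketched in isolation. The following is a simplified illustration, not the library's code: the deal struct, the sender interface, and recordingSender are hypothetical stand-ins for storagemarket.MinerDeal and the fsm state group, and the event is passed as a plain string.

```go
package main

import "fmt"

// deal is a hypothetical, reduced stand-in for storagemarket.MinerDeal.
type deal struct {
	ProposalID       string // stands in for the proposal CID
	Terminated       bool   // stands in for "the state machine reached a final state"
	ConnectionClosed bool
}

// sender is the small part of the state group that this sketch needs.
type sender interface {
	Send(id string, event string) error
}

// recordingSender records every event it is asked to send.
type recordingSender struct{ sent []string }

func (r *recordingSender) Send(id, event string) error {
	r.sent = append(r.sent, id+":"+event)
	return nil
}

// restartDeals sends a restart event to every deal that is still in progress.
func restartDeals(deals []deal, s sender) error {
	for _, d := range deals {
		if d.Terminated {
			continue // finished deals are left alone
		}
		if d.ConnectionClosed {
			continue // nothing to resume without a connection
		}
		if err := s.Send(d.ProposalID, "ProviderEventRestart"); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	r := &recordingSender{}
	deals := []deal{
		{ProposalID: "deal-a", Terminated: true},
		{ProposalID: "deal-b"},
		{ProposalID: "deal-c", ConnectionClosed: true},
	}
	if err := restartDeals(deals, r); err != nil {
		fmt.Println("restart failed:", err)
		return
	}
	fmt.Println(r.sent) // prints [deal-b:ProviderEventRestart]
}
```

Only the deal that is neither terminated nor disconnected receives the restart event, which is the filtering the walkthrough describes.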
Link to this article: https://www.8btc.com/article/630375
Please credit the source when reprinting.