The dialing design of libp2p-rs swarm follows go-libp2p. The functionality is basically the same, with slight simplifications: some features of go-libp2p dialing are removed, such as the synchronous dialing restriction in DialSync.

Code organization

Repository: github.com/netwarps/li… The dialing code is mainly located in swarm/src/lib.rs and swarm/src/dial.rs.

The class diagram is as follows:

  • The dialing implementation revolves around AsyncDialer, which combines the functions of DialLimiter and DialBackoff. AsyncDialer handles dial retries, starts the dialing tasks, and collects and reports the dialing results. Dialing does not retry by default; set the environment variable LIBP2P_SWARM_DIAL_ATTEMPTS to change the number of attempts.
  • DialParam wraps the dialing parameters that are passed between AsyncDialer methods.
  • Transports selects the appropriate Transport (such as TCP or WebSocket) for the address being dialed.
  • DialBackoff records the addresses of a peer that failed to dial, to avoid redialing them too frequently.
  • DialLimiter limits the number of concurrent dials. The default is 100; set the environment variable LIBP2P_SWARM_DIAL_LIMIT to change it.
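
As a minimal sketch of how the two environment variables above could be read, the following uses only the standard library. The helper names, the default of a single attempt (taken from "does not retry by default"), and the choice to treat zero or non-numeric values as invalid are assumptions of this sketch, not the libp2p-rs implementation.

```rust
use std::env;

// Defaults taken from the text: no retry (assumed to mean one attempt)
// and at most 100 concurrent dials.
const DEFAULT_DIAL_ATTEMPTS: u32 = 1;
const DEFAULT_DIAL_LIMIT: u32 = 100;

/// Hypothetical helper: parse an optional raw value, falling back to
/// `default` when the value is missing, not a number, or zero.
fn parse_or_default(raw: Option<&str>, default: u32) -> u32 {
    raw.and_then(|s| s.trim().parse::<u32>().ok())
        .filter(|n| *n > 0)
        .unwrap_or(default)
}

/// Hypothetical helper: read one setting from the process environment.
fn read_setting(name: &str, default: u32) -> u32 {
    parse_or_default(env::var(name).ok().as_deref(), default)
}

/// Returns `(attempts, concurrent_dial_limit)`.
fn dial_settings() -> (u32, u32) {
    (
        read_setting("LIBP2P_SWARM_DIAL_ATTEMPTS", DEFAULT_DIAL_ATTEMPTS),
        read_setting("LIBP2P_SWARM_DIAL_LIMIT", DEFAULT_DIAL_LIMIT),
    )
}
```

Calling `dial_settings()` once at start-up and handing the two values to the dialer keeps the environment lookup out of the dialing path.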

The working process

The sequence diagram is as follows:

  1. Through Control, a caller can invoke new_connection to create a connection, or invoke open_stream directly to create a stream. Swarm receives the command and calls on_new_connection or on_new_stream. In on_new_stream, if a connection to the peer already exists, the stream is created on it; otherwise a new connection is dialed first and the stream is created afterwards. In both cases dialing ends up in dial_peer. At that point, the parameters needed for dialing are copied from Swarm into DialParam.

Note: dialing does not return the connection directly. The connection is only needed for open_stream, so returning it would be somewhat redundant, and returning a mutable reference would raise lifetime issues. Instead, a closure is constructed (mainly to open the stream and send it back), and it is eventually executed in the handler of the ConnectionEstablished or OutgoingConnectionError event. Since dialing starts multiple tasks, the closure would need to support clone. However, the closure captures an external oneshot::Sender, which does not support clone, so for convenience the closure is stored temporarily in dial_transactions in Swarm. This is a HashMap whose key is a unique value generated for each operation, named TransactionId. The TransactionId is carried through to the handler of the ConnectionEstablished or OutgoingConnectionError event, where the closure is removed from the map by its TransactionId and executed.

Partial code snippets:

type DialCallback = Box<dyn FnOnce(Result<&mut Connection>) + Send>;
fn on_new_stream(&mut self, peer_id: PeerId, pids: Vec<ProtocolId>, reply: oneshot::Sender<Result<Substream>>) -> Result<()> {
    if let Some(connection) = self.get_best_conn(&peer_id) {
        ......
    } else {
        // dialing peer, and opening a new stream in the post-processing callback
        self.dial_peer(peer_id.clone(), |r: Result<&mut Connection>| {
            match r {
                Ok(connection) => {
                    connection.open_stream(pids, |r| {
                        let _ = reply.send(r.map_err(|e| e.into()));
                    });
                }
                Err(e) => {
                    let _ = reply.send(Err(e));
                }
            }
        });
    }
    Ok(())
}
fn dial_peer<F: FnOnce(Result<&mut Connection>) + Send + 'static>(&mut self, peer_id: PeerId, f: F) {
    ......

    // allocate transaction id and push box::f into hashmap for post-processing
    let tid = self.assign_tid();
    self.dial_transactions.insert(tid, Box::new(f));
    self.dialer
        .dial(peer_id, self.transports.clone(), addrs, self.event_sender.clone(), tid);
}
fn handle_connection_opened(&mut self, stream_muxer: IStreamMuxer, dir: Direction, tid: Option<TransactionId>) -> Result<()> {
    ......

    // dial callback for post-processing
    // note that it must cleanup the tid entry
    if let Some(id) = tid {
        // the entry must be there
        let callback = self.dial_transactions.remove(&id).expect("no match tid found");
        callback(Ok(&mut connection));
    }
    ......
}
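
The transaction table described in the note can be tried out in isolation. The following is a self-contained sketch of the idea only: `MiniSwarm`, the simplified `Connection` struct, the `String` error type, and `handle_dial_error` are stand-ins invented for this example and do not match the real libp2p-rs types.

```rust
use std::collections::HashMap;

type TransactionId = usize;

// Stand-in for the real Connection type (assumption of this sketch).
struct Connection {
    streams_opened: u32,
}

// A one-shot callback that receives either the new connection or an error.
type DialCallback = Box<dyn FnOnce(Result<&mut Connection, String>) + Send>;

struct MiniSwarm {
    next_tid: TransactionId,
    dial_transactions: HashMap<TransactionId, DialCallback>,
}

impl MiniSwarm {
    fn new() -> Self {
        MiniSwarm { next_tid: 0, dial_transactions: HashMap::new() }
    }

    // Hand out a fresh id for every dial operation.
    fn assign_tid(&mut self) -> TransactionId {
        let tid = self.next_tid;
        self.next_tid += 1;
        tid
    }

    // Park the callback under a new transaction id. Only the id, which is
    // cheap to copy, would travel with the dialing tasks.
    fn dial_peer<F>(&mut self, f: F) -> TransactionId
    where
        F: FnOnce(Result<&mut Connection, String>) + Send + 'static,
    {
        let tid = self.assign_tid();
        self.dial_transactions.insert(tid, Box::new(f));
        tid
    }

    // Success path: remove the callback and run it with the connection.
    fn handle_connection_opened(&mut self, tid: TransactionId, connection: &mut Connection) {
        if let Some(callback) = self.dial_transactions.remove(&tid) {
            callback(Ok(connection));
        }
    }

    // Failure path: remove the callback and run it with the error.
    fn handle_dial_error(&mut self, tid: TransactionId, error: String) {
        if let Some(callback) = self.dial_transactions.remove(&tid) {
            callback(Err(error));
        }
    }
}
```

Because each entry is removed before it is called, a callback runs at most once and the map does not grow with completed dials.
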
  2. Swarm calls the dial method of AsyncDialer, which spawns a new task and calls start_dialing. start_dialing implements the retry logic: it waits for the dialing result and returns it to dial. If dialing succeeds, a ConnectionEstablished event is sent; if it fails, an OutgoingConnectionError event is sent. The closure stored in the first step is then executed in the corresponding event handler.
pub(crate) fn dial(
    &self,
    peer_id: PeerId,
    transports: Transports,
    addrs: EitherDialAddr,
    mut event_sender: mpsc::UnboundedSender<SwarmEvent>,
    tid: TransactionId,
) {
    let dial_param = DialParam {
        transports,
        addrs,
        peer_id,
        tid,
        limiter: self.limiter.clone(),
        backoff: self.backoff.clone(),
        attempts: self.attempts,
    };

    task::spawn(async move {
        let tid = dial_param.tid;
        let peer_id = dial_param.peer_id.clone();

        let r = AsyncDialer::start_dialing(dial_param).await;
        match r {
            Ok(stream_muxer) => {
                let _ = event_sender
                    .send(SwarmEvent::ConnectionEstablished {
                        stream_muxer,
                        direction: Direction::Outbound,
                        tid: Some(tid),
                    })
                    .await;
            }
            Err(err) => {
                let _ = event_sender
                    .send(SwarmEvent::OutgoingConnectionError { tid, peer_id, error: err })
                    .await;
            }
        }
    });
}
async fn start_dialing(dial_param: DialParam) -> Result<IStreamMuxer> {
    let mut dial_count: u32 = 0;
    loop {
        dial_count += 1;

        let active_param = dial_param.clone();
        let r = AsyncDialer::dial_addrs(active_param).await;
        if let Err(e) = r {
            log::info!("[Dialer] dialer failed at attempt={} error={:?}", dial_count, e);
            if dial_count < dial_param.attempts {
                log::info!(
                    "[Dialer] All addresses of {:?} cannot be dialed successfully. Now try dialing again, attempts={}",
                    dial_param.peer_id,
                    dial_count
                );
                //TODO:
                task::sleep(BACKOFF_BASE).await;
            } else if dial_param.attempts > 1 {
                break Err(SwarmError::MaxDialAttempts(dial_param.attempts));
            } else {
                break Err(e);
            }
        } else {
            break r;
        }
    }
}
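
The retry rule in start_dialing can be isolated from the async machinery. The sketch below is a synchronous simplification: `dial_with_retry`, the `DialError` enum, and the closure-based `dial_once` parameter are names made up for this example, and the pause between attempts is left out.

```rust
#[derive(Debug, PartialEq)]
enum DialError {
    AllDialsFailed,
    MaxDialAttempts(u32),
}

/// Call `dial_once` until it succeeds or `attempts` tries have been made.
/// With a single attempt the original error is returned unchanged; with
/// more than one attempt the final error is `MaxDialAttempts`.
fn dial_with_retry<T, F>(attempts: u32, mut dial_once: F) -> Result<T, DialError>
where
    F: FnMut() -> Result<T, DialError>,
{
    let mut dial_count: u32 = 0;
    loop {
        dial_count += 1;
        match dial_once() {
            Ok(v) => break Ok(v),
            // Attempts remain: try again.
            Err(_) if dial_count < attempts => continue,
            // Retries were enabled and are now used up.
            Err(_) if attempts > 1 => break Err(DialError::MaxDialAttempts(attempts)),
            // No retries configured: surface the original error.
            Err(e) => break Err(e),
        }
    }
}
```

Keeping the original error in the single-attempt case means callers that never enable retries see the underlying cause rather than a generic "attempts exhausted" error.
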
  3. start_dialing internally invokes dial_addrs, which dials the peer's multiple addresses. It first checks the backoff: if an address has just failed to dial, an error is returned. It then constructs a DialJob for each address and starts a task per DialJob, which calls the limiter's do_dial_job to perform the dial check and the dial itself. Because there is no way to know when a task will finish dialing, a channel tx is passed into each job. When a dial completes, a message is sent back and received on the outside. As many messages are received from the channel rx as tasks were started. As soon as a successful dial is found, its result is returned directly. The remaining jobs are not awaited and are left to finish on their own. Addresses that fail to dial are added to the backoff to avoid redialing failed addresses too frequently.
let (tx, rx) = mpsc::unbounded::<(Result<IStreamMuxer>, Multiaddr)>();
let mut num_jobs = 0;
for addr in addrs_rank {
    // first of all, check the transport
    let r = param.transports.lookup_by_addr(addr.clone());
    if r.is_err() {
        log::info!("[Dialer] no transport found for {:?}", addr);
        continue;
    }

    num_jobs += 1;

    let dj = DialJob {
        addr,
        peer: peer_id.clone(),
        tx: tx.clone(),
        transport: r.unwrap(),
    };
    // spawn a task to dial
    let limiter = self.limiter.clone();
    task::spawn(async move {
        limiter.do_dial_job(dj).await;
    });
}
log::trace!("total {} dialing jobs started, collecting...", num_jobs);
self.collect_dialing_result(rx, num_jobs, param).await
async fn collect_dialing_result(&self, mut rx: UnboundedReceiver<(Result<IStreamMuxer>, Multiaddr)>, jobs: u32, param: DialParam) -> Result<IStreamMuxer> {
    for i in 0..jobs {
        let peer_id = param.peer_id.clone();
        log::trace!("[Dialer] receiving dial result, finished jobs={} ...", i);
        let r = rx.next().await;
        match r {
            Some((Ok(stream_muxer), addr)) => {
                let reported_pid = stream_muxer.remote_peer();
                if peer_id == reported_pid {
                    return Ok(stream_muxer);
                } else {
                    self.backoff.add_peer(peer_id, addr).await;
                }
            }
            Some((Err(err), addr)) => {
                if let SwarmError::Transport(_) = err {
                    self.backoff.add_peer(peer_id, addr).await;
                }
            }
            None => {
                log::warn!("[Dialer] should not happen");
            }
        }
    }
    return Err(SwarmError::AllDialsFailed);
}
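
The fan-out and collect pattern above can be reproduced with standard-library threads and channels. This is only an illustration of the pattern: the function and type names are hypothetical, plain strings stand in for peer ids and addresses, a `HashSet` stands in for DialBackoff, and every failure is added to the backoff (the excerpt does so only for transport errors).

```rust
use std::collections::HashSet;
use std::sync::mpsc;
use std::thread;

// Outcome of one dial job: the peer id reported by the remote side, or an error text.
type DialResult = Result<String, String>;

/// Dial every address on its own thread and return the first address whose
/// reported peer id matches `expected_peer`. Addresses that fail, or that
/// report a different peer, are added to `backoff`.
fn dial_addrs<F>(
    expected_peer: &str,
    addrs: Vec<String>,
    dial: F,
    backoff: &mut HashSet<String>,
) -> Result<String, String>
where
    F: Fn(&str) -> DialResult + Send + Clone + 'static,
{
    let (tx, rx) = mpsc::channel::<(DialResult, String)>();
    let mut num_jobs = 0;
    for addr in addrs {
        num_jobs += 1;
        let tx = tx.clone();
        let dial = dial.clone();
        thread::spawn(move || {
            let r = dial(&addr);
            // The receiver may already be gone after an earlier success; ignore that.
            let _ = tx.send((r, addr));
        });
    }
    // Drop our own sender so `recv` reports an error if every job has ended.
    drop(tx);

    for _ in 0..num_jobs {
        match rx.recv() {
            Ok((Ok(reported), addr)) => {
                if reported == expected_peer {
                    // First good result wins; remaining jobs finish on their own.
                    return Ok(addr);
                }
                backoff.insert(addr);
            }
            Ok((Err(_), addr)) => {
                backoff.insert(addr);
            }
            // All senders are gone without a result for this slot.
            Err(_) => break,
        }
    }
    Err("all dials failed".to_string())
}
```

Returning on the first match drops the receiver, so jobs that finish later simply fail to send and exit, which mirrors leaving the remaining tasks to finish on their own.
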
  4. Compared with the Go implementation, DialLimiter is simplified: the waiting list is removed. A dial job that cannot proceed is not put into a waiting list; an error is returned directly. do_dial_job is called by dial_addrs of AsyncDialer. It checks the number of dials currently in progress. If that number has reached the limit, a ConcurrentDialLimit error is returned. Otherwise, the concurrent dial count is increased by one, execute_dial is called to perform the actual dial, and the count is decreased by one once the dial completes. The transport dial is wrapped in a timeout (by default 5 seconds for local addresses and 60 seconds for external addresses). If the timeout expires, a DialTimeout error is returned directly. Whether the dial succeeds or not, the result is sent back to AsyncDialer through a channel.
async fn do_dial_job(&self, mut dj: DialJob) {
    if self.dial_consuming.load(Ordering::SeqCst) >= self.dial_limit {
        let _ = dj.tx.send((Err(SwarmError::ConcurrentDialLimit(self.dial_limit)), dj.addr)).await;
        return;
    }
    self.dial_consuming.fetch_add(1, Ordering::SeqCst);
    self.execute_dial(dj).await;
}
fn dial_timeout(&self, ma: &Multiaddr) -> Duration {
    let mut timeout: Duration = DIAL_TIMEOUT;
    if ma.is_private_addr() {
        timeout = DIAL_TIMEOUT_LOCAL;
    }
    timeout
}

async fn execute_dial(&self, mut dj: DialJob) {
    let timeout = self.dial_timeout(&dj.addr);

    let dial_r = future::timeout(timeout, dj.transport.dial(dj.addr.clone())).await;
    if let Ok(r) = dial_r {
        let _ = dj.tx.send((r.map_err(|e| e.into()), dj.addr)).await;
    } else {
        let _ = dj.tx.send((Err(SwarmError::DialTimeout(dj.addr.clone(), timeout.as_secs())), dj.addr)).await;
    }
    self.dial_consuming.fetch_sub(1, Ordering::SeqCst);
}
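
The counting logic of the limiter and the timeout selection can be sketched with the standard library alone. The `Limiter` struct, `try_acquire`, `release`, and `run` are hypothetical names for this example. Unlike the load followed by fetch_add in the excerpt, the sketch uses `fetch_update` so that the check and the increment happen in one atomic step; that is a choice made for this sketch, not a description of libp2p-rs. The two timeout values are the defaults stated in the text.

```rust
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::Duration;

// Default timeouts stated in the text.
const DIAL_TIMEOUT: Duration = Duration::from_secs(60);
const DIAL_TIMEOUT_LOCAL: Duration = Duration::from_secs(5);

struct Limiter {
    dial_consuming: AtomicU32,
    dial_limit: u32,
}

impl Limiter {
    fn new(dial_limit: u32) -> Self {
        Limiter { dial_consuming: AtomicU32::new(0), dial_limit }
    }

    /// Reserve one dial slot. Returns false when the limit is already reached.
    fn try_acquire(&self) -> bool {
        self.dial_consuming
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |n| {
                if n < self.dial_limit { Some(n + 1) } else { None }
            })
            .is_ok()
    }

    /// Give a slot back after the dial has completed.
    fn release(&self) {
        self.dial_consuming.fetch_sub(1, Ordering::SeqCst);
    }

    /// Run `job` inside a slot, or fail immediately with the configured
    /// limit (there is no waiting list).
    fn run<T>(&self, job: impl FnOnce() -> T) -> Result<T, u32> {
        if !self.try_acquire() {
            return Err(self.dial_limit);
        }
        let out = job();
        self.release();
        Ok(out)
    }
}

/// Pick the shorter timeout for private (local) addresses.
fn dial_timeout(is_private_addr: bool) -> Duration {
    if is_private_addr { DIAL_TIMEOUT_LOCAL } else { DIAL_TIMEOUT }
}
```

A caller would wrap each dial in `run` and treat the `Err` case as the equivalent of a ConcurrentDialLimit error.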

Netwarps consists of a senior cloud computing and distributed technology development team in China with rich experience in the financial, power, communication, and Internet industries. Netwarps has research and development centers in Shenzhen and Beijing and a team of 30+, most of them technical staff with more than 10 years of development experience, drawn from the Internet, finance, cloud computing, blockchain, and scientific research fields. Netwarps focuses on the development and application of secure storage technology products. Its main products are a decentralized file system (DFS) and a decentralized computing platform (DCP). It is committed to providing distributed storage and distributed computing platforms based on decentralized network technology, characterized by high availability, low power consumption, and low network requirements, and suitable for scenarios such as the Internet of Things and the industrial Internet. Public account: Netwarps