[libp2p] running echo examples in the browser

Hi,
I am trying to replicate the echo example [https://github.com/libp2p/js-libp2p/tree/master/examples/echo] in the browser and have found that there isn't any simple documentation about using streams. Also, this example uses the pipe function, which is not available in the browser. Could someone give me some hints on this?
Here is my example.

Here I have a stream object and I want to send a message to another peer using it:

const { stream } = await dialerNode.libp2p.dialProtocol(listenerMultiaddr, '/echo/1.0.0')

console.log('nodeA dialed to nodeB on protocol: /echo/1.0.0')

// ['hey'] |> stream |> async function (source) {
//     // For each chunk of data
//     for await (const data of source) {
//         // Output the data
//         console.log('received echo:', data.toString())
//     }
// };
stream(['hey'])

But here stream is an object, not a function, and I have no idea how pipe (in the original example) passes ['hey'] to the stream object.

The same problem applies to reading: I cannot read from the stream, since that example also uses pipe.

await node.libp2p.handle('/echo/1.0.0', ({ stream }) => {
    stream => (async function () {
        for await (const msg of source) {
            console.log(msg.toString())
        }
    })()
})

Thanks

Hey @am2222

Thanks for reaching out. I agree that we should have better documentation around stream usage in js-libp2p. I will be opening an issue to track that.
Another example that can be useful for you to understand this is https://github.com/libp2p/js-libp2p/tree/master/examples/protocol-and-stream-muxing

These examples use it-pipe https://github.com/alanshaw/it-pipe. A libp2p stream is an async iterable, not a function, and it-pipe is responsible for piping async iterables together, similarly to pipe from Node.js streams. It works in the browser and it is widely used across the libp2p codebase. You can look into the it-pipe readme, as well as https://github.com/alanshaw/it-awesome, to learn more about how you can use these async iterables.
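
To make this concrete, your first snippet would look roughly like the sketch below with it-pipe (adapting the variable names from your code; not tested against your setup):

const pipe = require('it-pipe')

const { stream } = await dialerNode.libp2p.dialProtocol(listenerMultiaddr, '/echo/1.0.0')

await pipe(
  ['hey'],                    // source: any (async) iterable works, including an array
  stream,                     // the libp2p stream is a duplex: its sink consumes the source above,
                              // and its source yields whatever the other peer echoes back
  async function (source) {   // sink: read the echoed data
    for await (const data of source) {
      console.log('received echo:', data.toString())
    }
  }
)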

The example codebase should be identical for a browser environment. The only changes needed are how nodes discover each other and the transport they use to exchange data. This example uses the TCP transport, which is not supported in a browser environment. https://github.com/libp2p/js-libp2p/tree/master/examples/libp2p-in-the-browser shows how you can connect browser nodes. Then, the echo example can easily be added on top of it. Let me know if you are not able to get this working.
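
For reference, the browser node setup ends up looking roughly like the sketch below. The module choices follow the libp2p-in-the-browser example; the exact transport and crypto modules may differ depending on the versions you use:

const pipe = require('it-pipe')
const Libp2p = require('libp2p')
const Websockets = require('libp2p-websockets')
const WebRTCStar = require('libp2p-webrtc-star')
const Mplex = require('libp2p-mplex')
const Secio = require('libp2p-secio')

const node = await Libp2p.create({
  modules: {
    transport: [Websockets, WebRTCStar], // browser-friendly transports instead of TCP
    streamMuxer: [Mplex],
    connEncryption: [Secio]
  }
  // listen addresses / peer discovery (e.g. a webrtc-star signalling server)
  // are configured as in the libp2p-in-the-browser example
})

// The echo protocol handler itself is the same as in the Node.js example
node.handle('/echo/1.0.0', ({ stream }) => pipe(stream.source, stream.sink))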


@vasco-santos Thanks so much for the detailed reply, I really appreciate your help. I finally got it to work, but I have two questions I would appreciate your help with.
1 - Is the peer ID constant for each user? For example, if I store a user's PeerId, how is it guaranteed that I can dial that peer next time (assuming the user is online when we dial them)?
2 - I could read the message from the other peer, but can I change the message and return it to the user via echo? Something like this:

pipe(
    // Read from the stream (the source)
    stream.source,
    // Sink function
    async function (source) {
        // For each chunk of data
        for await (const msg of source) {
            // Output the data as a utf8 string
            console.log('> ' + msg.toString('utf8').replace('\n', ''))
        }
        // =========== how can I change the stream and return it to the sender using echo?
    },
    stream.sink
)


I know that I can dial the dialer and send back an answer, but I was wondering if I could return the message using echo, so I don't need to dial the sender again.

1 - It really depends on your setup. If you are using IPFS, it has the init and start methods. init will initiate an IPFS repo and create a PeerId. This PeerId derives from the key pair generated for the peer, which is also stored in the repo. Every time this peer starts, it will read its repo and start accordingly, so the PeerId will be the same. If you remove the repo and init IPFS again, a new PeerId is generated. If you are using libp2p directly, you will need to implement that logic yourself. Libp2p will be created with a previously computed PeerId if it is provided to Libp2p.create({ ..., peerId }); otherwise a new one is created.
It is also important to point out that a peer can start listening on a given multiaddr, but when it stops and starts again in the future, its addresses may change. If that is the case, the new multiaddrs need to be discovered by the peers that already knew it. Or, if the peer that starts with new multiaddrs dials the peers it knew before, that will make the other peers update the multiaddrs they know for it. All this information is stored in the peers' PeerStore https://github.com/libp2p/js-libp2p/tree/master/src/peer-store
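
If you go the plain libp2p route, persisting and reusing the PeerId could look roughly like this sketch (the storage mechanism is up to you; here it is just a JSON string):

const PeerId = require('peer-id')
const Libp2p = require('libp2p')

// First run: create a PeerId and persist it (file, localStorage, database, ...)
const peerId = await PeerId.create()
const persisted = JSON.stringify(peerId.toJSON()) // includes the private key

// Later runs: restore it and hand it to libp2p so the identity stays the same
const restored = await PeerId.createFromJSON(JSON.parse(persisted))
const libp2p = await Libp2p.create({
  peerId: restored
  // ... modules (transports, muxer, crypto) as usual
})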

2 - Yeah, you are basically using pipe as follows:

    pipe (
      // Read
      // Data operation, transformer, logger, wtv
      // Write
    )

This means that the first argument of pipe must be a source (an async iterable), the second must be a transform or duplex (something that consumes a source and provides a new source), and the third a sink. The issue in your example is that your middle function only acts as a sink (it consumes the data from the source above), but it does not provide a source for the following sink.

The solution would be something like:

    pipe(
      stream.source,
      function logger (source) {
        return (async function * () { // A generator is async iterable
          for await (const msg of source) {
            console.log('> ' + msg.toString('utf8').replace('\n', ''))
            yield msg
          }
        })()
      },
      stream.sink
    )

There is a cleaner way of doing this :slight_smile: https://github.com/bustle/streaming-iterables#tap

    const { tap } = require('streaming-iterables')

    pipe(
      stream.source,
      tap((msg) => console.log(msg.toString())),
      stream.sink
    )

@vasco-santos, thanks so much for your reply and your help. I just had a question about writing back into an incoming stream. In the example that you shared, is it possible to write into the stream and use stream.sink to return a new message to the dialer?

You can write to the stream as many times as you want, as long as the other side keeps the readable part of the stream open and is parsing your messages. Otherwise, you can also create a new stream for the new exchange, as streams are not expensive to create.
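
One way to write to the same stream over time is to feed its sink from a pushable source. The sketch below uses it-pushable, which is just my suggestion here (it is not required by libp2p):

const pipe = require('it-pipe')
const pushable = require('it-pushable')

const source = pushable()

// Start feeding the pushable into the stream's sink; this keeps running
// until source.end() is called
pipe(source, stream.sink)

// Push messages whenever you want, as long as the other side keeps reading
source.push(Buffer.from('first message'))
source.push(Buffer.from('second message'))

// When you are done
source.end()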


@vasco-santos thanks so much. Are there any examples of reading from a stream and then writing to the same stream? I think making a new stream requires dialling the peer again, right? Is it possible to have something like this?
Peer1 dials peer2 -> peer1 writes to the stream -> peer2 reads the stream -> peer2 writes a new message to the stream -> peer2 responds to peer1 -> peer1 reads the response from the stream

You can do something like the example below:

node2.handle('/example', async ({ stream }) => {
  await pipe(
    stream.source,
    function transform (source) {
      return (async function * () { // A generator is async iterable
        for await (const val of source) {
          yield String(Number(val.toString()) * 2)
        }
      })()
    },
    stream.sink
  )
})

const { stream: stream1 } = await node1.dialProtocol(node2.peerId, ['/example'])
const result = await pipe(
  ['1', '2', '3'],
  stream1,
  async function collect (source) {
    const vals = []
    for await (const val of source) {
      vals.push(val.toString())
    }
    return vals
  }
)
console.log('result: ', result)

I wrote the full code to show you the internals, but you can use the streaming-iterables goodies for the transform and collect functions.

const { collect, take, consume } = require('streaming-iterables')

node2.handle('/example', async ({ stream }) => {
  await pipe(
    stream.source,
    function transform (source) {
      return (async function * () { // A generator is async iterable
        for await (const val of source) {
          yield String(Number(val.toString()) * 2)
        }
      })()
    },
    stream.sink
  )
})

const { stream: stream1 } = await node1.dialProtocol(node2.peerId, ['/example'])
const result = await pipe(
  ['1', '2', '3'],
  stream1,
  collect
)
console.log('result: ', result)

You can do something similar for the transform; see the sketch below.
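
For instance, the manual transform in the handler above could become something like this, using map from streaming-iterables (it is curried, so calling it with just the function returns a transform). This is only my suggestion, not something the example requires:

const { map } = require('streaming-iterables')

node2.handle('/example', async ({ stream }) => {
  await pipe(
    stream.source,
    map((val) => String(Number(val.toString()) * 2)),
    stream.sink
  )
})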

I think making a new stream requires dialling the peer again, right?

Not really. Going a bit into libp2p core details: libp2p.dialProtocol calls libp2p.dial, which will establish a connection with the other peer. After this, conn.newStream(protocol) is called to get a stream within the connection. You can also do:

const conn = await libp2p.dial(peer)
const { stream } = await conn.newStream(protocol)

// do stuff and close stream
// create new stream as above

Anyway, libp2p.dial is more intelligent! Before establishing a connection, it will check via libp2p.connectionManager.get(peerId) whether we already have a connection to the given peer. If so, we return that connection and do not create a new one.
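
In other words, something like this only opens one connection (a sketch, assuming both calls target the same peer):

// The first call establishes the connection; the second reuses it
const { stream: s1 } = await libp2p.dialProtocol(peerId, '/echo/1.0.0')
const { stream: s2 } = await libp2p.dialProtocol(peerId, '/echo/1.0.0')

// Both streams live on the single connection kept by the connection manager
const conn = libp2p.connectionManager.get(peerId)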


@vasco-santos thanks so much for your detailed response.