企业🤖AI Agent构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
``` //1) 自定义可写流 let {Readable} = require('stream'); // 想实现什么流 就继承这个流 // Readable 里面有一个 read() 方法,默认调_read(); // Readable中提供了一个push方法,你调用push方法就会触发data事件 // let index = 9; // class MyRead extends Readable{ // _read(){ // // 可读流什么时候停止呢? 当push null的时候停止 // if(index-->0)return this.push('123'); // this.push(null); // } // } // let mr = new MyRead; // mr.on('data',function(data){ // console.log(data); // }); // --- --- --- //2) 自定义可写流 let {Writable} = require('stream'); // 可写流实现_write方法 // 源码中默认调用的是Writable中的write方法 // class MyWrite extends Writable{ // _write(chunk,encoding,callback){ // console.log(chunk.toString()); // callback(); // clearBuffer() // } // } // let mw = new MyWrite(); // mw.write('珠峰','utf8',()=>{ // console.log(1); // }) // mw.write('珠峰','utf8',()=>{ // console.log(1); // }) // --- --- --- //3) Duplex双工流 let {Duplex} = require('stream'); // 双工流 又能读 又能写,而且读写可以没关系 // class D extends Duplex{ // _read(){ // } // _write(){ // } // } // let d = Duplex({ // read(){ //等同于上面的_read // this.push('hello'); // this.push(null); // } // ,write(chunk,encoding,callback){ // console.log(chunk); // callback(); // } // }) // d.on('data',function(data){ // console.log(data); //<Buffer 68 65 6c 6c 6f> // }) // d.write('world'); // <Buffer 77 6f 72 6c 64> //可以只实现一个 读或则写 // --- --- --- //4) transform流 他就是Duplex 但它不需要实现read和write,它实现的是transform let {Transform} = require('stream'); // 他的参数和可写流一样 let transform1 = Transform({ transform(chunk,encoding,callback){ // console.log(chunk); //<Buffer 31 0d 0a> // 0d \r 0a \n // callback(); //必须调 不然卡死 //类似于next this.push(chunk.toString().toUpperCase()); // 将输入的内容放入到可读流中 callback(); //比填 } }); let transform2 = Transform({ transform(chunk,encoding,callback){ console.log(chunk.toString()); callback(); } }); // 等待你输入 // rs.pipe(ws); process.stdin.pipe(transform1).pipe(transform2) // 可读流里只能放 buffer 或则 字符串 对象流里可以放对象 ```