Posted:    January 20, 2000 
Status:    
		Completed 
	
 
 Tags :    
						
				 
 Categories :   
						
				 
解决方案
int read = channel.read(buffer);
if(read > 0){
byte[] data = buffer.array();
String msg = new String(data).trim();
System.out.println(“服务端收到信息:” + msg);
	//回写数据
	ByteBuffer outBuffer = ByteBuffer.wrap("好的".getBytes());
	channel.write(outBuffer);// 将消息回送给客户端
}else{
	System.out.println("客户端关闭");
	key.cancel();
}
selector.select()
selector.select(1000);不阻塞 会继续往下执行程序
selector.wakeup();也可以唤醒selector 继续往下执行程序
selector.selectNow();也可以立马返回程序 死循环
OP_WRITE表示底层缓冲区是否有空间,是则响应返还true netty版本大致版本分为 netty3.x 和 netty4.x、netty5.x
netty可以运用在那些领域?
例如: hadoop、dubbo、akka等具有分布式功能的框架,底层RPC通信都是基于netty实现的,这些框架使用的版本通常都还在用netty3.x
最新的游戏服务器有部分公司可能已经开始采用netty4.x 或 netty5.x
SimpleChannelHandler 处理消息接收和写
{
messageReceived接收消息
channelConnected新连接,通常用来检测IP是否是黑名单
channelDisconnected链接关闭,可以再用户断线的时候清楚用户的缓存数据等
}
channelDisconnected与channelClosed的区别?
channelDisconnected只有在连接建立后断开才会调用 channelClosed无论连接是否成功都会调用关闭资源
不是,一个系统可以有多个selector
不是,可以注册多个 一个thread + 队列 == 一个单线程线程池 =====> 线程安全的,任务是线性串行执行的 线程安全,不会产生阻塞效应 ,使用对象组 线程不安全,会产生阻塞效应, 使用对象池
心跳对于服务端来说,定时清除闲置会话inactive(netty5) channelclose(netty3) 心跳对客户端来说,用来检测会话是否断开,是否重连! 用来检测网络延时!
我有一个玩具Netty服务器,并且尝试在客户端的通道未发生任何事件时向其发送心跳消息。我正在通过telnet到服务器,编写消息然后不发送任何内容来对此进行测试,但是我听不到任何声音!
安慰:
>>telnet localhost 6969
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
>>foo
Did you say 'foo'?
MyPipelineFactory.java
public class MyPipelineFactory implements ChannelPipelineFactory {
    private final Timer timer;
    private static final ChannelHandler stringDecoder = new StringDecoder();
    private static final ChannelHandler stringEncoder = new StringEncoder();
    private final ChannelHandler idleStateHandler;
    public MyPipelineFactory(Timer t) {
        this.timer = t;
        this.idleStateHandler = new IdleStateHandler(timer, 5, 5, 5);
    }
    public ChannelPipeline getPipeline() {
        // create default pipeline from static method
        ChannelPipeline pipeline = Channels.pipeline();
        pipeline.addLast("idleStateHandler", this.idleStateHandler); // heartbeat
        pipeline.addLast("framer", new DelimiterBasedFrameDecoder(1024, Delimiters.lineDelimiter()));
        //pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(1024,0,1)); // get header from message
        pipeline.addLast("stringDecoder", stringDecoder);
        pipeline.addLast("stringEncoder", stringEncoder);
        pipeline.addLast("ServerHandler", new ServerHandler()); // goes at the end
        return pipeline;
    }
}
HeartbeatHandler.java
public class HeartbeatHandler extends IdleStateAwareChannelHandler {
    @Override
    public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e) {
        if (e.getState() == IdleState.READER_IDLE) {
            System.out.println("Reader idle, closing channel");
            //e.getChannel().close();
            e.getChannel().write("heartbeat-reader_idle");
        }
        else if (e.getState() == IdleState.WRITER_IDLE) {
            System.out.println("Writer idle, sending heartbeat");
            e.getChannel().write("heartbeat-writer_idle");
        }
        else if (e.getState() == IdleState.ALL_IDLE) {
            System.out.println("All idle, sending heartbeat");
            e.getChannel().write("heartbeat-all_idle");
        }
    }
}
固定:
我忘记了HeartbeatHandler,它需要IdleStateHandler(这部分对我来说并不明显)。这样可行。
public class MyPipelineFactory implements ChannelPipelineFactory {
    private final Timer timer;
    private static final ChannelHandler stringDecoder = new StringDecoder();
    private static final ChannelHandler stringEncoder = new StringEncoder();
    private final ChannelHandler idleStateHandler;
    private final ChannelHandler heartbeatHandler;
    public MyPipelineFactory(Timer t) {
        this.timer = t;
        this.idleStateHandler = new IdleStateHandler(timer, 5, 5, 5);
        this.heartbeatHandler = new HeartbeatHandler();
    }
    public ChannelPipeline getPipeline() {
        // create default pipeline from static method
        ChannelPipeline pipeline = Channels.pipeline();
        pipeline.addLast("idleStateHandler", this.idleStateHandler);
        pipeline.addLast("heartbeatHandler", this.heartbeatHandler); // heartbeat
        pipeline.addLast("framer", new DelimiterBasedFrameDecoder(1024, Delimiters.lineDelimiter()));
        //pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(1024,0,1)); // get header from message
        pipeline.addLast("stringDecoder", stringDecoder);
        pipeline.addLast("stringEncoder", stringEncoder);
        pipeline.addLast("ServerHandler", new ServerHandler()); // goes at the end
        return pipeline;
    }
}
您能解释一下Netty如何使用线程池工作吗?我是否正确理解,线程池有两种:老板线程和工人线程。老板用于执行I / O,而worker用于调用用户回调(messageReceived)来处理数据?
这是来自NioServerSocketChannelFactory文档
一个ServerSocketChannelFactory,它创建一个基于NIO的服务器端ServerSocketChannel。它利用NIO引入的非阻塞I / O模式来有效地服务许多并发连接。
线程如何工作 NioServerSocketChannelFactory中有两种类型的线程:一个是老板线程,另一个是工作线程。
老板线程
每个绑定的ServerSocketChannel都有自己的老板线程。例如,如果您打开了两个服务器端口(例如80和443),则将有两个老板线程。Boss线程接受传入的连接,直到未绑定端口。一旦成功接受了连接,老板线程就将接受的Channel传递给NioServerSocketChannelFactory管理的工作线程之一。
工作线程 一个NioServerSocketChannelFactory可以具有一个或多个工作线程。工作线程以非阻塞模式对一个或多个通道执行非阻塞读写。
在Nio模型中,bossThread照顾所有有界套接字(监听套接字),workerThread照顾Accepted- socket(包括IO和调用messageMethod等接收事件的方法)。
尽管我的业务逻辑没有问题,但事实证明我没有使用Netty
ByteBuf。更新要使用的测试代码后ByteBuf,我遇到了IllegalReferenceCountException的无尽循环。我承认对Netty还是陌生的,但这并不能证明在手动分配和释放资源的日子里回来。创建GC就是为了避免这种混乱。迪斯科,有人吗?那贝尔底呢?
public class StringDecoder extends AbstractDecoder<String> {
    private static final IntPredicate NEWLINE_DELIMITER = b -> b == '\n' || b == '\r';
    @Override
    public Flux<String> decode(Publisher<DataBuffer> publisher, ResolvableType elementType, MimeType mimeType, Map<String, Object> hints) {
        return Flux.from(publisher)
            .scan(Tuples.<Flux<DataBuffer>, Optional<DataBuffer>>of(Flux.empty(), Optional.empty()),
                    (acc, buffer) -> {
                        List<DataBuffer> results = new ArrayList<>();
                        int startIdx = 0, endIdx = 0, limit = buffer.readableByteCount();
                        Optional<DataBuffer> incomplete = acc.getT2();
                        while (startIdx < limit && endIdx != -1) {
                            endIdx = buffer.indexOf(NEWLINE_DELIMITER, startIdx);
                            int length = (endIdx == -1 ? limit : endIdx) - startIdx;
                            DataBuffer slice = buffer.slice(startIdx, length);
                            DataBuffer tmp = incomplete.map(b -> b.write(slice))
                                    .orElse(buffer.factory().allocateBuffer(length).write(slice));
                            tmp = DataBufferUtils.retain(tmp);
                            if (endIdx != -1) {
                                startIdx = endIdx + 1;
                                results.add(tmp);
                                incomplete = Optional.empty();
                            } else {
                                incomplete = Optional.of(tmp);
                            }
                        }
                        releaseBuffer(buffer);
                        return Tuples.of(Flux.fromIterable(results), incomplete);
                    })
            .flatMap(t -> {
                t.getT2().ifPresent(this::releaseBuffer);
                return t.getT1();
            })
            .map(buffer -> {
                // charset resolution should in general use supplied mimeType
                String s = UTF_8.decode(buffer.asByteBuffer()).toString();
                releaseBuffer(buffer);
                return s;
            })
            .log();
    }
    private void releaseBuffer(DataBuffer buffer) {
        boolean release = DataBufferUtils.release(buffer);
        if (release) {
            System.out.println("Buffer was released.");
        }
    }
}
public class StringDecoderTest {
    private StringDecoder stringDecoder = new StringDecoder();
    DataBufferFactory dataBufferFactory = new NettyDataBufferFactory(UnpooledByteBufAllocator.DEFAULT);
    @Test
    public void testDecode() {
        Flux<DataBuffer> pub = Flux.just("abc\n", "abc", "def\n", "abc", "def\nxyz\n", "abc", "def", "xyz\n")
                .map(s -> dataBufferFactory.wrap(s.getBytes(UTF_8)));
        StepVerifier.create(stringDecoder.decode(pub, null, null, null))
                .expectNext("abc", "abcdef", "abcdef", "xyz", "abcdefxyz")
                .verifyComplete();
    }
}
我不断得到:
[ERROR] (main) onError(io.netty.util.IllegalReferenceCountException: refCnt: 0)
[ERROR] (main)  - io.netty.util.IllegalReferenceCountException: refCnt: 0
io.netty.util.IllegalReferenceCountException: refCnt: 0
    at io.netty.buffer.AbstractByteBuf.ensureAccessible(AbstractByteBuf.java:1415)
    at io.netty.buffer.UnpooledHeapByteBuf.nioBuffer(UnpooledHeapByteBuf.java:314)
    at io.netty.buffer.AbstractUnpooledSlicedByteBuf.nioBuffer(AbstractUnpooledSlicedByteBuf.java:434)
    at io.netty.buffer.CompositeByteBuf.nioBuffers(CompositeByteBuf.java:1496)
    at io.netty.buffer.CompositeByteBuf.nioBuffer(CompositeByteBuf.java:1468)
    at io.netty.buffer.AbstractByteBuf.nioBuffer(AbstractByteBuf.java:1205)
    at org.springframework.core.io.buffer.NettyDataBuffer.asByteBuffer(NettyDataBuffer.java:234)
    at org.abhijitsarkar.java.StringDecoder.lambda$decode$4(StringDecoder.java:61)
工作代码:
public class StringDecoder extends AbstractDecoder<String> {
    private static final IntPredicate NEWLINE_DELIMITER = b -> b == '\n' || b == '\r';
    @Override
    public Flux<String> decode(Publisher<DataBuffer> publisher, ResolvableType elementType, MimeType mimeType, Map<String, Object> hints) {
        DataBuffer incomplete = new NettyDataBufferFactory(UnpooledByteBufAllocator.DEFAULT).allocateBuffer(0);
        return Flux.from(publisher)
                .scan(Tuples.<Flux<DataBuffer>, DataBuffer>of(Flux.empty(), retain(incomplete)),
                      (acc, buffer) -> {
                          List<DataBuffer> results = new ArrayList<>();
                          int startIdx = 0, endIdx = 0, limit = buffer.readableByteCount();
                          while (startIdx < limit && endIdx != -1) {
                              endIdx = buffer.indexOf(NEWLINE_DELIMITER, startIdx);
                              int length = (endIdx == -1 ? limit : endIdx) - startIdx;
                              DataBuffer slice = buffer.slice(startIdx, length);
                              byte[] slice1 = new byte[length];
                              slice.read(slice1, 0, slice1.length);
                              if (endIdx != -1) {
                                  byte[] slice2 = new byte[incomplete.readableByteCount()];
                                  incomplete.read(slice2, 0, slice2.length);
                                  // call retain to match release during decoding to string later
                                  results.add(retain(
                                          incomplete.factory().allocateBuffer()
                                                  .write(slice2)
                                                  .write(slice1)
                                  ));
                                  startIdx = endIdx + 1;
                              } else {
                                  incomplete.write(slice1);
                              }
                          }
                          return Tuples.of(Flux.fromIterable(results), incomplete);
                      })
                .flatMap(Tuple2::getT1)
                .map(buffer -> {
                    // charset resolution should in general use supplied mimeType
                    String s = UTF_8.decode(buffer.asByteBuffer()).toString();
                    return s;
                })
                .doOnTerminate(() -> release(incomplete))
                .log();
    }
}
该代码可能更简洁一些,但是适用于Spring bug SPR-16351。
我编写了使用文本协议接受连接和轰炸消息(〜100字节)的服务器,并且我的实现能够与3rt客户端发送约400K / sec的回送消息。我为此任务选择了Netty,即SUSE 11 RealTime,JRockit RTS。但是,当我开始基于Netty开发自己的客户端时,吞吐量却急剧下降(从400K msg / sec降低到1.3K msg / sec)。客户端的代码非常简单。能否请您提供建议或示例,说明如何编写更有效的客户。实际上,我实际上更关心延迟,但是从吞吐量测试开始,我认为在环回中以1.5Kmsg / sec的速度正常是不正常的。PS客户端的目的只是接收来自服务器的消息,很少发送心跳信号。
Client.java
public class Client {
private static ClientBootstrap bootstrap;
private static Channel connector;
public static boolean start()
{
    ChannelFactory factory =
        new NioClientSocketChannelFactory(
                Executors.newCachedThreadPool(),
                Executors.newCachedThreadPool());
    ExecutionHandler executionHandler = new ExecutionHandler( new OrderedMemoryAwareThreadPoolExecutor(16, 1048576, 1048576));
    bootstrap = new ClientBootstrap(factory);
    bootstrap.setPipelineFactory( new ClientPipelineFactory() );
    bootstrap.setOption("tcpNoDelay", true);
    bootstrap.setOption("keepAlive", true);
    bootstrap.setOption("receiveBufferSize", 1048576);
    ChannelFuture future = bootstrap
            .connect(new InetSocketAddress("localhost", 9013));
    if (!future.awaitUninterruptibly().isSuccess()) {
        System.out.println("--- CLIENT - Failed to connect to server at " +
                           "localhost:9013.");
        bootstrap.releaseExternalResources();
        return false;
    }
    connector = future.getChannel();
    return connector.isConnected();
}
public static void main( String[] args )
{
    boolean started = start();
    if ( started )
        System.out.println( "Client connected to the server" );
}
}
ClientPipelineFactory.java
public class ClientPipelineFactory  implements ChannelPipelineFactory{
private final ExecutionHandler executionHandler;
public ClientPipelineFactory( ExecutionHandler executionHandle )
{
    this.executionHandler = executionHandle;
}
@Override
public ChannelPipeline getPipeline() throws Exception {
    ChannelPipeline pipeline = pipeline();
    pipeline.addLast("framer", new DelimiterBasedFrameDecoder(
              1024, Delimiters.lineDelimiter()));
    pipeline.addLast( "executor", executionHandler);
    pipeline.addLast("handler", new MessageHandler() );
    return pipeline;
}
}
MessageHandler.java
public class MessageHandler extends SimpleChannelHandler{
long max_msg = 10000;
long cur_msg = 0;
long startTime = System.nanoTime();
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
    cur_msg++;
    if ( cur_msg == max_msg )
    {
        System.out.println( "Throughput (msg/sec) : " + max_msg* NANOS_IN_SEC/(     System.nanoTime() - startTime )   );
        cur_msg = 0;
        startTime = System.nanoTime();
    }
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
    e.getCause().printStackTrace();
    e.getChannel().close();
}
}
更新。在服务器端,存在一个定期线程,该线程写入已接受的客户端通道。而且该频道很快就无法写入。更新N2。在管道中添加了OrderedMemoryAwareExecutor,但是吞吐量仍然很低(大约4k msg / sec)
固定。我将执行程序放在整个管道堆栈的前面,结果成功了!
如果服务器正在发送固定大小(〜100字节)的消息,则可以将ReceiveBufferSizePredictor设置为客户端引导程序,这将优化读取
bootstrap.setOption("receiveBufferSizePredictorFactory",
            new AdaptiveReceiveBufferSizePredictorFactory(MIN_PACKET_SIZE, INITIAL_PACKET_SIZE, MAX_PACKET_SIZE));
根据您发布的代码段:客户端的nio工作线程正在做管道中的所有事情,因此它将忙于解码和执行消息处理程序。您必须添加一个执行处理程序。
您已经说过,通道从服务器端变得不可写,因此您可能必须在服务器引导程序中调整水印大小。您可以定期监视写缓冲区大小(写队列大小),并确保由于消息无法写到网络而使通道变得不可写。可以通过以下类似的util类来完成。
package org.jboss.netty.channel.socket.nio;
import org.jboss.netty.channel.Channel;
public final class NioChannelUtil {
  public static long getWriteTaskQueueCount(Channel channel) {
    NioSocketChannel nioChannel = (NioSocketChannel) channel;
    return nioChannel.writeBufferSize.get();
  }
}
https://www.cnblogs.com/xuxinstyle/p/9872915.html
https://blog.csdn.net/qq_38772518/article/details/105834573
http://www.mianshigee.com/
Be the first one to comment on this page!