WebSocket的支持

okhttp集成了websocket,使用websocket只需要这样写:

  OkHttpClient.Builder builder = new OkHttpClient.Builder();
        OkHttpClient client = builder.build();
        Request.Builder requestBuilder = new Request.Builder();
        requestBuilder.url("ws://192.168.1.101:8888/ws");
        final WebSocket webSocket = client.newWebSocket(requestBuilder.build(),
                                                        new WebSocketListener() {
            @Override
            public void onOpen(WebSocket webSocket, Response response) {
                super.onOpen(webSocket, response);
                webSocket.send("hello websocket");
            }

            @Override
            public void onMessage(WebSocket webSocket, String text) {
                super.onMessage(webSocket, text);
            }

            @Override
            public void onMessage(WebSocket webSocket, ByteString bytes) {
                super.onMessage(webSocket, bytes);
            }

            @Override
            public void onClosing(WebSocket webSocket, int code, String reason) {
                super.onClosing(webSocket, code, reason);
            }

            @Override
            public void onClosed(WebSocket webSocket, int code, String reason) {
                super.onClosed(webSocket, code, reason);
            }

            @Override
            public void onFailure(WebSocket webSocket, Throwable t, Response response) {
                super.onFailure(webSocket, t, response);
            }
        });

发送消息给服务器

webSocket.send("hello websocket");

来看底层实现: HttpClient.newWebSocket()调用后会new 一个RealWebSocket

 RealWebSocket webSocket = new RealWebSocket(request, listener, new Random());
    webSocket.connect(this);

RealWebSocket.java

的构造函数:

public RealWebSocket(Request request, WebSocketListener listener, Random random) {
    if (!"GET".equals(request.method())) {//必需要是get请求
      throw new IllegalArgumentException("Request must be GET: " + request.method());
    }
    this.originalRequest = request;
    this.listener = listener;
    this.random = random;

    byte[] nonce = new byte[16];
    random.nextBytes(nonce);
    this.key = ByteString.of(nonce).base64();//生成一个随机的key后面会成为Sec-WebSocket-Key的Header值

    this.writerRunnable = new Runnable() {//起了一个Runnable,无限循环
      @Override public void run() {
        try {
          while (writeOneFrame()) {
          }
        } catch (IOException e) {
          failWebSocket(e, null);
        }
      }
    };
  }

connect:

1.先是通过源request创建一个新的request,它会加上一些Header信息:Upgrade,Connection,Sec-WebSocket-Key等

2.创建RealCall发第一次请求

3.当第一次请求返回的时候需要走checkResponse

4.如果checkResponse通过,会告诉listener onOpen

5.初始化Reader和Writer initReaderAndWriter

6.把socket的读取时间设成0,表示没有超时时间,也就是长连接

7.启动loopReader

  public void connect(OkHttpClient client) {
      client = client.newBuilder()
          .protocols(ONLY_HTTP1)
          .build();
      final int pingIntervalMillis = client.pingIntervalMillis();
      final Request request = originalRequest.newBuilder()
          .header("Upgrade", "websocket")
          .header("Connection", "Upgrade")
          .header("Sec-WebSocket-Key", key)
          .header("Sec-WebSocket-Version", "13")
          .build();
      call = Internal.instance.newWebSocketCall(client, request);
      call.enqueue(new Callback() {
        @Override public void onResponse(Call call, Response response) {
          try {
            checkResponse(response);
          } catch (ProtocolException e) {
            failWebSocket(e, response);
            closeQuietly(response);
            return;
          }

          // Promote the HTTP streams into web socket streams.
          StreamAllocation streamAllocation = Internal.instance.streamAllocation(call);
          streamAllocation.noNewStreams(); // Prevent connection pooling!
          Streams streams = streamAllocation.connection().newWebSocketStreams(streamAllocation);

          // Process all web socket messages.
          try {
            listener.onOpen(RealWebSocket.this, response);
            String name = "OkHttp WebSocket " + request.url().redact();
            initReaderAndWriter(name, pingIntervalMillis, streams);
            streamAllocation.connection().socket().setSoTimeout(0);
            loopReader();
          } catch (Exception e) {
            failWebSocket(e, null);
          }
        }

        @Override public void onFailure(Call call, IOException e) {
          failWebSocket(e, null);
        }
      });
    }

checkResponse:

1.状态码是101表示握手成功,否则就是握手异常

2.Connection的值必需是Upgrade

3.Upgrade的值必需是websocket

4.Sec-WebSocket-Accept的值必需是 ByteString.encodeUtf8(key + WebSocketProtocol.ACCEPT_MAGIC).sha1().base64()

也就是说服务器收到客户端的key,再在基础上加上ACCEPT_MAGIC然后sha1+base64得到的字符


void checkResponse(Response response) throws ProtocolException {
    if (response.code() != 101) {
      throw new ProtocolException("Expected HTTP 101 response but was '"
          + response.code() + " " + response.message() + "'");
    }

    String headerConnection = response.header("Connection");
    if (!"Upgrade".equalsIgnoreCase(headerConnection)) {
      throw new ProtocolException("Expected 'Connection' header value 'Upgrade' but was '"
          + headerConnection + "'");
    }

    String headerUpgrade = response.header("Upgrade");
    if (!"websocket".equalsIgnoreCase(headerUpgrade)) {
      throw new ProtocolException(
          "Expected 'Upgrade' header value 'websocket' but was '" + headerUpgrade + "'");
    }

    String headerAccept = response.header("Sec-WebSocket-Accept");
    String acceptExpected = ByteString.encodeUtf8(key + WebSocketProtocol.ACCEPT_MAGIC)
        .sha1().base64();
    if (!acceptExpected.equals(headerAccept)) {
      throw new ProtocolException("Expected 'Sec-WebSocket-Accept' header value '"
          + acceptExpected + "' but was '" + headerAccept + "'");
    }
  }

initReaderAndWriter:

  public void initReaderAndWriter(
      String name, long pingIntervalMillis, Streams streams) throws IOException {
    synchronized (this) {
      this.streams = streams;
      this.writer = new WebSocketWriter(streams.client, streams.sink, random);
      this.executor = new ScheduledThreadPoolExecutor(1, Util.threadFactory(name, false));
      if (pingIntervalMillis != 0) {
        executor.scheduleAtFixedRate(
            new PingRunnable(), pingIntervalMillis, pingIntervalMillis, MILLISECONDS);
      }
      if (!messageAndCloseQueue.isEmpty()) {
        runWriter(); // Send messages that were enqueued before we were connected.
      }
    }

    reader = new WebSocketReader(streams.client, streams.source, this);
  }

1.通过scheduleAtFixedRate每过一定的时间发ping

2.如果消息队列(messageAndCloseQueue)不为空就执行一次write

3.初始化WebSocketReader

心跳包之-发ping和接收pong的流程

client发起ping,会收到server的pong,同样server发ping,client会回一个pong ping-pong

pingRunnable会在executor中每隔一定的时间就执行一次(pingInterval可以配置)

private final class PingRunnable implements Runnable {
    PingRunnable() {
    }

    @Override public void run() {
      writePingFrame();
    }
  }

  void writePingFrame() {
    WebSocketWriter writer;
    synchronized (this) {
      if (failed) return;
      writer = this.writer;
    }

    try {
      writer.writePing(ByteString.EMPTY);
    } catch (IOException e) {
      failWebSocket(e, null);
    }
  }

而pong则是服务器的返回:收到pong的返回,只是pongCount++

  @Override public synchronized void onReadPong(ByteString buffer) {
    // This API doesn't expose pings.
    pongCount++;
  }

如果服务器主动发ping会走到onReadPing,此时会把要发的pong加入到Queue,然后启动writer

//作者用了一个Deque来装载所有服务器返回来的pong
private final ArrayDeque<ByteString> pongQueue = new ArrayDeque<>();

public synchronized void onReadPing(ByteString payload) {
    // Don't respond to pings after we've failed or sent the close frame.
    if (failed || (enqueuedClose && messageAndCloseQueue.isEmpty())) return;

    pongQueue.add(payload);
    runWriter();
    pingCount++;
}

runWriter只是执行了一个writerRunnable

private void runWriter() {
    assert (Thread.holdsLock(this));

    if (executor != null) {
      executor.execute(writerRunnable);
    }
  }

这个writerRunnable会循环执行writeOneFrame,如果返回true的话

 this.writerRunnable = new Runnable() {
      @Override public void run() {
        try {
          while (writeOneFrame()) {
          }
        } catch (IOException e) {
          failWebSocket(e, null);
        }
      }
    };
writeOneFrame代码
boolean writeOneFrame() throws IOException {

   //...ignore code
      writer = this.writer;
      pong = pongQueue.poll();
      if (pong == null) {
        messageOrClose = messageAndCloseQueue.poll();
        if (messageOrClose instanceof Close) {
            //...ignore code 处理Close的消息
            this.executor.shutdown();
          } else {
           //...ignore code
        }
      }
    }

    try {
      if (pong != null) {
      //不为空就写pong
        writer.writePong(pong);

      } else if (messageOrClose instanceof Message) {
       //...ignore code
       //处理 message
        sink.write(data);
        sink.close();
        synchronized (this) {
          queueSize -= data.size();
        }

      } else if (messageOrClose instanceof Close) {
       //...ignore code处理close

     //...ignore code
  }

WebSocketReader的loopReader

 public void loopReader() throws IOException {
    while (receivedCloseCode == -1) {
      reader.processNextFrame();
    }
  }

只要receivedCloseCode==-1就一直执行processNextFrame

1.解析reader

2.读控制类的数据(如ping,pong,close等与业务无关的数据)

3.读消息(与业务相关)

  void processNextFrame() throws IOException {
    readHeader();
    if (isControlFrame) {
      readControlFrame();
    } else {
      readMessageFrame();
    }
  }
readHeader
private void readHeader() throws IOException {
    if (closed) throw new IOException("closed");

    // Disable the timeout to read the first byte of a new frame.
    int b0;
    long timeoutBefore = source.timeout().timeoutNanos();
    source.timeout().clearTimeout();
    try {
      b0 = source.readByte() & 0xff;
    } finally {
      source.timeout().timeout(timeoutBefore, TimeUnit.NANOSECONDS);
    }
     opcode = b0 & B0_MASK_OPCODE;//0b00001111
     isFinalFrame = (b0 & B0_FLAG_FIN) != 0;//0b10000000
     isControlFrame = (b0 & OPCODE_FLAG_CONTROL) != 0;//0b00001000

读第一个byte也就是第一个8位的数,后四位是opcode:

数据包类型(frame type),占4bits

0x0:标识一个中间数据包
0x1:标识一个text类型数据包
0x2:标识一个binary类型数据包
0x3-7:保留
0x8:标识一个断开连接类型数据包
0x9:标识一个ping类型数据包
0xA:表示一个pong类型数据包
0xB-F:保留
isFinalFrame是不是最后一Frame,用第一位来表示
isControlFrame是不是操作Frame

来看看readControlFrame代码

1.处理ping
2.处理pong
3.处理关闭

   switch (opcode) {

    static final int OPCODE_CONTROL_CLOSE = 0x8;
     static final int OPCODE_CONTROL_PING = 0x9;
     static final int OPCODE_CONTROL_PONG = 0xa;
   }
        case OPCODE_CONTROL_PING:
          frameCallback.onReadPing(buffer.readByteString());
          break;
        case OPCODE_CONTROL_PONG:
          frameCallback.onReadPong(buffer.readByteString());
          break;
        case OPCODE_CONTROL_CLOSE:
          int code = CLOSE_NO_STATUS_CODE;
          //...ignore code
           frameCallback.onReadClose(code, reason);
readMessageFrame则是读取数据
  private void readMessageFrame() throws IOException {
    int opcode = this.opcode;
    if (opcode != OPCODE_TEXT && opcode != OPCODE_BINARY) {
      throw new ProtocolException("Unknown opcode: " + toHexString(opcode));
    }

    Buffer message = new Buffer();
    readMessage(message);

    if (opcode == OPCODE_TEXT) {
      frameCallback.onReadMessage(message.readUtf8());
    } else {
      frameCallback.onReadMessage(message.readByteString());
    }
  }