Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
G
go
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
Analytics
Analytics
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Commits
Issue Boards
Open sidebar
Kirill Smelkov
go
Commits
3de98466
Commit
3de98466
authored
Feb 11, 2011
by
Petar Maymounkov
Committed by
Russ Cox
Feb 11, 2011
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
http: add pipelining to ClientConn, ServerConn
R=rsc, bradfitzwork CC=golang-dev
https://golang.org/cl/4082044
parent
9c97af99
Changes
2
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
99 additions
and
39 deletions
+99
-39
src/pkg/http/persist.go
src/pkg/http/persist.go
+98
-38
src/pkg/http/serve_test.go
src/pkg/http/serve_test.go
+1
-1
No files found.
src/pkg/http/persist.go
View file @
3de98466
...
...
@@ -6,14 +6,17 @@ package http
import
(
"bufio"
"container/list"
"io"
"net"
"net/textproto"
"os"
"sync"
)
var
ErrPersistEOF
=
&
ProtocolError
{
"persistent connection closed"
}
var
(
ErrPersistEOF
=
&
ProtocolError
{
"persistent connection closed"
}
ErrPipeline
=
&
ProtocolError
{
"pipeline error"
}
)
// A ServerConn reads requests and sends responses over an underlying
// connection, until the HTTP keepalive logic commands an end. ServerConn
...
...
@@ -26,8 +29,10 @@ type ServerConn struct {
r
*
bufio
.
Reader
clsd
bool
// indicates a graceful close
re
,
we
os
.
Error
// read/write errors
last
B
ody
io
.
ReadCloser
last
b
ody
io
.
ReadCloser
nread
,
nwritten
int
pipe
textproto
.
Pipeline
pipereq
map
[
*
Request
]
uint
lk
sync
.
Mutex
// protected read/write to re,we
}
...
...
@@ -37,7 +42,7 @@ func NewServerConn(c net.Conn, r *bufio.Reader) *ServerConn {
if
r
==
nil
{
r
=
bufio
.
NewReader
(
c
)
}
return
&
ServerConn
{
c
:
c
,
r
:
r
}
return
&
ServerConn
{
c
:
c
,
r
:
r
,
pipereq
:
make
(
map
[
*
Request
]
uint
)
}
}
// Close detaches the ServerConn and returns the underlying connection as well
...
...
@@ -57,10 +62,25 @@ func (sc *ServerConn) Close() (c net.Conn, r *bufio.Reader) {
// Read returns the next request on the wire. An ErrPersistEOF is returned if
// it is gracefully determined that there are no more requests (e.g. after the
// first request on an HTTP/1.0 connection, or after a Connection:close on a
// HTTP/1.1 connection). Read can be called concurrently with Write, but not
// with another Read.
// HTTP/1.1 connection).
func
(
sc
*
ServerConn
)
Read
()
(
req
*
Request
,
err
os
.
Error
)
{
// Ensure ordered execution of Reads and Writes
id
:=
sc
.
pipe
.
Next
()
sc
.
pipe
.
StartRequest
(
id
)
defer
func
()
{
sc
.
pipe
.
EndRequest
(
id
)
if
req
==
nil
{
sc
.
pipe
.
StartResponse
(
id
)
sc
.
pipe
.
EndResponse
(
id
)
}
else
{
// Remember the pipeline id of this request
sc
.
lk
.
Lock
()
sc
.
pipereq
[
req
]
=
id
sc
.
lk
.
Unlock
()
}
}()
sc
.
lk
.
Lock
()
if
sc
.
we
!=
nil
{
// no point receiving if write-side broken or closed
defer
sc
.
lk
.
Unlock
()
...
...
@@ -73,12 +93,12 @@ func (sc *ServerConn) Read() (req *Request, err os.Error) {
sc
.
lk
.
Unlock
()
// Make sure body is fully consumed, even if user does not call body.Close
if
sc
.
last
B
ody
!=
nil
{
if
sc
.
last
b
ody
!=
nil
{
// body.Close is assumed to be idempotent and multiple calls to
// it should return the error that its first invokation
// returned.
err
=
sc
.
last
B
ody
.
Close
()
sc
.
last
B
ody
=
nil
err
=
sc
.
last
b
ody
.
Close
()
sc
.
last
b
ody
=
nil
if
err
!=
nil
{
sc
.
lk
.
Lock
()
defer
sc
.
lk
.
Unlock
()
...
...
@@ -102,7 +122,7 @@ func (sc *ServerConn) Read() (req *Request, err os.Error) {
return
}
}
sc
.
last
B
ody
=
req
.
Body
sc
.
last
b
ody
=
req
.
Body
sc
.
nread
++
if
req
.
Close
{
sc
.
lk
.
Lock
()
...
...
@@ -121,11 +141,24 @@ func (sc *ServerConn) Pending() int {
return
sc
.
nread
-
sc
.
nwritten
}
// Write writes
a repsonse
. To close the connection gracefully, set the
// Write writes
resp in response to req
. To close the connection gracefully, set the
// Response.Close field to true. Write should be considered operational until
// it returns an error, regardless of any errors returned on the Read side.
// Write can be called concurrently with Read, but not with another Write.
func
(
sc
*
ServerConn
)
Write
(
resp
*
Response
)
os
.
Error
{
func
(
sc
*
ServerConn
)
Write
(
req
*
Request
,
resp
*
Response
)
os
.
Error
{
// Retrieve the pipeline ID of this request/response pair
sc
.
lk
.
Lock
()
id
,
ok
:=
sc
.
pipereq
[
req
]
sc
.
pipereq
[
req
]
=
0
,
false
if
!
ok
{
sc
.
lk
.
Unlock
()
return
ErrPipeline
}
sc
.
lk
.
Unlock
()
// Ensure pipeline order
sc
.
pipe
.
StartResponse
(
id
)
defer
sc
.
pipe
.
EndResponse
(
id
)
sc
.
lk
.
Lock
()
if
sc
.
we
!=
nil
{
...
...
@@ -166,10 +199,11 @@ type ClientConn struct {
c
net
.
Conn
r
*
bufio
.
Reader
re
,
we
os
.
Error
// read/write errors
last
B
ody
io
.
ReadCloser
last
b
ody
io
.
ReadCloser
nread
,
nwritten
int
reqm
list
.
List
// request methods in order of execution
lk
sync
.
Mutex
// protects read/write to reqm,re,we
pipe
textproto
.
Pipeline
pipereq
map
[
*
Request
]
uint
lk
sync
.
Mutex
// protects read/write to re,we,pipereq,etc.
}
// NewClientConn returns a new ClientConn reading and writing c. If r is not
...
...
@@ -178,7 +212,7 @@ func NewClientConn(c net.Conn, r *bufio.Reader) *ClientConn {
if
r
==
nil
{
r
=
bufio
.
NewReader
(
c
)
}
return
&
ClientConn
{
c
:
c
,
r
:
r
}
return
&
ClientConn
{
c
:
c
,
r
:
r
,
pipereq
:
make
(
map
[
*
Request
]
uint
)
}
}
// Close detaches the ClientConn and returns the underlying connection as well
...
...
@@ -191,7 +225,6 @@ func (cc *ClientConn) Close() (c net.Conn, r *bufio.Reader) {
r
=
cc
.
r
cc
.
c
=
nil
cc
.
r
=
nil
cc
.
reqm
.
Init
()
cc
.
lk
.
Unlock
()
return
}
...
...
@@ -201,8 +234,23 @@ func (cc *ClientConn) Close() (c net.Conn, r *bufio.Reader) {
// keepalive connection is logically closed after this request and the opposing
// server is informed. An ErrUnexpectedEOF indicates the remote closed the
// underlying TCP connection, which is usually considered as graceful close.
// Write can be called concurrently with Read, but not with another Write.
func
(
cc
*
ClientConn
)
Write
(
req
*
Request
)
os
.
Error
{
func
(
cc
*
ClientConn
)
Write
(
req
*
Request
)
(
err
os
.
Error
)
{
// Ensure ordered execution of Writes
id
:=
cc
.
pipe
.
Next
()
cc
.
pipe
.
StartRequest
(
id
)
defer
func
()
{
cc
.
pipe
.
EndRequest
(
id
)
if
err
!=
nil
{
cc
.
pipe
.
StartResponse
(
id
)
cc
.
pipe
.
EndResponse
(
id
)
}
else
{
// Remember the pipeline id of this request
cc
.
lk
.
Lock
()
cc
.
pipereq
[
req
]
=
id
cc
.
lk
.
Unlock
()
}
}()
cc
.
lk
.
Lock
()
if
cc
.
re
!=
nil
{
// no point sending if read-side closed or broken
...
...
@@ -223,7 +271,7 @@ func (cc *ClientConn) Write(req *Request) os.Error {
cc
.
lk
.
Unlock
()
}
err
:
=
req
.
Write
(
cc
.
c
)
err
=
req
.
Write
(
cc
.
c
)
if
err
!=
nil
{
cc
.
lk
.
Lock
()
defer
cc
.
lk
.
Unlock
()
...
...
@@ -231,9 +279,6 @@ func (cc *ClientConn) Write(req *Request) os.Error {
return
err
}
cc
.
nwritten
++
cc
.
lk
.
Lock
()
cc
.
reqm
.
PushBack
(
req
.
Method
)
cc
.
lk
.
Unlock
()
return
nil
}
...
...
@@ -250,7 +295,21 @@ func (cc *ClientConn) Pending() int {
// returned together with an ErrPersistEOF, which means that the remote
// requested that this be the last request serviced. Read can be called
// concurrently with Write, but not with another Read.
func
(
cc
*
ClientConn
)
Read
()
(
resp
*
Response
,
err
os
.
Error
)
{
func
(
cc
*
ClientConn
)
Read
(
req
*
Request
)
(
resp
*
Response
,
err
os
.
Error
)
{
// Retrieve the pipeline ID of this request/response pair
cc
.
lk
.
Lock
()
id
,
ok
:=
cc
.
pipereq
[
req
]
cc
.
pipereq
[
req
]
=
0
,
false
if
!
ok
{
cc
.
lk
.
Unlock
()
return
nil
,
ErrPipeline
}
cc
.
lk
.
Unlock
()
// Ensure pipeline order
cc
.
pipe
.
StartResponse
(
id
)
defer
cc
.
pipe
.
EndResponse
(
id
)
cc
.
lk
.
Lock
()
if
cc
.
re
!=
nil
{
...
...
@@ -259,17 +318,13 @@ func (cc *ClientConn) Read() (resp *Response, err os.Error) {
}
cc
.
lk
.
Unlock
()
if
cc
.
nread
>=
cc
.
nwritten
{
return
nil
,
os
.
NewError
(
"persist client pipe count"
)
}
// Make sure body is fully consumed, even if user does not call body.Close
if
cc
.
last
B
ody
!=
nil
{
if
cc
.
last
b
ody
!=
nil
{
// body.Close is assumed to be idempotent and multiple calls to
// it should return the error that its first invokation
// returned.
err
=
cc
.
last
B
ody
.
Close
()
cc
.
last
B
ody
=
nil
err
=
cc
.
last
b
ody
.
Close
()
cc
.
last
b
ody
=
nil
if
err
!=
nil
{
cc
.
lk
.
Lock
()
defer
cc
.
lk
.
Unlock
()
...
...
@@ -278,18 +333,14 @@ func (cc *ClientConn) Read() (resp *Response, err os.Error) {
}
}
cc
.
lk
.
Lock
()
m
:=
cc
.
reqm
.
Front
()
cc
.
reqm
.
Remove
(
m
)
cc
.
lk
.
Unlock
()
resp
,
err
=
ReadResponse
(
cc
.
r
,
m
.
Value
.
(
string
))
resp
,
err
=
ReadResponse
(
cc
.
r
,
req
.
Method
)
if
err
!=
nil
{
cc
.
lk
.
Lock
()
defer
cc
.
lk
.
Unlock
()
cc
.
re
=
err
return
}
cc
.
last
B
ody
=
resp
.
Body
cc
.
last
b
ody
=
resp
.
Body
cc
.
nread
++
...
...
@@ -301,3 +352,12 @@ func (cc *ClientConn) Read() (resp *Response, err os.Error) {
}
return
}
// Do is convenience method that writes a request and reads a response.
func
(
cc
*
ClientConn
)
Do
(
req
*
Request
)
(
resp
*
Response
,
err
os
.
Error
)
{
err
=
cc
.
Write
(
req
)
if
err
!=
nil
{
return
}
return
cc
.
Read
(
req
)
}
src/pkg/http/serve_test.go
View file @
3de98466
...
...
@@ -192,7 +192,7 @@ func TestHostHandlers(t *testing.T) {
t
.
Errorf
(
"writing request: %v"
,
err
)
continue
}
r
,
err
:=
cc
.
Read
()
r
,
err
:=
cc
.
Read
(
&
req
)
if
err
!=
nil
{
t
.
Errorf
(
"reading response: %v"
,
err
)
continue
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment