network: new stream! protocol and pull syncer implementation#1538
network: new stream! protocol and pull syncer implementation#1538
Conversation
63263ee to
d6f6042
Compare
83fca80 to
a0f0e35
Compare
a9965ac to
23f841e
Compare
network/newstream/wire.go
Outdated
| To uint64 `rlp:nil` | ||
| To *uint64 `rlp:"nil"` | ||
| BatchSize uint | ||
| Roundtrip bool |
There was a problem hiding this comment.
In general do not add code that is not used for no good reason. A good reason is if you will have an immediate PR that will make use of it, for example the PR with bzz-retrieval, even though the code was not immediately used is a good example.
| // Function used only in tests to detect chunks that are synced | ||
| // multiple times within the same stream. This function pointer must be | ||
| // nil in production. | ||
| var putSeenTestHook func(addr chunk.Address, id enode.ID) |
There was a problem hiding this comment.
I think this is a good use-case of something we could test if we redirect logs with a simulation. Then we could check for presence of the given log message containing addr and id. For now we don't have this functionality, so fine to leave this here, but ultimately we will be able to test this kind of impl. behaviour without custom hooks.
Just an idea.
|
|
||
| func (s *syncProvider) StreamName() string { return s.name } | ||
|
|
||
| func (s *syncProvider) Boundedness() bool { return false } |
There was a problem hiding this comment.
I fail to see why we need Boundedness and Bound. Why do we add such indirection without using it. If this is so important how come we don't have an implementation of it.
From the spec:
when a client requests a bounded range, server should respond to the client range requests with either offered hashes (if roundtrip is required) or chunks (if not) or an end-of-batch message if there are no more to offer. If none of these responses arrive within a timeout interval, client must drop the upstream peer.
We don't have an example of Boundedness or of Roundtrip, so I fail to see why this is useful, apart from introducing indirection in already not simple functionality. We've been working on pull-sync new version for months, the previous version has been buggy in production for probably more than a year (and still throwing subscription errors), but we still continue to add code prematurely so that we make our lives harder...
swarm.go
Outdated
| if !config.SyncEnabled || config.LightNodeEnabled { | ||
| syncing = stream.SyncingDisabled | ||
| } | ||
| //if !config.SyncEnabled || config.LightNodeEnabled { |
There was a problem hiding this comment.
You are already unhooking the old stream protocol, so please remove this code, not just commented out, and also remove all the code in network/stream that is no longer used. No point in keeping it around, specially when it is not hooked up anymore here in swarm.go, and we are directly switching to the new implementation.
| return nil | ||
| } | ||
|
|
||
| func TestCorrectCursorsExchangeRace(t *testing.T) { |
There was a problem hiding this comment.
The mental taxation in understanding what this is doing is a bit too high for my taste. I'm happy we have started to push away complexity from production Swarm code, but we should probably rethink the complexity in tests or at least be mindful about that.
It is not immediately obvious what this test is testing. Tests should generally be simple and I shouldn't have to read the code and think for 15min to figure that out. What is this testing?
- We are using a simulation framework.
- We are mocking a devp2p protocol and injecting some custom hook
- We are building peer fixtures using the p2p library.
We should probably be building more tools to write tests in a more simple manner.
No action required (or at least I don't know what I would expect), but this is def. not the direction we should be going in general IMO.
|
@acud @janos overall great work. I don't see any blockers from my side. I've added a few comments, and talked to @acud off-band about some things that need to be fleshed out, but nothing major.
These are my general comments. The most important thing is that the code seems to be correct and integration tests seem to be passing, so I see this generally as an improvement to |
|
We created a super spec for stream! and didnt implement it. instead we implement a temporary mix of sync and stream which was identified as a problem with the current implementation. This PR saw the tension between two different approaches one that condones any abstraction in the spirit of 'just write what the thing should do now nothing more' type of simplicity on the one hand and I kindly ask you to consider the following simple sketchy architecture. and think how much it would simplify testing, help understanding and incrementally PR-ing this thing. Thanks package stream
// Range represents a closed interval
type Range struct {
From, To uint64 // closed interval
}
// individual streams - no need for actual interface
type Stream interface {
String() string
Bound() bool
}
// Provider manages the source of a type of stream
// this is completely stateless
type Provider interface {
Type() string // Name of provider, eg. SYNC
CanHandle(Name) (lastIndex uint64, bound bool, err error) // provider can serve the stream given as argument, return cursor, boundedness and an error if not provided
OfferedHashes(Name, Range) []Address // returns the offered hashes for the given range
GetDataByHash(Name, ...Address) []Chunk // retrieves the chunks based on Address
}
// Streamer implements Registry interface to be used in provider packages
//
type Registry interface {
Register(Provider) error
}
// LocalStoreProvider wraps localstore and implements GetDataByHash call with localstore get
type LocalStoreProvider struct {
localStore
}
func (l *LocalProvider) GetDataByHash(ctx context.Context, hash []byte) (chunk.Chunk, error) {
return l.localStore.Get(ctx, chunk.ModeGetSync, hash)
}
// Client Logic
// Request Object - no need for interface, just to show methods
type Request interface {
Cancel()
Done() (close <-chan struct{}) // like in context
Err() // error to extract after Done() closed
}
// Peer - this is not an interface just demonstrating the relevant methods
type Peer interface {
GetRange(ctx context.Context, n Name, r Range, need func([]byte) bool, deliver func(Chunk) error) (Request, error)
Providers() []Info
}
// NetStoreClient is a stream client that
// - proxies need function as localstore Has
// - puts chunk in netstore
// These clients are stateless
type StorerClient struct {
Streamer
intervals IntervalStore
store NetStore
need func([]byte) bool // implements it with netStore.Has and opens a fetcher
deliver func(Chunk) error // implements it with netStore.Put chunk.ModePutSync
}
// Request will try to cover the indicated range using interval store
// and peer requests for each gap in the intervals
func (s *StorerClient) Request(p Peer, n Name, r Range) (Request, error) {
// starts a go routine that keeps getting the intervals based on the interval store
// continuously requesting subranges according to the interval store
// and calls
p.GetRange(ctx, n, r, s.need, s.deliver)
// as batches are sealed, intervals are updated
// the request is alive until the whole range is covered
}
// and now the level doing full sync of a stream
type Mode = uin64
const (
History Mode = iota
Live
Both
)
// StorerClient implements Syncer, here stream.Syncer means syncing a stream
// only interface used by the pull syncer client
type Syncer interface {
Request(Context, Peer, Name, Mode) (Request, error)
// this falls back to until cursor (history) and from cursor (live) range requests from the StorerClient
//
}
////////////////////////////
// package pullsync
// pullSync.Server implements stream.Provider
// uses localstore SubscribePull for the bins
// server is node-wide
type Server struct {
// ...
*stream.LocalProvider
}
// the node-wide pullsync.Client
type Client struct {
stream.Syncer // embed stream.Syncer
// when pullsync
// here you simply put the update sync logic listening to kademlia depth changes
// and call `Request`
// remember the request, when no longer relevant just call request.Cancel()
} |
zelig
left a comment
There was a problem hiding this comment.
ok for moral's sake lets just merge this PR after some critical changes. The rest should be addressed in subsequent PRs.
- there is a lot of changes in packages totally unrelated to this PR, (docker, pyramid chunker, etc) quite a few unnecessary
- filestore, hasherstore, localstore and intervals store changes are important and unexplained
- the package should be in
network/stream/v2andnewstreamshould be dropped - error handling and peer drops could be massively simplified
- select on routine termination conditions are scattered around in many unnecessary places
- the technical debt this PR introduces is way too high. Lets avoid this in future
| return strings.Join(hostChunks, "") | ||
| } | ||
|
|
||
| func (i *Inspector) PeerStreams() (string, error) { |
api/inspector_test.go
Outdated
| t.Fatal(err) | ||
| } | ||
|
|
||
| // if want := hex.EncodeToString(baseKey)[:16]; peerInfo.Base != want { |
cmd/swarm/main.go
Outdated
| } | ||
|
|
||
| func main() { | ||
| runtime.SetMutexProfileFraction(1) |
network/newstream/peer.go
Outdated
| done chan error | ||
| ruid uint // the request uid | ||
| from uint64 // want from index | ||
| to *uint64 //want to index, nil signifies top of range not yet known |
| func NewSlipStream(intervalsStore state.Store, kad *network.Kademlia, providers ...StreamProvider) *SlipStream { | ||
| slipStream := &SlipStream{ | ||
| // New creates a new stream protocol handler | ||
| func New(intervalsStore state.Store, baseKey []byte, providers ...StreamProvider) *Registry { |
There was a problem hiding this comment.
I would prefer an explicit register method, but can wait till next iteration
network/newstream/stream.go
Outdated
| return | ||
| } | ||
|
|
||
| w.to = &msg.LastIndex // now that we know the range of the batch we can set the upped bound of the interval to the open want |
There was a problem hiding this comment.
upped -> upper
and better say 'we can set the open wants upper bound to the index supplied in the msg.
Maybe some checks would be needed about this number
| // wait for all handlers to finish | ||
| done := make(chan struct{}) | ||
| go func() { | ||
| r.handlersWg.Wait() |
| } | ||
| } | ||
| go func(chunks ...chunk.Chunk) { | ||
| s.cacheMtx.Lock() |
There was a problem hiding this comment.
better have cache in a separate object
| @@ -0,0 +1 @@ | |||
| {"nodes":[{"node":{"info":{"id":"0b405ded87ce4e712aaebb3055e70716c971fe049a54961294c7a7a0471d12f6","name":"node_0b405ded87ce4e712aaebb3055e70716c971fe049a54961294c7a7a0471d12f6","enode":"enode://918f13e29fcea21b47e553c60ea0ae63f856db5745c6010e31b566f819195f9a5f250788862c43b230dd93fbc24cb6b3d33c86069cc21de3a3612b2f32eeccbe@127.0.0.1:0","enr":"0xf88fb840ad1a67c3b07299bb50337351be97595ef05e5ab7988ab7831e06a3003d1fc4ad74e925ec495ab312ccc889eb9c05f2e14f625e56f813a76dc53ddd94ea0955f90183636170cdc583627a7a08c6846869766508826964827634826970847f00000189736563703235366b31a102918f13e29fcea21b47e553c60ea0ae63f856db5745c6010e31b566f819195f9a","ip":"127.0.0.1","ports":{"discovery":0,"listener":0},"listenAddr":"","protocols":{"bzz":"C0Bd7YfOTnEqrrswVecHFslx/gSaVJYSlMenoEcdEvY=","hive":"\n=========================================================================\nThu Feb 28 17:59:23 UTC 2019 KΛÐΞMLIΛ hive: queen's address: 0b405d\npopulation: 3 (3), NeighbourhoodSize: 2, MinBinSize: 2, MaxBinSize: 4\n000 1 a430 | 1 a430 (0)\n============ DEPTH: 1 ==========================================\n001 2 77ba 675c | 2 77ba (0) 675c (0)\n002 0 | 0\n003 0 | 0\n004 0 | 0\n005 0 | 0\n006 0 | 0\n007 0 | 0\n008 0 | 0\n009 0 | 0\n010 0 | 0\n011 0 | 0\n012 0 | 0\n013 0 | 0\n014 0 | 0\n015 0 | 0\n========================================================================="}},"config":{"id":"0b405ded87ce4e712aaebb3055e70716c971fe049a54961294c7a7a0471d12f6","private_key":"173c23bc3aec26afe7d299eb9556e7c189851378fe56125d929966ac7207bcb6","name":"node_0b405ded87ce4e712aaebb3055e70716c971fe049a54961294c7a7a0471d12f6","services":["streamer"],"enable_msg_events":true,"port":38277},"up":true}},{"node":{"info":{"id":"a4301045c0cde5acb5406e1fba46ec868c056439ab43ef795830748fb6ba2e5a","name":"node_a4301045c0cde5acb5406e1fba46ec868c056439ab43ef795830748fb6ba2e5a","enode":"enode://89b66f574168570cc5d347051c0e2b05b33578d42142ef8cc930f1fbb384266f2827e4b087f5e2985a32d8f2b353fb7be416193632da5c692cace99d1b9e4274@127.0.0.1:0","enr":"0xf88fb8402e6392d3d5377977fc4b4d29f4dbf4ca96e963df056ead84ec2fcb7ee071996670a935d3b49250455e02c392baacb4c8499f815f74b09c405316a5d2fdee94e60183636170cdc583627a7a08c6846869766508826964827634826970847f00000189736563703235366b31a10289b66f574168570cc5d347051c0e2b05b33578d42142ef8cc930f1fbb384266f","ip":"127.0.0.1","ports":{"discovery":0,"listener":0},"listenAddr":"","protocols":{"bzz":"pDAQRcDN5ay1QG4fukbshowFZDmrQ+95WDB0j7a6Llo=","hive":"\n=========================================================================\nThu Feb 28 17:59:23 UTC 2019 KΛÐΞMLIΛ hive: queen's address: a43010\npopulation: 3 (3), NeighbourhoodSize: 2, MinBinSize: 2, MaxBinSize: 4\n============ DEPTH: 0 ==========================================\n000 3 675c 77ba 0b40 | 3 675c (0) 77ba (0) 0b40 (0)\n001 0 | 0\n002 0 | 0\n003 0 | 0\n004 0 | 0\n005 0 | 0\n006 0 | 0\n007 0 | 0\n008 0 | 0\n009 0 | 0\n010 0 | 0\n011 0 | 0\n012 0 | 0\n013 0 | 0\n014 0 | 0\n015 0 | 0\n========================================================================="}},"config":{"id":"a4301045c0cde5acb5406e1fba46ec868c056439ab43ef795830748fb6ba2e5a","private_key":"5bcd3bbb886e5ad2317a6ac1de2b19ebe27db9337ba8e7daa76a1cdaa6a3d158","name":"node_a4301045c0cde5acb5406e1fba46ec868c056439ab43ef795830748fb6ba2e5a","services":["streamer"],"enable_msg_events":true,"port":41325},"up":true}},{"node":{"info":{"id":"77ba145a4623b9c83e0a14a375ba49c68719f2abe162ffca66de7ef2e31a75f8","name":"node_77ba145a4623b9c83e0a14a375ba49c68719f2abe162ffca66de7ef2e31a75f8","enode":"enode://ecb56004a128067628a3198244080ba4ac79860cb2cf74407d7571955a80fb9243ad5f8265063a5aa209ea95329b4b57d0f1b492a85835ab87fce79881a895a9@127.0.0.1:0","enr":"0xf88fb84035ed878b2a99ed1bbd55df1701cd82115aebc649b1e8e68302e7caf5480a61322c74189d1fe66f2d16e79174f0e3479213f0bf963a4518d583078e164bd94a220183636170cdc583627a7a08c6846869766508826964827634826970847f00000189736563703235366b31a103ecb56004a128067628a3198244080ba4ac79860cb2cf74407d7571955a80fb92","ip":"127.0.0.1","ports":{"discovery":0,"listener":0},"listenAddr":"","protocols":{"bzz":"d7oUWkYjucg+ChSjdbpJxocZ8qvhYv/KZt5+8uMadfg=","hive":"\n=========================================================================\nThu Feb 28 17:59:23 UTC 2019 KΛÐΞMLIΛ hive: queen's address: 77ba14\npopulation: 3 (3), NeighbourhoodSize: 2, MinBinSize: 2, MaxBinSize: 4\n000 1 a430 | 1 a430 (0)\n============ DEPTH: 1 ==========================================\n001 1 0b40 | 1 0b40 (0)\n002 0 | 0\n003 1 675c | 1 675c (0)\n004 0 | 0\n005 0 | 0\n006 0 | 0\n007 0 | 0\n008 0 | 0\n009 0 | 0\n010 0 | 0\n011 0 | 0\n012 0 | 0\n013 0 | 0\n014 0 | 0\n015 0 | 0\n========================================================================="}},"config":{"id":"77ba145a4623b9c83e0a14a375ba49c68719f2abe162ffca66de7ef2e31a75f8","private_key":"6f30c3d0135202cbf540669c8c76d9144bd53764aaf4ae18e8bee98ce05c6211","name":"node_77ba145a4623b9c83e0a14a375ba49c68719f2abe162ffca66de7ef2e31a75f8","services":["streamer"],"enable_msg_events":true,"port":40785},"up":true}},{"node":{"info":{"id":"675cfed0a7b461f43cfebd6a10ecc8868d35b245381d731abec4608b6dce250a","name":"node_675cfed0a7b461f43cfebd6a10ecc8868d35b245381d731abec4608b6dce250a","enode":"enode://64a6b33b028b818a1961272651d1d7a976329e541b6e3e983f518d89326d7427f4c9243f78bb03fd2e2dc059f610ee50d1e664e3107db01ac603b9c3b20ebb4f@127.0.0.1:0","enr":"0xf88fb8407dbbb99300e04c6917c27d61bf396481aae19287c2869e8dd74e11b0f2ebb74174376a6152c26af95173a4868a0349aaf3be5d063918590fd5962f4af174e0010183636170cdc583627a7a08c6846869766508826964827634826970847f00000189736563703235366b31a10364a6b33b028b818a1961272651d1d7a976329e541b6e3e983f518d89326d7427","ip":"127.0.0.1","ports":{"discovery":0,"listener":0},"listenAddr":"","protocols":{"bzz":"Z1z+0Ke0YfQ8/r1qEOzIho01skU4HXMavsRgi23OJQo=","hive":"\n=========================================================================\nThu Feb 28 17:59:23 UTC 2019 KΛÐΞMLIΛ hive: queen's address: 675cfe\npopulation: 3 (3), NeighbourhoodSize: 2, MinBinSize: 2, MaxBinSize: 4\n000 1 a430 | 1 a430 (0)\n============ DEPTH: 1 ==========================================\n001 1 0b40 | 1 0b40 (0)\n002 0 | 0\n003 1 77ba | 1 77ba (0)\n004 0 | 0\n005 0 | 0\n006 0 | 0\n007 0 | 0\n008 0 | 0\n009 0 | 0\n010 0 | 0\n011 0 | 0\n012 0 | 0\n013 0 | 0\n014 0 | 0\n015 0 | 0\n========================================================================="}},"config":{"id":"675cfed0a7b461f43cfebd6a10ecc8868d35b245381d731abec4608b6dce250a","private_key":"d77006035afa5194053c0347021f7825a71f9643028c45b65756ef88ee53e6dd","name":"node_675cfed0a7b461f43cfebd6a10ecc8868d35b245381d731abec4608b6dce250a","services":["streamer"],"enable_msg_events":true,"port":37069},"up":true}}],"conns":[{"one":"0b405ded87ce4e712aaebb3055e70716c971fe049a54961294c7a7a0471d12f6","other":"a4301045c0cde5acb5406e1fba46ec868c056439ab43ef795830748fb6ba2e5a","up":true},{"one":"a4301045c0cde5acb5406e1fba46ec868c056439ab43ef795830748fb6ba2e5a","other":"77ba145a4623b9c83e0a14a375ba49c68719f2abe162ffca66de7ef2e31a75f8","up":true},{"one":"77ba145a4623b9c83e0a14a375ba49c68719f2abe162ffca66de7ef2e31a75f8","other":"675cfed0a7b461f43cfebd6a10ecc8868d35b245381d731abec4608b6dce250a","up":true},{"one":"675cfed0a7b461f43cfebd6a10ecc8868d35b245381d731abec4608b6dce250a","other":"0b405ded87ce4e712aaebb3055e70716c971fe049a54961294c7a7a0471d12f6","up":true},{"one":"77ba145a4623b9c83e0a14a375ba49c68719f2abe162ffca66de7ef2e31a75f8","other":"0b405ded87ce4e712aaebb3055e70716c971fe049a54961294c7a7a0471d12f6","up":true},{"one":"a4301045c0cde5acb5406e1fba46ec868c056439ab43ef795830748fb6ba2e5a","other":"675cfed0a7b461f43cfebd6a10ecc8868d35b245381d731abec4608b6dce250a","up":true}]} No newline at end of file | |||
There was a problem hiding this comment.
please just use the snapshots innetwork/stream/testing
There was a problem hiding this comment.
The idea is to remove the whole network/stream package. Why create references between the new and old packages, when we should have only one in the codebase?
There was a problem hiding this comment.
@nonsense, no the idea is to reimplement streamer
- we want to use the same snapshots as before
- we want pr to be smaller
- bitvector and intervalstore are also dependencies from old stream
- the new streamer should be in
stream/v2
There was a problem hiding this comment.
OK, I was unaware that we will be maintaining both streamers. cc @janos
storage/netstore.go
Outdated
| } | ||
|
|
||
| log.Trace("netstore.chunk-not-in-localstore", "ref", ref.String()) | ||
| //n.logger.Trace("netstore.chunk-not-in-localstore", "ref", ref.String()) |
Dockerfile.alltools
Outdated
| RUN echo "http://dl-cdn.alpinelinux.org/alpine/edge/testing" >> /etc/apk/repositories | ||
| RUN apk --update add perf | ||
| RUN wget https://github.com/stedolan/jq/releases/download/jq-1.6/jq-linux64 | ||
| RUN chmod +x /jq-linux64 |
There was a problem hiding this comment.
I am not sure why we need jq and websocat within the image. What is the use-case here? I use those tools from my local machine, why do we need them in the swarm binary?
cmd/swarm-smoke/upload_and_sync.go
Outdated
| } | ||
|
|
||
| randomBytes := testutil.RandomBytes(seed, filesize*1000) | ||
| randomBytes := testutil.RandomBytes(seed, filesize) |
There was a problem hiding this comment.
I think we should revert this. I didn't understand why we wanted to have it smaller in the first place to be honest.
| if !ok { | ||
| p.logger.Error("ruid not found, dropping peer", "ruid", ruid) | ||
| p.Drop() | ||
| return o, true |
There was a problem hiding this comment.
return nil, true, let's be explicit that o is no found.
| func BenchmarkHistoricalStream_15000(b *testing.B) { benchmarkHistoricalStream(b, 15000) } | ||
| func BenchmarkHistoricalStream_20000(b *testing.B) { benchmarkHistoricalStream(b, 20000) } | ||
|
|
||
| func benchmarkHistoricalStream(b *testing.B, chunks uint64) { |
There was a problem hiding this comment.
@zelig @nonsense Could you check this benchmark also? I think that the measuring method is not correct. The simulation is constructed outside of the benchmark loop and in the benchmark loop chunks are uploaded on the same node of the same simulation, making the state on every benchmark iteration different. Every iteration measures different state and with larger number of chunks in the localstore, it becomes slower. This makes this benchmark result dependent on the number of iterations performed, where only the precisions of results should be influenced by the number of iterations. I think that this benchmark should be rewritten in a way that every iteration measures time on the independent state of other iterations, or to remove the benchmark as that may make this benchmark time consuming.
There was a problem hiding this comment.
@janos it does look like the benchmark is indeed not correct for the reasons you mentioned.
There was a problem hiding this comment.
I have changed the benchmark to address this problem, and it is even usable.
zelig
left a comment
There was a problem hiding this comment.
thanks @janos for the quick cleanup and reorg, I dont like that you put in the deletion of old stream code in this PR.
i approve it but lets make sure:
- we heed the issues raised in later PRs
- we understand the changes to localstore, intervalstore, simulation, etc packages
|
@zelig Thank you for the review and approval. I will create issues on the comments that are there after this PR gets merged. I do not mind putting back the old streamer code. Would you like to restore the files? |
| log.Info(fmt.Sprintf("Running test with %d chunks and %d nodes...", *chunks, *nodes)) | ||
| testSyncingViaGlobalSync(t, *chunks, *nodes) | ||
| } else { | ||
| chunkCounts := []int{4, 32} |
There was a problem hiding this comment.
We can't sync 32 chunks ? :)
There was a problem hiding this comment.
It times out on travis. :) And it actually does not even flake locally. The only thing that I managed to see is the goroutine leak in hive that is fixed. If it is flaky even with only 4 chunks on travis, we should disable this test and dedicate more time to find the root cause.
This PR is the result of joint efforts in the core track to realise the exact, up to date requirements of the
stream!protocol and implement them so that their functionality and behavior is easier to reason about.The design borrows some code from the old
streampackage. Namely the way we manage bins we want to stream in the case of syncing and the existing intervals store. However everything else including tests have been built from the ground up.The PR does not yet remove the existing
streamcode, does not move theintervalspackage, but rather builds on top of those. This is in order to decrease the strain of diffing those changes. These would be deferred to subsequent PRs after this one is merged.The new stream implementation suggested in this PR brings more guarantees, better performance and safety to how we handle
streams, namely:streamsis far less likely due to tight checking of wanted streams from a certain peerChunkDeliverymessageIn terms of reduced complexity:
offers, clients maintainwantsPLEASE NOTE: that tests constitute about a half of this PR. Some of them overlap in functionality and will be removed, namely some of the tests in
cursors_test.go.Actionable items that still need to be addressed, however I would personally like to defer to a later PR:
streampackage. In laymen terminology - we would need to call certain functions onstreamwhen a depth change happens for example, but then also to have hooks onstreamto know that aStreamInfoResmessage has arrived and what to do accordingly....maxPeers- oldstreampackage had a notion of maximum peers we'd like to stream to/with. we have concluded partially thatkademliashould somehow shield us from having to implement this at the moment, since number of connections are/should be enforced bykademliaat the moment. so this needs more info before we jump the gun to implement such a requirementHOW TO REVIEW:
Since there is a fair amount of business logic here, a brief overview on how the protocol works is needed in order to facilitate an effective review process.
Flow is as follows:
swarm.go. Stream handles the protocol message exchange,syncProviderhandles sinking the data and managing streams according to kademliastream.gofileInitPeerfunction onsync_provider.go)StreamInfoReq. Server replies with info about streams (boundedness, cursors (previously known assessionIndex) inStreamInfoResmessageStreamInfoRes, and according to wantedness of streams, initiatesGetRangequeries to fetch the stream from the server.OfferedHashesmessage for the requested rangeOfferedHasheswith aWantedHashesmessageChunkDeliverymessage. This message can contain multiple chunks as needed. Also, theoretically ,multipleChunkDeliverymessages could be sent in reply for the sameWantedHashesin order to optimise amounts of messages sent and according to db performance to sink the datarelated to ethersphere/user-stories#2
fixes #1451
somewhat fixes #1393
closes #1457
closes #1247
closes #1246
closes #1234
closes #1204
closes #1203
closes #1105