Skip to content

Commit

Permalink
started to do ack2 message and processing
Browse files Browse the repository at this point in the history
  • Loading branch information
noha committed Feb 3, 2024
1 parent 2fad09c commit cc9ae6d
Show file tree
Hide file tree
Showing 8 changed files with 207 additions and 22 deletions.
51 changes: 51 additions & 0 deletions source/Gossip-Tests/GossipAck2MessageTest.class.st
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
Class {
#name : #GossipAck2MessageTest,
#superclass : #TestCase,
#category : #'Gossip-Tests'
}

{ #category : #tests }
GossipAck2MessageTest >> testRemoteNeedsAllStates [
| localEndpoint endpoints endpointState ack remoteEndpoint ack2 |
remoteEndpoint := GossipEndpoint example1.
localEndpoint := GossipEndpoint example2.
endpoints := GossipEndpoints new.
ack := GossipAckMessage new
sender: remoteEndpoint;
digests: { remoteEndpoint generation: 4321 version: 0 }.
endpointState := (endpoints endpointStateAt: remoteEndpoint)
generation: 4321;
applicationStateAt: #foo put: #bar;
yourself.
ack2 := GossipAck2Message new
sender: localEndpoint;
initializeFromAckMessage: ack andEndpoints: endpoints .
self assert: ack2 delta size equals: 1.
self assert: (ack2 delta at: remoteEndpoint) applicationStates values first version equals: 2
]

{ #category : #tests }
GossipAck2MessageTest >> testRemoteNeedsDeltaStates [
| localEndpoint endpoints endpointState ack remoteEndpoint ack2 |
remoteEndpoint := GossipEndpoint example1.
localEndpoint := GossipEndpoint example2.
endpoints := GossipEndpoints new.
ack := GossipAckMessage new
sender: remoteEndpoint;
digests: { remoteEndpoint generation: 4321 version: 3 }.
endpointState := (endpoints endpointStateAt: remoteEndpoint)
generation: 4321;
applicationStateAt: #one put: 1;
applicationStateAt: #two put: 2;
applicationStateAt: #three put: 3;
applicationStateAt: #four put: 4;
applicationStateAt: #five put: 5;

yourself.
ack2 := GossipAck2Message new
sender: localEndpoint;
initializeFromAckMessage: ack andEndpoints: endpoints .
self assert: ack2 delta size equals: 1.
self assert: (ack2 delta at: remoteEndpoint) applicationStates size equals: 3.

]
29 changes: 19 additions & 10 deletions source/Gossip-Tests/GossipAckMessageTest.class.st
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,23 @@ Class {
#category : #'Gossip-Tests'
}

{ #category : #tests }
GossipAckMessageTest >> testLocalEndpointNotPresent [
| localEndpoint endpoints digest ack |
localEndpoint := GossipEndpoint example1.
endpoints := GossipEndpoints new.
digest := GossipDigest endpoint: localEndpoint generation: 4321 version: 3.
ack := GossipAckMessage new
sender: localEndpoint;
initializeFromDigests: { digest } andEndpoints: endpoints.
self assert: ack digests size equals: 1.
self assert: ack digests first version equals: 0
]

{ #category : #tests }
GossipAckMessageTest >> testRemoteStateHasHigherGeneration [
| localEndpoint endpoints endpointState digest ack remoteEndpoint |
| localEndpoint endpoints endpointState digest ack |
localEndpoint := GossipEndpoint example1.
remoteEndpoint := GossipEndpoint example2.
endpoints := GossipEndpoints new.
endpointState := (endpoints endpointStateAt: localEndpoint)
generation: 4321;
Expand All @@ -24,9 +36,8 @@ GossipAckMessageTest >> testRemoteStateHasHigherGeneration [

{ #category : #tests }
GossipAckMessageTest >> testRemoteStateHasHigherVersion [
| localEndpoint endpoints endpointState digest ack remoteEndpoint |
| localEndpoint endpoints endpointState digest ack |
localEndpoint := GossipEndpoint example1.
remoteEndpoint := GossipEndpoint example2.
endpoints := GossipEndpoints new.
endpointState := (endpoints endpointStateAt: localEndpoint)
generation: 4321;
Expand All @@ -42,9 +53,8 @@ GossipAckMessageTest >> testRemoteStateHasHigherVersion [

{ #category : #tests }
GossipAckMessageTest >> testRemoteStateHasLowerGeneration [
| localEndpoint endpoints endpointState digest ack remoteEndpoint |
| localEndpoint endpoints endpointState digest ack |
localEndpoint := GossipEndpoint example1.
remoteEndpoint := GossipEndpoint example2.
endpoints := GossipEndpoints new.
endpointState := (endpoints endpointStateAt: localEndpoint)
generation: 4321;
Expand All @@ -56,14 +66,13 @@ GossipAckMessageTest >> testRemoteStateHasLowerGeneration [
initializeFromDigests: { digest } andEndpoints: endpoints.
self assert: ack delta size equals: 1.
self assert: (ack delta includesKey: localEndpoint).
self assert: ((ack delta at: localEndpoint) at: #foo) version equals: 2
self assert: ((ack delta at: localEndpoint) applicationStates at: #foo) version equals: 2
]

{ #category : #tests }
GossipAckMessageTest >> testRemoteStateHasLowerVersion [
| localEndpoint endpoints endpointState digest ack remoteEndpoint |
| localEndpoint endpoints endpointState digest ack |
localEndpoint := GossipEndpoint example1.
remoteEndpoint := GossipEndpoint example2.
endpoints := GossipEndpoints new.
endpointState := (endpoints endpointStateAt: localEndpoint)
generation: 4321;
Expand All @@ -75,5 +84,5 @@ GossipAckMessageTest >> testRemoteStateHasLowerVersion [
initializeFromDigests: { digest } andEndpoints: endpoints.
self assert: ack delta size equals: 1.
self assert: (ack delta includesKey: localEndpoint).
self assert: ((ack delta at: localEndpoint) at: #foo) version equals: 2
self assert: ((ack delta at: localEndpoint) applicationStates at: #foo) version equals: 2
]
42 changes: 42 additions & 0 deletions source/Gossip/GossipAck2Message.class.st
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,54 @@ Class {
#category : #Gossip
}

{ #category : #accessing }
GossipAck2Message >> delta [
^ delta
]

{ #category : #accessing }
GossipAck2Message >> delta: anObject [

delta := anObject
]

{ #category : #initialization }
GossipAck2Message >> initialize [
super initialize.
delta := Dictionary new
]

{ #category : #initialization }
GossipAck2Message >> initializeFromAckMessage: aGossipAckMessage andEndpoints: endpoints [
endpoints applyChanges: aGossipAckMessage delta.
aGossipAckMessage digests do: [ :digest | | endpointState |
endpointState := endpoints
endpointStateAt: digest endpoint
ifAbsentPut: [
GossipEndpointState new
endpoint: digest endpoint;
generation: digest generation ].
self
processDigest: digest
forEndpointState: endpointState ]
]

{ #category : #initialization }
GossipAck2Message >> processDigest: digest forEndpointState: endpointState [
(digest generation = endpointState generation) ifFalse: [
(digest generation > endpointState generation)
ifTrue: [ self halt ]
ifFalse: [ self halt ]
].
(digest version = endpointState maxVersion) ifFalse: [
(digest version > endpointState maxVersion)
ifTrue: [ self halt]
ifFalse: [
delta
at: endpointState endpoint
put: (endpointState applicationStatesAfterVersion: digest version) ] ]
]

{ #category : #'as yet unclassified' }
GossipAck2Message >> processIn: aGossiper [
delta keysAndValuesDo: [ :endpoint :s |
Expand Down
19 changes: 10 additions & 9 deletions source/Gossip/GossipAckMessage.class.st
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,16 @@ GossipAckMessage >> initialize [

{ #category : #initialization }
GossipAckMessage >> initializeFromDigests: synDigests andEndpoints: endpoints [
synDigests do: [ :digest |
self
processDigest: digest
forEndpointState: (endpoints
endpointStateAt: digest endpoint
ifAbsentPut: [
GossipEndpointState new
endpoint: digest endpoint;
generation: digest generation ] ) ].
synDigests do: [ :digest |
(endpoints endpointStateAt: digest endpoint ifAbsent: [ nil ])
ifNotNil: [ :endpointState |
self
processDigest: digest
forEndpointState: endpointState ]
ifNil: [
"if endpoint is not known we need request all states from
remote peer"
digests add: digest fromStart ] ].
]

{ #category : #initialization }
Expand Down
8 changes: 8 additions & 0 deletions source/Gossip/GossipEndpoint.class.st
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,14 @@ GossipEndpoint >> address: anObject [
address := anObject
]

{ #category : #'as yet unclassified' }
GossipEndpoint >> generation: generation version: version [
^ GossipDigest new
endpoint: self;
generation: generation;
version: version
]

{ #category : #comparing }
GossipEndpoint >> hash [
^ address hash bitXor: port hash
Expand Down
11 changes: 8 additions & 3 deletions source/Gossip/GossipEndpointState.class.st
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ Class {

{ #category : #'as yet unclassified' }
GossipEndpointState >> allStates [
^ applicationStates
^ GossipEndpointStateDelta new
heartbeatState: heartbeatState;
applicationStates: applicationStates

]

{ #category : #accessing }
Expand Down Expand Up @@ -40,8 +43,10 @@ GossipEndpointState >> applicationStateAt: aString put: anObject [

{ #category : #'as yet unclassified' }
GossipEndpointState >> applicationStatesAfterVersion: anInteger [
^ (applicationStates associations
select: [ :assoc | assoc value version > anInteger ]) asDictionary
^ GossipEndpointStateDelta new
heartbeatState: heartbeatState;
applicationStates: (applicationStates associations
select: [ :assoc | assoc value version > anInteger ]) asDictionary
]

{ #category : #accessing }
Expand Down
57 changes: 57 additions & 0 deletions source/Gossip/GossipEndpointStateDelta.class.st
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
Class {
#name : #GossipEndpointStateDelta,
#superclass : #Object,
#instVars : [
'heartbeatState',
'applicationStates'
],
#category : #Gossip
}

{ #category : #accessing }
GossipEndpointStateDelta >> applicationStateAt: aString [
^ applicationStates
at: aString
ifAbsent: [ KeyNotFound signal: 'state ', aString printString, ' not found' ]
]

{ #category : #accessing }
GossipEndpointStateDelta >> applicationStates [
^ applicationStates
]

{ #category : #accessing }
GossipEndpointStateDelta >> applicationStates: aCollection [
applicationStates := aCollection
]

{ #category : #accessing }
GossipEndpointStateDelta >> generation [
^ self heartbeatState generation
]

{ #category : #accessing }
GossipEndpointStateDelta >> generation: anInteger [
heartbeatState generation: anInteger
]

{ #category : #accessing }
GossipEndpointStateDelta >> heartbeatState [
^ heartbeatState
]

{ #category : #accessing }
GossipEndpointStateDelta >> heartbeatState: aGossipHeartbeatState [
heartbeatState := aGossipHeartbeatState
]

{ #category : #initialization }
GossipEndpointStateDelta >> initialize [
super initialize.
applicationStates := Dictionary new
]

{ #category : #testing }
GossipEndpointStateDelta >> isEmpty [
^ applicationStates isEmpty
]
12 changes: 12 additions & 0 deletions source/Gossip/GossipEndpoints.class.st
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ GossipEndpoints class >> instance [
instance := self new ]
]

{ #category : #'as yet unclassified' }
GossipEndpoints >> applyChanges: aCollection [
aCollection ifNotEmpty: [ self halt. ]
]

{ #category : #accessing }
GossipEndpoints >> digests [
| digests |
Expand All @@ -38,6 +43,13 @@ GossipEndpoints >> endpointStateAt: key [
endpoint: key ]
]

{ #category : #accessing }
GossipEndpoints >> endpointStateAt: key ifAbsent: aBlock [
^ endpointStates
at: key
ifAbsent: aBlock
]

{ #category : #accessing }
GossipEndpoints >> endpointStateAt: key ifAbsentPut: aBlock [
^ endpointStates
Expand Down

0 comments on commit cc9ae6d

Please sign in to comment.