Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
E
ECE 428 - Distributed Systems
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Package registry
Model registry
Operate
Environments
Terraform modules
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
djq2
ECE 428 - Distributed Systems
Commits
b88b562f
Commit
b88b562f
authored
3 years ago
by
Kevin Villanueva
Browse files
Options
Downloads
Patches
Plain Diff
trying without being a go thread
parent
a760cb0a
No related branches found
No related tags found
No related merge requests found
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
mp3/src/servers/coordinator.go
+77
-77
77 additions, 77 deletions
mp3/src/servers/coordinator.go
with
77 additions
and
77 deletions
mp3/src/servers/coordinator.go
+
77
−
77
View file @
b88b562f
...
...
@@ -53,7 +53,7 @@ func (host *Server) client_write(message Client_resp) bool {
// Args:
// - key == account name
// - ltype == "R" for read "W" for write
func
(
host
*
Server
)
LockAccount
(
key
string
,
ltype
string
)
bool
{
func
(
host
*
Server
)
LockAccount
(
key
string
,
ltype
string
)
bool
{
obj
,
ok
:=
host
.
AccountData
.
Load
(
key
)
if
!
ok
{
fmt
.
Println
(
"Attempt to lock non-existent object."
)
...
...
@@ -113,7 +113,7 @@ func (host *Server) LockAccount(key string, ltype string) bool {
acc
.
RW
.
Lock
()
acc
.
Upgrade
.
Unlock
()
acc
.
LockType
=
"W"
// fmt.Println("Couldnt get lock after 50 attempts.")
}
return
true
...
...
@@ -170,7 +170,7 @@ func (host *Server) Release(account string, commit bool) bool {
acc
.
LockType
=
""
acc
.
RW
.
RUnlock
()
// Write Lock
// Write Lock
}
else
{
acc
.
LockType
=
""
acc
.
RW
.
Unlock
()
...
...
@@ -183,7 +183,7 @@ func (host *Server) Release(account string, commit bool) bool {
}
// Thread for handling client
func
(
host
*
Server
)
client_listen
(){
func
(
host
*
Server
)
client_listen
()
{
for
{
fmt
.
Println
(
"Awaiting Client Command."
)
...
...
@@ -209,7 +209,7 @@ func (host *Server) client_listen(){
}
// Thread for listening to servers
func
(
host
*
Server
)
server_comm
(
target_server
net
.
Conn
){
func
(
host
*
Server
)
server_comm
(
target_server
net
.
Conn
)
{
for
{
// Wait to receive data
var
latest_message
Message
...
...
@@ -222,9 +222,9 @@ func (host *Server) server_comm(target_server net.Conn){
}
// Parse message send thread
if
latest_message
.
Msg_type
==
"RESPONSE"
{
if
latest_message
.
Msg_type
==
"RESPONSE"
{
host
.
Trans_chan
<-
latest_message
}
else
if
latest_message
.
Msg_type
==
"REQUEST"
{
}
else
if
latest_message
.
Msg_type
==
"REQUEST"
{
go
host
.
handle_request
(
latest_message
)
}
else
if
latest_message
.
Msg_type
==
"PROBE"
{
go
host
.
handle_probe
(
latest_message
)
...
...
@@ -232,7 +232,7 @@ func (host *Server) server_comm(target_server net.Conn){
}
}
func
(
host
*
Server
)
Coordinator
(
name
string
){
func
(
host
*
Server
)
Coordinator
(
name
string
)
{
// Store Transaction
host
.
Tr
=
Transaction
{
Name
:
name
,
Accounts
:
make
(
map
[
string
]
Account
),
WaitFor
:
""
}
...
...
@@ -254,24 +254,24 @@ func (host *Server) Coordinator(name string){
fmt
.
Println
(
"Ended Deposit."
)
continue
}
if
msg
.
Command
==
"WITHDRAW"
{
if
msg
.
Command
==
"WITHDRAW"
{
fmt
.
Println
(
"Started Withdraw."
)
host
.
withdraw
(
msg
)
fmt
.
Println
(
"Ended Withdraw."
)
continue
}
if
msg
.
Command
==
"BALANCE"
{
if
msg
.
Command
==
"BALANCE"
{
fmt
.
Println
(
"Started Balance."
)
host
.
balance
(
msg
)
fmt
.
Println
(
"Ended Balance."
)
continue
}
if
msg
.
Command
==
"ABORT"
{
if
msg
.
Command
==
"ABORT"
{
fmt
.
Println
(
"Started Abort."
)
host
.
abort
()
return
}
if
msg
.
Command
==
"COMMIT"
{
if
msg
.
Command
==
"COMMIT"
{
fmt
.
Println
(
"Started Commit."
)
host
.
commit
()
return
...
...
@@ -279,7 +279,7 @@ func (host *Server) Coordinator(name string){
}
}
// Hierarchy:
// Hierarchy:
// If Account in host.Tr.Accounts
// -> Make local change
// Else If operation.Branch == this branch
...
...
@@ -294,10 +294,10 @@ func (host *Server) deposit(operation Client_msg) {
var
response
Message
req_out
:=
Message
{
Msg_type
:
"REQUEST"
,
Msg_type
:
"REQUEST"
,
Transaction
:
host
.
Tr
.
Name
,
Server
:
host
.
Id
,
Acc
:
operation
.
Account
}
Server
:
host
.
Id
,
Acc
:
operation
.
Account
}
// Case 1: Already in map
local_key
:=
operation
.
Branch
+
"."
+
operation
.
Account
...
...
@@ -309,8 +309,8 @@ func (host *Server) deposit(operation Client_msg) {
if
val
.
LockType
==
"W"
{
val
.
Balance
+=
operation
.
Amount
host
.
Tr
.
Accounts
[
local_key
]
=
val
// Lock not "W", need upgrade
// Lock not "W", need upgrade
}
else
{
// Local upgrade
if
operation
.
Branch
==
host
.
Id
{
...
...
@@ -318,24 +318,24 @@ func (host *Server) deposit(operation Client_msg) {
val
.
Balance
+=
operation
.
Amount
val
.
LockType
=
"W"
host
.
Tr
.
Accounts
[
local_key
]
=
val
// Otherwise need remote to upgrade
// Otherwise need remote to upgrade
}
else
{
req_out
.
Request
=
"UPGRADE"
go
host
.
server_write
(
operation
.
Branch
,
req_out
)
resp_in
:=
<-
host
.
Trans_chan
if
resp_in
.
Status
{
val
.
Balance
+=
operation
.
Amount
val
.
LockType
=
"W"
host
.
Tr
.
Accounts
[
local_key
]
=
val
}
else
{
}
else
{
fmt
.
Println
(
"Remote failed to upgrade lock."
)
}
}
}
// Case 2: Account not in local copy but at local branch
// Case 2: Account not in local copy but at local branch
}
else
if
host
.
Id
==
operation
.
Branch
{
cast
,
ok
:=
host
.
AccountData
.
Load
(
operation
.
Account
)
...
...
@@ -356,7 +356,7 @@ func (host *Server) deposit(operation Client_msg) {
host
.
Uncommitted
[
operation
.
Account
]
=
new_queue
}
host
.
Mu_Uncommitted
.
Unlock
()
// Acquire write lock
host
.
LockAccount
(
operation
.
Account
,
"W"
)
...
...
@@ -385,7 +385,7 @@ func (host *Server) deposit(operation Client_msg) {
host
.
Tr
.
Accounts
[
local_key
]
=
new_acc
}
// If Present and committed
// If Present and committed
}
else
{
// Get lock and add locally
host
.
LockAccount
(
operation
.
Account
,
"W"
)
...
...
@@ -394,7 +394,7 @@ func (host *Server) deposit(operation Client_msg) {
host
.
Tr
.
Accounts
[
local_key
]
=
new_acc
}
// If not in map, create uncommitted account
// If not in map, create uncommitted account
}
else
{
fmt
.
Println
(
"Add to global map."
)
...
...
@@ -440,7 +440,7 @@ func (host *Server) deposit(operation Client_msg) {
host
.
Tr
.
Accounts
[
local_key
]
=
acc_copy
}
// First to load
// First to load
}
else
{
// ALready have lock, add our copy locally
acc_copy
:=
*
stored_acc
...
...
@@ -453,15 +453,15 @@ func (host *Server) deposit(operation Client_msg) {
}
// Case 3: Acquire Lock and fetch from remote server
// Case 3: Acquire Lock and fetch from remote server
}
else
{
go
host
.
server_write
(
operation
.
Branch
,
req_out
)
// Receive response and upgrade local
response
=
<-
host
.
Trans_chan
response
=
<-
host
.
Trans_chan
new_local
:=
Account
{
Branch
:
operation
.
Branch
,
Balance
:
response
.
Balance
,
Branch
:
operation
.
Branch
,
Balance
:
response
.
Balance
,
LockType
:
"W"
,
// Committed?
}
...
...
@@ -479,10 +479,10 @@ func (host *Server) withdraw(operation Client_msg) {
var
response
Message
req_out
:=
Message
{
Msg_type
:
"REQUEST"
,
Msg_type
:
"REQUEST"
,
Transaction
:
host
.
Tr
.
Name
,
Server
:
host
.
Id
,
Acc
:
operation
.
Account
}
Server
:
host
.
Id
,
Acc
:
operation
.
Account
}
// Case 1: Already in map
local_key
:=
operation
.
Branch
+
"."
+
operation
.
Account
...
...
@@ -491,33 +491,33 @@ func (host *Server) withdraw(operation Client_msg) {
if
val
.
LockType
==
"W"
{
val
.
Balance
-=
operation
.
Amount
host
.
Tr
.
Accounts
[
local_key
]
=
val
// Lock not "W", need upgrade
}
else
{
// Lock not "W", need upgrade
}
else
{
// Local upgrade
if
operation
.
Branch
==
host
.
Id
{
host
.
Upgrade
(
operation
.
Account
)
val
.
Balance
-=
operation
.
Amount
val
.
LockType
=
"W"
host
.
Tr
.
Accounts
[
local_key
]
=
val
// Remote upgrade
// Remote upgrade
}
else
{
req_out
.
Request
=
"UPGRADE"
go
host
.
server_write
(
operation
.
Branch
,
req_out
)
resp_in
:=
<-
host
.
Trans_chan
if
resp_in
.
Status
{
val
.
Balance
+=
operation
.
Amount
val
.
LockType
=
"W"
host
.
Tr
.
Accounts
[
local_key
]
=
val
}
else
{
}
else
{
fmt
.
Println
(
"Remote failed to upgrade lock."
)
}
}
}
// Case 2: Account not in local copy but at local branch
// Case 2: Account not in local copy but at local branch
}
else
if
host
.
Id
==
operation
.
Branch
{
// Check if exists in global map
cast
,
ok
:=
host
.
AccountData
.
Load
(
operation
.
Account
)
...
...
@@ -530,10 +530,10 @@ func (host *Server) withdraw(operation Client_msg) {
host
.
abort
()
return
// Else grab lock, update and add locally
// Else grab lock, update and add locally
}
else
{
realAcc
:=
cast
.
(
*
Account
)
host
.
LockAccount
(
operation
.
Account
,
"W"
)
// If account uncommitted, unlock and abort
...
...
@@ -550,12 +550,12 @@ func (host *Server) withdraw(operation Client_msg) {
host
.
Tr
.
Accounts
[
local_key
]
=
acc_copy
}
// Case 3: Acquire Lock and fetch from remote server
// Case 3: Acquire Lock and fetch from remote server
}
else
{
go
host
.
server_write
(
operation
.
Branch
,
req_out
)
host
.
server_write
(
operation
.
Branch
,
req_out
)
// Receive response and upgrade local
response
=
<-
host
.
Trans_chan
response
=
<-
host
.
Trans_chan
// Abort nonexistent
if
!
response
.
Acc_Exists
{
...
...
@@ -566,10 +566,10 @@ func (host *Server) withdraw(operation Client_msg) {
}
new_local
:=
Account
{
Branch
:
operation
.
Branch
,
Balance
:
response
.
Balance
,
Branch
:
operation
.
Branch
,
Balance
:
response
.
Balance
,
LockType
:
"W"
,
// Committed?
// Committed?
}
new_local
.
Balance
-=
operation
.
Amount
...
...
@@ -584,11 +584,11 @@ func (host *Server) balance(operation Client_msg) {
var
response
Message
var
bal
int
req_out
:=
Message
{
Msg_type
:
"REQUEST"
,
Msg_type
:
"REQUEST"
,
Transaction
:
host
.
Tr
.
Name
,
Server
:
host
.
Id
,
Request
:
"RLOCK"
,
Acc
:
operation
.
Account
}
Server
:
host
.
Id
,
Request
:
"RLOCK"
,
Acc
:
operation
.
Account
}
// Case 1: Already in map
local_key
:=
operation
.
Branch
+
"."
+
operation
.
Account
...
...
@@ -596,7 +596,7 @@ func (host *Server) balance(operation Client_msg) {
// Just return balance
bal
=
val
.
Balance
// Case 2: Account not in local copy but at local branch
// Case 2: Account not in local copy but at local branch
}
else
if
host
.
Id
==
operation
.
Branch
{
// Check if exists in global map
cast
,
ok
:=
host
.
AccountData
.
Load
(
operation
.
Account
)
...
...
@@ -609,10 +609,10 @@ func (host *Server) balance(operation Client_msg) {
host
.
abort
()
return
// Else grab read lock
// Else grab read lock
}
else
{
realAcc
:=
cast
.
(
*
Account
)
host
.
LockAccount
(
operation
.
Account
,
"R"
)
// If account uncommitted, unlock and abort
...
...
@@ -629,12 +629,12 @@ func (host *Server) balance(operation Client_msg) {
}
}
// Case 3: Acquire Lock and fetch from remote server
// Case 3: Acquire Lock and fetch from remote server
}
else
{
go
host
.
server_write
(
operation
.
Branch
,
req_out
)
// Receive response and upgrade local
response
=
<-
host
.
Trans_chan
response
=
<-
host
.
Trans_chan
// Abort nonexistent
if
!
response
.
Acc_Exists
{
...
...
@@ -645,10 +645,10 @@ func (host *Server) balance(operation Client_msg) {
}
new_local
:=
Account
{
Branch
:
operation
.
Branch
,
Balance
:
response
.
Balance
,
Branch
:
operation
.
Branch
,
Balance
:
response
.
Balance
,
LockType
:
"R"
,
// Committed?
// Committed?
}
host
.
Tr
.
Accounts
[
local_key
]
=
new_local
bal
=
new_local
.
Balance
...
...
@@ -661,7 +661,7 @@ func (host *Server) balance(operation Client_msg) {
// Abort Process:
// 1.) Parse Local Accounts and release all remote and local locks
// 2.) Clear Host Struct to end transaction
func
(
host
*
Server
)
abort
(){
func
(
host
*
Server
)
abort
()
{
// Iterate Local Transaction
for
acc
,
data
:=
range
host
.
Tr
.
Accounts
{
// Local Account
...
...
@@ -669,14 +669,14 @@ func (host *Server) abort(){
acc_name
:=
div
[
1
]
if
data
.
Branch
==
host
.
Id
{
host
.
Release
(
acc_name
,
false
)
// Remote Account
// Remote Account
}
else
{
msg_out
:=
Message
{
Msg_type
:
"REQUEST"
,
Msg_type
:
"REQUEST"
,
Transaction
:
host
.
Tr
.
Name
,
Server
:
host
.
Id
,
Request
:
"ABORT"
,
Acc
:
acc_name
}
Server
:
host
.
Id
,
Request
:
"ABORT"
,
Acc
:
acc_name
}
go
host
.
server_write
(
data
.
Branch
,
msg_out
)
}
}
...
...
@@ -694,12 +694,12 @@ func (host *Server) abort(){
// 2.) Release all locks
// 3.) Clear Server struct to end transaction
// - Careful about timing with threads, make sure all updates are done
func
(
host
*
Server
)
commit
(){
func
(
host
*
Server
)
commit
()
{
// Iterate Local Transaction
for
acc
,
data
:=
range
host
.
Tr
.
Accounts
{
div
:=
strings
.
Split
(
acc
,
"."
)
acc_name
:=
div
[
1
]
// Local Commit
if
data
.
Branch
==
host
.
Id
{
cast
,
ok
:=
host
.
AccountData
.
Load
(
acc_name
)
...
...
@@ -712,17 +712,17 @@ func (host *Server) commit(){
fmt
.
Println
(
"Failed to Commit local change."
)
}
host
.
Release
(
acc_name
,
true
)
// Remote Commit
// Remote Commit
}
else
{
msg_out
:=
Message
{
Msg_type
:
"REQUEST"
,
Msg_type
:
"REQUEST"
,
Transaction
:
host
.
Tr
.
Name
,
Server
:
host
.
Id
,
Request
:
"COMMIT"
,
Server
:
host
.
Id
,
Request
:
"COMMIT"
,
New_Balance
:
data
.
Balance
,
Acc
:
acc_name
}
Acc
:
acc_name
}
// POSSIBLE TIMING ISSUE IF FUNCTION ENDS WHILE STILL WAITING ON CHANNEL RESPONSES
go
host
.
server_write
(
data
.
Branch
,
msg_out
)
}
...
...
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment