use super::message::Message; use super::peer; use crate::blockchain::Blockchain; use crate::block::{BlockP,BlockT}; use crate::transaction::{SignedTransaction,verify}; use crate::crypto::hash::{H256,H160,Hashable}; use crate::network::server::Handle as ServerHandle; use crossbeam::channel; use log::{debug,info,warn}; use std::sync::{Arc, Mutex}; use std::collections::HashMap; use std::thread; #[derive(Clone)] pub struct Context { msg_chan: channel::Receiver<(Vec<u8>, peer::Handle)>, num_worker: usize, server: ServerHandle, blockchain: Arc<Mutex<Blockchain>>, buffer: Arc<Mutex<HashMap<H256 ,Vec<BlockP>>>>, mempool: Arc<Mutex<HashMap<H256 ,SignedTransaction>>>, // orphan_transaction: Arc<Mutex<HashMap<H256 ,bool>>>, glob_state: Arc<Mutex<HashMap<H256, HashMap<(H256,u32), (u32, H160)> >>>, table: Arc<Mutex<Vec<H160>>>, txpool: Arc<Mutex<HashMap<H256 ,BlockT>>>, unseen: Arc<Mutex<HashMap<H256 ,bool>>>, } pub fn new( num_worker: usize, msg_src: channel::Receiver<(Vec<u8>, peer::Handle)>, server: &ServerHandle, blockchain: &Arc<Mutex<Blockchain>>, buffer: &Arc<Mutex<HashMap<H256 ,Vec<BlockP>>>>, mempool: &Arc<Mutex<HashMap<H256 ,SignedTransaction>>>, // orphan_transaction: &Arc<Mutex<HashMap<H256 ,bool>>>, glob_state: &Arc<Mutex<HashMap<H256, HashMap<(H256,u32), (u32, H160)> >>>, table: &Arc<Mutex<Vec<H160>>>, txpool: &Arc<Mutex<HashMap<H256 ,BlockT>>>, unseen: &Arc<Mutex<HashMap<H256 ,bool>>>, ) -> Context { Context { msg_chan: msg_src, num_worker, server: server.clone(), blockchain: Arc::clone(blockchain), buffer: Arc::clone(buffer), mempool: Arc::clone(mempool), // // orphan_transaction: Arc::clone(orphan_transaction), glob_state: Arc::clone(glob_state), table: Arc::clone(table), txpool: Arc::clone(txpool), unseen: Arc::clone(unseen), } } impl Context { pub fn start(self) { let num_worker = self.num_worker; for i in 0..num_worker { let cloned = self.clone(); thread::spawn(move || { cloned.worker_loop(); warn!("Worker thread {} exited", i); }); } } fn revert_transactions(&self){ let head = self.blockchain.lock().unwrap().tip(); let mut curr = self.blockchain.lock().unwrap().blocks[&head].header.parent; loop{ let b = self.blockchain.lock().unwrap(); let mut changed = false; for (k,v) in b.blocks.iter(){ if k != &curr && b.heights[k] == b.heights[&curr]{ changed = true; for item in v.content.data.clone(){ // self.mempool.lock().unwrap().insert(item.hash(),item.clone()); self.unseen.lock().unwrap().insert(item.clone(),true); self.server.broadcast(Message::NewTxBlockHashes(vec![item.clone()])); } } } if !changed{ break; } curr = b.blocks[&curr].header.parent; std::mem::drop(b); } } fn new_parent(&self,block_item:&BlockP, peer: &peer::Handle){ if self.buffer.lock().unwrap().contains_key(&block_item.hash()){ let vec = self.buffer.lock().unwrap()[&block_item.hash()].clone(); self.buffer.lock().unwrap().remove(&block_item.hash()); for item in &vec{ // Remove transactions from orphan_transaction // for transaction in item.content.data.clone(){ // // self.orphan_transaction.lock().unwrap().remove(&transaction.hash()); // } self.block_handler(item,peer); } } } fn verify_transaction(&self,block_item:&BlockP) -> bool{ let mut valid = true; let tip = block_item.header.parent; for txb_hash in block_item.content.data.clone(){ if !self.txpool.lock().unwrap().contains_key(&txb_hash){ continue; } let txb = self.txpool.lock().unwrap()[&txb_hash].clone(); for tx in txb.content.data.clone(){ println!("Glob state: {:?}\ncurr tx: {:?}",self.glob_state.lock().unwrap()[&tip],tx.transaction); //1. Check if the transactions are signed correctly by the public keys if verify(&tx.transaction, &tx.sig.key, &tx.sig.signature) == false{ valid = false; debug!("Transaction check 1 fail!"); break; } if tx.transaction.input_previous[0] == H256::from([0;32]){ continue; } let mut sum = 0; for idx in 0..tx.transaction.input_previous.len(){ //2. Check if the inputs to the transactions exist in State if self.glob_state.lock().unwrap()[&tip].contains_key(&( tx.transaction.input_previous[idx], tx.transaction.input_index[idx])) == false{ debug!("Transaction check 2 fail!"); valid=false; break; } //3. Check the public key matches the owner's address of these inputs. else{ let (value,addr) = self.glob_state.lock().unwrap()[&tip][&(tx.transaction.input_previous[idx],tx.transaction.input_index[idx])]; let h:H256 = ring::digest::digest(&ring::digest::SHA256, &(tx.sig.key[..])).into(); let pub_key = H160::from(h); if addr != pub_key{ debug!("Transaction check 3 fail!"); valid = false; break; } sum = sum + value; } } //4. Check the values of inputs are not less than(I think should be strictly equal) those of outputs. let mut out = 0; for idx in 0..tx.transaction.output_value.len(){ out = out + tx.transaction.output_value[idx]; } if sum < out { debug!("Transaction check 4 fail!"); valid = false; break; } } } return valid; } fn block_handler(&self,block_item:&BlockP,peer: &peer::Handle){ //Initial Check: If block hash is correct if block_item.hash()>block_item.header.difficulty_m{ debug!("Initial Check 1 fail!"); return; } //Initial Check: If block is already in blockchain if self.blockchain.lock().unwrap().blocks.contains_key(&block_item.hash()) == true { debug!("Initial Check 2 fail!"); return; } //Case 1: Orphan block if self.blockchain.lock().unwrap().blocks.contains_key(&block_item.header.parent) == false{ debug!("Orphan Block!"); // Add transactions to orphan_transaction // for transaction in block_item.content.data.clone(){ // // self.orphan_transaction.lock().unwrap().insert(transaction.hash(),true); // } if self.buffer.lock().unwrap().contains_key(&block_item.header.parent){ if let Some(x) = self.buffer.lock().unwrap().get_mut(&block_item.header.parent){ x.push(block_item.clone()); } } else{ self.buffer.lock().unwrap().insert(block_item.header.parent, vec![block_item.clone()]); } peer.write(Message::GetBlocks(vec![block_item.header.parent.clone()])); return; } info!("Block heard! {:?}",block_item.hash()); //Default: Verify transaction for both stale and normal blocks if !self.verify_transaction(block_item){ debug!("Verify transaction fail!"); return; } //Default: Update the global state let mut curr_state = self.glob_state.lock().unwrap()[&block_item.header.parent].clone(); let mut tx_seen:HashMap<H256 ,bool> = HashMap::new(); for txb_hash in block_item.content.data.clone(){ if !self.txpool.lock().unwrap().contains_key(&txb_hash){ continue; } let txb = self.txpool.lock().unwrap()[&txb_hash].clone(); for transaction in txb.content.data.clone(){ if tx_seen.contains_key(&transaction.hash()){ continue; } tx_seen.insert(transaction.hash(),true); if transaction.transaction.input_previous[0] != H256::from([0;32]){ for idx in 0..transaction.transaction.input_previous.len(){ // println!("before:{:?}\n{:?}",curr_state,transaction.transaction.input_previous[idx]); curr_state.remove(&( transaction.transaction.input_previous[idx], transaction.transaction.input_index[idx])); // println!("after:{:?}",curr_state); } } for idx in 0..transaction.transaction.output_value.len(){ curr_state.insert((transaction.hash(),idx as u32),( transaction.transaction.output_value[idx], transaction.transaction.output_address[idx])); } //1. Remove transactions from pool self.mempool.lock().unwrap().remove(&transaction.hash()); } } self.glob_state.lock().unwrap().insert(block_item.hash(),curr_state.clone()); // for transaction in block_item.content.data.clone(){ // println!("normal transaction removed. (Not ICO tx)"); // self.mempool.lock().unwrap().remove(&transaction.hash()); // } //Case 2: Stale block if self.blockchain.lock().unwrap().tip() != block_item.header.parent{ //2. Put into blockchain and broadcast new transaction self.blockchain.lock().unwrap().insert(&block_item); self.server.broadcast(Message::NewBlockHashes(vec![block_item.hash()])); //3. If longest chain changes if self.blockchain.lock().unwrap().tip() == block_item.hash(){ self.revert_transactions(); println!("Longest chain changes"); } //4. Check if this block is parent to any orphan self.new_parent(&block_item,peer); } //Case 3: normal block else{ //2. Put into blockchain and broadcast new transaction self.blockchain.lock().unwrap().insert(&block_item); self.server.broadcast(Message::NewBlockHashes(vec![block_item.hash()])); //3. Check if this block is parent to any orphan self.new_parent(&block_item,peer); } } fn worker_loop(&self) { // use std::time; // let ten_millis = time::Duration::from_millis(100); // thread::sleep(ten_millis); loop { let msg = self.msg_chan.recv().unwrap(); let (msg, peer) = msg; let msg: Message = bincode::deserialize(&msg).unwrap(); match msg { Message::Ping(nonce) => { // peer.write(Message::Pong("ddd".to_string())); let tips = self.blockchain.lock().unwrap().tip(); let coins = self.glob_state.lock().unwrap()[&tips].clone(); debug!("{}",nonce); for item in coins{ println!("{:?}",item); } } Message::Pong(nonce) => { debug!("Pong: {}", nonce); } Message::NewAddr(addr) => { // debug!("New Addr {:?}",addr); let mut t = self.table.lock().unwrap(); t.push(addr); peer.write(Message::UpdateAddr(t.clone())); self.server.broadcast(Message::BroadcastAddr(addr)); let tx_local = self.txpool.lock().unwrap(); let mut tx_iter = tx_local.keys(); loop{ match tx_iter.next(){ Some(keys) => { self.server.broadcast(Message::NewTxBlockHashes(vec![keys.clone()])); } None => { break; } } } let m = self.mempool.lock().unwrap(); let mut iter = m.keys(); loop{ match iter.next(){ Some(keys) => { self.server.broadcast(Message::NewTransactionHashes(vec![keys.clone()])); } None => { break; } } } } Message::BroadcastAddr(addr) => { // debug!("Broadcast Addr {:?}",addr); let mut t = self.table.lock().unwrap(); if t.contains(&addr) != true{ t.push(addr); peer.write(Message::BroadcastAddr(addr)); } } Message::UpdateAddr(addr_vec) => { // debug!("Update Addr {:?}",addr_vec); let mut t = self.table.lock().unwrap(); if t.len()<2 { for addr in addr_vec{ if t.contains(&addr) != true{ t.push(addr); } } } } Message::NewTransactionHashes(transaction_vec) => { for transaction_hash in transaction_vec{ if self.mempool.lock().unwrap().contains_key(&transaction_hash) == false { peer.write(Message::GetTransactions(vec![transaction_hash])); } } } Message::GetTransactions(transaction_vec) => { // println!("Yeah! Heard get "); for transaction_hash in transaction_vec{ if self.mempool.lock().unwrap().contains_key(&transaction_hash) == true { peer.write(Message::Transactions(vec![self.mempool.lock().unwrap()[&transaction_hash].clone()])); } } } Message::Transactions(transaction_vec) => { // println!("Yeah! Heard trans"); let mut new = false; for transaction in transaction_vec{ //1. Verify the transaction if verify(&transaction.transaction, &transaction.sig.key, &transaction.sig.signature){ //2. put transaction into mempool // println!("Yeah! key correct"); new = true; self.mempool.lock().unwrap().insert(transaction.hash(), transaction.clone()); self.server.broadcast(Message::NewTransactionHashes(vec![transaction.hash()])); } else{ // println!("Damn! key wrong"); } } if new{ // println!("Yeah! Heard new"); let m = self.mempool.lock().unwrap(); let mut iter = m.keys(); let mut sender: Vec<H256> = Vec::new(); loop{ match iter.next(){ Some(keys) => { sender.push(keys.clone()); } None => { break; } } } peer.write(Message::NewTransactionHashes(sender)); std::mem::drop(m); } } Message::NewTxBlockHashes(block_vec) => { // println!("NewTxBlockHashes!"); for block_hash in block_vec{ if self.txpool.lock().unwrap().contains_key(&block_hash) == false { peer.write(Message::GetTxBlocks(vec![block_hash])); } } } Message::GetTxBlocks(block_vec) => { // println!("GetTxBlockHashes!"); for block_hash in block_vec{ // println!("Get block req {:?}",block_hash); if self.txpool.lock().unwrap().contains_key(&block_hash) == true { peer.write(Message::TxBlocks(vec![self.txpool.lock().unwrap()[&block_hash].clone()])); } } } Message::TxBlocks(block_vec) => { // println!("TxBlockHashes!"); for block_item in block_vec{ //check if hash is in correct range if (block_item.hash() > block_item.header.difficulty) || (block_item.hash() <= block_item.header.difficulty_m){ println!("Tx: nonce not in range"); continue; } self.txpool.lock().unwrap().insert(block_item.hash(), block_item.clone()); self.unseen.lock().unwrap().insert(block_item.hash(),true); for transaction in block_item.content.data.clone(){ if !self.mempool.lock().unwrap().contains_key(&transaction.hash()){ continue; } self.mempool.lock().unwrap().remove(&transaction.hash()); } self.server.broadcast(Message::NewTxBlockHashes(vec![block_item.hash()])); } } Message::NewBlockHashes(block_vec) => { for block_hash in block_vec{ if self.blockchain.lock().unwrap().blocks.contains_key(&block_hash) == false { peer.write(Message::GetBlocks(vec![block_hash])); } } } Message::GetBlocks(block_vec) => { for block_hash in block_vec{ // println!("Get block req {:?}",block_hash); if self.blockchain.lock().unwrap().blocks.contains_key(&block_hash) == true { peer.write(Message::Blocks(vec![self.blockchain.lock().unwrap().blocks[&block_hash].clone()])); } } } Message::Blocks(block_vec) => { // let m = self.mempool.lock().unwrap(); // println!("Prev Mempool size: {:?}",m.len()); // for (_,item) in m.clone(){ // println!("Prev Transactions: {:?}",item.transaction); // } // std::mem::drop(m); for block_item in block_vec{ //check if hash is in correct range if block_item.hash() > block_item.header.difficulty_m{ println!("P: nonce not in range"); continue; } self.block_handler(&block_item,&peer); } // println!("Total blocks{:?} orphan size: {}",self.blockchain.lock().unwrap().blocks.len(),self.buffer.lock().unwrap().len()); // let m = self.mempool.lock().unwrap(); // println!("After Mempool size: {:?}",m.len()); // for (_,item) in m.clone(){ // println!("After Transactions: {:?}",item.transaction); // } // std::mem::drop(m); } } } } }