1.从项目的一个 panic 说起:Go 中 Sync 包的分析应用
2.golangï¼contextä»ç»
3.图解Go里面的WaitGroup了解编程语言核心实现源码
4.Go并发实战--sync WaitGroup
5.基于 Golang 实现的 Shadowsocks 源码解析
6.Go并åç¼ç¨ï¼goroutineï¼channelåsync详解
从项目的一个 panic 说起:Go 中 Sync 包的分析应用
在项目开发过程中,遇到一个常见的错误——"fatal error: concurrent map read and map write",这是由于Golang内建的map在并发环境下不安全导致的。解决这个问题的方法并不复杂,就是转向使用sync包提供的并发安全的map。
sync包在Golang 1.9之后被官方支持,音乐公司网站源码其中包含了丰富的同步原语,是并发编程的关键部分。在Golang 1.9之前,解决map并发问题通常会借助sync包中的sync.RWMutex或其他锁机制。Golang作为支持用户态进程的编程语言,对并发编程的处理自然离不开锁,这是一种确保多个Goroutine在同一片内存中协同工作的同步机制。
sync包的源码目录结构清晰,包含Mutex、RWmutex、WaitGroup、Map、react.js项目源码Once、Cond、Pool等组件。接下来,我们将逐个分析这些同步原语的用途和使用注意事项,重点讨论在项目中常见的sync.Map。
sync.Map是sync包中的一种高效并发安全的map实现,与内建map相比,它提供了Load、Store、LoadOrStore、Delete和Range等方法,并且具有更高的并发性能。虽然sync.Map没有len方法,但其内部机制使得在并发环境中的操作更加稳健。
通过结合实际项目案例和面试题中的陷阱,本文简要探讨了sync包中Mutex、微信订单监控源码RWMutex、WaitGroup、Once以及Map的使用技巧和注意事项。在实际编程中,正确使用这些同步原语对于避免并发问题至关重要。
golangï¼contextä»ç»
1 åè¨
æè¿å®ç°ç³»ç»çåå¸å¼æ¥å¿ä¸äºå¡ç®¡çæ¶,å¨å¯»æ±æè°çå ¨å±å¯ä¸Goroutine IDæ æä¹å,å³å®è¿æ¯ç®åå©ç¨Contextæºå¶å®ç°äºåºæ¬çæ³æ³,ä¸å¤é«æ,ä½æ¯å¥½ç¨.äºæ¯å¯¹å®å½åç设计æ¯è¾å¥½å¥,便æäºæ¤æ.
Contextæ¯golangå®æ¹å®ä¹çä¸ä¸ªpackage,å®å®ä¹äºContextç±»å,éé¢å å«äºDeadline/Done/Erræ¹æ³ä»¥åç»å®å°Contextä¸çæååéå¼Value,å ·ä½å®ä¹å¦ä¸ï¼
typeContextinterface{ //è¿åContextçè¶ æ¶æ¶é´ï¼è¶ æ¶è¿ååºæ¯ï¼Deadline()(deadlinetime.Time,okbool)//å¨Contextè¶ æ¶æåæ¶æ¶ï¼å³ç»æäºï¼è¿åä¸ä¸ªå ³éçchannel//å³å¦æå½åContextè¶ æ¶æåæ¶æ¶,Doneæ¹æ³ä¼è¿åä¸ä¸ªchannel,ç¶åå ¶ä»å°æ¹å°±å¯ä»¥éè¿å¤æDoneæ¹æ³æ¯å¦æè¿åï¼channelï¼,å¦ææå说æContextå·²ç»æ//æ å ¶å¯ä»¥ä½ä¸ºå¹¿æéç¥å ¶ä»ç¸å ³æ¹æ¬Contextå·²ç»æ,请åç¸å ³å¤ç.Done()<-chanstruct{ }//è¿åContextåæ¶çåå Err()error//è¿åContextç¸å ³æ°æ®Value(keyinterface{ })interface{ }}é£ä¹å°åºä»ä¹Contextï¼å¯ä»¥åé¢ææå¯ä»¥ç解为ä¸ä¸æ,æ¯è¾çæçæè¿ç¨/线ç¨ä¸çº¿æ,å ³äºgolangä¸çä¸ä¸æ,ä¸å¥è¯æ¦æ¬å°±æ¯ï¼ goroutineçç¸å ³ç¯å¢å¿«ç §,å ¶ä¸å å«å½æ°è°ç¨ä»¥åæ¶åçç¸å ³çåéå¼. éè¿Contextå¯ä»¥åºåä¸åçgoroutine请æ±,å 为å¨golang Seversä¸,æ¯ä¸ªè¯·æ±é½æ¯å¨å个goroutineä¸å®æç.
æè¿å¨å ¬å¸åægRPCæºç ,protoæ件çæç代ç ,æ¥å£å½æ°ç¬¬ä¸ä¸ªåæ°ç»ä¸æ¯ctx context.Contextæ¥å£,å ¬å¸ä¸å°åäºé½ä¸äºè§£è¿æ ·è®¾è®¡çåºåç¹æ¯ä»ä¹,å ¶å®æä¹ä¸äºè§£å ¶èåçåç.ä»å¤©è¶ç妮妲å°é£å¦¹åæ£é¢ç»éæ·±å³,å ¨å¸åå·¥,å课,åä¸,å¨å®¶ä¼æ¯æ¾äºä¸äºèµæç 究æç©ä¸æ.
Contexté常被è¯ä½ä¸ä¸æ,å®æ¯ä¸ä¸ªæ¯è¾æ½è±¡çæ¦å¿µ.å¨å ¬å¸ææ¯è®¨è®ºæ¶ä¹ç»å¸¸ä¼æå°ä¸ä¸æ.ä¸è¬ç解为ç¨åºåå çä¸ä¸ªè¿è¡ç¶æ,ç°åº,å¿«ç §,èç¿»è¯ä¸ä¸ä¸åå¾å¥½å°è¯ éäºå ¶æ¬è´¨,ä¸ä¸ä¸ä¸åæ¯åå¨ä¸ä¸å±çä¼ é,ä¸ä¼æå å®¹ä¼ éç»ä¸.å¨Goè¯è¨ä¸,ç¨åºåå ä¹å°±æçæ¯Goroutine.
æ¯ä¸ªGoroutineå¨æ§è¡ä¹å,é½è¦å ç¥éç¨åºå½åçæ§è¡ç¶æ,é常å°è¿äºæ§è¡ç¶æå°è£ å¨ä¸ä¸ªContextåéä¸,ä¼ éç»è¦æ§è¡çGoroutineä¸. ä¸ä¸æåå ä¹å·²ç»æä¸ºä¼ éä¸è¯·æ±åçåå¨æåéçæ åæ¹æ³.å¨ç½ç»ç¼ç¨ä¸,å½æ¥æ¶å°ä¸ä¸ªç½ç»è¯·æ±Request,å¤çRequestæ¶,æ们å¯è½éè¦å¼å¯ä¸åçGoroutineæ¥è·åæ°æ®ä¸é»è¾å¤ç,å³ä¸ä¸ªè¯·æ±Request,ä¼å¨å¤ä¸ªGoroutineä¸å¤ç. èè¿äºGoroutineå¯è½éè¦å ±äº«Requestçä¸äºä¿¡æ¯;åæ¶å½Request被åæ¶æè è¶ æ¶çæ¶å,ææä»è¿ä¸ªRequestå建çææGoroutineä¹åºè¯¥è¢«ç»æ.
注ï¼å ³äºgoroutineçç解å¯ä»¥ç§»æ¥è¿é.
2 为ä»ä¹ä½¿ç¨contextç±äºå¨golang seversä¸,æ¯ä¸ªrequesté½æ¯å¨å个goroutineä¸å®æ,并ä¸å¨å个goroutineï¼ä¸å¦¨ç§°ä¹ä¸ºAï¼ä¸ä¹ä¼æ请æ±å ¶ä»æå¡ï¼å¯å¨å¦ä¸ä¸ªgoroutineï¼ç§°ä¹ä¸ºBï¼å»å®æï¼çåºæ¯,è¿å°±ä¼æ¶åå¤ä¸ªGoroutineä¹é´çè°ç¨.å¦ææä¸æ¶å»è¯·æ±å ¶ä»æå¡è¢«åæ¶æè è¶ æ¶,åä½ä¸ºæ·±é·å ¶ä¸çå½ågoroutine Béè¦ç«å³éåº,ç¶åç³»ç»æå¯åæ¶Bæå ç¨çèµæº. å³ä¸ä¸ªrequestä¸é常å å«å¤ä¸ªgoroutine,è¿äºgoroutineä¹é´é常ä¼æ交äº.
é£ä¹,å¦ä½ææ管çè¿äºgoroutineæ为ä¸ä¸ªé®é¢ï¼ä¸»è¦æ¯éåºéç¥åå æ°æ®ä¼ éé®é¢ï¼,Googleç解å³æ¹æ³æ¯Contextæºå¶,ç¸äºè°ç¨çgoroutineä¹é´éè¿ä¼ écontextåéä¿æå ³è,è¿æ ·å¨ä¸ç¨æ´é²ågoroutineå é¨å®ç°ç»èçåæä¸,ææå°æ§å¶ågoroutineçè¿è¡.
å¦æ¤ä¸æ¥,éè¿ä¼ éContextå°±å¯ä»¥è¿½è¸ªgoroutineè°ç¨æ ,并å¨è¿äºè°ç¨æ ä¹é´ä¼ ééç¥åå æ°æ®. è½ç¶goroutineä¹é´æ¯å¹³è¡ç,没æ继æ¿å ³ç³»,ä½æ¯Context设计ææ¯å å«ç¶åå ³ç³»ç,è¿æ ·å¯ä»¥æ´å¥½çæè¿°goroutineè°ç¨ä¹é´çæ åå ³ç³».
3 æä¹ä½¿ç¨contextçæä¸ä¸ªContext主è¦æ两类æ¹æ³ï¼
3.1 顶å±Contextï¼Backgroundè¦å建Contextæ ,é¦å å°±æ¯è¦åå»ºæ ¹èç¹
//è¿åä¸ä¸ªç©ºçContext,å®ä½ä¸ºææç±æ¤ç»§æ¿Contextçæ ¹èç¹funcBackground()Context该Contexté常ç±æ¥æ¶requestç第ä¸ä¸ªgoroutineå建,å®ä¸è½è¢«åæ¶,没æå¼,ä¹æ²¡æè¿ææ¶é´,常ä½ä¸ºå¤çrequestç顶å±contextåå¨.
3.2 ä¸å±Contextï¼WithCancel/WithDeadline/WithTimeoutæäºæ ¹èç¹ä¹å,æ¥ä¸æ¥å°±æ¯å建ååèç¹.为äºå¯ä»¥å¾å¥½çæ§å¶ååèç¹,Contextå æä¾çå建æ¹æ³åæ¯å¸¦æ第äºè¿åå¼ï¼CancelFuncç±»åï¼,å®ç¸å½äºä¸ä¸ªHook,å¨ågoroutineæ§è¡è¿ç¨ä¸,å¯ä»¥éè¿è§¦åHookæ¥è¾¾å°æ§å¶ågoroutineçç®çï¼é常æ¯åæ¶,å³è®©å ¶åä¸æ¥ï¼.åé åContextæä¾çDoneæ¹æ³,ågoroutineå¯ä»¥æ£æ¥èªèº«æ¯å¦è¢«ç¶çº§èç¹Cancelï¼
select{ case<-ctx.Done()://dosomecleanâ¦}注ï¼ç¶èç¹Contextå¯ä»¥ä¸»å¨éè¿è°ç¨cancelæ¹æ³åæ¶åèç¹Context,èåèç¹Contextåªè½è¢«å¨çå¾ .åæ¶ç¶èç¹Contextèªèº«ä¸æ¦è¢«åæ¶ï¼å¦å ¶ä¸çº§èç¹Cancelï¼,å ¶ä¸çææåèç¹Contextåä¼èªå¨è¢«åæ¶.
æä¸ç§å建æ¹æ³ï¼
//带cancelè¿åå¼çContext,ä¸æ¦cancel被è°ç¨,å³åæ¶è¯¥å建çcontextfuncWithCancel(parentContext)(ctxContext,cancelCancelFunc)//带æææcancelè¿åå¼çContext,å³å¿ é¡»å°è¾¾æå®æ¶é´ç¹è°ç¨çcacelæ¹æ³æä¼è¢«æ§è¡funcWithDeadline(parentContext,deadlinetime.Time)(Context,CancelFunc)//å¸¦è¶ æ¶æ¶é´cancelè¿åå¼çContext,类似Deadline,åè æ¯æ¶é´ç¹,åè 为æ¶é´é´é//ç¸å½äºWithDeadline(parent,time.Now().Add(timeout)).funcWithTimeout(parentContext,timeouttime.Duration)(Context,CancelFunc)ä¸é¢æ¥çæ¹ç¼èªAdvanced Go Concurrency Patternsè§é¢æä¾çä¸ä¸ªç®åä¾åï¼
packagemainimport("context""fmt""time")funcsomeHandler(){ //å建继æ¿Backgroundçåèç¹Contextctx,cancel:=context.WithCancel(context.Background())godoSth(ctx)//模æç¨åºè¿è¡-Sleep5ç§time.Sleep(5*time.Second)cancel()}//æ¯1ç§workä¸ä¸,åæ¶ä¼å¤æctxæ¯å¦è¢«åæ¶,å¦ææ¯å°±éåºfuncdoSth(ctxcontext.Context){ vari=1for{ time.Sleep(1*time.Second)select{ case<-ctx.Done():fmt.Println("done")returndefault:fmt.Printf("work%dseconds:\n",i)}i++}}funcmain(){ fmt.Println("start...")someHandler()fmt.Println("end.")}è¾åºç»æï¼
注æ,æ¤æ¶doSthæ¹æ³ä¸caseä¹doneçfmt.Println("done")并没æ被æå°åºæ¥.
è¶ æ¶åºæ¯ï¼
packagemainimport("context""fmt""time")functimeoutHandler(){ //å建继æ¿Backgroundçåèç¹Contextctx,cancel:=context.WithTimeout(context.Background(),3*time.Second)godoSth(ctx)//模æç¨åºè¿è¡-Sleepç§time.Sleep(*time.Second)cancel()//3ç§åå°æååæ¶doSthgoroutine}//æ¯1ç§workä¸ä¸,åæ¶ä¼å¤æctxæ¯å¦è¢«åæ¶,å¦ææ¯å°±éåºfuncdoSth(ctxcontext.Context){ vari=1for{ time.Sleep(1*time.Second)select{ case<-ctx.Done():fmt.Println("done")returndefault:fmt.Printf("work%dseconds:\n",i)}i++}}funcmain(){ fmt.Println("start...")timeoutHandler()fmt.Println("end.")}è¾åºç»æï¼
4 contextæ¯ä¸ä¸ªä¼é ç设计å?ç¡®å®,éè¿å¼å ¥Contextå ,ä¸ä¸ªrequestèå´å æægoroutineè¿è¡æ¶çåæ¶å¯ä»¥å¾å°æRæçæ§å¶.ä½æ¯è¿ç§è§£å³æ¹å¼å´ä¸å¤ä¼é .
4.1 context åç æ¯ä¸æ ·æ©æ£ä¸æ¦ä»£ç ä¸æå¤ç¨å°äºContext,ä¼ éContextåéï¼é常ä½ä¸ºå½æ°ç第ä¸ä¸ªåæ°ï¼ä¼åç æ¯ä¸æ ·è延å¨åå¤è°ç¨å®çå°æ¹. æ¯å¦å¨ä¸ä¸ªrequestä¸å®ç°æ°æ®åºäºå¡æè åå¸å¼æ¥å¿è®°å½, å建çcontext,ä¼ä½ä¸ºåæ°ä¼ éå°ä»»ä½ææ°æ®åºæä½ææ¥å¿è®°å½éæ±çå½æ°ä»£ç å¤. å³æ¯ä¸ä¸ªç¸å ³å½æ°é½å¿ é¡»å¢å ä¸ä¸ªcontext.Contextç±»åçåæ°,ä¸ä½ä¸ºç¬¬ä¸ä¸ªåæ°,è¿å¯¹æ å ³ä»£ç å®å ¨æ¯ä¾µå ¥å¼ç.
æ´å¤è¯¦ç»å 容å¯åè§ï¼Michal Strba çcontext-should-go-away-go2æç«
Google Groupä¸ç讨论å¯ç§»æ¥è¿é.
4.2 Context ä¸ä» ä» åªæ¯cancelä¿¡å·Contextæºå¶ææ ¸å¿çåè½æ¯å¨goroutineä¹é´ä¼ écancelä¿¡å·,ä½æ¯å®çå®ç°æ¯ä¸å®å ¨ç.
Cancelå¯ä»¥ç»å为主å¨ä¸è¢«å¨ä¸¤ç§,éè¿ä¼ écontextåæ°,让è°ç¨goroutineå¯ä»¥ä¸»å¨cancel被è°ç¨goroutine.ä½æ¯å¦ä½å¾ç¥è¢«è°ç¨goroutineä»ä¹æ¶åæ§è¡å®æ¯,è¿é¨åContextæºå¶æ¯æ²¡æå®ç°ç.èç°å®ä¸çç¡®åæä¸äºè¿æ ·çåºæ¯,æ¯å¦ä¸ä¸ªç»è£ æ°æ®çgoroutineå¿ é¡»çå¾ å ¶ä»goroutineå®ææå¯å¼å§æ§è¡,è¿æ¯contextææ¾ä¸å¤ç¨äº,å¿ é¡»åå©sync.WaitGroup.
funcserve(lnet.Listener)error{ varwgsync.WaitGroupvarconnnet.Connvarerrerrorfor{ conn,err=l.Accept()iferr!=nil{ break}wg.Add(1)gofunc(cnet.Conn){ deferwg.Done()handle(c)}(conn)}wg.Wait()returnerr}4.3 context.valuecontext.Valueç¸å½äºgoroutineçTLSï¼Thread Local Storageï¼,ä½å®ä¸æ¯éæç±»åå®å ¨ç,ä»»ä½ç»æä½åéé½å¿ é¡»ä½ä¸ºå符串形å¼åå¨.åæ¶,ææcontexté½ä¼å¨å ¶ä¸å®ä¹åé,å¾å®¹æé æå½åå²çª.
5 æ»ç»contextå éè¿æ建æ åå ³ç³»çContext,æ¥è¾¾å°ä¸ä¸å±Goroutineè½å¯¹ä¼ éç»ä¸ä¸å±Goroutineçæ§å¶.对äºå¤çä¸ä¸ªRequest请æ±æä½,éè¦éç¨contextæ¥å±å±æ§å¶Goroutine,以åä¼ éä¸äºåéæ¥å ±äº«.
Context对象ççåå¨æä¸è¬ä» 为ä¸ä¸ªè¯·æ±çå¤çå¨æ.å³é对ä¸ä¸ªè¯·æ±å建ä¸ä¸ªContextåéï¼å®ä¸ºContextæ ç»æçæ ¹ï¼;å¨è¯·æ±å¤çç»æå,æ¤éæ¤ctxåé,éæ¾èµæº.
æ¯æ¬¡å建ä¸ä¸ªGoroutine,è¦ä¹å°åæçContextä¼ éç»Goroutine,è¦ä¹å建ä¸ä¸ªåContextå¹¶ä¼ éç»Goroutine.
Contextè½çµæ´»å°åå¨ä¸åç±»å,ä¸åæ°ç®çå¼,并ä¸ä½¿å¤ä¸ªGoroutineå®å ¨å°è¯»åå ¶ä¸çå¼.
å½éè¿ç¶Context对象å建åContext对象æ¶,å¯åæ¶è·å¾åContextçä¸ä¸ªæ¤éå½æ°,è¿æ ·ç¶Context对象çå建ç¯å¢å°±è·å¾äºå¯¹åContextå°è¦è¢«ä¼ éå°çGoroutineçæ¤éæ.
å¨åContextè¢«ä¼ éå°çgoroutineä¸,åºè¯¥å¯¹è¯¥åContextçDoneä¿¡éï¼channelï¼è¿è¡çæ§,ä¸æ¦è¯¥ä¿¡éè¢«å ³éï¼å³ä¸å±è¿è¡ç¯å¢æ¤éäºæ¬goroutineçæ§è¡ï¼,åºä¸»å¨ç»æ¢å¯¹å½å请æ±ä¿¡æ¯çå¤ç,éæ¾èµæºå¹¶è¿å.
6 è´è°¢pkg/context
context-should-go-away-go2
ç解 Go Context æºå¶
context-isnt-for-cancellation
context-is-for-cancelation
thread-local-a-convenient-abomination
图解Go里面的WaitGroup了解编程语言核心实现源码
sync.WaitGroup核心实现逻辑简单,主要用于等待一组goroutine退出。它通过Add方法指定等待的goroutine数量,Done方法递减计数。计数为0时,等待结束。sync.WaitGroup内部使用了一个state1数组,其中只有一个元素,类型为[3]uint。这是为了内存对齐,确保数据按照4字节对齐,从而在位和位平台间兼容。视频聊天app源码
内部元素采用uint类型进行计数,长度为8字节。这是为了防止在位平台上对字节的uint操作可能不是原子的情况。使用uint保证了原子操作的执行和性能。在CPU缓存线(cache line)的上下文中,8字节长度可能有助于确保对缓存线的操作是原子的,从而避免数据损坏。
测试8字节指针的构造,验证了在经过编译器进行内存分配对齐后,如果元素指针的地址不能被8整除,则其地址+4可以被8整除。这展示了编译器层内存对齐的实现细节。
sync.WaitGroup中的8字节uint采用分段计数的方式,高位记录需要Done的数量,低位记录正在等待结束的计数。
源码的核心原理包括使用位uint进行计数,通过高位记录需要Done的struts2框架源码数量和低位记录等待的数量。当发现count>0时,Wait的goroutine会排队等待。任务完成后,goroutine执行Done操作,直到count==0,完成并唤醒所有等待的goroutine。
计数与信号量的实现通过根据当前指针的地址确定采用哪个分段进行计数和等待。添加等待计数和Done完成等待事件分别对应sync.WaitGroup的Add和Done方法。等待所有操作完成时,sync.WaitGroup确保所有任务完成。
为了深入理解这些概念,可以参考相关文章和资源,如关于CPU缓存线大小和原子操作的讨论。此外,更多源码分析文章可关注特定的公告号或网站,如www.sreguide.com。本篇文章由ArtiPub自动发布平台发布。
Go并发实战--sync WaitGroup
Go语言并发编程中,sync WaitGroup是一种极其实用的工具,它类似于Java的CyclicBarrier,但作用于协程。在处理并发任务时,WaitGroup可以帮助我们监控一组协程的执行状态,以便决定后续操作。其基本操作包括Add()增加等待数,Done()减少等待数,以及Wait()阻塞协程直到所有任务完成。下面将通过实例和原理深入探讨WaitGroup的使用和工作原理。语法基础
WaitGroup的核心功能体现在Add(), Done(), 和 Wait()三个函数上:- Add():增加等待数,可能加1或加n,它会调整计数器,当计数器为零时,等待的协程会被释放。
- Done():相当于Add(-1),用于减少等待数,确保在返回Wait之前计数器为零。
- Wait():阻塞当前协程,直到所有任务完成(即计数器为零)才继续执行。
例如:
(代码片段)
实现原理 WaitGroup的内部实现相当简洁,主要由一个结构体组成,其中包含一个计数器和一个信号量。Add()函数会以原子操作更新计数器,如果计数器减为零,所有等待的goroutine会被释放。需要注意的是:- 在创建goroutine或调用Wait之前,必须确保Add()的正增量调用已经完成。
- 如果重用WaitGroup,每次新的等待事件后,必须先完成之前的Wait调用。
源码解析 Wait()函数的源码实现了协程的阻塞与释放机制,当所有任务完成后,会解除阻塞并继续执行后续代码。总结
sync WaitGroup是Go并发编程中不可或缺的工具,它通过Add(), Done(), 和 Wait()函数协同管理协程,确保并发任务的正确执行顺序。掌握其用法和原理,有助于在实际项目中更高效地管理并发任务。基于 Golang 实现的 Shadowsocks 源码解析
本教程旨在解析基于Golang实现的Shadowsocks源码,帮助大家理解如何通过Golang实现一个隧道代理转发工具。首先,让我们从代理和隧道的概念入手。
代理(Proxy)是一种网络服务,允许客户端通过它与服务器进行非直接连接。代理服务器在客户端与服务器之间充当中转站,可以提供隐私保护或安全防护。隧道(Tunnel)则是一种网络通讯协议,允许在不兼容网络之间传输数据或在不安全网络上创建安全路径。
实验环境要求搭建从本地到远程服务器的隧道代理,实现客户端访问远程内容。基本开发环境需包括目标网络架构。实验目的为搭建隧道代理,使客户端能够访问到指定远程服务器的内容。
Shadowsocks通过TCP隧道代理实现,涉及客户端和服务端关键代码分析。
客户端处理数据流时,监听本地代理地址,接收数据流并根据配置文件获取目的端IP,将此IP写入数据流中供服务端识别。
服务端接收请求,向目的地址发送流量。目的端IP通过特定函数解析,实现数据流的接收与识别。
数据流转发利用io.Copy()函数实现,阻塞式读取源流数据并复制至目标流。此过程可能引入阻塞问题,通过使用协程解决。
解析源码可学习到以下技术点:
1. 目的端IP写入数据流机制。
2. Golang中io.Copy()函数实现数据流转发。
3. 使用协程避免阻塞式函数影响程序运行效率。
4. sync.WaitGroup优化并行任务执行。
希望本文能为你的学习之旅提供指导,欢迎关注公众号获取更多技术分析内容。
Go并åç¼ç¨ï¼goroutineï¼channelåsync详解
ä¼é ç并åç¼ç¨èå¼ï¼å®åç并åæ¯æï¼åºè²ç并åæ§è½æ¯Goè¯è¨åºå«äºå ¶ä»è¯è¨çä¸å¤§ç¹è²ãå¨å½ä»è¿ä¸ªå¤æ ¸æ¶ä»£ï¼å¹¶åç¼ç¨çæä¹ä¸è¨èå»ã使ç¨Goå¼å并åç¨åºï¼æä½èµ·æ¥é常ç®åï¼è¯è¨çº§å«æä¾å ³é®ågoç¨äºå¯å¨åç¨ï¼å¹¶ä¸å¨åä¸å°æºå¨ä¸å¯ä»¥å¯å¨æåä¸ä¸ä¸ªåç¨ã
ä¸é¢å°±æ¥è¯¦ç»ä»ç»ã
goroutineGoè¯è¨ç并åæ§è¡ä½ç§°ä¸ºgoroutineï¼ä½¿ç¨å ³é®è¯goæ¥å¯å¨ä¸ä¸ªgoroutineã
goå ³é®è¯åé¢å¿ é¡»è·ä¸ä¸ªå½æ°ï¼å¯ä»¥æ¯æåå½æ°ï¼ä¹å¯ä»¥æ¯æ åå½æ°ï¼å½æ°çè¿åå¼ä¼è¢«å¿½ç¥ã
goçæ§è¡æ¯éé»å¡çã
å æ¥çä¸ä¸ªä¾åï¼
packagemainimport("fmt""time")funcmain(){ gospinner(*time.Millisecond)constn=fibN:=fib(n)fmt.Printf("\rFibonacci(%d)=%d\n",n,fibN)//Fibonacci()=}funcspinner(delaytime.Duration){ for{ for_,r:=range`-\|/`{ fmt.Printf("\r%c",r)time.Sleep(delay)}}}funcfib(xint)int{ ifx<2{ returnx}returnfib(x-1)+fib(x-2)}ä»æ§è¡ç»ææ¥çï¼æå计ç®åºäºææ³¢é£å¥æ°åçå¼ï¼è¯´æç¨åºå¨spinnerå¤å¹¶æ²¡æé»å¡ï¼èä¸spinnerå½æ°è¿ä¸ç´å¨å±å¹ä¸æå°æ示å符ï¼è¯´æç¨åºæ£å¨æ§è¡ã
å½è®¡ç®å®ææ³¢é£å¥æ°åçå¼ï¼mainå½æ°æå°ç»æ并éåºï¼spinnerä¹è·çéåºã
åæ¥çä¸ä¸ªä¾åï¼å¾ªç¯æ§è¡æ¬¡ï¼æå°ä¸¤ä¸ªæ°çåï¼
packagemainimport"fmt"funcAdd(x,yint){ z:=x+yfmt.Println(z)}funcmain(){ fori:=0;i<;i++{ goAdd(i,i)}}æé®é¢äºï¼å±å¹ä¸ä»ä¹é½æ²¡æï¼ä¸ºä»ä¹å¢ï¼
è¿å°±è¦çGoç¨åºçæ§è¡æºå¶äºãå½ä¸ä¸ªç¨åºå¯å¨æ¶ï¼åªæä¸ä¸ªgoroutineæ¥è°ç¨mainå½æ°ï¼ç§°ä¸ºä¸»goroutineãæ°çgoroutineéè¿goå ³é®è¯å建ï¼ç¶å并åæ§è¡ãå½mainå½æ°è¿åæ¶ï¼ä¸ä¼çå¾ å ¶ä»goroutineæ§è¡å®ï¼èæ¯ç´æ¥æ´åç»ææægoroutineã
é£æ没æåæ³è§£å³å¢ï¼å½ç¶æ¯æçï¼è¯·å¾ä¸çã
channelä¸è¬åå¤è¿ç¨ç¨åºæ¶ï¼é½ä¼éå°ä¸ä¸ªé®é¢ï¼è¿ç¨é´éä¿¡ã常è§çéä¿¡æ¹å¼æä¿¡å·ï¼å ±äº«å åçãgoroutineä¹é´çéä¿¡æºå¶æ¯ééchannelã
使ç¨makeå建ééï¼
ch:=make(chanint)//chçç±»åæ¯chanintééæ¯æä¸ä¸ªä¸»è¦æä½ï¼sendï¼receiveåcloseã
ch<-x//åéx=<-ch//æ¥æ¶<-ch//æ¥æ¶ï¼ä¸¢å¼ç»æclose(ch)//å ³éæ ç¼å²channelmakeå½æ°æ¥å两个åæ°ï¼ç¬¬äºä¸ªåæ°æ¯å¯éåæ°ï¼è¡¨ç¤ºéé容éãä¸ä¼ æè ä¼ 0表示å建äºä¸ä¸ªæ ç¼å²ééã
æ ç¼å²ééä¸çåéæä½å°ä¼é»å¡ï¼ç´å°å¦ä¸ä¸ªgoroutineå¨å¯¹åºçééä¸æ§è¡æ¥æ¶æä½ãç¸åï¼å¦ææ¥æ¶å æ§è¡ï¼é£ä¹æ¥æ¶goroutineå°ä¼é»å¡ï¼ç´å°å¦ä¸ä¸ªgoroutineå¨å¯¹åºééä¸æ§è¡åéã
æ以ï¼æ ç¼å²ééæ¯ä¸ç§åæ¥ééã
ä¸é¢æ们使ç¨æ ç¼å²ééæä¸é¢ä¾åä¸åºç°çé®é¢è§£å³ä¸ä¸ã
packagemainimport"fmt"funcAdd(x,yint,chchanint){ z:=x+ych<-z}funcmain(){ ch:=make(chanint)fori:=0;i<;i++{ goAdd(i,i,ch)}fori:=0;i<;i++{ fmt.Println(<-ch)}}å¯ä»¥æ£å¸¸è¾åºç»æã
主goroutineä¼é»å¡ï¼ç´å°è¯»åå°ééä¸çå¼ï¼ç¨åºç»§ç»æ§è¡ï¼æåéåºã
ç¼å²channelå建ä¸ä¸ªå®¹éæ¯5çç¼å²ééï¼
ch:=make(chanint,5)ç¼å²ééçåéæä½å¨ééå°¾é¨æå ¥ä¸ä¸ªå ç´ ï¼æ¥æ¶æä½ä»ééç头é¨ç§»é¤ä¸ä¸ªå ç´ ãå¦æéé满äºï¼åéä¼é»å¡ï¼ç´å°å¦ä¸ä¸ªgoroutineæ§è¡æ¥æ¶ãç¸åï¼å¦æééæ¯ç©ºçï¼æ¥æ¶ä¼é»å¡ï¼ç´å°å¦ä¸ä¸ªgoroutineæ§è¡åéã
æ没ææè§ï¼å ¶å®ç¼å²ééåéåä¸æ ·ï¼ææä½é½è§£è¦äºã
ååchannelç±»åchan<-intæ¯ä¸ä¸ªåªè½åéçééï¼ç±»å<-chanintæ¯ä¸ä¸ªåªè½æ¥æ¶çééã
ä»»ä½ååééé½å¯ä»¥ç¨ä½ååééï¼ä½åè¿æ¥ä¸è¡ã
è¿æä¸ç¹éè¦æ³¨æï¼closeåªè½ç¨å¨åéééä¸ï¼å¦æç¨å¨æ¥æ¶ééä¼æ¥éã
çä¸ä¸ªååééçä¾åï¼
packagemainimport"fmt"funccounter(outchan<-int){ forx:=0;x<;x++{ out<-x}close(out)}funcsquarer(outchan<-int,in<-chanint){ forv:=rangein{ out<-v*v}close(out)}funcprinter(in<-chanint){ forv:=rangein{ fmt.Println(v)}}funcmain(){ n:=make(chanint)s:=make(chanint)gocounter(n)gosquarer(s,n)printer(s)}syncsyncå æä¾äºä¸¤ç§éç±»åï¼sync.Mutexåsync.RWMutexï¼åè æ¯äºæ¥éï¼åè æ¯è¯»åéã
å½ä¸ä¸ªgoroutineè·åäºMutexåï¼å ¶ä»goroutineä¸ç®¡è¯»åï¼åªè½çå¾ ï¼ç´å°é被éæ¾ã
packagemainimport("fmt""sync""time")funcmain(){ varmutexsync.Mutexwg:=sync.WaitGroup{ }//主goroutineå è·åéfmt.Println("Locking(G0)")mutex.Lock()fmt.Println("locked(G0)")wg.Add(3)fori:=1;i<4;i++{ gofunc(iint){ //ç±äºä¸»goroutineå è·åéï¼ç¨åºå¼å§5ç§ä¼é»å¡å¨è¿éfmt.Printf("Locking(G%d)\n",i)mutex.Lock()fmt.Printf("locked(G%d)\n",i)time.Sleep(time.Second*2)mutex.Unlock()fmt.Printf("unlocked(G%d)\n",i)wg.Done()}(i)}//主goroutine5ç§åéæ¾étime.Sleep(time.Second*5)fmt.Println("readyunlock(G0)")mutex.Unlock()fmt.Println("unlocked(G0)")wg.Wait()}RWMutexå±äºç»å ¸çååå¤è¯»æ¨¡åï¼å½è¯»é被å ç¨æ¶ï¼ä¼é»æ¢åï¼ä½ä¸é»æ¢è¯»ãèåéä¼é»æ¢åå读ã
packagemainimport("fmt""sync""time")funcmain(){ varrwMutexsync.RWMutexwg:=sync.WaitGroup{ }Data:=0wg.Add()fori:=0;i<;i++{ gofunc(tint){ //第ä¸æ¬¡è¿è¡åï¼å解éã//循ç¯å°ç¬¬äºæ¬¡æ¶ï¼è¯»éå®åï¼goroutine没æé»å¡ï¼åæ¶è¯»æåãfmt.Println("Locking")rwMutex.RLock()deferrwMutex.RUnlock()fmt.Printf("Readdata:%v\n",Data)wg.Done()time.Sleep(2*time.Second)}(i)gofunc(tint){ //åéå®ä¸æ¯éè¦è§£éåæè½åçrwMutex.Lock()deferrwMutex.Unlock()Data+=tfmt.Printf("WriteData:%v%d\n",Data,t)wg.Done()time.Sleep(2*time.Second)}(i)}wg.Wait()}æ»ç»å¹¶åç¼ç¨ç®æ¯Goçç¹è²ï¼ä¹æ¯æ ¸å¿åè½ä¹ä¸äºï¼æ¶åçç¥è¯ç¹å ¶å®æ¯é常å¤çï¼æ¬æä¹åªæ¯èµ·å°ä¸ä¸ªæç å¼ççä½ç¨èå·²ã
æ¬æå¼å§ä»ç»äºgoroutineçç®åç¨æ³ï¼ç¶åå¼åºäºééçæ¦å¿µã
ééæä¸ç§ï¼
æ ç¼å²éé
ç¼å²éé
ååéé
æåä»ç»äºGoä¸çéæºå¶ï¼åå«æ¯syncå æä¾çsync.Mutexï¼äºæ¥éï¼åsync.RWMutexï¼è¯»åéï¼ã
goroutineå大精深ï¼åé¢çåè¿æ¯è¦æ ¢æ ¢è¸©çã
æç« ä¸çèå¾åæºç é½ä¸ä¼ å°äºGitHubï¼æéè¦çåå¦å¯èªè¡ä¸è½½ã
å°åï¼github.com/yongxinz/gopher/tree/main/sc
ä½è ï¼yongxinz