老生常谈线程基础的码分几个问题
实现线程只有一种方式
我们知道启动线程至少可以通过以下四种方式:
实现Runnable接口
继承Thread类
线程池创建线程
带返回值的Callable创建线程
但是看它们的底层就一种方式,就是码分通过newThread()实现,其他的码分只不过在它的上面做了层封装。
实现Runnable接口要比继承Thread类的码分更好:
结构上分工更明确,线程本身属性和任务逻辑解耦。码分分时买卖点公式源码
某些情况下性能更好,码分直接把任务交给线程池执行,码分无需再次newThread()。码分
可拓展性更好:实现接口可以多个,码分而继承只能单继承。码分
有的码分时候可能会问到启动线程为什么是start()方法,而不是码分run()方法,这个问题很简单,码分执行run()方法其实就是码分在执行一个类的普通方法,并没有启动一个线程,而start()方法点进去看是一个native方法。
当我们在执行java中的start()方法的时候,它的底层会调JVM由c++编写的代码Thread::start,然后c++代码再调操作系统的create_thread创建线程,创建完线程以后并不会马上运行,要等待CPU的调度。CPU的调度算法有很多,比如先来先服务调度算法(FIFO),最短优先(就是对短作业的优先调度)、时间片轮转调度等。如下图所示:
线程的状态在Java中线程的生命周期中一共有6种状态。
NEW:初始状态,线程被构建,但是还没有调用start方法
RUNNABLE:运行状态,JAVA线程把操作系统中的ci项目源码就绪和运行两种状态统一称为运行中
BLOCKED:阻塞状态,表示线程进入等待状态,也就是线程因为某种原因放弃了CPU使用权
WAITING:等待状态
TIMED_WAITING:超时等待状态,超时以后自动返回
TERMINATED:终止状态,表示当前线程执行完毕
当然这也不是我说的,源码中就是这么定义的:
publicenumState{ /***Threadstateforathreadwhichhasnotyetstarted.*/NEW,/***Threadstateforarunnablethread.Athreadintherunnable*stateisexecutingintheJavavirtualmachinebutitmay*bewaitingforotherresourcesfromtheoperatingsystem*suchasprocessor.*/RUNNABLE,/***Threadstateforathreadblockedwaitingforamonitorlock.*Athreadintheblockedstateiswaitingforamonitorlock*toenterasynchronizedblock/methodor*reenterasynchronizedblock/methodaftercalling*{ @linkObject#wait()Object.wait}.*/BLOCKED,/***Threadstateforawaitingthread.*Athreadisinthewaitingstateduetocallingoneofthe*followingmethods:*<ul>*<li>{ @linkObject#wait()Object.wait}withnotimeout</li>*<li>{ @link#join()Thread.join}withnotimeout</li>*<li>{ @linkLockSupport#park()LockSupport.park}</li>*</ul>**<p>Athreadinthewaitingstateiswaitingforanotherthreadto*performaparticularaction.**Forexample,athreadthathascalled<tt>Object.wait()</tt>*onanobjectiswaitingforanotherthreadtocall*<tt>Object.notify()</tt>or<tt>Object.notifyAll()</tt>on*thatobject.Athreadthathascalled<tt>Thread.join()</tt>*iswaitingforaspecifiedthreadtoterminate.*/WAITING,/***Threadstateforawaitingthreadwithaspecifiedwaitingtime.*Athreadisinthetimedwaitingstateduetocallingoneof*thefollowingmethodswithaspecifiedpositivewaitingtime:*<ul>*<li>{ @link#sleepThread.sleep}</li>*<li>{ @linkObject#wait(long)Object.wait}withtimeout</li>*<li>{ @link#join(long)Thread.join}withtimeout</li>*<li>{ @linkLockSupport#parkNanosLockSupport.parkNanos}</li>*<li>{ @linkLockSupport#parkUntilLockSupport.parkUntil}</li>*</ul>*/TIMED_WAITING,/***Threadstateforaterminatedthread.*Thethreadhascompletedexecution.*/TERMINATED;}下面是这六种状态的转换:
New新创建New表示线程被创建但尚未启动的状态:当我们用newThread()新建一个线程时,如果线程没有开始调用start()方法,那么此时它的状态就是New。而一旦线程调用了start(),它的状态就会从New变成Runnable。
Runnable运行状态Java中的Runable状态对应操作系统线程状态中的两种状态,分别是Running和Ready,也就是说,Java中处于Runnable状态的线程有可能正在执行,也有可能没有正在执行,正在等待被分配CPU资源。
如果一个正在运行的线程是Runnable状态,当它运行到任务的一半时,执行该线程的CPU被调度去做其他事情,导致该线程暂时不运行,它的状态依然不变,还是Runnable,因为它有可能随时被调度回来继续执行任务。
在Java中Blocked、Waiting、TimedWaiting,这三种状态统称为阻塞状态,下面分别来看下。
Blocked从上图可以看出,从Runnable状态进入Blocked状态只有一种可能,就是进入synchronized保护的代码时没有抢到monitor锁,jvm会把当前的ribbon 源码解析线程放入到锁池中。当处于Blocked的线程抢到monitor锁,就会从Blocked状态回到Runnable状态。
Waiting状态我们看上图,线程进入Waiting状态有三种可能。
没有设置Timeout参数的Object.wait()方法,jvm会把当前线程放入到等待队列。
没有设置Timeout参数的Thread.join()方法。
LockSupport.park()方法。
Blocked与Waiting的区别是Blocked在等待其他线程释放monitor锁,而Waiting则是在等待某个条件,比如join的线程执行完毕,或者是notify()/notifyAll()。
当执行了LockSupport.unpark(),或者join的线程运行结束,或者被中断时可以进入Runnable状态。当调用notify()或notifyAll()来唤醒它,它会直接进入Blocked状态,因为唤醒Waiting状态的线程能够调用notify()或notifyAll(),肯定是已经持有了monitor锁,这时候处于Waiting状态的线程没有拿到monitor锁,就会进入Blocked状态,直到执行了notify()/notifyAll()唤醒它的线程执行完毕并释放monitor锁,才可能轮到它去抢夺这把锁,如果它能抢到,就会从Blocked状态回到Runnable状态。
TimedWaiting状态在Waiting上面是TimedWaiting状态,这两个状态是非常相似的,区别仅在于有没有时间限制,TimedWaiting会等待超时,由系统自动唤醒,java刮刮卡源码或者在超时前被唤醒信号唤醒。
以下情况会让线程进入TimedWaiting状态。
设置了时间参数的Thread.sleep(longmillis)方法。
设置了时间参数的Object.wait(longtimeout)方法。
设置了时间参数的Thread.join(longmillis)方法。
设置了时间参数的LockSupport.parkNanos(longnanos)。
LockSupport.parkUntil(longdeadline)方法。
在TimedWaiting中执行notify()和notifyAll()也是一样的道理,它们会先进入Blocked状态,然后抢夺锁成功后,再回到Runnable状态。当然,如果它的超时时间到了且能直接获取到锁/join的线程运行结束/被中断/调用了LockSupport.unpark(),会直接恢复到Runnable状态,而无需经历Blocked状态。
Terminated终止Terminated终止状态,要想进入这个状态有两种可能。
run()方法执行完毕,线程正常退出。
出现一个没有捕获的异常,终止了run()方法,最终导致意外终止。
线程的停止interrupt我们知道Thread提供了线程的一些操作方法,比如stop(),suspend()和resume(),这些方法已经被Java直接标记为@Deprecated,这就说明这些方法是不建议大家使用的。
因为stop()会直接把线程停止,这样就没有给线程足够的时间来处理想要在停止前保存数据的逻辑,任务戛然而止,预约APP源码会导致出现数据完整性等问题。这种行为类似于在linux系统中执行kill-9类似,它是一种不安全的操作。
而对于suspend()和resume()而言,它们的问题在于如果线程调用suspend(),它并不会释放锁,就开始进入休眠,但此时有可能仍持有锁,这样就容易导致死锁问题,因为这把锁在线程被resume()之前,是不会被释放的。
interrupt最正确的停止线程的方式是使用interrupt,但interrupt仅仅起到通知被停止线程的作用。而对于被停止的线程而言,它拥有完全的自主权,它既可以选择立即停止,也可以选择一段时间后停止,也可以选择压根不停止。
下面我们来看下例子:
publicclassInterruptExampleimplementsRunnable{ //interrupt相当于定义一个volatile的变量//volatilebooleanflag=false;publicstaticvoidmain(String[]args)throwsInterruptedException{ Threadt1=newThread(newInterruptExample());t1.start();Thread.sleep(5);//Main线程来决定t1线程的停止,发送一个中断信号,中断标记变为truet1.interrupt();}@Overridepublicvoidrun(){ while(!Thread.currentThread().isInterrupted()){ System.out.println(Thread.currentThread().getName()+"--");}}}执行一下,运行了一会就停止了
主线程在调用t1的interrupt()之后,这个线程的中断标记位就会被设置成true。每个线程都有这样的标记位,当线程执行时,会定期检查这个标记位,如果标记位被设置成true,就说明有程序想终止该线程。在while循环体判断语句中,通过Thread.currentThread().isInterrupt()判断线程是否被中断,如果被置为true了,则跳出循环,线程就结束了,这个就是interrupt的简单用法。
阻塞状态下的线程中断下面来看第二个例子,在循环中加了Thread.sleep秒。
publicclassInterruptSleepExampleimplementsRunnable{ //interrupt相当于定义一个volatile的变量//volatilebooleanflag=false;publicstaticvoidmain(String[]args)throwsInterruptedException{ Threadt1=newThread(newInterruptSleepExample());t1.start();Thread.sleep(5);//Main线程来决定t1线程的停止,发送一个中断信号,中断标记变为truet1.interrupt();}@Overridepublicvoidrun(){ while(!Thread.currentThread().isInterrupted()){ try{ Thread.sleep();}catch(InterruptedExceptione){ //中断标记变为falsee.printStackTrace();}System.out.println(Thread.currentThread().getName()+"--");}}}再来看下运行结果,卡主了,并没有停止。这是因为main线程调用了t1.interrupt(),此时t1正在sleep中,这时候是接收不到中断信号的,要sleep结束以后才能收到。这样的中断太不及时了,我让你中断了,你缺还在傻傻的sleep中。
Java开发的设计者已经考虑到了这一点,sleep、wait等方法可以让线程进入阻塞的方法使线程休眠了,而处于休眠中的线程被中断,那么线程是可以感受到中断信号的,并且会抛出一个InterruptedException异常,同时清除中断信号,将中断标记位设置成false。
这时候有几种做法:
直接捕获异常,不做处理,e.printStackTrace();打印下信息
将异常往外抛出,即在方法上throwsInterruptedException
再次中断,代码如下,加上Thread.currentThread().interrupt();
@Overridepublicvoidrun(){ while(!Thread.currentThread().isInterrupted()){ try{ Thread.sleep();}catch(InterruptedExceptione){ //中断标记变为falsee.printStackTrace();//把中断标记修改为trueThread.currentThread().interrupt();}System.out.println(Thread.currentThread().getName()+"--");}}这时候线程感受到了,我们人为的再把中断标记修改为true,线程就能停止了。一般情况下我们操作线程很少会用到interrupt,因为大多数情况下我们用的是线程池,线程池已经帮我封装好了,但是这方面的知识还是需要掌握的。感谢收看,多多点赞~
作者:小杰博士
LockSupportçparkçå¾ çåºå±å®ç°
ä»ä¸ä¸ç¯æç« ä¸çJDKç延è¿éåä¸,æç»æ¯éè¿LockSupport.parkå®ç°çº¿ç¨ççå¾ ï¼é£ä¹åºå±æ¯å¦ä½å®ç°çå¾ åè¶ æ¶çå¾ çï¼æ¬ææ们æ¥æ¢è®¨ä¸ä¸ãLockSupportçparkåunparkçæ¹æ³publicstaticvoidpark(){ UNSAFE.park(false,0L);}publicstaticvoidparkNanos(longnanos){ if(nanos>0)UNSAFE.park(false,nanos);}publicstaticvoidunpark(Threadthread){ if(thread!=null)UNSAFE.unpark(thread);}ä»ä¸é¢å¯ä»¥çå°å®é LockSupport.parkæ¯éè¿Unsafeççparkæ¹æ³å®ç°ï¼ä»ä¸é¢çæ¹æ³å¯ä»¥çåºè¿ä¸ªæ¯ä¸ä¸ªnativeæ¹æ³.
/***Blockscurrentthread,returningwhenabalancing*{ @codeunpark}occurs,orabalancing{ @codeunpark}has*alreadyoccurred,orthethreadisinterrupted,or,ifnot*absoluteandtimeisnotzero,thegiventimenanosecondshave*elapsed,orifabsolute,thegivendeadlineinmilliseconds*sinceEpochhaspassed,orspuriously(i.e.,returningforno*"reason").Note:ThisoperationisintheUnsafeclassonly*because{ @codeunpark}is,soitwouldbestrangetoplaceit*elsewhere.*/publicnativevoidpark(booleanisAbsolute,longtime);JVMçUnsafeçparkæ¹æ³ä»ä¸é¢JDKä¸ä»£ç ä¸å¯ä»¥threadçParkerç对象çparkæ¹æ³è¿è¡ä¸æ®µæ¶é´ççå¾ ã
UNSAFE_ENTRY(void,Unsafe_Park(JNIEnv*env,jobjectunsafe,jbooleanisAbsolute,jlongtime)){ HOTSPOT_THREAD_PARK_BEGIN((uintptr_t)thread->parker(),(int)isAbsolute,time);EventThreadParkevent;JavaThreadParkedStatejtps(thread,time!=0);thread->parker()->park(isAbsolute!=0,time);if(event.should_commit()){ constoopobj=thread->current_park_blocker();if(time==0){ post_thread_park_event(&event,obj,min_jlong,min_jlong);}else{ if(isAbsolute!=0){ post_thread_park_event(&event,obj,min_jlong,time);}else{ post_thread_park_event(&event,obj,time,min_jlong);}}}HOTSPOT_THREAD_PARK_END((uintptr_t)thread->parker());}UNSAFE_ENDThread.hppçæ件ä¸å é¨å®ä¹çPark对象
private:Parker_parker;public:Parker*parker(){ return&_parker;}ä¸é¢æ¯Os_posix.cppä¸æ¯Linuxä¸å®ç°çParkçparkçå®ç°æ¹å¼
é¦å å°_counterçåééè¿CAS设置为0ï¼è¿åå°±æ§çå¼ï¼å¦æä¹åæ¯å¤§äº0ï¼å说ææ¯å 许访é®ï¼ä¸ç¨é»å¡ï¼ç´æ¥è¿åã
è·åå½å线ç¨ã
å¤æ线ç¨æ¯å¦æ¯ä¸æä¸ï¼å¦ææ¯ï¼åç´æ¥è¿åï¼(ä¹å°±æ¯è¯´çº¿ç¨å¤äºä¸æç¶æä¸ä¼å¿½ç¥parkï¼ä¸ä¼é»å¡çå¾ )
å¤æå¦æä¼ å ¥çtimeåæ°å°äº0 æè æ¯ç»å¯¹æ¶é´å¹¶ä¸timeæ¯0,åç´æ¥è¿å,(ä¸é¢çUnsafeè°ç¨parkä¼ å ¥çåæ°æ¯ falseã0ï¼æ以ä¸æ»¡è¶³è¿ç§æ åµ)
å¦ætime大äº0ï¼å转æ¢æç»å¯¹æ¶é´ã
å建ThreadBlockInVM对象ï¼å¹¶ä¸è°ç¨pthread_mutex_trylockè·å线ç¨äºæ¥éï¼å¦æ没æè·åå°éï¼åç´æ¥è¿åï¼
å¤æ_counteråéæ¯å¦å¤§äº0ï¼å¦ææ¯ï¼åéç½®_counter为0ï¼éæ¾çº¿ç¨éï¼ç´æ¥è¿åã
è°ç¨ OrderAccess::fence(); å å ¥å åå±éï¼ç¦æ¢æ令éæåºï¼ç¡®ä¿å éåéæ¾éçæ令ç顺åºã
å建OSThreadWaitState对象ï¼
å¤ætimeæ¯å¦å¤§äº0ï¼å¦ææ¯0ï¼åè°ç¨pthread_cond_waitè¿è¡çå¾ ï¼å¦æä¸æ¯0ï¼ç¶åè°ç¨pthread_cond_timedwaitè¿è¡æ¶é´åæ°ä¸ºabsTimeççå¾ ï¼
è°ç¨pthread_mutex_unlockè¿è¡éæ¾_mutexéï¼
å次è°ç¨OrderAccess::fence()ç¦æ¢æ令éæåºã
//Parker::parkdecrementscountif>0,elsedoesacondvarwait.Unpark//setscountto1andsignalscondvar.Onlyonethreadeverwaits//onthecondvar.Contentionseenwhentryingtoparkimpliesthatsomeone//isunparkingyou,sodon'twait.Andspuriousreturnsarefine,sothere//isnoneedtotracknotifications.voidParker::park(boolisAbsolute,jlongtime){ //Optionalfast-pathcheck://Returnimmediatelyifapermitisavailable.//WedependonAtomic::xchg()havingfullbarriersemantics//sincewearedoingalock-freeupdateto_counter.if(Atomic::xchg(&_counter,0)>0)return;JavaThread*jt=JavaThread::current();//Optionaloptimization--avoidstatetransitionsifthere's//aninterruptpending.if(jt->is_interrupted(false)){ return;}//Next,demultiplex/decodetimeargumentsstructtimespecabsTime;if(time<0||(isAbsolute&&time==0)){ //don'twaitatallreturn;}if(time>0){ to_abstime(&absTime,time,isAbsolute,false);}//Entersafepointregion//Bewareofdeadlockssuchas.//Theper-threadParker::mutexisaclassicleaf-lock.//InparticularathreadmustneverblockontheThreads_lockwhile//holdingtheParker::mutex.Ifsafepointsarependingboththe//theThreadBlockInVM()CTORandDTORmaygrabThreads_lock.ThreadBlockInVMtbivm(jt);//Can'taccessinterruptstatenowthatweare_thread_blocked.Ifwe've//beeninterruptedsincewecheckedabovethen_counterwillbe>0.//Don'twaitifcannotgetlocksinceinterferencearisesfrom//unparking.if(pthread_mutex_trylock(_mutex)!=0){ return;}intstatus;if(_counter>0){ //nowaitneeded_counter=0;status=pthread_mutex_unlock(_mutex);assert_status(status==0,status,"invariant");//Paranoiatoensureourlockedandlock-freepathsinteract//correctlywitheachotherandJava-levelaccesses.OrderAccess::fence();return;}OSThreadWaitStateosts(jt->osthread(),false/*notObject.wait()*/);assert(_cur_index==-1,"invariant");if(time==0){ _cur_index=REL_INDEX;//arbitrarychoicewhennottimedstatus=pthread_cond_wait(&_cond[_cur_index],_mutex);assert_status(status==0MACOS_ONLY(||status==ETIMEDOUT),status,"cond_wait");}else{ _cur_index=isAbsolute?ABS_INDEX:REL_INDEX;status=pthread_cond_timedwait(&_cond[_cur_index],_mutex,&absTime);assert_status(status==0||status==ETIMEDOUT,status,"cond_timedwait");}_cur_index=-1;_counter=0;status=pthread_mutex_unlock(_mutex);assert_status(status==0,status,"invariant");//Paranoiatoensureourlockedandlock-freepathsinteract//correctlywitheachotherandJava-levelaccesses.OrderAccess::fence();Linuxæä½ç³»ç»æ¯å¦ä½å®ç°pthread_cond_timedwaitè¿è¡æ¶é´çå¾ çpthread_cond_timedwaitå½æ°ä½äºglibcä¸pthread_cond_wait.c, å¯ä»¥çå°æ¯è°ç¨__pthread_cond_wait_commonå®ç°
/*See__pthread_cond_wait_common.*/int___pthread_cond_timedwait(pthread_cond_t*cond,pthread_mutex_t*mutex,conststruct__timespec*abstime){ /*Checkparametervalidity.ThisshouldalsotellthecompilerthatitcanassumethatabstimeisnotNULL.*/if(!valid_nanoseconds(abstime->tv_nsec))returnEINVAL;/*RelaxedMOissufficebecauseclockIDbitisonlymodifiedinconditioncreation.*/unsignedintflags=atomic_load_relaxed(&cond->__data.__wrefs);clockid_tclockid=(flags&__PTHREAD_COND_CLOCK_MONOTONIC_MASK)?CLOCK_MONOTONIC:CLOCK_REALTIME;return__pthread_cond_wait_common(cond,mutex,clockid,abstime);}ä¸é¢__pthread_cond_wait_commonæ¯å®ç°éè¿__futex_abstimed_wait_cancelableå®ç°æ¶é´çå¾
static__always_inlineint__pthread_cond_wait_common(pthread_cond_t*cond,pthread_mutex_t*mutex,clockid_tclockid,conststruct__timespec*abstime){ ''çç¥''`err=__futex_abstimed_wait_cancelable(cond->__data.__g_signals+g,0,clockid,abstime,private);''çç¥''`}__futex_abstimed_wait_cancelableæ¯è°ç¨__futex_abstimed_wait_common
int__futex_abstimed_wait_cancelable(unsignedint*futex_word,unsignedintexpected,clockid_tclockid,conststruct__timespec*abstime,intprivate){ return__futex_abstimed_wait_common(futex_word,expected,clockid,abstime,private,true);}__futex_abstimed_wait_commonä¸é¢åæ¯éè¿å¤æå¹³å°æ¯ä½æè ä½,è°ç¨__futex_abstimed_wait_commonæè __futex_abstimed_wait_common
staticint__futex_abstimed_wait_common(unsignedint*futex_word,unsignedintexpected,clockid_tclockid,conststruct__timespec*abstime,intprivate,boolcancel){ interr;unsignedintclockbit;/*Workaroundthefactthatthekernelrejectsnegativetimeoutvaluesdespitethembeingvalid.*/if(__glibc_unlikely((abstime!=NULL)&&(abstime->tv_sec<0)))returnETIMEDOUT;if(!lll_futex_supported_clockid(clockid))returnEINVAL;clockbit=(clockid==CLOCK_REALTIME)?FUTEX_CLOCK_REALTIME:0;intop=__lll_private_flag(FUTEX_WAIT_BITSET|clockbit,private);#ifdef__ASSUME_TIME_SYSCALLSerr=__futex_abstimed_wait_common(futex_word,expected,op,abstime,private,cancel);#elseboolneed_time=abstime!=NULL&&!in_time_t_range(abstime->tv_sec);if(need_time){ err=__futex_abstimed_wait_common(futex_word,expected,op,abstime,private,cancel);if(err==-ENOSYS)err=-EOVERFLOW;}elseerr=__futex_abstimed_wait_common(futex_word,expected,op,abstime,private,cancel);#endifswitch(err){ case0:case-EAGAIN:case-EINTR:case-ETIMEDOUT:case-EINVAL:case-EOVERFLOW:/*Passedabsolutetimeoutusesbittime_ttype,butunderlyingkerneldoesnotsupportbittime_tfutexsyscalls.*/return-err;case-EFAULT:/*Musthavebeencausedbyaglibcorapplicationbug.*/case-ENOSYS:/*Musthavebeencausedbyaglibcbug.*//*Noothererrorsaredocumentedatthistime.*/default:futex_fatal_error();}}__futex_abstimed_wait_commonæ¯è°ç¨INTERNAL_SYSCALL_CANCELå®å®ä¹å®ç°
staticint__futex_abstimed_wait_common(unsignedint*futex_word,unsignedintexpected,intop,conststruct__timespec*abstime,intprivate,boolcancel){ if(cancel)returnINTERNAL_SYSCALL_CANCEL(futex_time,futex_word,op,expected,abstime,NULL/*Unused.*/,FUTEX_BITSET_MATCH_ANY);elsereturnINTERNAL_SYSCALL_CALL(futex_time,futex_word,op,expected,abstime,NULL/*Ununsed.*/,FUTEX_BITSET_MATCH_ANY);}ç³»ç»è°ç¨ççå®å®ä¹
/***Blockscurrentthread,returningwhenabalancing*{ @codeunpark}occurs,orabalancing{ @codeunpark}has*alreadyoccurred,orthethreadisinterrupted,or,ifnot*absoluteandtimeisnotzero,thegiventimenanosecondshave*elapsed,orifabsolute,thegivendeadlineinmilliseconds*sinceEpochhaspassed,orspuriously(i.e.,returningforno*"reason").Note:ThisoperationisintheUnsafeclassonly*because{ @codeunpark}is,soitwouldbestrangetoplaceit*elsewhere.*/publicnativevoidpark(booleanisAbsolute,longtime);0æ»ç»ä¸»è¦å¯¹LockSupportçparkçå¾ å®ç°çåºå±å®ç°çæµ æï¼é对äºLinuxçç³»ç»è°ç¨è¿æ²¡ææ¾å°æºç ï¼åç»ä¼ç»§ç»è·è¸ªï¼å¸ææ读è ç¥éç满å¸å¯ä»¥åç¥ä¸ï¼è°¢è°¢ã
é¾æ¥ï¼/post/ListenableFuture源码解析
ListenableFuture 是 spring 中对 JDK Future 接口的扩展,主要应用于解决在提交线程池的任务拿到 Future 后在 get 方法调用时会阻塞的问题。通过使用 ListenableFuture,可以向其注册回调函数(监听器),当任务完成时,触发回调。Promise 在 Netty 中也实现了类似的功能,用于处理类似 Future 的场景。
实现 ListenableFuture 的关键在于 FutureTask 的源码解析。FutureTask 是实现 Future 接口的基础类,ListenableFutureTask 在其基础上做了扩展。其主要功能是在任务提交后,当调用 get 方法时能够阻塞当前业务线程,直到任务完成时唤醒。
FutureTask 通过在内部实现一个轻量级的 Treiber stack 数据结构来管理等待任务完成的线程。这个数据结构由 WaitNode 节点组成,每个节点代表一个等待的线程。当业务线程调用 get 方法时,会将自己插入到 WaitNode 栈中,并且在插入的同时让当前线程进入等待状态。在任务执行完成后,会遍历 WaitNode 栈,唤醒等待的线程。
为了确保并发安全,FutureTask 使用 CAS(Compare and Swap)操作来管理 WaitNode 栈。每个新插入的节点都会使用 CAS 操作与栈顶节点进行比较,并在满足条件时更新栈顶。这一过程保证了插入操作的原子性,防止了并发条件下的数据混乱。同时,插入操作与栈顶节点的更新操作相互交织,确保了数据的一致性和完整性。
在 FutureTask 中,还利用了 LockSupport 类提供的 park 和 unpark 方法来实现线程的等待和唤醒。当线程插入到 WaitNode 栈中后,通过 park 方法将线程阻塞;任务执行完成后,通过 unpark 方法唤醒线程,完成等待与唤醒的流程。
综上所述,ListenableFuture 通过扩展 FutureTask 的功能,实现了任务执行与线程等待的高效管理。通过注册监听器并利用 CAS 操作与 LockSupport 方法,实现了在任务完成时通知回调,解决了异步任务执行时的线程阻塞问题,提高了程序的并发处理能力。
Java ä¸LockSupportç±»å¨C#ä¸çå®ç°
ããJava ä¹åæä¾ä¼ç§ç并ååºncurrent Netä¸ç¼ºä¹ç±»ä¼¼çåè½ ç±äºç¡¬ä»¶ä½ç³»åçäºåå å¤æ ¸æ¶ä»£æ¥ä¸´ NETä¸ç¼ºä¹å¹¶åç±»åºæ¾ç¶ä¸åæ¶å® ç¼è§£è¿ä¸çç¾çå ¶ä¸ä¸ä¸ªåæ³å°±æ¯å¨å¾C#ä¸ç§»æ¤javaçncurrentããjavaä¸çncurrentå ä¸æä¾äºä¸ä¸ªç±»LockSupport ncurrentå å¾å¤å ³é®å®ç°éè¦è°ç¨LockSupport å¦æéè¦æjavaçncurrentå è¿ç§»å°C#ä¸ LockSupportç±»çè¿ç§»æ¯ä¸å¯é¿å çé®é¢
ããå¨javaä¸ LockSupportç±»æå¦ä¸æ¹æ³
ãã以ä¸æ¯å¼ç¨ç段
ãã
ããpublic static void park(Object blocker) { ããThread t = Thread currentThread(); ããsetBlocker(t blocker); ããunsafe park(false L); ããsetBlocker(t null); ãã}
ããå½ä¸ä¸ªçº¿ç¨è°ç¨LockSupport parkä¹å 线ç¨å°±ä¼åä¸è½½ 类似äºObject wait æè NETä¸çSystem Threading Monitor Wait ä½é®é¢æ¯javaä¸çObject waitå NETä¸çMonitor wait é½éè¦ä¸ä¸ªwaitObject è¿ä¸ªé®é¢æ¾ç»å°æ°æ 为æ¤ç¿»äºä¸éJDK å®ç°æºç å°æååç°ç解å³åæ³å´æ¯å¾ç®å ä¹æ éäºè§£JDKçåºå±å®ç°æºç
ãã以ä¸æ¯å¼ç¨ç段
ãã
ããpublic class LockSupport ãã{ ããprivate static LocalDataStoreSlot slot = Thread GetNamedDataSlot ( LockSupport Park ); ããpublic static void Park(Object blocker) ãã{ ããThread thread = Thread CurrentThread; ããThread SetData(slot blocker); ããlock (thread) ãã{ ããMonitor Wait(thread); ãã} ãã} ããpublic static void Unpark(Thread thread) ãã{ ããif (thread == null) return; ããlock (thread) ãã{ ããMonitor Pulse(thread); ãã} ãã} ãã}
lishixinzhi/Article/program/net//从HotSpot源码,深度解读 park 和 unpark
我最近建立了一个在线自习室(App:番茄ToDO)用于相互监督学习,感兴趣的小伙伴可以加入。自习室加入码:D5A7A
Java并发包下的类大多基于AQS(AbstractQueuedSynchronizer)框架实现,而AQS线程安全的实现依赖于两个关键类:Unsafe和LockSupport。
其中,Unsafe主要提供CAS操作(关于CAS,在文章《读懂AtomicInteger源码(多线程专题)》中讲解过),LockSupport主要提供park/unpark操作。实际上,park/unpark操作的最终调用还是基于Unsafe类,因此Unsafe类才是核心。
Unsafe类的实现是由native关键字说明的,这意味着这个方法是原生函数,是用C/C++语言实现的,并被编译成了DLL,由Java去调用。
park函数的作用是将当前调用线程阻塞,而unpark函数则是唤醒指定线程。
park是等待一个许可,unpark是为某线程提供一个许可。如果线程A调用park,除非另一个线程调用unpark(A)给A一个许可,否则线程A将阻塞在park操作上。每次调用一次park,需要有一个unpark来解锁。
并且,unpark可以先于park调用,但不管unpark先调用多少次,都只提供一个许可,不可叠加。只需要一次park来消费掉unpark带来的许可,再次调用会阻塞。
在Linux系统下,park和unpark是通过Posix线程库pthread中的mutex(互斥量)和condition(条件变量)来实现的。
简单来说,mutex和condition保护了一个叫_counter的信号量。当park时,这个变量被设置为0,当unpark时,这个变量被设置为1。当_counter=0时线程阻塞,当_counter>0时直接设为0并返回。
每个Java线程都有一个Parker实例,Parker类的部分源码如下:
由源码可知,Parker类继承于PlatformParker,实际上是用Posix的mutex和condition来实现的。Parker类里的_counter字段,就是用来记录park和unpark是否需要阻塞的标识。
具体的执行逻辑已经用注释标记在代码中,简要来说,就是检查_counter是不是大于0,如果是,则把_counter设置为0,返回。如果等于零,继续执行,阻塞等待。
unpark直接设置_counter为1,再unlock mutex返回。如果_counter之前的值是0,则还要调用pthread_cond_signal唤醒在park中等待的线程。源码如下:
(如果不会下载JVM源码可以后台回复“jdk”,获得下载压缩包)
2024-11-30 09:17
2024-11-30 07:59
2024-11-30 07:09
2024-11-30 07:08
2024-11-30 06:59