AFABoot源码阅读手册-Kernel图形转码

AFABoot源码阅读手册-Kernel图形转码

AFABoot源码阅读手册-Kernel图形转码

[TOC]

AFABoot源码阅读手册-Kernel图形转码

本内容对应afa6体系的开发脚手架基座。主要面向组内的开发人员,辅助解析源码。

Kernel图形转码

​ 在afa6的设计里面,有一个很关键的设计思想:图码双行。也就是利用图形的方式构建业务逻辑,然后通过编译器转换为代码,要求代码要符合boot开发的习惯,做到:

  1. 出入参明确,每一个图形节点的入参和出参的定义都是清晰明确的,其类型也明确,不再使用afa5的JavaDict以及JavaList数据容器
  2. 每个图形节点的调用本质上都是一次方法调用。
  3. 组件的定义同样保留,但是在图形开发时不仅可以将特殊定义的组件类作为组件使用,还可以使用JPAMybatis接口类描述的方法作为组件使用
  4. 需要完整的异常处理逻辑
  5. 为了业务表达清晰明确,生成的代码也要保证开发者可以从源码中方便识别业务的流转过程
  6. 无状态,需要接入spring体系实现统一异常处理或者其它的AOP操作
  7. 支持for for each switch callback try-catch-finally等特殊的操作节点
  8. 兼容TCResult,在使用多出口的组件时避免通过抛异常的方式影响性能

方案概要

​ 在整体的状态描述上,区别于afa5的TCResult对象,在afa6中,一个方法调用的正常返回对应成功状态,抛出ReturnStatusException对应多状态出口异常,实际的返回状态是这个异常中的某个成员变量,抛出任意非ReturnStatusException异常,对应0状态出口异常

​ 为了实现上面的各种需求,我们提取了几个关键的技术点:reactor响应式框架lambda泛型,利用reactor驱动控制业务的处理流程,发布者的角色相当于发起一次组件调用,其返回值(或异常)通过reactor模型流转,决定下一个需要执行的节点,通过lambda的方式,可以很简单的将任意的组件调用包装为一次函数调用,同样通过泛型的方式明确每个节点的返回值即可以做到出参的定义都是清晰明确的。

​ 至于对TCResult的兼容,我们可以采用包装的方式,根据返回的状态码重新包装成对应的ReturnStatusException异常即可

详细设计

INode接口

首先我们先看一下对流程图上一个节点的定义。对应cn.com.agree.afa.afaboot.kernel.INode

public interface INode<T,R extends INode<T,R>> extends Executable {
    /**
     * 设置Call数据源
     *
     * @param consumer 数据源
     * @return
     */
    R onCall(Caller<T> consumer) ;
    /**
     * 设置Runnable数据源,仅可用于无返回值调用
     *
     * @return
     */
    R onRun(Runnable runnable) ;
    /**
     * 支持多状态多出参,要求值类型必须是List<Object>
     *
     * @param resultCaller
     * @return
     */
    R onMutable(Caller<TCResult> resultCaller);
    /**
     * 遍历数据源
     *
     * @param iterableSupplier
     * @return
     */
    R onIterable(Supplier<Iterable<T>> iterableSupplier);
    /**
     * 遍历数据源
     *
     * @param iteratorSupplier
     * @return
     */
    R onIterator(Supplier<Iterator<T>> iteratorSupplier);
    
    /**
     * callback数据源
     * @param supplier
     * @return
     */
    R onDispatch(Supplier<ObjectDispatcher<T>> supplier);
    
    /**
     * 从子流程获取操作结果
     * @param entry
     * @return
     */
    R onConcat(INode<?,?>  entry);
    /**
     * 正常出口
     *
     * @param next
     * @return
     */
    R onSuccess(INode<?,?> next) ;
    /**
     * 异常出口
     *
     * @param status
     * @param next
     * @return
     */
    R onError(int status, INode<?,?> next);
}

public interface Executable<T> {
    /**
     * 执行
     * @return
     * @throws Throwable
     */
    T execute() throws Throwable ;
}

可以看到对于一个节点,我们设定了run call mutable dispatch concat 等多种操作方式,这几个方法用于设定这个节点的真实执行内容,然后onSuccess onError 两个方法则用于将节点之间串联起来(即分别设定成功出口、其它状态出口),最后业务运行时通过execute方法触发运行。execute方法是没有入参的,这是因为没有统一的数据容器,所以无法定义统一的入参,因此在整体设计上,我们使用了线程变量的方式,定义时引用的是LocalVariable变量,运行时再从LocalVariable变量中获取实际的值

FluxSupport

这是基于reactor框架的flux机制实现的INode支撑类。cn.com.agree.afa.afaboot.kernel.FluxSupport

* 基于Flux的骨架,
* 考虑流程图编译成代码后,体现原流程的可读性,代码清晰整洁,不需要展现底层非业务部分逻辑
* 节点的异常出口处理只对应当前节点自身的业务逻辑部分异常
* 当前节点的异常没有被处理,该异常会往上级节点传播,但是默认情况下上级节点继续往上传播异常,不对异常进行处理
* 上级节点可以选择处理子节点的业务异常,如果处理成功后则中断本次异常传播
* 节点0出口用于处理非{@link ReturnStatusException}异常以及{@link ReturnStatusException}异常状态为0的部分
public abstract class FluxSupport<T, R extends FluxSupport<T, R>> implements INode<T, R>, Tracker {
    private final AtomicReference<Flux<T>> flux = new AtomicReference<>();
    protected Logger logger = LoggerFactory.getLogger(getClass());
    public static final byte masterState = 0x01;
    public static final byte branchState = 0x02;
    /**
     * 调用路径跟踪
     */
    private static final ThreadLocal<Stack<FluxSupport<?, ?>>> tracer = new ThreadLocal<Stack<FluxSupport<?, ?>>>() {
        @Override
        protected Stack<FluxSupport<?, ?>> initialValue() {
            return new Stack();
        }
    };
    private String desc = String.valueOf(hashCode());
    /**
     * 防止状态重复连接
     */
    private BitSet stateSet = new BitSet(10240);
    protected final int id;
    private boolean handChildError = false;
    protected final RuntimeContext<T> runtime = RuntimeContext.newInstance(this);
    private AtomicBoolean completed = new AtomicBoolean();
    private boolean isStream = false;
    protected volatile FluxSupport<?, ?> defaultSuccess;
    
    protected volatile Linker<T> linker;
}
tracer是用于跟踪在运行过程中实际的节点出入栈过程,Stack是一个简单的栈实现,通过tracer机制,在输出日志时可以动态感知当前的调用位置,方便排查问题
RuntimeContext是整个运行过程的上下文,每个Node都有属于自己的上下文,Node在实际运行中产生的数据均存放在上下文中,如异常、运行结果、下一个节点信息等数据
protected ExecuteContext<T> runNow(ExecuteContext<?> parent) throws Throwable {
        tracer.get().push(this);//将当前运行节点添加到跟踪栈中
        //        runtime.remove();//重置当前的运行数据容器
        initIfNecessary();//第一次正式运行时结束所有的初始化动作
        Optional<Flux<T>> optionalMono = Optional.of(get());
        optionalMono.orElseThrow(() -> new Error("Node Not Found"));
        try {
            //            System.out.println(identify() + id + " entry...");
            logInfo(">>>>>>>>>>>> entry >>>>>>>>>>>>");
            runtime.pushIntoStack(parent);//创建一个运行上下文,同一个Node在同一个请求处理中可能会被重复调用,每次调用对应一个运行上下文
            logDebug("putting new context into stack.");
            runtime.setRunningState(masterState);
            optionalMono.ifPresent(flux -> flux.subscribe(null, throwable -> {
            }, null, null));//触发执行动作
            Throwable throwable = getThrowable();//如果最后存在异常没有被成功处理,在这里进行判断
            if (throwable != null) {
                if (handleError(throwable)) {
                    throwable = getThrowable();
                    if (throwable != null) {
                        logInfo("exception can't be handled at last. Throwing again. Desc:{}", throwable.getMessage());
                        throw throwable;
                    }
                } else {
                    throw throwable;
                }
            }
            return runtime.getExecuteContext();
        } finally {
            try {
                handleFinally();
            } finally {
                logDebug("remove context from stack.");
                logInfo("<<<<<<<<<<<< exit <<<<<<<<<<<<");
                runtime.removeFromStack();
                tracer.get().pop();
            }
        }
    }

​ 这个是运行的入口,分为初始化上下文、触发订阅执行、结束后检查是否有异常未处理,如果有说明可能是后续节点抛出的异常或者子节点抛出的异常,这种异常不应该在当前节点处理,需要往上继续传递

​ 这里面有一个特殊的概念,ExecuteContext,这是执行上下文,每进入一次节点都对应一个不同的执行上下文,换句话说,在同一次请求中如果同一个节点多次执行就会存在多个执行上下文,这种机制可以保证在存在环路调用时执行环境的一致性。

设置成功节点、失败节点

通过为每个节点设置成功、失败出口,将不同节点串连起来,以下例子:

		@Override
        @PostConstruct
        protected void build() {
            //Node定义,明确返回值类型
            FluxNode<UserPO> node2 = FluxNode.callable(2);
            FluxNode<Long> node3 = FluxNode.callable(3);
            FluxNode<UserPO> successEnd4 = FluxNode.callable(4);
            FluxNode<ReturnStatusException> errorEnd5 = FluxNode.callable(5);
            
            //定义保存各个节点的容器
            Reference<UserPO> node2Ref = node2;
            Reference<Long> node3Ref = node3;
            
            /**
             * 各个Node的调用方式以及出口异常处理
             */
            
            node2.onCall(() -> {
                UserPO info = userComp.findUser(billDigest.value().getUid());
                return info;
            }).onSuccess(node3).onError(0, node3);
            
            node3.onCall(() -> {
                System.out.println("流水登记:" + node2Ref.value());
                return System.currentTimeMillis();
            }).onSuccess(successEnd4).onError(0, errorEnd5);
            
            bindEndNode(successEnd4,
                () -> node2Ref.safeValue().orElseThrow(() -> new ReturnStatusException(0, "User Not Found")));
            bindErrorEndNode(errorEnd5, () -> new ReturnStatusException(0, "User Not Found"));
            bindStartNode(node2);
        }

上面可以看到node2节点的成功状态下一个节点是node3,0状态下一个节点是node3,0状态还有一个含义就是默认异常也是node3。同理,node3也是同样的方式关联node4和node5的。实际的图形如下:

image-20210701113143880

接下来,以设置成功状态为例说明具体的操作步骤,设置失败的方式也是类似的

/**
     * 正常出口
     *
     * @param next
     * @return
     */
    public R onSuccess(INode<?, ?> next) {
        checkState();
        checkStatus(1);
        logInfo("Setting success branch:{}", ((FluxSupport<?, ?>) next).id);
        _onSuccess(wrapSuccess((FluxSupport<?, ?>) next));
        //        defaultSuccess = next;
        return (R) this;
    }

    /**
     * 绑定成功状态到下一个节点
     *
     * @param next 下一个节点
     * @return
     */
    private Consumer<T> wrapSuccess(FluxSupport<?, ?> next) {
        return v -> {
            logDebug("executed successfully, next nodeId is \"{}\"", next.getId());
            this.setNext(next);
        };
    }

	protected void _onSuccess(Consumer<T> next) {
        if (linker != null) {
            set(linker.linkSuccess(get(), next, this));
        } else {
            set(get().doOnNext(next));
        }
    }

public interface Linker<T> {
    /**
     * 关联成功出口
     *
     * @param flux
     * @param next
     */
    Flux<T> linkSuccess(Flux<T> flux, Consumer<T> next, FluxSupport<?, ?> node);
    
    class NEW {
        static <T> Linker<T> callable() {
            return (flux, next, node) -> {
                return flux.doOnNext(next);
            };
        }
        
        static <T> Linker<T> runnable() {
            return (flux, next, node) -> {
                return flux.doOnComplete(() -> {
                    if (node.getThrowable() == null)
                        next.accept(null);
                });
            };
        }
    }
}

首先就是检查节点的状态,确实是否已经初始化过了,然后检查1状态的节点是否已经连接。接下来有个wrapSuccess的调用,它的作用就是将一个节点包装成一个可连接的对象,并返回一个lambda函数实现,然后通过_onSuccess方法将这个lambda函数实现设置到flux的合适位置,这样子当节点执行成功后,数据流到达相应的拦截点就会触发函数调用,从而将下一个节点设置为成功节点。设置的时候有个Linker接口,这是因为flux不能传递Null值,也就是说如果是无返回的组件调用就不能触发正常的doOnNext拦截点,所以抽象了这个接口,分别对应有响应和无响应两种实现。

设置执行方式

onCall

需要非null返回结果的调用

FluxNode<UserPO> node2 = FluxNode.callable(2);
node2.onCall(() -> {
    UserPO info = userComp.findUser(billDigest.value().getUid());
    return info;
}).onSuccess(node3).onError(0, node3);

onCall要求传入一个实现了cn.com.agree.afa.afaboot.kernel.Caller接口的实现,一般上这里会使用lambda方式,内容体为调用组件方法。可以抛出任意异常,如果抛出ReturnStatusException则可以指定异常状态码

/**
     * 带结果的调用
     *
     * @param consumer
     * @return
     */
    protected R _onCall(Caller<T> consumer) {
        checkState();
        Function<FluxSupport<T, R>, Flux<T>> monoFunction = node -> Flux.<T>from(callOrError(consumer))
            .concatWith(Flux.create(fluxSink -> {
                if (handChildError && runtime.getRunningState() == branchState && getThrowable() != null) {
                    logInfo("branch(call) error caused. In handleChildError mode, try to handle child error:{}",
                        getThrowable().getMessage());
                    fluxSink.error(getThrowable());
                } else {
                    logTrace("branch finish successfully.");
                    fluxSink.complete();
                }
            })).doOnError(node::setThrowable).doOnNext(node::setResult);
        this.set(monoFunction.apply(this));
        return (R) this;
    }

这里主要利用Flux的from函数创建一个发布者,通过callOrError方法将Caller包装成发布者。后面的concatWith主要是用于在主流程结束后处理子节点的异常。这种处理模式与后面的onRun等方法是一样的,区别只在于包装发布者的方式不一样

private Publisher<T> callOrError(Caller<T> consumer) {
        Optional.<Caller<T>>of(consumer).orElseThrow(() -> new ReturnStatusException(0, "Consumer Not Defined!"));
        return Mono.create((sink) -> {
            try {
                T value = Optional.ofNullable(consumer.call()).orElseThrow(() -> {
                    return new NoValueReturnedException("Unexpected Null result.[" + identify() + "::" + id + "]");
                });
                sink.success(value);
            } catch (Throwable throwable) {
                logInfo("error caused. Msg:{}", throwable.getMessage());
                sinkException(sink, throwable);
            }
        });
    }

Mono.create()这个是reator的编程式api,允许通过编程的方式调用success发布数据,通过error发布异常

onRun

无结果调用,即返回Void

private Publisher<T> callOrErrorWithNoResponse(Runnable runnable) {
        final Runnable fRun = Optional.<Runnable>ofNullable(runnable)
            .orElseThrow(() -> new ReturnStatusException(0, "Runnable Not Defined!"));
        return Flux.create(fluxSink -> {
            try {
                logDebug("runnable call beginning");
                fRun.run();
                if (getThrowable() != null) {
                    logInfo("runnable call error caused:{}", getThrowable().getMessage());
                    fluxSink.error(getThrowable());
                }
            } catch (Throwable e) {
                logInfo("runnable call error caused, msg:{}", e.getMessage());
                sinkException(fluxSink, e);
            } finally {
                fluxSink.complete();
            }
            logDebug("runnable call finished");
        });
    }

类似的处理方式,这里面会在调用结束后判断是否有异常未处理,这是因为Runnable接口无法抛出未检查异常,所以用户可能直接通过setThrowable方法设置异常

onMutable

支持TCResult返回的调用

 /**
     * 多状态调用
     *
     * @return
     */
    protected R _onMutable(Caller<TCResult> resultCaller) {
        checkState();
        Function<FluxSupport<T, R>, Flux<T>> monoFunction = node -> Flux.from(callOrError(() -> {
            TCResult result = Optional.of(resultCaller).isPresent() ? resultCaller.call() : null;
            Optional.ofNullable(result).orElseThrow(() -> new ReturnStatusException(0, "TCResult is null"));
            if (result.getStatus() != TCResult.STAT_SUCCESS) {
                throw new ReturnStatusException(result.getStatus(),
                    result.getErrorMsg() == null ? "TCResult Error:" + result.getStatus() : result.getErrorMsg());
            }
            return (T) Optional.ofNullable(result.getOutputParams()).orElseThrow(
                () -> new ReturnStatusException(0, "TCResult's status is 1, but OutputParams attribute is null"));
        })).concatWith(Flux.create(fluxSink -> {
            if (handChildError && runtime.getRunningState() == branchState && getThrowable() != null) {
                logInfo("branch(mutable) error caused. In handleChildError mode, try to handle child error:{}",
                    getThrowable().getMessage());
                fluxSink.error(getThrowable());
            } else {
                logTrace("branch finish successfully.");
            }
            fluxSink.complete();
        })).doOnError(node::setThrowable).doOnNext(node::setResult);
        this.set((Flux<T>) monoFunction.apply(this));
        return (R) this;
    }

这里底层实际上用的还是Caller的接口实现,只是要求返回值必须是TCResult的类型。通过lambda表达式包装了一层,对TCResult结果进行判断,如果是非1状态就转换为ReturnStatusException发布,这样就可以复用call的模式了

onIterable onIterator

这两个实际上是一样的功能,都是用于遍历。

private Publisher<T> callOrErrorWithIterable(Supplier<Iterable<T>> iterable) {
        Optional.<Supplier<Iterable<T>>>ofNullable(iterable)
            .orElseThrow(() -> new ReturnStatusException(0, "Iterable Not Defined!"));
        return callOrErrorWithIterator(() -> {
            Iterator<T> it = Optional.ofNullable(iterable.get())
                .orElseThrow(() -> new ReturnStatusException(0, "Source is null")).iterator();
            return it;
        });
    }
    
    private Publisher<T> callOrErrorWithIterator(Supplier<Iterator<T>> iteratorSupplier) {
        Optional.<Supplier<Iterator<T>>>ofNullable(iteratorSupplier)
            .orElseThrow(() -> new ReturnStatusException(0, "Iterator Not Defined!"));
        return Flux.create(fluxSink -> {
            try {
                Iterator<T> it = Optional.ofNullable(iteratorSupplier.get())
                    .orElseThrow(() -> new ReturnStatusException(0, "Source is null"));
                int count = 0;
                logDebug("iterator call beginning");
                while (it.hasNext() && !fluxSink.isCancelled()) {
                    logTrace("iterator call. count:{}", ++count);
                    Optional.ofNullable(it.next()).ifPresent(fluxSink::next);
                    if (getThrowable() != null) {
                        logInfo("iterator call error caused at:{}, msg:{}", count, getThrowable().getMessage());
                        //                        fluxSink.error(getFinalThrowable());
                        sinkException(fluxSink, getThrowable());
                        break;
                    }
                }
            } catch (Throwable e) {
                logInfo("iterator call error caused, msg:{}", e.getMessage());
                sinkException(fluxSink, e);
            } finally {
                fluxSink.complete();
            }
            logDebug("iterator call finished");
        });
    }

因为是需要多次发布数据的,所以由Mono换成了Flux,其它都是差不多的方式

onDispatch

回调方式调用。这种方式可以理解为一种编程式的接口,发布的数据是动态的,由用户指定

private Publisher<? extends T> callOrErrorWithDispatch(Supplier<ObjectDispatcher<T>> dispatcherSupplier) {        Optional.<Supplier<ObjectDispatcher<T>>>ofNullable(dispatcherSupplier)            .orElseThrow(() -> new ReturnStatusException(0, "Dispatcher Not Defined!"));        return Flux.create(fluxSink -> {            try {                ObjectDispatcher<T> it = Optional.ofNullable(dispatcherSupplier.get())                    .orElseThrow(() -> new ReturnStatusException(0, "Source is null"));                logDebug("dispatcher call beginning");                ObjectDispatcher.Sinker<T> sinker = new ObjectDispatcher.Sinker<T>() {                    @Override                    public void next(T obj) throws ReturnStatusException {                        fluxSink.next(obj);                        Throwable throwable = getThrowable();                        if (throwable != null) {                            if (throwable instanceof ReturnStatusException) {                                throw (ReturnStatusException) throwable;                            } else {                                throw new ReturnStatusException(0, throwable);                            }                        }                    }                                        @Override                    public int safeNext(T obj) {                        fluxSink.next(obj);                        Throwable throwable = getThrowable();                        if (throwable != null) {                            if (throwable instanceof ReturnStatusException) {                                return ((ReturnStatusException) throwable).getStatus();                            } else {                                return 0;                            }                        }                        return 1;                    }                                        @Override                    public int error(Throwable e) {                        fluxSink.error(e);                        return 0;                    }                };                TCResult result = it.sink(sinker);                if (result != null) {                    if (result.getStatus() == TCResult.STAT_SUCCESS) {                        setResult((T) result.getOutputParams());                    } else {                        setThrowable(new ReturnStatusException(result.getStatus(), result.getErrorMsg()));                        fluxSink.error(getThrowable());                    }                }            } catch (Throwable e) {                logInfo("dispatcher call error caused, msg:{}", getThrowable().getMessage());                fluxSink.error(e);            } finally {                fluxSink.complete();            }            logDebug("dispatcher call finished");                    });    }public interface ObjectDispatcher<T> {    public TCResult sink(Sinker<T> sinker);    interface Sinker<T>{        void next(T obj) throws ReturnStatusException;        int safeNext(T obj);        int error(Throwable e);    }}

实际上就是通过ObjectDispatcher接口将Sinker发布器交给用户处理,这样子用户拿到sinker实例后就可以自行决定如何发布数据了,实际上底层还是Flux的sinker

node7.onDispatch(() -> {
    return sinker -> {
      st_logger.debug("Dispatcher in:");
      for (int i = 0; i < 10; i++) {
          sinker.next("now is xxxxxxxxx" + i);
      }
      return TCResult.newSuccessResult();
   };
}).onSubMap(node7Join()).onSuccess(end);


@Component
public class CallbackComp {
    public ObjectDispatcher<String> callback(List<String> items) {
        return sinker -> {
            for(String it:items){
                sinker.next(it);
            }
            return new TCResult(1);
        };
    }
}

实际上for foreach机制也是可以利用callback方式实现的,这种编程式的api具备相当的灵活性

Stream模式与非Stream模式

Stream模式
stream模式就是当前节点执行完自身的逻辑代码后,不返回,而是直接调起下一个节点的执行逻辑,在代码上表现的效果就是 A->B->B(end)->A(end),在这种模式下A节点实际上还是处于执行中的状态,只是不再位于栈顶。
这种模式有个最大的问题就是会导致java的方法栈很长,很容易出现栈溢出问题,特别是真实场景下可能一个图有上百个节点时更加明显。所以这种模式一般只用于特殊的要求:
	* Step节点执行内部节点逻辑时
	* 逻辑图的第一个start节点,因为这个是一个空白节点,它返回后意味着整个Step就结束了
	* for foreach callback concat这种存在子图的操作模式
protected void handleStreamMode() {
        set(get().doOnEach((signal) -> {
            if (signal.isOnNext() || signal.isOnError()) {
                if (isStream) {
                    Optional.ofNullable(clearNext()).ifPresent((v) -> {
                        logDebug("with stream mode, will execute next node directly. next nodeId is:{}", v.id);
                        runMono(v);
                    });
                }
            }
        }));
    }

这个模式实际上就是在flux流程中直接执行下一个节点

非Stream模式
非Stream模式就是通过一个简单Engine不断获取下一个节点之后执行
private class _Engine {
        private final ExecuteContext<?> parent;
        
        public _Engine(ExecuteContext<?> parent) {
            this.parent = parent;
        }
        
        public ExecuteContext<T> call() throws Throwable {
            FluxSupport<?, ?> node = FluxSupport.this;
            ExecuteContext<?> rParent = this.parent;
            ExecuteContext<T> obj = FluxSupport.this.runNow(this.parent);
            node = node.clearNext();
            rParent = obj;
            try {
                while (node != null) {
                    rParent = node.runNow(rParent);
                    node = node.clearNext();
                    if (node != null) {
                        logDebug("next nodeId is:{}", node.getId());
                    }
                }
            } catch (Throwable e) {
                logWarn("Execute has been interrupted by unexpected error.{}",
                    StringUtils.hasText(e.getMessage()) ? e.getMessage() : ExceptionUtils.toDetailString(e));
                throw e;
            }
            return obj;
        }
 }

上下文设计

运行时的上下文整体使用了线程变量的方式存储,对应的实现类为:cn.com.agree.afa.afaboot.kernel.context.RuntimeContext
每个节点都有唯一的上下文变量,运行时通过该变量访问属于自己的数据。

<u>要求每次执行交易前都需要调用:RuntimeContext#reset方法初始化上下文,否则会运行异常**​ </u>

public class RuntimeContext<T> {
    private static final String PASSCODE = "runtimeContext";
    /**
     * 运行时实际的执行栈
     */
    static final ThreadLocal<Stack<ExecuteContext<?>>> EXECUTE_CONTEXT = new ThreadLocal<>();
    /**
     * 运行过程中访问过的节点上下文
     */
    static final ThreadLocal<ConcurrentHashSet<ContextReference>> CONTEXT_REFERENCES = new ThreadLocal<ConcurrentHashSet<ContextReference>>() {
        @Override
        protected ConcurrentHashSet<ContextReference> initialValue() {
            return null;
        }
    };
    private static final ThreadLocal<Options> OPTIONS = new ThreadLocal<Options>() {
        @Override
        protected Options initialValue() {
            return new Options();
        }
    };
    private final ContextAccess<T> actual;
    private final String credential;
    private final FluxSupport<T, ?> node;
    
    private RuntimeContext(FluxSupport<T, ?> node) {
        this.node = node;
        credential = Shadows.createShadow(PASSCODE, ContextAccess.class, (id, cls) -> {
            try {
                return SafeInstances.instance(id, cls, true, target -> {
                    CONTEXT_REFERENCES.get().add(new ContextReference(RuntimeContext.this));//关联的shadow初始化时将当前上下文填充到缓存
                    return target.newInstance();
                });
            } catch (Exception e) {
                throw new ShadowCreateException(e);
            }
        });
        actual = Shadows.getShadow(credential, PASSCODE, ContextAccess.class);
    }
    
    @Override
    protected void finalize() throws Throwable {
        super.finalize();
        Shadows.destroyShadow(credential, PASSCODE);
    }
    
    /**
     * 创建新的上下文实例
     *
     * @param <T>
     * @return
     */
    public static <T> RuntimeContext<T> newInstance(FluxSupport<T, ?> node) {
        return new RuntimeContext<>(node);
    }
    
    /**
     * 获取当前的运行上下文。需要在整体交易开始前调用{@link RuntimeContext#reset()}方法初始化
     *
     * @return
     */
    public static RuntimeContext<?> currentContext() {
        Stack<ExecuteContext<?>> stack = EXECUTE_CONTEXT.get();
        if (stack != null && stack.get() != null) {
            return stack.get().runtimeContext;
        }
        throw new IllegalStateException(
            "Unable to access RuntimeContext。Make sure RuntimeContext.reset() has been called.");
    }
    
    public static Options options() {
        return OPTIONS.get();
    }
    
    /**
     * 重置当前所有的上下文
     */
    public static void reset() {
        reset(null);
    }
    
    /**
     * 重置当前所有的上下文
     */
    public static void reset(Options options) {
        SafeInstances.reset();
        Optional.ofNullable(CONTEXT_REFERENCES.get()).ifPresent(contextReferences -> {
            Iterator<ContextReference> iterator = contextReferences.iterator();
            ContextReference reference;
            while (iterator.hasNext()) {
                reference = iterator.next();
                if (!reference.reset()) {
                    iterator.remove();
                }
            }
            CONTEXT_REFERENCES.remove();
        });
        CONTEXT_REFERENCES.set(new ConcurrentHashSet<>(24));
        
        EXECUTE_CONTEXT.set(new Stack<>());
        
        if (options != null) {
            OPTIONS.set(options);
        } else {
            OPTIONS.remove();
        }
    }
    
    /**
     * 安全方式获取当前的运行上下文
     *
     * @return
     */
    public static Optional<RuntimeContext<?>> safeCurrentContext() {
        Stack<ExecuteContext<?>> stack = EXECUTE_CONTEXT.get();
        if (stack != null && stack.get() != null) {
            return Optional.ofNullable(stack.get().runtimeContext);
        }
        return Optional.empty();
    }
    
    /**
     * 获取当前的执行上下文
     *
     * @return
     */
    public ExecuteContext<T> getExecuteContext() {
        return actual.getExecuteContext();
    }
    
    /**
     * 在栈顶创建一个新的上下文.
     *
     * @param parent 相关联的父级上下文
     * @return
     */
    public ExecuteContext pushIntoStack(ExecuteContext<?> parent) {
        safeCheck();
        actual.pushIntoStack(new ExecuteContext<>(this, node, parent));
        Optional.ofNullable(EXECUTE_CONTEXT.get()).ifPresent(contextStack -> {
            contextStack.push(actual.getExecuteContext());
        });
        return actual.getExecuteContext();
    }
    
    /**
     * 移除栈顶的上下文
     */
    public void removeFromStack() {
        safeCheck();
        actual.removeFromStack();
        Optional.ofNullable(EXECUTE_CONTEXT.get()).ifPresent(contextStack -> {
            contextStack.pop();
        });
    }
    
    /**
     * 获取当前的执行状态
     *
     * @return
     */
    public byte getRunningState() {
        return actual.getRunningState();
    }
    
    /**
     * 设置当前的执行状态
     *
     * @param runningState
     */
    public void setRunningState(byte runningState) {
        actual.setRunningState(runningState);
    }
    
    /**
     * 获取下一个执行节点
     *
     * @return
     */
    public FluxSupport<?, ?> getNext() {
        return actual.getNext();
    }
    
    /**
     * 设置下一个执行节点
     *
     * @param next
     */
    public void setNext(FluxSupport<?, ?> next) {
        actual.setNext(next);
    }
    
    /**
     * 获取该节点的执行栈
     *
     * @return
     */
    public Stack<ExecuteContext<T>> getStack() {
        return actual.getStack();
    }
    
    /**
     * 获取最后一次的执行结果
     *
     * @return
     */
    public T getLastValue() {
        return actual.getLastValue();
    }
    
    /**
     * 设置最后一次的执行异常
     *
     * @param lastError
     */
    public void setLastError(Throwable lastError) {
        actual.setLastError(lastError);
    }
    
    /**
     * 设置最后一次的执行结果
     *
     * @param lastValue
     */
    public void setLastValue(T lastValue) {
        actual.setLastValue(lastValue);
    }
    
    /**
     * 获取最后一次的执行异常
     *
     * @return
     */
    public Throwable getLastError() {
        return actual.getLastError();
    }
    
    /**
     * 安全检查
     */
    private void safeCheck() {
    }
    
    /**
     * 清理重置上下文
     */
    private void clean() {
        actual.clean();
    }
    
    private static class ContextReference extends WeakReference<RuntimeContext> {
        
        public ContextReference(RuntimeContext<?> referent) {
            super(referent);
        }
        
        public boolean reset() {
            RuntimeContext<?> context = get();
            if (context != null) {
                context.clean();
                return true;
            }
            return false;
        }
    }
}

主要通过pushIntoStack 以及 removeFromStack 两个方法实现上下文的出栈入栈操作,保持运行中的上下文位于栈顶。实际上的执行上下文对应的是ExecuteContext 对象,RuntimeContext用于保存最终的执行结果或者异常,以及获取ExecuteContext执行上下文。

/** * 执行上下文,对于每个{@link FluxSupport}节点,多次执行会产生多个执行上下文 * @author LYF * @version 1.0 * @date 2021/6/21 11:56 * @since */public final class ExecuteContext<T> {    final RuntimeContext<T> runtimeContext;    final FluxSupport<?, ?> parent;    final FluxSupport<T, ?> self;    final int deep;    Throwable throwable;    //下一个同级节点    ExecuteContext<?> next;    //第一个子节点    ExecuteContext<?> childContext;    //父节点    final ExecuteContext<?> parentContext;    T result;    final String desc;        protected ExecuteContext(RuntimeContext<T> runtimeContext, FluxSupport<T, ?> self,        ExecuteContext<?> parent) {        this.runtimeContext = runtimeContext;        this.parentContext = parent;        this.self = self;        this.desc = self.tracker() + self.traceId();        if (this.parentContext != null) {            this.parent = this.parentContext.self;            this.parentContext.addChild(this);        } else {            this.parent = null;        }        this.deep = determineDeep();    }        private int determineDeep() {        Stack<ExecuteContext<?>> contextStack = RuntimeContext.EXECUTE_CONTEXT.get();        if (contextStack != null) {            return contextStack.deep() + 1;        }        if (this.parentContext != null) {            return this.parentContext.deep + 1;        } else {            return 1;        }    }        /**     * 搜寻Root节点     *     * @return     */    public ExecuteContext<?> toRoot() {        ExecuteContext<?> first = this;        while (first.parentContext != null) {            first = first.parentContext;        }        return first;    }        /**     * 获取直接子节点   A     * /  \     * B     C     *     * @return     */    public Iterator<ExecuteContext<?>> forDirectChild() {        final ExecuteContext<?> firstChild = this.childContext;        return new Iterator<ExecuteContext<?>>() {            ExecuteContext current = firstChild;                        @Override            public boolean hasNext() {                return current != null;            }                        @Override            public ExecuteContext<?> next() {                ExecuteContext<?> ret = current;                current = current.next;                return ret;            }        };    }        /**     * 获取当前节点的调用路径, 从当前节点向前遍历直到Root节点     *     * @return     */    public Iterator<ExecuteContext<?>> forTrace() {        final ExecuteContext<?> firstChild = this;        return new Iterator<ExecuteContext<?>>() {            ExecuteContext current = firstChild;                        @Override            public boolean hasNext() {                return current != null;            }                        @Override            public ExecuteContext<?> next() {                ExecuteContext<?> ret = current;                current = current.parentContext;                return ret;            }        };    }        /**     * 以当前节点为基点, 广度优先方式向下遍历     *     * @return     */    public Iterator<ExecuteContext<?>> tree() {        final ExecuteContext<?> firstChild = this;        return new Iterator<ExecuteContext<?>>() {            ExecuteContext current = firstChild;            Queue<ExecuteContext<?>> contextQueue;                        @Override            public boolean hasNext() {                if (contextQueue == null) {                    contextQueue = new ArrayDeque<>();                    initStack(current, contextQueue);                }                return !contextQueue.isEmpty();            }                        private void initStack(ExecuteContext<?> current,                Queue<ExecuteContext<?>> contextQueue) {                if (current == null) {                    return;                }                contextQueue.add(current);                Iterator<ExecuteContext<?>> iterator = current.forDirectChild();                while (iterator.hasNext()) {                    initStack(iterator.next(), contextQueue);                }            }                        @Override            public ExecuteContext<?> next() {                return contextQueue.poll();            }        };    }        /**     * 添加一个子节点     *     * @param child     */    private void addChild(ExecuteContext<?> child) {        if (childContext == null) {            this.childContext = child;        } else {            ExecuteContext<?> last = this.childContext;            int max = 10;            while (last.next != null) {                if (max-- < 0) {//防止处于循环体中大量上下文导致内存溢出                    this.childContext = child;                    return;                }                last = last.next;            }            last.next = child;        }    }        public String getDesc() {        return desc;    }        public int deep() {        return deep;    }        /**     * 获取实际的运行结果,可能从最终上下文中获取     *     * @return     */    public T getFinalResult() {        return result == null ? runtimeContext.getLastValue() : result;    }        /**     * 获取当前结果     * @return     */    public T getResult(){        return result;    }        /**     * 获取实际的运行异常,可能从最终上下文中获取     *     * @return     */    public Throwable getFinalThrowable() {        return throwable == null ? runtimeContext.getLastError() : throwable;    }        /**     * 获取运行时异常     *     * @return     */    public Throwable getThrowable() {        return throwable ;    }        public void setThrowable(Throwable throwable) {        this.throwable = throwable;    }        public void setNext(ExecuteContext<?> next) {        this.next = next;    }        public void setResult(T result) {        this.result = result;    }        public FluxSupport<?, ?> getParent() {        return parent;    }        public FluxSupport<T, ?> getSelf() {        return self;    }

执行上下文中除了保存某次调用的真实结果,还提供了toRoot(从当前节点往前搜索找到Root的执行节点、forDirectChild(获取直接的子节点)、forTrace(获取当前节点的调用路径, 从当前节点向前遍历直到Root节点)、tree(以当前节点为基点, 广度优先方式向下遍历),这些方法就是后续为我们跟踪调用图的调用过程提供基础的数据。

参数设计

/** * 获取当前调用结果以及异常 * @author LYF * @version 1.0 * @date 2021/4/8 14:18 * @since */public interface Reference<T> {    /**     * 获取结果     * @return 值不存在时返回null     */    T value();        /**     * 安全获取结果     * @return     */    Optional<T> safeValue();        /**     * 是否异常     * @return     */    default boolean hasError(){        return safeError().isPresent();    }        /**     * 获取异常     * @return 值不存在时返回null     */    Throwable error();        /**     * 安全获取异常     * @return     */    Optional<Throwable> safeError();        /**     * 获取父节点结果     * @return 值不存在时返回null     */    Object parentValue();        /**     * 安全获取父节点结果     * @return     */    Optional<Object> safeParentValue();        /**     * 获取父节点异常     * @return 值不存在时返回null     */    Throwable parentError();        /**     * 安全获取父节点异常     * @return     */    Optional<Throwable> safeParentError();        /**     * 上级节点是否异常     * @return     */    default boolean hasParentError(){        return safeParentError().isPresent();    }}

这个是参数获取的接口定义,每个节点都提供了相应的实现用于对外提供获取结果的方式。要注意的是这里获取到的都是最终的真实结果,对于执行中的节点是无法获取的。同样的ExecuteContext中也同样可以获取结果,那个结果是运行中的结果,只有节点处于执行中或者执行完毕才能正常获取