如何使用ftrace实时获取系统中的spinlock快照

接上文:
https://blog.csdn.net/dog250/article/details/108349046
在这篇文章中,我给出了一个拯救panic的方法,其目的更多的是恶作剧性质。但仍然有不足,请看下面代码段中的注释:

void stub_panic(const char *fmt, ...)
{
    ...
    local_irq_enable();
    // 这个时候如果current持有自旋锁,那可怎么办???
    printk("rq:%d   %d\n", preempt_count(), irqs_disabled());
    __set_current_state(TASK_UNINTERRUPTIBLE);
    schedule();

如果持有自旋锁的task被schedule出去了,由于该task不会再回来了,那么只要另一个task抢锁,系统立马就会死锁。

需求自然就出来了,我能不能在调用schedule之前遍历系统当前所有自旋锁的持锁情况,将自己持有的自旋锁给unlock了呢?

有需求就有方案。当然可以。

大秀hook手艺的时候来了。虽然作为手艺人用手工的方式拼接指令可以实现任何功能,但现在的目标已经不仅仅是秀手艺了,而是实现上述的需求,所以我尽量先使用stap/kprobe,ftrace这些场面还不是太宏大的工具。

需求图示如下:
在这里插入图片描述

很显然,直接的思路就是使用kretprobe了,于是我写出了如下的代码:

#include <linux/module.h>
#include <linux/kallsyms.h>
#include <linux/kprobes.h>

struct lock_owner {
    struct raw_spinlock *lock;
    struct task_struct *owner;
    struct task_struct *lock_owner;
    int idx;
};

struct lock_owner  plane[256];
unsigned long bitmap[4];

int lock_handler(struct kretprobe_instance *ri, struct pt_regs *regs)
{
    struct lock_owner *owner, **pdata;
    struct raw_spinlock *lock = (struct raw_spinlock *)regs->di;
    int i = -1, retry = 10;

    pdata = (struct lock_owner **)&ri->data;
    *pdata = NULL;

again:
    i = find_first_zero_bit(&bitmap[0], 256);
    if (i > 256 || test_and_set_bit(i, bitmap)) {
        if (retry --)
            goto again;
        goto end;
    }
    owner->idx = i;
    owner = &plane[i];
    owner->lock = lock;
    owner->owner = current;
    owner->lock_owner = NULL;
    *pdata = owner;
end:
    return 0;
}

int lock_ret_handler(struct kretprobe_instance *ri, struct pt_regs *regs)
{
    struct lock_owner *owner = (struct lock_owner *)&ri->data[0];

    if (owner == NULL)
        return 0;
    owner->lock_owner = current;
    return 0;
}

static struct kretprobe lock_kretprobe = {
    .handler = lock_ret_handler,
    .entry_handler = lock_handler,
    .data_size = sizeof(struct lock_owner *),
    .maxactive              = 20,
};

int unlock_handler(struct kretprobe_instance *ri, struct pt_regs *regs)
{
    struct lock_owner *owner, **pdata;
    struct raw_spinlock *lock = (struct raw_spinlock *)regs->di;
    int i = -1;

    pdata = (struct lock_owner **)&ri->data;
    *pdata = NULL;
    for (i = 0; i < 256; i++) {
        owner = &plane[i];
        if (test_bit(owner->idx, &bitmap[0]) &&
            owner->owner == current &&
            owner->lock_owner == current &&
            owner->lock == lock &&
            owner->idx == i) {
            *pdata = owner;
            break;
        }
    }
    return 0;
}

int unlock_ret_handler(struct kretprobe_instance *ri, struct pt_regs *regs)
{
    struct lock_owner *owner = (struct lock_owner *)&ri->data[0];

    if (owner == NULL)
        return 0;
    owner->lock = NULL;
    owner->owner = NULL;
    owner->lock_owner = NULL;
    owner->idx = -1;
    clear_bit(owner->idx, &bitmap[0]);

    return 0;
}

static struct kretprobe unlock_kretprobe = {
    .handler = unlock_ret_handler,
    .entry_handler = unlock_handler,
    .data_size = sizeof(struct lock_owner *),
    .maxactive              = 20,
};

static int __init lock_detect_init(void)
{
    memset((char *)&plane[0], 0, sizeof(plane));
    memset((char *)&bitmap[0], 0, sizeof(plane));

    lock_kretprobe.kp.symbol_name = "_raw_spin_lock";
    register_kretprobe(&lock_kretprobe);
    unlock_kretprobe.kp.symbol_name = "_raw_spin_unlock";
    register_kretprobe(&unlock_kretprobe);

    return 0;
}

static void __exit lock_detect_exit(void)
{
    unregister_kretprobe(&lock_kretprobe);
    unregister_kretprobe(&unlock_kretprobe);
}

module_init(lock_detect_init);
module_exit(lock_detect_exit);

请忽略具体的spinlock统计逻辑,现在仅仅关注框架,我敢说,这个玩法对于一般的wrap函数,简直就是模版:

  • 这个框架可以实现几乎一切wrap函数。

所谓的wrap函数其实很简单:

my_function(void *param)
{
    pre_process(param);
    orig_function(param);
    post_process(param);
}

kretprobe的跳转逻辑如下:
在这里插入图片描述

然而,它偏偏不能用于_raw_spin_lock/_raw_spin_unlock的wrap!因为会死锁!

  • kprobe/kretprobe框架内部使用spinlock来进行同步,在ret_handler执行的时候,它已经持有了该spinlock,而属于kretprobe的ret_handler本身同样也需要该spinlock,因此会死锁。

啦啦啦,这就是为什么我喜欢纯手工活儿的原因了,因为它可控啊!

来来来,试试ftrace,相比于kretprobe而言,它更简单,我觉得它应该没问题,如果再不行,就只能上纯手工艺了。

先来试试框架,下面是一个什么都不做的框架代码:

#include <linux/module.h>
#include <linux/kallsyms.h>
#include <linux/ftrace.h>

struct wrap_hook {
    char *name;
    struct ftrace_ops ops;
    unsigned long wrap_func;
    unsigned long entry;
};

void (*real_raw_spin_lock)(spinlock_t *lock);

// _raw_spin_lock的wrap函数!
void my_raw_spin_lock(spinlock_t *lock)
{
    real_raw_spin_lock(lock);
}

void (*real_raw_spin_unlock)(spinlock_t *lock);

// _raw_spin_unlock的wrap函数!
void my_raw_spin_unlock(spinlock_t *lock)
{
    real_raw_spin_unlock(lock);
}
struct wrap_hook spinlock_hooks[2] = {
    {
        .name = "_raw_spin_lock",
        .wrap_func = (unsigned long)my_raw_spin_lock,
    },
    {
        .name = "_raw_spin_unlock",
        .wrap_func = (unsigned long)my_raw_spin_unlock,
    }
};

// 该函数是一个汇聚器,起到将逻辑return到new function的目的。
// 采用汇聚器可以避免为两个函数编写两个独立的hook函数,这是设计模式应用的例子。
void stub_func(unsigned long ip, unsigned long parent_ip,
        struct ftrace_ops *ops, struct pt_regs *regs)
{
    struct wrap_hook *hook = container_of(ops, struct wrap_hook, ops);

    // 替换返回地址
    regs->ip = hook->wrap_func;
}

static int wrap_spinlock_init(void)
{
    int i;

    for (i = 0; i < 2; i++) {
        spinlock_hooks[i].ops.func = stub_func;
        spinlock_hooks[i].ops.flags = FTRACE_OPS_FL_SAVE_REGS|FTRACE_OPS_FL_RECURSION_SAFE;
        spinlock_hooks[i].entry = kallsyms_lookup_name(spinlock_hooks[i].name);
        if (i == 0)
            real_raw_spin_lock = (void *)(spinlock_hooks[i].entry + MCOUNT_INSN_SIZE);
        else
            real_raw_spin_unlock = (void *)(spinlock_hooks[i].entry + MCOUNT_INSN_SIZE);
        ftrace_set_filter_ip(&spinlock_hooks[i].ops, spinlock_hooks[i].entry, 0, 0);
        register_ftrace_function(&spinlock_hooks[i].ops);
    }
    return 0;
}

static void wrap_spinlock_exit(void)
{
    int i;

    for (i = 0; i < 2; i++) {
        unregister_ftrace_function(&spinlock_hooks[i].ops);
        ftrace_set_filter_ip(&spinlock_hooks[i].ops, spinlock_hooks[i].entry, 1, 0);
    }
}
module_init(wrap_spinlock_init);
module_exit(wrap_spinlock_exit);

MODULE_LICENSE("GPL");

当我加载这个模块的时候,系统像什么都没有发生一样平静,而我用crash看_raw_spin_lock/_raw_spin_unlock的时候,显然它们已经被hook了,这意味着,wrap起作用了。

下图展示了ftrace实现wrap的原理:
在这里插入图片描述
这个已经和手工做法几乎无异了。可以拿这个ftrace和上面的kretprobe对比一下,感受一下雷同和差异:

  • kretprobe一般也是用来probe函数而不是指令,这一点和ftrace一致。
  • ftrace机制上更加直接,而kretprobe则更加trick一点。
  • 如果要完成一个业务需求,我选择用ftrace,如果要表演,我会照抄一套kretprobe的机制,如果纯自己玩,我选择纯手工艺。

好了,现在把spinlock统计逻辑放进去,代码简单,自己感受:

#include <linux/module.h>
#include <linux/kallsyms.h>
#include <linux/ftrace.h>

struct wrap_hook {
    char *name;
    struct ftrace_ops ops;
    unsigned long wrap_func;
    unsigned long entry;
};

struct lock_owner {
    int idx;
    spinlock_t *lock;
    struct task_struct *owner;
    struct task_struct *lock_owner;
};

struct lock_owner  plane[32768];
unsigned long bitmap[512];

void (*real_raw_spin_lock)(spinlock_t *lock);
void (*real_raw_spin_unlock)(spinlock_t *lock);

#pragma GCC optimize ("O0")
void my_raw_spin_lock(spinlock_t *lock)
{
    struct lock_owner *owner = NULL;
    int i = -1, retry = 0;

    // 通篇的header和tailer不要有spinlock,因此即便是printk也不能使用,否则会造成嵌套死锁(毕竟把spin_lock/unlock给hook了)。
    // 因此,下面实现了一个简单的lock机制,使用原子的test_and_set操作。
again:
    i = find_first_zero_bit(bitmap, 32768);
    if (test_and_set_bit(i, bitmap)) {
        if (retry ++ > 10)
            goto real;
        goto again;
    }
    owner = &plane[i];
    owner->idx = i;
    owner->lock = lock;
    owner->owner = current;
    owner->lock_owner = NULL;
    set_bit(i, bitmap);
real:
    barrier();
    real_raw_spin_lock(lock);

    barrier();
    if (owner)
        owner->lock_owner = current;
}

#if 0
void test()
{
    struct lock_owner *owner = NULL;
    int i = -1;
    int cnt = 4;

again:
    spin_lock(&maplock);
    i = find_first_zero_bit(bitmap, 32768);
    set_bit(i, bitmap);
    spin_unlock(&maplock);
    owner = &plane[i];
    printk("i is:%d at :%p\n", i, owner);
    if (cnt --)
        goto again;
}
#endif

#pragma GCC optimize ("O0")
void my_raw_spin_unlock(spinlock_t *lock)
{
    struct lock_owner *owner = NULL;
    int i = -1;

    real_raw_spin_unlock(lock);
    barrier();

    for (i = 0; i < 32768; i++) {
        // 注意,这里没有原子性保证,因此我的代码无法cover 100%要求原子的场景,可能会统计不准。
        if (test_bit(i, bitmap) &&
            plane[i].owner == current &&
            plane[i].lock_owner == current &&
            plane[i].lock == lock) {
            owner = &plane[i];
            owner->lock = NULL;
            owner->owner = NULL;
            owner->lock_owner = NULL;
            owner->idx = -1;
            barrier();
            clear_bit(i, bitmap);
            break;
        }
    }
}

#pragma GCC optimize ("O0")
struct wrap_hook spinlock_hooks[2] = {
    {
        .name = "_raw_spin_lock",
        .wrap_func = (unsigned long)my_raw_spin_lock,
    },
    {
        .name = "_raw_spin_unlock",
        .wrap_func = (unsigned long)my_raw_spin_unlock,
    }
};

void stub_func(unsigned long ip, unsigned long parent_ip,
        struct ftrace_ops *ops, struct pt_regs *regs)
{
    struct wrap_hook *hook = container_of(ops, struct wrap_hook, ops);

    regs->ip = hook->wrap_func;
}

#pragma GCC optimize ("O0")
static int wrap_spinlock_init(void)
{
    int i;

    memset((char *)plane, 0, sizeof(plane));
    memset((char *)bitmap, 0, sizeof(bitmap));

    // 打印出位图的地址,以便panic_resched模块使用!
    printk("plane:%p  bitmap:%p\n", &plane[0], &bitmap[0]);
#if 0
    test();
    return 0;
#endif
    for (i = 0; i < 2; i++) {
        spinlock_hooks[i].ops.func = stub_func;
        spinlock_hooks[i].ops.flags = FTRACE_OPS_FL_SAVE_REGS|FTRACE_OPS_FL_RECURSION_SAFE;
        spinlock_hooks[i].entry = kallsyms_lookup_name(spinlock_hooks[i].name);
        if (i == 0)
            real_raw_spin_lock = (void *)(spinlock_hooks[i].entry + 5);
        else
            real_raw_spin_unlock = (void *)(spinlock_hooks[i].entry + 5);
        ftrace_set_filter_ip(&spinlock_hooks[i].ops, spinlock_hooks[i].entry, 0, 0);
        register_ftrace_function(&spinlock_hooks[i].ops);
    }

    return 0;
}

static void wrap_spinlock_exit(void)
{
    int i;
    struct lock_owner *owner;
#if 0
    for (i = 0; i < 32768; i++) {
        owner = &plane[i];
        if (test_bit(i, bitmap)) {
            if (owner->lock == NULL || owner->idx != i)
                continue;
            printk("[%d] lock:%p  owner:%s[%d]  lock_owner:%s[%d]\n",
                    owner->idx,
                    owner->lock,
                    owner->owner?owner->owner->comm:"noone",
                    owner->owner?owner->owner->pid:-2,
                    owner->lock_owner?owner->lock_owner->comm:"null",
                    owner->lock_owner?owner->lock_owner->pid:-1);
        }
    }
#endif
    for (i = 0; i < 2; i++) {
        unregister_ftrace_function(&spinlock_hooks[i].ops);
        ftrace_set_filter_ip(&spinlock_hooks[i].ops, spinlock_hooks[i].entry, 1, 0);
    }
}
module_init(wrap_spinlock_init);
module_exit(wrap_spinlock_exit);

MODULE_LICENSE("GPL");

值得注意的是,wrap函数中不能再调用任何会使用spinlock的函数(避免循环嵌套),因此一开始我决定自己用原子原语实现一个自己的自旋锁:

void lock(unsigned long *lock)
{
    while (test_and_set_bit(0, lock));
}

void unlock(unsigned long *lock)
{
    *lock = 0; // 比clear_bit更帅
}

然而系统很快就卡死了:

  • 把系统所有的spinlock操作全部在此处唯一的自旋锁上串行化,不卡死才怪!

于是,只能退而求其次,采用宽松的约束了:

  • 实在没有slot了,就不统计该次记录了。

所以,我这个spinlock快照记录机制, 它是不准的。

以上就是一个简单的spinlock快照机制的代码和说明,它可以展示:

  • 当前系统中都有哪些task在争抢哪一把spinlock。
  • 当前系统中某个spinlock被哪一个task所持有。

这个机制有什么用呢?

回到本文的开头,如果想让panic被schedule出去而不是宕机,我在担心current持有锁怎么办,我希望有一个办法让我知道current是否持有spinlock以及持有了哪些spinlock,然后将它们unlock。

现在有办法了:

// 通过lock_detect模块init函数中printk出来的bitmap地址来设置。
unsigned long pbitmap;
module_param(pbitmap, ulong, 0644);

// 通过lock_detect模块init函数中printk出来的plane地址来设置。
unsigned long pplane;
module_param(pplane, ulong, 0644);

void stub_panic(const char *fmt, ...)
{
    int i;
    unsigned long *bitmap = (unsigned long *)pbitmap;
    struct lock_owner *plane = (struct lock_owner *)pplane;

    // 循环遍历所有当前系统当事的spinlock,解锁current所持有的spinlock。
    for (i = 0; i < 32768; i++) {
        if (test_bit(i, bitmap) &&
            plane[i].owner == current &&
            plane[i].lock_owner == current &&
            plane[i].lock &&
            plane[i].idx == i) {
            printk("lock hold:%p   %s   %d\n",
                    plane[i].lock,
                    plane[i].lock_owner?plane[i].lock_owner->comm:"aabb",
                    plane[i].lock_owner?plane[i].lock_owner->pid:-1);
            spin_unlock(plane[i].lock);
        }
    }
    if (preempt_count())
        return;

    local_irq_enable();
    // 安全地退场!!
    __set_current_state(TASK_UNINTERRUPTIBLE);
    schedule();
}
...

强调一点,本文介绍的把戏无法揪出所有的spinlock状态,因为Linux内核spinlock的lock/unlock操作并非_raw_spin_lock/_raw_spin_unlock入口的,比如spin_lock_irqsave/spin_unlock_irqrestore就不使用_raw_spin_lock/_raw_spin_unlock入口,它们有自己的入口。因此你需要把所有这些入口都给ftrace hook了,才能全咯。

不过,这仅仅是一个把戏,何必去留这么多TODO,权当玩吧。


浙江温州皮鞋湿,下雨进水不会胖。

原文链接: https://blog.csdn.net/dog250/article/details/108376624

欢迎关注

微信关注下方公众号,第一时间获取干货硬货;公众号内回复【pdf】免费获取数百本计算机经典书籍;

也有高质量的技术群,里面有嵌入式、搜广推等BAT大佬

    如何使用ftrace实时获取系统中的spinlock快照

原创文章受到原创版权保护。转载请注明出处:https://www.ccppcoding.com/archives/405835

非原创文章文中已经注明原地址,如有侵权,联系删除

关注公众号【高性能架构探索】,第一时间获取最新文章

转载文章受原作者版权保护。转载请注明原作者出处!

(0)
上一篇 2023年4月26日 上午9:27
下一篇 2023年4月26日 上午9:27

相关推荐